Merge branch 'cc-1469-crossfade' into devel
Conflicts: python_apps/pypo/pypopush.py
This commit is contained in:
commit
7a6940c96c
|
@ -3,14 +3,14 @@
|
|||
virtualenv_bin="/usr/lib/airtime/airtime_virtualenv/bin/"
|
||||
. ${virtualenv_bin}activate
|
||||
|
||||
pypo_user="pypo"
|
||||
# Absolute path to this script
|
||||
SCRIPT=`readlink -f $0`
|
||||
# Absolute directory this script is in
|
||||
pypo_path=`dirname $SCRIPT`
|
||||
|
||||
# Location of pypo_cli.py Python script
|
||||
pypo_path="/usr/lib/airtime/pypo/bin/"
|
||||
api_client_path="/usr/lib/airtime/"
|
||||
pypo_script="pypocli.py"
|
||||
cd ${pypo_path}
|
||||
exec 2>&1
|
||||
|
||||
set +e
|
||||
cat /etc/default/locale | grep -i "LANG=.*UTF-\?8"
|
||||
|
@ -26,6 +26,6 @@ export LC_ALL=`cat /etc/default/locale | grep "LANG=" | cut -d= -f2 | tr -d "\n\
|
|||
export TERM=xterm
|
||||
|
||||
|
||||
exec python ${pypo_path}${pypo_script} > /var/log/airtime/pypo/py-interpreter.log 2>&1
|
||||
exec python ${pypo_path}/${pypo_script} > /var/log/airtime/pypo/py-interpreter.log 2>&1
|
||||
|
||||
# EOF
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
FILE = "file"
|
||||
EVENT = "event"
|
||||
STREAM_BUFFER_START = "stream_buffer_start"
|
||||
STREAM_OUTPUT_START = "stream_output_start"
|
||||
STREAM_BUFFER_END = "stream_buffer_end"
|
||||
STREAM_OUTPUT_END = "stream_output_end"
|
|
@ -354,28 +354,32 @@ end
|
|||
# Add a skip function to a source
|
||||
# when it does not have one
|
||||
# by default
|
||||
def add_skip_command(s)
|
||||
# A command to skip
|
||||
def skip(_)
|
||||
# get playing (active) queue and flush it
|
||||
l = list.hd(server.execute("queue.secondary_queue"))
|
||||
l = string.split(separator=" ",l)
|
||||
list.iter(fun (rid) -> ignore(server.execute("queue.remove #{rid}")), l)
|
||||
#def add_skip_command(s)
|
||||
# # A command to skip
|
||||
# def skip(_)
|
||||
# # get playing (active) queue and flush it
|
||||
# l = list.hd(server.execute("queue.secondary_queue"))
|
||||
# l = string.split(separator=" ",l)
|
||||
# list.iter(fun (rid) -> ignore(server.execute("queue.remove #{rid}")), l)
|
||||
#
|
||||
# l = list.hd(server.execute("queue.primary_queue"))
|
||||
# l = string.split(separator=" ", l)
|
||||
# if list.length(l) > 0 then
|
||||
# source.skip(s)
|
||||
# "Skipped"
|
||||
# else
|
||||
# "Not skipped"
|
||||
# end
|
||||
# end
|
||||
# # Register the command:
|
||||
# server.register(namespace="source",
|
||||
# usage="skip",
|
||||
# description="Skip the current song.",
|
||||
# "skip",fun(s) -> begin log("source.skip") skip(s) end)
|
||||
#end
|
||||
|
||||
l = list.hd(server.execute("queue.primary_queue"))
|
||||
l = string.split(separator=" ", l)
|
||||
if list.length(l) > 0 then
|
||||
source.skip(s)
|
||||
"Skipped"
|
||||
else
|
||||
"Not skipped"
|
||||
end
|
||||
end
|
||||
# Register the command:
|
||||
server.register(namespace="source",
|
||||
usage="skip",
|
||||
description="Skip the current song.",
|
||||
"skip",fun(s) -> begin log("source.skip") skip(s) end)
|
||||
def clear_queue(s)
|
||||
source.skip(s)
|
||||
end
|
||||
|
||||
def set_dynamic_source_id(id) =
|
||||
|
|
|
@ -35,7 +35,34 @@ just_switched = ref false
|
|||
|
||||
%include "ls_lib.liq"
|
||||
|
||||
queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5))
|
||||
sources = ref []
|
||||
source_id = ref 0
|
||||
|
||||
def create_source()
|
||||
l = request.equeue(id="s#{!source_id}", length=0.5)
|
||||
sources := list.append([l], !sources)
|
||||
server.register(namespace="queues",
|
||||
"s#{!source_id}_skip",
|
||||
fun (s) -> begin log("queues.s#{!source_id}_skip")
|
||||
clear_queue(l)
|
||||
"Done"
|
||||
end)
|
||||
source_id := !source_id + 1
|
||||
end
|
||||
|
||||
create_source()
|
||||
create_source()
|
||||
create_source()
|
||||
create_source()
|
||||
|
||||
create_source()
|
||||
create_source()
|
||||
create_source()
|
||||
create_source()
|
||||
|
||||
queue = add(!sources)
|
||||
|
||||
queue = audio_to_stereo(id="queue_src", queue)
|
||||
queue = cue_cut(queue)
|
||||
queue = amplify(1., override="replay_gain", queue)
|
||||
|
||||
|
@ -248,7 +275,7 @@ end
|
|||
|
||||
|
||||
# Attach a skip command to the source s:
|
||||
add_skip_command(s)
|
||||
#add_skip_command(s)
|
||||
|
||||
server.register(namespace="streams",
|
||||
description="Stop Master DJ source.",
|
||||
|
|
|
@ -13,8 +13,8 @@ import signal
|
|||
import logging
|
||||
import locale
|
||||
import os
|
||||
from Queue import Queue
|
||||
|
||||
from Queue import Queue
|
||||
from threading import Lock
|
||||
|
||||
from pypopush import PypoPush
|
||||
|
@ -63,7 +63,7 @@ try:
|
|||
LogWriter.override_std_err(logger)
|
||||
except Exception, e:
|
||||
print "Couldn't configure logging"
|
||||
sys.exit()
|
||||
sys.exit(1)
|
||||
|
||||
def configure_locale():
|
||||
logger.debug("Before %s", locale.nl_langinfo(locale.CODESET))
|
||||
|
@ -229,7 +229,7 @@ if __name__ == '__main__':
|
|||
stat.start()
|
||||
|
||||
# all join() are commented out because we want to exit entire pypo
|
||||
# if pypofetch is exiting
|
||||
# if pypofetch terminates
|
||||
#pmh.join()
|
||||
#recorder.join()
|
||||
#pp.join()
|
||||
|
|
|
@ -7,16 +7,17 @@ import logging.config
|
|||
import json
|
||||
import telnetlib
|
||||
import copy
|
||||
from threading import Thread
|
||||
import subprocess
|
||||
import signal
|
||||
from datetime import datetime
|
||||
|
||||
from Queue import Empty
|
||||
from threading import Thread
|
||||
from subprocess import Popen, PIPE
|
||||
from configobj import ConfigObj
|
||||
|
||||
from api_clients import api_client
|
||||
from std_err_override import LogWriter
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
from configobj import ConfigObj
|
||||
|
||||
# configure logging
|
||||
logging_cfg = os.path.join(os.path.dirname(__file__), "logging.cfg")
|
||||
|
@ -24,6 +25,12 @@ logging.config.fileConfig(logging_cfg)
|
|||
logger = logging.getLogger()
|
||||
LogWriter.override_std_err(logger)
|
||||
|
||||
def keyboardInterruptHandler(signum, frame):
|
||||
logger = logging.getLogger()
|
||||
logger.info('\nKeyboard Interrupt\n')
|
||||
sys.exit(0)
|
||||
signal.signal(signal.SIGINT, keyboardInterruptHandler)
|
||||
|
||||
#need to wait for Python 2.7 for this..
|
||||
#logging.captureWarnings(True)
|
||||
|
||||
|
@ -34,8 +41,6 @@ try:
|
|||
LS_PORT = config['ls_port']
|
||||
#POLL_INTERVAL = int(config['poll_interval'])
|
||||
POLL_INTERVAL = 1800
|
||||
|
||||
|
||||
except Exception, e:
|
||||
logger.error('Error loading config file: %s', e)
|
||||
sys.exit()
|
||||
|
@ -481,6 +486,7 @@ class PypoFetch(Thread):
|
|||
except Exception, e:
|
||||
pass
|
||||
|
||||
media_copy = {}
|
||||
for key in media:
|
||||
media_item = media[key]
|
||||
if (media_item['type'] == 'file'):
|
||||
|
@ -490,12 +496,17 @@ class PypoFetch(Thread):
|
|||
media_item['file_ready'] = False
|
||||
media_filtered[key] = media_item
|
||||
|
||||
media_item['start'] = datetime.strptime(media_item['start'], "%Y-%m-%d-%H-%M-%S")
|
||||
media_item['end'] = datetime.strptime(media_item['end'], "%Y-%m-%d-%H-%M-%S")
|
||||
media_copy[media_item['start']] = media_item
|
||||
|
||||
|
||||
self.media_prepare_queue.put(copy.copy(media_filtered))
|
||||
except Exception, e: self.logger.error("%s", e)
|
||||
|
||||
# Send the data to pypo-push
|
||||
self.logger.debug("Pushing to pypo-push")
|
||||
self.push_queue.put(media)
|
||||
self.push_queue.put(media_copy)
|
||||
|
||||
|
||||
# cleanup
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
from threading import Thread
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
|
||||
import traceback
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
||||
from Queue import Empty
|
||||
|
||||
import signal
|
||||
def keyboardInterruptHandler(signum, frame):
|
||||
logger = logging.getLogger()
|
||||
logger.info('\nKeyboard Interrupt\n')
|
||||
sys.exit(0)
|
||||
signal.signal(signal.SIGINT, keyboardInterruptHandler)
|
||||
|
||||
class PypoLiqQueue(Thread):
|
||||
def __init__(self, q, pypo_liquidsoap, logger):
|
||||
Thread.__init__(self)
|
||||
self.queue = q
|
||||
self.logger = logger
|
||||
self.pypo_liquidsoap = pypo_liquidsoap
|
||||
|
||||
def main(self):
|
||||
time_until_next_play = None
|
||||
schedule_deque = deque()
|
||||
media_schedule = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
if time_until_next_play is None:
|
||||
self.logger.info("waiting indefinitely for schedule")
|
||||
media_schedule = self.queue.get(block=True)
|
||||
else:
|
||||
self.logger.info("waiting %ss until next scheduled item" % \
|
||||
time_until_next_play)
|
||||
media_schedule = self.queue.get(block=True, \
|
||||
timeout=time_until_next_play)
|
||||
except Empty, e:
|
||||
#Time to push a scheduled item.
|
||||
media_item = schedule_deque.popleft()
|
||||
self.pypo_liquidsoap.push_item(media_item)
|
||||
if len(schedule_deque):
|
||||
time_until_next_play = \
|
||||
self.date_interval_to_seconds(
|
||||
schedule_deque[0]['start'] - datetime.utcnow())
|
||||
if time_until_next_play < 0:
|
||||
time_until_next_play = 0
|
||||
else:
|
||||
time_until_next_play = None
|
||||
else:
|
||||
self.logger.info("New schedule received: %s", media_schedule)
|
||||
|
||||
#new schedule received. Replace old one with this.
|
||||
schedule_deque.clear()
|
||||
|
||||
keys = sorted(media_schedule.keys())
|
||||
for i in keys:
|
||||
schedule_deque.append(media_schedule[i])
|
||||
|
||||
if len(keys):
|
||||
time_until_next_play = self.date_interval_to_seconds(\
|
||||
keys[0] - datetime.utcnow())
|
||||
else:
|
||||
time_until_next_play = None
|
||||
|
||||
|
||||
def date_interval_to_seconds(self, interval):
|
||||
"""
|
||||
Convert timedelta object into int representing the number of seconds. If
|
||||
number of seconds is less than 0, then return 0.
|
||||
"""
|
||||
seconds = (interval.microseconds + \
|
||||
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
|
||||
if seconds < 0: seconds = 0
|
||||
|
||||
return seconds
|
||||
|
||||
def run(self):
|
||||
try: self.main()
|
||||
except Exception, e:
|
||||
self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc())
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,212 @@
|
|||
from pypofetch import PypoFetch
|
||||
from telnetliquidsoap import TelnetLiquidsoap
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
|
||||
import eventtypes
|
||||
import time
|
||||
|
||||
class PypoLiquidsoap():
|
||||
def __init__(self, logger, telnet_lock, host, port):
|
||||
self.logger = logger
|
||||
self.liq_queue_tracker = {
|
||||
"s0": None,
|
||||
"s1": None,
|
||||
"s2": None,
|
||||
"s3": None,
|
||||
}
|
||||
|
||||
self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \
|
||||
logger,\
|
||||
host,\
|
||||
port)
|
||||
|
||||
|
||||
def play(self, media_item):
|
||||
if media_item["type"] == eventtypes.FILE:
|
||||
self.handle_file_type(media_item)
|
||||
elif media_item["type"] == eventtypes.EVENT:
|
||||
self.handle_event_type(media_item)
|
||||
elif media_item["type"] == eventtypes.STREAM_BUFFER_START:
|
||||
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
|
||||
elif media_item["type"] == eventtypes.STREAM_OUTPUT_START:
|
||||
if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id:
|
||||
#this is called if the stream wasn't scheduled sufficiently ahead of time
|
||||
#so that the prebuffering stage could take effect. Let's do the prebuffering now.
|
||||
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
|
||||
self.telnet_liquidsoap.start_web_stream(media_item)
|
||||
elif media_item['type'] == eventtypes.STREAM_BUFFER_END:
|
||||
self.telnet_liquidsoap.stop_web_stream_buffer()
|
||||
elif media_item['type'] == eventtypes.STREAM_OUTPUT_END:
|
||||
self.telnet_liquidsoap.stop_web_stream_output()
|
||||
else: raise UnknownMediaItemType(str(media_item))
|
||||
|
||||
def handle_file_type(self, media_item):
|
||||
"""
|
||||
Wait maximum 5 seconds (50 iterations) for file to become ready,
|
||||
otherwise give up on it.
|
||||
"""
|
||||
iter_num = 0
|
||||
while not media_item['file_ready'] and iter_num < 50:
|
||||
time.sleep(0.1)
|
||||
iter_num += 1
|
||||
|
||||
if media_item['file_ready']:
|
||||
available_queue = self.find_available_queue()
|
||||
|
||||
try:
|
||||
self.telnet_liquidsoap.queue_push(available_queue, media_item)
|
||||
self.liq_queue_tracker[available_queue] = media_item
|
||||
except Exception as e:
|
||||
self.logger.error(e)
|
||||
raise
|
||||
else:
|
||||
self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst'])
|
||||
|
||||
def handle_event_type(self, media_item):
|
||||
if media_item['event_type'] == "kick_out":
|
||||
PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj")
|
||||
elif media_item['event_type'] == "switch_off":
|
||||
PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off")
|
||||
|
||||
|
||||
def is_media_item_finished(self, media_item):
|
||||
if media_item is None:
|
||||
return True
|
||||
else:
|
||||
return datetime.utcnow() > media_item['end']
|
||||
|
||||
def find_available_queue(self):
|
||||
available_queue = None
|
||||
for i in self.liq_queue_tracker:
|
||||
mi = self.liq_queue_tracker[i]
|
||||
if mi == None or self.is_media_item_finished(mi):
|
||||
#queue "i" is available. Push to this queue
|
||||
available_queue = i
|
||||
|
||||
if available_queue == None:
|
||||
raise NoQueueAvailableException()
|
||||
|
||||
return available_queue
|
||||
|
||||
def get_queues():
|
||||
return self.liq_queue_tracker
|
||||
|
||||
|
||||
def verify_correct_present_media(self, scheduled_now):
|
||||
#verify whether Liquidsoap is currently playing the correct files.
|
||||
#if we find an item that Liquidsoap is not playing, then push it
|
||||
#into one of Liquidsoap's queues. If Liquidsoap is already playing
|
||||
#it do nothing. If Liquidsoap is playing a track that isn't in
|
||||
#currently_playing then stop it.
|
||||
|
||||
#Check for Liquidsoap media we should source.skip
|
||||
#get liquidsoap items for each queue. Since each queue can only have one
|
||||
#item, we should have a max of 8 items.
|
||||
|
||||
#TODO: Verify start, end, replay_gain is the same
|
||||
#TODO: Verify this is a file or webstream and also handle webstreams
|
||||
#TODO: Do webstreams span a time or do they have an instantaneous point?
|
||||
|
||||
#2013-03-21-22-56-00_0: {
|
||||
#id: 1,
|
||||
#type: "stream_output_start",
|
||||
#row_id: 41,
|
||||
#uri: "http://stream2.radioblackout.org:80/blackout.ogg",
|
||||
#start: "2013-03-21-22-56-00",
|
||||
#end: "2013-03-21-23-26-00",
|
||||
#show_name: "Untitled Show",
|
||||
#independent_event: true
|
||||
#},
|
||||
|
||||
|
||||
scheduled_now_files = \
|
||||
filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now)
|
||||
|
||||
scheduled_now_webstream = \
|
||||
filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \
|
||||
scheduled_now)
|
||||
|
||||
schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files))
|
||||
|
||||
|
||||
liq_queue_ids = set()
|
||||
for i in self.liq_queue_tracker:
|
||||
mi = self.liq_queue_tracker[i]
|
||||
if not self.is_media_item_finished(mi):
|
||||
liq_queue_ids.add(mi["row_id"])
|
||||
|
||||
to_be_removed = liq_queue_ids - schedule_ids
|
||||
to_be_added = schedule_ids - liq_queue_ids
|
||||
|
||||
if len(to_be_removed):
|
||||
self.logger.info("Need to remove items from Liquidsoap: %s" % \
|
||||
to_be_removed)
|
||||
|
||||
#remove files from Liquidsoap's queue
|
||||
for i in self.liq_queue_tracker:
|
||||
mi = self.liq_queue_tracker[i]
|
||||
if mi is not None and mi["row_id"] in to_be_removed:
|
||||
self.stop(i)
|
||||
|
||||
|
||||
|
||||
if len(to_be_added):
|
||||
self.logger.info("Need to add items to Liquidsoap *now*: %s" % \
|
||||
to_be_added)
|
||||
|
||||
for i in scheduled_now:
|
||||
if i["row_id"] in to_be_added:
|
||||
self.modify_cue_point(i)
|
||||
self.play(i)
|
||||
|
||||
current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
|
||||
if len(scheduled_now_webstream):
|
||||
if current_stream_id != scheduled_now_webstream[0]:
|
||||
self.play(scheduled_now_webstream[0])
|
||||
elif current_stream_id != "-1":
|
||||
#something is playing and it shouldn't be.
|
||||
self.telnet_liquidsoap.stop_web_stream_buffer()
|
||||
self.telnet_liquidsoap.stop_web_stream_output()
|
||||
|
||||
def stop(self, queue):
|
||||
self.telnet_liquidsoap.queue_remove(queue)
|
||||
self.liq_queue_tracker[queue] = None
|
||||
|
||||
def is_file(self, media_item):
|
||||
return media_item["type"] == eventtypes.FILE
|
||||
|
||||
def modify_cue_point(self, link):
|
||||
if not self.is_file(link):
|
||||
return
|
||||
|
||||
tnow = datetime.utcnow()
|
||||
|
||||
link_start = link['start']
|
||||
|
||||
diff_td = tnow - link_start
|
||||
diff_sec = self.date_interval_to_seconds(diff_td)
|
||||
|
||||
if diff_sec > 0:
|
||||
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
|
||||
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
|
||||
link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec
|
||||
|
||||
def date_interval_to_seconds(self, interval):
|
||||
"""
|
||||
Convert timedelta object into int representing the number of seconds. If
|
||||
number of seconds is less than 0, then return 0.
|
||||
"""
|
||||
seconds = (interval.microseconds + \
|
||||
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
|
||||
if seconds < 0: seconds = 0
|
||||
|
||||
return seconds
|
||||
|
||||
|
||||
class UnknownMediaItemType(Exception):
|
||||
pass
|
||||
|
||||
class NoQueueAvailableException(Exception):
|
||||
pass
|
|
@ -9,10 +9,14 @@ import logging.config
|
|||
import telnetlib
|
||||
import calendar
|
||||
import math
|
||||
import traceback
|
||||
import os
|
||||
from pypofetch import PypoFetch
|
||||
|
||||
from Queue import Empty
|
||||
from pypofetch import PypoFetch
|
||||
from pypoliqqueue import PypoLiqQueue
|
||||
from pypoliquidsoap import PypoLiquidsoap
|
||||
|
||||
from Queue import Empty, Queue
|
||||
|
||||
from threading import Thread
|
||||
|
||||
|
@ -36,7 +40,6 @@ try:
|
|||
LS_HOST = config['ls_host']
|
||||
LS_PORT = config['ls_port']
|
||||
PUSH_INTERVAL = 2
|
||||
MAX_LIQUIDSOAP_QUEUE_LENGTH = 2
|
||||
except Exception, e:
|
||||
logger.error('Error loading config file %s', e)
|
||||
sys.exit()
|
||||
|
@ -58,88 +61,67 @@ class PypoPush(Thread):
|
|||
self.pushed_objects = {}
|
||||
self.logger = logging.getLogger('push')
|
||||
self.current_prebuffering_stream_id = None
|
||||
self.queue_id = 0
|
||||
|
||||
self.future_scheduled_queue = Queue()
|
||||
self.pypo_liquidsoap = PypoLiquidsoap(self.logger, telnet_lock,\
|
||||
LS_HOST, LS_PORT)
|
||||
|
||||
self.plq = PypoLiqQueue(self.future_scheduled_queue, \
|
||||
self.pypo_liquidsoap, \
|
||||
self.logger)
|
||||
self.plq.daemon = True
|
||||
self.plq.start()
|
||||
|
||||
|
||||
def main(self):
|
||||
loops = 0
|
||||
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
|
||||
|
||||
next_media_item_chain = None
|
||||
media_schedule = None
|
||||
time_until_next_play = None
|
||||
chains = None
|
||||
|
||||
while True:
|
||||
try:
|
||||
if time_until_next_play is None:
|
||||
media_schedule = self.queue.get(block=True)
|
||||
else:
|
||||
media_schedule = self.queue.get(block=True, timeout=time_until_next_play)
|
||||
|
||||
chains = self.get_all_chains(media_schedule)
|
||||
|
||||
#We get to the following lines only if a schedule was received.
|
||||
liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
|
||||
liquidsoap_stream_id = self.get_current_stream_id_from_liquidsoap()
|
||||
|
||||
tnow = datetime.utcnow()
|
||||
current_event_chain, original_chain = self.get_current_chain(chains, tnow)
|
||||
|
||||
if len(current_event_chain) > 0:
|
||||
try:
|
||||
chains.remove(original_chain)
|
||||
except ValueError, e:
|
||||
self.logger.error(str(e))
|
||||
|
||||
#At this point we know that Liquidsoap is playing something, and that something
|
||||
#is scheduled. We need to verify whether the schedule we just received matches
|
||||
#what Liquidsoap is playing, and if not, correct it.
|
||||
|
||||
self.handle_new_schedule(media_schedule, liquidsoap_queue_approx, liquidsoap_stream_id, current_event_chain)
|
||||
|
||||
|
||||
#At this point everything in the present has been taken care of and Liquidsoap
|
||||
#is playing whatever is scheduled.
|
||||
#Now we need to prepare ourselves for future scheduled events.
|
||||
#
|
||||
next_media_item_chain = self.get_next_schedule_chain(chains, tnow)
|
||||
|
||||
self.logger.debug("Next schedule chain: %s", next_media_item_chain)
|
||||
if next_media_item_chain is not None:
|
||||
try:
|
||||
chains.remove(next_media_item_chain)
|
||||
except ValueError, e:
|
||||
self.logger.error(str(e))
|
||||
|
||||
chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
|
||||
time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow())
|
||||
self.logger.debug("Blocking %s seconds until show start", time_until_next_play)
|
||||
else:
|
||||
self.logger.debug("Blocking indefinitely since no show scheduled")
|
||||
time_until_next_play = None
|
||||
except Empty, e:
|
||||
#We only get here when a new chain of tracks are ready to be played.
|
||||
self.push_to_liquidsoap(next_media_item_chain)
|
||||
|
||||
next_media_item_chain = self.get_next_schedule_chain(chains, datetime.utcnow())
|
||||
if next_media_item_chain is not None:
|
||||
try:
|
||||
chains.remove(next_media_item_chain)
|
||||
except ValueError, e:
|
||||
self.logger.error(str(e))
|
||||
chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
|
||||
time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow())
|
||||
self.logger.debug("Blocking %s seconds until show start", time_until_next_play)
|
||||
else:
|
||||
self.logger.debug("Blocking indefinitely since no show scheduled next")
|
||||
time_until_next_play = None
|
||||
media_schedule = self.queue.get(block=True)
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
raise
|
||||
else:
|
||||
self.logger.debug(media_schedule)
|
||||
#separate media_schedule list into currently_playing and
|
||||
#scheduled_for_future lists
|
||||
currently_playing, scheduled_for_future = \
|
||||
self.separate_present_future(media_schedule)
|
||||
|
||||
self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
|
||||
self.future_scheduled_queue.put(scheduled_for_future)
|
||||
|
||||
if loops % heartbeat_period == 0:
|
||||
self.logger.info("heartbeat")
|
||||
loops = 0
|
||||
loops += 1
|
||||
|
||||
|
||||
def separate_present_future(self, media_schedule):
|
||||
tnow = datetime.utcnow()
|
||||
|
||||
present = []
|
||||
future = {}
|
||||
|
||||
sorted_keys = sorted(media_schedule.keys())
|
||||
for mkey in sorted_keys:
|
||||
media_item = media_schedule[mkey]
|
||||
|
||||
diff_td = tnow - media_item['start']
|
||||
diff_sec = self.date_interval_to_seconds(diff_td)
|
||||
|
||||
if diff_sec >= 0:
|
||||
present.append(media_item)
|
||||
else:
|
||||
future[media_item['start']] = media_item
|
||||
|
||||
return present, future
|
||||
|
||||
def get_current_stream_id_from_liquidsoap(self):
|
||||
response = "-1"
|
||||
try:
|
||||
|
@ -159,289 +141,24 @@ class PypoPush(Thread):
|
|||
|
||||
return response
|
||||
|
||||
def get_queue_items_from_liquidsoap(self):
|
||||
"""
|
||||
This function connects to Liquidsoap to find what media items are in its queue.
|
||||
"""
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||
|
||||
msg = 'queue.queue\n'
|
||||
tn.write(msg)
|
||||
response = tn.read_until("\r\n").strip(" \r\n")
|
||||
tn.write('exit\n')
|
||||
tn.read_all()
|
||||
except Exception, e:
|
||||
self.logger.error("Error connecting to Liquidsoap: %s", e)
|
||||
response = []
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
liquidsoap_queue_approx = []
|
||||
|
||||
if len(response) > 0:
|
||||
items_in_queue = response.split(" ")
|
||||
|
||||
self.logger.debug("items_in_queue: %s", items_in_queue)
|
||||
|
||||
for item in items_in_queue:
|
||||
if item in self.pushed_objects:
|
||||
liquidsoap_queue_approx.append(self.pushed_objects[item])
|
||||
else:
|
||||
"""
|
||||
We should only reach here if Pypo crashed and restarted (because self.pushed_objects was reset). In this case
|
||||
let's clear the entire Liquidsoap queue.
|
||||
"""
|
||||
self.logger.error("ID exists in liquidsoap queue that does not exist in our pushed_objects queue: " + item)
|
||||
self.clear_liquidsoap_queue()
|
||||
liquidsoap_queue_approx = []
|
||||
break
|
||||
|
||||
return liquidsoap_queue_approx
|
||||
|
||||
def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id):
|
||||
correct = False
|
||||
if media_item is None:
|
||||
correct = (len(liquidsoap_queue_approx) == 0 and liquidsoap_stream_id == "-1")
|
||||
else:
|
||||
if is_file(media_item):
|
||||
if len(liquidsoap_queue_approx) == 0:
|
||||
correct = False
|
||||
else:
|
||||
correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \
|
||||
liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \
|
||||
liquidsoap_queue_approx[0]['end'] == media_item['end'] and \
|
||||
liquidsoap_queue_approx[0]['replay_gain'] == media_item['replay_gain']
|
||||
elif is_stream(media_item):
|
||||
correct = liquidsoap_stream_id == str(media_item['row_id'])
|
||||
|
||||
self.logger.debug("Is current item correct?: %s", str(correct))
|
||||
return correct
|
||||
|
||||
|
||||
#clear all webstreams and files from Liquidsoap
|
||||
def clear_all_liquidsoap_items(self):
|
||||
self.remove_from_liquidsoap_queue(0, None)
|
||||
self.stop_web_stream_all()
|
||||
|
||||
def handle_new_schedule(self, media_schedule, liquidsoap_queue_approx, liquidsoap_stream_id, current_event_chain):
|
||||
"""
|
||||
This function's purpose is to gracefully handle situations where
|
||||
Liquidsoap already has a track in its queue, but the schedule
|
||||
has changed. If the schedule has changed, this function's job is to
|
||||
call other functions that will connect to Liquidsoap and alter its
|
||||
queue.
|
||||
"""
|
||||
file_chain = filter(lambda item: (item["type"] == "file"), current_event_chain)
|
||||
stream_chain = filter(lambda item: (item["type"] == "stream_output_start"), current_event_chain)
|
||||
|
||||
self.logger.debug(current_event_chain)
|
||||
|
||||
#Take care of the case where the current playing may be incorrect
|
||||
if len(current_event_chain) > 0:
|
||||
|
||||
current_item = current_event_chain[0]
|
||||
if not self.is_correct_current_item(current_item, liquidsoap_queue_approx, liquidsoap_stream_id):
|
||||
self.clear_all_liquidsoap_items()
|
||||
if is_stream(current_item):
|
||||
if current_item['row_id'] != self.current_prebuffering_stream_id:
|
||||
#this is called if the stream wasn't scheduled sufficiently ahead of time
|
||||
#so that the prebuffering stage could take effect. Let's do the prebuffering now.
|
||||
self.start_web_stream_buffer(current_item)
|
||||
self.start_web_stream(current_item)
|
||||
if is_file(current_item):
|
||||
file_chain = self.modify_first_link_cue_point(file_chain)
|
||||
self.push_to_liquidsoap(file_chain)
|
||||
#we've changed the queue, so let's refetch it
|
||||
liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
|
||||
|
||||
elif not self.is_correct_current_item(None, liquidsoap_queue_approx, liquidsoap_stream_id):
|
||||
#Liquidsoap is playing something even though it shouldn't be
|
||||
self.clear_all_liquidsoap_items()
|
||||
|
||||
|
||||
#If the current item scheduled is a file, then files come in chains, and
|
||||
#therefore we need to make sure the entire chain is correct.
|
||||
if len(current_event_chain) > 0 and is_file(current_event_chain[0]):
|
||||
problem_at_iteration = self.find_removed_items(media_schedule, liquidsoap_queue_approx)
|
||||
|
||||
if problem_at_iteration is not None:
|
||||
#Items that are in Liquidsoap's queue aren't scheduled anymore. We need to connect
|
||||
#and remove these items.
|
||||
self.logger.debug("Change in link %s of current chain", problem_at_iteration)
|
||||
self.remove_from_liquidsoap_queue(problem_at_iteration, liquidsoap_queue_approx[problem_at_iteration:])
|
||||
|
||||
if problem_at_iteration is None and len(file_chain) > len(liquidsoap_queue_approx):
|
||||
self.logger.debug("New schedule has longer current chain.")
|
||||
problem_at_iteration = len(liquidsoap_queue_approx)
|
||||
|
||||
if problem_at_iteration is not None:
|
||||
self.logger.debug("Change in chain at link %s", problem_at_iteration)
|
||||
|
||||
chain_to_push = file_chain[problem_at_iteration:]
|
||||
if len(chain_to_push) > 0:
|
||||
chain_to_push = self.modify_first_link_cue_point(chain_to_push)
|
||||
self.push_to_liquidsoap(chain_to_push)
|
||||
|
||||
|
||||
"""
|
||||
Compare whats in the liquidsoap_queue to the new schedule we just
|
||||
received in media_schedule. This function only iterates over liquidsoap_queue_approx
|
||||
and finds if every item in that list is still scheduled in "media_schedule". It doesn't
|
||||
take care of the case where media_schedule has more items than liquidsoap_queue_approx
|
||||
"""
|
||||
def find_removed_items(self, media_schedule, liquidsoap_queue_approx):
|
||||
#iterate through the items we got from the liquidsoap queue and
|
||||
#see if they are the same as the newly received schedule
|
||||
iteration = 0
|
||||
problem_at_iteration = None
|
||||
for queue_item in liquidsoap_queue_approx:
|
||||
if queue_item['start'] in media_schedule.keys():
|
||||
media_item = media_schedule[queue_item['start']]
|
||||
if queue_item['row_id'] == media_item['row_id']:
|
||||
if queue_item['end'] == media_item['end']:
|
||||
#Everything OK for this iteration.
|
||||
pass
|
||||
else:
|
||||
problem_at_iteration = iteration
|
||||
break
|
||||
else:
|
||||
#A different item has been scheduled at the same time! Need to remove
|
||||
#all tracks from the Liquidsoap queue starting at this point, and re-add
|
||||
#them.
|
||||
problem_at_iteration = iteration
|
||||
break
|
||||
else:
|
||||
#There are no more items scheduled for this time! The user has shortened
|
||||
#the playlist, so we simply need to remove tracks from the queue.
|
||||
problem_at_iteration = iteration
|
||||
break
|
||||
iteration += 1
|
||||
return problem_at_iteration
|
||||
|
||||
|
||||
|
||||
def get_all_chains(self, media_schedule):
|
||||
chains = []
|
||||
|
||||
current_chain = []
|
||||
|
||||
sorted_keys = sorted(media_schedule.keys())
|
||||
|
||||
for mkey in sorted_keys:
|
||||
media_item = media_schedule[mkey]
|
||||
if media_item['independent_event']:
|
||||
if len(current_chain) > 0:
|
||||
chains.append(current_chain)
|
||||
|
||||
chains.append([media_item])
|
||||
current_chain = []
|
||||
elif len(current_chain) == 0:
|
||||
current_chain.append(media_item)
|
||||
elif media_item['start'] == current_chain[-1]['end']:
|
||||
current_chain.append(media_item)
|
||||
else:
|
||||
#current item is not a continuation of the chain.
|
||||
#Start a new one instead
|
||||
chains.append(current_chain)
|
||||
current_chain = [media_item]
|
||||
|
||||
if len(current_chain) > 0:
|
||||
chains.append(current_chain)
|
||||
|
||||
return chains
|
||||
|
||||
def modify_cue_point(self, link):
|
||||
tnow = datetime.utcnow()
|
||||
|
||||
link_start = datetime.strptime(link['start'], "%Y-%m-%d-%H-%M-%S")
|
||||
|
||||
diff_td = tnow - link_start
|
||||
diff_sec = self.date_interval_to_seconds(diff_td)
|
||||
|
||||
if diff_sec > 0:
|
||||
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
|
||||
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
|
||||
link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec
|
||||
|
||||
def modify_first_link_cue_point(self, chain):
|
||||
if not len(chain):
|
||||
return []
|
||||
|
||||
first_link = chain[0]
|
||||
|
||||
self.modify_cue_point(first_link)
|
||||
|
||||
#ATM, we should never have an exception here. However in the future, it
|
||||
#would be nice to allow cue_out to be None which would then allow us to
|
||||
#fallback to the length of the track as the end point.
|
||||
try:
|
||||
end = first_link['cue_out']
|
||||
except TypeError:
|
||||
#cue_out is type None
|
||||
end = first_link['length']
|
||||
|
||||
if float(first_link['cue_in']) >= float(end):
|
||||
chain = chain [1:]
|
||||
|
||||
return chain
|
||||
|
||||
"""
|
||||
Returns two chains, original chain and current_chain. current_chain is a subset of
|
||||
original_chain but can also be equal to original chain.
|
||||
|
||||
We return original chain because the user of this function may want to clean
|
||||
up the input 'chains' list
|
||||
|
||||
chain, original = get_current_chain(chains)
|
||||
|
||||
and
|
||||
chains.remove(chain) can throw a ValueError exception
|
||||
|
||||
but
|
||||
chains.remove(original) won't
|
||||
"""
|
||||
def get_current_chain(self, chains, tnow):
|
||||
current_chain = []
|
||||
original_chain = None
|
||||
|
||||
for chain in chains:
|
||||
iteration = 0
|
||||
for link in chain:
|
||||
link_start = datetime.strptime(link['start'], "%Y-%m-%d-%H-%M-%S")
|
||||
link_end = datetime.strptime(link['end'], "%Y-%m-%d-%H-%M-%S")
|
||||
|
||||
self.logger.debug("tnow %s, chain_start %s", tnow, link_start)
|
||||
if link_start <= tnow and tnow < link_end:
|
||||
current_chain = chain[iteration:]
|
||||
original_chain = chain
|
||||
break
|
||||
iteration += 1
|
||||
|
||||
return current_chain, original_chain
|
||||
|
||||
"""
|
||||
The purpose of this function is to take a look at the last received schedule from
|
||||
pypo-fetch and return the next chain of media_items. A chain is defined as a sequence
|
||||
of media_items where the end time of media_item 'n' is the start time of media_item
|
||||
'n+1'
|
||||
"""
|
||||
def get_next_schedule_chain(self, chains, tnow):
|
||||
#all media_items are now divided into chains. Let's find the one that
|
||||
#starts closest in the future.
|
||||
closest_start = None
|
||||
closest_chain = None
|
||||
for chain in chains:
|
||||
chain_start = datetime.strptime(chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
|
||||
chain_end = datetime.strptime(chain[-1]['end'], "%Y-%m-%d-%H-%M-%S")
|
||||
self.logger.debug("tnow %s, chain_start %s", tnow, chain_start)
|
||||
if (closest_start == None or chain_start < closest_start) and (chain_start > tnow or (chain_start < tnow and chain_end > tnow)):
|
||||
closest_start = chain_start
|
||||
closest_chain = chain
|
||||
|
||||
return closest_chain
|
||||
#def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id):
|
||||
#correct = False
|
||||
#if media_item is None:
|
||||
#correct = (len(liquidsoap_queue_approx) == 0 and liquidsoap_stream_id == "-1")
|
||||
#else:
|
||||
#if is_file(media_item):
|
||||
#if len(liquidsoap_queue_approx) == 0:
|
||||
#correct = False
|
||||
#else:
|
||||
#correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \
|
||||
#liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \
|
||||
#liquidsoap_queue_approx[0]['end'] == media_item['end'] and \
|
||||
#liquidsoap_queue_approx[0]['replay_gain'] == media_item['replay_gain']
|
||||
#elif is_stream(media_item):
|
||||
#correct = liquidsoap_stream_id == str(media_item['row_id'])
|
||||
|
||||
#self.logger.debug("Is current item correct?: %s", str(correct))
|
||||
#return correct
|
||||
|
||||
def date_interval_to_seconds(self, interval):
|
||||
"""
|
||||
|
@ -450,101 +167,9 @@ class PypoPush(Thread):
|
|||
"""
|
||||
seconds = (interval.microseconds + \
|
||||
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
|
||||
if seconds < 0: seconds = 0
|
||||
|
||||
return seconds
|
||||
|
||||
def push_to_liquidsoap(self, event_chain):
|
||||
|
||||
try:
|
||||
for media_item in event_chain:
|
||||
if media_item['type'] == "file":
|
||||
|
||||
"""
|
||||
Wait maximum 5 seconds (50 iterations) for file to become ready, otherwise
|
||||
give up on it.
|
||||
"""
|
||||
iter_num = 0
|
||||
while not media_item['file_ready'] and iter_num < 50:
|
||||
time.sleep(0.1)
|
||||
iter_num += 1
|
||||
|
||||
if media_item['file_ready']:
|
||||
self.telnet_to_liquidsoap(media_item)
|
||||
else:
|
||||
self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst'])
|
||||
elif media_item['type'] == "event":
|
||||
if media_item['event_type'] == "kick_out":
|
||||
PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj")
|
||||
elif media_item['event_type'] == "switch_off":
|
||||
PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off")
|
||||
elif media_item['type'] == 'stream_buffer_start':
|
||||
self.start_web_stream_buffer(media_item)
|
||||
elif media_item['type'] == "stream_output_start":
|
||||
if media_item['row_id'] != self.current_prebuffering_stream_id:
|
||||
#this is called if the stream wasn't scheduled sufficiently ahead of time
|
||||
#so that the prebuffering stage could take effect. Let's do the prebuffering now.
|
||||
self.start_web_stream_buffer(media_item)
|
||||
self.start_web_stream(media_item)
|
||||
elif media_item['type'] == "stream_buffer_end":
|
||||
self.stop_web_stream_buffer(media_item)
|
||||
elif media_item['type'] == "stream_output_end":
|
||||
self.stop_web_stream_output(media_item)
|
||||
except Exception, e:
|
||||
self.logger.error('Pypo Push Exception: %s', e)
|
||||
|
||||
|
||||
def start_web_stream_buffer(self, media_item):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||
|
||||
msg = 'dynamic_source.id %s\n' % media_item['row_id']
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
#msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1')
|
||||
msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1')
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
show_name = media_item['show_name']
|
||||
msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
|
||||
tn.write(msg)
|
||||
self.logger.debug(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
self.current_prebuffering_stream_id = media_item['row_id']
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
|
||||
def start_web_stream(self, media_item):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||
|
||||
#TODO: DO we need this?
|
||||
msg = 'streams.scheduled_play_start\n'
|
||||
tn.write(msg)
|
||||
|
||||
msg = 'dynamic_source.output_start\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
self.current_prebuffering_stream_id = None
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def stop_web_stream_all(self):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
|
@ -571,173 +196,9 @@ class PypoPush(Thread):
|
|||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def stop_web_stream_buffer(self, media_item):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
|
||||
|
||||
#msg = 'dynamic_source.read_stop %s\n' % media_item['row_id']
|
||||
msg = 'http.stop\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
msg = 'dynamic_source.id -1\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def stop_web_stream_output(self, media_item):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
|
||||
|
||||
msg = 'dynamic_source.output_stop\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def clear_liquidsoap_queue(self):
|
||||
self.logger.debug("Clearing Liquidsoap queue")
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||
msg = "source.skip\n"
|
||||
tn.write(msg)
|
||||
tn.write("exit\n")
|
||||
tn.read_all()
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def remove_from_liquidsoap_queue(self, problem_at_iteration, liquidsoap_queue_approx):
|
||||
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||
|
||||
if problem_at_iteration == 0:
|
||||
msg = "source.skip\n"
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
else:
|
||||
# Remove things in reverse order.
|
||||
queue_copy = liquidsoap_queue_approx[::-1]
|
||||
|
||||
for queue_item in queue_copy:
|
||||
msg = "queue.remove %s\n" % queue_item['queue_id']
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
response = tn.read_until("\r\n").strip("\r\n")
|
||||
|
||||
if "No such request in my queue" in response:
|
||||
"""
|
||||
Cannot remove because Liquidsoap started playing the item. Need
|
||||
to use source.skip instead
|
||||
"""
|
||||
msg = "source.skip\n"
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
msg = "queue.queue\n"
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def sleep_until_start(self, media_item):
|
||||
"""
|
||||
The purpose of this function is to look at the difference between
|
||||
"now" and when the media_item starts, and sleep for that period of time.
|
||||
After waking from sleep, this function returns.
|
||||
"""
|
||||
|
||||
mi_start = media_item['start'][0:19]
|
||||
|
||||
#strptime returns struct_time in local time
|
||||
epoch_start = calendar.timegm(time.strptime(mi_start, '%Y-%m-%d-%H-%M-%S'))
|
||||
|
||||
#Return the time as a floating point number expressed in seconds since the epoch, in UTC.
|
||||
epoch_now = time.time()
|
||||
|
||||
self.logger.debug("Epoch start: %s" % epoch_start)
|
||||
self.logger.debug("Epoch now: %s" % epoch_now)
|
||||
|
||||
sleep_time = epoch_start - epoch_now
|
||||
|
||||
if sleep_time < 0:
|
||||
sleep_time = 0
|
||||
|
||||
self.logger.debug('sleeping for %s s' % (sleep_time))
|
||||
time.sleep(sleep_time)
|
||||
|
||||
def telnet_to_liquidsoap(self, media_item):
|
||||
"""
|
||||
telnets to liquidsoap and pushes the media_item to its queue. Push the
|
||||
show name of every media_item as well, just to keep Liquidsoap up-to-date
|
||||
about which show is playing.
|
||||
"""
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||
|
||||
#tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8'))
|
||||
|
||||
annotation = self.create_liquidsoap_annotation(media_item)
|
||||
msg = 'queue.push %s\n' % annotation.encode('utf-8')
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
queue_id = tn.read_until("\r\n").strip("\r\n")
|
||||
|
||||
#remember the media_item's queue id which we may use
|
||||
#later if we need to remove it from the queue.
|
||||
media_item['queue_id'] = queue_id
|
||||
|
||||
#add media_item to the end of our queue
|
||||
self.pushed_objects[queue_id] = media_item
|
||||
|
||||
show_name = media_item['show_name']
|
||||
msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
|
||||
tn.write(msg)
|
||||
self.logger.debug(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def create_liquidsoap_annotation(self, media):
|
||||
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.
|
||||
return 'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s",replay_gain="%s dB":%s' \
|
||||
% (media['id'], float(media['fade_in']) / 1000, float(media['fade_out']) / 1000, float(media['cue_in']), float(media['cue_out']), media['row_id'], media['replay_gain'], media['dst'])
|
||||
|
||||
def run(self):
|
||||
try: self.main()
|
||||
except Exception, e:
|
||||
import traceback
|
||||
top = traceback.format_exc()
|
||||
self.logger.error('Pypo Push Exception: %s', top)
|
||||
|
||||
|
|
|
@ -303,8 +303,6 @@ class Recorder(Thread):
|
|||
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
|
||||
|
||||
while True:
|
||||
if self.loops % heartbeat_period == 0:
|
||||
self.logger.info("heartbeat")
|
||||
if self.loops * PUSH_INTERVAL > 3600:
|
||||
self.loops = 0
|
||||
"""
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
import telnetlib
|
||||
|
||||
def create_liquidsoap_annotation(media):
|
||||
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.
|
||||
return 'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s",replay_gain="%s dB":%s' \
|
||||
% (media['id'], float(media['fade_in']) / 1000, float(media['fade_out']) / 1000, float(media['cue_in']), float(media['cue_out']), media['row_id'], media['replay_gain'], media['dst'])
|
||||
|
||||
class TelnetLiquidsoap:
|
||||
|
||||
def __init__(self, telnet_lock, logger, ls_host, ls_port):
|
||||
self.telnet_lock = telnet_lock
|
||||
self.ls_host = ls_host
|
||||
self.ls_port = ls_port
|
||||
self.logger = logger
|
||||
self.current_prebuffering_stream_id = None
|
||||
|
||||
def __connect(self):
|
||||
return telnetlib.Telnet(self.ls_host, self.ls_port)
|
||||
|
||||
def __is_empty(self, tn, queue_id):
|
||||
return True
|
||||
|
||||
def queue_remove(self, queue_id):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = self.__connect()
|
||||
|
||||
msg = 'queues.%s_skip\n' % queue_id
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
except Exception:
|
||||
raise
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
|
||||
def queue_push(self, queue_id, media_item):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = self.__connect()
|
||||
|
||||
if not self.__is_empty(tn, queue_id):
|
||||
raise QueueNotEmptyException()
|
||||
|
||||
annotation = create_liquidsoap_annotation(media_item)
|
||||
msg = '%s.push %s\n' % (queue_id, annotation.encode('utf-8'))
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
show_name = media_item['show_name']
|
||||
msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
|
||||
tn.write(msg)
|
||||
self.logger.debug(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
except Exception:
|
||||
raise
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
|
||||
def stop_web_stream_buffer(self):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
|
||||
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
|
||||
|
||||
msg = 'http.stop\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
msg = 'dynamic_source.id -1\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def stop_web_stream_output(self):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
|
||||
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
|
||||
|
||||
msg = 'dynamic_source.output_stop\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def start_web_stream(self, media_item):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
|
||||
|
||||
#TODO: DO we need this?
|
||||
msg = 'streams.scheduled_play_start\n'
|
||||
tn.write(msg)
|
||||
|
||||
msg = 'dynamic_source.output_start\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
self.current_prebuffering_stream_id = None
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def start_web_stream_buffer(self, media_item):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
|
||||
|
||||
msg = 'dynamic_source.id %s\n' % media_item['row_id']
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1')
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
self.logger.debug(tn.read_all())
|
||||
|
||||
self.current_prebuffering_stream_id = media_item['row_id']
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def get_current_stream_id(self):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
|
||||
|
||||
msg = 'dynamic_source.get_id\n'
|
||||
self.logger.debug(msg)
|
||||
tn.write(msg)
|
||||
|
||||
tn.write("exit\n")
|
||||
stream_id = tn.read_all().splitlines()[0]
|
||||
self.logger.debug("stream_id: %s" % stream_id)
|
||||
|
||||
return stream_id
|
||||
except Exception, e:
|
||||
self.logger.error(str(e))
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
class DummyTelnetLiquidsoap:
|
||||
|
||||
def __init__(self, telnet_lock, logger):
|
||||
self.telnet_lock = telnet_lock
|
||||
self.liquidsoap_mock_queues = {}
|
||||
self.logger = logger
|
||||
|
||||
for i in range(4):
|
||||
self.liquidsoap_mock_queues["s"+str(i)] = []
|
||||
|
||||
def queue_push(self, queue_id, media_item):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
|
||||
self.logger.info("Pushing %s to queue %s" % (media_item, queue_id))
|
||||
from datetime import datetime
|
||||
print "Time now: %s" % datetime.utcnow()
|
||||
|
||||
annotation = create_liquidsoap_annotation(media_item)
|
||||
self.liquidsoap_mock_queues[queue_id].append(annotation)
|
||||
except Exception:
|
||||
raise
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
def queue_remove(self, queue_id):
|
||||
try:
|
||||
self.telnet_lock.acquire()
|
||||
|
||||
self.logger.info("Purging queue %s" % queue_id)
|
||||
from datetime import datetime
|
||||
print "Time now: %s" % datetime.utcnow()
|
||||
|
||||
except Exception:
|
||||
raise
|
||||
finally:
|
||||
self.telnet_lock.release()
|
||||
|
||||
class QueueNotEmptyException(Exception):
|
||||
pass
|
|
@ -0,0 +1,98 @@
|
|||
from pypoliqqueue import PypoLiqQueue
|
||||
from telnetliquidsoap import DummyTelnetLiquidsoap, TelnetLiquidsoap
|
||||
|
||||
|
||||
from Queue import Queue
|
||||
from threading import Lock
|
||||
|
||||
import sys
|
||||
import signal
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
|
||||
def keyboardInterruptHandler(signum, frame):
|
||||
logger = logging.getLogger()
|
||||
logger.info('\nKeyboard Interrupt\n')
|
||||
sys.exit(0)
|
||||
signal.signal(signal.SIGINT, keyboardInterruptHandler)
|
||||
|
||||
# configure logging
|
||||
format = '%(levelname)s - %(pathname)s - %(lineno)s - %(asctime)s - %(message)s'
|
||||
logging.basicConfig(level=logging.DEBUG, format=format)
|
||||
logging.captureWarnings(True)
|
||||
|
||||
telnet_lock = Lock()
|
||||
pypoPush_q = Queue()
|
||||
|
||||
|
||||
pypoLiq_q = Queue()
|
||||
liq_queue_tracker = {
|
||||
"s0": None,
|
||||
"s1": None,
|
||||
"s2": None,
|
||||
"s3": None,
|
||||
}
|
||||
|
||||
#dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock, logging)
|
||||
dummy_telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, logging, \
|
||||
"localhost", \
|
||||
1234)
|
||||
|
||||
plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logging, liq_queue_tracker, \
|
||||
dummy_telnet_liquidsoap)
|
||||
plq.daemon = True
|
||||
plq.start()
|
||||
|
||||
|
||||
print "Time now: %s" % datetime.utcnow()
|
||||
|
||||
media_schedule = {}
|
||||
|
||||
start_dt = datetime.utcnow() + timedelta(seconds=1)
|
||||
end_dt = datetime.utcnow() + timedelta(seconds=6)
|
||||
|
||||
media_schedule[start_dt] = {"id": 5, \
|
||||
"type":"file", \
|
||||
"row_id":9, \
|
||||
"uri":"", \
|
||||
"dst":"/home/martin/Music/ipod/Hot Chocolate - You Sexy Thing.mp3", \
|
||||
"fade_in":0, \
|
||||
"fade_out":0, \
|
||||
"cue_in":0, \
|
||||
"cue_out":300, \
|
||||
"start": start_dt, \
|
||||
"end": end_dt, \
|
||||
"show_name":"Untitled", \
|
||||
"replay_gain": 0, \
|
||||
"independent_event": True \
|
||||
}
|
||||
|
||||
|
||||
|
||||
start_dt = datetime.utcnow() + timedelta(seconds=2)
|
||||
end_dt = datetime.utcnow() + timedelta(seconds=6)
|
||||
|
||||
media_schedule[start_dt] = {"id": 5, \
|
||||
"type":"file", \
|
||||
"row_id":9, \
|
||||
"uri":"", \
|
||||
"dst":"/home/martin/Music/ipod/Good Charlotte - bloody valentine.mp3", \
|
||||
"fade_in":0, \
|
||||
"fade_out":0, \
|
||||
"cue_in":0, \
|
||||
"cue_out":300, \
|
||||
"start": start_dt, \
|
||||
"end": end_dt, \
|
||||
"show_name":"Untitled", \
|
||||
"replay_gain": 0, \
|
||||
"independent_event": True \
|
||||
}
|
||||
pypoLiq_q.put(media_schedule)
|
||||
|
||||
plq.join()
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue