diff --git a/python_apps/pypo/airtime-playout b/python_apps/pypo/airtime-playout index 56aa587cd..5521c91ed 100755 --- a/python_apps/pypo/airtime-playout +++ b/python_apps/pypo/airtime-playout @@ -3,14 +3,14 @@ virtualenv_bin="/usr/lib/airtime/airtime_virtualenv/bin/" . ${virtualenv_bin}activate -pypo_user="pypo" +# Absolute path to this script +SCRIPT=`readlink -f $0` +# Absolute directory this script is in +pypo_path=`dirname $SCRIPT` -# Location of pypo_cli.py Python script -pypo_path="/usr/lib/airtime/pypo/bin/" api_client_path="/usr/lib/airtime/" pypo_script="pypocli.py" cd ${pypo_path} -exec 2>&1 set +e cat /etc/default/locale | grep -i "LANG=.*UTF-\?8" @@ -26,6 +26,6 @@ export LC_ALL=`cat /etc/default/locale | grep "LANG=" | cut -d= -f2 | tr -d "\n\ export TERM=xterm -exec python ${pypo_path}${pypo_script} > /var/log/airtime/pypo/py-interpreter.log 2>&1 +exec python ${pypo_path}/${pypo_script} > /var/log/airtime/pypo/py-interpreter.log 2>&1 # EOF diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 02dfe6c46..de30d65dd 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -18,7 +18,6 @@ from Queue import Queue from threading import Lock from pypopush import PypoPush -from pypoliqqueue import PypoLiqQueue from pypofetch import PypoFetch from pypofile import PypoFile from recorder import Recorder @@ -229,14 +228,6 @@ if __name__ == '__main__': stat.daemon = True stat.start() - #pypoLiq_q = Queue() - #liq_queue_tracker = dict() - #telnet_liquidsoap = TelnetLiquidsoap() - #plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \ - #telnet_liquidsoap) - #plq.daemon = True - #plq.start() - # all join() are commented out because we want to exit entire pypo # if pypofetch terminates #pmh.join() diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 66243a15a..17d5e5898 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -8,6 +8,7 @@ import json import telnetlib import copy import subprocess +import signal from datetime import datetime from Queue import Empty @@ -24,6 +25,12 @@ logging.config.fileConfig(logging_cfg) logger = logging.getLogger() LogWriter.override_std_err(logger) +def keyboardInterruptHandler(signum, frame): + logger = logging.getLogger() + logger.info('\nKeyboard Interrupt\n') + sys.exit(0) +signal.signal(signal.SIGINT, keyboardInterruptHandler) + #need to wait for Python 2.7 for this.. #logging.captureWarnings(True) @@ -34,8 +41,6 @@ try: LS_PORT = config['ls_port'] #POLL_INTERVAL = int(config['poll_interval']) POLL_INTERVAL = 1800 - - except Exception, e: logger.error('Error loading config file: %s', e) sys.exit() diff --git a/python_apps/pypo/pypoliqqueue.py b/python_apps/pypo/pypoliqqueue.py index 54d7c5d93..439255704 100644 --- a/python_apps/pypo/pypoliqqueue.py +++ b/python_apps/pypo/pypoliqqueue.py @@ -1,14 +1,12 @@ from threading import Thread from collections import deque from datetime import datetime -from pypofetch import PypoFetch - -import eventtypes import traceback import sys import time + from Queue import Empty import signal @@ -19,14 +17,11 @@ def keyboardInterruptHandler(signum, frame): signal.signal(signal.SIGINT, keyboardInterruptHandler) class PypoLiqQueue(Thread): - def __init__(self, q, telnet_lock, logger, liq_queue_tracker, \ - telnet_liquidsoap): + def __init__(self, q, pypo_liquidsoap, logger): Thread.__init__(self) self.queue = q - self.telnet_lock = telnet_lock self.logger = logger - self.liq_queue_tracker = liq_queue_tracker - self.telnet_liquidsoap = telnet_liquidsoap + self.pypo_liquidsoap = pypo_liquidsoap def main(self): time_until_next_play = None @@ -46,7 +41,7 @@ class PypoLiqQueue(Thread): except Empty, e: #Time to push a scheduled item. media_item = schedule_deque.popleft() - self.telnet_to_liquidsoap(media_item) + self.pypo_liquidsoap.push_item(media_item) if len(schedule_deque): time_until_next_play = \ self.date_interval_to_seconds( @@ -71,78 +66,6 @@ class PypoLiqQueue(Thread): else: time_until_next_play = None - def is_media_item_finished(self, media_item): - if media_item is None: - return True - else: - return datetime.utcnow() > media_item['end'] - - def find_available_queue(self): - available_queue = None - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if mi == None or self.is_media_item_finished(mi): - #queue "i" is available. Push to this queue - available_queue = i - - if available_queue == None: - raise NoQueueAvailableException() - - return available_queue - - def telnet_to_liquidsoap(self, media_item): - """ - telnets to liquidsoap and pushes the media_item to its queue. Push the - show name of every media_item as well, just to keep Liquidsoap up-to-date - about which show is playing. - """ - - if media_item["type"] == eventtypes.FILE: - self.handle_file_type(media_item) - elif media_item["type"] == eventtypes.EVENT: - self.handle_event_type(media_item) - elif media_item["type"] == eventtypes.STREAM_BUFFER_START: - self.telnet_liquidsoap.start_web_stream_buffer(media_item) - elif media_item["type"] == eventtypes.STREAM_OUTPUT_START: - if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id: - #this is called if the stream wasn't scheduled sufficiently ahead of time - #so that the prebuffering stage could take effect. Let's do the prebuffering now. - self.telnet_liquidsoap.start_web_stream_buffer(media_item) - self.telnet_liquidsoap.start_web_stream(media_item) - elif media_item['type'] == eventtypes.STREAM_BUFFER_END: - self.telnet_liquidsoap.stop_web_stream_buffer(media_item) - elif media_item['type'] == eventtypes.STREAM_OUTPUT_END: - self.telnet_liquidsoap.stop_web_stream_output(media_item) - else: raise UnknownMediaItemType(str(media_item)) - - def handle_event_type(self, media_item): - if media_item['event_type'] == "kick_out": - PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj") - elif media_item['event_type'] == "switch_off": - PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off") - - - def handle_file_type(self, media_item): - """ - Wait maximum 5 seconds (50 iterations) for file to become ready, - otherwise give up on it. - """ - iter_num = 0 - while not media_item['file_ready'] and iter_num < 50: - time.sleep(0.1) - iter_num += 1 - - if media_item['file_ready']: - available_queue = self.find_available_queue() - - try: - self.telnet_liquidsoap.queue_push(available_queue, media_item) - self.liq_queue_tracker[available_queue] = media_item - except Exception as e: - self.logger.error(e) - raise - else: - self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst']) def date_interval_to_seconds(self, interval): """ @@ -155,14 +78,10 @@ class PypoLiqQueue(Thread): return seconds - def run(self): try: self.main() except Exception, e: self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc()) -class NoQueueAvailableException(Exception): - pass -class UnknownMediaItemType(Exception): - pass + diff --git a/python_apps/pypo/pypoliquidsoap.py b/python_apps/pypo/pypoliquidsoap.py new file mode 100644 index 000000000..97390b93f --- /dev/null +++ b/python_apps/pypo/pypoliquidsoap.py @@ -0,0 +1,214 @@ +from pypofetch import PypoFetch +from telnetliquidsoap import TelnetLiquidsoap + +from datetime import datetime +from datetime import timedelta + +import eventtypes +import time + +class PypoLiquidsoap(): + def __init__(self, logger, telnet_lock, host, port): + self.logger = logger + self.liq_queue_tracker = { + "s0": None, + "s1": None, + "s2": None, + "s3": None, + } + + self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ + logger,\ + host,\ + port) + + + def push_item(self, media_item): + if media_item["type"] == eventtypes.FILE: + self.handle_file_type(media_item) + elif media_item["type"] == eventtypes.EVENT: + self.handle_event_type(media_item) + elif media_item["type"] == eventtypes.STREAM_BUFFER_START: + self.telnet_liquidsoap.start_web_stream_buffer(media_item) + elif media_item["type"] == eventtypes.STREAM_OUTPUT_START: + if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id: + #this is called if the stream wasn't scheduled sufficiently ahead of time + #so that the prebuffering stage could take effect. Let's do the prebuffering now. + self.telnet_liquidsoap.start_web_stream_buffer(media_item) + self.telnet_liquidsoap.start_web_stream(media_item) + elif media_item['type'] == eventtypes.STREAM_BUFFER_END: + self.telnet_liquidsoap.stop_web_stream_buffer() + elif media_item['type'] == eventtypes.STREAM_OUTPUT_END: + self.telnet_liquidsoap.stop_web_stream_output() + else: raise UnknownMediaItemType(str(media_item)) + + def handle_file_type(self, media_item): + """ + Wait maximum 5 seconds (50 iterations) for file to become ready, + otherwise give up on it. + """ + iter_num = 0 + while not media_item['file_ready'] and iter_num < 50: + time.sleep(0.1) + iter_num += 1 + + if media_item['file_ready']: + available_queue = self.find_available_queue() + + try: + self.telnet_liquidsoap.queue_push(available_queue, media_item) + self.liq_queue_tracker[available_queue] = media_item + except Exception as e: + self.logger.error(e) + raise + else: + self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst']) + + def handle_event_type(self, media_item): + if media_item['event_type'] == "kick_out": + PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj") + elif media_item['event_type'] == "switch_off": + PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off") + + + def is_media_item_finished(self, media_item): + if media_item is None: + return True + else: + return datetime.utcnow() > media_item['end'] + + def find_available_queue(self): + available_queue = None + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi == None or self.is_media_item_finished(mi): + #queue "i" is available. Push to this queue + available_queue = i + + if available_queue == None: + raise NoQueueAvailableException() + + return available_queue + + def get_queues(): + return self.liq_queue_tracker + + + def verify_correct_present_media(self, scheduled_now): + #verify whether Liquidsoap is currently playing the correct files. + #if we find an item that Liquidsoap is not playing, then push it + #into one of Liquidsoap's queues. If Liquidsoap is already playing + #it do nothing. If Liquidsoap is playing a track that isn't in + #currently_playing then stop it. + + #Check for Liquidsoap media we should source.skip + #get liquidsoap items for each queue. Since each queue can only have one + #item, we should have a max of 8 items. + + #TODO: Verify start, end, replay_gain is the same + #TODO: Verify this is a file or webstream and also handle webstreams + #TODO: Do webstreams span a time or do they have an instantaneous point? + + #2013-03-21-22-56-00_0: { + #id: 1, + #type: "stream_output_start", + #row_id: 41, + #uri: "http://stream2.radioblackout.org:80/blackout.ogg", + #start: "2013-03-21-22-56-00", + #end: "2013-03-21-23-26-00", + #show_name: "Untitled Show", + #independent_event: true + #}, + + + scheduled_now_files = \ + filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now) + + scheduled_now_webstream = \ + filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \ + scheduled_now) + + schedule_ids = set(map(lambda x: i["row_id"], scheduled_now_files)) + + + liq_queue_ids = set() + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if not self.is_media_item_finished(mi): + liq_queue_ids.add(mi["row_id"]) + + to_be_removed = liq_queue_ids - schedule_ids + to_be_added = schedule_ids - liq_queue_ids + + if len(to_be_removed): + self.logger.info("Need to remove items from Liquidsoap: %s" % \ + to_be_removed) + + #remove files from Liquidsoap's queue + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi is not None and mi["row_id"] in to_be_removed: + self.stop(i) + + #stop any webstreams + if self.telnet_liquidsoap.get_current_stream_id() in to_be_removed: + self.telnet_liquidsoap.stop_web_stream_buffer() + self.telnet_liquidsoap.stop_web_stream_output() + + if len(to_be_added): + self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ + to_be_added) + + for i in scheduled_now: + if i["row_id"] in to_be_added: + self.modify_cue_point(i) + self.play(i) + + current_stream_id = self.telnet_liquidsoap.get_current_stream_id() + #if len(scheduled_now_webstream): + #if current_stream_id != scheduled_now_webstream[0]: + #self.play(scheduled_now_webstream[0]) + + def play(self, media_item): + self.telnet_liquidsoap.push_item(media_item) + + def stop(self, queue): + self.telnet_liquidsoap.queue_remove(queue) + self.liq_queue_tracker[queue] = None + + def is_file(self, media_item): + return media_item["type"] == eventtypes.FILE + + def modify_cue_point(self, link): + if not self.is_file(link): + return + + tnow = datetime.utcnow() + + link_start = link['start'] + + diff_td = tnow - link_start + diff_sec = self.date_interval_to_seconds(diff_td) + + if diff_sec > 0: + self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec) + original_cue_in_td = timedelta(seconds=float(link['cue_in'])) + link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec + + def date_interval_to_seconds(self, interval): + """ + Convert timedelta object into int representing the number of seconds. If + number of seconds is less than 0, then return 0. + """ + seconds = (interval.microseconds + \ + (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) + if seconds < 0: seconds = 0 + + return seconds + + +class UnknownMediaItemType(Exception): + pass + +class NoQueueAvailableException(Exception): + pass diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 8e85a8e01..4d6c21964 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -13,8 +13,8 @@ import traceback import os from pypofetch import PypoFetch -from telnetliquidsoap import TelnetLiquidsoap from pypoliqqueue import PypoLiqQueue +from pypoliquidsoap import PypoLiquidsoap from Queue import Empty, Queue @@ -62,28 +62,18 @@ class PypoPush(Thread): self.logger = logging.getLogger('push') self.current_prebuffering_stream_id = None self.queue_id = 0 - self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ - self.logger,\ - LS_HOST,\ - LS_PORT\ - ) - - self.liq_queue_tracker = { - "s0": None, - "s1": None, - "s2": None, - "s3": None, - } self.future_scheduled_queue = Queue() + self.pypo_liquidsoap = PypoLiquidsoap(self.logger, telnet_lock,\ + LS_HOST, LS_PORT) + self.plq = PypoLiqQueue(self.future_scheduled_queue, \ - telnet_lock, \ - self.logger, \ - self.liq_queue_tracker, \ - self.telnet_liquidsoap) + self.pypo_liquidsoap, \ + self.logger) self.plq.daemon = True self.plq.start() + def main(self): loops = 0 heartbeat_period = math.floor(30 / PUSH_INTERVAL) @@ -97,12 +87,13 @@ class PypoPush(Thread): self.logger.error(str(e)) raise else: + self.logger.debug(media_schedule) #separate media_schedule list into currently_playing and #scheduled_for_future lists currently_playing, scheduled_for_future = \ self.separate_present_future(media_schedule) - self.verify_correct_present_media(currently_playing) + self.pypo_liquidsoap.verify_correct_present_media(currently_playing) self.future_scheduled_queue.put(scheduled_for_future) if loops % heartbeat_period == 0: @@ -131,59 +122,6 @@ class PypoPush(Thread): return present, future - def verify_correct_present_media(self, scheduled_now): - #verify whether Liquidsoap is currently playing the correct files. - #if we find an item that Liquidsoap is not playing, then push it - #into one of Liquidsoap's queues. If Liquidsoap is already playing - #it do nothing. If Liquidsoap is playing a track that isn't in - #currently_playing then stop it. - - #Check for Liquidsoap media we should source.skip - #get liquidsoap items for each queue. Since each queue can only have one - #item, we should have a max of 8 items. - - #TODO: Verify start, end, replay_gain is the same - #TODO: Verify this is a file or webstream and also handle webstreams - - schedule_ids = set() - for i in scheduled_now: - schedule_ids.add(i["row_id"]) - - liq_queue_ids = set() - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if not self.plq.is_media_item_finished(mi): - liq_queue_ids.add(mi["row_id"]) - - to_be_removed = liq_queue_ids - schedule_ids - to_be_added = schedule_ids - liq_queue_ids - - if len(to_be_removed): - self.logger.info("Need to remove items from Liquidsoap: %s" % \ - to_be_removed) - - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if mi is not None and mi["row_id"] in to_be_removed: - self.telnet_liquidsoap.queue_remove(i) - self.liq_queue_tracker[i] = None - - #liquidsoap.stop_play(mi) - - - if len(to_be_added): - self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ - to_be_added) - - for i in scheduled_now: - if i["row_id"] in to_be_added: - self.modify_cue_point(i) - queue_id = self.plq.find_available_queue() - self.telnet_liquidsoap.queue_push(queue_id, i) - self.liq_queue_tracker[queue_id] = i - - #liquidsoap.start_play(i) - def get_current_stream_id_from_liquidsoap(self): response = "-1" try: @@ -222,20 +160,6 @@ class PypoPush(Thread): #self.logger.debug("Is current item correct?: %s", str(correct)) #return correct - def modify_cue_point(self, link): - tnow = datetime.utcnow() - - link_start = link['start'] - - diff_td = tnow - link_start - diff_sec = self.date_interval_to_seconds(diff_td) - - if diff_sec > 0: - self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec) - original_cue_in_td = timedelta(seconds=float(link['cue_in'])) - link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec - - def date_interval_to_seconds(self, interval): """ Convert timedelta object into int representing the number of seconds. If diff --git a/python_apps/pypo/recorder.py b/python_apps/pypo/recorder.py index 70d99144d..b3818f32d 100644 --- a/python_apps/pypo/recorder.py +++ b/python_apps/pypo/recorder.py @@ -303,8 +303,6 @@ class Recorder(Thread): heartbeat_period = math.floor(30 / PUSH_INTERVAL) while True: - if self.loops % heartbeat_period == 0: - self.logger.info("heartbeat") if self.loops * PUSH_INTERVAL > 3600: self.loops = 0 """ diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index bfd49ec99..28e034ef8 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -63,13 +63,12 @@ class TelnetLiquidsoap: self.telnet_lock.release() - def stop_web_stream_buffer(self, media_item): + def stop_web_stream_buffer(self): try: self.telnet_lock.acquire() tn = telnetlib.Telnet(self.ls_host, self.ls_port) #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - #msg = 'dynamic_source.read_stop %s\n' % media_item['row_id'] msg = 'http.stop\n' self.logger.debug(msg) tn.write(msg) @@ -86,7 +85,7 @@ class TelnetLiquidsoap: finally: self.telnet_lock.release() - def stop_web_stream_output(self, media_item): + def stop_web_stream_output(self): try: self.telnet_lock.acquire() tn = telnetlib.Telnet(self.ls_host, self.ls_port) @@ -142,13 +141,30 @@ class TelnetLiquidsoap: tn.write("exit\n") self.logger.debug(tn.read_all()) - #TODO.. self.current_prebuffering_stream_id = media_item['row_id'] except Exception, e: self.logger.error(str(e)) finally: self.telnet_lock.release() - + + def get_current_stream_id(): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + + msg = 'dynamic_source.get_id\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + stream_id = tn.read_all().splitlines()[0] + self.logger.debug("stream_id: %s" % stream_id) + + return stream_id + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() class DummyTelnetLiquidsoap: