diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index de30d65dd..16be5bf58 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -23,6 +23,7 @@ from pypofile import PypoFile from recorder import Recorder from listenerstat import ListenerStat from pypomessagehandler import PypoMessageHandler +from pypoliquidsoap import PypoLiquidsoap from media.update.replaygainupdater import ReplayGainUpdater from media.update.silananalyzer import SilanAnalyzer @@ -194,7 +195,8 @@ if __name__ == '__main__': recorder_q = Queue() pypoPush_q = Queue() - + pypo_liquidsoap = PypoLiquidsoap(logger, telnet_lock,\ + ls_host, ls_port) """ This queue is shared between pypo-fetch and pypo-file, where pypo-file @@ -212,11 +214,11 @@ if __name__ == '__main__': pfile.daemon = True pfile.start() - pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock) + pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap) pf.daemon = True pf.start() - pp = PypoPush(pypoPush_q, telnet_lock) + pp = PypoPush(pypoPush_q, telnet_lock, pypo_liquidsoap) pp.daemon = True pp.start() diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 82f404ce5..743e29677 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -47,7 +47,7 @@ except Exception, e: sys.exit() class PypoFetch(Thread): - def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock): + def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap): Thread.__init__(self) self.api_client = api_client.AirtimeApiClient() self.fetch_queue = pypoFetch_q @@ -58,7 +58,9 @@ class PypoFetch(Thread): self.telnet_lock = telnet_lock - self.logger = logging.getLogger(); + self.logger = logging.getLogger() + + self.pypo_liquidsoap = pypo_liquidsoap self.cache_dir = os.path.join(config["cache_dir"], "scheduler") self.logger.debug("Cache dir %s", self.cache_dir) @@ -574,6 +576,14 @@ class PypoFetch(Thread): # Bootstrap: since we are just starting up, we need to grab the # most recent schedule. After that we can just wait for updates. success = self.persistent_manual_schedule_fetch(max_attempts=5) + + #Make sure all Liquidsoap queues are empty. This is important in the + #case where we've just restarted the pypo scheduler, but Liquidsoap still + #is playing tracks. In this case let's just restart everything from scratch + #so that we can repopulate our dictionary that keeps track of what + #Liquidsoap is playing much more easily. + self.pypo_liquidsoap.clear_all_queues() + if success: self.logger.info("Bootstrap schedule received: %s", self.schedule_data) self.set_bootstrap_variables() diff --git a/python_apps/pypo/pypoliquidsoap.py b/python_apps/pypo/pypoliquidsoap.py index 8a9554516..11a7fd9be 100644 --- a/python_apps/pypo/pypoliquidsoap.py +++ b/python_apps/pypo/pypoliquidsoap.py @@ -20,7 +20,8 @@ class PypoLiquidsoap(): self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ logger,\ host,\ - port) + port,\ + self.liq_queue_tracker.keys()) def play(self, media_item): @@ -90,9 +91,6 @@ class PypoLiquidsoap(): return available_queue - def get_queues(): - return self.liq_queue_tracker - def verify_correct_present_media(self, scheduled_now): #verify whether Liquidsoap is currently playing the correct files. @@ -105,8 +103,6 @@ class PypoLiquidsoap(): #get liquidsoap items for each queue. Since each queue can only have one #item, we should have a max of 8 items. - #TODO: Verify start, end, replay_gain is the same - #2013-03-21-22-56-00_0: { #id: 1, #type: "stream_output_start", @@ -222,6 +218,9 @@ class PypoLiquidsoap(): return seconds + def clear_all_queues(self): + self.telnet_liquidsoap.queue_clear_all() + class UnknownMediaItemType(Exception): pass diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 4d6c21964..daac1caee 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -51,7 +51,7 @@ def is_file(media_item): return media_item['type'] == 'file' class PypoPush(Thread): - def __init__(self, q, telnet_lock): + def __init__(self, q, telnet_lock, pypo_liquidsoap): Thread.__init__(self) self.api_client = api_client.AirtimeApiClient() self.queue = q @@ -64,8 +64,10 @@ class PypoPush(Thread): self.queue_id = 0 self.future_scheduled_queue = Queue() - self.pypo_liquidsoap = PypoLiquidsoap(self.logger, telnet_lock,\ - LS_HOST, LS_PORT) + + #self.pypo_liquidsoap = PypoLiquidsoap(self.logger, telnet_lock,\ + #LS_HOST, LS_PORT) + self.pypo_liquidsoap = pypo_liquidsoap self.plq = PypoLiqQueue(self.future_scheduled_queue, \ self.pypo_liquidsoap, \ diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index 223bc475e..ae795bdc7 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -7,11 +7,12 @@ def create_liquidsoap_annotation(media): class TelnetLiquidsoap: - def __init__(self, telnet_lock, logger, ls_host, ls_port): + def __init__(self, telnet_lock, logger, ls_host, ls_port, queues): self.telnet_lock = telnet_lock self.ls_host = ls_host self.ls_port = ls_port self.logger = logger + self.queues = queues self.current_prebuffering_stream_id = None def __connect(self): @@ -20,6 +21,23 @@ class TelnetLiquidsoap: def __is_empty(self, tn, queue_id): return True + def queue_clear_all(self): + try: + self.telnet_lock.acquire() + tn = self.__connect() + + for i in self.queues: + msg = 'queues.%s_skip\n' % i + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception: + raise + finally: + self.telnet_lock.release() + def queue_remove(self, queue_id): try: self.telnet_lock.acquire()