From 73cbead4c3c6ed285c1f0fd49e6f36ccb9589730 Mon Sep 17 00:00:00 2001 From: Albert Santoni Date: Tue, 11 Mar 2014 18:01:29 -0400 Subject: [PATCH] CC-5729: Pypo race condition on show source kick event kills playout * Fixed a race condition in pypo which could kill playout. Input kick events and non-file/webstream events could result in an exception being thrown. Fixed the logic error behind this, caught the exception just in case, and added some code to prevent uncaught pypopush exceptions from terminating the thread. * Could prevent playout breakage in similar situations --- python_apps/pypo/pypoliquidsoap.py | 130 +++++++++++++++-------------- python_apps/pypo/pypopush.py | 10 ++- 2 files changed, 72 insertions(+), 68 deletions(-) diff --git a/python_apps/pypo/pypoliquidsoap.py b/python_apps/pypo/pypoliquidsoap.py index 93647b3b6..74211fddd 100644 --- a/python_apps/pypo/pypoliquidsoap.py +++ b/python_apps/pypo/pypoliquidsoap.py @@ -117,76 +117,79 @@ class PypoLiquidsoap(): #independent_event: true #}, + try: + scheduled_now_files = \ + filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now) - scheduled_now_files = \ - filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now) + scheduled_now_webstream = \ + filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \ + scheduled_now) - scheduled_now_webstream = \ - filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \ - scheduled_now) + schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files)) - schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files)) - - row_id_map = {} - liq_queue_ids = set() - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if not self.is_media_item_finished(mi): - liq_queue_ids.add(mi["row_id"]) - row_id_map[mi["row_id"]] = mi - - to_be_removed = set() - to_be_added = set() - - #Iterate over the new files, and compare them to currently scheduled - #tracks. If already in liquidsoap queue still need to make sure they don't - #have different attributes - #if replay gain changes, it shouldn't change the amplification of the currently playing song - for i in scheduled_now_files: - if i["row_id"] in row_id_map: - mi = row_id_map[i["row_id"]] - correct = mi['start'] == i['start'] and \ - mi['end'] == i['end'] and \ - mi['row_id'] == i['row_id'] - - if not correct: - #need to re-add - self.logger.info("Track %s found to have new attr." % i) - to_be_removed.add(i["row_id"]) - to_be_added.add(i["row_id"]) - - - to_be_removed.update(liq_queue_ids - schedule_ids) - to_be_added.update(schedule_ids - liq_queue_ids) - - if to_be_removed: - self.logger.info("Need to remove items from Liquidsoap: %s" % \ - to_be_removed) - - #remove files from Liquidsoap's queue + row_id_map = {} + liq_queue_ids = set() for i in self.liq_queue_tracker: mi = self.liq_queue_tracker[i] - if mi is not None and mi["row_id"] in to_be_removed: - self.stop(i) + if not self.is_media_item_finished(mi): + liq_queue_ids.add(mi["row_id"]) + row_id_map[mi["row_id"]] = mi - if to_be_added: - self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ - to_be_added) + to_be_removed = set() + to_be_added = set() - for i in scheduled_now: - if i["row_id"] in to_be_added: - self.modify_cue_point(i) - self.play(i) + #Iterate over the new files, and compare them to currently scheduled + #tracks. If already in liquidsoap queue still need to make sure they don't + #have different attributes + #if replay gain changes, it shouldn't change the amplification of the currently playing song + for i in scheduled_now_files: + if i["row_id"] in row_id_map: + mi = row_id_map[i["row_id"]] + correct = mi['start'] == i['start'] and \ + mi['end'] == i['end'] and \ + mi['row_id'] == i['row_id'] + + if not correct: + #need to re-add + self.logger.info("Track %s found to have new attr." % i) + to_be_removed.add(i["row_id"]) + to_be_added.add(i["row_id"]) + + + to_be_removed.update(liq_queue_ids - schedule_ids) + to_be_added.update(schedule_ids - liq_queue_ids) + + if to_be_removed: + self.logger.info("Need to remove items from Liquidsoap: %s" % \ + to_be_removed) + + #remove files from Liquidsoap's queue + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi is not None and mi["row_id"] in to_be_removed: + self.stop(i) + + if to_be_added: + self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ + to_be_added) + + for i in scheduled_now_files: + if i["row_id"] in to_be_added: + self.modify_cue_point(i) + self.play(i) + + #handle webstreams + current_stream_id = self.telnet_liquidsoap.get_current_stream_id() + if scheduled_now_webstream: + if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]): + self.play(scheduled_now_webstream[0]) + elif current_stream_id != "-1": + #something is playing and it shouldn't be. + self.telnet_liquidsoap.stop_web_stream_buffer() + self.telnet_liquidsoap.stop_web_stream_output() + except KeyError as e: + self.logger.error("Error: Malformed event in schedule. " + str(e)) - #handle webstreams - current_stream_id = self.telnet_liquidsoap.get_current_stream_id() - if scheduled_now_webstream: - if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]): - self.play(scheduled_now_webstream[0]) - elif current_stream_id != "-1": - #something is playing and it shouldn't be. - self.telnet_liquidsoap.stop_web_stream_buffer() - self.telnet_liquidsoap.stop_web_stream_output() def stop(self, queue): self.telnet_liquidsoap.queue_remove(queue) @@ -200,8 +203,7 @@ class PypoLiquidsoap(): self.liq_queue_tracker[i] = None def modify_cue_point(self, link): - if not self.is_file(link): - return + assert self.is_file(link) tnow = datetime.utcnow() diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 8a5600258..75079ca1f 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -153,8 +153,10 @@ class PypoPush(Thread): self.telnet_lock.release() def run(self): - try: self.main() - except Exception, e: - top = traceback.format_exc() - self.logger.error('Pypo Push Exception: %s', top) + while True: + try: self.main() + except Exception, e: + top = traceback.format_exc() + self.logger.error('Pypo Push Exception: %s', top) + time.sleep(5)