CC-5729: Pypo race condition on show source kick event kills playout

* Fixed a race condition in pypo which could kill playout. Input kick
  events and non-file/webstream events could result in an exception
  being thrown. Fixed the logic error behind this, caught the exception
  just in case, and added some code to prevent uncaught pypopush
  exceptions from terminating the thread.
* Could prevent playout breakage in similar situations
This commit is contained in:
Albert Santoni 2014-03-11 18:01:29 -04:00
parent 7660e0cd84
commit 73cbead4c3
2 changed files with 72 additions and 68 deletions

View file

@ -117,76 +117,79 @@ class PypoLiquidsoap():
#independent_event: true #independent_event: true
#}, #},
try:
scheduled_now_files = \
filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now)
scheduled_now_files = \ scheduled_now_webstream = \
filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now) filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \
scheduled_now)
scheduled_now_webstream = \ schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files))
filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \
scheduled_now)
schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files)) row_id_map = {}
liq_queue_ids = set()
row_id_map = {}
liq_queue_ids = set()
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if not self.is_media_item_finished(mi):
liq_queue_ids.add(mi["row_id"])
row_id_map[mi["row_id"]] = mi
to_be_removed = set()
to_be_added = set()
#Iterate over the new files, and compare them to currently scheduled
#tracks. If already in liquidsoap queue still need to make sure they don't
#have different attributes
#if replay gain changes, it shouldn't change the amplification of the currently playing song
for i in scheduled_now_files:
if i["row_id"] in row_id_map:
mi = row_id_map[i["row_id"]]
correct = mi['start'] == i['start'] and \
mi['end'] == i['end'] and \
mi['row_id'] == i['row_id']
if not correct:
#need to re-add
self.logger.info("Track %s found to have new attr." % i)
to_be_removed.add(i["row_id"])
to_be_added.add(i["row_id"])
to_be_removed.update(liq_queue_ids - schedule_ids)
to_be_added.update(schedule_ids - liq_queue_ids)
if to_be_removed:
self.logger.info("Need to remove items from Liquidsoap: %s" % \
to_be_removed)
#remove files from Liquidsoap's queue
for i in self.liq_queue_tracker: for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i] mi = self.liq_queue_tracker[i]
if mi is not None and mi["row_id"] in to_be_removed: if not self.is_media_item_finished(mi):
self.stop(i) liq_queue_ids.add(mi["row_id"])
row_id_map[mi["row_id"]] = mi
if to_be_added: to_be_removed = set()
self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ to_be_added = set()
to_be_added)
for i in scheduled_now: #Iterate over the new files, and compare them to currently scheduled
if i["row_id"] in to_be_added: #tracks. If already in liquidsoap queue still need to make sure they don't
self.modify_cue_point(i) #have different attributes
self.play(i) #if replay gain changes, it shouldn't change the amplification of the currently playing song
for i in scheduled_now_files:
if i["row_id"] in row_id_map:
mi = row_id_map[i["row_id"]]
correct = mi['start'] == i['start'] and \
mi['end'] == i['end'] and \
mi['row_id'] == i['row_id']
if not correct:
#need to re-add
self.logger.info("Track %s found to have new attr." % i)
to_be_removed.add(i["row_id"])
to_be_added.add(i["row_id"])
to_be_removed.update(liq_queue_ids - schedule_ids)
to_be_added.update(schedule_ids - liq_queue_ids)
if to_be_removed:
self.logger.info("Need to remove items from Liquidsoap: %s" % \
to_be_removed)
#remove files from Liquidsoap's queue
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi is not None and mi["row_id"] in to_be_removed:
self.stop(i)
if to_be_added:
self.logger.info("Need to add items to Liquidsoap *now*: %s" % \
to_be_added)
for i in scheduled_now_files:
if i["row_id"] in to_be_added:
self.modify_cue_point(i)
self.play(i)
#handle webstreams
current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
if scheduled_now_webstream:
if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]):
self.play(scheduled_now_webstream[0])
elif current_stream_id != "-1":
#something is playing and it shouldn't be.
self.telnet_liquidsoap.stop_web_stream_buffer()
self.telnet_liquidsoap.stop_web_stream_output()
except KeyError as e:
self.logger.error("Error: Malformed event in schedule. " + str(e))
#handle webstreams
current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
if scheduled_now_webstream:
if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]):
self.play(scheduled_now_webstream[0])
elif current_stream_id != "-1":
#something is playing and it shouldn't be.
self.telnet_liquidsoap.stop_web_stream_buffer()
self.telnet_liquidsoap.stop_web_stream_output()
def stop(self, queue): def stop(self, queue):
self.telnet_liquidsoap.queue_remove(queue) self.telnet_liquidsoap.queue_remove(queue)
@ -200,8 +203,7 @@ class PypoLiquidsoap():
self.liq_queue_tracker[i] = None self.liq_queue_tracker[i] = None
def modify_cue_point(self, link): def modify_cue_point(self, link):
if not self.is_file(link): assert self.is_file(link)
return
tnow = datetime.utcnow() tnow = datetime.utcnow()

View file

@ -153,8 +153,10 @@ class PypoPush(Thread):
self.telnet_lock.release() self.telnet_lock.release()
def run(self): def run(self):
try: self.main() while True:
except Exception, e: try: self.main()
top = traceback.format_exc() except Exception, e:
self.logger.error('Pypo Push Exception: %s', top) top = traceback.format_exc()
self.logger.error('Pypo Push Exception: %s', top)
time.sleep(5)