From fcce4bf64c227df4c235f9619668291b84176b7b Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Thu, 2 Aug 2012 16:26:41 -0400 Subject: [PATCH] CC-1665: Scheduled stream rebroadcasting and recording -cancelling a webstream works --- python_apps/pypo/pypopush.py | 85 ++++++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index a6ebef2a6..09dd7de37 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -49,6 +49,7 @@ class PypoPush(Thread): self.pushed_objects = {} self.logger = logging.getLogger('push') + self.current_stream_info = None def main(self): loops = 0 @@ -73,38 +74,54 @@ class PypoPush(Thread): tnow = datetime.utcnow() current_event_chain, original_chain = self.get_current_chain(chains, tnow) - if len(current_event_chain) > 0 and len(liquidsoap_queue_approx) == 0: - #Something is scheduled but Liquidsoap is not playing anything! - #Need to schedule it immediately..this might happen if Liquidsoap crashed. + + if len(current_event_chain) > 0: try: chains.remove(original_chain) except ValueError, e: self.logger.error(str(e)) + if len(liquidsoap_queue_approx) == 0 and current_event_chain[0]['type'] == 'file': + #Something is scheduled but Liquidsoap is not playing anything! + #Need to schedule it immediately..this might happen if Liquidsoap crashed. + self.modify_cue_point(current_event_chain[0]) + next_media_item_chain = current_event_chain + time_until_next_play = 0 + #sleep for 0.2 seconds to give pypo-file time to copy. + time.sleep(0.2) + continue + if not self.current_stream_info and current_event_chain[0]['type'] == 'stream': + #a stream is schedule but Liquidsoap is not playing it. Need to start it. + next_media_item_chain = current_event_chain + time_until_next_play = 0 + continue - self.modify_cue_point(current_event_chain[0]) - next_media_item_chain = current_event_chain - time_until_next_play = 0 - #sleep for 0.2 seconds to give pypo-file time to copy. - time.sleep(0.2) + #At this point we know that Liquidsoap is playing something, and that something + #is scheduled. We need to verify whether the schedule we just received matches + #what Liquidsoap is playing, and if not, correct it. + media_chain = filter(lambda item: (item["type"] == "file"), current_event_chain) + stream_chain = filter(lambda item: (item["type"] == "stream"), current_event_chain) + self.handle_new_schedule(media_schedule, liquidsoap_queue_approx, media_chain, stream_chain) + + + #At this point everything in the present has been taken care of and Liquidsoap + #is playing whatever is scheduled. + #Now we need to prepare ourselves for future scheduled events. + # + next_media_item_chain = self.get_next_schedule_chain(chains, tnow) + + self.logger.debug("Next schedule chain: %s", next_media_item_chain) + if next_media_item_chain is not None: + try: + chains.remove(next_media_item_chain) + except ValueError, e: + self.logger.error(str(e)) + + chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") + time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) + self.logger.debug("Blocking %s seconds until show start", time_until_next_play) else: - media_chain = filter(lambda item: (item["type"] == "file"), current_event_chain) - self.handle_new_media_schedule(media_schedule, liquidsoap_queue_approx, media_chain) - - next_media_item_chain = self.get_next_schedule_chain(chains, tnow) - - self.logger.debug("Next schedule chain: %s", next_media_item_chain) - if next_media_item_chain is not None: - try: - chains.remove(next_media_item_chain) - except ValueError, e: - self.logger.error(str(e)) - - chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") - time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) - self.logger.debug("Blocking %s seconds until show start", time_until_next_play) - else: - self.logger.debug("Blocking indefinitely since no show scheduled") - time_until_next_play = None + self.logger.debug("Blocking indefinitely since no show scheduled") + time_until_next_play = None except Empty, e: #We only get here when a new chain of tracks are ready to be played. self.push_to_liquidsoap(next_media_item_chain) @@ -166,7 +183,7 @@ class PypoPush(Thread): return liquidsoap_queue_approx - def handle_new_media_schedule(self, media_schedule, liquidsoap_queue_approx, media_chain): + def handle_new_schedule(self, media_schedule, liquidsoap_queue_approx, media_chain, stream_chain): """ This function's purpose is to gracefully handle situations where Liquidsoap already has a track in its queue, but the schedule @@ -175,6 +192,18 @@ class PypoPush(Thread): queue. """ + if self.current_stream_info: + if len(stream_chain) == 0: + #Liquidsoap is rebroadcasting a webstream, but there is no stream + #in the schedule. Let's stop streaming. + self.stop_web_stream(self.current_stream_info) + elif self.current_stream_info['uri'] != stream_chain[0]['uri']: + #Liquidsoap is rebroadcasting a webstream and a webstream is scheduled + #to play, but they are not the same! + self.stop_web_stream(self.current_stream_info) + self.start_web_stream(stream_chain[0]) + + problem_at_iteration = self.find_removed_items(media_schedule, liquidsoap_queue_approx) if problem_at_iteration is not None: @@ -388,6 +417,7 @@ class PypoPush(Thread): tn.write("exit\n") self.logger.debug(tn.read_all()) + self.current_stream_info = media_item except Exception, e: self.logger.error(str(e)) finally: @@ -406,6 +436,7 @@ class PypoPush(Thread): tn.write("exit\n") self.logger.debug(tn.read_all()) + self.current_stream_info = None except Exception, e: self.logger.error(str(e)) finally: