diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 7710947d2..5aa14a233 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -45,191 +45,67 @@ class PypoPush(Thread): self.push_ahead = 5 self.last_end_time = 0 - - self.time_until_next_play = None - + self.pushed_objects = {} self.logger = logging.getLogger('push') - def push(self): + def main(self): + loops = 0 + heartbeat_period = math.floor(30/PUSH_INTERVAL) - next_media_item = None + next_media_item_chain = None media_schedule = None - + time_until_next_play = None + while True: try: - if self.time_until_next_play is None: + if time_until_next_play is None: media_schedule = self.queue.get(block=True) else: - media_schedule = self.queue.get(block=True, timeout=self.time_until_next_play) + media_schedule = self.queue.get(block=True, timeout=time_until_next_play) - + #We get to the following lines only if a schedule was received. liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() self.handle_new_media_schedule(media_schedule, liquidsoap_queue_approx) - next_media_item = self.get_next_schedule_item(media_schedule, liquidsoap_queue_approx) + next_media_item_chain = self.get_next_schedule_chain(media_schedule) + self.logger.debug("Next schedule chain: %s", next_media_item_chain) - tnow = datetime.utcnow() - self.time_until_next_play = self.date_interval_to_seconds(next_media_item['starts'] - tnow) + if next_media_item_chain is not None: + tnow = datetime.utcnow() + chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") + time_until_next_play = self.date_interval_to_seconds(chain_start - tnow) + self.logger.debug("Blocking %s seconds until show start", time_until_next_play) + else: + self.logger.debug("Blocking indefinitely since no show scheduled next") + time_until_next_play = None except Empty, e: + #We only get here when a new chain of tracks are ready to be played. + self.push_to_liquidsoap(next_media_item_chain) - liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() + #TODO + time.sleep(2) - while len(liquidsoap_queue_approx) < 2: - self.push_to_liquidsoap(next_media_item, None) - liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() - next_media_item = self.get_next_schedule_item(media_schedule, liquidsoap_queue_approx) - - tnow = datetime.utcnow() - self.time_until_next_play = self.date_interval_to_seconds(next_media_item['starts'] - tnow) - - - def push_old(self): - """ - The Push Loop - the push loop periodically checks if there is a playlist - that should be scheduled at the current time. - If yes, the current liquidsoap playlist gets replaced with the corresponding one, - then liquidsoap is asked (via telnet) to reload and immediately play it. - """ - - liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() - - try: - if self.time_until_next_play is None: - self.media = self.queue.get(block=True) - else: - self.media = self.queue.get(block=True, timeout=self.time_until_next_play) - - """ - If we get to this line, it means we've received a new schedule. Iterate over it to - detect the start time of the next media item and update the "time_until_next_play" - """ - media_item = self.get_next_schedule_item() - next_media_item = media_item - - tnow = datetime.utcnow() - self.time_until_next_play = self.date_interval_to_seconds(next_media_item['starts'] - tnow) - except Empty, e: - - - - """ - self.logger.debug("Received data from pypo-fetch") - self.logger.debug('media %s' % json.dumps(self.media)) - self.handle_new_media_schedule(self.media, liquidsoap_queue_approx) - - media = self.media - - tnow = datetime.utcnow() - tcoming = tnow + timedelta(seconds=self.push_ahead) - - next_media_item_start_time = None - - for key in media.keys(): - media_item = media[key] - - item_start = datetime.strptime(media_item['start'][0:19], "%Y-%m-%d-%H-%M-%S") - item_end = datetime.strptime(media_item['end'][0:19], "%Y-%m-%d-%H-%M-%S") + next_media_item_chain = self.get_next_schedule_chain(media_schedule) + if next_media_item_chain is not None: + tnow = datetime.utcnow() + chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") + time_until_next_play = self.date_interval_to_seconds(chain_start - tnow) + self.logger.debug("Blocking %s seconds until show start", time_until_next_play) + else: + self.logger.debug("Blocking indefinitely since no show scheduled next") + time_until_next_play = None - if len(liquidsoap_queue_approx) == 0 and item_start <= tnow and tnow < item_end: - #Something is scheduled now, but Liquidsoap is not playing anything! Let's play the current media_item - - self.logger.debug("Found media_item that should be playing! Starting...") - - adjusted_cue_in = tnow - item_start - adjusted_cue_in_seconds = self.date_interval_to_seconds(adjusted_cue_in) - - self.logger.debug("Found media_item that should be playing! Adjust cue point by %ss" % adjusted_cue_in_seconds) - self.push_to_liquidsoap(media_item, adjusted_cue_in_seconds) - - elif len(liquidsoap_queue_approx) == 0 and tnow <= item_start: - if next_media_item_start_time is None: - next_media_item_start_time = item_start - elif item_start < next_media_item_start_time: - next_media_item_start_time = item_start - elif len(liquidsoap_queue_approx) == 0 and tnow <= item_start and item_start < tcoming: - self.logger.debug('Preparing to push media item scheduled at: %s', key) - - if self.push_to_liquidsoap(media_item, None): - self.logger.debug("Pushed to liquidsoap, updating 'played' status.") - - - if self.time_until_next_play < 1: - self.time_until_next_play = self.date_interval_to_seconds(next_media_item_start_time - tnow) - else: - self.time_until_next_play = self.date_interval_to_seconds(next_media_item_start_time - tnow)/2 - """ - - - - self.logger.debug("Got a schedule, and next item occurs in") - except Empty, e: - """ - Timeout occurred, meaning the schedule is ready to start! - """ - self.logger.debug('Preparing to push media item scheduled at: %s', key) - - liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() - if len(liquidsoap_queue_approx) < 2: - if self.push_to_liquidsoap(media_item, None): - self.logger.debug("Pushed to liquidsoap, updating 'played' status.") - - media_item = self.get_next_schedule_item() - next_media_item_start_time = media_item['starts'] - self.time_until_next_play = self.date_interval_to_seconds(next_media_item_start_time - tnow) - - except Exception, e: - self.logger.error(e) + if loops % heartbeat_period == 0: + self.logger.info("heartbeat") + loops = 0 + loops += 1 - - def date_interval_to_seconds(self, interval): - return (interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10**6) / 10**6 - - def push_to_liquidsoap(self, media_item, adjusted_cue_in=None): - """ - This function looks at the media item, and either pushes it to the Liquidsoap - queue immediately, or if the queue is empty - waits until the start time of the - media item before pushing it. - """ - - if adjusted_cue_in is not None: - media_item["cue_in"] = adjusted_cue_in + float(media_item["cue_in"]) - - - try: - if media_item["start"] == self.last_end_time: - """ - this media item is attached to the end of the last - track, so let's push it now so that Liquidsoap can start playing - it immediately after (and prepare crossfades if need be). - """ - self.logger.debug("Push track immediately.") - self.telnet_to_liquidsoap(media_item) - self.last_end_time = media_item["end"] - else: - """ - this media item does not start right after a current playing track. - We need to sleep, and then wake up when this track starts. - """ - self.logger.debug("sleep until track start.") - self.sleep_until_start(media_item) - - self.telnet_to_liquidsoap(media_item) - self.last_end_time = media_item["end"] - except Exception, e: - self.logger.error('Pypo Push Exception: %s', e) - return False - - return True - - def get_queue_items_from_liquidsoap(self): """ This function connects to Liquidsoap to find what media items are in its queue. """ - - try: self.telnet_lock.acquire() tn = telnetlib.Telnet(LS_HOST, LS_PORT) @@ -265,8 +141,7 @@ class PypoPush(Thread): break return liquidsoap_queue_approx - - + def handle_new_media_schedule(self, media, liquidsoap_queue_approx): """ This function's purpose is to gracefully handle situations where @@ -275,49 +150,90 @@ class PypoPush(Thread): call other functions that will connect to Liquidsoap and alter its queue. """ + + #iterate through the items we got from the liquidsoap queue and + #see if they are the same as the newly received schedule + iteration = 0 + problem_at_iteration = None + for queue_item in liquidsoap_queue_approx: + if queue_item['start'] in media.keys(): + if queue_item['id'] == media['start']['id']: + #Everything OK for this iteration. + pass + else: + #A different item has been scheduled at the same time! + problem_at_iteration = iteration + break + else: + #There are no more items scheduled for this time! + problem_at_iteration = iteration + break + iteration+=1 + + + if problem_at_iteration is not None: + #The first item in the Liquidsoap queue (the one that is currently playing) + #has changed or been removed from the schedule. We need to clear the entire + #queue, and push the new schedule + self.remove_from_liquidsoap_queue(problem_at_iteration, liquidsoap_queue_approx) + + + + """ + The purpose of this function is to take a look at the last received schedule from + pypo-fetch and return the next chain of media_items. A chain is defined as a sequence + of media_items where the end time of media_item 'n' is the start time of media_item + 'n+1' + """ + def get_next_schedule_chain(self, media_schedule): + chains = [] + + current_chain = [] + for mkey in media_schedule: + media_item = media_schedule[mkey] + if len(current_chain) == 0: + current_chain.append(media_item) + elif media_item['start'] == current_chain[-1]['end']: + current_chain.append(media_item) + else: + #current item is not a continuation of the chain. + #Start a new one instead + chains.append(current_chain) + current_chain = [media_item] + + if len(current_chain) > 0: + chains.append(current_chain) + + self.logger.debug('media_schedule %s', media_schedule) + self.logger.debug("chains %s", chains) + + #all media_items are now divided into chains. Let's find the one that + #starts closest in the future. + + tnow = datetime.utcnow() + closest_start = None + closest_chain = None + for chain in chains: + chain_start = datetime.strptime(chain[0]['start'], "%Y-%m-%d-%H-%M-%S") + self.logger.debug("tnow %s, chain_start %s", tnow, chain_start) + if (closest_start == None or chain_start < closest_start) and chain_start > tnow: + closest_start = chain_start + closest_chain = chain + + return closest_chain + + + def date_interval_to_seconds(self, interval): + return (interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10**6) / 10**6 - if len(liquidsoap_queue_approx) == 0: - """ - liquidsoap doesn't have anything in its queue, so we have nothing - to worry about. Life is good. - """ - pass - elif len(liquidsoap_queue_approx) == 1: - queue_item_0_start = liquidsoap_queue_approx[0]['start'] - try: - if liquidsoap_queue_approx[0]['id'] != media[queue_item_0_start]['id']: - """ - liquidsoap's queue does not match the schedule we just received from the Airtime server. - The queue is only of length 1 which means the item in the queue is playing. - Need to do source.skip. - - Since only one item, we don't have to worry about the current item ending and us calling - source.skip unintentionally on the next item (there is no next item). - """ - - self.logger.debug("%s from ls does not exist in queue new schedule. Removing" % liquidsoap_queue_approx[0]['id'], media) - self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0]) - except KeyError, k: - self.logger.debug("%s from ls does not exist in queue schedule: %s Removing" % (queue_item_0_start, media)) - self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0]) - - - elif len(liquidsoap_queue_approx) == 2: - queue_item_0_start = liquidsoap_queue_approx[0]['start'] - queue_item_1_start = liquidsoap_queue_approx[1]['start'] - - if queue_item_1_start in media.keys(): - if liquidsoap_queue_approx[1]['id'] != media[queue_item_1_start]['id']: - self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[1]) - else: - self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[1]) - - if queue_item_0_start in media.keys(): - if liquidsoap_queue_approx[0]['id'] != media[queue_item_0_start]['id']: - self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0]) - else: - self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0]) - + def push_to_liquidsoap(self, media_item_chain): + + try: + for media_item in media_item_chain: + self.telnet_to_liquidsoap(media_item) + except Exception, e: + self.logger.error('Pypo Push Exception: %s', e) + def clear_liquidsoap_queue(self): self.logger.debug("Clearing Liquidsoap queue") try: @@ -332,38 +248,38 @@ class PypoPush(Thread): finally: self.telnet_lock.release() - def remove_from_liquidsoap_queue(self, media_item, do_only_source_skip=False): - if 'queue_id' in media_item: - queue_id = media_item['queue_id'] + def remove_from_liquidsoap_queue(self, problem_at_iteration, liquidsoap_queue_approx): + iteration = 0 + + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(LS_HOST, LS_PORT) - - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - msg = "queue.remove %s\n" % queue_id - self.logger.debug(msg) - tn.write(msg) - response = tn.read_until("\r\n").strip("\r\n") - - if "No such request in my queue" in response: - """ - Cannot remove because Liquidsoap started playing the item. Need - to use source.skip instead - """ - msg = "source.skip\n" + for queue_item in liquidsoap_queue_approx: + if iteration >= problem_at_iteration: + + msg = "queue.remove %s\n" % queue_item['queue_id'] self.logger.debug(msg) tn.write(msg) + response = tn.read_until("\r\n").strip("\r\n") - tn.write("exit\n") - tn.read_all() - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() + if "No such request in my queue" in response: + """ + Cannot remove because Liquidsoap started playing the item. Need + to use source.skip instead + """ + msg = "source.skip\n" + self.logger.debug(msg) + tn.write(msg) + iteration += 1 + + tn.write("exit\n") + tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() - else: - self.logger.error("'queue_id' key doesn't exist in media_item dict()") - def sleep_until_start(self, media_item): """ The purpose of this function is to look at the difference between @@ -432,14 +348,7 @@ class PypoPush(Thread): % (media['id'], float(media['cue_in']), float(media['cue_out']), media['row_id'], media['dst']) def run(self): - loops = 0 - heartbeat_period = math.floor(30/PUSH_INTERVAL) - - while True: - if loops % heartbeat_period == 0: - self.logger.info("heartbeat") - loops = 0 - try: self.push() - except Exception, e: - self.logger.error('Pypo Push Exception: %s', e) - loops += 1 + try: self.main() + except Exception, e: + self.logger.error('Pypo Push Exception: %s', e) +