cc-3476-dynamic-timeout

-finally found a way to make this simple
This commit is contained in:
Martin Konecny 2012-03-21 18:26:16 -04:00
parent a013e3b4d0
commit 2070de03ce

View file

@ -46,190 +46,66 @@ class PypoPush(Thread):
self.push_ahead = 5
self.last_end_time = 0
self.time_until_next_play = None
self.pushed_objects = {}
self.logger = logging.getLogger('push')
def push(self):
def main(self):
loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL)
next_media_item = None
next_media_item_chain = None
media_schedule = None
time_until_next_play = None
while True:
try:
if self.time_until_next_play is None:
if time_until_next_play is None:
media_schedule = self.queue.get(block=True)
else:
media_schedule = self.queue.get(block=True, timeout=self.time_until_next_play)
media_schedule = self.queue.get(block=True, timeout=time_until_next_play)
#We get to the following lines only if a schedule was received.
liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
self.handle_new_media_schedule(media_schedule, liquidsoap_queue_approx)
next_media_item = self.get_next_schedule_item(media_schedule, liquidsoap_queue_approx)
next_media_item_chain = self.get_next_schedule_chain(media_schedule)
self.logger.debug("Next schedule chain: %s", next_media_item_chain)
if next_media_item_chain is not None:
tnow = datetime.utcnow()
self.time_until_next_play = self.date_interval_to_seconds(next_media_item['starts'] - tnow)
except Empty, e:
liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
while len(liquidsoap_queue_approx) < 2:
self.push_to_liquidsoap(next_media_item, None)
liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
next_media_item = self.get_next_schedule_item(media_schedule, liquidsoap_queue_approx)
tnow = datetime.utcnow()
self.time_until_next_play = self.date_interval_to_seconds(next_media_item['starts'] - tnow)
def push_old(self):
"""
The Push Loop - the push loop periodically checks if there is a playlist
that should be scheduled at the current time.
If yes, the current liquidsoap playlist gets replaced with the corresponding one,
then liquidsoap is asked (via telnet) to reload and immediately play it.
"""
liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
try:
if self.time_until_next_play is None:
self.media = self.queue.get(block=True)
chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
time_until_next_play = self.date_interval_to_seconds(chain_start - tnow)
self.logger.debug("Blocking %s seconds until show start", time_until_next_play)
else:
self.media = self.queue.get(block=True, timeout=self.time_until_next_play)
"""
If we get to this line, it means we've received a new schedule. Iterate over it to
detect the start time of the next media item and update the "time_until_next_play"
"""
media_item = self.get_next_schedule_item()
next_media_item = media_item
tnow = datetime.utcnow()
self.time_until_next_play = self.date_interval_to_seconds(next_media_item['starts'] - tnow)
self.logger.debug("Blocking indefinitely since no show scheduled next")
time_until_next_play = None
except Empty, e:
#We only get here when a new chain of tracks are ready to be played.
self.push_to_liquidsoap(next_media_item_chain)
#TODO
time.sleep(2)
"""
self.logger.debug("Received data from pypo-fetch")
self.logger.debug('media %s' % json.dumps(self.media))
self.handle_new_media_schedule(self.media, liquidsoap_queue_approx)
media = self.media
next_media_item_chain = self.get_next_schedule_chain(media_schedule)
if next_media_item_chain is not None:
tnow = datetime.utcnow()
tcoming = tnow + timedelta(seconds=self.push_ahead)
next_media_item_start_time = None
for key in media.keys():
media_item = media[key]
item_start = datetime.strptime(media_item['start'][0:19], "%Y-%m-%d-%H-%M-%S")
item_end = datetime.strptime(media_item['end'][0:19], "%Y-%m-%d-%H-%M-%S")
if len(liquidsoap_queue_approx) == 0 and item_start <= tnow and tnow < item_end:
#Something is scheduled now, but Liquidsoap is not playing anything! Let's play the current media_item
self.logger.debug("Found media_item that should be playing! Starting...")
adjusted_cue_in = tnow - item_start
adjusted_cue_in_seconds = self.date_interval_to_seconds(adjusted_cue_in)
self.logger.debug("Found media_item that should be playing! Adjust cue point by %ss" % adjusted_cue_in_seconds)
self.push_to_liquidsoap(media_item, adjusted_cue_in_seconds)
elif len(liquidsoap_queue_approx) == 0 and tnow <= item_start:
if next_media_item_start_time is None:
next_media_item_start_time = item_start
elif item_start < next_media_item_start_time:
next_media_item_start_time = item_start
elif len(liquidsoap_queue_approx) == 0 and tnow <= item_start and item_start < tcoming:
self.logger.debug('Preparing to push media item scheduled at: %s', key)
if self.push_to_liquidsoap(media_item, None):
self.logger.debug("Pushed to liquidsoap, updating 'played' status.")
if self.time_until_next_play < 1:
self.time_until_next_play = self.date_interval_to_seconds(next_media_item_start_time - tnow)
chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
time_until_next_play = self.date_interval_to_seconds(chain_start - tnow)
self.logger.debug("Blocking %s seconds until show start", time_until_next_play)
else:
self.time_until_next_play = self.date_interval_to_seconds(next_media_item_start_time - tnow)/2
"""
self.logger.debug("Got a schedule, and next item occurs in")
except Empty, e:
"""
Timeout occurred, meaning the schedule is ready to start!
"""
self.logger.debug('Preparing to push media item scheduled at: %s', key)
liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
if len(liquidsoap_queue_approx) < 2:
if self.push_to_liquidsoap(media_item, None):
self.logger.debug("Pushed to liquidsoap, updating 'played' status.")
media_item = self.get_next_schedule_item()
next_media_item_start_time = media_item['starts']
self.time_until_next_play = self.date_interval_to_seconds(next_media_item_start_time - tnow)
except Exception, e:
self.logger.error(e)
def date_interval_to_seconds(self, interval):
return (interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10**6) / 10**6
def push_to_liquidsoap(self, media_item, adjusted_cue_in=None):
"""
This function looks at the media item, and either pushes it to the Liquidsoap
queue immediately, or if the queue is empty - waits until the start time of the
media item before pushing it.
"""
if adjusted_cue_in is not None:
media_item["cue_in"] = adjusted_cue_in + float(media_item["cue_in"])
try:
if media_item["start"] == self.last_end_time:
"""
this media item is attached to the end of the last
track, so let's push it now so that Liquidsoap can start playing
it immediately after (and prepare crossfades if need be).
"""
self.logger.debug("Push track immediately.")
self.telnet_to_liquidsoap(media_item)
self.last_end_time = media_item["end"]
else:
"""
this media item does not start right after a current playing track.
We need to sleep, and then wake up when this track starts.
"""
self.logger.debug("sleep until track start.")
self.sleep_until_start(media_item)
self.telnet_to_liquidsoap(media_item)
self.last_end_time = media_item["end"]
except Exception, e:
self.logger.error('Pypo Push Exception: %s', e)
return False
return True
self.logger.debug("Blocking indefinitely since no show scheduled next")
time_until_next_play = None
if loops % heartbeat_period == 0:
self.logger.info("heartbeat")
loops = 0
loops += 1
def get_queue_items_from_liquidsoap(self):
"""
This function connects to Liquidsoap to find what media items are in its queue.
"""
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
@ -266,7 +142,6 @@ class PypoPush(Thread):
return liquidsoap_queue_approx
def handle_new_media_schedule(self, media, liquidsoap_queue_approx):
"""
This function's purpose is to gracefully handle situations where
@ -276,47 +151,88 @@ class PypoPush(Thread):
queue.
"""
if len(liquidsoap_queue_approx) == 0:
"""
liquidsoap doesn't have anything in its queue, so we have nothing
to worry about. Life is good.
"""
#iterate through the items we got from the liquidsoap queue and
#see if they are the same as the newly received schedule
iteration = 0
problem_at_iteration = None
for queue_item in liquidsoap_queue_approx:
if queue_item['start'] in media.keys():
if queue_item['id'] == media['start']['id']:
#Everything OK for this iteration.
pass
elif len(liquidsoap_queue_approx) == 1:
queue_item_0_start = liquidsoap_queue_approx[0]['start']
else:
#A different item has been scheduled at the same time!
problem_at_iteration = iteration
break
else:
#There are no more items scheduled for this time!
problem_at_iteration = iteration
break
iteration+=1
if problem_at_iteration is not None:
#The first item in the Liquidsoap queue (the one that is currently playing)
#has changed or been removed from the schedule. We need to clear the entire
#queue, and push the new schedule
self.remove_from_liquidsoap_queue(problem_at_iteration, liquidsoap_queue_approx)
"""
The purpose of this function is to take a look at the last received schedule from
pypo-fetch and return the next chain of media_items. A chain is defined as a sequence
of media_items where the end time of media_item 'n' is the start time of media_item
'n+1'
"""
def get_next_schedule_chain(self, media_schedule):
chains = []
current_chain = []
for mkey in media_schedule:
media_item = media_schedule[mkey]
if len(current_chain) == 0:
current_chain.append(media_item)
elif media_item['start'] == current_chain[-1]['end']:
current_chain.append(media_item)
else:
#current item is not a continuation of the chain.
#Start a new one instead
chains.append(current_chain)
current_chain = [media_item]
if len(current_chain) > 0:
chains.append(current_chain)
self.logger.debug('media_schedule %s', media_schedule)
self.logger.debug("chains %s", chains)
#all media_items are now divided into chains. Let's find the one that
#starts closest in the future.
tnow = datetime.utcnow()
closest_start = None
closest_chain = None
for chain in chains:
chain_start = datetime.strptime(chain[0]['start'], "%Y-%m-%d-%H-%M-%S")
self.logger.debug("tnow %s, chain_start %s", tnow, chain_start)
if (closest_start == None or chain_start < closest_start) and chain_start > tnow:
closest_start = chain_start
closest_chain = chain
return closest_chain
def date_interval_to_seconds(self, interval):
return (interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10**6) / 10**6
def push_to_liquidsoap(self, media_item_chain):
try:
if liquidsoap_queue_approx[0]['id'] != media[queue_item_0_start]['id']:
"""
liquidsoap's queue does not match the schedule we just received from the Airtime server.
The queue is only of length 1 which means the item in the queue is playing.
Need to do source.skip.
Since only one item, we don't have to worry about the current item ending and us calling
source.skip unintentionally on the next item (there is no next item).
"""
self.logger.debug("%s from ls does not exist in queue new schedule. Removing" % liquidsoap_queue_approx[0]['id'], media)
self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0])
except KeyError, k:
self.logger.debug("%s from ls does not exist in queue schedule: %s Removing" % (queue_item_0_start, media))
self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0])
elif len(liquidsoap_queue_approx) == 2:
queue_item_0_start = liquidsoap_queue_approx[0]['start']
queue_item_1_start = liquidsoap_queue_approx[1]['start']
if queue_item_1_start in media.keys():
if liquidsoap_queue_approx[1]['id'] != media[queue_item_1_start]['id']:
self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[1])
else:
self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[1])
if queue_item_0_start in media.keys():
if liquidsoap_queue_approx[0]['id'] != media[queue_item_0_start]['id']:
self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0])
else:
self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0])
for media_item in media_item_chain:
self.telnet_to_liquidsoap(media_item)
except Exception, e:
self.logger.error('Pypo Push Exception: %s', e)
def clear_liquidsoap_queue(self):
self.logger.debug("Clearing Liquidsoap queue")
@ -332,15 +248,17 @@ class PypoPush(Thread):
finally:
self.telnet_lock.release()
def remove_from_liquidsoap_queue(self, media_item, do_only_source_skip=False):
if 'queue_id' in media_item:
queue_id = media_item['queue_id']
def remove_from_liquidsoap_queue(self, problem_at_iteration, liquidsoap_queue_approx):
iteration = 0
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
msg = "queue.remove %s\n" % queue_id
for queue_item in liquidsoap_queue_approx:
if iteration >= problem_at_iteration:
msg = "queue.remove %s\n" % queue_item['queue_id']
self.logger.debug(msg)
tn.write(msg)
response = tn.read_until("\r\n").strip("\r\n")
@ -353,6 +271,7 @@ class PypoPush(Thread):
msg = "source.skip\n"
self.logger.debug(msg)
tn.write(msg)
iteration += 1
tn.write("exit\n")
tn.read_all()
@ -361,9 +280,6 @@ class PypoPush(Thread):
finally:
self.telnet_lock.release()
else:
self.logger.error("'queue_id' key doesn't exist in media_item dict()")
def sleep_until_start(self, media_item):
"""
The purpose of this function is to look at the difference between
@ -432,14 +348,7 @@ class PypoPush(Thread):
% (media['id'], float(media['cue_in']), float(media['cue_out']), media['row_id'], media['dst'])
def run(self):
loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL)
while True:
if loops % heartbeat_period == 0:
self.logger.info("heartbeat")
loops = 0
try: self.push()
try: self.main()
except Exception, e:
self.logger.error('Pypo Push Exception: %s', e)
loops += 1