CC-3470: Smarter way for pypo to recover when it should be playing something.

-fixed
This commit is contained in:
Martin Konecny 2012-03-17 13:55:56 -04:00
parent 6634e9f663
commit 903698efc5
2 changed files with 85 additions and 83 deletions

View File

@ -83,7 +83,7 @@ class PypoFetch(Thread):
if command == 'update_schedule':
self.schedule_data = m['schedule']
self.process_schedule(self.schedule_data, False)
self.process_schedule(self.schedule_data)
elif command == 'update_stream_setting':
self.logger.info("Updating stream setting...")
self.regenerateLiquidsoapConf(m['setting'])
@ -373,7 +373,7 @@ class PypoFetch(Thread):
to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
- runs the cleanup routine, to get rid of unused cached files
"""
def process_schedule(self, schedule_data, bootstrapping):
def process_schedule(self, schedule_data):
self.logger.debug(schedule_data)
media = schedule_data["media"]
@ -397,7 +397,7 @@ class PypoFetch(Thread):
media_item['dst'] = dst
self.media_prepare_queue.put(copy.copy(media))
self.prepare_media(media, bootstrapping)
self.prepare_media(media)
except Exception, e: self.logger.error("%s", e)
# Send the data to pypo-push
@ -406,12 +406,12 @@ class PypoFetch(Thread):
# cleanup
try: self.cleanup(media)
try: self.cache_cleanup(media)
except Exception, e: self.logger.error("%s", e)
def prepare_media(self, media, bootstrapping):
def prepare_media(self, media):
"""
Iterate through the list of media items in "media" and
download them.
@ -421,70 +421,14 @@ class PypoFetch(Thread):
for mkey in mediaKeys:
self.logger.debug("Media item starting at %s", mkey)
media_item = media[mkey]
if bootstrapping:
self.check_for_previous_crash(media_item)
entry = self.create_liquidsoap_annotation(media_item)
media_item['show_name'] = "TODO"
media_item["annotation"] = entry
media_item['show_name'] = "TODO"
except Exception, e:
self.logger.error("%s", e)
return media
def create_liquidsoap_annotation(self, media):
"""entry = \
'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
% (media['id'], 0, \
float(media['fade_in']) / 1000, \
float(media['fade_out']) / 1000, \
float(media['cue_in']), \
float(media['cue_out']), \
media['row_id'], media['dst'])"""
entry = \
'annotate:media_id="%s",liq_start_next="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
% (media['id'], 0, \
float(media['cue_in']), \
float(media['cue_out']), \
media['row_id'], media['dst'])
return entry
def check_for_previous_crash(self, media_item):
start = media_item['start']
end = media_item['end']
dtnow = datetime.utcnow()
str_tnow_s = dtnow.strftime('%Y-%m-%d-%H-%M-%S')
if start <= str_tnow_s and str_tnow_s < end:
#song is currently playing and we just started pypo. Maybe there
#was a power outage? Let's restart playback of this song.
start_split = map(int, start.split('-'))
media_start = datetime(start_split[0], start_split[1], start_split[2], start_split[3], start_split[4], start_split[5], 0, None)
self.logger.debug("Found media item that started at %s.", media_start)
delta = dtnow - media_start #we get a TimeDelta object from this operation
self.logger.info("Starting media item at %d second point", delta.seconds)
"""
Set the cue_in. This is used by Liquidsoap to determine at what point in the media
item it should start playing. If the cue_in happens to be > cue_out, then make cue_in = cue_out
"""
media_item['cue_in'] = delta.seconds + 10 if delta.seconds + 10 < media_item['cue_out'] else media_item['cue_out']
"""
Set the start time, which is used by pypo-push to determine when a media item is scheduled.
Pushing the start time into the future will ensure pypo-push will push this to Liquidsoap.
"""
td = timedelta(seconds=10)
media_item['start'] = (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S')
self.logger.info("Crash detected, setting playlist to restart at %s", (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S'))
def handle_media_file(self, media_item, dst):
"""
Download and cache the media item.
@ -568,7 +512,7 @@ class PypoFetch(Thread):
self.api_client.get_media(media_item['uri'], dst)
"""
def cleanup(self, media):
def cache_cleanup(self, media):
"""
Get list of all files in the cache dir and remove them if they aren't being used anymore.
Input dict() media, lists all files that are scheduled or currently playing. Not being in this
@ -595,7 +539,7 @@ class PypoFetch(Thread):
success, self.schedule_data = self.api_client.get_schedule()
if success:
self.logger.info("Bootstrap schedule received: %s", self.schedule_data)
self.process_schedule(self.schedule_data, True)
self.process_schedule(self.schedule_data)
self.check_switch_status()
loops = 1
@ -622,7 +566,7 @@ class PypoFetch(Thread):
success, self.schedule_data = self.api_client.get_schedule()
if success:
self.process_schedule(self.schedule_data, False)
self.process_schedule(self.schedule_data)
loops += 1

View File

@ -1,3 +1,6 @@
from datetime import datetime
from datetime import timedelta
import os
import sys
import time
@ -49,7 +52,6 @@ class PypoPush(Thread):
self.telnet_lock = telnet_lock
self.liquidsoap_state_play = True
self.push_ahead = 10
self.last_end_time = 0
@ -81,41 +83,69 @@ class PypoPush(Thread):
media = self.media
self.logger.debug(liquidsoap_queue_approx)
if len(liquidsoap_queue_approx) < MAX_LIQUIDSOAP_QUEUE_LENGTH:
currently_on_air = False
if media:
tnow = datetime.utcnow()
tcoming = tnow + timedelta(seconds=self.push_ahead)
"""
tnow = time.gmtime(timenow)
tcoming = time.gmtime(timenow + self.push_ahead)
str_tnow_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tnow[0], tnow[1], tnow[2], tnow[3], tnow[4], tnow[5])
str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5])
"""
for key in media.keys():
media_item = media[key]
item_start = media_item['start'][0:19]
if str_tnow_s <= item_start and item_start < str_tcoming_s:
item_start = datetime.strptime(media_item['start'][0:19], "%Y-%m-%d-%H-%M-%S")
item_end = datetime.strptime(media_item['end'][0:19], "%Y-%m-%d-%H-%M-%S")
if len(liquidsoap_queue_approx) == 0 and item_start <= tnow and tnow < item_end:
"""
If the media item starts in the next 30 seconds, push it to the queue.
Something is scheduled now, but Liquidsoap is not playing anything! Let's play the current media_item
"""
self.logger.debug("Found media_item that should be playing! Starting...")
adjusted_cue_in = tnow - item_start
adjusted_cue_in_seconds = self.date_interval_to_seconds(adjusted_cue_in)
self.logger.debug("Found media_item that should be playing! Adjust cue point by %ss" % adjusted_cue_in_seconds)
self.push_to_liquidsoap(media_item, adjusted_cue_in_seconds)
elif tnow <= item_start and item_start < tcoming:
"""
If the media item starts in the next 10 seconds, push it to the queue.
"""
self.logger.debug('Preparing to push media item scheduled at: %s', key)
if self.push_to_liquidsoap(media_item):
if self.push_to_liquidsoap(media_item, None):
self.logger.debug("Pushed to liquidsoap, updating 'played' status.")
"""
Temporary solution to make sure we don't push the same track multiple times.
Temporary solution to make sure we don't push the same track multiple times. Not a full solution because if we
get a new schedule, the key becomes available again.
"""
del media[key]
currently_on_air = True
self.liquidsoap_state_play = True
def date_interval_to_seconds(self, interval):
return (interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10**6) / 10**6
def push_to_liquidsoap(self, media_item):
def push_to_liquidsoap(self, media_item, adjusted_cue_in=None):
"""
This function looks at the media item, and either pushes it to the Liquidsoap
queue immediately, or if the queue is empty - waits until the start time of the
media item before pushing it.
"""
"""
if adjusted_cue_in is not None:
media_item["cue_in"] = adjusted_cue_in + float(media_item["cue_in"])
try:
if media_item["start"] == self.last_end_time:
"""
@ -165,8 +195,9 @@ class PypoPush(Thread):
This function connects to Liquidsoap to find what media items are in its queue.
"""
self.telnet_lock.acquire()
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
msg = 'queue.queue\n'
@ -190,7 +221,14 @@ class PypoPush(Thread):
if item in self.pushed_objects:
liquidsoap_queue_approx.append(self.pushed_objects[item])
else:
"""
We should only reach here if Pypo crashed and restarted (because self.pushed_objects was reset). In this case
let's clear the entire Liquidsoap queue.
"""
self.logger.error("ID exists in liquidsoap queue that does not exist in our pushed_objects queue: " + item)
self.clear_liquidsoap_queue()
liquidsoap_queue_approx = []
break
return liquidsoap_queue_approx
@ -249,12 +287,27 @@ class PypoPush(Thread):
else:
self.remove_from_liquidsoap_queue(liquidsoap_queue_approx[0])
def clear_liquidsoap_queue(self):
self.logger.debug("Clearing Liquidsoap queue")
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
msg = "source.skip\n"
tn.write(msg)
tn.write("exit\n")
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
def remove_from_liquidsoap_queue(self, media_item, do_only_source_skip=False):
if 'queue_id' in media_item:
queue_id = media_item['queue_id']
self.telnet_lock.acquire()
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
msg = "queue.remove %s\n" % queue_id
tn.write(msg)
@ -311,13 +364,14 @@ class PypoPush(Thread):
about which show is playing.
"""
self.telnet_lock.acquire()
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
#tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8'))
annotation = media_item['annotation']
annotation = self.create_liquidsoap_annotation(media_item)
msg = 'queue.push %s\n' % annotation.encode('utf-8')
self.logger.debug(msg)
tn.write(msg)
@ -341,6 +395,10 @@ class PypoPush(Thread):
self.logger.error(str(e))
finally:
self.telnet_lock.release()
def create_liquidsoap_annotation(self, media):
return 'annotate:media_id="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
% (media['id'], float(media['cue_in']), float(media['cue_out']), media['row_id'], media['dst'])
def run(self):
loops = 0