CC-3336: Refactor schedule API used by pypo

This commit is contained in:
Martin Konecny 2012-02-27 13:52:35 -05:00
parent 4631e199cc
commit e8f329aef1
4 changed files with 229 additions and 272 deletions

View file

@ -46,17 +46,17 @@ class PypoFetch(Thread):
Thread.__init__(self)
self.api_client = api_client.api_client_factory(config)
self.logger = logging.getLogger();
self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
logger.info("Creating cache directory at %s", self.cache_dir)
self.logger.info("Creating cache directory at %s", self.cache_dir)
self.queue = q
self.schedule_data = []
logger = logging.getLogger('fetch')
logger.info("PypoFetch: init complete")
self.logger.info("PypoFetch: init complete")
def init_rabbit_mq(self):
logger = logging.getLogger('fetch')
logger.info("Initializing RabbitMQ stuff")
self.logger.info("Initializing RabbitMQ stuff")
try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
@ -64,7 +64,7 @@ class PypoFetch(Thread):
channel = connection.channel()
self.simple_queue = SimpleQueue(channel, schedule_queue)
except Exception, e:
logger.error(e)
self.logger.error(e)
return False
return True
@ -75,49 +75,46 @@ class PypoFetch(Thread):
"""
def handle_message(self, message):
try:
logger = logging.getLogger('fetch')
logger.info("Received event from RabbitMQ: %s" % message)
self.logger.info("Received event from RabbitMQ: %s" % message)
m = json.loads(message)
command = m['event_type']
logger.info("Handling command: " + command)
self.logger.info("Handling command: " + command)
if command == 'update_schedule':
self.schedule_data = m['schedule']
self.process_schedule(self.schedule_data, "scheduler", False)
self.process_schedule(self.schedule_data, False)
elif command == 'update_stream_setting':
logger.info("Updating stream setting...")
self.logger.info("Updating stream setting...")
self.regenerateLiquidsoapConf(m['setting'])
elif command == 'update_stream_format':
logger.info("Updating stream format...")
self.logger.info("Updating stream format...")
self.update_liquidsoap_stream_format(m['stream_format'])
elif command == 'update_station_name':
logger.info("Updating station name...")
self.logger.info("Updating station name...")
self.update_liquidsoap_station_name(m['station_name'])
elif command == 'cancel_current_show':
logger.info("Cancel current show command received...")
self.logger.info("Cancel current show command received...")
self.stop_current_show()
except Exception, e:
logger.error("Exception in handling RabbitMQ message: %s", e)
self.logger.error("Exception in handling RabbitMQ message: %s", e)
def stop_current_show(self):
logger = logging.getLogger('fetch')
logger.debug('Notifying Liquidsoap to stop playback.')
self.logger.debug('Notifying Liquidsoap to stop playback.')
try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
tn.write('source.skip\n')
tn.write('exit\n')
tn.read_all()
except Exception, e:
logger.debug(e)
logger.debug('Could not connect to liquidsoap')
self.logger.debug(e)
self.logger.debug('Could not connect to liquidsoap')
def regenerateLiquidsoapConf(self, setting):
logger = logging.getLogger('fetch')
existing = {}
# create a temp file
fh = open('/etc/airtime/liquidsoap.cfg', 'r')
logger.info("Reading existing config...")
self.logger.info("Reading existing config...")
# read existing conf file and build dict
while 1:
line = fh.readline()
@ -147,7 +144,7 @@ class PypoFetch(Thread):
#restart flag
restart = False
logger.info("Looking for changes...")
self.logger.info("Looking for changes...")
# look for changes
for s in setting:
if "output_sound_device" in s[u'keyname'] or "icecast_vorbis_metadata" in s[u'keyname']:
@ -155,13 +152,13 @@ class PypoFetch(Thread):
state_change_restart[stream] = False
# This is the case where restart is required no matter what
if (existing[s[u'keyname']] != s[u'value']):
logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
restart = True;
else:
stream, dump = s[u'keyname'].split('_',1)
if "_output" in s[u'keyname']:
if (existing[s[u'keyname']] != s[u'value']):
logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
restart = True;
state_change_restart[stream] = True
elif ( s[u'value'] != 'disabled'):
@ -173,22 +170,22 @@ class PypoFetch(Thread):
if stream not in change:
change[stream] = False
if not (s[u'value'] == existing[s[u'keyname']]):
logger.info("Keyname: %s, Curent value: %s, New Value: %s", s[u'keyname'], existing[s[u'keyname']], s[u'value'])
self.logger.info("Keyname: %s, Curent value: %s, New Value: %s", s[u'keyname'], existing[s[u'keyname']], s[u'value'])
change[stream] = True
# set flag change for sound_device alway True
logger.info("Change:%s, State_Change:%s...", change, state_change_restart)
self.logger.info("Change:%s, State_Change:%s...", change, state_change_restart)
for k, v in state_change_restart.items():
if k == "sound_device" and v:
restart = True
elif v and change[k]:
logger.info("'Need-to-restart' state detected for %s...", k)
self.logger.info("'Need-to-restart' state detected for %s...", k)
restart = True
# rewrite
if restart:
fh = open('/etc/airtime/liquidsoap.cfg', 'w')
logger.info("Rewriting liquidsoap.cfg...")
self.logger.info("Rewriting liquidsoap.cfg...")
fh.write("################################################\n")
fh.write("# THIS FILE IS AUTO GENERATED. DO NOT CHANGE!! #\n")
fh.write("################################################\n")
@ -210,17 +207,16 @@ class PypoFetch(Thread):
fh.close()
# restarting pypo.
# we could just restart liquidsoap but it take more time somehow.
logger.info("Restarting pypo...")
self.logger.info("Restarting pypo...")
sys.exit(0)
else:
logger.info("No change detected in setting...")
self.logger.info("No change detected in setting...")
self.update_liquidsoap_connection_status()
"""
updates the status of liquidsoap connection to the streaming server
This fucntion updates the bootup time variable in liquidsoap script
"""
def update_liquidsoap_connection_status(self):
logger = logging.getLogger('fetch')
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
# update the boot up time of liquidsoap. Since liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
@ -238,7 +234,7 @@ class PypoFetch(Thread):
# streamin info is in the form of:
# eg. s1:true,2:true,3:false
streams = stream_info.split(",")
logger.info(streams)
self.logger.info(streams)
fake_time = current_time + 1
for s in streams:
@ -252,33 +248,31 @@ class PypoFetch(Thread):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
logger = logging.getLogger('fetch')
logger.info(LS_HOST)
logger.info(LS_PORT)
self.logger.info(LS_HOST)
self.logger.info(LS_PORT)
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
logger.info(command)
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
logger.error("Exception %s", e)
self.logger.error("Exception %s", e)
def update_liquidsoap_station_name(self, station_name):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
logger = logging.getLogger('fetch')
logger.info(LS_HOST)
logger.info(LS_PORT)
self.logger.info(LS_HOST)
self.logger.info(LS_PORT)
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
command = ('vars.station_name %s\n' % station_name).encode('utf-8')
logger.info(command)
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
except Exception, e:
logger.error("Exception %s", e)
self.logger.error("Exception %s", e)
"""
Process the schedule
@ -289,166 +283,161 @@ class PypoFetch(Thread):
- runs the cleanup routine, to get rid of unused cached files
"""
def process_schedule(self, schedule_data, bootstrapping):
logger = logging.getLogger('fetch')
playlists = schedule_data["playlists"]
media = schedule_data["media"]
# Download all the media and put playlists in liquidsoap "annotate" format
try:
liquidsoap_playlists = self.prepare_playlists(playlists, bootstrapping)
except Exception, e: logger.error("%s", e)
media = self.prepare_media(media, bootstrapping)
except Exception, e: self.logger.error("%s", e)
# Send the data to pypo-push
scheduled_data = dict()
scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists
scheduled_data['schedule'] = playlists
self.queue.put(scheduled_data)
scheduled_data['liquidsoap_annotation_queue'] = liquidsoap_annotation_queue
self.queue.put(media)
"""
# cleanup
try: self.cleanup()
except Exception, e: logger.error("%s", e)
except Exception, e: self.logger.error("%s", e)
"""
"""
In this function every audio file is cut as necessary (cue_in/cue_out != 0)
and stored in a playlist folder.
file is e.g. 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3
"""
def prepare_playlists(self, playlists, bootstrapping):
logger = logging.getLogger('fetch')
liquidsoap_playlists = dict()
# Dont do anything if playlists is empty
if not playlists:
logger.debug("Schedule is empty.")
return liquidsoap_playlists
scheduleKeys = sorted(playlists.iterkeys())
def prepare_media(self, media, bootstrapping):
"""
Iterate through the list of media items in "media" and
download them.
"""
try:
for pkey in scheduleKeys:
logger.info("Playlist starting at %s", pkey)
playlist = playlists[pkey]
mediaKeys = sorted(media.iterkeys())
for mkey in mediaKeys:
self.logger.debug("Media item starting at %s", mkey)
media_item = media[mkey]
if bootstrapping:
check_for_crash(media_item)
# create playlist directory
try:
os.mkdir(self.cache_dir + str(pkey))
"""
Extract year, month, date from mkey
"""
y_m_d = mkey[0:10]
download_dir = os.mkdir(os.path.join(self.cache_dir, y_m_d))
fileExt = os.path.splitext(media_item['uri'])[1]
dst = os.path.join(download_dir, media_item['id']+fileExt)
except Exception, e:
logger.warning(e)
self.logger.warning(e)
if self.handle_media_file(media_item, dst):
entry = create_liquidsoap_annotation(media_item, dst)
#entry['show_name'] = playlist['show_name']
entry['show_name'] = "TODO"
media_item["annotation"] = entry
ls_playlist = self.handle_media_file(playlist, pkey, bootstrapping)
liquidsoap_playlists[pkey] = ls_playlist
except Exception, e:
logger.error("%s", e)
return liquidsoap_playlists
self.logger.error("%s", e)
return media
def create_liquidsoap_annotation(media, dst):
pl_entry = \
'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
% (media['id'], 0, \
float(media['fade_in']) / 1000, \
float(media['fade_out']) / 1000, \
float(media['cue_in']), \
float(media['cue_out']), \
media['row_id'], dst)
"""
Download and cache the media files.
This handles both remote and local files.
Returns an updated ls_playlist string.
"""
def handle_media_file(self, playlist, pkey, bootstrapping):
logger = logging.getLogger('fetch')
"""
Tracks are only added to the playlist if they are accessible
on the file system and larger than 0 bytes.
So this can lead to playlists shorter than expectet.
(there is a hardware silence detector for this cases...)
"""
entry = dict()
entry['type'] = 'file'
entry['annotate'] = pl_entry
return entry
ls_playlist = []
def check_for_crash(media_item):
start = media_item['start']
end = media_item['end']
dtnow = datetime.utcnow()
str_tnow_s = dtnow.strftime('%Y-%m-%d-%H-%M-%S')
sortedKeys = sorted(playlist['medias'].iterkeys())
for key in sortedKeys:
media = playlist['medias'][key]
logger.debug("Processing track %s", media['uri'])
if start <= str_tnow_s and str_tnow_s < end:
#song is currently playing and we just started pypo. Maybe there
#was a power outage? Let's restart playback of this song.
start_split = map(int, start.split('-'))
media_start = datetime(start_split[0], start_split[1], start_split[2], start_split[3], start_split[4], start_split[5], 0, None)
self.logger.debug("Found media item that started at %s.", media_start)
if bootstrapping:
start = media['start']
end = media['end']
if end <= str_tnow_s:
continue
elif start <= str_tnow_s and str_tnow_s < end:
#song is currently playing and we just started pypo. Maybe there
#was a power outage? Let's restart playback of this song.
start_split = map(int, start.split('-'))
media_start = datetime(start_split[0], start_split[1], start_split[2], start_split[3], start_split[4], start_split[5], 0, None)
logger.debug("Found media item that started at %s.", media_start)
delta = dtnow - media_start #we get a TimeDelta object from this operation
logger.info("Starting media item at %d second point", delta.seconds)
media['cue_in'] = delta.seconds + 10
td = timedelta(seconds=10)
playlist['start'] = (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S')
logger.info("Crash detected, setting playlist to restart at %s", (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S'))
delta = dtnow - media_start #we get a TimeDelta object from this operation
self.logger.info("Starting media item at %d second point", delta.seconds)
"""
Set the cue_in. This is used by Liquidsoap to determine at what point in the media
item it should start playing. If the cue_in happens to be > cue_out, then make cue_in = cue_out
"""
media_item['cue_in'] = delta.seconds + 10 if delta.seconds + 10 < media_item['cue_out'] else media_item['cue_out']
"""
Set the start time, which is used by pypo-push to determine when a media item is scheduled.
Pushing the start time into the future will ensure pypo-push will push this to Liquidsoap.
"""
td = timedelta(seconds=10)
media_item['start'] = (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S')
self.logger.info("Crash detected, setting playlist to restart at %s", (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S'))
def handle_media_file(self, media_item, dst):
"""
Download and cache the media item.
"""
self.logger.debug("Processing track %s", media_item['uri'])
fileExt = os.path.splitext(media['uri'])[1]
try:
dst = os.path.join(self.cache_dir, pkey, media['id']+fileExt)
# download media file
self.handle_remote_file(media, dst)
if True == os.access(dst, os.R_OK):
# check filesize (avoid zero-byte files)
try: fsize = os.path.getsize(dst)
except Exception, e:
logger.error("%s", e)
fsize = 0
try:
#blocking function to download the media item
self.download_file(media_item, dst)
if os.access(dst, os.R_OK):
# check filesize (avoid zero-byte files)
try:
fsize = os.path.getsize(dst)
if fsize > 0:
pl_entry = \
'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
% (media['id'], 0, \
float(media['fade_in']) / 1000, \
float(media['fade_out']) / 1000, \
float(media['cue_in']), \
float(media['cue_out']), \
media['row_id'], dst)
return True
except Exception, e:
self.logger.error("%s", e)
fsize = 0
else:
self.logger.warning("Cannot read file %s.", dst)
"""
Tracks are only added to the playlist if they are accessible
on the file system and larger than 0 bytes.
So this can lead to playlists shorter than expectet.
(there is a hardware silence detector for this cases...)
"""
entry = dict()
entry['type'] = 'file'
entry['annotate'] = pl_entry
entry['show_name'] = playlist['show_name']
ls_playlist.append(entry)
else:
logger.warning("zero-size file - skipping %s. will not add it to playlist at %s", media['uri'], dst)
else:
logger.warning("something went wrong. file %s not available. will not add it to playlist", dst)
except Exception, e: logger.info("%s", e)
return ls_playlist
except Exception, e:
self.logger.info("%s", e)
return False
"""
Download a file from a remote server and store it in the cache.
"""
def handle_remote_file(self, media, dst):
logger = logging.getLogger('fetch')
def download_file(self, media_item, dst):
if os.path.isfile(dst):
pass
#logger.debug("file already in cache: %s", dst)
#self.logger.debug("file already in cache: %s", dst)
else:
logger.debug("try to download %s", media['uri'])
self.api_client.get_media(media['uri'], dst)
self.logger.debug("try to download %s", media_item['uri'])
self.api_client.get_media(media_item['uri'], dst)
"""
Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR"
and deletes them.
"""
def cleanup(self):
logger = logging.getLogger('fetch')
offset = 3600 * int(config["cache_for"])
now = time.time()
@ -458,39 +447,40 @@ class PypoFetch(Thread):
timestamp = calendar.timegm(time.strptime(dir, "%Y-%m-%d-%H-%M-%S"))
if (now - timestamp) > offset:
try:
logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp)
self.logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp)
shutil.rmtree(os.path.join(r, dir))
except Exception, e:
logger.error("%s", e)
self.logger.error("%s", e)
pass
else:
logger.info('sucessfully removed %s', os.path.join(r, dir))
self.logger.info('sucessfully removed %s', os.path.join(r, dir))
except Exception, e:
logger.error(e)
self.logger.error(e)
def main(self):
logger = logging.getLogger('fetch')
try: os.mkdir(self.cache_dir)
except Exception, e: pass
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
status, self.schedule_data = self.api_client.get_schedule()
if status == 1:
logger.info("Bootstrap schedule received: %s", self.schedule_data)
self.process_schedule(self.schedule_data, "scheduler", True)
logger.info("Bootstrap complete: got initial copy of the schedule")
try:
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
success, self.schedule_data = self.api_client.get_schedule()
if success:
self.logger.info("Bootstrap schedule received: %s", self.schedule_data)
self.process_schedule(self.schedule_data, True)
self.logger.info("Bootstrap complete: got initial copy of the schedule")
while not self.init_rabbit_mq():
logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
except Exception, e:
self.logger.error(str(e))
loops = 1
while True:
logger.info("Loop #%s", loops)
self.logger.info("Loop #%s", loops)
try:
try:
message = self.simple_queue.get(block=True)
@ -498,17 +488,17 @@ class PypoFetch(Thread):
# ACK the message to take it off the queue
message.ack()
except MessageStateError, m:
logger.error("Message ACK error: %s", m)
self.logger.error("Message ACK error: %s", m)
except Exception, e:
"""
There is a problem with the RabbitMq messenger service. Let's
log the error and get the schedule via HTTP polling
"""
logger.error("Exception, %s", e)
self.logger.error("Exception, %s", e)
status, self.schedule_data = self.api_client.get_schedule()
if status == 1:
self.process_schedule(self.schedule_data, "scheduler", False)
self.process_schedule(self.schedule_data, False)
loops += 1