From 0d4a006c19a9d4543ba55b9fce8e99823dac86af Mon Sep 17 00:00:00 2001 From: martin Date: Thu, 3 Mar 2011 00:22:28 -0500 Subject: [PATCH] -refactored pypo-cli into 3 seperate classes --- pypo/pypo-cli.py | 516 ++++------------------------------------------ pypo/pypofetch.py | 313 ++++++++++++++++++++++++++++ pypo/pypopush.py | 204 ++++++++++++++++++ 3 files changed, 555 insertions(+), 478 deletions(-) create mode 100644 pypo/pypofetch.py create mode 100644 pypo/pypopush.py diff --git a/pypo/pypo-cli.py b/pypo/pypo-cli.py index e3ded243e..7a27b13be 100755 --- a/pypo/pypo-cli.py +++ b/pypo/pypo-cli.py @@ -9,32 +9,34 @@ The main functions are "fetch" (./pypo_cli.py -f) and "push" (./pypo_cli.py -p) # python defaults (debian default) import time -import calendar +#import calendar -import os -import traceback + +#import traceback from optparse import * import sys -import time -import datetime +#import datetime import logging import logging.config -import shutil -import urllib -import urllib2 -import pickle -import telnetlib -import random -import string -import operator -import inspect +#import shutil +#import urllib +#import urllib2 +#import pickle +#import telnetlib +#import random +#import string +#import operator +#import inspect + +from pypopush import PypoPush +from pypofetch import PypoFetch # additional modules (should be checked) from configobj import ConfigObj # custom imports -from util import * -from api_clients import * +from util import CueFile +from api_clients import api_client PYPO_VERSION = '1.1' @@ -121,447 +123,6 @@ class Global: print media -class PypoPush: - def __init__(self): - self.api_client = api_client.api_client_factory(config) - self.cue_file = CueFile() - self.set_export_source('scheduler') - - """ - push_ahead2 MUST be < push_ahead. The difference in these two values - gives the number of seconds of the window of opportunity for the scheduler - to catch when a playlist is to be played. - """ - self.push_ahead = 15 - self.push_ahead2 = 10 - - def set_export_source(self, export_source): - self.export_source = export_source - self.cache_dir = config["cache_dir"] + self.export_source + '/' - self.schedule_file = self.cache_dir + 'schedule.pickle' - self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle" - - """ - The Push Loop - the push loop periodically (minimal 1/2 of the playlist-grid) - checks if there is a playlist that should be scheduled at the current time. - If yes, the temporary liquidsoap playlist gets replaced with the corresponding one, - then liquidsoap is asked (via telnet) to reload and immediately play it. - """ - def push(self, export_source): - logger = logging.getLogger() - - self.schedule = self.load_schedule() - playedItems = self.load_schedule_tracker() - - tcoming = time.localtime(time.time() + self.push_ahead) - tcoming2 = time.localtime(time.time() + self.push_ahead2) - tnow = time.localtime(time.time()) - - str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5]) - str_tcoming2_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming2[0], tcoming2[1], tcoming2[2], tcoming2[3], tcoming2[4], tcoming2[5]) - - if self.schedule == None: - logger.warn('Unable to loop schedule - maybe write in progress?') - logger.warn('Will try again in next loop.') - - else: - for pkey in self.schedule: - playedFlag = (pkey in playedItems) and playedItems[pkey].get("played", 0) - if pkey[0:19] == str_tcoming_s or (pkey[0:19] < str_tcoming_s and pkey[0:19] > str_tcoming2_s and not playedFlag): - logger.debug('Preparing to push playlist scheduled at: %s', pkey) - playlist = self.schedule[pkey] - - ptype = playlist['subtype'] - - # We have a match, replace the current playlist and - # force liquidsoap to refresh. - if (self.push_liquidsoap(pkey, self.schedule, ptype) == 1): - logger.debug("Pushed to liquidsoap, updating 'played' status.") - # Marked the current playlist as 'played' in the schedule tracker - # so it is not called again in the next push loop. - # Write changes back to tracker file. - playedItems[pkey] = playlist - playedItems[pkey]['played'] = 1 - schedule_tracker = open(self.schedule_tracker_file, "w") - pickle.dump(playedItems, schedule_tracker) - schedule_tracker.close() - logger.debug("Wrote schedule to disk: "+str(json.dumps(playedItems))) - - # Call API to update schedule states - logger.debug("Doing callback to server to update 'played' status.") - self.api_client.notify_scheduled_item_start_playing(pkey, self.schedule) - - - def push_liquidsoap(self, pkey, schedule, ptype): - logger = logging.getLogger() - src = self.cache_dir + str(pkey) + '/list.lsp' - - try: - if True == os.access(src, os.R_OK): - logger.debug('OK - Can read playlist file') - - pl_file = open(src, "r") - file_content = pl_file.read() - pl_file.close() - logger.debug('file content: %s' % (file_content)) - playlist = json.loads(file_content) - - #strptime returns struct_time in local time - #mktime takes a time_struct and returns a floating point - #gmtime Convert a time expressed in seconds since the epoch to a struct_time in UTC - #mktime: expresses the time in local time, not UTC. It returns a floating point number, for compatibility with time(). - epoch_start = calendar.timegm(time.gmtime(time.mktime(time.strptime(pkey, '%Y-%m-%d-%H-%M-%S')))) - - #Return the time as a floating point number expressed in seconds since the epoch, in UTC. - epoch_now = time.time() - - logger.debug("Epoch start: " + str(epoch_start)) - logger.debug("Epoch now: " + str(epoch_now)) - - sleep_time = epoch_start - epoch_now; - - if sleep_time < 0: - sleep_time = 0 - - logger.debug('sleeping for %s s' % (sleep_time)) - time.sleep(sleep_time) - - tn = telnetlib.Telnet(LS_HOST, 1234) - - #skip the currently playing song if any. - logger.debug("source.skip\n") - tn.write("source.skip\n") - - # Get any extra information for liquidsoap (which will be sent back to us) - liquidsoap_data = self.api_client.get_liquidsoap_data(pkey, schedule) - - #Sending schedule table row id string. - logger.debug("vars.pypo_data %s\n"%(str(liquidsoap_data["schedule_id"]))) - tn.write("vars.pypo_data %s\n"%(str(liquidsoap_data["schedule_id"]))) - - for item in playlist: - annotate = str(item['annotate']) - logger.debug(annotate) - tn.write('queue.push %s' % (annotate)) - tn.write("\n") - - tn.write("exit\n") - logger.debug(tn.read_all()) - - status = 1 - except Exception, e: - logger.error('%s', e) - status = 0 - - return status - - - def load_schedule(self): - logger = logging.getLogger() - schedule = None - - # create the file if it doesnt exist - if (not os.path.exists(self.schedule_file)): - logger.debug('creating file ' + self.schedule_file) - open(self.schedule_file, 'w').close() - else: - # load the schedule from cache - #logger.debug('loading schedule file '+self.schedule_file) - try: - schedule_file = open(self.schedule_file, "r") - schedule = pickle.load(schedule_file) - schedule_file.close() - - except Exception, e: - logger.error('%s', e) - - return schedule - - - def load_schedule_tracker(self): - logger = logging.getLogger() - playedItems = dict() - - # create the file if it doesnt exist - if (not os.path.exists(self.schedule_tracker_file)): - logger.debug('creating file ' + self.schedule_tracker_file) - schedule_tracker = open(self.schedule_tracker_file, 'w') - pickle.dump(playedItems, schedule_tracker) - schedule_tracker.close() - else: - try: - schedule_tracker = open(self.schedule_tracker_file, "r") - playedItems = pickle.load(schedule_tracker) - schedule_tracker.close() - except Exception, e: - logger.error('Unable to load schedule tracker file: %s', e) - - return playedItems - - -class PypoFetch: - def __init__(self): - self.api_client = api_client.api_client_factory(config) - self.cue_file = CueFile() - self.set_export_source('scheduler') - - def set_export_source(self, export_source): - self.export_source = export_source - self.cache_dir = config["cache_dir"] + self.export_source + '/' - self.schedule_file = self.cache_dir + 'schedule.pickle' - self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle" - - """ - Fetching part of pypo - - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") - - Saves a serialized file of the schedule - - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied - to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) - - runs the cleanup routine, to get rid of unused cashed files - """ - def fetch(self, export_source): - """ - wrapper script for fetching the whole schedule (in json) - """ - logger = logging.getLogger() - - try: os.mkdir(self.cache_dir) - except Exception, e: pass - - # get schedule - try: - while self.get_schedule() != 1: - logger.warning("failed to read from export url") - time.sleep(1) - - except Exception, e: logger.error("%s", e) - - # prepare the playlists - if config["cue_style"] == 'pre': - try: self.prepare_playlists_cue() - except Exception, e: logger.error("%s", e) - elif config["cue_style"] == 'otf': - try: self.prepare_playlists(self.export_source) - except Exception, e: logger.error("%s", e) - - # cleanup - try: self.cleanup(self.export_source) - except Exception, e: logger.error("%s", e) - - def get_schedule(self): - logger = logging.getLogger() - status, response = self.api_client.get_schedule() - - if status == 1: - logger.info("dump serialized schedule to %s", self.schedule_file) - schedule = response['playlists'] - try: - schedule_file = open(self.schedule_file, "w") - pickle.dump(schedule, schedule_file) - schedule_file.close() - - except Exception, e: - logger.critical("Exception %s", e) - status = 0 - - return status - - - """ - Alternative version of playout preparation. Every playlist entry is - pre-cued if neccessary (cue_in/cue_out != 0) and stored in the - playlist folder. - file is eg 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3 - """ - def prepare_playlists_cue(self): - logger = logging.getLogger() - - # Load schedule from disk - schedule = self.load_schedule() - - # Dont do anything if schedule is empty - if (not schedule): - logger.debug("Schedule is empty.") - return - - scheduleKeys = sorted(schedule.iterkeys()) - - try: - for pkey in scheduleKeys: - logger.info("found playlist at %s", pkey) - playlist = schedule[pkey] - - # create playlist directory - try: - os.mkdir(self.cache_dir + str(pkey)) - except Exception, e: - pass - - logger.debug('*****************************************') - logger.debug('pkey: ' + str(pkey)) - logger.debug('cached at : ' + self.cache_dir + str(pkey)) - logger.debug('subtype: ' + str(playlist['subtype'])) - logger.debug('played: ' + str(playlist['played'])) - logger.debug('schedule id: ' + str(playlist['schedule_id'])) - logger.debug('duration: ' + str(playlist['duration'])) - logger.debug('source id: ' + str(playlist['x_ident'])) - logger.debug('*****************************************') - - if int(playlist['played']) == 1: - logger.info("playlist %s already played / sent to liquidsoap, so will ignore it", pkey) - - elif int(playlist['subtype']) > 0 and int(playlist['subtype']) < 5: - ls_playlist = self.handle_media_file(playlist, pkey) - - # write playlist file - plfile = open(self.cache_dir + str(pkey) + '/list.lsp', "w") - plfile.write(json.dumps(ls_playlist)) - plfile.close() - logger.info('ls playlist file written to %s', self.cache_dir + str(pkey) + '/list.lsp') - - except Exception, e: - logger.info("%s", e) - - def handle_media_file(self, playlist, pkey): - """ - This handles both remote and local files. - Returns an updated ls_playlist string. - """ - ls_playlist = [] - - logger = logging.getLogger() - for media in playlist['medias']: - logger.debug("Processing track %s", media['uri']) - - fileExt = os.path.splitext(media['uri'])[1] - try: - if str(media['cue_in']) == '0' and str(media['cue_out']) == '0': - logger.debug('No cue in/out detected for this file') - dst = "%s%s/%s%s" % (self.cache_dir, str(pkey), str(media['id']), str(fileExt)) - do_cue = False - else: - logger.debug('Cue in/out detected') - dst = "%s%s/%s_cue_%s-%s%s" % \ - (self.cache_dir, str(pkey), str(media['id']), str(float(media['cue_in']) / 1000), str(float(media['cue_out']) / 1000), str(fileExt)) - do_cue = True - - # check if it is a remote file, if yes download - if media['uri'][0:4] == 'http': - self.handle_remote_file(media, dst, do_cue) - else: - logger.debug("invalid media uri: %s", media['uri']) - - - if True == os.access(dst, os.R_OK): - # check filesize (avoid zero-byte files) - try: fsize = os.path.getsize(dst) - except Exception, e: - logger.error("%s", e) - fsize = 0 - - if fsize > 0: - pl_entry = \ - 'annotate:export_source="%s",media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",schedule_table_id="%s":%s'\ - % (str(media['export_source']), media['id'], 0, str(float(media['fade_in']) / 1000), \ - str(float(media['fade_out']) / 1000), media['row_id'],dst) - - logger.debug(pl_entry) - - """ - Tracks are only added to the playlist if they are accessible - on the file system and larger than 0 bytes. - So this can lead to playlists shorter than expectet. - (there is a hardware silence detector for this cases...) - """ - entry = dict() - entry['type'] = 'file' - entry['annotate'] = pl_entry - ls_playlist.append(entry) - - logger.debug("everything ok, adding %s to playlist", pl_entry) - else: - print 'zero-file: ' + dst + ' from ' + media['uri'] - logger.warning("zero-size file - skipping %s. will not add it to playlist", dst) - - else: - logger.warning("something went wrong. file %s not available. will not add it to playlist", dst) - - except Exception, e: logger.info("%s", e) - return ls_playlist - - - def handle_remote_file(self, media, dst, do_cue): - logger = logging.getLogger() - if do_cue == False: - if os.path.isfile(dst): - logger.debug("file already in cache: %s", dst) - else: - logger.debug("try to download %s", media['uri']) - self.api_client.get_media(media['uri'], dst) - - else: - if os.path.isfile(dst): - logger.debug("file already in cache: %s", dst) - - else: - logger.debug("try to download and cue %s", media['uri']) - - fileExt = os.path.splitext(media['uri'])[1] - dst_tmp = config["tmp_dir"] + "".join([random.choice(string.letters) for i in xrange(10)]) + fileExt - self.api_client.get_media(media['uri'], dst_tmp) - - # cue - logger.debug("STARTING CUE") - debugDst = self.cue_file.cue(dst_tmp, dst, float(media['cue_in']) / 1000, float(media['cue_out']) / 1000) - logger.debug(debugDst) - logger.debug("END CUE") - - if True == os.access(dst, os.R_OK): - try: fsize = os.path.getsize(dst) - except Exception, e: - logger.error("%s", e) - fsize = 0 - - if fsize > 0: - logger.debug('try to remove temporary file: %s' + dst_tmp) - try: os.remove(dst_tmp) - except Exception, e: - logger.error("%s", e) - - else: - logger.warning('something went wrong cueing: %s - using uncued file' + dst) - try: os.rename(dst_tmp, dst) - except Exception, e: - logger.error("%s", e) - - - def cleanup(self, export_source): - """ - Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR" - and deletes them. - """ - logger = logging.getLogger() - - offset = 3600 * int(config["cache_for"]) - now = time.time() - - for r, d, f in os.walk(self.cache_dir): - for dir in d: - try: - timestamp = time.mktime(time.strptime(dir, "%Y-%m-%d-%H-%M-%S")) - if (now - timestamp) > offset: - try: - logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp) - shutil.rmtree(os.path.join(r, dir)) - except Exception, e: - logger.error("%s", e) - pass - else: - logger.info('sucessfully removed %s', os.path.join(r, dir)) - except Exception, e: - print e - logger.error("%s", e) - if __name__ == '__main__': print '###########################################' @@ -574,18 +135,17 @@ if __name__ == '__main__': g = Global() g.selfcheck() - pp = PypoPush() - pf = PypoFetch() + logger = logging.getLogger() + loops = 0 - while True: - logger = logging.getLogger() - loops = 0 + if options.test: + g.test_api() + sys.exit() - if options.test: - g.test_api() - sys.exit() - - while options.fetch_scheduler: + + if options.fetch_scheduler: + pf = PypoFetch() + while True: try: pf.fetch('scheduler') except Exception, e: print e @@ -596,7 +156,9 @@ if __name__ == '__main__': loops += 1 time.sleep(POLL_INTERVAL) - while options.push_scheduler: + if options.push_scheduler: + pp = PypoPush() + while True: try: pp.push('scheduler') except Exception, e: print 'PUSH ERROR!! WILL EXIT NOW:(' @@ -609,15 +171,13 @@ if __name__ == '__main__': loops += 1 time.sleep(PUSH_INTERVAL) - while options.check: - try: g.check_schedule() - except Exception, e: - print e - sys.exit() + if options.check: + try: g.check_schedule() + except Exception, e: + print e - while options.cleanup: - try: pf.cleanup('scheduler') - except Exception, e: - print e - sys.exit() + if options.cleanup: + try: pf.cleanup('scheduler') + except Exception, e: + print e sys.exit() diff --git a/pypo/pypofetch.py b/pypo/pypofetch.py new file mode 100644 index 000000000..fd79f3b1f --- /dev/null +++ b/pypo/pypofetch.py @@ -0,0 +1,313 @@ +import os +import sys +import time +import logging +import logging.config +import shutil +import pickle +import random +import string +import json + +from api_clients import api_client +from util import CueFile + +from configobj import ConfigObj + +# loading config file +try: + config = ConfigObj('config.cfg') + POLL_INTERVAL = float(config['poll_interval']) + PUSH_INTERVAL = 0.5 + #PUSH_INTERVAL = float(config['push_interval']) + LS_HOST = config['ls_host'] + LS_PORT = config['ls_port'] +except Exception, e: + print 'Error loading config file: ', e + sys.exit() + +class PypoFetch: + def __init__(self): + self.api_client = api_client.api_client_factory(config) + self.cue_file = CueFile() + self.set_export_source('scheduler') + + def set_export_source(self, export_source): + self.export_source = export_source + self.cache_dir = config["cache_dir"] + self.export_source + '/' + self.schedule_file = self.cache_dir + 'schedule.pickle' + self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle" + + """ + Fetching part of pypo + - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") + - Saves a serialized file of the schedule + - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied + to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) + - runs the cleanup routine, to get rid of unused cashed files + """ + def fetch(self, export_source): + """ + wrapper script for fetching the whole schedule (in json) + """ + logger = logging.getLogger() + + try: os.mkdir(self.cache_dir) + except Exception, e: pass + + # get schedule + try: + while self.get_schedule() != 1: + logger.warning("failed to read from export url") + time.sleep(1) + + except Exception, e: logger.error("%s", e) + + # prepare the playlists + if config["cue_style"] == 'pre': + try: self.prepare_playlists_cue() + except Exception, e: logger.error("%s", e) + elif config["cue_style"] == 'otf': + try: self.prepare_playlists(self.export_source) + except Exception, e: logger.error("%s", e) + + # cleanup + try: self.cleanup(self.export_source) + except Exception, e: logger.error("%s", e) + + def get_schedule(self): + logger = logging.getLogger() + status, response = self.api_client.get_schedule() + + if status == 1: + logger.info("dump serialized schedule to %s", self.schedule_file) + schedule = response['playlists'] + try: + schedule_file = open(self.schedule_file, "w") + pickle.dump(schedule, schedule_file) + schedule_file.close() + + except Exception, e: + logger.critical("Exception %s", e) + status = 0 + + return status + + #TODO this is a duplicate function!!! + def load_schedule(self): + logger = logging.getLogger() + schedule = None + + # create the file if it doesnt exist + if (not os.path.exists(self.schedule_file)): + logger.debug('creating file ' + self.schedule_file) + open(self.schedule_file, 'w').close() + else: + # load the schedule from cache + #logger.debug('loading schedule file '+self.schedule_file) + try: + schedule_file = open(self.schedule_file, "r") + schedule = pickle.load(schedule_file) + schedule_file.close() + + except Exception, e: + logger.error('%s', e) + + return schedule + + + """ + Alternative version of playout preparation. Every playlist entry is + pre-cued if neccessary (cue_in/cue_out != 0) and stored in the + playlist folder. + file is eg 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3 + """ + def prepare_playlists_cue(self): + logger = logging.getLogger() + + # Load schedule from disk + schedule = self.load_schedule() + + # Dont do anything if schedule is empty + if (not schedule): + logger.debug("Schedule is empty.") + return + + scheduleKeys = sorted(schedule.iterkeys()) + + try: + for pkey in scheduleKeys: + logger.info("found playlist at %s", pkey) + playlist = schedule[pkey] + + # create playlist directory + try: + os.mkdir(self.cache_dir + str(pkey)) + except Exception, e: + pass + + logger.debug('*****************************************') + logger.debug('pkey: ' + str(pkey)) + logger.debug('cached at : ' + self.cache_dir + str(pkey)) + logger.debug('subtype: ' + str(playlist['subtype'])) + logger.debug('played: ' + str(playlist['played'])) + logger.debug('schedule id: ' + str(playlist['schedule_id'])) + logger.debug('duration: ' + str(playlist['duration'])) + logger.debug('source id: ' + str(playlist['x_ident'])) + logger.debug('*****************************************') + + if int(playlist['played']) == 1: + logger.info("playlist %s already played / sent to liquidsoap, so will ignore it", pkey) + + elif int(playlist['subtype']) > 0 and int(playlist['subtype']) < 5: + ls_playlist = self.handle_media_file(playlist, pkey) + + # write playlist file + plfile = open(self.cache_dir + str(pkey) + '/list.lsp', "w") + plfile.write(json.dumps(ls_playlist)) + plfile.close() + logger.info('ls playlist file written to %s', self.cache_dir + str(pkey) + '/list.lsp') + + except Exception, e: + logger.info("%s", e) + + def handle_media_file(self, playlist, pkey): + """ + This handles both remote and local files. + Returns an updated ls_playlist string. + """ + ls_playlist = [] + + logger = logging.getLogger() + for media in playlist['medias']: + logger.debug("Processing track %s", media['uri']) + + fileExt = os.path.splitext(media['uri'])[1] + try: + if str(media['cue_in']) == '0' and str(media['cue_out']) == '0': + logger.debug('No cue in/out detected for this file') + dst = "%s%s/%s%s" % (self.cache_dir, str(pkey), str(media['id']), str(fileExt)) + do_cue = False + else: + logger.debug('Cue in/out detected') + dst = "%s%s/%s_cue_%s-%s%s" % \ + (self.cache_dir, str(pkey), str(media['id']), str(float(media['cue_in']) / 1000), str(float(media['cue_out']) / 1000), str(fileExt)) + do_cue = True + + # check if it is a remote file, if yes download + if media['uri'][0:4] == 'http': + self.handle_remote_file(media, dst, do_cue) + else: + logger.debug("invalid media uri: %s", media['uri']) + + + if True == os.access(dst, os.R_OK): + # check filesize (avoid zero-byte files) + try: fsize = os.path.getsize(dst) + except Exception, e: + logger.error("%s", e) + fsize = 0 + + if fsize > 0: + pl_entry = \ + 'annotate:export_source="%s",media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",schedule_table_id="%s":%s'\ + % (str(media['export_source']), media['id'], 0, str(float(media['fade_in']) / 1000), \ + str(float(media['fade_out']) / 1000), media['row_id'],dst) + + logger.debug(pl_entry) + + """ + Tracks are only added to the playlist if they are accessible + on the file system and larger than 0 bytes. + So this can lead to playlists shorter than expectet. + (there is a hardware silence detector for this cases...) + """ + entry = dict() + entry['type'] = 'file' + entry['annotate'] = pl_entry + ls_playlist.append(entry) + + logger.debug("everything ok, adding %s to playlist", pl_entry) + else: + print 'zero-file: ' + dst + ' from ' + media['uri'] + logger.warning("zero-size file - skipping %s. will not add it to playlist", dst) + + else: + logger.warning("something went wrong. file %s not available. will not add it to playlist", dst) + + except Exception, e: logger.info("%s", e) + return ls_playlist + + + def handle_remote_file(self, media, dst, do_cue): + logger = logging.getLogger() + if do_cue == False: + if os.path.isfile(dst): + logger.debug("file already in cache: %s", dst) + else: + logger.debug("try to download %s", media['uri']) + self.api_client.get_media(media['uri'], dst) + + else: + if os.path.isfile(dst): + logger.debug("file already in cache: %s", dst) + + else: + logger.debug("try to download and cue %s", media['uri']) + + fileExt = os.path.splitext(media['uri'])[1] + dst_tmp = config["tmp_dir"] + "".join([random.choice(string.letters) for i in xrange(10)]) + fileExt + self.api_client.get_media(media['uri'], dst_tmp) + + # cue + logger.debug("STARTING CUE") + debugDst = self.cue_file.cue(dst_tmp, dst, float(media['cue_in']) / 1000, float(media['cue_out']) / 1000) + logger.debug(debugDst) + logger.debug("END CUE") + + if True == os.access(dst, os.R_OK): + try: fsize = os.path.getsize(dst) + except Exception, e: + logger.error("%s", e) + fsize = 0 + + if fsize > 0: + logger.debug('try to remove temporary file: %s' + dst_tmp) + try: os.remove(dst_tmp) + except Exception, e: + logger.error("%s", e) + + else: + logger.warning('something went wrong cueing: %s - using uncued file' + dst) + try: os.rename(dst_tmp, dst) + except Exception, e: + logger.error("%s", e) + + + def cleanup(self, export_source): + """ + Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR" + and deletes them. + """ + logger = logging.getLogger() + + offset = 3600 * int(config["cache_for"]) + now = time.time() + + for r, d, f in os.walk(self.cache_dir): + for dir in d: + try: + timestamp = time.mktime(time.strptime(dir, "%Y-%m-%d-%H-%M-%S")) + if (now - timestamp) > offset: + try: + logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp) + shutil.rmtree(os.path.join(r, dir)) + except Exception, e: + logger.error("%s", e) + pass + else: + logger.info('sucessfully removed %s', os.path.join(r, dir)) + except Exception, e: + print e + logger.error("%s", e) + diff --git a/pypo/pypopush.py b/pypo/pypopush.py new file mode 100644 index 000000000..3b2a39d67 --- /dev/null +++ b/pypo/pypopush.py @@ -0,0 +1,204 @@ +import os +import sys +import time +import logging +import logging.config +import pickle +import telnetlib +import calendar +import json + +from api_clients import api_client +from util import CueFile + +from configobj import ConfigObj + +# loading config file +try: + config = ConfigObj('config.cfg') + POLL_INTERVAL = float(config['poll_interval']) + PUSH_INTERVAL = 0.5 + #PUSH_INTERVAL = float(config['push_interval']) + LS_HOST = config['ls_host'] + LS_PORT = config['ls_port'] +except Exception, e: + print 'Error loading config file: ', e + sys.exit() + +class PypoPush: + def __init__(self): + self.api_client = api_client.api_client_factory(config) + self.cue_file = CueFile() + self.set_export_source('scheduler') + + """ + push_ahead2 MUST be < push_ahead. The difference in these two values + gives the number of seconds of the window of opportunity for the scheduler + to catch when a playlist is to be played. + """ + self.push_ahead = 15 + self.push_ahead2 = 10 + + def set_export_source(self, export_source): + self.export_source = export_source + self.cache_dir = config["cache_dir"] + self.export_source + '/' + self.schedule_file = self.cache_dir + 'schedule.pickle' + self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle" + + """ + The Push Loop - the push loop periodically (minimal 1/2 of the playlist-grid) + checks if there is a playlist that should be scheduled at the current time. + If yes, the temporary liquidsoap playlist gets replaced with the corresponding one, + then liquidsoap is asked (via telnet) to reload and immediately play it. + """ + def push(self, export_source): + logger = logging.getLogger() + + self.schedule = self.load_schedule() + playedItems = self.load_schedule_tracker() + + tcoming = time.localtime(time.time() + self.push_ahead) + tcoming2 = time.localtime(time.time() + self.push_ahead2) + tnow = time.localtime(time.time()) + + str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5]) + str_tcoming2_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming2[0], tcoming2[1], tcoming2[2], tcoming2[3], tcoming2[4], tcoming2[5]) + + if self.schedule == None: + logger.warn('Unable to loop schedule - maybe write in progress?') + logger.warn('Will try again in next loop.') + + else: + for pkey in self.schedule: + playedFlag = (pkey in playedItems) and playedItems[pkey].get("played", 0) + if pkey[0:19] == str_tcoming_s or (pkey[0:19] < str_tcoming_s and pkey[0:19] > str_tcoming2_s and not playedFlag): + logger.debug('Preparing to push playlist scheduled at: %s', pkey) + playlist = self.schedule[pkey] + + ptype = playlist['subtype'] + + # We have a match, replace the current playlist and + # force liquidsoap to refresh. + if (self.push_liquidsoap(pkey, self.schedule, ptype) == 1): + logger.debug("Pushed to liquidsoap, updating 'played' status.") + # Marked the current playlist as 'played' in the schedule tracker + # so it is not called again in the next push loop. + # Write changes back to tracker file. + playedItems[pkey] = playlist + playedItems[pkey]['played'] = 1 + schedule_tracker = open(self.schedule_tracker_file, "w") + pickle.dump(playedItems, schedule_tracker) + schedule_tracker.close() + logger.debug("Wrote schedule to disk: "+str(json.dumps(playedItems))) + + # Call API to update schedule states + logger.debug("Doing callback to server to update 'played' status.") + self.api_client.notify_scheduled_item_start_playing(pkey, self.schedule) + + + def push_liquidsoap(self, pkey, schedule, ptype): + logger = logging.getLogger() + src = self.cache_dir + str(pkey) + '/list.lsp' + + try: + if True == os.access(src, os.R_OK): + logger.debug('OK - Can read playlist file') + + pl_file = open(src, "r") + file_content = pl_file.read() + pl_file.close() + logger.debug('file content: %s' % (file_content)) + playlist = json.loads(file_content) + + #strptime returns struct_time in local time + #mktime takes a time_struct and returns a floating point + #gmtime Convert a time expressed in seconds since the epoch to a struct_time in UTC + #mktime: expresses the time in local time, not UTC. It returns a floating point number, for compatibility with time(). + epoch_start = calendar.timegm(time.gmtime(time.mktime(time.strptime(pkey, '%Y-%m-%d-%H-%M-%S')))) + + #Return the time as a floating point number expressed in seconds since the epoch, in UTC. + epoch_now = time.time() + + logger.debug("Epoch start: " + str(epoch_start)) + logger.debug("Epoch now: " + str(epoch_now)) + + sleep_time = epoch_start - epoch_now; + + if sleep_time < 0: + sleep_time = 0 + + logger.debug('sleeping for %s s' % (sleep_time)) + time.sleep(sleep_time) + + tn = telnetlib.Telnet(LS_HOST, 1234) + + #skip the currently playing song if any. + logger.debug("source.skip\n") + tn.write("source.skip\n") + + # Get any extra information for liquidsoap (which will be sent back to us) + liquidsoap_data = self.api_client.get_liquidsoap_data(pkey, schedule) + + #Sending schedule table row id string. + logger.debug("vars.pypo_data %s\n"%(str(liquidsoap_data["schedule_id"]))) + tn.write("vars.pypo_data %s\n"%(str(liquidsoap_data["schedule_id"]))) + + for item in playlist: + annotate = str(item['annotate']) + logger.debug(annotate) + tn.write('queue.push %s' % (annotate)) + tn.write("\n") + + tn.write("exit\n") + logger.debug(tn.read_all()) + + status = 1 + except Exception, e: + logger.error('%s', e) + status = 0 + + return status + + + def load_schedule(self): + logger = logging.getLogger() + schedule = None + + # create the file if it doesnt exist + if (not os.path.exists(self.schedule_file)): + logger.debug('creating file ' + self.schedule_file) + open(self.schedule_file, 'w').close() + else: + # load the schedule from cache + #logger.debug('loading schedule file '+self.schedule_file) + try: + schedule_file = open(self.schedule_file, "r") + schedule = pickle.load(schedule_file) + schedule_file.close() + + except Exception, e: + logger.error('%s', e) + + return schedule + + + def load_schedule_tracker(self): + logger = logging.getLogger() + playedItems = dict() + + # create the file if it doesnt exist + if (not os.path.exists(self.schedule_tracker_file)): + logger.debug('creating file ' + self.schedule_tracker_file) + schedule_tracker = open(self.schedule_tracker_file, 'w') + pickle.dump(playedItems, schedule_tracker) + schedule_tracker.close() + else: + try: + schedule_tracker = open(self.schedule_tracker_file, "r") + playedItems = pickle.load(schedule_tracker) + schedule_tracker.close() + except Exception, e: + logger.error('Unable to load schedule tracker file: %s', e) + + return playedItems +