diff --git a/airtime_mvc/application/models/Schedule.php b/airtime_mvc/application/models/Schedule.php index 536655c57..513230c5c 100644 --- a/airtime_mvc/application/models/Schedule.php +++ b/airtime_mvc/application/models/Schedule.php @@ -423,46 +423,22 @@ class Application_Model_Schedule { $rows = array(); $sql = "SELECT st.file_id AS file_id," - ." st.starts AS starts," - ." st.ends AS ends," + ." st.starts AS start," + ." st.ends AS end," ." si.starts as show_start," ." si.ends as show_end" ." FROM $CC_CONFIG[scheduleTable] as st" ." LEFT JOIN $CC_CONFIG[showInstances] as si" ." ON st.instance_id = si.id" - ." ORDER BY starts"; - - - /* - $sql = "SELECT pt.creator as creator" - ." st.file_id AS file_id" - ." st.starts AS starts" - ." st.ends AS ends" - ." st.name as show_name" - ." si.starts as show_start" - ." si.ends as show_end" - ." FROM $CC_CONFIG[scheduleTable] as st" - ." LEFT JOIN $CC_CONFIG[showInstances] as si" - ." ON st.instance_id = si.id" - ." LEFT JOIN $CC_CONFIG[showTable] as sh" - ." ON si.show_id = sh.id" - //The next line ensures we only get songs that haven't ended yet - ." WHERE (st.ends >= TIMESTAMP '$p_currentDateTime')" - ." AND (st.ends <= TIMESTAMP '$p_toDateTime')" - //next line makes sure that we aren't returning items that - //are past the show's scheduled timeslot. - ." AND (st.starts < si.ends)" - ." ORDER BY starts"; - * */ + ." ORDER BY start"; + Logging::log($sql); + $rows = $CC_DBC->GetAll($sql); - if (!PEAR::isError($rows)) { - foreach ($rows as &$row) { - $row["start"] = $row["starts"]; - $row["end"] = $row["ends"]; - } + if (PEAR::isError($rows)) { + return null; } - + return $rows; } @@ -506,13 +482,16 @@ class Application_Model_Schedule { $data = array(); $utcTimeZone = new DateTimeZone("UTC"); + $data["status"] = array(); + $data["media"] = array(); + foreach ($items as $item){ $storedFile = Application_Model_StoredFile::Recall($item["file_id"]); $uri = $storedFile->getFileUrlUsingConfigAddress(); $showEndDateTime = new DateTime($item["show_end"], $utcTimeZone); - $trackEndDateTime = new DateTime($item["ends"], $utcTimeZone); + $trackEndDateTime = new DateTime($item["end"], $utcTimeZone); /* Note: cue_out and end are always the same. */ /* TODO: Not all tracks will have "show_end" */ @@ -523,16 +502,16 @@ class Application_Model_Schedule { $item["cue_out"] = $item["cue_out"] - $diff; } - $starts = Application_Model_Schedule::AirtimeTimeToPypoTime($item["starts"]); - $data[$starts] = array( + $start = Application_Model_Schedule::AirtimeTimeToPypoTime($item["start"]); + $data["media"][$start] = array( 'id' => $storedFile->getGunid(), 'uri' => $uri, 'fade_in' => Application_Model_Schedule::WallTimeToMillisecs($item["fade_in"]), 'fade_out' => Application_Model_Schedule::WallTimeToMillisecs($item["fade_out"]), 'cue_in' => Application_Model_DateHelper::CalculateLengthInSeconds($item["cue_in"]), 'cue_out' => Application_Model_DateHelper::CalculateLengthInSeconds($item["cue_out"]), - 'start' => $starts, - 'end' => Application_Model_Schedule::AirtimeTimeToPypoTime($item["ends"]) + 'start' => $start, + 'end' => Application_Model_Schedule::AirtimeTimeToPypoTime($item["end"]) ); } diff --git a/python_apps/api_clients/api_client.py b/python_apps/api_clients/api_client.py index c6bd1763f..e33d612f0 100755 --- a/python_apps/api_clients/api_client.py +++ b/python_apps/api_clients/api_client.py @@ -255,15 +255,15 @@ class AirTimeApiClient(ApiClientInterface): export_url = export_url.replace('%%api_key%%', self.config["api_key"]) response = "" - status = 0 try: response_json = self.get_response_from_server(export_url) response = json.loads(response_json) - status = response['check'] + success = True except Exception, e: logger.error(e) + success = False - return status, response + return success, response def get_media(self, uri, dst): diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index a985e79cc..6919a78f2 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -46,17 +46,17 @@ class PypoFetch(Thread): Thread.__init__(self) self.api_client = api_client.api_client_factory(config) + self.logger = logging.getLogger(); + self.cache_dir = os.path.join(config["cache_dir"], "scheduler") - logger.info("Creating cache directory at %s", self.cache_dir) + self.logger.info("Creating cache directory at %s", self.cache_dir) self.queue = q self.schedule_data = [] - logger = logging.getLogger('fetch') - logger.info("PypoFetch: init complete") + self.logger.info("PypoFetch: init complete") def init_rabbit_mq(self): - logger = logging.getLogger('fetch') - logger.info("Initializing RabbitMQ stuff") + self.logger.info("Initializing RabbitMQ stuff") try: schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") @@ -64,7 +64,7 @@ class PypoFetch(Thread): channel = connection.channel() self.simple_queue = SimpleQueue(channel, schedule_queue) except Exception, e: - logger.error(e) + self.logger.error(e) return False return True @@ -75,49 +75,46 @@ class PypoFetch(Thread): """ def handle_message(self, message): try: - logger = logging.getLogger('fetch') - logger.info("Received event from RabbitMQ: %s" % message) + self.logger.info("Received event from RabbitMQ: %s" % message) m = json.loads(message) command = m['event_type'] - logger.info("Handling command: " + command) + self.logger.info("Handling command: " + command) if command == 'update_schedule': self.schedule_data = m['schedule'] - self.process_schedule(self.schedule_data, "scheduler", False) + self.process_schedule(self.schedule_data, False) elif command == 'update_stream_setting': - logger.info("Updating stream setting...") + self.logger.info("Updating stream setting...") self.regenerateLiquidsoapConf(m['setting']) elif command == 'update_stream_format': - logger.info("Updating stream format...") + self.logger.info("Updating stream format...") self.update_liquidsoap_stream_format(m['stream_format']) elif command == 'update_station_name': - logger.info("Updating station name...") + self.logger.info("Updating station name...") self.update_liquidsoap_station_name(m['station_name']) elif command == 'cancel_current_show': - logger.info("Cancel current show command received...") + self.logger.info("Cancel current show command received...") self.stop_current_show() except Exception, e: - logger.error("Exception in handling RabbitMQ message: %s", e) + self.logger.error("Exception in handling RabbitMQ message: %s", e) def stop_current_show(self): - logger = logging.getLogger('fetch') - logger.debug('Notifying Liquidsoap to stop playback.') + self.logger.debug('Notifying Liquidsoap to stop playback.') try: tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn.write('source.skip\n') tn.write('exit\n') tn.read_all() except Exception, e: - logger.debug(e) - logger.debug('Could not connect to liquidsoap') + self.logger.debug(e) + self.logger.debug('Could not connect to liquidsoap') def regenerateLiquidsoapConf(self, setting): - logger = logging.getLogger('fetch') existing = {} # create a temp file fh = open('/etc/airtime/liquidsoap.cfg', 'r') - logger.info("Reading existing config...") + self.logger.info("Reading existing config...") # read existing conf file and build dict while 1: line = fh.readline() @@ -147,7 +144,7 @@ class PypoFetch(Thread): #restart flag restart = False - logger.info("Looking for changes...") + self.logger.info("Looking for changes...") # look for changes for s in setting: if "output_sound_device" in s[u'keyname'] or "icecast_vorbis_metadata" in s[u'keyname']: @@ -155,13 +152,13 @@ class PypoFetch(Thread): state_change_restart[stream] = False # This is the case where restart is required no matter what if (existing[s[u'keyname']] != s[u'value']): - logger.info("'Need-to-restart' state detected for %s...", s[u'keyname']) + self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname']) restart = True; else: stream, dump = s[u'keyname'].split('_',1) if "_output" in s[u'keyname']: if (existing[s[u'keyname']] != s[u'value']): - logger.info("'Need-to-restart' state detected for %s...", s[u'keyname']) + self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname']) restart = True; state_change_restart[stream] = True elif ( s[u'value'] != 'disabled'): @@ -173,22 +170,22 @@ class PypoFetch(Thread): if stream not in change: change[stream] = False if not (s[u'value'] == existing[s[u'keyname']]): - logger.info("Keyname: %s, Curent value: %s, New Value: %s", s[u'keyname'], existing[s[u'keyname']], s[u'value']) + self.logger.info("Keyname: %s, Curent value: %s, New Value: %s", s[u'keyname'], existing[s[u'keyname']], s[u'value']) change[stream] = True # set flag change for sound_device alway True - logger.info("Change:%s, State_Change:%s...", change, state_change_restart) + self.logger.info("Change:%s, State_Change:%s...", change, state_change_restart) for k, v in state_change_restart.items(): if k == "sound_device" and v: restart = True elif v and change[k]: - logger.info("'Need-to-restart' state detected for %s...", k) + self.logger.info("'Need-to-restart' state detected for %s...", k) restart = True # rewrite if restart: fh = open('/etc/airtime/liquidsoap.cfg', 'w') - logger.info("Rewriting liquidsoap.cfg...") + self.logger.info("Rewriting liquidsoap.cfg...") fh.write("################################################\n") fh.write("# THIS FILE IS AUTO GENERATED. DO NOT CHANGE!! #\n") fh.write("################################################\n") @@ -210,17 +207,16 @@ class PypoFetch(Thread): fh.close() # restarting pypo. # we could just restart liquidsoap but it take more time somehow. - logger.info("Restarting pypo...") + self.logger.info("Restarting pypo...") sys.exit(0) else: - logger.info("No change detected in setting...") + self.logger.info("No change detected in setting...") self.update_liquidsoap_connection_status() """ updates the status of liquidsoap connection to the streaming server This fucntion updates the bootup time variable in liquidsoap script """ def update_liquidsoap_connection_status(self): - logger = logging.getLogger('fetch') tn = telnetlib.Telnet(LS_HOST, LS_PORT) # update the boot up time of liquidsoap. Since liquidsoap is not restarting, # we are manually adjusting the bootup time variable so the status msg will get @@ -238,7 +234,7 @@ class PypoFetch(Thread): # streamin info is in the form of: # eg. s1:true,2:true,3:false streams = stream_info.split(",") - logger.info(streams) + self.logger.info(streams) fake_time = current_time + 1 for s in streams: @@ -252,33 +248,31 @@ class PypoFetch(Thread): # Push stream metadata to liquidsoap # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! try: - logger = logging.getLogger('fetch') - logger.info(LS_HOST) - logger.info(LS_PORT) + self.logger.info(LS_HOST) + self.logger.info(LS_PORT) tn = telnetlib.Telnet(LS_HOST, LS_PORT) command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') - logger.info(command) + self.logger.info(command) tn.write(command) tn.write('exit\n') tn.read_all() except Exception, e: - logger.error("Exception %s", e) + self.logger.error("Exception %s", e) def update_liquidsoap_station_name(self, station_name): # Push stream metadata to liquidsoap # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! try: - logger = logging.getLogger('fetch') - logger.info(LS_HOST) - logger.info(LS_PORT) + self.logger.info(LS_HOST) + self.logger.info(LS_PORT) tn = telnetlib.Telnet(LS_HOST, LS_PORT) command = ('vars.station_name %s\n' % station_name).encode('utf-8') - logger.info(command) + self.logger.info(command) tn.write(command) tn.write('exit\n') tn.read_all() except Exception, e: - logger.error("Exception %s", e) + self.logger.error("Exception %s", e) """ Process the schedule @@ -289,166 +283,161 @@ class PypoFetch(Thread): - runs the cleanup routine, to get rid of unused cached files """ def process_schedule(self, schedule_data, bootstrapping): - logger = logging.getLogger('fetch') - playlists = schedule_data["playlists"] + media = schedule_data["media"] # Download all the media and put playlists in liquidsoap "annotate" format try: - liquidsoap_playlists = self.prepare_playlists(playlists, bootstrapping) - except Exception, e: logger.error("%s", e) + media = self.prepare_media(media, bootstrapping) + except Exception, e: self.logger.error("%s", e) # Send the data to pypo-push scheduled_data = dict() - scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists - scheduled_data['schedule'] = playlists - self.queue.put(scheduled_data) + scheduled_data['liquidsoap_annotation_queue'] = liquidsoap_annotation_queue + self.queue.put(media) + """ # cleanup try: self.cleanup() - except Exception, e: logger.error("%s", e) + except Exception, e: self.logger.error("%s", e) + """ + - """ - In this function every audio file is cut as necessary (cue_in/cue_out != 0) - and stored in a playlist folder. - file is e.g. 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3 - """ - def prepare_playlists(self, playlists, bootstrapping): - logger = logging.getLogger('fetch') - - liquidsoap_playlists = dict() - - # Dont do anything if playlists is empty - if not playlists: - logger.debug("Schedule is empty.") - return liquidsoap_playlists - - scheduleKeys = sorted(playlists.iterkeys()) - + def prepare_media(self, media, bootstrapping): + """ + Iterate through the list of media items in "media" and + download them. + """ try: - for pkey in scheduleKeys: - logger.info("Playlist starting at %s", pkey) - playlist = playlists[pkey] + mediaKeys = sorted(media.iterkeys()) + for mkey in mediaKeys: + self.logger.debug("Media item starting at %s", mkey) + media_item = media[mkey] + + if bootstrapping: + check_for_crash(media_item) # create playlist directory try: - os.mkdir(self.cache_dir + str(pkey)) + """ + Extract year, month, date from mkey + """ + y_m_d = mkey[0:10] + download_dir = os.mkdir(os.path.join(self.cache_dir, y_m_d)) + fileExt = os.path.splitext(media_item['uri'])[1] + dst = os.path.join(download_dir, media_item['id']+fileExt) except Exception, e: - logger.warning(e) + self.logger.warning(e) + + if self.handle_media_file(media_item, dst): + entry = create_liquidsoap_annotation(media_item, dst) + #entry['show_name'] = playlist['show_name'] + entry['show_name'] = "TODO" + media_item["annotation"] = entry - ls_playlist = self.handle_media_file(playlist, pkey, bootstrapping) - - liquidsoap_playlists[pkey] = ls_playlist except Exception, e: - logger.error("%s", e) - return liquidsoap_playlists + self.logger.error("%s", e) + + return media + + def create_liquidsoap_annotation(media, dst): + pl_entry = \ + 'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \ + % (media['id'], 0, \ + float(media['fade_in']) / 1000, \ + float(media['fade_out']) / 1000, \ + float(media['cue_in']), \ + float(media['cue_out']), \ + media['row_id'], dst) - """ - Download and cache the media files. - This handles both remote and local files. - Returns an updated ls_playlist string. - """ - def handle_media_file(self, playlist, pkey, bootstrapping): - logger = logging.getLogger('fetch') + """ + Tracks are only added to the playlist if they are accessible + on the file system and larger than 0 bytes. + So this can lead to playlists shorter than expectet. + (there is a hardware silence detector for this cases...) + """ + entry = dict() + entry['type'] = 'file' + entry['annotate'] = pl_entry + return entry - ls_playlist = [] + def check_for_crash(media_item): + start = media_item['start'] + end = media_item['end'] dtnow = datetime.utcnow() str_tnow_s = dtnow.strftime('%Y-%m-%d-%H-%M-%S') - - sortedKeys = sorted(playlist['medias'].iterkeys()) - - for key in sortedKeys: - media = playlist['medias'][key] - logger.debug("Processing track %s", media['uri']) + + if start <= str_tnow_s and str_tnow_s < end: + #song is currently playing and we just started pypo. Maybe there + #was a power outage? Let's restart playback of this song. + start_split = map(int, start.split('-')) + media_start = datetime(start_split[0], start_split[1], start_split[2], start_split[3], start_split[4], start_split[5], 0, None) + self.logger.debug("Found media item that started at %s.", media_start) - if bootstrapping: - start = media['start'] - end = media['end'] - - if end <= str_tnow_s: - continue - elif start <= str_tnow_s and str_tnow_s < end: - #song is currently playing and we just started pypo. Maybe there - #was a power outage? Let's restart playback of this song. - start_split = map(int, start.split('-')) - media_start = datetime(start_split[0], start_split[1], start_split[2], start_split[3], start_split[4], start_split[5], 0, None) - logger.debug("Found media item that started at %s.", media_start) - - delta = dtnow - media_start #we get a TimeDelta object from this operation - logger.info("Starting media item at %d second point", delta.seconds) - media['cue_in'] = delta.seconds + 10 - td = timedelta(seconds=10) - playlist['start'] = (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S') - logger.info("Crash detected, setting playlist to restart at %s", (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S')) + delta = dtnow - media_start #we get a TimeDelta object from this operation + self.logger.info("Starting media item at %d second point", delta.seconds) + """ + Set the cue_in. This is used by Liquidsoap to determine at what point in the media + item it should start playing. If the cue_in happens to be > cue_out, then make cue_in = cue_out + """ + media_item['cue_in'] = delta.seconds + 10 if delta.seconds + 10 < media_item['cue_out'] else media_item['cue_out'] + + """ + Set the start time, which is used by pypo-push to determine when a media item is scheduled. + Pushing the start time into the future will ensure pypo-push will push this to Liquidsoap. + """ + td = timedelta(seconds=10) + media_item['start'] = (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S') + self.logger.info("Crash detected, setting playlist to restart at %s", (dtnow + td).strftime('%Y-%m-%d-%H-%M-%S')) + + def handle_media_file(self, media_item, dst): + """ + Download and cache the media item. + """ + + self.logger.debug("Processing track %s", media_item['uri']) - fileExt = os.path.splitext(media['uri'])[1] - try: - dst = os.path.join(self.cache_dir, pkey, media['id']+fileExt) - - # download media file - self.handle_remote_file(media, dst) - - if True == os.access(dst, os.R_OK): - # check filesize (avoid zero-byte files) - try: fsize = os.path.getsize(dst) - except Exception, e: - logger.error("%s", e) - fsize = 0 - + try: + #blocking function to download the media item + self.download_file(media_item, dst) + + if os.access(dst, os.R_OK): + # check filesize (avoid zero-byte files) + try: + fsize = os.path.getsize(dst) if fsize > 0: - pl_entry = \ - 'annotate:media_id="%s",liq_start_next="%s",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \ - % (media['id'], 0, \ - float(media['fade_in']) / 1000, \ - float(media['fade_out']) / 1000, \ - float(media['cue_in']), \ - float(media['cue_out']), \ - media['row_id'], dst) + return True + except Exception, e: + self.logger.error("%s", e) + fsize = 0 + else: + self.logger.warning("Cannot read file %s.", dst) - """ - Tracks are only added to the playlist if they are accessible - on the file system and larger than 0 bytes. - So this can lead to playlists shorter than expectet. - (there is a hardware silence detector for this cases...) - """ - entry = dict() - entry['type'] = 'file' - entry['annotate'] = pl_entry - entry['show_name'] = playlist['show_name'] - ls_playlist.append(entry) - - else: - logger.warning("zero-size file - skipping %s. will not add it to playlist at %s", media['uri'], dst) - - else: - logger.warning("something went wrong. file %s not available. will not add it to playlist", dst) - - except Exception, e: logger.info("%s", e) - return ls_playlist + except Exception, e: + self.logger.info("%s", e) + + return False """ Download a file from a remote server and store it in the cache. """ - def handle_remote_file(self, media, dst): - logger = logging.getLogger('fetch') + def download_file(self, media_item, dst): if os.path.isfile(dst): pass - #logger.debug("file already in cache: %s", dst) + #self.logger.debug("file already in cache: %s", dst) else: - logger.debug("try to download %s", media['uri']) - self.api_client.get_media(media['uri'], dst) + self.logger.debug("try to download %s", media_item['uri']) + self.api_client.get_media(media_item['uri'], dst) """ Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR" and deletes them. """ def cleanup(self): - logger = logging.getLogger('fetch') - offset = 3600 * int(config["cache_for"]) now = time.time() @@ -458,39 +447,40 @@ class PypoFetch(Thread): timestamp = calendar.timegm(time.strptime(dir, "%Y-%m-%d-%H-%M-%S")) if (now - timestamp) > offset: try: - logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp) + self.logger.debug('trying to remove %s - timestamp: %s', os.path.join(r, dir), timestamp) shutil.rmtree(os.path.join(r, dir)) except Exception, e: - logger.error("%s", e) + self.logger.error("%s", e) pass else: - logger.info('sucessfully removed %s', os.path.join(r, dir)) + self.logger.info('sucessfully removed %s', os.path.join(r, dir)) except Exception, e: - logger.error(e) + self.logger.error(e) def main(self): - logger = logging.getLogger('fetch') - try: os.mkdir(self.cache_dir) except Exception, e: pass - # Bootstrap: since we are just starting up, we need to grab the - # most recent schedule. After that we can just wait for updates. - status, self.schedule_data = self.api_client.get_schedule() - if status == 1: - logger.info("Bootstrap schedule received: %s", self.schedule_data) - self.process_schedule(self.schedule_data, "scheduler", True) - logger.info("Bootstrap complete: got initial copy of the schedule") + try: + # Bootstrap: since we are just starting up, we need to grab the + # most recent schedule. After that we can just wait for updates. + success, self.schedule_data = self.api_client.get_schedule() + if success: + self.logger.info("Bootstrap schedule received: %s", self.schedule_data) + self.process_schedule(self.schedule_data, True) + self.logger.info("Bootstrap complete: got initial copy of the schedule") - while not self.init_rabbit_mq(): - logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") - time.sleep(5) + while not self.init_rabbit_mq(): + self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") + time.sleep(5) + except Exception, e: + self.logger.error(str(e)) loops = 1 while True: - logger.info("Loop #%s", loops) + self.logger.info("Loop #%s", loops) try: try: message = self.simple_queue.get(block=True) @@ -498,17 +488,17 @@ class PypoFetch(Thread): # ACK the message to take it off the queue message.ack() except MessageStateError, m: - logger.error("Message ACK error: %s", m) + self.logger.error("Message ACK error: %s", m) except Exception, e: """ There is a problem with the RabbitMq messenger service. Let's log the error and get the schedule via HTTP polling """ - logger.error("Exception, %s", e) + self.logger.error("Exception, %s", e) status, self.schedule_data = self.api_client.get_schedule() if status == 1: - self.process_schedule(self.schedule_data, "scheduler", False) + self.process_schedule(self.schedule_data, False) loops += 1 diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 1d7c132c9..2ecdbbe6e 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -36,11 +36,10 @@ class PypoPush(Thread): self.api_client = api_client.api_client_factory(config) self.queue = q - self.schedule = dict() - self.playlists = dict() + self.media = dict() self.liquidsoap_state_play = True - self.push_ahead = 10 + self.push_ahead = 30 """ The Push Loop - the push loop periodically checks if there is a playlist @@ -56,35 +55,30 @@ class PypoPush(Thread): if not self.queue.empty(): # make sure we get the latest schedule while not self.queue.empty(): - scheduled_data = self.queue.get() - logger.debug("Received data from pypo-fetch") - self.schedule = scheduled_data['schedule'] - self.playlists = scheduled_data['liquidsoap_playlists'] - - logger.debug('schedule %s' % json.dumps(self.schedule)) - logger.debug('playlists %s' % json.dumps(self.playlists)) + self.media = self.queue.get() + logger.debug("Received data from pypo-fetch") + logger.debug('media %s' % json.dumps(self.media)) - schedule = self.schedule - playlists = self.playlists + media = self.media currently_on_air = False - if schedule: + if media: tnow = time.gmtime(timenow) tcoming = time.gmtime(timenow + self.push_ahead) str_tnow_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tnow[0], tnow[1], tnow[2], tnow[3], tnow[4], tnow[5]) str_tcoming_s = "%04d-%02d-%02d-%02d-%02d-%02d" % (tcoming[0], tcoming[1], tcoming[2], tcoming[3], tcoming[4], tcoming[5]) - for pkey in schedule: - plstart = schedule[pkey]['start'][0:19] - - if str_tnow_s <= plstart and plstart < str_tcoming_s: - logger.debug('Preparing to push playlist scheduled at: %s', pkey) - playlist = schedule[pkey] - - - # We have a match, replace the current playlist and - # force liquidsoap to refresh. - if (self.push_liquidsoap(pkey, schedule, playlists) == 1): + + for media_item in media: + item_start = media_item['start'][0:19] + + if str_tnow_s <= item_start and item_start < str_tcoming_s: + """ + If the media item starts in the next 30 seconds, push it to the queue. + """ + logger.debug('Preparing to push media item scheduled at: %s', pkey) + + if self.push_to_liquidsoap(media_item): logger.debug("Pushed to liquidsoap, updating 'played' status.") currently_on_air = True @@ -93,33 +87,31 @@ class PypoPush(Thread): # Call API to update schedule states logger.debug("Doing callback to server to update 'played' status.") self.api_client.notify_scheduled_item_start_playing(pkey, schedule) + - show_start = schedule[pkey]['show_start'] - show_end = schedule[pkey]['show_end'] - - if show_start <= str_tnow_s and str_tnow_s < show_end: - currently_on_air = True + def push_to_liquidsoap(self, media_item): + if media_item["starts"] == self.last_end_time: """ - If currently_on_air = False but liquidsoap_state_play = True then it means that Liquidsoap may - still be playing audio even though the show has ended ('currently_on_air = False' means no show is scheduled) - See CC-3231. - This is a temporary solution for Airtime 2.0 - """ - if not currently_on_air and self.liquidsoap_state_play: - logger.debug('Notifying Liquidsoap to stop playback.') - try: - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - tn.write('source.skip\n') - tn.write('exit\n') - tn.read_all() - except Exception, e: - logger.debug(e) - logger.debug('Could not connect to liquidsoap') + this media item is attached to the end of the last + track, so let's push it now so that Liquidsoap can start playing + it immediately after (and prepare crossfades if need be). + """ + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + tn.write(str('queue.push %s\n' % media_item["annotation"].encode('utf-8'))) + #TODO: vars.pypo_data + #TODO: vars.show_name + tn.write("exit\n") + + self.last_end_time = media_item["end"] + else: + """ + this media item does not start right after a current playing track. + We need to sleep, and then wake up when this track starts. + """ + + return False - self.liquidsoap_state_play = False - - - def push_liquidsoap(self, pkey, schedule, playlists): + def push_liquidsoap_old(self, pkey, schedule, playlists): logger = logging.getLogger('push') try: @@ -127,10 +119,6 @@ class PypoPush(Thread): plstart = schedule[pkey]['start'][0:19] #strptime returns struct_time in local time - #mktime takes a time_struct and returns a floating point - #gmtime Convert a time expressed in seconds since the epoch to a struct_time in UTC - #mktime: expresses the time in local time, not UTC. It returns a floating point number, for compatibility with time(). - epoch_start = calendar.timegm(time.strptime(plstart, '%Y-%m-%d-%H-%M-%S')) #Return the time as a floating point number expressed in seconds since the epoch, in UTC. @@ -186,7 +174,7 @@ class PypoPush(Thread): if loops % heartbeat_period == 0: logger.info("heartbeat") loops = 0 - try: self.push('scheduler') + try: self.push() except Exception, e: logger.error('Pypo Push Exception: %s', e) time.sleep(PUSH_INTERVAL)