diff --git a/application/models/RabbitMq.php b/application/models/RabbitMq.php
index d251116bf..af5bffa02 100644
--- a/application/models/RabbitMq.php
+++ b/application/models/RabbitMq.php
@@ -10,23 +10,23 @@ class RabbitMq
      * in the future.
      */
     public static function PushSchedule() {
-//        global $CC_CONFIG;
-//        $conn = new AMQPConnection($CC_CONFIG["rabbitmq"]["host"],
-//                                   $CC_CONFIG["rabbitmq"]["port"],
-//                                   $CC_CONFIG["rabbitmq"]["user"],
-//                                   $CC_CONFIG["rabbitmq"]["password"]);
-//        $channel = $conn->channel();
-//        $channel->access_request($CC_CONFIG["rabbitmq"]["vhost"], false, false, true, true);
-//
-//        $EXCHANGE = 'airtime-schedule';
-//        $channel->exchange_declare($EXCHANGE, 'direct', false, false, false);
-//
-//        $data = json_encode(Schedule::ExportRangeAsJson());
-//        $msg = new AMQPMessage($data, array('content_type' => 'text/plain'));
-//
-//        $channel->basic_publish($msg, $EXCHANGE);
-//        $channel->close();
-//        $conn->close();
+        global $CC_CONFIG;
+        $conn = new AMQPConnection($CC_CONFIG["rabbitmq"]["host"],
+                                   $CC_CONFIG["rabbitmq"]["port"],
+                                   $CC_CONFIG["rabbitmq"]["user"],
+                                   $CC_CONFIG["rabbitmq"]["password"]);
+        $channel = $conn->channel();
+        $channel->access_request($CC_CONFIG["rabbitmq"]["vhost"], false, false, true, true);
+
+        $EXCHANGE = 'airtime-schedule';
+        $channel->exchange_declare($EXCHANGE, 'direct', false, true);
+
+        $data = json_encode(Schedule::ExportRangeAsJson());
+        $msg = new AMQPMessage($data, array('content_type' => 'text/plain'));
+
+        $channel->basic_publish($msg, $EXCHANGE);
+        $channel->close();
+        $conn->close();
     }
 }
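The JSON built from Schedule::ExportRangeAsJson() is not shown in this diff, but the consumer code in pypo/pypofetch.py below reads specific keys out of it. As a reading aid, here is a sketch of the inferred message shape in Python; every key and example value is an assumption reconstructed from the fields the consumer accesses, not authoritative output of the PHP model.

# Hypothetical message shape, inferred from the keys pypofetch.py reads
# ("playlists", "stream_metadata", and the per-playlist fields it logs).
# NOT the authoritative output of Schedule::ExportRangeAsJson().
example_schedule_msg = {
    "stream_metadata": {
        "format": "ogg",                 # assumed example value
        "station_name": "Test Station",  # assumed example value
    },
    "playlists": {
        # keyed by start time ("pkey" in pypofetch.py)
        "2010-06-23-15-00-00": {
            "subtype": "1",        # assumed
            "played": "0",         # assumed
            "schedule_id": "17",   # assumed
            "duration": "3600",    # assumed
            "x_ident": "1",        # assumed
            # plus per-track media entries carrying id, uri, cue_in,
            # cue_out, fade_in, fade_out, export_source, row_id, show_name
        },
    },
}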
diff --git a/library/php-amqplib/demo/amqp_airtime_consumer.php b/library/php-amqplib/demo/amqp_airtime_consumer.php
new file mode 100644
index 000000000..bb5f8bcc8
--- /dev/null
+++ b/library/php-amqplib/demo/amqp_airtime_consumer.php
@@ -0,0 +1,54 @@
+#!/usr/bin/php
+<?php
+/*
+ * Demo consumer for the 'airtime-schedule' exchange.
+ */
+
+require_once('../amqp.inc');
+
+$HOST = 'localhost';
+$PORT = 5672;
+$USER = 'guest';
+$PASS = 'guest';
+$VHOST = '/';
+$EXCHANGE = 'airtime-schedule';
+$QUEUE = 'airtime-schedule-msgs';
+$CONSUMER_TAG = 'airtime-consumer';
+
+$conn = new AMQPConnection($HOST, $PORT, $USER, $PASS);
+$ch = $conn->channel();
+$ch->access_request($VHOST, false, false, true, true);
+
+$ch->queue_declare($QUEUE);
+$ch->exchange_declare($EXCHANGE, 'direct', false, false, false);
+$ch->queue_bind($QUEUE, $EXCHANGE);
+
+function process_message($msg) {
+    global $ch, $CONSUMER_TAG;
+
+    echo "\n--------\n";
+    echo $msg->body;
+    echo "\n--------\n";
+
+    $ch->basic_ack($msg->delivery_info['delivery_tag']);
+
+    // Cancel callback
+    if ($msg->body === 'quit') {
+        $ch->basic_cancel($CONSUMER_TAG);
+    }
+}
+
+$ch->basic_consume($QUEUE, $CONSUMER_TAG, false, false, false, false, 'process_message');
+
+// Loop as long as the channel has callbacks registered
+echo "Waiting for messages...\n";
+while(count($ch->callbacks)) {
+    $ch->wait();
+}
+
+$ch->close();
+$conn->close();
+?>
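To exercise this demo without going through the PHP model at all, a test message can be published from Python with the same kombu classes that pypo/pypofetch.py imports below. This is a minimal sketch, assuming a default broker on localhost with guest/guest credentials; the publish call mirrors the PHP basic_publish(), which uses an empty routing key.

import json
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Producer

# Same exchange parameters that pypofetch.py declares below.
schedule_exchange = Exchange("airtime-schedule", "direct",
                             durable=True, auto_delete=True)

connection = BrokerConnection("localhost", "guest", "guest", "/")
channel = connection.channel()
producer = Producer(channel, exchange=schedule_exchange)

# Publish a dummy schedule as JSON, just like the PHP side does.
producer.publish(json.dumps({"playlists": {}, "stream_metadata": {}}))

channel.close()
connection.close()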
diff --git a/pypo/config.cfg b/pypo/config.cfg
index 638558fbf..b4dc9424e 100644
--- a/pypo/config.cfg
+++ b/pypo/config.cfg
@@ -26,6 +26,13 @@ base_url = 'http://localhost/'
 ls_host = '127.0.0.1'
 ls_port = '1234'
 
+############################################
+# RabbitMQ settings                        #
+############################################
+rabbitmq_host = 'localhost'
+rabbitmq_user = 'guest'
+rabbitmq_password = 'guest'
+
 ############################################
 # pypo preferences                         #
 ############################################
@@ -42,7 +49,7 @@ cache_for = 24 #how long to hold the cache, in hours
 # the time you expect to "lock-in" your schedule. So if your schedule is set
 # 24 hours in advance, this can be set to poll every 12 hours.
 #
-poll_interval = 5 # in seconds.
+poll_interval = 3600 # in seconds.
 
 
 # Push interval in seconds.
@@ -52,7 +59,7 @@ poll_interval = 5 # in seconds.
 #
 # It's hard to imagine a situation where this should be more than 1 second.
 #
-push_interval = 2 # in seconds
+push_interval = 1 # in seconds
 
 # 'pre' or 'otf'. 'pre' cues while playlist preparation
 # while 'otf' (on the fly) cues while loading into ls
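Note that only the host, user, and password become configurable here; the port and vhost stay hardcoded on the Python side (the connection is built with vhost "/", and the default AMQP port 5672 is implied). A small sketch of how these keys are consumed, assuming the same ConfigObj loading pypo already uses:

from configobj import ConfigObj
from kombu.connection import BrokerConnection

config = ConfigObj('config.cfg')

# ConfigObj hands the values back as strings. The positional arguments
# here are (hostname, userid, password, virtual_host); the vhost is not
# read from config.cfg, it is hardcoded to "/" in pypofetch.py.
connection = BrokerConnection(config["rabbitmq_host"],
                              config["rabbitmq_user"],
                              config["rabbitmq_password"],
                              "/")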
diff --git a/pypo/pypofetch.py b/pypo/pypofetch.py
index 0ca468198..c31bd094a 100644
--- a/pypo/pypofetch.py
+++ b/pypo/pypofetch.py
@@ -12,6 +12,10 @@ import telnetlib
 import math
 from threading import Thread
 
+# For RabbitMQ
+from kombu.connection import BrokerConnection
+from kombu.messaging import Exchange, Queue, Consumer, Producer
+
 from api_clients import api_client
 from util import CueFile
 
@@ -25,91 +29,101 @@ try:
     config = ConfigObj('config.cfg')
     LS_HOST = config['ls_host']
     LS_PORT = config['ls_port']
-    POLL_INTERVAL = 5
+    POLL_INTERVAL = int(config['poll_interval'])
 except Exception, e:
     print 'Error loading config file: ', e
     sys.exit()
 
+# Yuck - using a global, I know!
+SCHEDULE_PUSH_MSG = []
+
+"""
+Handle a message from RabbitMQ, put it into our yucky global var.
+Hopefully there is a better way to do this.
+"""
+def handle_message(body, message):
+    logger = logging.getLogger('fetch')
+    global SCHEDULE_PUSH_MSG
+    logger.info("Received schedule from RabbitMQ: " + message.body)
+    SCHEDULE_PUSH_MSG = json.loads(message.body)
+    # ACK the message to take it off the queue
+    message.ack()
+
+
 class PypoFetch(Thread):
     def __init__(self, q):
         Thread.__init__(self)
+        logger = logging.getLogger('fetch')
         self.api_client = api_client.api_client_factory(config)
         self.cue_file = CueFile()
         self.set_export_source('scheduler')
         self.queue = q
 
+        logger.info("Initializing RabbitMQ stuff")
+        schedule_exchange = Exchange("airtime-schedule", "direct", durable=True, auto_delete=True)
+        schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
+        self.connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], "/")
+        channel = self.connection.channel()
+        consumer = Consumer(channel, schedule_queue)
+        consumer.register_callback(handle_message)
+        consumer.consume()
+
+        logger.info("PypoFetch: init complete")
+
+
     def set_export_source(self, export_source):
         self.export_source = export_source
         self.cache_dir = config["cache_dir"] + self.export_source + '/'
 
+
     """
-    Fetching part of pypo
-    - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
-    - Saves a serialized file of the schedule
-    - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied
-      to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
-    - runs the cleanup routine, to get rid of unused cashed files
+    Process the schedule
+     - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
+     - Saves a serialized file of the schedule
+     - playlists are prepared (brought to liquidsoap format) and, if not mounted via nfs, files are copied
+       to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
+     - runs the cleanup routine, to get rid of unused cached files
     """
-    def fetch(self, export_source):
-        #wrapper script for fetching the whole schedule (in json)
+    def process_schedule(self, schedule_data, export_source):
         logger = logging.getLogger('fetch')
-
-        try: os.mkdir(self.cache_dir)
-        except Exception, e: pass
-
-        # get schedule
+        self.schedule = schedule_data["playlists"]
+
+        # Push stream metadata to liquidsoap
+        # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
+        stream_metadata = schedule_data['stream_metadata']
         try:
-            while self.get_schedule() != 1:
-                logger.warning("failed to read from export url")
-                time.sleep(1)
+            tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+            #encode in latin-1 due to telnet protocol not supporting utf-8
+            tn.write(('vars.stream_metadata_type %s\n' % stream_metadata['format']).encode('latin-1'))
+            tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1'))
+            tn.write('exit\n')
+            tn.read_all()
+        except Exception, e:
+            logger.error("Exception %s", e)
+            status = 0
 
+        # Download all the media and put playlists in liquidsoap format
+        try:
+            playlists = self.prepare_playlists()
         except Exception, e: logger.error("%s", e)
 
-        # prepare the playlists
-        try: playlists = self.prepare_playlists()
-        except Exception, e: logger.error("%s", e)
-
-
+        # Send the data to pypo-push
        scheduled_data = dict()
         scheduled_data['playlists'] = playlists
         scheduled_data['schedule'] = self.schedule
+        scheduled_data['stream_metadata'] = schedule_data["stream_metadata"]
         self.queue.put(scheduled_data)
 
         # cleanup
         try: self.cleanup(self.export_source)
         except Exception, e: logger.error("%s", e)
 
-    def get_schedule(self):
-        logger = logging.getLogger('fetch')
-        status, response = self.api_client.get_schedule()
-
-        if status == 1:
-            schedule = response['playlists']
-            stream_metadata = response['stream_metadata']
-            try:
-                self.schedule = schedule
-                tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-
-                #encode in latin-1 due to telnet protocol not supporting utf-8
-                tn.write(('vars.stream_metadata_type %s\n' % stream_metadata['format']).encode('latin-1'))
-                tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1'))
-
-                tn.write('exit\n')
-                tn.read_all()
-
-            except Exception, e:
-                logger.error("Exception %s", e)
-                status = 0
-
-        return status
 
     """
-    Alternative version of playout preparation. Every playlist entry is
-    pre-cued if neccessary (cue_in/cue_out != 0) and stored in the
-    playlist folder.
-    file is eg 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3
+    In this function every audio file is cut as necessary (cue_in/cue_out != 0)
+    and stored in a playlist folder.
+    file is e.g. 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3
     """
     def prepare_playlists(self):
         logger = logging.getLogger('fetch')
@@ -126,7 +140,7 @@ class PypoFetch(Thread):
 
         try:
             for pkey in scheduleKeys:
-                logger.info("found playlist at %s", pkey)
+                logger.info("Playlist starting at %s", pkey)
                 playlist = schedule[pkey]
 
                 # create playlist directory
@@ -135,15 +149,15 @@
                 except Exception, e:
                     pass
 
-                logger.debug('*****************************************')
-                logger.debug('pkey: ' + str(pkey))
-                logger.debug('cached at : ' + self.cache_dir + str(pkey))
-                logger.debug('subtype: ' + str(playlist['subtype']))
-                logger.debug('played: ' + str(playlist['played']))
-                logger.debug('schedule id: ' + str(playlist['schedule_id']))
-                logger.debug('duration: ' + str(playlist['duration']))
-                logger.debug('source id: ' + str(playlist['x_ident']))
-                logger.debug('*****************************************')
+                #logger.debug('*****************************************')
+                #logger.debug('pkey: ' + str(pkey))
+                #logger.debug('cached at : ' + self.cache_dir + str(pkey))
+                #logger.debug('subtype: ' + str(playlist['subtype']))
+                #logger.debug('played: ' + str(playlist['played']))
+                #logger.debug('schedule id: ' + str(playlist['schedule_id']))
+                #logger.debug('duration: ' + str(playlist['duration']))
+                #logger.debug('source id: ' + str(playlist['x_ident']))
+                #logger.debug('*****************************************')
 
                 if int(playlist['played']) == 1:
                     logger.info("playlist %s already played / sent to liquidsoap, so will ignore it", pkey)
@@ -156,11 +170,13 @@
             logger.info("%s", e)
 
         return playlists
 
+
+    """
+    Download and cache the media files.
+    This handles both remote and local files.
+    Returns an updated ls_playlist string.
+    """
     def handle_media_file(self, playlist, pkey):
-        """
-        This handles both remote and local files.
-        Returns an updated ls_playlist string.
-        """
         ls_playlist = []
         logger = logging.getLogger('fetch')
 
@@ -170,11 +186,11 @@
             fileExt = os.path.splitext(media['uri'])[1]
             try:
                 if str(media['cue_in']) == '0' and str(media['cue_out']) == '0':
-                    logger.debug('No cue in/out detected for this file')
+                    #logger.debug('No cue in/out detected for this file')
                     dst = "%s%s/%s%s" % (self.cache_dir, str(pkey), str(media['id']), str(fileExt))
                     do_cue = False
                 else:
-                    logger.debug('Cue in/out detected')
+                    #logger.debug('Cue in/out detected')
                     dst = "%s%s/%s_cue_%s-%s%s" % \
                     (self.cache_dir, str(pkey), str(media['id']), str(float(media['cue_in']) / 1000), str(float(media['cue_out']) / 1000), str(fileExt))
                     do_cue = True
@@ -199,7 +215,7 @@
                 % (str(media['export_source']), media['id'], 0, str(float(media['fade_in']) / 1000), \
                 str(float(media['fade_out']) / 1000), media['row_id'],dst)
 
-                logger.debug(pl_entry)
+                #logger.debug(pl_entry)
 
                 """
                 Tracks are only added to the playlist if they are accessible
@@ -213,7 +229,7 @@
                     entry['show_name'] = playlist['show_name']
                     ls_playlist.append(entry)
 
-                    logger.debug("everything ok, adding %s to playlist", pl_entry)
+                    #logger.debug("everything ok, adding %s to playlist", pl_entry)
                 else:
                     print 'zero-file: ' + dst + ' from ' + media['uri']
                     logger.warning("zero-size file - skipping %s. will not add it to playlist", dst)
@@ -225,11 +241,15 @@
 
         return ls_playlist
 
+    """
+    Download a file from a remote server and store it in the cache.
+    """
     def handle_remote_file(self, media, dst, do_cue):
         logger = logging.getLogger('fetch')
         if do_cue == False:
             if os.path.isfile(dst):
-                logger.debug("file already in cache: %s", dst)
+                pass
+                #logger.debug("file already in cache: %s", dst)
             else:
                 logger.debug("try to download %s", media['uri'])
                 self.api_client.get_media(media['uri'], dst)
@@ -270,11 +290,11 @@
             logger.error("%s", e)
 
+
+    """
+    Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR"
+    and deletes them.
+    """
     def cleanup(self, export_source):
-        """
-        Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR"
-        and deletes them.
-        """
         logger = logging.getLogger('fetch')
 
         offset = 3600 * int(config["cache_for"])
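For reference, the cache naming scheme used by handle_media_file() above can be restated on its own. This is a condensed sketch of the two dst branches; nothing here goes beyond what the diff already does (cue points arrive in milliseconds, hence the division by 1000):

import os

def cache_destination(cache_dir, pkey, media):
    # Condensed restatement of the two dst branches in handle_media_file().
    file_ext = os.path.splitext(media['uri'])[1]
    if str(media['cue_in']) == '0' and str(media['cue_out']) == '0':
        # No cueing needed: cache as <pkey>/<id><ext>
        return "%s%s/%s%s" % (cache_dir, str(pkey), str(media['id']), file_ext)
    # Cue points are in milliseconds: cache as <pkey>/<id>_cue_<in>-<out><ext>
    return "%s%s/%s_cue_%s-%s%s" % (cache_dir, str(pkey), str(media['id']),
                                    str(float(media['cue_in']) / 1000),
                                    str(float(media['cue_out']) / 1000),
                                    file_ext)

# cache_destination('cache/scheduler/', '2010-06-23-15-00-00',
#                   {'id': 17, 'uri': 'a.mp3', 'cue_in': 10132, 'cue_out': 123321})
# returns 'cache/scheduler/2010-06-23-15-00-00/17_cue_10.132-123.321.mp3',
# matching the example in the prepare_playlists() docstring.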
+ """ def handle_remote_file(self, media, dst, do_cue): logger = logging.getLogger('fetch') if do_cue == False: if os.path.isfile(dst): - logger.debug("file already in cache: %s", dst) + pass + #logger.debug("file already in cache: %s", dst) else: logger.debug("try to download %s", media['uri']) self.api_client.get_media(media['uri'], dst) @@ -270,11 +290,11 @@ class PypoFetch(Thread): logger.error("%s", e) + """ + Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR" + and deletes them. + """ def cleanup(self, export_source): - """ - Cleans up folders in cache_dir. Look for modification date older than "now - CACHE_FOR" - and deletes them. - """ logger = logging.getLogger('fetch') offset = 3600 * int(config["cache_for"]) @@ -297,18 +317,41 @@ class PypoFetch(Thread): print e logger.error("%s", e) + + """ + Main loop of the thread: + Wait for schedule updates from RabbitMQ, but in case there arent any, + poll the server to get the upcoming schedule. + """ def run(self): - loops = 0 - heartbeat_period = math.floor(30/POLL_INTERVAL) logger = logging.getLogger('fetch') - + + try: os.mkdir(self.cache_dir) + except Exception, e: pass + + # Bootstrap: since we are just starting up, we need to grab the + # most recent schedule. After that we can just wait for updates. + status, schedule_data = self.api_client.get_schedule() + if status == 1: + self.process_schedule(schedule_data, "scheduler") + logger.info("Bootstrap complete: got initial copy of the schedule") + + loops = 1 while True: - if loops % heartbeat_period == 0: - logger.info("heartbeat") - loops = 0 - try: self.fetch('scheduler') - except Exception, e: - logger.error('Pypo Fetch Error, exiting: %s', e) - sys.exit() - time.sleep(POLL_INTERVAL) + logger.info("Loop #"+str(loops)) + try: + # Wait for messages from RabbitMQ. Timeout if we + # dont get any after POLL_INTERVAL. + self.connection.drain_events(timeout=POLL_INTERVAL) + # Hooray for globals! + schedule_data = SCHEDULE_PUSH_MSG + status = 1 + except: + # We didnt get a message for a while, so poll the server + # to get an updated schedule. + status, schedule_data = self.api_client.get_schedule() + + if status == 1: + self.process_schedule(schedule_data, "scheduler") loops += 1 + diff --git a/pypo/pypopush.py b/pypo/pypopush.py index 62203fc59..25bfddbf1 100644 --- a/pypo/pypopush.py +++ b/pypo/pypopush.py @@ -38,6 +38,7 @@ class PypoPush(Thread): self.schedule = dict() self.playlists = dict() + self.stream_metadata = dict() """ push_ahead2 MUST be < push_ahead. The difference in these two values @@ -53,18 +54,21 @@ class PypoPush(Thread): self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle" """ - The Push Loop - the push loop periodically (minimal 1/2 of the playlist-grid) - checks if there is a playlist that should be scheduled at the current time. - If yes, the temporary liquidsoap playlist gets replaced with the corresponding one, + The Push Loop - the push loop periodically checks if there is a playlist + that should be scheduled at the current time. + If yes, the current liquidsoap playlist gets replaced with the corresponding one, then liquidsoap is asked (via telnet) to reload and immediately play it. 
""" def push(self, export_source): logger = logging.getLogger('push') + # get a new schedule from pypo-fetch if not self.queue.empty(): scheduled_data = self.queue.get() + logger.debug("Received data from pypo-fetch") self.schedule = scheduled_data['schedule'] self.playlists = scheduled_data['playlists'] + self.stream_metadata = scheduled_data['stream_metadata'] schedule = self.schedule playlists = self.playlists @@ -120,7 +124,8 @@ class PypoPush(Thread): if start <= str_tnow_s and str_tnow_s < end: currently_on_air = True else: - logger.debug('Empty schedule') + pass + #logger.debug('Empty schedule') if not currently_on_air: tn = telnetlib.Telnet(LS_HOST, LS_PORT) @@ -184,7 +189,7 @@ class PypoPush(Thread): def load_schedule_tracker(self): logger = logging.getLogger('push') - logger.debug('load_schedule_tracker') + #logger.debug('load_schedule_tracker') playedItems = dict() # create the file if it doesnt exist @@ -197,7 +202,7 @@ class PypoPush(Thread): except Exception, e: logger.error('Error creating schedule tracker file: %s', e) else: - logger.debug('schedule tracker file exists, opening: ' + self.schedule_tracker_file) + #logger.debug('schedule tracker file exists, opening: ' + self.schedule_tracker_file) try: schedule_tracker = open(self.schedule_tracker_file, "r") playedItems = pickle.load(schedule_tracker)