CC-2084: Integrate RabbitMQ for immediate schedule updates and commands

Implemented RabbitMQ on the pypo side. Schedule updates are now
almost instantaneous, and we fall back to polling the server only
once per hour if no update has arrived in that time. Canceling a
show now takes effect right away.
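In practice the flow is: whenever the schedule changes, the server serializes the full schedule to JSON and publishes it to an 'airtime-schedule' direct exchange (RabbitMq::PushSchedule() below), while pypo-fetch blocks on that exchange with a POLL_INTERVAL timeout instead of sleeping between polls, falling back to the old HTTP poll only when the wait times out.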
paul.baranowski 2011-03-23 01:09:27 -04:00
parent 7f7800aa2b
commit 78b33b9e23
5 changed files with 218 additions and 109 deletions

View File

@@ -10,23 +10,23 @@ class RabbitMq
      * in the future.
      */
     public static function PushSchedule() {
-        // global $CC_CONFIG;
-        // $conn = new AMQPConnection($CC_CONFIG["rabbitmq"]["host"],
-        //                            $CC_CONFIG["rabbitmq"]["port"],
-        //                            $CC_CONFIG["rabbitmq"]["user"],
-        //                            $CC_CONFIG["rabbitmq"]["password"]);
-        // $channel = $conn->channel();
-        // $channel->access_request($CC_CONFIG["rabbitmq"]["vhost"], false, false, true, true);
-        //
-        // $EXCHANGE = 'airtime-schedule';
-        // $channel->exchange_declare($EXCHANGE, 'direct', false, false, false);
-        //
-        // $data = json_encode(Schedule::ExportRangeAsJson());
-        // $msg = new AMQPMessage($data, array('content_type' => 'text/plain'));
-        //
-        // $channel->basic_publish($msg, $EXCHANGE);
-        // $channel->close();
-        // $conn->close();
+        global $CC_CONFIG;
+        $conn = new AMQPConnection($CC_CONFIG["rabbitmq"]["host"],
+                                   $CC_CONFIG["rabbitmq"]["port"],
+                                   $CC_CONFIG["rabbitmq"]["user"],
+                                   $CC_CONFIG["rabbitmq"]["password"]);
+        $channel = $conn->channel();
+        $channel->access_request($CC_CONFIG["rabbitmq"]["vhost"], false, false, true, true);
+
+        $EXCHANGE = 'airtime-schedule';
+        $channel->exchange_declare($EXCHANGE, 'direct', false, true);
+
+        $data = json_encode(Schedule::ExportRangeAsJson());
+        $msg = new AMQPMessage($data, array('content_type' => 'text/plain'));
+
+        $channel->basic_publish($msg, $EXCHANGE);
+        $channel->close();
+        $conn->close();
     }
 }
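Note on the now-uncommented publisher: exchange_declare() is called with durable = true (php-amqplib's fourth argument), which matches the durable=True, auto_delete=True kombu Exchange that pypo-fetch declares below. The flags have to agree on both sides, since RabbitMQ rejects redeclaring an existing exchange with different properties (a PRECONDITION_FAILED channel error).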

View File

@@ -0,0 +1,54 @@
#!/usr/bin/php
<?php
/**
 * Repeatedly receives messages from the queue until it gets a message with
 * 'quit' as the body.
 *
 * @author Sean Murphy <sean@iamseanmurphy.com>
 */
require_once('../amqp.inc');
$HOST = 'localhost';
$PORT = 5672;
$USER = 'guest';
$PASS = 'guest';
$VHOST = '/';
$EXCHANGE = 'airtime-schedule';
$QUEUE = 'airtime-schedule-msgs';
$CONSUMER_TAG = 'airtime-consumer';
$conn = new AMQPConnection($HOST, $PORT, $USER, $PASS);
$ch = $conn->channel();
$ch->access_request($VHOST, false, false, true, true);
$ch->queue_declare($QUEUE);
$ch->exchange_declare($EXCHANGE, 'direct', false, false, false);
$ch->queue_bind($QUEUE, $EXCHANGE);
function process_message($msg) {
    global $ch, $CONSUMER_TAG;
    echo "\n--------\n";
    echo $msg->body;
    echo "\n--------\n";
    $ch->basic_ack($msg->delivery_info['delivery_tag']);

    // Cancel callback
    if ($msg->body === 'quit') {
        $ch->basic_cancel($CONSUMER_TAG);
    }
}
$ch->basic_consume($QUEUE, $CONSUMER_TAG, false, false, false, false, 'process_message');
// Loop as long as the channel has callbacks registered
echo "Waiting for messages...\n";
while (count($ch->callbacks)) {
    $ch->wait();
}
$ch->close();
$conn->close();
?>
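To exercise this consumer without going through the Airtime web app, a kombu publisher along these lines should work. This is an illustrative sketch, not part of the commit; it assumes a stock RabbitMQ install with the guest/guest account on localhost and mirrors what PushSchedule() does in PHP.

import json

from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Producer

connection = BrokerConnection("localhost", "guest", "guest", "/")
channel = connection.channel()

# These flags mirror the pypo-fetch declaration; RabbitMQ refuses to
# redeclare an existing exchange with different properties.
schedule_exchange = Exchange("airtime-schedule", "direct", durable=True, auto_delete=True)
producer = Producer(channel, exchange=schedule_exchange)

# PushSchedule() publishes with an empty routing key; the demo consumer
# above binds its queue the same way. Send 'quit' to make it exit.
producer.publish(json.dumps({"playlists": {}, "stream_metadata": {}}))
producer.publish("quit")

channel.close()
connection.close()

Note that the demo consumer above declares the exchange with different flags (non-durable, no auto-delete) than the publisher side, so whichever process declares the exchange first wins; pick one set of flags when testing.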

View File

@@ -26,6 +26,13 @@ base_url = 'http://localhost/'
 ls_host = '127.0.0.1'
 ls_port = '1234'
 
+############################################
+# RabbitMQ settings                        #
+############################################
+rabbitmq_host = 'localhost'
+rabbitmq_user = 'guest'
+rabbitmq_password = 'guest'
+
 ############################################
 # pypo preferences                         #
 ############################################
@@ -42,7 +49,7 @@ cache_for = 24 #how long to hold the cache, in hours
 # the time you expect to "lock-in" your schedule. So if your schedule is set
 # 24 hours in advance, this can be set to poll every 12 hours.
 #
-poll_interval = 5       # in seconds.
+poll_interval = 3600    # in seconds.
 
 # Push interval in seconds.
@@ -52,7 +59,7 @@ poll_interval = 5 # in seconds.
 #
 # It's hard to imagine a situation where this should be more than 1 second.
 #
-push_interval = 2       # in seconds
+push_interval = 1       # in seconds
 
 # 'pre' or 'otf'. 'pre' cues while playlist preparation
 # while 'otf' (on the fly) cues while loading into ls

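pypo reads these new keys with ConfigObj, the same mechanism it already uses for ls_host and ls_port. A minimal sketch of how the new settings get consumed (illustrative only; the real wiring is in the pypofetch.py diff below, where the vhost is hardcoded to "/"):

from configobj import ConfigObj
from kombu.connection import BrokerConnection

config = ConfigObj('config.cfg')

# New RabbitMQ settings from the block above.
connection = BrokerConnection(config["rabbitmq_host"],
                              config["rabbitmq_user"],
                              config["rabbitmq_password"],
                              "/")

# poll_interval is no longer a sleep between fetches: it is the timeout,
# in seconds, on the RabbitMQ wait before pypo falls back to polling.
POLL_INTERVAL = int(config['poll_interval'])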
View File

@@ -12,6 +12,10 @@ import telnetlib
 import math
 from threading import Thread
 
+# For RabbitMQ
+from kombu.connection import BrokerConnection
+from kombu.messaging import Exchange, Queue, Consumer, Producer
+
 from api_clients import api_client
 from util import CueFile
@@ -25,91 +29,101 @@ try:
     config = ConfigObj('config.cfg')
     LS_HOST = config['ls_host']
     LS_PORT = config['ls_port']
-    POLL_INTERVAL = 5
+    POLL_INTERVAL = int(config['poll_interval'])
 except Exception, e:
     print 'Error loading config file: ', e
     sys.exit()
 
+# Yuk - using a global, i know!
+SCHEDULE_PUSH_MSG = []
+
+"""
+Handle a message from RabbitMQ, put it into our yucky global var.
+Hopefully there is a better way to do this.
+"""
+def handle_message(body, message):
+    logger = logging.getLogger('fetch')
+    global SCHEDULE_PUSH_MSG
+    logger.info("Received schedule from RabbitMQ: " + message.body)
+    SCHEDULE_PUSH_MSG = json.loads(message.body)
+    # ACK the message to take it off the queue
+    message.ack()
+
 class PypoFetch(Thread):
     def __init__(self, q):
         Thread.__init__(self)
+        logger = logging.getLogger('fetch')
         self.api_client = api_client.api_client_factory(config)
         self.cue_file = CueFile()
         self.set_export_source('scheduler')
         self.queue = q
+
+        logger.info("Initializing RabbitMQ stuff")
+        schedule_exchange = Exchange("airtime-schedule", "direct", durable=True, auto_delete=True)
+        schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
+        self.connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], "/")
+        channel = self.connection.channel()
+        consumer = Consumer(channel, schedule_queue)
+        consumer.register_callback(handle_message)
+        consumer.consume()
+        logger.info("PypoFetch: init complete")
 
     def set_export_source(self, export_source):
         self.export_source = export_source
         self.cache_dir = config["cache_dir"] + self.export_source + '/'
 
     """
-    Fetching part of pypo
+    Process the schedule
      - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
      - Saves a serialized file of the schedule
      - playlists are prepared (brought to liquidsoap format) and, if not mounted via nfs, files are copied
        to the cache dir (folder structure: cache/YYYY-MM-DD-hh-mm-ss)
      - runs the cleanup routine, to get rid of unused cached files
     """
-    def fetch(self, export_source):
-        #wrapper script for fetching the whole schedule (in json)
+    def process_schedule(self, schedule_data, export_source):
         logger = logging.getLogger('fetch')
+        self.schedule = schedule_data["playlists"]
 
-        try: os.mkdir(self.cache_dir)
-        except Exception, e: pass
-
-        # get schedule
-        try:
-            while self.get_schedule() != 1:
-                logger.warning("failed to read from export url")
-                time.sleep(1)
-        except Exception, e: logger.error("%s", e)
+        # Push stream metadata to liquidsoap
+        # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
+        stream_metadata = schedule_data['stream_metadata']
+        try:
+            tn = telnetlib.Telnet(LS_HOST, LS_PORT)
+            #encode in latin-1 due to telnet protocol not supporting utf-8
+            tn.write(('vars.stream_metadata_type %s\n' % stream_metadata['format']).encode('latin-1'))
+            tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1'))
+            tn.write('exit\n')
+            tn.read_all()
+        except Exception, e:
+            logger.error("Exception %s", e)
+            status = 0
 
-        # prepare the playlists
+        # Download all the media and put playlists in liquidsoap format
         try:
             playlists = self.prepare_playlists()
         except Exception, e: logger.error("%s", e)
 
+        # Send the data to pypo-push
         scheduled_data = dict()
         scheduled_data['playlists'] = playlists
         scheduled_data['schedule'] = self.schedule
+        scheduled_data['stream_metadata'] = schedule_data["stream_metadata"]
         self.queue.put(scheduled_data)
 
         # cleanup
         try: self.cleanup(self.export_source)
         except Exception, e: logger.error("%s", e)
 
-    def get_schedule(self):
-        logger = logging.getLogger('fetch')
-        status, response = self.api_client.get_schedule()
-        if status == 1:
-            schedule = response['playlists']
-            stream_metadata = response['stream_metadata']
-            try:
-                self.schedule = schedule
-                tn = telnetlib.Telnet(LS_HOST, LS_PORT)
-                #encode in latin-1 due to telnet protocol not supporting utf-8
-                tn.write(('vars.stream_metadata_type %s\n' % stream_metadata['format']).encode('latin-1'))
-                tn.write(('vars.station_name %s\n' % stream_metadata['station_name']).encode('latin-1'))
-                tn.write('exit\n')
-                tn.read_all()
-            except Exception, e:
-                logger.error("Exception %s", e)
-                status = 0
-        return status
-
     """
-    Alternative version of playout preparation. Every playlist entry is
-    pre-cued if necessary (cue_in/cue_out != 0) and stored in the
-    playlist folder.
-    file is e.g. 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3
+    In this function every audio file is cut as necessary (cue_in/cue_out != 0)
+    and stored in a playlist folder.
+    file is e.g. 2010-06-23-15-00-00/17_cue_10.132-123.321.mp3
     """
     def prepare_playlists(self):
         logger = logging.getLogger('fetch')
@@ -126,7 +140,7 @@ class PypoFetch(Thread):
         try:
             for pkey in scheduleKeys:
-                logger.info("found playlist at %s", pkey)
+                logger.info("Playlist starting at %s", pkey)
                 playlist = schedule[pkey]
 
                 # create playlist directory
@@ -135,15 +149,15 @@ class PypoFetch(Thread):
                 except Exception, e:
                     pass
 
-                logger.debug('*****************************************')
-                logger.debug('pkey: ' + str(pkey))
-                logger.debug('cached at : ' + self.cache_dir + str(pkey))
-                logger.debug('subtype: ' + str(playlist['subtype']))
-                logger.debug('played: ' + str(playlist['played']))
-                logger.debug('schedule id: ' + str(playlist['schedule_id']))
-                logger.debug('duration: ' + str(playlist['duration']))
-                logger.debug('source id: ' + str(playlist['x_ident']))
-                logger.debug('*****************************************')
+                #logger.debug('*****************************************')
+                #logger.debug('pkey: ' + str(pkey))
+                #logger.debug('cached at : ' + self.cache_dir + str(pkey))
+                #logger.debug('subtype: ' + str(playlist['subtype']))
+                #logger.debug('played: ' + str(playlist['played']))
+                #logger.debug('schedule id: ' + str(playlist['schedule_id']))
+                #logger.debug('duration: ' + str(playlist['duration']))
+                #logger.debug('source id: ' + str(playlist['x_ident']))
+                #logger.debug('*****************************************')
 
                 if int(playlist['played']) == 1:
                     logger.info("playlist %s already played / sent to liquidsoap, so will ignore it", pkey)
@@ -156,11 +170,13 @@ class PypoFetch(Thread):
                 logger.info("%s", e)
 
         return playlists
 
+    """
+    Download and cache the media files.
+    This handles both remote and local files.
+    Returns an updated ls_playlist string.
+    """
     def handle_media_file(self, playlist, pkey):
-        """
-        This handles both remote and local files.
-        Returns an updated ls_playlist string.
-        """
         ls_playlist = []
         logger = logging.getLogger('fetch')
@@ -170,11 +186,11 @@ class PypoFetch(Thread):
             fileExt = os.path.splitext(media['uri'])[1]
             try:
                 if str(media['cue_in']) == '0' and str(media['cue_out']) == '0':
-                    logger.debug('No cue in/out detected for this file')
+                    #logger.debug('No cue in/out detected for this file')
                     dst = "%s%s/%s%s" % (self.cache_dir, str(pkey), str(media['id']), str(fileExt))
                     do_cue = False
                 else:
-                    logger.debug('Cue in/out detected')
+                    #logger.debug('Cue in/out detected')
                     dst = "%s%s/%s_cue_%s-%s%s" % \
                     (self.cache_dir, str(pkey), str(media['id']), str(float(media['cue_in']) / 1000), str(float(media['cue_out']) / 1000), str(fileExt))
                     do_cue = True
@@ -199,7 +215,7 @@ class PypoFetch(Thread):
                 % (str(media['export_source']), media['id'], 0, str(float(media['fade_in']) / 1000), \
                 str(float(media['fade_out']) / 1000), media['row_id'], dst)
 
-                logger.debug(pl_entry)
+                #logger.debug(pl_entry)
 
                 """
                 Tracks are only added to the playlist if they are accessible
@@ -213,7 +229,7 @@ class PypoFetch(Thread):
                     entry['show_name'] = playlist['show_name']
                     ls_playlist.append(entry)
 
-                    logger.debug("everything ok, adding %s to playlist", pl_entry)
+                    #logger.debug("everything ok, adding %s to playlist", pl_entry)
                 else:
                     print 'zero-file: ' + dst + ' from ' + media['uri']
                     logger.warning("zero-size file - skipping %s. will not add it to playlist", dst)
@@ -225,11 +241,15 @@ class PypoFetch(Thread):
 
         return ls_playlist
 
+    """
+    Download a file from a remote server and store it in the cache.
+    """
     def handle_remote_file(self, media, dst, do_cue):
         logger = logging.getLogger('fetch')
         if do_cue == False:
             if os.path.isfile(dst):
-                logger.debug("file already in cache: %s", dst)
+                pass
+                #logger.debug("file already in cache: %s", dst)
             else:
                 logger.debug("try to download %s", media['uri'])
                 self.api_client.get_media(media['uri'], dst)
@@ -270,11 +290,11 @@ class PypoFetch(Thread):
             logger.error("%s", e)
 
+    """
+    Cleans up folders in cache_dir. Looks for modification dates older than "now - CACHE_FOR"
+    and deletes them.
+    """
     def cleanup(self, export_source):
-        """
-        Cleans up folders in cache_dir. Looks for modification dates older than "now - CACHE_FOR"
-        and deletes them.
-        """
         logger = logging.getLogger('fetch')
 
         offset = 3600 * int(config["cache_for"])
@@ -297,18 +317,41 @@ class PypoFetch(Thread):
                 print e
                 logger.error("%s", e)
 
+    """
+    Main loop of the thread:
+    Wait for schedule updates from RabbitMQ, but in case there aren't any,
+    poll the server to get the upcoming schedule.
+    """
     def run(self):
-        loops = 0
-        heartbeat_period = math.floor(30/POLL_INTERVAL)
         logger = logging.getLogger('fetch')
 
+        try: os.mkdir(self.cache_dir)
+        except Exception, e: pass
+
+        # Bootstrap: since we are just starting up, we need to grab the
+        # most recent schedule. After that we can just wait for updates.
+        status, schedule_data = self.api_client.get_schedule()
+        if status == 1:
+            self.process_schedule(schedule_data, "scheduler")
+        logger.info("Bootstrap complete: got initial copy of the schedule")
+
+        loops = 1
         while True:
-            if loops % heartbeat_period == 0:
-                logger.info("heartbeat")
-                loops = 0
-            try: self.fetch('scheduler')
-            except Exception, e:
-                logger.error('Pypo Fetch Error, exiting: %s', e)
-                sys.exit()
-            time.sleep(POLL_INTERVAL)
+            logger.info("Loop #" + str(loops))
+            try:
+                # Wait for messages from RabbitMQ. Timeout if we
+                # don't get any after POLL_INTERVAL.
+                self.connection.drain_events(timeout=POLL_INTERVAL)
+                # Hooray for globals!
+                schedule_data = SCHEDULE_PUSH_MSG
+                status = 1
+            except:
+                # We didn't get a message for a while, so poll the server
+                # to get an updated schedule.
+                status, schedule_data = self.api_client.get_schedule()
+
+            if status == 1:
+                self.process_schedule(schedule_data, "scheduler")
+
             loops += 1

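The core of the change is the new run() loop: block on RabbitMQ for up to POLL_INTERVAL seconds, and fall back to an HTTP poll when the wait times out. Distilled into a standalone sketch (illustrative, not part of the commit; fetch_schedule_over_http is a placeholder for api_client.get_schedule(), and kombu raises socket.timeout when drain_events() times out):

import socket

from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer

POLL_INTERVAL = 3600  # seconds; matches the new config.cfg default

def fetch_schedule_over_http():
    # Placeholder for api_client.get_schedule(): returns (status, data).
    return 1, {"playlists": {}, "stream_metadata": {}}

latest_message = {}

def handle_message(body, message):
    # Stash the pushed schedule and ACK it off the queue.
    global latest_message
    latest_message = message.body
    message.ack()

connection = BrokerConnection("localhost", "guest", "guest", "/")
channel = connection.channel()
exchange = Exchange("airtime-schedule", "direct", durable=True, auto_delete=True)
queue = Queue("pypo-fetch", exchange=exchange)
consumer = Consumer(channel, queue)
consumer.register_callback(handle_message)
consumer.consume()

while True:
    try:
        # Blocks until a schedule update is pushed, or times out.
        connection.drain_events(timeout=POLL_INTERVAL)
        status, schedule_data = 1, latest_message
    except socket.timeout:
        # No push within the interval: poll the server instead.
        status, schedule_data = fetch_schedule_over_http()
    if status == 1:
        pass  # process_schedule(schedule_data) would run here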
View File

@@ -38,6 +38,7 @@ class PypoPush(Thread):
         self.schedule = dict()
         self.playlists = dict()
+        self.stream_metadata = dict()
 
         """
         push_ahead2 MUST be < push_ahead. The difference in these two values
@@ -53,18 +54,21 @@ class PypoPush(Thread):
         self.schedule_tracker_file = self.cache_dir + "schedule_tracker.pickle"
 
     """
-    The Push Loop - the push loop periodically (minimal 1/2 of the playlist-grid)
-    checks if there is a playlist that should be scheduled at the current time.
-    If yes, the temporary liquidsoap playlist gets replaced with the corresponding one,
+    The Push Loop - the push loop periodically checks if there is a playlist
+    that should be scheduled at the current time.
+    If yes, the current liquidsoap playlist gets replaced with the corresponding one,
     then liquidsoap is asked (via telnet) to reload and immediately play it.
     """
     def push(self, export_source):
         logger = logging.getLogger('push')
 
+        # get a new schedule from pypo-fetch
         if not self.queue.empty():
             scheduled_data = self.queue.get()
+            logger.debug("Received data from pypo-fetch")
             self.schedule = scheduled_data['schedule']
             self.playlists = scheduled_data['playlists']
+            self.stream_metadata = scheduled_data['stream_metadata']
 
         schedule = self.schedule
         playlists = self.playlists
@@ -120,7 +124,8 @@ class PypoPush(Thread):
                 if start <= str_tnow_s and str_tnow_s < end:
                     currently_on_air = True
                 else:
-                    logger.debug('Empty schedule')
+                    pass
+                    #logger.debug('Empty schedule')
 
         if not currently_on_air:
             tn = telnetlib.Telnet(LS_HOST, LS_PORT)
@@ -184,7 +189,7 @@ class PypoPush(Thread):
     def load_schedule_tracker(self):
         logger = logging.getLogger('push')
-        logger.debug('load_schedule_tracker')
+        #logger.debug('load_schedule_tracker')
         playedItems = dict()
 
         # create the file if it doesn't exist
@@ -197,7 +202,7 @@ class PypoPush(Thread):
             except Exception, e:
                 logger.error('Error creating schedule tracker file: %s', e)
         else:
-            logger.debug('schedule tracker file exists, opening: ' + self.schedule_tracker_file)
+            #logger.debug('schedule tracker file exists, opening: ' + self.schedule_tracker_file)
             try:
                 schedule_tracker = open(self.schedule_tracker_file, "r")
                 playedItems = pickle.load(schedule_tracker)
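
Worth noting: the fetch and push threads still hand data off through a plain Python Queue; the only change on this side is that stream_metadata now rides along with the schedule and playlists. A minimal sketch of that handoff (illustrative, with both thread bodies stubbed out):

import time
from Queue import Queue   # 'queue' on Python 3
from threading import Thread

q = Queue()

def fetch_side():
    # PypoFetch.process_schedule() ends by queueing everything for the push side.
    q.put({'schedule': {}, 'playlists': {}, 'stream_metadata': {}})

def push_side():
    # PypoPush.push() checks the queue on every loop iteration.
    while q.empty():
        time.sleep(1)
    scheduled_data = q.get()
    print(scheduled_data['stream_metadata'])

Thread(target=fetch_side).start()
Thread(target=push_side).start()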