From 2b36e8193e8529747a1db68b95abd3aaa92034fa Mon Sep 17 00:00:00 2001 From: James Date: Mon, 25 Jul 2011 16:24:00 -0400 Subject: [PATCH] CC-2588 :Use RabbitMQ to control the show recorder - done --- airtime_mvc/application/models/RabbitMq.php | 11 +- airtime_mvc/application/models/Shows.php | 7 + python_apps/show-recorder/commandlistener.py | 144 -------------- python_apps/show-recorder/recorder.py | 198 +++++++++++++------ 4 files changed, 150 insertions(+), 210 deletions(-) delete mode 100644 python_apps/show-recorder/commandlistener.py diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php index d07965ca4..e50590622 100644 --- a/airtime_mvc/application/models/RabbitMq.php +++ b/airtime_mvc/application/models/RabbitMq.php @@ -80,7 +80,16 @@ class RabbitMq $EXCHANGE = 'airtime-show-recorder'; $channel->exchange_declare($EXCHANGE, 'direct', false, true); - $msg = new AMQPMessage($event_type, array('content_type' => 'text/plain')); + $today_timestamp = date("Y-m-d H:i:s"); + $now = new DateTime($today_timestamp); + $end_timestamp = $now->add(new DateInterval("PT2H")); + $end_timestamp = $end_timestamp->format("Y-m-d H:i:s"); + $temp['event_type'] = $event_type; + if($event_type = "update_schedule"){ + $temp['shows'] = Show::getShows($today_timestamp, $end_timestamp, $excludeInstance=NULL, $onlyRecord=TRUE); + } + $data = json_encode($temp); + $msg = new AMQPMessage($data, array('content_type' => 'text/plain')); $channel->basic_publish($msg, $EXCHANGE); $channel->close(); diff --git a/airtime_mvc/application/models/Shows.php b/airtime_mvc/application/models/Shows.php index 5026ad604..1311b837a 100644 --- a/airtime_mvc/application/models/Shows.php +++ b/airtime_mvc/application/models/Shows.php @@ -1611,10 +1611,17 @@ class ShowInstance { public function deleteShow() { + // see if it was recording show + $recording = CcShowInstancesQuery::create() + ->findPK($this->_instanceId) + ->getDbRecord(); CcShowInstancesQuery::create() ->findPK($this->_instanceId) ->delete(); RabbitMq::PushSchedule(); + if($recording){ + RabbitMq::SendMessageToShowRecorder("cancel_recording"); + } } public function setRecordedFile($file_id) diff --git a/python_apps/show-recorder/commandlistener.py b/python_apps/show-recorder/commandlistener.py deleted file mode 100644 index 2ef9f9f90..000000000 --- a/python_apps/show-recorder/commandlistener.py +++ /dev/null @@ -1,144 +0,0 @@ -#!/usr/local/bin/python -import urllib -import logging -import logging.config -import json -import time -import datetime -import os -import sys -import shutil - -from configobj import ConfigObj - -from poster.encode import multipart_encode -from poster.streaminghttp import register_openers -import urllib2 - -from subprocess import Popen -from threading import Thread - -import mutagen - -from api_clients import api_client - -# For RabbitMQ - to be implemented in the future -from kombu.connection import BrokerConnection -from kombu.messaging import Exchange, Queue, Consumer, Producer - -# configure logging -try: - logging.config.fileConfig("logging.cfg") -except Exception, e: - print 'Error configuring logging: ', e - sys.exit() - -POLL_INTERVAL=3600 -# loading config file -try: - config = ConfigObj('/etc/airtime/recorder.cfg') -except Exception, e: - logger = logging.getLogger('root') - logger.error('Error loading config file: %s', e) - sys.exit() - -def getDateTimeObj(time): - - timeinfo = time.split(" ") - date = timeinfo[0].split("-") - time = timeinfo[1].split(":") - - return datetime.datetime(int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), int(time[2])) - - -class CommandHandler(Thread): - def __init__(self, command, q): - Thread.__init__(self) - self.api_client = api_client.api_client_factory(config) - self.logger = logging.getLogger('root') - self.logger.info("Handling command: " + command) - self.command = command - self.queue = q - - def run(self): - if(self.command == 'update_schedule'): - self.queue.put(self.api_client.get_shows_to_record()) - -class CommandListener(Thread): - def __init__(self, q): - Thread.__init__(self) - self.api_client = api_client.api_client_factory(config) - self.logger = logging.getLogger('root') - self.sr = None - self.queue = q - self.shows_to_record = [] - self.logger.info("RecorderFetch: init complete") - - def init_rabbit_mq(self): - self.logger.info("Initializing RabbitMQ stuff") - try: - schedule_exchange = Exchange("airtime-show-recorder", "direct", durable=True, auto_delete=True) - schedule_queue = Queue("recorder-fetch", exchange=schedule_exchange, key="foo") - self.connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], "/") - channel = self.connection.channel() - consumer = Consumer(channel, schedule_queue) - consumer.register_callback(self.handle_message) - consumer.consume() - except Exception, e: - self.logger.error(e) - return False - - return True - - def handle_message(self, body, message): - self.logger.info("Received command from RabbitMQ: " + message.body) - ch = CommandHandler(message.body, self.queue) - ch.start() - # ACK the message to take it off the queue - message.ack() - - """def process_shows(self, shows): - self.logger.info("Processing shows...") - self.shows_to_record = [] - self.logger.info(shows) - shows = shows[u'shows'] - for show in shows: - show_starts = getDateTimeObj(show[u'starts']) - show_end = getDateTimeObj(show[u'ends']) - time_delta = show_end - show_starts - - self.shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name']]""" - - """ - Main loop of the thread: - Wait for schedule updates from RabbitMQ, but in case there arent any, - poll the server to get the upcoming schedule. - """ - def run(self): - self.logger.info("Started...") - while not self.init_rabbit_mq(): - self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") - time.sleep(5) - - # Bootstrap: since we are just starting up, we need to grab the - # most recent schedule. After that we can just wait for updates. - try: - self.queue.put(self.api_client.get_shows_to_record()) - self.logger.info("Bootstrap complete: got initial copy of the schedule") - except Exception, e: - self.logger.error(e) - - loops = 1 - while True: - self.logger.info("Loop #%s", loops) - try: - # Wait for messages from RabbitMQ. Timeout if we - # dont get any after POLL_INTERVAL. - self.connection.drain_events(timeout=POLL_INTERVAL) - status = 1 - except Exception, e: - self.logger.info(e) - # We didnt get a message for a while, so poll the server - # to get an updated schedule. - self.queue.put(self.api_client.get_shows_to_record()) - loops += 1 \ No newline at end of file diff --git a/python_apps/show-recorder/recorder.py b/python_apps/show-recorder/recorder.py index 7a2416e5f..11942cc8b 100644 --- a/python_apps/show-recorder/recorder.py +++ b/python_apps/show-recorder/recorder.py @@ -8,9 +8,6 @@ import datetime import os import sys import shutil -from Queue import Queue - -from commandlistener import CommandListener from configobj import ConfigObj @@ -25,6 +22,10 @@ import mutagen from api_clients import api_client +# For RabbitMQ +from kombu.connection import BrokerConnection +from kombu.messaging import Exchange, Queue, Consumer, Producer + # configure logging try: logging.config.fileConfig("logging.cfg") @@ -79,9 +80,12 @@ class ShowRecorder(Thread): #blocks at the following line until the child process #quits code = self.p.wait() + + self.logger.info("finishing record, return code %s", self.p.returncode) + code = self.p.returncode + self.p = None - - self.logger.info("finishing record, return code %s", code) + return code, filepath def cancel_recording(self): @@ -89,14 +93,13 @@ class ShowRecorder(Thread): #for this is because it appears that ecasound starts 1 second later than #it should, and therefore this method is sometimes incorrectly called 1 #second before the show ends. - time.sleep(3) + #time.sleep(3) #send signal interrupt (2) self.logger.info("Show manually cancelled!") if (self.p is not None): - self.p.terminate() - self.p = None - + self.p.kill() + #if self.p is defined, then the child process ecasound is recording def is_recording(self): return (self.p is not None) @@ -146,86 +149,151 @@ class ShowRecorder(Thread): self.set_metadata_and_save(filepath) self.upload_file(filepath) + os.remove(filepath) except Exceptio, e: self.logger.error(e) else: self.logger.info("problem recording show") - -class RecordScheduler(Thread): - def __init__(self, q): + os.remove(filepath) + +class CommandListener(Thread): + def __init__(self): Thread.__init__(self) - self.queue = q - self.shows_to_record = {} + self.api_client = api_client.api_client_factory(config) self.logger = logging.getLogger('root') + self.sr = None + self.current_schedule = {} + self.shows_to_record = [] + self.time_till_next_show = 3600 + self.logger.info("RecorderFetch: init complete") + + def init_rabbit_mq(self): + self.logger.info("Initializing RabbitMQ stuff") + try: + schedule_exchange = Exchange("airtime-show-recorder", "direct", durable=True, auto_delete=True) + schedule_queue = Queue("recorder-fetch", exchange=schedule_exchange, key="foo") + self.connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], "/") + channel = self.connection.channel() + consumer = Consumer(channel, schedule_queue) + consumer.register_callback(self.handle_message) + consumer.consume() + except Exception, e: + self.logger.error(e) + return False + + return True - def process_shows(self, shows): - self.logger.info("Processing show schedules...") + def handle_message(self, body, message): + # ACK the message to take it off the queue + message.ack() + self.logger.info("Received command from RabbitMQ: " + message.body) + m = json.loads(message.body) + command = m['event_type'] + self.logger.info("Handling command: " + command) + + if(command == 'update_schedule'): + temp = m['shows'] + if temp is not None: + self.parse_shows(temp) + elif(command == 'cancel_recording'): + if self.sr.is_recording(): + self.sr.cancel_recording() + + def parse_shows(self, shows): + self.logger.info("Parsing show schedules...") self.shows_to_record = {} - temp = shows[u'shows'] - for show in temp: + for show in shows: show_starts = getDateTimeObj(show[u'starts']) show_end = getDateTimeObj(show[u'ends']) time_delta = show_end - show_starts self.shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name']] + + delta = self.get_time_till_next_show() + # awake at least 5 seconds prior to the show start + self.time_till_next_show = delta - 5 + self.logger.info(self.shows_to_record) + + def get_time_till_next_show(self): + if len(self.shows_to_record) != 0: + tnow = datetime.datetime.now() + sorted_show_keys = sorted(self.shows_to_record.keys()) + + start_time = sorted_show_keys[0] + next_show = getDateTimeObj(start_time) + + delta = next_show - tnow + out = delta.seconds + + self.logger.debug("Next show %s", next_show) + self.logger.debug("Now %s", tnow) + else: + out = 3600 + return out - def check_record(self): + def start_record(self): if len(self.shows_to_record) != 0: try: - tnow = datetime.datetime.now() - sorted_show_keys = sorted(self.shows_to_record.keys()) - - start_time = sorted_show_keys[0] - next_show = getDateTimeObj(start_time) - - self.logger.debug("Next show %s", next_show) - self.logger.debug("Now %s", tnow) - - delta = next_show - tnow - min_delta = datetime.timedelta(seconds=5) - - if delta <= min_delta: - self.logger.debug("sleeping %s seconds until show", delta.seconds) - time.sleep(delta.seconds) - - show_length = self.shows_to_record[start_time][0] - show_instance = self.shows_to_record[start_time][1] - show_name = self.shows_to_record[start_time][2] - - self.sr = ShowRecorder(show_instance, show_name, show_length.seconds, start_time, filetype="mp3") - self.sr.start() - - #remove show from shows to record. - del self.shows_to_record[start_time] + delta = self.get_time_till_next_show() + + self.logger.debug("sleeping %s seconds until show", delta) + time.sleep(delta) + + sorted_show_keys = sorted(self.shows_to_record.keys()) + start_time = sorted_show_keys[0] + show_length = self.shows_to_record[start_time][0] + show_instance = self.shows_to_record[start_time][1] + show_name = self.shows_to_record[start_time][2] + + self.sr = ShowRecorder(show_instance, show_name, show_length.seconds, start_time, filetype="mp3") + self.sr.start() + + #remove show from shows to record. + del self.shows_to_record[start_time] + self.time_till_next_show = 3600 except Exception,e : self.logger.error(e) else: - self.logger.info("No recording schedule...") - - + self.logger.debug("No recording scheduled...") + + """ + Main loop of the thread: + Wait for schedule updates from RabbitMQ, but in case there arent any, + poll the server to get the upcoming schedule. + """ def run(self): - self.logger.info("RecordScheduler started...") - while True: - if not self.queue.empty(): - try: - self.logger.debug("Received data from command handler") - shows = self.queue.get() - self.logger.debug('shows %s' % shows) - self.process_shows(shows) - except Exception, e: - self.logger.error(e) - self.check_record() - time.sleep(1) + self.logger.info("Started...") + while not self.init_rabbit_mq(): + self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") + time.sleep(5) + # Bootstrap: since we are just starting up, we need to grab the + # most recent schedule. After that we can just wait for updates. + try: + temp = self.api_client.get_shows_to_record() + if temp is not None: + shows = temp['shows'] + self.parse_shows(shows) + self.logger.info("Bootstrap complete: got initial copy of the schedule") + except Exception, e: + self.logger.error(e) + + loops = 1 + while True: + self.logger.info("Loop #%s", loops) + try: + # block until 5 seconds before the next show start + self.connection.drain_events(timeout=self.time_till_next_show) + except Exception, e: + self.logger.info(e) + # start recording + self.start_record() + + loops += 1 if __name__ == '__main__': - q = Queue() - - cl = CommandListener(q) + cl = CommandListener() cl.start() - rs = RecordScheduler(q) - rs.start() -