From f030cf4f672a34f5891346fec0fbd866644cd41f Mon Sep 17 00:00:00 2001 From: James Date: Mon, 27 Feb 2012 16:18:10 -0500 Subject: [PATCH] CC-3346: Recorder: Merge recorder with pypo - separated rabitMQ listener part out from pypoFetch and created pypomessagehandler.py --- python_apps/pypo/logging.cfg | 16 +++- python_apps/pypo/pypo-cli.py | 21 +++-- python_apps/pypo/pypofetch.py | 104 +++------------------ python_apps/pypo/pypomessagehandler.py | 120 +++++++++++++++++++++++++ python_apps/pypo/recorder.py | 56 ++++++++++-- 5 files changed, 207 insertions(+), 110 deletions(-) create mode 100644 python_apps/pypo/pypomessagehandler.py diff --git a/python_apps/pypo/logging.cfg b/python_apps/pypo/logging.cfg index acff7007d..6dae7d9c5 100644 --- a/python_apps/pypo/logging.cfg +++ b/python_apps/pypo/logging.cfg @@ -1,8 +1,8 @@ [loggers] -keys=root,fetch,push,recorder +keys=root,fetch,push,recorder,message_h [handlers] -keys=pypo,recorder +keys=pypo,recorder,message_h [formatters] keys=simpleFormatter @@ -29,6 +29,12 @@ handlers=recorder qualname=recorder propagate=0 +[logger_message_h] +level=DEBUG +handlers=message_h +qualname=message_h +propagate=0 + [handler_pypo] class=logging.handlers.RotatingFileHandler level=DEBUG @@ -41,6 +47,12 @@ level=DEBUG formatter=simpleFormatter args=("/var/log/airtime/pypo/show-recorder.log", 'a', 1000000, 5,) +[handler_message_h] +class=logging.handlers.RotatingFileHandler +level=DEBUG +formatter=simpleFormatter +args=("/var/log/airtime/pypo/message-handler.log", 'a', 1000000, 5,) + [formatter_simpleFormatter] format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s datefmt= diff --git a/python_apps/pypo/pypo-cli.py b/python_apps/pypo/pypo-cli.py index d82d0b782..1fcf6e59c 100644 --- a/python_apps/pypo/pypo-cli.py +++ b/python_apps/pypo/pypo-cli.py @@ -16,6 +16,7 @@ from Queue import Queue from pypopush import PypoPush from pypofetch import PypoFetch from recorder import Recorder +from pypomessagehandler import PypoMessageHandler from configobj import ConfigObj @@ -127,11 +128,19 @@ if __name__ == '__main__': api_client = api_client.api_client_factory(config) api_client.register_component("pypo") - q = Queue() - + pypoFetch_q = Queue() recorder_q = Queue() - - pp = PypoPush(q) + pypoPush_q = Queue() + + pmh = PypoMessageHandler(pypoFetch_q, recorder_q) + pmh.daemon = True + pmh.start() + + pf = PypoFetch(pypoFetch_q, pypoPush_q) + pf.daemon = True + pf.start() + + pp = PypoPush(pypoPush_q) pp.daemon = True pp.start() @@ -139,10 +148,6 @@ if __name__ == '__main__': recorder.daemon = True recorder.start() - pf = PypoFetch(q, recorder_q) - pf.daemon = True - pf.start() - #pp.join() pf.join() logger.info("pypo fetch exit") diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index f211f0e20..4a4bc59ca 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -16,13 +16,6 @@ from datetime import datetime from datetime import timedelta from Queue import Empty import filecmp -import thread - -# For RabbitMQ -from kombu.connection import BrokerConnection -from kombu.messaging import Exchange, Queue, Consumer, Producer -from kombu.exceptions import MessageStateError -from kombu.simple import SimpleQueue from api_clients import api_client @@ -44,30 +37,15 @@ except Exception, e: sys.exit() class PypoFetch(Thread): - def __init__(self, q, recorder_q): + def __init__(self, pypoFetch_q, pypoPush_q): Thread.__init__(self) self.api_client = api_client.api_client_factory(config) self.set_export_source('scheduler') - self.queue = q - self.recorder_queue = recorder_q + self.fetch_queue = pypoFetch_q + self.push_queue = pypoPush_q self.schedule_data = [] logger = logging.getLogger('fetch') logger.info("PypoFetch: init complete") - - def init_rabbit_mq(self): - logger = logging.getLogger('fetch') - logger.info("Initializing RabbitMQ stuff") - try: - schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) - schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") - connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"]) - channel = connection.channel() - self.simple_queue = SimpleQueue(channel, schedule_queue) - except Exception, e: - logger.error(e) - return False - - return True """ Handle a message from RabbitMQ, put it into our yucky global var. @@ -84,7 +62,7 @@ class PypoFetch(Thread): if command == 'update_schedule': self.schedule_data = m['schedule'] - thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", False)) + self.process_schedule(self.schedule_data, "scheduler", False) elif command == 'update_stream_setting': logger.info("Updating stream setting...") self.regenerateLiquidsoapConf(m['setting']) @@ -97,13 +75,11 @@ class PypoFetch(Thread): elif command == 'cancel_current_show': logger.info("Cancel current show command received...") self.stop_current_show() - elif command == 'update_recorder_schedule': - temp = m - if temp is not None: - self.process_recorder_schedule(temp) - elif command == 'cancel_recording': - self.recorder_queue.put('cancel_recording') except Exception, e: + import traceback + top = traceback.format_exc() + logger.error('Exception: %s', e) + logger.error("traceback: %s", top) logger.error("Exception in handling RabbitMQ message: %s", e) def stop_current_show(self): @@ -316,36 +292,11 @@ class PypoFetch(Thread): scheduled_data = dict() scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists scheduled_data['schedule'] = playlists - self.queue.put(scheduled_data) + self.push_queue.put(scheduled_data) # cleanup try: self.cleanup(self.export_source) except Exception, e: logger.error("%s", e) - - def getDateTimeObj(self,time): - timeinfo = time.split(" ") - date = timeinfo[0].split("-") - time = timeinfo[1].split(":") - - date = map(int, date) - time = map(int, time) - - return datetime(date[0], date[1], date[2], time[0], time[1], time[2], 0, None) - - def process_recorder_schedule(self, m): - logger = logging.getLogger('fetch') - logger.info("Parsing recording show schedules...") - shows_to_record = {} - shows = m['shows'] - for show in shows: - show_starts = self.getDateTimeObj(show[u'starts']) - show_end = self.getDateTimeObj(show[u'ends']) - time_delta = show_end - show_starts - - shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name'], m['server_timezone']] - self.recorder_queue.put(shows_to_record) - logger.info(shows_to_record) - """ In this function every audio file is cut as necessary (cue_in/cue_out != 0) @@ -519,24 +470,7 @@ class PypoFetch(Thread): status, self.schedule_data = self.api_client.get_schedule() if status == 1: logger.info("Bootstrap schedule received: %s", self.schedule_data) - thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", True)) - - # Bootstrap: since we are just starting up, we need to grab the - # most recent schedule. After that we can just wait for updates. - try: - temp = self.api_client.get_shows_to_record() - if temp is not None: - self.process_recorder_schedule(temp) - logger.info("Bootstrap recorder schedule received: %s", temp) - except Exception, e: - logger.error(e) - - logger.info("Bootstrap complete: got initial copy of the schedule") - - - while not self.init_rabbit_mq(): - logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") - time.sleep(5) + self.process_schedule(self.schedule_data, "scheduler", True) loops = 1 while True: @@ -556,10 +490,8 @@ class PypoFetch(Thread): Currently we are checking every 3600 seconds (1 hour) """ - message = self.simple_queue.get(block=True, timeout=3600) - self.handle_message(message.payload) - # ACK the message to take it off the queue - message.ack() + message = self.fetch_queue.get(block=True, timeout=3600) + self.handle_message(message) except Empty, e: """ Queue timeout. Fetching data manually @@ -584,17 +516,7 @@ class PypoFetch(Thread): """ status, self.schedule_data = self.api_client.get_schedule() if status == 1: - thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", False)) - """ - Fetch recorder schedule - """ - try: - temp = self.api_client.get_shows_to_record() - if temp is not None: - self.process_recorder_schedule(temp) - logger.info("updated recorder schedule received: %s", temp) - except Exception, e: - logger.error(e) + self.process_schedule(self.schedule_data, "scheduler", False) loops += 1 diff --git a/python_apps/pypo/pypomessagehandler.py b/python_apps/pypo/pypomessagehandler.py new file mode 100644 index 000000000..75b0407ea --- /dev/null +++ b/python_apps/pypo/pypomessagehandler.py @@ -0,0 +1,120 @@ +import logging +import logging.config +import sys +from configobj import ConfigObj +from threading import Thread +import time +# For RabbitMQ +from kombu.connection import BrokerConnection +from kombu.messaging import Exchange, Queue, Consumer, Producer +from kombu.exceptions import MessageStateError +from kombu.simple import SimpleQueue +import json + +# configure logging +logging.config.fileConfig("logging.cfg") + +# loading config file +try: + config = ConfigObj('/etc/airtime/pypo.cfg') + LS_HOST = config['ls_host'] + LS_PORT = config['ls_port'] + POLL_INTERVAL = int(config['poll_interval']) + +except Exception, e: + logger = logging.getLogger('message_h') + logger.error('Error loading config file: %s', e) + sys.exit() + +class PypoMessageHandler(Thread): + def __init__(self, pq, rq): + Thread.__init__(self) + self.logger = logging.getLogger('message_h') + self.pypo_queue = pq + self.recorder_queue = rq + + def init_rabbit_mq(self): + self.logger.info("Initializing RabbitMQ stuff") + try: + schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) + schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") + connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"]) + channel = connection.channel() + self.simple_queue = SimpleQueue(channel, schedule_queue) + except Exception, e: + self.logger.error(e) + return False + + return True + + """ + Handle a message from RabbitMQ, put it into our yucky global var. + Hopefully there is a better way to do this. + """ + def handle_message(self, message): + try: + self.logger.info("Received event from RabbitMQ: %s" % message) + + m = json.loads(message) + command = m['event_type'] + self.logger.info("Handling command: " + command) + + if command == 'update_schedule': + self.logger.info("Updating schdule...") + self.pypo_queue.put(message) + elif command == 'update_stream_setting': + self.logger.info("Updating stream setting...") + self.pypo_queue.put(message) + elif command == 'update_stream_format': + self.logger.info("Updating stream format...") + self.pypo_queue.put(message) + elif command == 'update_station_name': + self.logger.info("Updating station name...") + self.pypo_queue.put(message) + elif command == 'cancel_current_show': + self.logger.info("Cancel current show command received...") + self.pypo_queue.put(message) + elif command == 'update_recorder_schedule': + self.recorder_queue.put(message) + elif command == 'cancel_recording': + self.recorder_queue.put(message) + except Exception, e: + self.logger.error("Exception in handling RabbitMQ message: %s", e) + + def main(self): + while not self.init_rabbit_mq(): + self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") + time.sleep(5) + + loops = 1 + while True: + self.logger.info("Loop #%s", loops) + try: + message = self.simple_queue.get(block=True) + self.handle_message(message.payload) + # ACK the message to take it off the queue + message.ack() + except Exception, e: + """ + sleep 5 seconds so that we don't spin inside this + while loop and eat all the CPU + """ + time.sleep(5) + + """ + There is a problem with the RabbitMq messenger service. Let's + log the error and get the schedule via HTTP polling + """ + self.logger.error("Exception, %s", e) + + loops += 1 + + """ + Main loop of the thread: + Wait for schedule updates from RabbitMQ, but in case there arent any, + poll the server to get the upcoming schedule. + """ + def run(self): + while True: + self.main() + diff --git a/python_apps/pypo/recorder.py b/python_apps/pypo/recorder.py index 6347c4f76..a4052c485 100644 --- a/python_apps/pypo/recorder.py +++ b/python_apps/pypo/recorder.py @@ -176,19 +176,35 @@ class Recorder(Thread): self.server_timezone = '' self.queue = q self.logger.info("RecorderFetch: init complete") + self.loops = 0 def handle_message(self): if not self.queue.empty(): - msg = self.queue.get() - self.logger.info("Receivied msg from Pypo Fetch: %s", msg) - if msg == 'cancel_recording': + message = self.queue.get() + msg = json.loads(message) + command = msg["event_type"] + self.logger.info("Received msg from Pypo Fetch: %s", msg) + if command == 'cancel_recording': if self.sr is not None and self.sr.is_recording(): self.sr.cancel_recording() else: - self.shows_to_record = msg + self.process_recorder_schedule(msg) + self.loops = 0 if self.shows_to_record: self.start_record() + + def process_recorder_schedule(self, m): + self.logger.info("Parsing recording show schedules...") + temp_shows_to_record = {} + shows = m['shows'] + for show in shows: + show_starts = getDateTimeObj(show[u'starts']) + show_end = getDateTimeObj(show[u'ends']) + time_delta = show_end - show_starts + + temp_shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name'], m['server_timezone']] + self.shows_to_record = temp_shows_to_record def get_time_till_next_show(self): if len(self.shows_to_record) != 0: @@ -247,21 +263,43 @@ class Recorder(Thread): def run(self): try: self.logger.info("Started...") - + # Bootstrap: since we are just starting up, we need to grab the + # most recent schedule. After that we can just wait for updates. + try: + temp = self.api_client.get_shows_to_record() + if temp is not None: + self.process_recorder_schedule(temp) + self.logger.info("Bootstrap recorder schedule received: %s", temp) + except Exception, e: + self.logger.error(e) + + self.logger.info("Bootstrap complete: got initial copy of the schedule") + recording = False - loops = 0 + self.loops = 0 heartbeat_period = math.floor(30/PUSH_INTERVAL) while True: - if loops % heartbeat_period == 0: + if self.loops % heartbeat_period == 0: self.logger.info("heartbeat") - loops = 0 + if self.loops * PUSH_INTERVAL > 3600: + self.loops = 0 + """ + Fetch recorder schedule + """ + try: + temp = self.api_client.get_shows_to_record() + if temp is not None: + self.process_recorder_schedule(temp) + self.logger.info("updated recorder schedule received: %s", temp) + except Exception, e: + self.logger.error(e) try: self.handle_message() except Exception, e: self.logger.error('Pypo Recorder Exception: %s', e) time.sleep(PUSH_INTERVAL) - loops += 1 + self.loops += 1 except Exception,e : import traceback top = traceback.format_exc()