From 2f33e99ff51dba84c4c9d8ca2338e0b6185133bf Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Fri, 1 Mar 2013 15:52:19 -0500 Subject: [PATCH] CC-4915: Media-Monitor cannot handle rabbitmq restart event -fixed --- .../media-monitor2/media/monitor/airtime.py | 31 ++++++++++++------- .../media/monitor/eventdrainer.py | 18 +++++++---- .../media-monitor2/media/saas/launcher.py | 2 +- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/python_apps/media-monitor2/media/monitor/airtime.py b/python_apps/media-monitor2/media/monitor/airtime.py index 19659ed21..44d88aa17 100644 --- a/python_apps/media-monitor2/media/monitor/airtime.py +++ b/python_apps/media-monitor2/media/monitor/airtime.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- from kombu.messaging import Exchange, Queue, Consumer from kombu.connection import BrokerConnection +from kombu.simple import SimpleQueue from os.path import normpath import json @@ -24,35 +25,43 @@ class AirtimeNotifier(Loggable): """ def __init__(self, cfg, message_receiver): self.cfg = cfg + self.handler = message_receiver + while not self.init_rabbit_mq(): + self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") + time.sleep(5) + + def init_rabbit_mq(self): try: - self.handler = message_receiver self.logger.info("Initializing RabbitMQ message consumer...") schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True) schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem") - self.connection = BrokerConnection(cfg["rabbitmq_host"], - cfg["rabbitmq_user"], cfg["rabbitmq_password"], - cfg["rabbitmq_vhost"]) + self.connection = BrokerConnection(self.cfg["rabbitmq_host"], + self.cfg["rabbitmq_user"], self.cfg["rabbitmq_password"], + self.cfg["rabbitmq_vhost"]) channel = self.connection.channel() - consumer = Consumer(channel, schedule_queue) - consumer.register_callback(self.handle_message) - consumer.consume() + + self.simple_queue = SimpleQueue(channel, schedule_queue) + self.logger.info("Initialized RabbitMQ consumer.") except Exception as e: self.logger.info("Failed to initialize RabbitMQ consumer") self.logger.error(e) + return False - def handle_message(self, body, message): + return True + + + def handle_message(self, message): """ Messages received from RabbitMQ are handled here. These messages instruct media-monitor of events such as a new directory being watched, file metadata has been changed, or any other changes to the config of media-monitor via the web UI. """ - message.ack() - self.logger.info("Received md from RabbitMQ: %s" % str(body)) - m = json.loads(message.body) + self.logger.info("Received md from RabbitMQ: %s" % str(message)) + m = json.loads(message) # TODO : normalize any other keys that could be used to pass # directories if 'directory' in m: m['directory'] = normpath(m['directory']) diff --git a/python_apps/media-monitor2/media/monitor/eventdrainer.py b/python_apps/media-monitor2/media/monitor/eventdrainer.py index 1d3bc96f6..4a7316c5e 100644 --- a/python_apps/media-monitor2/media/monitor/eventdrainer.py +++ b/python_apps/media-monitor2/media/monitor/eventdrainer.py @@ -1,4 +1,5 @@ import socket +import time from media.monitor.log import Loggable from media.monitor.toucher import RepeatTimer @@ -7,13 +8,18 @@ class EventDrainer(Loggable): Flushes events from RabbitMQ that are sent from airtime every certain amount of time """ - def __init__(self, connection, interval=1): + def __init__(self, airtime_notifier, interval=1): def cb(): - # TODO : make 0.3 parameter configurable - try : connection.drain_events(timeout=0.3) - except socket.timeout : pass - except Exception as e : - self.fatal_exception("Error flushing events", e) + try: + message = airtime_notifier.simple_queue.get(block=True) + airtime_notifier.handle_message(message.payload) + message.ack() + except (IOError, AttributeError), e: + self.logger.error('Exception: %s', e) + while not airtime_notifier.init_rabbit_mq(): + self.logger.error("Error connecting to RabbitMQ Server. \ + Trying again in few seconds") + time.sleep(5) t = RepeatTimer(interval, cb) t.daemon = True diff --git a/python_apps/media-monitor2/media/saas/launcher.py b/python_apps/media-monitor2/media/saas/launcher.py index 83a972311..c561464e3 100644 --- a/python_apps/media-monitor2/media/saas/launcher.py +++ b/python_apps/media-monitor2/media/saas/launcher.py @@ -75,7 +75,7 @@ class MM2(InstanceThread, Loggable): airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True) else: self.logger.info("Failed to add watch on %s" % str(watch_dir)) - EventDrainer(airtime_notifier.connection, + EventDrainer(airtime_notifier, interval=float(config['rmq_event_wait'])) # Launch the toucher that updates the last time when the script was