CC-4915: Media-Monitor cannot handle rabbitmq restart event

-fixed
This commit is contained in:
Martin Konecny 2013-03-01 15:52:19 -05:00
parent d4891803cc
commit 2f33e99ff5
3 changed files with 33 additions and 18 deletions

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from kombu.messaging import Exchange, Queue, Consumer
from kombu.connection import BrokerConnection
from kombu.simple import SimpleQueue
from os.path import normpath
import json
@ -24,35 +25,43 @@ class AirtimeNotifier(Loggable):
"""
def __init__(self, cfg, message_receiver):
self.cfg = cfg
self.handler = message_receiver
while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
def init_rabbit_mq(self):
try:
self.handler = message_receiver
self.logger.info("Initializing RabbitMQ message consumer...")
schedule_exchange = Exchange("airtime-media-monitor", "direct",
durable=True, auto_delete=True)
schedule_queue = Queue("media-monitor", exchange=schedule_exchange,
key="filesystem")
self.connection = BrokerConnection(cfg["rabbitmq_host"],
cfg["rabbitmq_user"], cfg["rabbitmq_password"],
cfg["rabbitmq_vhost"])
self.connection = BrokerConnection(self.cfg["rabbitmq_host"],
self.cfg["rabbitmq_user"], self.cfg["rabbitmq_password"],
self.cfg["rabbitmq_vhost"])
channel = self.connection.channel()
consumer = Consumer(channel, schedule_queue)
consumer.register_callback(self.handle_message)
consumer.consume()
self.simple_queue = SimpleQueue(channel, schedule_queue)
self.logger.info("Initialized RabbitMQ consumer.")
except Exception as e:
self.logger.info("Failed to initialize RabbitMQ consumer")
self.logger.error(e)
return False
def handle_message(self, body, message):
return True
def handle_message(self, message):
"""
Messages received from RabbitMQ are handled here. These messages
instruct media-monitor of events such as a new directory being watched,
file metadata has been changed, or any other changes to the config of
media-monitor via the web UI.
"""
message.ack()
self.logger.info("Received md from RabbitMQ: %s" % str(body))
m = json.loads(message.body)
self.logger.info("Received md from RabbitMQ: %s" % str(message))
m = json.loads(message)
# TODO : normalize any other keys that could be used to pass
# directories
if 'directory' in m: m['directory'] = normpath(m['directory'])

View File

@ -1,4 +1,5 @@
import socket
import time
from media.monitor.log import Loggable
from media.monitor.toucher import RepeatTimer
@ -7,13 +8,18 @@ class EventDrainer(Loggable):
Flushes events from RabbitMQ that are sent from airtime every
certain amount of time
"""
def __init__(self, connection, interval=1):
def __init__(self, airtime_notifier, interval=1):
def cb():
# TODO : make 0.3 parameter configurable
try : connection.drain_events(timeout=0.3)
except socket.timeout : pass
except Exception as e :
self.fatal_exception("Error flushing events", e)
try:
message = airtime_notifier.simple_queue.get(block=True)
airtime_notifier.handle_message(message.payload)
message.ack()
except (IOError, AttributeError), e:
self.logger.error('Exception: %s', e)
while not airtime_notifier.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. \
Trying again in few seconds")
time.sleep(5)
t = RepeatTimer(interval, cb)
t.daemon = True

View File

@ -75,7 +75,7 @@ class MM2(InstanceThread, Loggable):
airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True)
else: self.logger.info("Failed to add watch on %s" % str(watch_dir))
EventDrainer(airtime_notifier.connection,
EventDrainer(airtime_notifier,
interval=float(config['rmq_event_wait']))
# Launch the toucher that updates the last time when the script was