From 10c7c0b5fdbbe41fcddd091caf8c9d3ed6b1ca4d Mon Sep 17 00:00:00 2001 From: martin Date: Fri, 24 Jun 2011 15:13:47 -0400 Subject: [PATCH] CC-2419: Media monitor does not import files that already existed in /srv/airtime/stor -problem where media-monitor froze on startup if rabbitmq wasn't running is fixed (now consistent with pypo) --- .../airtimefilemonitor/airtimenotifier.py | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py index c8607b1a9..25053362d 100644 --- a/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py +++ b/python_apps/media-monitor/airtimefilemonitor/airtimenotifier.py @@ -27,13 +27,26 @@ class AirtimeNotifier(Notifier): self.import_processes = {} self.watched_folders = [] - schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True) - schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem") - self.connection = BrokerConnection(self.config.cfg["rabbitmq_host"], self.config.cfg["rabbitmq_user"], self.config.cfg["rabbitmq_password"], "/") - channel = self.connection.channel() - consumer = Consumer(channel, schedule_queue) - consumer.register_callback(self.handle_message) - consumer.consume() + while not self.init_rabbit_mq(): + logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") + time.sleep(5) + + def init_rabbit_mq(self): + logger = logging.getLogger('fetch') + logger.info("Initializing RabbitMQ stuff") + try: + schedule_exchange = Exchange("airtime-media-monitor", "direct", durable=True, auto_delete=True) + schedule_queue = Queue("media-monitor", exchange=schedule_exchange, key="filesystem") + self.connection = BrokerConnection(self.config.cfg["rabbitmq_host"], self.config.cfg["rabbitmq_user"], self.config.cfg["rabbitmq_password"], "/") + channel = self.connection.channel() + consumer = Consumer(channel, schedule_queue) + consumer.register_callback(self.handle_message) + consumer.consume() + except Exception, e: + logger.error(e) + return False + + return True def handle_message(self, body, message): # ACK the message to take it off the queue