diff --git a/python_apps/pypo/pypo/pypomessagehandler.py b/python_apps/pypo/pypo/pypomessagehandler.py index 61caab976..6e31d2f70 100644 --- a/python_apps/pypo/pypo/pypomessagehandler.py +++ b/python_apps/pypo/pypo/pypomessagehandler.py @@ -7,15 +7,32 @@ import sys from threading import Thread import time # For RabbitMQ -from kombu.connection import BrokerConnection +from kombu.connection import Connection from kombu.messaging import Exchange, Queue from kombu.simple import SimpleQueue from amqp.exceptions import AMQPError import json +from kombu.mixins import ConsumerMixin + logging.captureWarnings(True) +class RabbitConsumer(ConsumerMixin): + def __init__(self, connection, queues, handler): + self.connection = connection + self.queues = queues + self.handler = handler + + def get_consumers(self, Consumer, channel): + return [ + Consumer(self.queues, callbacks=[self.on_message], accept=['text/plain']), + ] + + def on_message(self, body, message): + self.handler.handle_message(message.payload) + message.ack() + class PypoMessageHandler(Thread): def __init__(self, pq, rq, config): Thread.__init__(self) @@ -26,22 +43,19 @@ class PypoMessageHandler(Thread): def init_rabbit_mq(self): self.logger.info("Initializing RabbitMQ stuff") - simple_queue = None try: schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") - connection = BrokerConnection(self.config["host"], - self.config["user"], - self.config["password"], - self.config["vhost"]) - - channel = connection.channel() - simple_queue = SimpleQueue(channel, schedule_queue) + with Connection(self.config["host"], \ + self.config["user"], \ + self.config["password"], \ + self.config["vhost"], \ + heartbeat = 5) as connection: + rabbit = RabbitConsumer(connection, [schedule_queue], self) + rabbit.run() except Exception, e: self.logger.error(e) - return simple_queue - """ Handle a message from RabbitMQ, put it into our yucky global var. Hopefully there is a better way to do this. @@ -89,12 +103,7 @@ class PypoMessageHandler(Thread): def main(self): try: - with self.init_rabbit_mq() as queue: - while True: - message = queue.get(block=True) - self.handle_message(message.payload) - # ACK the message to take it off the queue - message.ack() + self.init_rabbit_mq() except Exception, e: self.logger.error('Exception: %s', e) self.logger.error("traceback: %s", traceback.format_exc())