Merge pull request #907 from althost/pypofix_rebased

PypoMessageHandler reliable RabbitMQ consumption
This commit is contained in:
Kyle Robbertze 2019-11-06 12:59:35 +02:00 committed by GitHub
commit 180d8686df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 26 additions and 17 deletions

View File

@ -7,15 +7,32 @@ import sys
from threading import Thread from threading import Thread
import time import time
# For RabbitMQ # For RabbitMQ
from kombu.connection import BrokerConnection from kombu.connection import Connection
from kombu.messaging import Exchange, Queue from kombu.messaging import Exchange, Queue
from kombu.simple import SimpleQueue from kombu.simple import SimpleQueue
from amqp.exceptions import AMQPError from amqp.exceptions import AMQPError
import json import json
from kombu.mixins import ConsumerMixin
logging.captureWarnings(True) logging.captureWarnings(True)
class RabbitConsumer(ConsumerMixin):
def __init__(self, connection, queues, handler):
self.connection = connection
self.queues = queues
self.handler = handler
def get_consumers(self, Consumer, channel):
return [
Consumer(self.queues, callbacks=[self.on_message], accept=['text/plain']),
]
def on_message(self, body, message):
self.handler.handle_message(message.payload)
message.ack()
class PypoMessageHandler(Thread): class PypoMessageHandler(Thread):
def __init__(self, pq, rq, config): def __init__(self, pq, rq, config):
Thread.__init__(self) Thread.__init__(self)
@ -26,22 +43,19 @@ class PypoMessageHandler(Thread):
def init_rabbit_mq(self): def init_rabbit_mq(self):
self.logger.info("Initializing RabbitMQ stuff") self.logger.info("Initializing RabbitMQ stuff")
simple_queue = None
try: try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
connection = BrokerConnection(self.config["host"], with Connection(self.config["host"], \
self.config["user"], self.config["user"], \
self.config["password"], self.config["password"], \
self.config["vhost"]) self.config["vhost"], \
heartbeat = 5) as connection:
channel = connection.channel() rabbit = RabbitConsumer(connection, [schedule_queue], self)
simple_queue = SimpleQueue(channel, schedule_queue) rabbit.run()
except Exception, e: except Exception, e:
self.logger.error(e) self.logger.error(e)
return simple_queue
""" """
Handle a message from RabbitMQ, put it into our yucky global var. Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this. Hopefully there is a better way to do this.
@ -89,12 +103,7 @@ class PypoMessageHandler(Thread):
def main(self): def main(self):
try: try:
with self.init_rabbit_mq() as queue: self.init_rabbit_mq()
while True:
message = queue.get(block=True)
self.handle_message(message.payload)
# ACK the message to take it off the queue
message.ack()
except Exception, e: except Exception, e:
self.logger.error('Exception: %s', e) self.logger.error('Exception: %s', e)
self.logger.error("traceback: %s", traceback.format_exc()) self.logger.error("traceback: %s", traceback.format_exc())