PypoMessageHandlers using a kombu Consumer solves MVC/Liquidsoap synchronization issues:

The real error was PypoMessageHandler wasn't consuming messages from RabbitMQ, thus schedules were only updated on PypoFetch time out.
Proably queue is in a stale state  because it didn't recover from a broken connection.
The issue is a SimpleQueue was being used, which's 'python-like' *get*  method apparently doesn't handle recovery.
Consumers are the way to go instead.
This commit is contained in:
Roberto Soto 2019-11-02 17:48:32 -07:00
parent 03c3cb02d8
commit cdf9f6f98e
1 changed files with 25 additions and 17 deletions

View File

@ -7,15 +7,32 @@ import sys
from threading import Thread
import time
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.connection import Connection
from kombu.messaging import Exchange, Queue
from kombu.simple import SimpleQueue
from amqp.exceptions import AMQPError
import json
from kombu.mixins import ConsumerMixin
logging.captureWarnings(True)
class RabbitConsumer(ConsumerMixin):
def __init__(self, connection, queues, handler):
self.connection = connection
self.queues = queues
self.handler = handler
def get_consumers(self, Consumer, channel):
return [
Consumer(self.queues, callbacks=[self.on_message], accept=['text/plain']),
]
def on_message(self, body, message):
self.handler.handle_message(message.payload)
message.ack()
class PypoMessageHandler(Thread):
def __init__(self, pq, rq, config):
Thread.__init__(self)
@ -26,22 +43,18 @@ class PypoMessageHandler(Thread):
def init_rabbit_mq(self):
self.logger.info("Initializing RabbitMQ stuff")
simple_queue = None
try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
connection = BrokerConnection(self.config["host"],
self.config["user"],
self.config["password"],
self.config["vhost"])
channel = connection.channel()
simple_queue = SimpleQueue(channel, schedule_queue)
with Connection(self.config["host"], \
self.config["user"], \
self.config["password"],\
self.config["vhost"]) as connection:
rabbit = RabbitConsumer(connection, [schedule_queue], self)
rabbit.run()
except Exception, e:
self.logger.error(e)
return simple_queue
"""
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
@ -89,12 +102,7 @@ class PypoMessageHandler(Thread):
def main(self):
try:
with self.init_rabbit_mq() as queue:
while True:
message = queue.get(block=True)
self.handle_message(message.payload)
# ACK the message to take it off the queue
message.ack()
self.init_rabbit_mq()
except Exception, e:
self.logger.error('Exception: %s', e)
self.logger.error("traceback: %s", traceback.format_exc())