From 03c3cb02d8488ad424b748d472a065dee6149376 Mon Sep 17 00:00:00 2001 From: Roberto Soto Date: Fri, 1 Nov 2019 20:18:11 -0700 Subject: [PATCH 1/5] raising the exception might kill pypopush thread this is apparently the reason why schedules don't get pushed after a silence producing desynchronization between the mvc and liquidsoap and cannot catch up until pypofetch times out. (ON AIR turns off and there's a mess until the schedule is fetched. sometimes it even continues playing the wrong tune) ((logging inside the while True loop produces no output, probably it is locked on the queue?)) --- python_apps/pypo/pypo/pypopush.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python_apps/pypo/pypo/pypopush.py b/python_apps/pypo/pypo/pypopush.py index e9b809473..5427c6112 100644 --- a/python_apps/pypo/pypo/pypopush.py +++ b/python_apps/pypo/pypo/pypopush.py @@ -69,7 +69,6 @@ class PypoPush(Thread): media_schedule = self.queue.get(block=True) except Exception, e: self.logger.error(str(e)) - raise else: self.logger.debug(media_schedule) #separate media_schedule list into currently_playing and From cdf9f6f98e058dc6c62b089f0541d8de29101a69 Mon Sep 17 00:00:00 2001 From: Roberto Soto Date: Sat, 2 Nov 2019 17:48:32 -0700 Subject: [PATCH 2/5] PypoMessageHandlers using a kombu Consumer solves MVC/Liquidsoap synchronization issues: The real error was PypoMessageHandler wasn't consuming messages from RabbitMQ, thus schedules were only updated on PypoFetch time out. Proably queue is in a stale state because it didn't recover from a broken connection. The issue is a SimpleQueue was being used, which's 'python-like' *get* method apparently doesn't handle recovery. Consumers are the way to go instead. --- python_apps/pypo/pypo/pypomessagehandler.py | 42 ++++++++++++--------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/python_apps/pypo/pypo/pypomessagehandler.py b/python_apps/pypo/pypo/pypomessagehandler.py index 61caab976..04b1a6a75 100644 --- a/python_apps/pypo/pypo/pypomessagehandler.py +++ b/python_apps/pypo/pypo/pypomessagehandler.py @@ -7,15 +7,32 @@ import sys from threading import Thread import time # For RabbitMQ -from kombu.connection import BrokerConnection +from kombu.connection import Connection from kombu.messaging import Exchange, Queue from kombu.simple import SimpleQueue from amqp.exceptions import AMQPError import json +from kombu.mixins import ConsumerMixin + logging.captureWarnings(True) +class RabbitConsumer(ConsumerMixin): + def __init__(self, connection, queues, handler): + self.connection = connection + self.queues = queues + self.handler = handler + + def get_consumers(self, Consumer, channel): + return [ + Consumer(self.queues, callbacks=[self.on_message], accept=['text/plain']), + ] + + def on_message(self, body, message): + self.handler.handle_message(message.payload) + message.ack() + class PypoMessageHandler(Thread): def __init__(self, pq, rq, config): Thread.__init__(self) @@ -26,22 +43,18 @@ class PypoMessageHandler(Thread): def init_rabbit_mq(self): self.logger.info("Initializing RabbitMQ stuff") - simple_queue = None try: schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") - connection = BrokerConnection(self.config["host"], - self.config["user"], - self.config["password"], - self.config["vhost"]) - - channel = connection.channel() - simple_queue = SimpleQueue(channel, schedule_queue) + with Connection(self.config["host"], \ + self.config["user"], \ + self.config["password"],\ + self.config["vhost"]) as connection: + rabbit = RabbitConsumer(connection, [schedule_queue], self) + rabbit.run() except Exception, e: self.logger.error(e) - return simple_queue - """ Handle a message from RabbitMQ, put it into our yucky global var. Hopefully there is a better way to do this. @@ -89,12 +102,7 @@ class PypoMessageHandler(Thread): def main(self): try: - with self.init_rabbit_mq() as queue: - while True: - message = queue.get(block=True) - self.handle_message(message.payload) - # ACK the message to take it off the queue - message.ack() + self.init_rabbit_mq() except Exception, e: self.logger.error('Exception: %s', e) self.logger.error("traceback: %s", traceback.format_exc()) From 5b3c0cb1c8ad7a9f41ecfc46df9e2bb9dd66fb28 Mon Sep 17 00:00:00 2001 From: Roberto Soto Date: Sat, 2 Nov 2019 17:56:54 -0700 Subject: [PATCH 3/5] Revert "raising the exception might kill pypopush thread" This reverts commit 2afe01b3ddc94194101a4a2149116e27e0e115e5. --- python_apps/pypo/pypo/pypopush.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python_apps/pypo/pypo/pypopush.py b/python_apps/pypo/pypo/pypopush.py index 5427c6112..e9b809473 100644 --- a/python_apps/pypo/pypo/pypopush.py +++ b/python_apps/pypo/pypo/pypopush.py @@ -69,6 +69,7 @@ class PypoPush(Thread): media_schedule = self.queue.get(block=True) except Exception, e: self.logger.error(str(e)) + raise else: self.logger.debug(media_schedule) #separate media_schedule list into currently_playing and From 8a0f4740753e1e3efd6e7150e627d537531acd79 Mon Sep 17 00:00:00 2001 From: Roberto Soto Date: Sat, 2 Nov 2019 20:00:43 -0700 Subject: [PATCH 4/5] use AMQP heartbeat feature --- python_apps/pypo/pypo/pypomessagehandler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python_apps/pypo/pypo/pypomessagehandler.py b/python_apps/pypo/pypo/pypomessagehandler.py index 04b1a6a75..5582be086 100644 --- a/python_apps/pypo/pypo/pypomessagehandler.py +++ b/python_apps/pypo/pypo/pypomessagehandler.py @@ -48,8 +48,9 @@ class PypoMessageHandler(Thread): schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") with Connection(self.config["host"], \ self.config["user"], \ - self.config["password"],\ - self.config["vhost"]) as connection: + self.config["password"], \ + self.config["vhost"], \ + hearbeat = 4) as connection: rabbit = RabbitConsumer(connection, [schedule_queue], self) rabbit.run() except Exception, e: From bf7659dfa0be5da9bc2f7b3768034f3a7f30b6f2 Mon Sep 17 00:00:00 2001 From: Roberto Soto Date: Mon, 4 Nov 2019 13:05:07 -0800 Subject: [PATCH 5/5] fix heartbeat spelling --- python_apps/pypo/pypo/pypomessagehandler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_apps/pypo/pypo/pypomessagehandler.py b/python_apps/pypo/pypo/pypomessagehandler.py index 5582be086..6e31d2f70 100644 --- a/python_apps/pypo/pypo/pypomessagehandler.py +++ b/python_apps/pypo/pypo/pypomessagehandler.py @@ -50,7 +50,7 @@ class PypoMessageHandler(Thread): self.config["user"], \ self.config["password"], \ self.config["vhost"], \ - hearbeat = 4) as connection: + heartbeat = 5) as connection: rabbit = RabbitConsumer(connection, [schedule_queue], self) rabbit.run() except Exception, e: