diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index 228ea6a21..9a8567c38 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -21,7 +21,7 @@ from .config import CACHE_DIR, RECORD_DIR, Config from .history.stats import StatsCollectorThread from .liquidsoap.client import LiquidsoapClient from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION -from .message_handler import PypoMessageHandler +from .message_handler import MessageListener from .player.fetch import PypoFetch from .player.file import PypoFile from .player.liquidsoap import PypoLiquidsoap @@ -96,12 +96,6 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ pypo_liquidsoap = PypoLiquidsoap(liq_client) - # Pass only the configuration sections needed; PypoMessageHandler only - # needs rabbitmq settings - message_handler = PypoMessageHandler(fetch_queue, recorder_queue, config.rabbitmq) - message_handler.daemon = True - message_handler.start() - file_thread = PypoFile(file_queue, api_client) file_thread.start() @@ -126,7 +120,5 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ stats_collector_thread = StatsCollectorThread(config, legacy_client) stats_collector_thread.start() - # Just sleep the main thread, instead of blocking on fetch_thread.join(). - # This allows CTRL-C to work! - while True: - time.sleep(1) + message_listener = MessageListener(config, fetch_queue, recorder_queue) + message_listener.run_forever() diff --git a/playout/libretime_playout/message_handler.py b/playout/libretime_playout/message_handler.py index 88de47caf..430b758ad 100644 --- a/playout/libretime_playout/message_handler.py +++ b/playout/libretime_playout/message_handler.py @@ -1,124 +1,107 @@ import json -import time from queue import Queue as ThreadQueue -from threading import Thread +from signal import SIGTERM, signal +from time import sleep # For RabbitMQ from kombu.connection import Connection from kombu.messaging import Exchange, Queue from kombu.mixins import ConsumerMixin -from libretime_shared.config import RabbitMQConfig from loguru import logger +from .config import Config -class RabbitConsumer(ConsumerMixin): - def __init__(self, connection, queues, handler): + +class MessageHandler(ConsumerMixin): + def __init__( + self, + connection: Connection, + fetch_queue: ThreadQueue, + recorder_queue: ThreadQueue, + ): self.connection = connection - self.queues = queues - self.handler = handler + + self.fetch_queue = fetch_queue + self.recorder_queue = recorder_queue def get_consumers(self, Consumer, channel): + exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) + queues = [Queue("pypo-fetch", exchange=exchange, key="foo")] + return [ - Consumer(self.queues, callbacks=[self.on_message], accept=["text/plain"]), + Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]), ] def on_message(self, body, message): - self.handler.handle_message(message.payload) + logger.debug(f"received message: {body}") + try: + try: + body = body.decode() + except (UnicodeDecodeError, AttributeError): + pass + + payload = json.loads(body) + command = payload["event_type"] + logger.info(f"handling command: {command}") + + if command in ( + "update_schedule", + "reset_liquidsoap_bootstrap", + "update_stream_format", + "update_message_offline", + "update_station_name", + "switch_source", + "update_transition_fade", + "disconnect_source", + ): + self.fetch_queue.put(message.payload) + + elif command in ( + "update_recorder_schedule", + "cancel_recording", + ): + self.recorder_queue.put(message.payload) + + else: + logger.warning(f"invalid command: {command}") + + except Exception as exception: + logger.exception(exception) + message.ack() -class PypoMessageHandler(Thread): - name = "message_handler" - +class MessageListener: def __init__( self, + config: Config, fetch_queue: ThreadQueue, recorder_queue: ThreadQueue, - config: RabbitMQConfig, - ): - Thread.__init__(self) - self.pypo_queue = fetch_queue - self.recorder_queue = recorder_queue + ) -> None: self.config = config - def init_rabbit_mq(self): - logger.info("Initializing RabbitMQ stuff") - try: - schedule_exchange = Exchange( - "airtime-pypo", "direct", durable=True, auto_delete=True - ) - schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") - with Connection( - f"amqp://{self.config.user}:{self.config.password}" - f"@{self.config.host}:{self.config.port}" - f"/{self.config.vhost}", - heartbeat=5, - ) as connection: - rabbit = RabbitConsumer(connection, [schedule_queue], self) - rabbit.run() - except Exception as exception: - logger.exception(exception) + self.fetch_queue = fetch_queue + self.recorder_queue = recorder_queue - # Handle a message from RabbitMQ, put it into our yucky global var. - # Hopefully there is a better way to do this. - - def handle_message(self, message): - try: - logger.info("Received event from RabbitMQ: %s" % message) - - try: - message = message.decode() - except (UnicodeDecodeError, AttributeError): - pass - m = json.loads(message) - command = m["event_type"] - logger.info("Handling command: " + command) - - if command == "update_schedule": - logger.info("Updating schedule...") - self.pypo_queue.put(message) - elif command == "reset_liquidsoap_bootstrap": - logger.info("Resetting bootstrap vars...") - self.pypo_queue.put(message) - elif command == "update_stream_format": - logger.info("Updating stream format...") - self.pypo_queue.put(message) - elif command == "update_message_offline": - logger.info("Updating message offline...") - self.pypo_queue.put(message) - elif command == "update_station_name": - logger.info("Updating station name...") - self.pypo_queue.put(message) - elif command == "switch_source": - logger.info("switch_source command received...") - self.pypo_queue.put(message) - elif command == "update_transition_fade": - logger.info("Updating trasition fade...") - self.pypo_queue.put(message) - elif command == "disconnect_source": - logger.info("disconnect_source command received...") - self.pypo_queue.put(message) - elif command == "update_recorder_schedule": - self.recorder_queue.put(message) - elif command == "cancel_recording": - self.recorder_queue.put(message) - else: - logger.info("Unknown command: %s" % command) - except Exception as exception: - logger.exception(exception) - - def main(self): - try: - self.init_rabbit_mq() - except Exception as exception: - logger.exception(exception) - logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds") - time.sleep(5) - - # Main loop of the thread: - # Wait for schedule updates from RabbitMQ, but in case there aren't any, - # poll the server to get the upcoming schedule. - - def run(self): + def run_forever(self): while True: - self.main() + with Connection( + self.config.rabbitmq.url, + heartbeat=5, + transport_options={"client_properties": {"connection_name": "playout"}}, + ) as connection: + handler = MessageHandler( + connection=connection, + fetch_queue=self.fetch_queue, + recorder_queue=self.recorder_queue, + ) + + def shutdown(_signum, _frame): + raise SystemExit() + + signal(SIGTERM, shutdown) + + logger.info("starting message handler") + handler.run() + + sleep(5)