sintonia/playout/libretime_playout/message_handler.py

111 lines
3.1 KiB
Python
Raw Normal View History

import json
2023-02-26 01:27:00 +01:00
import logging
from queue import Queue as ThreadQueue
from signal import SIGTERM, signal
from time import sleep
from typing import Any, Dict
# For RabbitMQ
from kombu.connection import Connection
from kombu.messaging import Exchange, Queue
from kombu.mixins import ConsumerMixin
from .config import Config
2023-02-26 01:27:00 +01:00
logger = logging.getLogger(__name__)
class MessageHandler(ConsumerMixin):
    """Consume control messages from RabbitMQ and route them to worker queues.

    Every message body is parsed as JSON and must carry an ``event_type``
    key. That value selects which in-process queue the message is forwarded
    to; unknown event types are logged and dropped.
    """

    # Event types forwarded to ``fetch_queue``.
    # Kept as tuples (not sets) so that an unhashable ``event_type`` value is
    # simply reported as an invalid command instead of raising ``TypeError``.
    _FETCH_COMMANDS = (
        "update_schedule",
        "reset_liquidsoap_bootstrap",
        "update_stream_format",
        "update_message_offline",
        "update_station_name",
        "switch_source",
        "update_transition_fade",
        "disconnect_source",
    )

    # Event types forwarded to ``recorder_queue``.
    _RECORDER_COMMANDS = (
        "update_recorder_schedule",
        "cancel_recording",
    )

    def __init__(
        self,
        connection: Connection,
        fetch_queue: ThreadQueue,
        recorder_queue: ThreadQueue,
    ):
        """Store the broker connection and the two destination queues.

        Args:
            connection: Connection used by ``ConsumerMixin`` to consume.
            fetch_queue: Queue receiving the payload of fetch commands.
            recorder_queue: Queue receiving the payload of recorder commands.
        """
        self.connection = connection
        self.fetch_queue = fetch_queue
        self.recorder_queue = recorder_queue

    def get_consumers(self, Consumer, channel):
        """Return the consumers bound to the ``pypo-fetch`` queue.

        Args:
            Consumer: Consumer factory supplied by ``ConsumerMixin``.
            channel: Unused; part of the ``ConsumerMixin`` hook signature.

        Returns:
            A single-element list with a consumer that accepts
            ``text/plain`` messages and dispatches them to ``on_message``.
        """
        exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
        queues = [Queue("pypo-fetch", exchange=exchange, key="foo")]

        return [
            Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]),
        ]

    def on_message(self, body, message):
        """Dispatch one received message to the matching queue, then ack it.

        Any exception raised while decoding or dispatching is logged and
        swallowed, and the message is acknowledged regardless of the outcome.

        Args:
            body: Message body, either ``bytes`` or ``str`` holding JSON.
            message: The received message; ``message.payload`` is what gets
                enqueued (not the parsed JSON), and ``message.ack()`` is
                called at the end.
        """
        # Lazy %-formatting: the body is only rendered if DEBUG is enabled.
        logger.debug("received message: %s", body)
        try:
            try:
                body = body.decode()
            except (UnicodeDecodeError, AttributeError):
                # Already a str (no ``decode``) or not decodable: let
                # ``json.loads`` deal with the value as-is.
                pass

            payload = json.loads(body)
            command = payload["event_type"]
            logger.info("handling command: %s", command)

            if command in self._FETCH_COMMANDS:
                self.fetch_queue.put(message.payload)
            elif command in self._RECORDER_COMMANDS:
                self.recorder_queue.put(message.payload)
            else:
                logger.warning("invalid command: %s", command)

        except Exception as exception:  # top-level boundary: never crash the consumer
            logger.exception(exception)

        message.ack()
2021-05-27 16:23:02 +02:00
2022-08-14 19:55:39 +02:00
class MessageListener:
    """Run a ``MessageHandler`` against RabbitMQ in an endless loop."""

    def __init__(
        self,
        config: Config,
        fetch_queue: ThreadQueue[Dict[str, Any]],
        recorder_queue: ThreadQueue[Dict[str, Any]],
    ) -> None:
        """Store the configuration and the queues handed to the handler.

        Args:
            config: Application configuration; ``config.rabbitmq.url`` is
                used as the broker URL.
            fetch_queue: Queue passed to ``MessageHandler`` as ``fetch_queue``.
            recorder_queue: Queue passed to ``MessageHandler`` as
                ``recorder_queue``.
        """
        self.config = config
        self.fetch_queue = fetch_queue
        self.recorder_queue = recorder_queue

    def run_forever(self):
        """Consume messages forever, reopening the connection when needed.

        A SIGTERM handler that raises ``SystemExit`` is installed before the
        loop starts. Each iteration opens a new connection, runs a
        ``MessageHandler`` on it, and waits 5 seconds after the handler
        returns before starting over.

        Raises:
            SystemExit: When the process receives SIGTERM.
        """

        def shutdown(_signum, _frame):
            raise SystemExit()

        # The handler does not depend on the connection, so it is installed
        # once instead of being re-registered on every reconnect.
        signal(SIGTERM, shutdown)

        while True:
            with Connection(
                self.config.rabbitmq.url,
                heartbeat=5,
                transport_options={"client_properties": {"connection_name": "playout"}},
            ) as connection:
                handler = MessageHandler(
                    connection=connection,
                    fetch_queue=self.fetch_queue,
                    recorder_queue=self.recorder_queue,
                )

                logger.info("starting message handler")
                handler.run()

            # Pause before opening a new connection.
            sleep(5)