feat(playout): move message handling to main thread

This commit is contained in:
jo 2022-09-15 23:16:07 +02:00 committed by Kyle Robbertze
parent b1c2e34ceb
commit a79980e65f
2 changed files with 82 additions and 107 deletions

View File

@ -21,7 +21,7 @@ from .config import CACHE_DIR, RECORD_DIR, Config
from .history.stats import StatsCollectorThread from .history.stats import StatsCollectorThread
from .liquidsoap.client import LiquidsoapClient from .liquidsoap.client import LiquidsoapClient
from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION
from .message_handler import PypoMessageHandler from .message_handler import MessageListener
from .player.fetch import PypoFetch from .player.fetch import PypoFetch
from .player.file import PypoFile from .player.file import PypoFile
from .player.liquidsoap import PypoLiquidsoap from .player.liquidsoap import PypoLiquidsoap
@ -96,12 +96,6 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
pypo_liquidsoap = PypoLiquidsoap(liq_client) pypo_liquidsoap = PypoLiquidsoap(liq_client)
# Pass only the configuration sections needed; PypoMessageHandler only
# needs rabbitmq settings
message_handler = PypoMessageHandler(fetch_queue, recorder_queue, config.rabbitmq)
message_handler.daemon = True
message_handler.start()
file_thread = PypoFile(file_queue, api_client) file_thread = PypoFile(file_queue, api_client)
file_thread.start() file_thread.start()
@ -126,7 +120,5 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
stats_collector_thread = StatsCollectorThread(config, legacy_client) stats_collector_thread = StatsCollectorThread(config, legacy_client)
stats_collector_thread.start() stats_collector_thread.start()
# Just sleep the main thread, instead of blocking on fetch_thread.join(). message_listener = MessageListener(config, fetch_queue, recorder_queue)
# This allows CTRL-C to work! message_listener.run_forever()
while True:
time.sleep(1)

View File

@ -1,124 +1,107 @@
import json import json
import time
from queue import Queue as ThreadQueue from queue import Queue as ThreadQueue
from threading import Thread from signal import SIGTERM, signal
from time import sleep
# For RabbitMQ # For RabbitMQ
from kombu.connection import Connection from kombu.connection import Connection
from kombu.messaging import Exchange, Queue from kombu.messaging import Exchange, Queue
from kombu.mixins import ConsumerMixin from kombu.mixins import ConsumerMixin
from libretime_shared.config import RabbitMQConfig
from loguru import logger from loguru import logger
from .config import Config
class RabbitConsumer(ConsumerMixin):
def __init__(self, connection, queues, handler): class MessageHandler(ConsumerMixin):
def __init__(
self,
connection: Connection,
fetch_queue: ThreadQueue,
recorder_queue: ThreadQueue,
):
self.connection = connection self.connection = connection
self.queues = queues
self.handler = handler self.fetch_queue = fetch_queue
self.recorder_queue = recorder_queue
def get_consumers(self, Consumer, channel): def get_consumers(self, Consumer, channel):
exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
queues = [Queue("pypo-fetch", exchange=exchange, key="foo")]
return [ return [
Consumer(self.queues, callbacks=[self.on_message], accept=["text/plain"]), Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]),
] ]
def on_message(self, body, message): def on_message(self, body, message):
self.handler.handle_message(message.payload) logger.debug(f"received message: {body}")
try:
try:
body = body.decode()
except (UnicodeDecodeError, AttributeError):
pass
payload = json.loads(body)
command = payload["event_type"]
logger.info(f"handling command: {command}")
if command in (
"update_schedule",
"reset_liquidsoap_bootstrap",
"update_stream_format",
"update_message_offline",
"update_station_name",
"switch_source",
"update_transition_fade",
"disconnect_source",
):
self.fetch_queue.put(message.payload)
elif command in (
"update_recorder_schedule",
"cancel_recording",
):
self.recorder_queue.put(message.payload)
else:
logger.warning(f"invalid command: {command}")
except Exception as exception:
logger.exception(exception)
message.ack() message.ack()
class PypoMessageHandler(Thread): class MessageListener:
name = "message_handler"
def __init__( def __init__(
self, self,
config: Config,
fetch_queue: ThreadQueue, fetch_queue: ThreadQueue,
recorder_queue: ThreadQueue, recorder_queue: ThreadQueue,
config: RabbitMQConfig, ) -> None:
):
Thread.__init__(self)
self.pypo_queue = fetch_queue
self.recorder_queue = recorder_queue
self.config = config self.config = config
def init_rabbit_mq(self): self.fetch_queue = fetch_queue
logger.info("Initializing RabbitMQ stuff") self.recorder_queue = recorder_queue
try:
schedule_exchange = Exchange(
"airtime-pypo", "direct", durable=True, auto_delete=True
)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
with Connection(
f"amqp://{self.config.user}:{self.config.password}"
f"@{self.config.host}:{self.config.port}"
f"/{self.config.vhost}",
heartbeat=5,
) as connection:
rabbit = RabbitConsumer(connection, [schedule_queue], self)
rabbit.run()
except Exception as exception:
logger.exception(exception)
# Handle a message from RabbitMQ, put it into our yucky global var. def run_forever(self):
# Hopefully there is a better way to do this.
def handle_message(self, message):
try:
logger.info("Received event from RabbitMQ: %s" % message)
try:
message = message.decode()
except (UnicodeDecodeError, AttributeError):
pass
m = json.loads(message)
command = m["event_type"]
logger.info("Handling command: " + command)
if command == "update_schedule":
logger.info("Updating schedule...")
self.pypo_queue.put(message)
elif command == "reset_liquidsoap_bootstrap":
logger.info("Resetting bootstrap vars...")
self.pypo_queue.put(message)
elif command == "update_stream_format":
logger.info("Updating stream format...")
self.pypo_queue.put(message)
elif command == "update_message_offline":
logger.info("Updating message offline...")
self.pypo_queue.put(message)
elif command == "update_station_name":
logger.info("Updating station name...")
self.pypo_queue.put(message)
elif command == "switch_source":
logger.info("switch_source command received...")
self.pypo_queue.put(message)
elif command == "update_transition_fade":
logger.info("Updating trasition fade...")
self.pypo_queue.put(message)
elif command == "disconnect_source":
logger.info("disconnect_source command received...")
self.pypo_queue.put(message)
elif command == "update_recorder_schedule":
self.recorder_queue.put(message)
elif command == "cancel_recording":
self.recorder_queue.put(message)
else:
logger.info("Unknown command: %s" % command)
except Exception as exception:
logger.exception(exception)
def main(self):
try:
self.init_rabbit_mq()
except Exception as exception:
logger.exception(exception)
logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
# Main loop of the thread:
# Wait for schedule updates from RabbitMQ, but in case there aren't any,
# poll the server to get the upcoming schedule.
def run(self):
while True: while True:
self.main() with Connection(
self.config.rabbitmq.url,
heartbeat=5,
transport_options={"client_properties": {"connection_name": "playout"}},
) as connection:
handler = MessageHandler(
connection=connection,
fetch_queue=self.fetch_queue,
recorder_queue=self.recorder_queue,
)
def shutdown(_signum, _frame):
raise SystemExit()
signal(SIGTERM, shutdown)
logger.info("starting message handler")
handler.run()
sleep(5)