From 5042704d42b7652dfcc423270af871d9b5297597 Mon Sep 17 00:00:00 2001 From: jo Date: Tue, 28 Feb 2023 17:50:53 +0100 Subject: [PATCH] feat(playout): don't serialize message twice --- playout/libretime_playout/main.py | 4 +-- playout/libretime_playout/message_handler.py | 12 ++++---- playout/libretime_playout/player/fetch.py | 30 ++++++++------------ playout/libretime_playout/recorder.py | 21 +++++++------- 4 files changed, 31 insertions(+), 36 deletions(-) diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index b3d8f1fd0..e0dba014e 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -8,7 +8,7 @@ import time from datetime import datetime from pathlib import Path from queue import Queue -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, Optional import click from libretime_api_client.v1 import ApiClient as LegacyClient @@ -83,7 +83,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ if not LIQUIDSOAP_MIN_VERSION <= liq_version: raise RuntimeError(f"Invalid liquidsoap version {liq_version}") - fetch_queue: Queue[Union[str, bytes]] = Queue() + fetch_queue: Queue[Dict[str, Any]] = Queue() recorder_queue: Queue[Dict[str, Any]] = Queue() push_queue: Queue[Events] = Queue() # This queue is shared between pypo-fetch and pypo-file, where pypo-file diff --git a/playout/libretime_playout/message_handler.py b/playout/libretime_playout/message_handler.py index 21d0e5328..4c89365c0 100644 --- a/playout/libretime_playout/message_handler.py +++ b/playout/libretime_playout/message_handler.py @@ -20,8 +20,8 @@ class MessageHandler(ConsumerMixin): def __init__( self, connection: Connection, - fetch_queue: ThreadQueue, - recorder_queue: ThreadQueue, + fetch_queue: ThreadQueue[Dict[str, Any]], + recorder_queue: ThreadQueue[Dict[str, Any]], ): self.connection = connection @@ -44,9 +44,9 @@ class MessageHandler(ConsumerMixin): except (UnicodeDecodeError, AttributeError): pass - payload = json.loads(body) + payload: dict = json.loads(body) command = payload["event_type"] - logger.info("handling command: %s", command) + logger.info("handling event %s: %s", command, payload) if command in ( "update_schedule", @@ -58,13 +58,13 @@ class MessageHandler(ConsumerMixin): "update_transition_fade", "disconnect_source", ): - self.fetch_queue.put(message.payload) + self.fetch_queue.put(payload) elif command in ( "update_recorder_schedule", "cancel_recording", ): - self.recorder_queue.put(message.payload) + self.recorder_queue.put(payload) else: logger.warning("invalid command: %s", command) diff --git a/playout/libretime_playout/player/fetch.py b/playout/libretime_playout/player/fetch.py index 77a9824bf..c9eec3bbe 100644 --- a/playout/libretime_playout/player/fetch.py +++ b/playout/libretime_playout/player/fetch.py @@ -1,5 +1,4 @@ import copy -import json import logging import mimetypes import os @@ -7,7 +6,7 @@ import time from queue import Empty, Queue from subprocess import PIPE, Popen from threading import Thread, Timer -from typing import Union +from typing import Any, Dict from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v2 import ApiClient @@ -30,7 +29,7 @@ class PypoFetch(Thread): def __init__( self, - fetch_queue: Queue[Union[str, bytes]], + fetch_queue: Queue[Dict[str, Any]], push_queue: Queue[Events], file_queue: Queue[FileEvents], liq_client: LiquidsoapClient, @@ -62,42 +61,37 @@ class PypoFetch(Thread): # Handle a message from RabbitMQ, put it into our yucky global var. # Hopefully there is a better way to do this. - def handle_message(self, message: Union[bytes, str]): + def handle_message(self, message: Dict[str, Any]): try: - logger.info("Received event from Pypo Message Handler: %s", message) - if isinstance(message, bytes): - message = message.decode() - m: dict = json.loads(message) - - command = m["event_type"] - logger.info("Handling command: %s", command) + command = message["event_type"] + logger.debug("handling event %s: %s", command, message) if command == "update_schedule": - self.schedule_data = m["schedule"]["media"] + self.schedule_data = message["schedule"]["media"] self.process_schedule(self.schedule_data) elif command == "reset_liquidsoap_bootstrap": self.set_bootstrap_variables() elif command == "update_stream_format": logger.info("Updating stream format...") - self.update_liquidsoap_stream_format(m["stream_format"]) + self.update_liquidsoap_stream_format(message["stream_format"]) elif command == "update_message_offline": logger.info("Updating message offline...") - self.update_liquidsoap_message_offline(m["message_offline"]) + self.update_liquidsoap_message_offline(message["message_offline"]) elif command == "update_station_name": logger.info("Updating station name...") - self.update_liquidsoap_station_name(m["station_name"]) + self.update_liquidsoap_station_name(message["station_name"]) elif command == "update_transition_fade": logger.info("Updating transition_fade...") - self.update_liquidsoap_transition_fade(m["transition_fade"]) + self.update_liquidsoap_transition_fade(message["transition_fade"]) elif command == "switch_source": logger.info("switch_on_source show command received...") self.pypo_liquidsoap.telnet_liquidsoap.switch_source( - m["sourcename"], m["status"] + message["sourcename"], message["status"] ) elif command == "disconnect_source": logger.info("disconnect_on_source show command received...") self.pypo_liquidsoap.telnet_liquidsoap.disconnect_source( - m["sourcename"] + message["sourcename"] ) else: logger.info("Unknown command: %s", command) diff --git a/playout/libretime_playout/recorder.py b/playout/libretime_playout/recorder.py index bef084a8a..ce9724373 100644 --- a/playout/libretime_playout/recorder.py +++ b/playout/libretime_playout/recorder.py @@ -1,5 +1,4 @@ import datetime -import json import logging import math import os @@ -8,8 +7,10 @@ import signal import sys import time from datetime import timezone +from queue import Queue from subprocess import PIPE, Popen from threading import Thread +from typing import Any, Dict import mutagen from libretime_api_client.v1 import ApiClient as LegacyClient @@ -177,14 +178,19 @@ class Recorder(Thread): name = "recorder" daemon = True - def __init__(self, q, config: Config, legacy_client: LegacyClient): + def __init__( + self, + recorder_queue: Queue[Dict[str, Any]], + config: Config, + legacy_client: LegacyClient, + ): Thread.__init__(self) self.legacy_client = legacy_client self.config = config self.sr = None self.shows_to_record = {} self.server_timezone = "" - self.queue = q + self.queue = recorder_queue self.loops = 0 logger.info("RecorderFetch: init complete") @@ -199,14 +205,9 @@ class Recorder(Thread): def handle_message(self): if not self.queue.empty(): - message = self.queue.get() - try: - message = message.decode() - except (UnicodeDecodeError, AttributeError): - pass - msg = json.loads(message) + msg = self.queue.get() command = msg["event_type"] - logger.info("Received msg from Pypo Message Handler: %s", msg) + logger.debug("handling event %s: %s", command, msg) if command == "cancel_recording": if self.currently_recording(): self.cancel_recording()