feat(playout): don't serialize message twice

This commit is contained in:
jo 2023-02-28 17:50:53 +01:00 committed by Kyle Robbertze
parent 5983b2e9de
commit 5042704d42
4 changed files with 31 additions and 36 deletions

View File

@ -8,7 +8,7 @@ import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from queue import Queue from queue import Queue
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional
import click import click
from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v1 import ApiClient as LegacyClient
@ -83,7 +83,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
if not LIQUIDSOAP_MIN_VERSION <= liq_version: if not LIQUIDSOAP_MIN_VERSION <= liq_version:
raise RuntimeError(f"Invalid liquidsoap version {liq_version}") raise RuntimeError(f"Invalid liquidsoap version {liq_version}")
fetch_queue: Queue[Union[str, bytes]] = Queue() fetch_queue: Queue[Dict[str, Any]] = Queue()
recorder_queue: Queue[Dict[str, Any]] = Queue() recorder_queue: Queue[Dict[str, Any]] = Queue()
push_queue: Queue[Events] = Queue() push_queue: Queue[Events] = Queue()
# This queue is shared between pypo-fetch and pypo-file, where pypo-file # This queue is shared between pypo-fetch and pypo-file, where pypo-file

View File

@ -20,8 +20,8 @@ class MessageHandler(ConsumerMixin):
def __init__( def __init__(
self, self,
connection: Connection, connection: Connection,
fetch_queue: ThreadQueue, fetch_queue: ThreadQueue[Dict[str, Any]],
recorder_queue: ThreadQueue, recorder_queue: ThreadQueue[Dict[str, Any]],
): ):
self.connection = connection self.connection = connection
@ -44,9 +44,9 @@ class MessageHandler(ConsumerMixin):
except (UnicodeDecodeError, AttributeError): except (UnicodeDecodeError, AttributeError):
pass pass
payload = json.loads(body) payload: dict = json.loads(body)
command = payload["event_type"] command = payload["event_type"]
logger.info("handling command: %s", command) logger.info("handling event %s: %s", command, payload)
if command in ( if command in (
"update_schedule", "update_schedule",
@ -58,13 +58,13 @@ class MessageHandler(ConsumerMixin):
"update_transition_fade", "update_transition_fade",
"disconnect_source", "disconnect_source",
): ):
self.fetch_queue.put(message.payload) self.fetch_queue.put(payload)
elif command in ( elif command in (
"update_recorder_schedule", "update_recorder_schedule",
"cancel_recording", "cancel_recording",
): ):
self.recorder_queue.put(message.payload) self.recorder_queue.put(payload)
else: else:
logger.warning("invalid command: %s", command) logger.warning("invalid command: %s", command)

View File

@ -1,5 +1,4 @@
import copy import copy
import json
import logging import logging
import mimetypes import mimetypes
import os import os
@ -7,7 +6,7 @@ import time
from queue import Empty, Queue from queue import Empty, Queue
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from threading import Thread, Timer from threading import Thread, Timer
from typing import Union from typing import Any, Dict
from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_api_client.v2 import ApiClient from libretime_api_client.v2 import ApiClient
@ -30,7 +29,7 @@ class PypoFetch(Thread):
def __init__( def __init__(
self, self,
fetch_queue: Queue[Union[str, bytes]], fetch_queue: Queue[Dict[str, Any]],
push_queue: Queue[Events], push_queue: Queue[Events],
file_queue: Queue[FileEvents], file_queue: Queue[FileEvents],
liq_client: LiquidsoapClient, liq_client: LiquidsoapClient,
@ -62,42 +61,37 @@ class PypoFetch(Thread):
# Handle a message from RabbitMQ, put it into our yucky global var. # Handle a message from RabbitMQ, put it into our yucky global var.
# Hopefully there is a better way to do this. # Hopefully there is a better way to do this.
def handle_message(self, message: Union[bytes, str]): def handle_message(self, message: Dict[str, Any]):
try: try:
logger.info("Received event from Pypo Message Handler: %s", message) command = message["event_type"]
if isinstance(message, bytes): logger.debug("handling event %s: %s", command, message)
message = message.decode()
m: dict = json.loads(message)
command = m["event_type"]
logger.info("Handling command: %s", command)
if command == "update_schedule": if command == "update_schedule":
self.schedule_data = m["schedule"]["media"] self.schedule_data = message["schedule"]["media"]
self.process_schedule(self.schedule_data) self.process_schedule(self.schedule_data)
elif command == "reset_liquidsoap_bootstrap": elif command == "reset_liquidsoap_bootstrap":
self.set_bootstrap_variables() self.set_bootstrap_variables()
elif command == "update_stream_format": elif command == "update_stream_format":
logger.info("Updating stream format...") logger.info("Updating stream format...")
self.update_liquidsoap_stream_format(m["stream_format"]) self.update_liquidsoap_stream_format(message["stream_format"])
elif command == "update_message_offline": elif command == "update_message_offline":
logger.info("Updating message offline...") logger.info("Updating message offline...")
self.update_liquidsoap_message_offline(m["message_offline"]) self.update_liquidsoap_message_offline(message["message_offline"])
elif command == "update_station_name": elif command == "update_station_name":
logger.info("Updating station name...") logger.info("Updating station name...")
self.update_liquidsoap_station_name(m["station_name"]) self.update_liquidsoap_station_name(message["station_name"])
elif command == "update_transition_fade": elif command == "update_transition_fade":
logger.info("Updating transition_fade...") logger.info("Updating transition_fade...")
self.update_liquidsoap_transition_fade(m["transition_fade"]) self.update_liquidsoap_transition_fade(message["transition_fade"])
elif command == "switch_source": elif command == "switch_source":
logger.info("switch_on_source show command received...") logger.info("switch_on_source show command received...")
self.pypo_liquidsoap.telnet_liquidsoap.switch_source( self.pypo_liquidsoap.telnet_liquidsoap.switch_source(
m["sourcename"], m["status"] message["sourcename"], message["status"]
) )
elif command == "disconnect_source": elif command == "disconnect_source":
logger.info("disconnect_on_source show command received...") logger.info("disconnect_on_source show command received...")
self.pypo_liquidsoap.telnet_liquidsoap.disconnect_source( self.pypo_liquidsoap.telnet_liquidsoap.disconnect_source(
m["sourcename"] message["sourcename"]
) )
else: else:
logger.info("Unknown command: %s", command) logger.info("Unknown command: %s", command)

View File

@ -1,5 +1,4 @@
import datetime import datetime
import json
import logging import logging
import math import math
import os import os
@ -8,8 +7,10 @@ import signal
import sys import sys
import time import time
from datetime import timezone from datetime import timezone
from queue import Queue
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from threading import Thread from threading import Thread
from typing import Any, Dict
import mutagen import mutagen
from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v1 import ApiClient as LegacyClient
@ -177,14 +178,19 @@ class Recorder(Thread):
name = "recorder" name = "recorder"
daemon = True daemon = True
def __init__(self, q, config: Config, legacy_client: LegacyClient): def __init__(
self,
recorder_queue: Queue[Dict[str, Any]],
config: Config,
legacy_client: LegacyClient,
):
Thread.__init__(self) Thread.__init__(self)
self.legacy_client = legacy_client self.legacy_client = legacy_client
self.config = config self.config = config
self.sr = None self.sr = None
self.shows_to_record = {} self.shows_to_record = {}
self.server_timezone = "" self.server_timezone = ""
self.queue = q self.queue = recorder_queue
self.loops = 0 self.loops = 0
logger.info("RecorderFetch: init complete") logger.info("RecorderFetch: init complete")
@ -199,14 +205,9 @@ class Recorder(Thread):
def handle_message(self): def handle_message(self):
if not self.queue.empty(): if not self.queue.empty():
message = self.queue.get() msg = self.queue.get()
try:
message = message.decode()
except (UnicodeDecodeError, AttributeError):
pass
msg = json.loads(message)
command = msg["event_type"] command = msg["event_type"]
logger.info("Received msg from Pypo Message Handler: %s", msg) logger.debug("handling event %s: %s", command, msg)
if command == "cancel_recording": if command == "cancel_recording":
if self.currently_recording(): if self.currently_recording():
self.cancel_recording() self.cancel_recording()