From 5505222df632ff8f311d19056db0619501b083cf Mon Sep 17 00:00:00 2001 From: jo Date: Sun, 21 Aug 2022 11:28:57 +0200 Subject: [PATCH] refactor(playout): rename vars and add typing --- playout/libretime_playout/main.py | 60 +++++++++---------- playout/libretime_playout/message_handler.py | 13 +++- playout/libretime_playout/player/fetch.py | 15 ++--- playout/libretime_playout/player/file.py | 11 +++- .../player/liquidsoap_gateway.py | 6 +- playout/libretime_playout/player/push.py | 12 +++- playout/libretime_playout/player/queue.py | 12 +++- 7 files changed, 79 insertions(+), 50 deletions(-) diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index 896a3ec4d..7d52eea65 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -8,7 +8,7 @@ import time from datetime import datetime from pathlib import Path from queue import Queue -from typing import Optional +from typing import Any, Dict, Optional import click from libretime_api_client.v1 import ApiClient as LegacyClient @@ -93,52 +93,52 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ if not LIQUIDSOAP_MIN_VERSION <= liq_version: raise Exception(f"Invalid liquidsoap version {liq_version}") - pypoFetch_q = Queue() - recorder_q = Queue() - pypoPush_q = Queue() - - pypo_liquidsoap = PypoLiquidsoap(liq_client) - + fetch_queue: Queue[Dict[str, Any]] = Queue() + recorder_queue: Queue[Dict[str, Any]] = Queue() + push_queue: Queue[Dict[str, Any]] = Queue() # This queue is shared between pypo-fetch and pypo-file, where pypo-file # is the consumer. Pypo-fetch will send every schedule it gets to pypo-file # and pypo will parse this schedule to determine which file has the highest # priority, and retrieve it. - media_q = Queue() + file_queue: Queue[Dict[str, Any]] = Queue() - # Pass only the configuration sections needed; PypoMessageHandler only needs rabbitmq settings - pmh = PypoMessageHandler(pypoFetch_q, recorder_q, config.rabbitmq) - pmh.daemon = True - pmh.start() + pypo_liquidsoap = PypoLiquidsoap(liq_client) - pfile = PypoFile(media_q, api_client) - pfile.daemon = True - pfile.start() + # Pass only the configuration sections needed; PypoMessageHandler only + # needs rabbitmq settings + message_handler = PypoMessageHandler(fetch_queue, recorder_queue, config.rabbitmq) + message_handler.daemon = True + message_handler.start() - pf = PypoFetch( - pypoFetch_q, - pypoPush_q, - media_q, + file_thread = PypoFile(file_queue, api_client) + file_thread.daemon = True + file_thread.start() + + fetch_thread = PypoFetch( + fetch_queue, + push_queue, + file_queue, liq_client, pypo_liquidsoap, config, api_client, legacy_client, ) - pf.daemon = True - pf.start() + fetch_thread.daemon = True + fetch_thread.start() - pp = PypoPush(pypoPush_q, pypo_liquidsoap, config) - pp.daemon = True - pp.start() + push_thread = PypoPush(push_queue, pypo_liquidsoap, config) + push_thread.daemon = True + push_thread.start() - recorder = Recorder(recorder_q, config, legacy_client) - recorder.daemon = True - recorder.start() + recorder_thread = Recorder(recorder_queue, config, legacy_client) + recorder_thread.daemon = True + recorder_thread.start() - stats_collector = StatsCollectorThread(legacy_client) - stats_collector.start() + stats_collector_thread = StatsCollectorThread(legacy_client) + stats_collector_thread.start() - # Just sleep the main thread, instead of blocking on pf.join(). + # Just sleep the main thread, instead of blocking on fetch_thread.join(). # This allows CTRL-C to work! while True: time.sleep(1) diff --git a/playout/libretime_playout/message_handler.py b/playout/libretime_playout/message_handler.py index 0abc6893d..f657ed7bd 100644 --- a/playout/libretime_playout/message_handler.py +++ b/playout/libretime_playout/message_handler.py @@ -1,6 +1,8 @@ import json import time +from queue import Queue as ThreadQueue from threading import Thread +from typing import Any, Dict # For RabbitMQ from kombu.connection import Connection @@ -29,10 +31,15 @@ class RabbitConsumer(ConsumerMixin): class PypoMessageHandler(Thread): name = "message_handler" - def __init__(self, pq, rq, config: RabbitMQConfig): + def __init__( + self, + fetch_queue: ThreadQueue[Dict[str, Any]], + recorder_queue: ThreadQueue[Dict[str, Any]], + config: RabbitMQConfig, + ): Thread.__init__(self) - self.pypo_queue = pq - self.recorder_queue = rq + self.pypo_queue = fetch_queue + self.recorder_queue = recorder_queue self.config = config def init_rabbit_mq(self): diff --git a/playout/libretime_playout/player/fetch.py b/playout/libretime_playout/player/fetch.py index 0d482392c..e3e58a9d6 100644 --- a/playout/libretime_playout/player/fetch.py +++ b/playout/libretime_playout/player/fetch.py @@ -6,9 +6,10 @@ import signal import sys import time from datetime import datetime -from queue import Empty +from queue import Empty, Queue from subprocess import PIPE, Popen from threading import Thread, Timer +from typing import Any, Dict from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v2 import ApiClient @@ -34,9 +35,9 @@ class PypoFetch(Thread): def __init__( self, - pypoFetch_q, - pypoPush_q, - media_q, + fetch_queue: Queue[Dict[str, Any]], + push_queue: Queue[Dict[str, Any]], + file_queue: Queue[Dict[str, Any]], liq_client: LiquidsoapClient, pypo_liquidsoap: PypoLiquidsoap, config: Config, @@ -50,9 +51,9 @@ class PypoFetch(Thread): self.api_client = api_client self.legacy_client = legacy_client - self.fetch_queue = pypoFetch_q - self.push_queue = pypoPush_q - self.media_prepare_queue = media_q + self.fetch_queue = fetch_queue + self.push_queue = push_queue + self.media_prepare_queue = file_queue self.last_update_schedule_timestamp = time.time() self.config = config self.listener_timeout = POLL_INTERVAL diff --git a/playout/libretime_playout/player/file.py b/playout/libretime_playout/player/file.py index 9d8bed233..f7d1d2726 100644 --- a/playout/libretime_playout/player/file.py +++ b/playout/libretime_playout/player/file.py @@ -2,8 +2,9 @@ import hashlib import os import stat import time -from queue import Empty +from queue import Empty, Queue from threading import Thread +from typing import Any, Dict from libretime_api_client.v2 import ApiClient from loguru import logger @@ -13,9 +14,13 @@ from requests.exceptions import ConnectionError, HTTPError, Timeout class PypoFile(Thread): name = "file" - def __init__(self, schedule_queue, api_client: ApiClient): + def __init__( + self, + file_queue: Queue[Dict[str, Any]], + api_client: ApiClient, + ): Thread.__init__(self) - self.media_queue = schedule_queue + self.media_queue = file_queue self.media = None self.api_client = api_client diff --git a/playout/libretime_playout/player/liquidsoap_gateway.py b/playout/libretime_playout/player/liquidsoap_gateway.py index 93bf2db14..a75550426 100644 --- a/playout/libretime_playout/player/liquidsoap_gateway.py +++ b/playout/libretime_playout/player/liquidsoap_gateway.py @@ -44,7 +44,11 @@ def create_liquidsoap_annotation(media): class TelnetLiquidsoap: - def __init__(self, liq_client: LiquidsoapClient, queues: List[str]): + def __init__( + self, + liq_client: LiquidsoapClient, + queues: List[str], + ): self.liq_client = liq_client self.queues = queues self.current_prebuffering_stream_id = None diff --git a/playout/libretime_playout/player/push.py b/playout/libretime_playout/player/push.py index 4d56ac84f..cc1f6255f 100644 --- a/playout/libretime_playout/player/push.py +++ b/playout/libretime_playout/player/push.py @@ -3,6 +3,7 @@ import time from datetime import datetime from queue import Queue from threading import Thread +from typing import Any, Dict from loguru import logger @@ -22,9 +23,14 @@ def is_file(media_item): class PypoPush(Thread): name = "push" - def __init__(self, q, pypo_liquidsoap: PypoLiquidsoap, config: Config): + def __init__( + self, + push_queue: Queue[Dict[str, Any]], + pypo_liquidsoap: PypoLiquidsoap, + config: Config, + ): Thread.__init__(self) - self.queue = q + self.queue = push_queue self.config = config @@ -32,7 +38,7 @@ class PypoPush(Thread): self.current_prebuffering_stream_id = None self.queue_id = 0 - self.future_scheduled_queue = Queue() + self.future_scheduled_queue: Queue[Dict[str, Any]] = Queue() self.pypo_liquidsoap = pypo_liquidsoap self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap) diff --git a/playout/libretime_playout/player/queue.py b/playout/libretime_playout/player/queue.py index e935a01b5..704f70cf8 100644 --- a/playout/libretime_playout/player/queue.py +++ b/playout/libretime_playout/player/queue.py @@ -2,12 +2,14 @@ import signal import sys from collections import deque from datetime import datetime -from queue import Empty +from queue import Empty, Queue from threading import Thread +from typing import Any, Dict from loguru import logger from ..utils import seconds_between +from .liquidsoap import PypoLiquidsoap def keyboardInterruptHandler(signum, frame): @@ -21,9 +23,13 @@ signal.signal(signal.SIGINT, keyboardInterruptHandler) class PypoLiqQueue(Thread): name = "liquidsoap_queue" - def __init__(self, q, pypo_liquidsoap): + def __init__( + self, + future_queue: Queue[Dict[str, Any]], + pypo_liquidsoap: PypoLiquidsoap, + ): Thread.__init__(self) - self.queue = q + self.queue = future_queue self.pypo_liquidsoap = pypo_liquidsoap def main(self):