From 719464a2721baa8952a9a14983fb35537b8ebacd Mon Sep 17 00:00:00 2001 From: jo Date: Wed, 1 Mar 2023 20:27:27 +0100 Subject: [PATCH] refactor(playout): add more typings --- playout/libretime_playout/history/stats.py | 6 ++--- .../liquidsoap/client/_client.py | 10 +++++--- .../liquidsoap/client/_connection.py | 4 +-- playout/libretime_playout/main.py | 10 +++++--- playout/libretime_playout/message_handler.py | 4 +-- playout/libretime_playout/player/fetch.py | 25 +++++++++++-------- .../libretime_playout/player/liquidsoap.py | 20 +++++++-------- playout/libretime_playout/player/push.py | 8 +++--- playout/libretime_playout/player/queue.py | 9 ++++--- playout/libretime_playout/player/schedule.py | 2 +- 10 files changed, 54 insertions(+), 44 deletions(-) diff --git a/playout/libretime_playout/history/stats.py b/playout/libretime_playout/history/stats.py index cc5250f5d..1c3518985 100644 --- a/playout/libretime_playout/history/stats.py +++ b/playout/libretime_playout/history/stats.py @@ -47,7 +47,7 @@ class StatsCollector: ) -> Dict[str, Stats]: response = self._session.get( url=self.get_output_url(output), - auth=(output.admin_user, output.admin_password), + auth=(output.admin_user, output.admin_password or ""), timeout=self._timeout, ) response.raise_for_status() @@ -90,7 +90,7 @@ class StatsCollector: outputs: List[AnyOutput], *, _timestamp: Optional[datetime] = None, - ): + ) -> None: if _timestamp is None: _timestamp = datetime.utcnow() @@ -148,7 +148,7 @@ class StatsCollectorThread(Thread): self._config = config self._collector = StatsCollector(legacy_client) - def run(self): + def run(self) -> None: logger.info("starting %s", self.name) while True: try: diff --git a/playout/libretime_playout/liquidsoap/client/_client.py b/playout/libretime_playout/liquidsoap/client/_client.py index 0ddeff1b2..bf6c4f25b 100644 --- a/playout/libretime_playout/liquidsoap/client/_client.py +++ b/playout/libretime_playout/liquidsoap/client/_client.py @@ -38,10 +38,10 @@ class LiquidsoapClient: timeout=timeout, ) - def _quote(self, value: Any): + def _quote(self, value: Any) -> str: return quote(value, double=True) - def _set_var(self, name: str, value: Any): + def _set_var(self, name: str, value: Any) -> None: self.conn.write(f"var.set {name} = {value}") result = self.conn.read() if f"Variable {name} set" not in result: @@ -126,16 +126,18 @@ class LiquidsoapClient: self, *, station_name: Optional[str] = None, - message_format: Optional[Union[MessageFormatKind, str]] = None, + message_format: Optional[Union[MessageFormatKind, int]] = None, message_offline: Optional[str] = None, input_fade_transition: Optional[float] = None, - ): + ) -> None: with self.conn: if station_name is not None: self._set_var("station_name", self._quote(station_name)) if message_format is not None: if isinstance(message_format, MessageFormatKind): message_format = message_format.value + # Use an interactive.string until Liquidsoap have interactive.int + # variables self._set_var("message_format", self._quote(message_format)) if message_offline is not None: self._set_var("message_offline", self._quote(message_offline)) diff --git a/playout/libretime_playout/liquidsoap/client/_connection.py b/playout/libretime_playout/liquidsoap/client/_connection.py index bb4fd2563..c954ef2f6 100644 --- a/playout/libretime_playout/liquidsoap/client/_connection.py +++ b/playout/libretime_playout/liquidsoap/client/_connection.py @@ -46,8 +46,8 @@ class LiquidsoapConnection: self._port = port self._timeout = timeout - def address(self): - return f"{self._host}:{self._port}" if self._path is None else self._path + def address(self) -> str: + return f"{self._host}:{self._port}" if self._path is None else str(self._path) def __enter__(self): try: diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index 711c9e3d2..d0e432dd8 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -36,7 +36,7 @@ for module in ("amqp",): logging.getLogger(module).propagate = False -def wait_for_legacy(legacy_client: LegacyClient): +def wait_for_legacy(legacy_client: LegacyClient) -> None: while not legacy_client.is_server_compatible(): time.sleep(5) @@ -54,7 +54,7 @@ def wait_for_legacy(legacy_client: LegacyClient): time.sleep(10) -def wait_for_liquidsoap(liq_client: LiquidsoapClient): +def wait_for_liquidsoap(liq_client: LiquidsoapClient) -> None: logger.debug("Checking if Liquidsoap is running") liq_version = liq_client.wait_for_version() if not LIQUIDSOAP_MIN_VERSION <= liq_version: @@ -64,7 +64,11 @@ def wait_for_liquidsoap(liq_client: LiquidsoapClient): @click.command(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX}) @cli_logging_options() @cli_config_options() -def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[Path]): +def cli( + log_level: str, + log_filepath: Optional[Path], + config_filepath: Optional[Path], +) -> None: """ Run playout. """ diff --git a/playout/libretime_playout/message_handler.py b/playout/libretime_playout/message_handler.py index c59837dc9..7995d4431 100644 --- a/playout/libretime_playout/message_handler.py +++ b/playout/libretime_playout/message_handler.py @@ -34,7 +34,7 @@ class MessageHandler(ConsumerMixin): Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]), ] - def on_message(self, body, message: Message): + def on_message(self, body, message: Message) -> None: logger.debug("received message: %s", body) try: try: @@ -76,7 +76,7 @@ class MessageListener: self.config = config self.fetch_queue = fetch_queue - def run_forever(self): + def run_forever(self) -> None: while True: with Connection( self.config.rabbitmq.url, diff --git a/playout/libretime_playout/player/fetch.py b/playout/libretime_playout/player/fetch.py index d9b02124d..35ad6cbc3 100644 --- a/playout/libretime_playout/player/fetch.py +++ b/playout/libretime_playout/player/fetch.py @@ -7,7 +7,7 @@ from pathlib import Path from queue import Empty, Queue from subprocess import DEVNULL, PIPE, run from threading import Thread, Timer -from typing import Any, Dict +from typing import Any, Dict, Union from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v2 import ApiClient @@ -15,7 +15,7 @@ from requests import RequestException from ..config import CACHE_DIR, POLL_INTERVAL, Config from ..liquidsoap.client import LiquidsoapClient -from ..liquidsoap.models import Info, StreamPreferences, StreamState +from ..liquidsoap.models import Info, MessageFormatKind, StreamPreferences, StreamState from ..timeout import ls_timeout from .events import EventKind, Events, FileEvent, FileEvents, event_key_to_datetime from .liquidsoap import PypoLiquidsoap @@ -75,7 +75,7 @@ class PypoFetch(Thread): # Handle a message from RabbitMQ, put it into our yucky global var. # Hopefully there is a better way to do this. - def handle_message(self, message: Dict[str, Any]): + def handle_message(self, message: Dict[str, Any]) -> None: try: command = message["event_type"] logger.debug("handling event %s: %s", command, message) @@ -123,7 +123,7 @@ class PypoFetch(Thread): logger.exception(exception) # Initialize Liquidsoap environment - def set_bootstrap_variables(self): + def set_bootstrap_variables(self) -> None: logger.debug("Getting information needed on bootstrap from Airtime") try: info = Info(**self.api_client.get_info().json()) @@ -168,28 +168,31 @@ class PypoFetch(Thread): self.pypo_liquidsoap.clear_queue_tracker() @ls_timeout - def update_liquidsoap_stream_format(self, stream_format): + def update_liquidsoap_stream_format( + self, + stream_format: Union[MessageFormatKind, int], + ) -> None: try: self.liq_client.settings_update(message_format=stream_format) except (ConnectionError, TimeoutError) as exception: logger.exception(exception) @ls_timeout - def update_liquidsoap_message_offline(self, message_offline: str): + def update_liquidsoap_message_offline(self, message_offline: str) -> None: try: self.liq_client.settings_update(message_offline=message_offline) except (ConnectionError, TimeoutError) as exception: logger.exception(exception) @ls_timeout - def update_liquidsoap_transition_fade(self, fade): + def update_liquidsoap_transition_fade(self, fade: float) -> None: try: self.liq_client.settings_update(input_fade_transition=fade) except (ConnectionError, TimeoutError) as exception: logger.exception(exception) @ls_timeout - def update_liquidsoap_station_name(self, station_name): + def update_liquidsoap_station_name(self, station_name: str) -> None: try: self.liq_client.settings_update(station_name=station_name) except (ConnectionError, TimeoutError) as exception: @@ -204,7 +207,7 @@ class PypoFetch(Thread): # (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) # - runs the cleanup routine, to get rid of unused cached files - def process_schedule(self, events: Events): + def process_schedule(self, events: Events) -> None: self.last_update_schedule_timestamp = time.time() logger.debug(events) file_events: FileEvents = {} @@ -300,7 +303,7 @@ class PypoFetch(Thread): "Problem removing file '%s': %s", expired_file, exception ) - def manual_schedule_fetch(self): + def manual_schedule_fetch(self) -> bool: try: self.schedule_data = get_schedule(self.api_client) logger.debug("Received event from API client: %s", self.schedule_data) @@ -310,7 +313,7 @@ class PypoFetch(Thread): logger.exception("Unable to fetch schedule: %s", exception) return False - def persistent_manual_schedule_fetch(self, max_attempts=1): + def persistent_manual_schedule_fetch(self, max_attempts=1) -> bool: success = False num_attempts = 0 while not success and num_attempts < max_attempts: diff --git a/playout/libretime_playout/player/liquidsoap.py b/playout/libretime_playout/player/liquidsoap.py index c671b9857..7a923bf35 100644 --- a/playout/libretime_playout/player/liquidsoap.py +++ b/playout/libretime_playout/player/liquidsoap.py @@ -33,7 +33,7 @@ class PypoLiquidsoap: list(self.liq_queue_tracker.keys()), ) - def play(self, media_item: AnyEvent): + def play(self, media_item: AnyEvent) -> None: if media_item["type"] == EventKind.FILE: self.handle_file_type(media_item) elif media_item["type"] == EventKind.ACTION: @@ -57,7 +57,7 @@ class PypoLiquidsoap: else: raise UnknownMediaItemType(str(media_item)) - def handle_file_type(self, media_item: FileEvent): + def handle_file_type(self, media_item: FileEvent) -> None: """ Wait 200 seconds (2000 iterations) for file to become ready, otherwise give up on it. @@ -82,13 +82,13 @@ class PypoLiquidsoap: media_item["dst"], ) - def handle_event_type(self, media_item: ActionEvent): + def handle_event_type(self, media_item: ActionEvent) -> None: if media_item["event_type"] == "kick_out": self.telnet_liquidsoap.disconnect_source("live_dj") elif media_item["event_type"] == "switch_off": self.telnet_liquidsoap.switch_source("live_dj", "off") - def is_media_item_finished(self, media_item: Optional[AnyEvent]): + def is_media_item_finished(self, media_item: Optional[AnyEvent]) -> bool: if media_item is None: return True return datetime.utcnow() > event_key_to_datetime(media_item["end"]) @@ -106,7 +106,7 @@ class PypoLiquidsoap: return available_queue # pylint: disable=too-many-branches - def verify_correct_present_media(self, scheduled_now: List[AnyEvent]): + def verify_correct_present_media(self, scheduled_now: List[AnyEvent]) -> None: """ verify whether Liquidsoap is currently playing the correct files. if we find an item that Liquidsoap is not playing, then push it @@ -213,18 +213,18 @@ class PypoLiquidsoap: except KeyError as exception: logger.exception("Malformed event in schedule: %s", exception) - def stop(self, queue_id: int): + def stop(self, queue_id: int) -> None: self.telnet_liquidsoap.queue_remove(queue_id) self.liq_queue_tracker[queue_id] = None - def is_file(self, event: AnyEvent): + def is_file(self, event: AnyEvent) -> bool: return event["type"] == EventKind.FILE - def clear_queue_tracker(self): + def clear_queue_tracker(self) -> None: for queue_id in self.liq_queue_tracker: self.liq_queue_tracker[queue_id] = None - def modify_cue_point(self, link: FileEvent): + def modify_cue_point(self, link: FileEvent) -> None: assert self.is_file(link) lateness = seconds_between( @@ -237,7 +237,7 @@ class PypoLiquidsoap: cue_in_orig = timedelta(seconds=float(link["cue_in"])) link["cue_in"] = cue_in_orig.total_seconds() + lateness - def clear_all_queues(self): + def clear_all_queues(self) -> None: self.telnet_liquidsoap.queue_clear_all() diff --git a/playout/libretime_playout/player/push.py b/playout/libretime_playout/player/push.py index a98355ac9..90b42fdda 100644 --- a/playout/libretime_playout/player/push.py +++ b/playout/libretime_playout/player/push.py @@ -14,11 +14,11 @@ from .queue import PypoLiqQueue logger = logging.getLogger(__name__) -def is_stream(media_item): +def is_stream(media_item: AnyEvent) -> bool: return media_item["type"] == "stream_output_start" -def is_file(media_item): +def is_file(media_item: AnyEvent) -> bool: return media_item["type"] == "file" @@ -43,7 +43,7 @@ class PypoPush(Thread): self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap) self.plq.start() - def main(self): + def main(self) -> None: loops = 0 heartbeat_period = math.floor(30 / PUSH_INTERVAL) @@ -99,7 +99,7 @@ class PypoPush(Thread): return present, future - def run(self): + def run(self) -> None: while True: try: self.main() diff --git a/playout/libretime_playout/player/queue.py b/playout/libretime_playout/player/queue.py index 956057ecb..aa0ac37a2 100644 --- a/playout/libretime_playout/player/queue.py +++ b/playout/libretime_playout/player/queue.py @@ -6,6 +6,7 @@ from threading import Thread from typing import Any, Dict from ..utils import seconds_between +from .events import AnyEvent, event_key_to_datetime from .liquidsoap import PypoLiquidsoap logger = logging.getLogger(__name__) @@ -24,9 +25,9 @@ class PypoLiqQueue(Thread): self.queue = future_queue self.pypo_liquidsoap = pypo_liquidsoap - def main(self): + def main(self) -> None: time_until_next_play = None - schedule_deque = deque() + schedule_deque: deque[AnyEvent] = deque() media_schedule = None while True: @@ -48,7 +49,7 @@ class PypoLiqQueue(Thread): if len(schedule_deque): time_until_next_play = seconds_between( datetime.utcnow(), - schedule_deque[0]["start"], + event_key_to_datetime(schedule_deque[0]["start"]), ) else: time_until_next_play = None @@ -71,7 +72,7 @@ class PypoLiqQueue(Thread): else: time_until_next_play = None - def run(self): + def run(self) -> None: try: self.main() except Exception as exception: # pylint: disable=broad-exception-caught diff --git a/playout/libretime_playout/player/schedule.py b/playout/libretime_playout/player/schedule.py index 126cbb20b..0dafefbf2 100644 --- a/playout/libretime_playout/player/schedule.py +++ b/playout/libretime_playout/player/schedule.py @@ -18,7 +18,7 @@ from .events import ( ) -def insert_event(events: Events, event_key: str, event: AnyEvent): +def insert_event(events: Events, event_key: str, event: AnyEvent) -> None: key = event_key # Search for an empty slot