From a77321190f61fdb0043833d4fb153a3a7af89282 Mon Sep 17 00:00:00 2001 From: jo Date: Wed, 1 Mar 2023 17:13:02 +0100 Subject: [PATCH] refactor(playout): fix linting errors --- playout/libretime_playout/history/stats.py | 12 ++-- playout/libretime_playout/main.py | 60 +++++++++++-------- playout/libretime_playout/message_handler.py | 5 +- playout/libretime_playout/notify/main.py | 5 +- playout/libretime_playout/player/fetch.py | 36 +++++------ playout/libretime_playout/player/file.py | 10 ++-- .../libretime_playout/player/liquidsoap.py | 24 ++++---- .../player/liquidsoap_gateway.py | 12 ++-- playout/libretime_playout/player/push.py | 18 +++--- playout/pyproject.toml | 1 + playout/tests/liquidsoap/version_test.py | 4 +- 11 files changed, 100 insertions(+), 87 deletions(-) diff --git a/playout/libretime_playout/history/stats.py b/playout/libretime_playout/history/stats.py index 451d3dabb..d5de3df24 100644 --- a/playout/libretime_playout/history/stats.py +++ b/playout/libretime_playout/history/stats.py @@ -5,15 +5,11 @@ from threading import Thread from time import sleep from typing import Any, Dict, List, Optional, Union +import requests from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_shared.config import IcecastOutput, ShoutcastOutput from lxml import etree from requests import Session -from requests.exceptions import ( # pylint: disable=redefined-builtin - ConnectionError, - HTTPError, - Timeout, -) from ..config import Config @@ -116,9 +112,9 @@ class StatsCollector: cache[output_url] = self.collect_output_stats(output) except ( etree.XMLSyntaxError, - ConnectionError, - HTTPError, - Timeout, + requests.exceptions.ConnectionError, + requests.exceptions.HTTPError, + requests.exceptions.Timeout, ) as exception: logger.exception(exception) self._legacy_client.update_stream_setting_table( diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index a97b731f0..711c9e3d2 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -11,6 +11,7 @@ from queue import Queue from typing import Any, Dict, Optional import click +import requests from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v2 import ApiClient from libretime_shared.cli import cli_config_options, cli_logging_options @@ -35,6 +36,31 @@ for module in ("amqp",): logging.getLogger(module).propagate = False +def wait_for_legacy(legacy_client: LegacyClient): + while not legacy_client.is_server_compatible(): + time.sleep(5) + + success = False + while not success: + try: + legacy_client.register_component("pypo") + success = True + except ( + requests.exceptions.ConnectionError, + requests.exceptions.HTTPError, + requests.exceptions.Timeout, + ) as exception: + logger.exception(exception) + time.sleep(10) + + +def wait_for_liquidsoap(liq_client: LiquidsoapClient): + logger.debug("Checking if Liquidsoap is running") + liq_version = liq_client.wait_for_version() + if not LIQUIDSOAP_MIN_VERSION <= liq_version: + raise RuntimeError(f"Invalid liquidsoap version {liq_version}") + + @click.command(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX}) @cli_logging_options() @cli_config_options() @@ -58,33 +84,19 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ logger.info("Timezone: %s", time.tzname) logger.info("UTC time: %s", datetime.utcnow()) - legacy_client = LegacyClient() api_client = ApiClient( base_url=config.general.public_url, api_key=config.general.api_key, ) - while not legacy_client.is_server_compatible(): - time.sleep(5) - - success = False - while not success: - try: - legacy_client.register_component("pypo") - success = True - except Exception as exception: - logger.exception(exception) - time.sleep(10) + legacy_client = LegacyClient() + wait_for_legacy(legacy_client) liq_client = LiquidsoapClient( host=config.playout.liquidsoap_host, port=config.playout.liquidsoap_port, ) - - logger.debug("Checking if Liquidsoap is running") - liq_version = liq_client.wait_for_version() - if not LIQUIDSOAP_MIN_VERSION <= liq_version: - raise RuntimeError(f"Invalid liquidsoap version {liq_version}") + wait_for_liquidsoap(liq_client) fetch_queue: Queue[Dict[str, Any]] = Queue() push_queue: Queue[Events] = Queue() @@ -96,10 +108,9 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ pypo_liquidsoap = PypoLiquidsoap(liq_client) - file_thread = PypoFile(file_queue, api_client) - file_thread.start() + PypoFile(file_queue, api_client).start() - fetch_thread = PypoFetch( + PypoFetch( fetch_queue, push_queue, file_queue, @@ -108,14 +119,11 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ config, api_client, legacy_client, - ) - fetch_thread.start() + ).start() - push_thread = PypoPush(push_queue, pypo_liquidsoap, config) - push_thread.start() + PypoPush(push_queue, pypo_liquidsoap, config).start() - stats_collector_thread = StatsCollectorThread(config, legacy_client) - stats_collector_thread.start() + StatsCollectorThread(config, legacy_client).start() message_listener = MessageListener(config, fetch_queue) message_listener.run_forever() diff --git a/playout/libretime_playout/message_handler.py b/playout/libretime_playout/message_handler.py index a4b618877..7add17819 100644 --- a/playout/libretime_playout/message_handler.py +++ b/playout/libretime_playout/message_handler.py @@ -3,7 +3,7 @@ import logging from queue import Queue as ThreadQueue from signal import SIGTERM, signal from time import sleep -from typing import Any, Dict, Union +from typing import Any, Dict # For RabbitMQ from kombu.connection import Connection @@ -66,11 +66,12 @@ class MessageHandler(ConsumerMixin): message.ack() +# pylint: disable=too-few-public-methods class MessageListener: def __init__( self, config: Config, - fetch_queue: ThreadQueue[Union[str, bytes]], + fetch_queue: ThreadQueue[Dict[str, Any]], ) -> None: self.config = config self.fetch_queue = fetch_queue diff --git a/playout/libretime_playout/notify/main.py b/playout/libretime_playout/notify/main.py index 45b4055c3..941ebba0c 100644 --- a/playout/libretime_playout/notify/main.py +++ b/playout/libretime_playout/notify/main.py @@ -22,11 +22,12 @@ from libretime_shared.cli import cli_config_options, cli_logging_options from libretime_shared.config import DEFAULT_ENV_PREFIX from libretime_shared.logging import setup_logger -logger = logging.getLogger(__name__) - from ..config import Config +logger = logging.getLogger(__name__) + +# pylint: disable=too-few-public-methods class App: config: Config api_client: LegacyClient diff --git a/playout/libretime_playout/player/fetch.py b/playout/libretime_playout/player/fetch.py index cc9dc6454..774eb7fa5 100644 --- a/playout/libretime_playout/player/fetch.py +++ b/playout/libretime_playout/player/fetch.py @@ -5,7 +5,7 @@ import os import time from pathlib import Path from queue import Empty, Queue -from subprocess import PIPE, Popen +from subprocess import DEVNULL, PIPE, run from threading import Thread, Timer from typing import Any, Dict @@ -35,10 +35,12 @@ def mime_guess_extension(mime: str) -> str: return extension +# pylint: disable=too-many-instance-attributes class PypoFetch(Thread): name = "fetch" daemon = True + # pylint: disable=too-many-arguments def __init__( self, fetch_queue: Queue[Dict[str, Any]], @@ -112,11 +114,10 @@ class PypoFetch(Thread): if command == "update_schedule": self.listener_timeout = POLL_INTERVAL else: - self.listener_timeout = ( - self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL + self.listener_timeout = max( + self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL, + 0, ) - if self.listener_timeout < 0: - self.listener_timeout = 0 logger.info("New timeout: %s", self.listener_timeout) except Exception as exception: logger.exception(exception) @@ -195,10 +196,12 @@ class PypoFetch(Thread): logger.exception(exception) # Process the schedule - # - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") + # - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / + # "cache_for") # - Saves a serialized file of the schedule - # - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied - # to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) + # - playlists are prepared. (brought to liquidsoap format) and, if not mounted via + # nsf, files are copied to the cache dir + # (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) # - runs the cleanup routine, to get rid of unused cached files def process_schedule(self, events: Events): @@ -255,17 +258,16 @@ class PypoFetch(Thread): return file_ext - def is_file_opened(self, path): - # Capture stderr to avoid polluting py-interpreter.log - proc = Popen(["lsof", path], stdout=PIPE, stderr=PIPE) - out = proc.communicate()[0].strip() - return bool(out) + def is_file_opened(self, path: str) -> bool: + result = run(["lsof", "--", path], stdout=PIPE, stderr=DEVNULL, check=False) + return bool(result.stdout) def cache_cleanup(self, events: Events): """ - Get list of all files in the cache dir and remove them if they aren't being used anymore. - Input dict() media, lists all files that are scheduled or currently playing. Not being in this - dict() means the file is safe to remove. + Get list of all files in the cache dir and remove them if they aren't being used + anymore. + Input dict() media, lists all files that are scheduled or currently playing. Not + being in this dict() means the file is safe to remove. """ cached_file_set = set(os.listdir(self.cache_dir)) scheduled_file_set = set() @@ -275,7 +277,7 @@ class PypoFetch(Thread): if item["type"] == EventKind.FILE: if "file_ext" not in item: item["file_ext"] = mime_guess_extension(item["metadata"]["mime"]) - scheduled_file_set.add("{}{}".format(item["id"], item["file_ext"])) + scheduled_file_set.add(f'{item["id"]}{item["file_ext"]}') expired_files = cached_file_set - scheduled_file_set diff --git a/playout/libretime_playout/player/file.py b/playout/libretime_playout/player/file.py index 544ce2ca8..36e289c84 100644 --- a/playout/libretime_playout/player/file.py +++ b/playout/libretime_playout/player/file.py @@ -7,8 +7,8 @@ from queue import Empty, Queue from threading import Thread from typing import Optional +import requests from libretime_api_client.v2 import ApiClient -from requests.exceptions import ConnectionError, HTTPError, Timeout from .events import FileEvent, FileEvents @@ -71,7 +71,7 @@ class PypoFile(Thread): for chunk in response.iter_content(chunk_size=2048): handle.write(chunk) - except HTTPError as exception: + except requests.exceptions.HTTPError as exception: raise RuntimeError( f"could not download file {file_event['id']}" ) from exception @@ -98,10 +98,10 @@ class PypoFile(Thread): try: file_size = os.path.getsize(file_path) - with open(file_path, "rb") as fh: + with open(file_path, "rb") as file_fd: hasher = hashlib.md5(usedforsecurity=False) while True: - data = fh.read(8192) + data = file_fd.read(8192) if not data: break hasher.update(data) @@ -121,7 +121,7 @@ class PypoFile(Thread): file_id, json={"filesize": file_size, "md5": md5_hash}, ) - except (ConnectionError, Timeout): + except (requests.exceptions.ConnectionError, requests.exceptions.Timeout): logger.exception(error_msg) except Exception as exception: logger.exception("%s: %s", error_msg, exception) diff --git a/playout/libretime_playout/player/liquidsoap.py b/playout/libretime_playout/player/liquidsoap.py index 29054f258..9701fe5cb 100644 --- a/playout/libretime_playout/player/liquidsoap.py +++ b/playout/libretime_playout/player/liquidsoap.py @@ -1,7 +1,7 @@ import logging import time from datetime import datetime, timedelta -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Set from ..liquidsoap.client import LiquidsoapClient from ..utils import seconds_between @@ -49,7 +49,7 @@ class PypoLiquidsoap: # time so that the prebuffering stage could take effect. Let's do the # prebuffering now. self.telnet_liquidsoap.start_web_stream_buffer(media_item) - self.telnet_liquidsoap.start_web_stream(media_item) + self.telnet_liquidsoap.start_web_stream() elif media_item["type"] == EventKind.WEB_STREAM_BUFFER_END: self.telnet_liquidsoap.stop_web_stream_buffer() elif media_item["type"] == EventKind.WEB_STREAM_OUTPUT_END: @@ -105,6 +105,7 @@ class PypoLiquidsoap: return available_queue + # pylint: disable=too-many-branches def verify_correct_present_media(self, scheduled_now: List[AnyEvent]): """ verify whether Liquidsoap is currently playing the correct files. @@ -139,20 +140,19 @@ class PypoLiquidsoap: if x["type"] == EventKind.WEB_STREAM_OUTPUT_START ] - schedule_ids = {x["row_id"] for x in scheduled_now_files} + schedule_ids: Set[int] = {x["row_id"] for x in scheduled_now_files} row_id_map = {} - liq_queue_ids = set() - for queue_id in self.liq_queue_tracker: - queue_item = self.liq_queue_tracker[queue_id] + liq_queue_ids: Set[int] = set() + for queue_item in self.liq_queue_tracker.values(): if queue_item is not None and not self.is_media_item_finished( queue_item ): liq_queue_ids.add(queue_item["row_id"]) row_id_map[queue_item["row_id"]] = queue_item - to_be_removed = set() - to_be_added = set() + to_be_removed: Set[int] = set() + to_be_added: Set[int] = set() # Iterate over the new files, and compare them to currently scheduled # tracks. If already in liquidsoap queue still need to make sure they don't @@ -182,9 +182,11 @@ class PypoLiquidsoap: logger.info("Need to remove items from Liquidsoap: %s", to_be_removed) # remove files from Liquidsoap's queue - for queue_id in self.liq_queue_tracker: - queue_item = self.liq_queue_tracker[queue_id] - if queue_item is not None and queue_item["row_id"] in to_be_removed: + for queue_id, queue_item in self.liq_queue_tracker.items(): + if ( + queue_item is not None + and queue_item.get("row_id") in to_be_removed + ): self.stop(queue_id) if to_be_added: diff --git a/playout/libretime_playout/player/liquidsoap_gateway.py b/playout/libretime_playout/player/liquidsoap_gateway.py index 993362fa2..a81758758 100644 --- a/playout/libretime_playout/player/liquidsoap_gateway.py +++ b/playout/libretime_playout/player/liquidsoap_gateway.py @@ -9,7 +9,8 @@ logger = logging.getLogger(__name__) def create_liquidsoap_annotation(file_event: FileEvent) -> str: - # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. + # We need liq_start_next value in the annotate. That is the value that controls + # overlap duration of crossfade. annotations = { "media_id": file_event["id"], "liq_start_next": "0", @@ -21,8 +22,8 @@ def create_liquidsoap_annotation(file_event: FileEvent) -> str: "replay_gain": f"{file_event['replay_gain']} dB", } - # Override the the artist/title that Liquidsoap extracts from a file's metadata - # with the metadata we get from Airtime. (You can modify metadata in Airtime's library, + # Override the the artist/title that Liquidsoap extracts from a file's metadata with + # the metadata we get from Airtime. (You can modify metadata in Airtime's library, # which doesn't get saved back to the file.) if "metadata" in file_event: if "artist_name" in file_event["metadata"]: @@ -87,7 +88,7 @@ class TelnetLiquidsoap: logger.exception(exception) @ls_timeout - def start_web_stream(self, media_item): + def start_web_stream(self): try: self.liq_client.web_stream_start() self.current_prebuffering_stream_id = None @@ -106,11 +107,12 @@ class TelnetLiquidsoap: logger.exception(exception) @ls_timeout - def get_current_stream_id(self): + def get_current_stream_id(self) -> str: try: return self.liq_client.web_stream_get_id() except (ConnectionError, TimeoutError) as exception: logger.exception(exception) + return "-1" @ls_timeout def disconnect_source(self, sourcename): diff --git a/playout/libretime_playout/player/push.py b/playout/libretime_playout/player/push.py index 43c0ac506..f908b5dfd 100644 --- a/playout/libretime_playout/player/push.py +++ b/playout/libretime_playout/player/push.py @@ -55,16 +55,16 @@ class PypoPush(Thread): except Exception as exception: logger.exception(exception) raise exception - else: - logger.debug(events) - # separate media_schedule list into currently_playing and - # scheduled_for_future lists - currently_playing, scheduled_for_future = self.separate_present_future( - events - ) - self.pypo_liquidsoap.verify_correct_present_media(currently_playing) - self.future_scheduled_queue.put(scheduled_for_future) + logger.debug(events) + # separate media_schedule list into currently_playing and + # scheduled_for_future lists + currently_playing, scheduled_for_future = self.separate_present_future( + events + ) + + self.pypo_liquidsoap.verify_correct_present_media(currently_playing) + self.future_scheduled_queue.put(scheduled_for_future) if loops % heartbeat_period == 0: logger.info("heartbeat") diff --git a/playout/pyproject.toml b/playout/pyproject.toml index 13a147366..4e8a2ffaa 100644 --- a/playout/pyproject.toml +++ b/playout/pyproject.toml @@ -9,6 +9,7 @@ extension-pkg-whitelist = [ "pydantic", ] disable = [ + "fixme", "missing-class-docstring", "missing-function-docstring", "missing-module-docstring", diff --git a/playout/tests/liquidsoap/version_test.py b/playout/tests/liquidsoap/version_test.py index fba97a820..0bde5abe9 100644 --- a/playout/tests/liquidsoap/version_test.py +++ b/playout/tests/liquidsoap/version_test.py @@ -27,9 +27,9 @@ def test_parse_liquidsoap_version(version, expected): @pytest.mark.skipif(getenv("CI") != "true", reason="requires liquidsoap") def test_get_liquidsoap_version(): - LIQUIDSOAP_VERSION_MAP = { + liquidsoap_version_map = { "focal": (1, 4, 2), "bullseye": (1, 4, 3), "jammy": (2, 0, 2), } - assert get_liquidsoap_version() == LIQUIDSOAP_VERSION_MAP[distro.codename()] + assert get_liquidsoap_version() == liquidsoap_version_map[distro.codename()]