refactor(playout): fix linting errors

This commit is contained in:
jo 2023-03-01 17:13:02 +01:00 committed by Kyle Robbertze
parent c6c5b1125f
commit a77321190f
11 changed files with 100 additions and 87 deletions

View File

@ -5,15 +5,11 @@ from threading import Thread
from time import sleep from time import sleep
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
import requests
from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_shared.config import IcecastOutput, ShoutcastOutput from libretime_shared.config import IcecastOutput, ShoutcastOutput
from lxml import etree from lxml import etree
from requests import Session from requests import Session
from requests.exceptions import ( # pylint: disable=redefined-builtin
ConnectionError,
HTTPError,
Timeout,
)
from ..config import Config from ..config import Config
@ -116,9 +112,9 @@ class StatsCollector:
cache[output_url] = self.collect_output_stats(output) cache[output_url] = self.collect_output_stats(output)
except ( except (
etree.XMLSyntaxError, etree.XMLSyntaxError,
ConnectionError, requests.exceptions.ConnectionError,
HTTPError, requests.exceptions.HTTPError,
Timeout, requests.exceptions.Timeout,
) as exception: ) as exception:
logger.exception(exception) logger.exception(exception)
self._legacy_client.update_stream_setting_table( self._legacy_client.update_stream_setting_table(

View File

@ -11,6 +11,7 @@ from queue import Queue
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
import click import click
import requests
from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_api_client.v2 import ApiClient from libretime_api_client.v2 import ApiClient
from libretime_shared.cli import cli_config_options, cli_logging_options from libretime_shared.cli import cli_config_options, cli_logging_options
@ -35,6 +36,31 @@ for module in ("amqp",):
logging.getLogger(module).propagate = False logging.getLogger(module).propagate = False
def wait_for_legacy(legacy_client: LegacyClient):
while not legacy_client.is_server_compatible():
time.sleep(5)
success = False
while not success:
try:
legacy_client.register_component("pypo")
success = True
except (
requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
requests.exceptions.Timeout,
) as exception:
logger.exception(exception)
time.sleep(10)
def wait_for_liquidsoap(liq_client: LiquidsoapClient):
logger.debug("Checking if Liquidsoap is running")
liq_version = liq_client.wait_for_version()
if not LIQUIDSOAP_MIN_VERSION <= liq_version:
raise RuntimeError(f"Invalid liquidsoap version {liq_version}")
@click.command(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX}) @click.command(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX})
@cli_logging_options() @cli_logging_options()
@cli_config_options() @cli_config_options()
@ -58,33 +84,19 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
logger.info("Timezone: %s", time.tzname) logger.info("Timezone: %s", time.tzname)
logger.info("UTC time: %s", datetime.utcnow()) logger.info("UTC time: %s", datetime.utcnow())
legacy_client = LegacyClient()
api_client = ApiClient( api_client = ApiClient(
base_url=config.general.public_url, base_url=config.general.public_url,
api_key=config.general.api_key, api_key=config.general.api_key,
) )
while not legacy_client.is_server_compatible(): legacy_client = LegacyClient()
time.sleep(5) wait_for_legacy(legacy_client)
success = False
while not success:
try:
legacy_client.register_component("pypo")
success = True
except Exception as exception:
logger.exception(exception)
time.sleep(10)
liq_client = LiquidsoapClient( liq_client = LiquidsoapClient(
host=config.playout.liquidsoap_host, host=config.playout.liquidsoap_host,
port=config.playout.liquidsoap_port, port=config.playout.liquidsoap_port,
) )
wait_for_liquidsoap(liq_client)
logger.debug("Checking if Liquidsoap is running")
liq_version = liq_client.wait_for_version()
if not LIQUIDSOAP_MIN_VERSION <= liq_version:
raise RuntimeError(f"Invalid liquidsoap version {liq_version}")
fetch_queue: Queue[Dict[str, Any]] = Queue() fetch_queue: Queue[Dict[str, Any]] = Queue()
push_queue: Queue[Events] = Queue() push_queue: Queue[Events] = Queue()
@ -96,10 +108,9 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
pypo_liquidsoap = PypoLiquidsoap(liq_client) pypo_liquidsoap = PypoLiquidsoap(liq_client)
file_thread = PypoFile(file_queue, api_client) PypoFile(file_queue, api_client).start()
file_thread.start()
fetch_thread = PypoFetch( PypoFetch(
fetch_queue, fetch_queue,
push_queue, push_queue,
file_queue, file_queue,
@ -108,14 +119,11 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
config, config,
api_client, api_client,
legacy_client, legacy_client,
) ).start()
fetch_thread.start()
push_thread = PypoPush(push_queue, pypo_liquidsoap, config) PypoPush(push_queue, pypo_liquidsoap, config).start()
push_thread.start()
stats_collector_thread = StatsCollectorThread(config, legacy_client) StatsCollectorThread(config, legacy_client).start()
stats_collector_thread.start()
message_listener = MessageListener(config, fetch_queue) message_listener = MessageListener(config, fetch_queue)
message_listener.run_forever() message_listener.run_forever()

View File

@ -3,7 +3,7 @@ import logging
from queue import Queue as ThreadQueue from queue import Queue as ThreadQueue
from signal import SIGTERM, signal from signal import SIGTERM, signal
from time import sleep from time import sleep
from typing import Any, Dict, Union from typing import Any, Dict
# For RabbitMQ # For RabbitMQ
from kombu.connection import Connection from kombu.connection import Connection
@ -66,11 +66,12 @@ class MessageHandler(ConsumerMixin):
message.ack() message.ack()
# pylint: disable=too-few-public-methods
class MessageListener: class MessageListener:
def __init__( def __init__(
self, self,
config: Config, config: Config,
fetch_queue: ThreadQueue[Union[str, bytes]], fetch_queue: ThreadQueue[Dict[str, Any]],
) -> None: ) -> None:
self.config = config self.config = config
self.fetch_queue = fetch_queue self.fetch_queue = fetch_queue

View File

@ -22,11 +22,12 @@ from libretime_shared.cli import cli_config_options, cli_logging_options
from libretime_shared.config import DEFAULT_ENV_PREFIX from libretime_shared.config import DEFAULT_ENV_PREFIX
from libretime_shared.logging import setup_logger from libretime_shared.logging import setup_logger
logger = logging.getLogger(__name__)
from ..config import Config from ..config import Config
logger = logging.getLogger(__name__)
# pylint: disable=too-few-public-methods
class App: class App:
config: Config config: Config
api_client: LegacyClient api_client: LegacyClient

View File

@ -5,7 +5,7 @@ import os
import time import time
from pathlib import Path from pathlib import Path
from queue import Empty, Queue from queue import Empty, Queue
from subprocess import PIPE, Popen from subprocess import DEVNULL, PIPE, run
from threading import Thread, Timer from threading import Thread, Timer
from typing import Any, Dict from typing import Any, Dict
@ -35,10 +35,12 @@ def mime_guess_extension(mime: str) -> str:
return extension return extension
# pylint: disable=too-many-instance-attributes
class PypoFetch(Thread): class PypoFetch(Thread):
name = "fetch" name = "fetch"
daemon = True daemon = True
# pylint: disable=too-many-arguments
def __init__( def __init__(
self, self,
fetch_queue: Queue[Dict[str, Any]], fetch_queue: Queue[Dict[str, Any]],
@ -112,11 +114,10 @@ class PypoFetch(Thread):
if command == "update_schedule": if command == "update_schedule":
self.listener_timeout = POLL_INTERVAL self.listener_timeout = POLL_INTERVAL
else: else:
self.listener_timeout = ( self.listener_timeout = max(
self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL,
0,
) )
if self.listener_timeout < 0:
self.listener_timeout = 0
logger.info("New timeout: %s", self.listener_timeout) logger.info("New timeout: %s", self.listener_timeout)
except Exception as exception: except Exception as exception:
logger.exception(exception) logger.exception(exception)
@ -195,10 +196,12 @@ class PypoFetch(Thread):
logger.exception(exception) logger.exception(exception)
# Process the schedule # Process the schedule
# - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") # - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" /
# "cache_for")
# - Saves a serialized file of the schedule # - Saves a serialized file of the schedule
# - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied # - playlists are prepared. (brought to liquidsoap format) and, if not mounted via
# to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss) # nsf, files are copied to the cache dir
# (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
# - runs the cleanup routine, to get rid of unused cached files # - runs the cleanup routine, to get rid of unused cached files
def process_schedule(self, events: Events): def process_schedule(self, events: Events):
@ -255,17 +258,16 @@ class PypoFetch(Thread):
return file_ext return file_ext
def is_file_opened(self, path): def is_file_opened(self, path: str) -> bool:
# Capture stderr to avoid polluting py-interpreter.log result = run(["lsof", "--", path], stdout=PIPE, stderr=DEVNULL, check=False)
proc = Popen(["lsof", path], stdout=PIPE, stderr=PIPE) return bool(result.stdout)
out = proc.communicate()[0].strip()
return bool(out)
def cache_cleanup(self, events: Events): def cache_cleanup(self, events: Events):
""" """
Get list of all files in the cache dir and remove them if they aren't being used anymore. Get list of all files in the cache dir and remove them if they aren't being used
Input dict() media, lists all files that are scheduled or currently playing. Not being in this anymore.
dict() means the file is safe to remove. Input dict() media, lists all files that are scheduled or currently playing. Not
being in this dict() means the file is safe to remove.
""" """
cached_file_set = set(os.listdir(self.cache_dir)) cached_file_set = set(os.listdir(self.cache_dir))
scheduled_file_set = set() scheduled_file_set = set()
@ -275,7 +277,7 @@ class PypoFetch(Thread):
if item["type"] == EventKind.FILE: if item["type"] == EventKind.FILE:
if "file_ext" not in item: if "file_ext" not in item:
item["file_ext"] = mime_guess_extension(item["metadata"]["mime"]) item["file_ext"] = mime_guess_extension(item["metadata"]["mime"])
scheduled_file_set.add("{}{}".format(item["id"], item["file_ext"])) scheduled_file_set.add(f'{item["id"]}{item["file_ext"]}')
expired_files = cached_file_set - scheduled_file_set expired_files = cached_file_set - scheduled_file_set

View File

@ -7,8 +7,8 @@ from queue import Empty, Queue
from threading import Thread from threading import Thread
from typing import Optional from typing import Optional
import requests
from libretime_api_client.v2 import ApiClient from libretime_api_client.v2 import ApiClient
from requests.exceptions import ConnectionError, HTTPError, Timeout
from .events import FileEvent, FileEvents from .events import FileEvent, FileEvents
@ -71,7 +71,7 @@ class PypoFile(Thread):
for chunk in response.iter_content(chunk_size=2048): for chunk in response.iter_content(chunk_size=2048):
handle.write(chunk) handle.write(chunk)
except HTTPError as exception: except requests.exceptions.HTTPError as exception:
raise RuntimeError( raise RuntimeError(
f"could not download file {file_event['id']}" f"could not download file {file_event['id']}"
) from exception ) from exception
@ -98,10 +98,10 @@ class PypoFile(Thread):
try: try:
file_size = os.path.getsize(file_path) file_size = os.path.getsize(file_path)
with open(file_path, "rb") as fh: with open(file_path, "rb") as file_fd:
hasher = hashlib.md5(usedforsecurity=False) hasher = hashlib.md5(usedforsecurity=False)
while True: while True:
data = fh.read(8192) data = file_fd.read(8192)
if not data: if not data:
break break
hasher.update(data) hasher.update(data)
@ -121,7 +121,7 @@ class PypoFile(Thread):
file_id, file_id,
json={"filesize": file_size, "md5": md5_hash}, json={"filesize": file_size, "md5": md5_hash},
) )
except (ConnectionError, Timeout): except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
logger.exception(error_msg) logger.exception(error_msg)
except Exception as exception: except Exception as exception:
logger.exception("%s: %s", error_msg, exception) logger.exception("%s: %s", error_msg, exception)

View File

@ -1,7 +1,7 @@
import logging import logging
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, List, Optional from typing import Dict, List, Optional, Set
from ..liquidsoap.client import LiquidsoapClient from ..liquidsoap.client import LiquidsoapClient
from ..utils import seconds_between from ..utils import seconds_between
@ -49,7 +49,7 @@ class PypoLiquidsoap:
# time so that the prebuffering stage could take effect. Let's do the # time so that the prebuffering stage could take effect. Let's do the
# prebuffering now. # prebuffering now.
self.telnet_liquidsoap.start_web_stream_buffer(media_item) self.telnet_liquidsoap.start_web_stream_buffer(media_item)
self.telnet_liquidsoap.start_web_stream(media_item) self.telnet_liquidsoap.start_web_stream()
elif media_item["type"] == EventKind.WEB_STREAM_BUFFER_END: elif media_item["type"] == EventKind.WEB_STREAM_BUFFER_END:
self.telnet_liquidsoap.stop_web_stream_buffer() self.telnet_liquidsoap.stop_web_stream_buffer()
elif media_item["type"] == EventKind.WEB_STREAM_OUTPUT_END: elif media_item["type"] == EventKind.WEB_STREAM_OUTPUT_END:
@ -105,6 +105,7 @@ class PypoLiquidsoap:
return available_queue return available_queue
# pylint: disable=too-many-branches
def verify_correct_present_media(self, scheduled_now: List[AnyEvent]): def verify_correct_present_media(self, scheduled_now: List[AnyEvent]):
""" """
verify whether Liquidsoap is currently playing the correct files. verify whether Liquidsoap is currently playing the correct files.
@ -139,20 +140,19 @@ class PypoLiquidsoap:
if x["type"] == EventKind.WEB_STREAM_OUTPUT_START if x["type"] == EventKind.WEB_STREAM_OUTPUT_START
] ]
schedule_ids = {x["row_id"] for x in scheduled_now_files} schedule_ids: Set[int] = {x["row_id"] for x in scheduled_now_files}
row_id_map = {} row_id_map = {}
liq_queue_ids = set() liq_queue_ids: Set[int] = set()
for queue_id in self.liq_queue_tracker: for queue_item in self.liq_queue_tracker.values():
queue_item = self.liq_queue_tracker[queue_id]
if queue_item is not None and not self.is_media_item_finished( if queue_item is not None and not self.is_media_item_finished(
queue_item queue_item
): ):
liq_queue_ids.add(queue_item["row_id"]) liq_queue_ids.add(queue_item["row_id"])
row_id_map[queue_item["row_id"]] = queue_item row_id_map[queue_item["row_id"]] = queue_item
to_be_removed = set() to_be_removed: Set[int] = set()
to_be_added = set() to_be_added: Set[int] = set()
# Iterate over the new files, and compare them to currently scheduled # Iterate over the new files, and compare them to currently scheduled
# tracks. If already in liquidsoap queue still need to make sure they don't # tracks. If already in liquidsoap queue still need to make sure they don't
@ -182,9 +182,11 @@ class PypoLiquidsoap:
logger.info("Need to remove items from Liquidsoap: %s", to_be_removed) logger.info("Need to remove items from Liquidsoap: %s", to_be_removed)
# remove files from Liquidsoap's queue # remove files from Liquidsoap's queue
for queue_id in self.liq_queue_tracker: for queue_id, queue_item in self.liq_queue_tracker.items():
queue_item = self.liq_queue_tracker[queue_id] if (
if queue_item is not None and queue_item["row_id"] in to_be_removed: queue_item is not None
and queue_item.get("row_id") in to_be_removed
):
self.stop(queue_id) self.stop(queue_id)
if to_be_added: if to_be_added:

View File

@ -9,7 +9,8 @@ logger = logging.getLogger(__name__)
def create_liquidsoap_annotation(file_event: FileEvent) -> str: def create_liquidsoap_annotation(file_event: FileEvent) -> str:
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. # We need liq_start_next value in the annotate. That is the value that controls
# overlap duration of crossfade.
annotations = { annotations = {
"media_id": file_event["id"], "media_id": file_event["id"],
"liq_start_next": "0", "liq_start_next": "0",
@ -21,8 +22,8 @@ def create_liquidsoap_annotation(file_event: FileEvent) -> str:
"replay_gain": f"{file_event['replay_gain']} dB", "replay_gain": f"{file_event['replay_gain']} dB",
} }
# Override the the artist/title that Liquidsoap extracts from a file's metadata # Override the the artist/title that Liquidsoap extracts from a file's metadata with
# with the metadata we get from Airtime. (You can modify metadata in Airtime's library, # the metadata we get from Airtime. (You can modify metadata in Airtime's library,
# which doesn't get saved back to the file.) # which doesn't get saved back to the file.)
if "metadata" in file_event: if "metadata" in file_event:
if "artist_name" in file_event["metadata"]: if "artist_name" in file_event["metadata"]:
@ -87,7 +88,7 @@ class TelnetLiquidsoap:
logger.exception(exception) logger.exception(exception)
@ls_timeout @ls_timeout
def start_web_stream(self, media_item): def start_web_stream(self):
try: try:
self.liq_client.web_stream_start() self.liq_client.web_stream_start()
self.current_prebuffering_stream_id = None self.current_prebuffering_stream_id = None
@ -106,11 +107,12 @@ class TelnetLiquidsoap:
logger.exception(exception) logger.exception(exception)
@ls_timeout @ls_timeout
def get_current_stream_id(self): def get_current_stream_id(self) -> str:
try: try:
return self.liq_client.web_stream_get_id() return self.liq_client.web_stream_get_id()
except (ConnectionError, TimeoutError) as exception: except (ConnectionError, TimeoutError) as exception:
logger.exception(exception) logger.exception(exception)
return "-1"
@ls_timeout @ls_timeout
def disconnect_source(self, sourcename): def disconnect_source(self, sourcename):

View File

@ -55,16 +55,16 @@ class PypoPush(Thread):
except Exception as exception: except Exception as exception:
logger.exception(exception) logger.exception(exception)
raise exception raise exception
else:
logger.debug(events)
# separate media_schedule list into currently_playing and
# scheduled_for_future lists
currently_playing, scheduled_for_future = self.separate_present_future(
events
)
self.pypo_liquidsoap.verify_correct_present_media(currently_playing) logger.debug(events)
self.future_scheduled_queue.put(scheduled_for_future) # separate media_schedule list into currently_playing and
# scheduled_for_future lists
currently_playing, scheduled_for_future = self.separate_present_future(
events
)
self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
self.future_scheduled_queue.put(scheduled_for_future)
if loops % heartbeat_period == 0: if loops % heartbeat_period == 0:
logger.info("heartbeat") logger.info("heartbeat")

View File

@ -9,6 +9,7 @@ extension-pkg-whitelist = [
"pydantic", "pydantic",
] ]
disable = [ disable = [
"fixme",
"missing-class-docstring", "missing-class-docstring",
"missing-function-docstring", "missing-function-docstring",
"missing-module-docstring", "missing-module-docstring",

View File

@ -27,9 +27,9 @@ def test_parse_liquidsoap_version(version, expected):
@pytest.mark.skipif(getenv("CI") != "true", reason="requires liquidsoap") @pytest.mark.skipif(getenv("CI") != "true", reason="requires liquidsoap")
def test_get_liquidsoap_version(): def test_get_liquidsoap_version():
LIQUIDSOAP_VERSION_MAP = { liquidsoap_version_map = {
"focal": (1, 4, 2), "focal": (1, 4, 2),
"bullseye": (1, 4, 3), "bullseye": (1, 4, 3),
"jammy": (2, 0, 2), "jammy": (2, 0, 2),
} }
assert get_liquidsoap_version() == LIQUIDSOAP_VERSION_MAP[distro.codename()] assert get_liquidsoap_version() == liquidsoap_version_map[distro.codename()]