refactor(playout): add more typings

This commit is contained in:
jo 2023-03-01 20:27:27 +01:00 committed by Kyle Robbertze
parent a32d9d25f1
commit 719464a272
10 changed files with 54 additions and 44 deletions

View File

@ -47,7 +47,7 @@ class StatsCollector:
) -> Dict[str, Stats]:
response = self._session.get(
url=self.get_output_url(output),
auth=(output.admin_user, output.admin_password),
auth=(output.admin_user, output.admin_password or ""),
timeout=self._timeout,
)
response.raise_for_status()
@ -90,7 +90,7 @@ class StatsCollector:
outputs: List[AnyOutput],
*,
_timestamp: Optional[datetime] = None,
):
) -> None:
if _timestamp is None:
_timestamp = datetime.utcnow()
@ -148,7 +148,7 @@ class StatsCollectorThread(Thread):
self._config = config
self._collector = StatsCollector(legacy_client)
def run(self):
def run(self) -> None:
logger.info("starting %s", self.name)
while True:
try:

View File

@ -38,10 +38,10 @@ class LiquidsoapClient:
timeout=timeout,
)
def _quote(self, value: Any):
def _quote(self, value: Any) -> str:
return quote(value, double=True)
def _set_var(self, name: str, value: Any):
def _set_var(self, name: str, value: Any) -> None:
self.conn.write(f"var.set {name} = {value}")
result = self.conn.read()
if f"Variable {name} set" not in result:
@ -126,16 +126,18 @@ class LiquidsoapClient:
self,
*,
station_name: Optional[str] = None,
message_format: Optional[Union[MessageFormatKind, str]] = None,
message_format: Optional[Union[MessageFormatKind, int]] = None,
message_offline: Optional[str] = None,
input_fade_transition: Optional[float] = None,
):
) -> None:
with self.conn:
if station_name is not None:
self._set_var("station_name", self._quote(station_name))
if message_format is not None:
if isinstance(message_format, MessageFormatKind):
message_format = message_format.value
# Use an interactive.string until Liquidsoap have interactive.int
# variables
self._set_var("message_format", self._quote(message_format))
if message_offline is not None:
self._set_var("message_offline", self._quote(message_offline))

View File

@ -46,8 +46,8 @@ class LiquidsoapConnection:
self._port = port
self._timeout = timeout
def address(self):
return f"{self._host}:{self._port}" if self._path is None else self._path
def address(self) -> str:
return f"{self._host}:{self._port}" if self._path is None else str(self._path)
def __enter__(self):
try:

View File

@ -36,7 +36,7 @@ for module in ("amqp",):
logging.getLogger(module).propagate = False
def wait_for_legacy(legacy_client: LegacyClient):
def wait_for_legacy(legacy_client: LegacyClient) -> None:
while not legacy_client.is_server_compatible():
time.sleep(5)
@ -54,7 +54,7 @@ def wait_for_legacy(legacy_client: LegacyClient):
time.sleep(10)
def wait_for_liquidsoap(liq_client: LiquidsoapClient):
def wait_for_liquidsoap(liq_client: LiquidsoapClient) -> None:
logger.debug("Checking if Liquidsoap is running")
liq_version = liq_client.wait_for_version()
if not LIQUIDSOAP_MIN_VERSION <= liq_version:
@ -64,7 +64,11 @@ def wait_for_liquidsoap(liq_client: LiquidsoapClient):
@click.command(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX})
@cli_logging_options()
@cli_config_options()
def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[Path]):
def cli(
log_level: str,
log_filepath: Optional[Path],
config_filepath: Optional[Path],
) -> None:
"""
Run playout.
"""

View File

@ -34,7 +34,7 @@ class MessageHandler(ConsumerMixin):
Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]),
]
def on_message(self, body, message: Message):
def on_message(self, body, message: Message) -> None:
logger.debug("received message: %s", body)
try:
try:
@ -76,7 +76,7 @@ class MessageListener:
self.config = config
self.fetch_queue = fetch_queue
def run_forever(self):
def run_forever(self) -> None:
while True:
with Connection(
self.config.rabbitmq.url,

View File

@ -7,7 +7,7 @@ from pathlib import Path
from queue import Empty, Queue
from subprocess import DEVNULL, PIPE, run
from threading import Thread, Timer
from typing import Any, Dict
from typing import Any, Dict, Union
from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_api_client.v2 import ApiClient
@ -15,7 +15,7 @@ from requests import RequestException
from ..config import CACHE_DIR, POLL_INTERVAL, Config
from ..liquidsoap.client import LiquidsoapClient
from ..liquidsoap.models import Info, StreamPreferences, StreamState
from ..liquidsoap.models import Info, MessageFormatKind, StreamPreferences, StreamState
from ..timeout import ls_timeout
from .events import EventKind, Events, FileEvent, FileEvents, event_key_to_datetime
from .liquidsoap import PypoLiquidsoap
@ -75,7 +75,7 @@ class PypoFetch(Thread):
# Handle a message from RabbitMQ, put it into our yucky global var.
# Hopefully there is a better way to do this.
def handle_message(self, message: Dict[str, Any]):
def handle_message(self, message: Dict[str, Any]) -> None:
try:
command = message["event_type"]
logger.debug("handling event %s: %s", command, message)
@ -123,7 +123,7 @@ class PypoFetch(Thread):
logger.exception(exception)
# Initialize Liquidsoap environment
def set_bootstrap_variables(self):
def set_bootstrap_variables(self) -> None:
logger.debug("Getting information needed on bootstrap from Airtime")
try:
info = Info(**self.api_client.get_info().json())
@ -168,28 +168,31 @@ class PypoFetch(Thread):
self.pypo_liquidsoap.clear_queue_tracker()
@ls_timeout
def update_liquidsoap_stream_format(self, stream_format):
def update_liquidsoap_stream_format(
self,
stream_format: Union[MessageFormatKind, int],
) -> None:
try:
self.liq_client.settings_update(message_format=stream_format)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
@ls_timeout
def update_liquidsoap_message_offline(self, message_offline: str):
def update_liquidsoap_message_offline(self, message_offline: str) -> None:
try:
self.liq_client.settings_update(message_offline=message_offline)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
@ls_timeout
def update_liquidsoap_transition_fade(self, fade):
def update_liquidsoap_transition_fade(self, fade: float) -> None:
try:
self.liq_client.settings_update(input_fade_transition=fade)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
@ls_timeout
def update_liquidsoap_station_name(self, station_name):
def update_liquidsoap_station_name(self, station_name: str) -> None:
try:
self.liq_client.settings_update(station_name=station_name)
except (ConnectionError, TimeoutError) as exception:
@ -204,7 +207,7 @@ class PypoFetch(Thread):
# (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
# - runs the cleanup routine, to get rid of unused cached files
def process_schedule(self, events: Events):
def process_schedule(self, events: Events) -> None:
self.last_update_schedule_timestamp = time.time()
logger.debug(events)
file_events: FileEvents = {}
@ -300,7 +303,7 @@ class PypoFetch(Thread):
"Problem removing file '%s': %s", expired_file, exception
)
def manual_schedule_fetch(self):
def manual_schedule_fetch(self) -> bool:
try:
self.schedule_data = get_schedule(self.api_client)
logger.debug("Received event from API client: %s", self.schedule_data)
@ -310,7 +313,7 @@ class PypoFetch(Thread):
logger.exception("Unable to fetch schedule: %s", exception)
return False
def persistent_manual_schedule_fetch(self, max_attempts=1):
def persistent_manual_schedule_fetch(self, max_attempts=1) -> bool:
success = False
num_attempts = 0
while not success and num_attempts < max_attempts:

View File

@ -33,7 +33,7 @@ class PypoLiquidsoap:
list(self.liq_queue_tracker.keys()),
)
def play(self, media_item: AnyEvent):
def play(self, media_item: AnyEvent) -> None:
if media_item["type"] == EventKind.FILE:
self.handle_file_type(media_item)
elif media_item["type"] == EventKind.ACTION:
@ -57,7 +57,7 @@ class PypoLiquidsoap:
else:
raise UnknownMediaItemType(str(media_item))
def handle_file_type(self, media_item: FileEvent):
def handle_file_type(self, media_item: FileEvent) -> None:
"""
Wait 200 seconds (2000 iterations) for file to become ready,
otherwise give up on it.
@ -82,13 +82,13 @@ class PypoLiquidsoap:
media_item["dst"],
)
def handle_event_type(self, media_item: ActionEvent):
def handle_event_type(self, media_item: ActionEvent) -> None:
if media_item["event_type"] == "kick_out":
self.telnet_liquidsoap.disconnect_source("live_dj")
elif media_item["event_type"] == "switch_off":
self.telnet_liquidsoap.switch_source("live_dj", "off")
def is_media_item_finished(self, media_item: Optional[AnyEvent]):
def is_media_item_finished(self, media_item: Optional[AnyEvent]) -> bool:
if media_item is None:
return True
return datetime.utcnow() > event_key_to_datetime(media_item["end"])
@ -106,7 +106,7 @@ class PypoLiquidsoap:
return available_queue
# pylint: disable=too-many-branches
def verify_correct_present_media(self, scheduled_now: List[AnyEvent]):
def verify_correct_present_media(self, scheduled_now: List[AnyEvent]) -> None:
"""
verify whether Liquidsoap is currently playing the correct files.
if we find an item that Liquidsoap is not playing, then push it
@ -213,18 +213,18 @@ class PypoLiquidsoap:
except KeyError as exception:
logger.exception("Malformed event in schedule: %s", exception)
def stop(self, queue_id: int):
def stop(self, queue_id: int) -> None:
self.telnet_liquidsoap.queue_remove(queue_id)
self.liq_queue_tracker[queue_id] = None
def is_file(self, event: AnyEvent):
def is_file(self, event: AnyEvent) -> bool:
return event["type"] == EventKind.FILE
def clear_queue_tracker(self):
def clear_queue_tracker(self) -> None:
for queue_id in self.liq_queue_tracker:
self.liq_queue_tracker[queue_id] = None
def modify_cue_point(self, link: FileEvent):
def modify_cue_point(self, link: FileEvent) -> None:
assert self.is_file(link)
lateness = seconds_between(
@ -237,7 +237,7 @@ class PypoLiquidsoap:
cue_in_orig = timedelta(seconds=float(link["cue_in"]))
link["cue_in"] = cue_in_orig.total_seconds() + lateness
def clear_all_queues(self):
def clear_all_queues(self) -> None:
self.telnet_liquidsoap.queue_clear_all()

View File

@ -14,11 +14,11 @@ from .queue import PypoLiqQueue
logger = logging.getLogger(__name__)
def is_stream(media_item):
def is_stream(media_item: AnyEvent) -> bool:
return media_item["type"] == "stream_output_start"
def is_file(media_item):
def is_file(media_item: AnyEvent) -> bool:
return media_item["type"] == "file"
@ -43,7 +43,7 @@ class PypoPush(Thread):
self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap)
self.plq.start()
def main(self):
def main(self) -> None:
loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
@ -99,7 +99,7 @@ class PypoPush(Thread):
return present, future
def run(self):
def run(self) -> None:
while True:
try:
self.main()

View File

@ -6,6 +6,7 @@ from threading import Thread
from typing import Any, Dict
from ..utils import seconds_between
from .events import AnyEvent, event_key_to_datetime
from .liquidsoap import PypoLiquidsoap
logger = logging.getLogger(__name__)
@ -24,9 +25,9 @@ class PypoLiqQueue(Thread):
self.queue = future_queue
self.pypo_liquidsoap = pypo_liquidsoap
def main(self):
def main(self) -> None:
time_until_next_play = None
schedule_deque = deque()
schedule_deque: deque[AnyEvent] = deque()
media_schedule = None
while True:
@ -48,7 +49,7 @@ class PypoLiqQueue(Thread):
if len(schedule_deque):
time_until_next_play = seconds_between(
datetime.utcnow(),
schedule_deque[0]["start"],
event_key_to_datetime(schedule_deque[0]["start"]),
)
else:
time_until_next_play = None
@ -71,7 +72,7 @@ class PypoLiqQueue(Thread):
else:
time_until_next_play = None
def run(self):
def run(self) -> None:
try:
self.main()
except Exception as exception: # pylint: disable=broad-exception-caught

View File

@ -18,7 +18,7 @@ from .events import (
)
def insert_event(events: Events, event_key: str, event: AnyEvent):
def insert_event(events: Events, event_key: str, event: AnyEvent) -> None:
key = event_key
# Search for an empty slot