From e88e843b6552682be1961df9c71d07f9360105a9 Mon Sep 17 00:00:00 2001 From: jo Date: Sun, 26 Feb 2023 00:11:49 +0100 Subject: [PATCH] refactor(playout): add typings and fix linting errors move EVENT_KEY_FORMAT to events module properly type fetch queue event start/end can be str or datetime --- playout/libretime_playout/config.py | 4 +- playout/libretime_playout/main.py | 9 +- playout/libretime_playout/message_handler.py | 7 +- playout/libretime_playout/player/events.py | 32 ++++- playout/libretime_playout/player/fetch.py | 106 ++++++++-------- playout/libretime_playout/player/file.py | 72 ++++++----- .../libretime_playout/player/liquidsoap.py | 118 ++++++++++-------- .../player/liquidsoap_gateway.py | 51 ++++---- playout/libretime_playout/player/push.py | 49 ++++---- playout/libretime_playout/player/schedule.py | 18 +-- .../tests/player/liquidsoap_gateway_test.py | 33 +++++ 11 files changed, 286 insertions(+), 213 deletions(-) create mode 100644 playout/tests/player/liquidsoap_gateway_test.py diff --git a/playout/libretime_playout/config.py b/playout/libretime_playout/config.py index 1cb19e8fe..5c506e41d 100644 --- a/playout/libretime_playout/config.py +++ b/playout/libretime_playout/config.py @@ -12,8 +12,8 @@ from pydantic import BaseModel CACHE_DIR = Path.cwd() / "scheduler" RECORD_DIR = Path.cwd() / "recorder" -PUSH_INTERVAL = 2 -POLL_INTERVAL = 400 +PUSH_INTERVAL: float = 2.0 +POLL_INTERVAL: float = 400.0 class PlayoutConfig(BaseModel): diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index acda74991..79fb67a5d 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -8,7 +8,7 @@ import time from datetime import datetime from pathlib import Path from queue import Queue -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union import click from libretime_api_client.v1 import ApiClient as LegacyClient @@ -22,6 +22,7 @@ from .history.stats import StatsCollectorThread from .liquidsoap.client import LiquidsoapClient from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION from .message_handler import MessageListener +from .player.events import Events, FileEvents from .player.fetch import PypoFetch from .player.file import PypoFile from .player.liquidsoap import PypoLiquidsoap @@ -87,14 +88,14 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ if not LIQUIDSOAP_MIN_VERSION <= liq_version: raise RuntimeError(f"Invalid liquidsoap version {liq_version}") - fetch_queue: Queue[Dict[str, Any]] = Queue() + fetch_queue: Queue[Union[str, bytes]] = Queue() recorder_queue: Queue[Dict[str, Any]] = Queue() - push_queue: Queue[Dict[str, Any]] = Queue() + push_queue: Queue[Events] = Queue() # This queue is shared between pypo-fetch and pypo-file, where pypo-file # is the consumer. Pypo-fetch will send every schedule it gets to pypo-file # and pypo will parse this schedule to determine which file has the highest # priority, and retrieve it. - file_queue: Queue[Dict[str, Any]] = Queue() + file_queue: Queue[FileEvents] = Queue() pypo_liquidsoap = PypoLiquidsoap(liq_client) diff --git a/playout/libretime_playout/message_handler.py b/playout/libretime_playout/message_handler.py index 094ec10ca..21d0e5328 100644 --- a/playout/libretime_playout/message_handler.py +++ b/playout/libretime_playout/message_handler.py @@ -3,10 +3,11 @@ import logging from queue import Queue as ThreadQueue from signal import SIGTERM, signal from time import sleep -from typing import Any, Dict +from typing import Any, Dict, Union # For RabbitMQ from kombu.connection import Connection +from kombu.message import Message from kombu.messaging import Exchange, Queue from kombu.mixins import ConsumerMixin @@ -35,7 +36,7 @@ class MessageHandler(ConsumerMixin): Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]), ] - def on_message(self, body, message): + def on_message(self, body, message: Message): logger.debug("received message: %s", body) try: try: @@ -78,7 +79,7 @@ class MessageListener: def __init__( self, config: Config, - fetch_queue: ThreadQueue[Dict[str, Any]], + fetch_queue: ThreadQueue[Union[str, bytes]], recorder_queue: ThreadQueue[Dict[str, Any]], ) -> None: self.config = config diff --git a/playout/libretime_playout/player/events.py b/playout/libretime_playout/player/events.py index e95f01052..b601b10d2 100644 --- a/playout/libretime_playout/player/events.py +++ b/playout/libretime_playout/player/events.py @@ -1,8 +1,23 @@ +from datetime import datetime from enum import Enum -from typing import Dict, Literal, TypedDict, Union +from typing import Dict, Literal, Optional, TypedDict, Union from typing_extensions import NotRequired +EVENT_KEY_FORMAT = "%Y-%m-%d-%H-%M-%S" + + +def event_key_to_datetime(value: Union[str, datetime]) -> datetime: + if isinstance(value, datetime): + return value + return datetime.strptime(value, EVENT_KEY_FORMAT) + + +def datetime_to_event_key(value: Union[str, datetime]) -> str: + if isinstance(value, str): + return value + return value.strftime(EVENT_KEY_FORMAT) + class EventKind(str, Enum): FILE = "file" @@ -14,9 +29,9 @@ class EventKind(str, Enum): class BaseEvent(TypedDict): - # TODO: Convert start/end to datetime - start: str - end: str + # TODO: Only use datetime + start: Union[str, datetime] + end: Union[str, datetime] class FileEventMetadata(TypedDict): @@ -30,7 +45,7 @@ class FileEvent(BaseEvent): # Schedule row_id: int - uri: str + uri: Optional[str] id: int # Show data @@ -48,6 +63,11 @@ class FileEvent(BaseEvent): replay_gain: float filesize: int + # Runtime + dst: NotRequired[str] + file_ready: NotRequired[bool] + file_ext: NotRequired[str] + class WebStreamEvent(BaseEvent): type: Literal[ @@ -78,4 +98,6 @@ class ActionEvent(BaseEvent): AnyEvent = Union[FileEvent, WebStreamEvent, ActionEvent] + +FileEvents = Dict[str, FileEvent] Events = Dict[str, AnyEvent] diff --git a/playout/libretime_playout/player/fetch.py b/playout/libretime_playout/player/fetch.py index c8dee06d8..77a9824bf 100644 --- a/playout/libretime_playout/player/fetch.py +++ b/playout/libretime_playout/player/fetch.py @@ -4,11 +4,10 @@ import logging import mimetypes import os import time -from datetime import datetime from queue import Empty, Queue from subprocess import PIPE, Popen from threading import Thread, Timer -from typing import Any, Dict +from typing import Union from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v2 import ApiClient @@ -18,7 +17,7 @@ from ..config import CACHE_DIR, POLL_INTERVAL, Config from ..liquidsoap.client import LiquidsoapClient from ..liquidsoap.models import Info, StreamPreferences, StreamState from ..timeout import ls_timeout -from .events import Events +from .events import EventKind, Events, FileEvent, FileEvents, event_key_to_datetime from .liquidsoap import PypoLiquidsoap from .schedule import get_schedule @@ -31,9 +30,9 @@ class PypoFetch(Thread): def __init__( self, - fetch_queue: Queue[Dict[str, Any]], - push_queue: Queue[Dict[str, Any]], - file_queue: Queue[Dict[str, Any]], + fetch_queue: Queue[Union[str, bytes]], + push_queue: Queue[Events], + file_queue: Queue[FileEvents], liq_client: LiquidsoapClient, pypo_liquidsoap: PypoLiquidsoap, config: Config, @@ -63,15 +62,13 @@ class PypoFetch(Thread): # Handle a message from RabbitMQ, put it into our yucky global var. # Hopefully there is a better way to do this. - def handle_message(self, message): + def handle_message(self, message: Union[bytes, str]): try: logger.info("Received event from Pypo Message Handler: %s", message) - - try: + if isinstance(message, bytes): message = message.decode() - except (UnicodeDecodeError, AttributeError): - pass - m = json.loads(message) + m: dict = json.loads(message) + command = m["event_type"] logger.info("Handling command: %s", command) @@ -200,62 +197,57 @@ class PypoFetch(Thread): def process_schedule(self, events: Events): self.last_update_schedule_timestamp = time.time() logger.debug(events) - media = events - media_filtered = {} + file_events: FileEvents = {} + all_events: Events = {} # Download all the media and put playlists in liquidsoap "annotate" format try: - media_copy = {} - for key in media: - media_item = media[key] - if media_item["type"] == "file": - fileExt = self.sanity_check_media_item(media_item) - dst = os.path.join(self.cache_dir, f'{media_item["id"]}{fileExt}') - media_item["dst"] = dst - media_item["file_ready"] = False - media_filtered[key] = media_item + for key in events: + item = events[key] + if item["type"] == EventKind.FILE: + file_ext = self.sanity_check_media_item(item) + dst = os.path.join(self.cache_dir, f'{item["id"]}{file_ext}') + item["dst"] = dst + item["file_ready"] = False + file_events[key] = item - media_item["start"] = datetime.strptime( - media_item["start"], "%Y-%m-%d-%H-%M-%S" - ) - media_item["end"] = datetime.strptime( - media_item["end"], "%Y-%m-%d-%H-%M-%S" - ) - media_copy[key] = media_item + item["start"] = event_key_to_datetime(item["start"]) + item["end"] = event_key_to_datetime(item["end"]) + all_events[key] = item - self.media_prepare_queue.put(copy.copy(media_filtered)) + self.media_prepare_queue.put(copy.copy(file_events)) except Exception as exception: logger.exception(exception) # Send the data to pypo-push logger.debug("Pushing to pypo-push") - self.push_queue.put(media_copy) + self.push_queue.put(all_events) # cleanup try: - self.cache_cleanup(media) + self.cache_cleanup(events) except Exception as exception: logger.exception(exception) # do basic validation of file parameters. Useful for debugging # purposes - def sanity_check_media_item(self, media_item): - start = datetime.strptime(media_item["start"], "%Y-%m-%d-%H-%M-%S") - end = datetime.strptime(media_item["end"], "%Y-%m-%d-%H-%M-%S") + def sanity_check_media_item(self, event: FileEvent): + start = event_key_to_datetime(event["start"]) + end = event_key_to_datetime(event["end"]) - mime = media_item["metadata"]["mime"] + mime = event["metadata"]["mime"] mimetypes.init(["%s/mime.types" % os.path.dirname(os.path.realpath(__file__))]) mime_ext = mimetypes.guess_extension(mime, strict=False) length1 = (end - start).total_seconds() - length2 = media_item["cue_out"] - media_item["cue_in"] + length2 = event["cue_out"] - event["cue_in"] if abs(length2 - length1) > 1: logger.error("end - start length: %s", length1) logger.error("cue_out - cue_in length: %s", length2) logger.error("Two lengths are not equal!!!") - media_item["file_ext"] = mime_ext + event["file_ext"] = mime_ext return mime_ext @@ -265,7 +257,7 @@ class PypoFetch(Thread): out = proc.communicate()[0].strip() return bool(out) - def cache_cleanup(self, media): + def cache_cleanup(self, events: Events): """ Get list of all files in the cache dir and remove them if they aren't being used anymore. Input dict() media, lists all files that are scheduled or currently playing. Not being in this @@ -274,35 +266,35 @@ class PypoFetch(Thread): cached_file_set = set(os.listdir(self.cache_dir)) scheduled_file_set = set() - for mkey in media: - media_item = media[mkey] - if media_item["type"] == "file": - if "file_ext" not in media_item.keys(): - media_item["file_ext"] = mimetypes.guess_extension( - media_item["metadata"]["mime"], strict=False + for key in events: + item = events[key] + if item["type"] == EventKind.FILE: + if "file_ext" not in item.keys(): + item["file_ext"] = mimetypes.guess_extension( + item["metadata"]["mime"], strict=False ) - scheduled_file_set.add( - "{}{}".format(media_item["id"], media_item["file_ext"]) - ) + scheduled_file_set.add("{}{}".format(item["id"], item["file_ext"])) expired_files = cached_file_set - scheduled_file_set logger.debug("Files to remove %s", str(expired_files)) - for f in expired_files: + for expired_file in expired_files: try: - path = os.path.join(self.cache_dir, f) - logger.debug("Removing %s", path) + expired_filepath = os.path.join(self.cache_dir, expired_file) + logger.debug("Removing %s", expired_filepath) # check if this file is opened (sometimes Liquidsoap is still # playing the file due to our knowledge of the track length # being incorrect!) - if not self.is_file_opened(path): - os.remove(path) - logger.info("File '%s' removed", path) + if not self.is_file_opened(expired_filepath): + os.remove(expired_filepath) + logger.info("File '%s' removed", expired_filepath) else: - logger.info("File '%s' not removed. Still busy!", path) + logger.info("File '%s' not removed. Still busy!", expired_filepath) except Exception as exception: - logger.exception("Problem removing file '%s': %s", f, exception) + logger.exception( + "Problem removing file '%s': %s", expired_file, exception + ) def manual_schedule_fetch(self): try: diff --git a/playout/libretime_playout/player/file.py b/playout/libretime_playout/player/file.py index 9495f7d40..544ce2ca8 100644 --- a/playout/libretime_playout/player/file.py +++ b/playout/libretime_playout/player/file.py @@ -5,11 +5,13 @@ import stat import time from queue import Empty, Queue from threading import Thread -from typing import Any, Dict +from typing import Optional from libretime_api_client.v2 import ApiClient from requests.exceptions import ConnectionError, HTTPError, Timeout +from .events import FileEvent, FileEvents + logger = logging.getLogger(__name__) @@ -17,22 +19,25 @@ class PypoFile(Thread): name = "file" daemon = True + file_events_queue: Queue[FileEvents] + file_events: FileEvents + def __init__( self, - file_queue: Queue[Dict[str, Any]], + file_queue: Queue[FileEvents], api_client: ApiClient, ): Thread.__init__(self) - self.media_queue = file_queue - self.media = None + self.file_events_queue = file_queue + self.file_events = {} self.api_client = api_client - def copy_file(self, media_item): + def copy_file(self, file_event: FileEvent): """ - Copy media_item from local library directory to local cache directory. + Copy file_event from local library directory to local cache directory. """ - file_id = media_item["id"] - dst = media_item["dst"] + file_id = file_event["id"] + dst = file_event["dst"] dst_exists = True try: @@ -54,13 +59,13 @@ class PypoFile(Thread): else: do_copy = True - media_item["file_ready"] = not do_copy + file_event["file_ready"] = not do_copy if do_copy: logger.info("copying file %s to cache %s", file_id, dst) try: with open(dst, "wb") as handle: - logger.info(media_item) + logger.info(file_event) try: response = self.api_client.download_file(file_id, stream=True) for chunk in response.iter_content(chunk_size=2048): @@ -68,19 +73,19 @@ class PypoFile(Thread): except HTTPError as exception: raise RuntimeError( - f"could not download file {media_item['id']}" + f"could not download file {file_event['id']}" ) from exception # make file world readable and owner writable os.chmod(dst, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) - if media_item["filesize"] == 0: + if file_event["filesize"] == 0: file_size = self.report_file_size_and_md5_to_api( - dst, media_item["id"] + dst, file_event["id"] ) - media_item["filesize"] = file_size + file_event["filesize"] = file_size - media_item["file_ready"] = True + file_event["file_ready"] = True except Exception as exception: logger.exception( "could not copy file %s to %s: %s", @@ -89,18 +94,18 @@ class PypoFile(Thread): exception, ) - def report_file_size_and_md5_to_api(self, file_path, file_id): + def report_file_size_and_md5_to_api(self, file_path: str, file_id: int) -> int: try: file_size = os.path.getsize(file_path) with open(file_path, "rb") as fh: - m = hashlib.md5() + hasher = hashlib.md5(usedforsecurity=False) while True: data = fh.read(8192) if not data: break - m.update(data) - md5_hash = m.hexdigest() + hasher.update(data) + md5_hash = hasher.hexdigest() except OSError as exception: file_size = 0 logger.exception( @@ -123,21 +128,24 @@ class PypoFile(Thread): return file_size - def get_highest_priority_media_item(self, schedule): + def get_highest_priority_file_event( + self, + file_events: FileEvents, + ) -> Optional[FileEvent]: """ - Get highest priority media_item in the queue. Currently the highest + Get highest priority file event in the queue. Currently the highest priority is decided by how close the start time is to "now". """ - if schedule is None or len(schedule) == 0: + if file_events is None or len(file_events) == 0: return None - sorted_keys = sorted(schedule.keys()) + sorted_keys = sorted(file_events.keys()) if len(sorted_keys) == 0: return None highest_priority = sorted_keys[0] - media_item = schedule[highest_priority] + file_event = file_events[highest_priority] logger.debug("Highest priority item: %s", highest_priority) @@ -147,30 +155,30 @@ class PypoFile(Thread): # it is very possible we will have to deal with the same media_items # again. In this situation, the worst possible case is that we try to # copy the file again and realize we already have it (thus aborting the copy). - del schedule[highest_priority] + del file_events[highest_priority] - return media_item + return file_event def main(self): while True: try: - if self.media is None or len(self.media) == 0: + if self.file_events is None or len(self.file_events) == 0: # We have no schedule, so we have nothing else to do. Let's # do a blocked wait on the queue - self.media = self.media_queue.get(block=True) + self.file_events = self.file_events_queue.get(block=True) else: # We have a schedule we need to process, but we also want # to check if a newer schedule is available. In this case # do a non-blocking queue.get and in either case (we get something # or we don't), get back to work on preparing getting files. try: - self.media = self.media_queue.get_nowait() + self.file_events = self.file_events_queue.get_nowait() except Empty: pass - media_item = self.get_highest_priority_media_item(self.media) - if media_item is not None: - self.copy_file(media_item) + file_event = self.get_highest_priority_file_event(self.file_events) + if file_event is not None: + self.copy_file(file_event) except Exception as exception: logger.exception(exception) raise exception diff --git a/playout/libretime_playout/player/liquidsoap.py b/playout/libretime_playout/player/liquidsoap.py index f15d704ba..29054f258 100644 --- a/playout/libretime_playout/player/liquidsoap.py +++ b/playout/libretime_playout/player/liquidsoap.py @@ -1,10 +1,18 @@ import logging import time from datetime import datetime, timedelta +from typing import Dict, List, Optional from ..liquidsoap.client import LiquidsoapClient from ..utils import seconds_between -from .events import EventKind +from .events import ( + ActionEvent, + AnyEvent, + EventKind, + FileEvent, + WebStreamEvent, + event_key_to_datetime, +) from .liquidsoap_gateway import TelnetLiquidsoap logger = logging.getLogger(__name__) @@ -12,7 +20,7 @@ logger = logging.getLogger(__name__) class PypoLiquidsoap: def __init__(self, liq_client: LiquidsoapClient): - self.liq_queue_tracker = { + self.liq_queue_tracker: Dict[str, Optional[FileEvent]] = { "s0": None, "s1": None, "s2": None, @@ -25,7 +33,7 @@ class PypoLiquidsoap: list(self.liq_queue_tracker.keys()), ) - def play(self, media_item): + def play(self, media_item: AnyEvent): if media_item["type"] == EventKind.FILE: self.handle_file_type(media_item) elif media_item["type"] == EventKind.ACTION: @@ -37,8 +45,9 @@ class PypoLiquidsoap: media_item["row_id"] != self.telnet_liquidsoap.current_prebuffering_stream_id ): - # this is called if the stream wasn't scheduled sufficiently ahead of time - # so that the prebuffering stage could take effect. Let's do the prebuffering now. + # this is called if the stream wasn't scheduled sufficiently ahead of + # time so that the prebuffering stage could take effect. Let's do the + # prebuffering now. self.telnet_liquidsoap.start_web_stream_buffer(media_item) self.telnet_liquidsoap.start_web_stream(media_item) elif media_item["type"] == EventKind.WEB_STREAM_BUFFER_END: @@ -48,17 +57,17 @@ class PypoLiquidsoap: else: raise UnknownMediaItemType(str(media_item)) - def handle_file_type(self, media_item): + def handle_file_type(self, media_item: FileEvent): """ Wait 200 seconds (2000 iterations) for file to become ready, otherwise give up on it. """ iter_num = 0 - while not media_item["file_ready"] and iter_num < 2000: + while not media_item.get("file_ready", False) and iter_num < 2000: time.sleep(0.1) iter_num += 1 - if media_item["file_ready"]: + if media_item.get("file_ready", False): available_queue = self.find_available_queue() try: @@ -73,19 +82,18 @@ class PypoLiquidsoap: media_item["dst"], ) - def handle_event_type(self, media_item): + def handle_event_type(self, media_item: ActionEvent): if media_item["event_type"] == "kick_out": self.telnet_liquidsoap.disconnect_source("live_dj") elif media_item["event_type"] == "switch_off": self.telnet_liquidsoap.switch_source("live_dj", "off") - def is_media_item_finished(self, media_item): + def is_media_item_finished(self, media_item: Optional[AnyEvent]): if media_item is None: return True - else: - return datetime.utcnow() > media_item["end"] + return datetime.utcnow() > event_key_to_datetime(media_item["end"]) - def find_available_queue(self): + def find_available_queue(self) -> str: available_queue = None for queue_id, item in self.liq_queue_tracker.items(): if item is None or self.is_media_item_finished(item): @@ -97,7 +105,7 @@ class PypoLiquidsoap: return available_queue - def verify_correct_present_media(self, scheduled_now): + def verify_correct_present_media(self, scheduled_now: List[AnyEvent]): """ verify whether Liquidsoap is currently playing the correct files. if we find an item that Liquidsoap is not playing, then push it @@ -121,47 +129,51 @@ class PypoLiquidsoap: """ try: - scheduled_now_files = [ + scheduled_now_files: List[FileEvent] = [ x for x in scheduled_now if x["type"] == EventKind.FILE ] - scheduled_now_webstream = [ + scheduled_now_webstream: List[WebStreamEvent] = [ x for x in scheduled_now - if x["type"] in (EventKind.WEB_STREAM_OUTPUT_START) + if x["type"] == EventKind.WEB_STREAM_OUTPUT_START ] schedule_ids = {x["row_id"] for x in scheduled_now_files} row_id_map = {} liq_queue_ids = set() - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if not self.is_media_item_finished(mi): - liq_queue_ids.add(mi["row_id"]) - row_id_map[mi["row_id"]] = mi + for queue_id in self.liq_queue_tracker: + queue_item = self.liq_queue_tracker[queue_id] + if queue_item is not None and not self.is_media_item_finished( + queue_item + ): + liq_queue_ids.add(queue_item["row_id"]) + row_id_map[queue_item["row_id"]] = queue_item to_be_removed = set() to_be_added = set() # Iterate over the new files, and compare them to currently scheduled # tracks. If already in liquidsoap queue still need to make sure they don't - # have different attributes - # if replay gain changes, it shouldn't change the amplification of the currently playing song - for i in scheduled_now_files: - if i["row_id"] in row_id_map: - mi = row_id_map[i["row_id"]] + # have different attributes. Ff replay gain changes, it shouldn't change the + # amplification of the currently playing song + for item in scheduled_now_files: + if item["row_id"] in row_id_map: + queue_item = row_id_map[item["row_id"]] + assert queue_item is not None + correct = ( - mi["start"] == i["start"] - and mi["end"] == i["end"] - and mi["row_id"] == i["row_id"] + queue_item["start"] == item["start"] + and queue_item["end"] == item["end"] + and queue_item["row_id"] == item["row_id"] ) if not correct: # need to re-add - logger.info("Track %s found to have new attr.", i) - to_be_removed.add(i["row_id"]) - to_be_added.add(i["row_id"]) + logger.info("Track %s found to have new attr.", item) + to_be_removed.add(item["row_id"]) + to_be_added.add(item["row_id"]) to_be_removed.update(liq_queue_ids - schedule_ids) to_be_added.update(schedule_ids - liq_queue_ids) @@ -170,21 +182,24 @@ class PypoLiquidsoap: logger.info("Need to remove items from Liquidsoap: %s", to_be_removed) # remove files from Liquidsoap's queue - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if mi is not None and mi["row_id"] in to_be_removed: - self.stop(i) + for queue_id in self.liq_queue_tracker: + queue_item = self.liq_queue_tracker[queue_id] + if queue_item is not None and queue_item["row_id"] in to_be_removed: + self.stop(queue_id) if to_be_added: logger.info("Need to add items to Liquidsoap *now*: %s", to_be_added) - for i in scheduled_now_files: - if i["row_id"] in to_be_added: - self.modify_cue_point(i) - self.play(i) + for item in scheduled_now_files: + if item["row_id"] in to_be_added: + self.modify_cue_point(item) + self.play(item) # handle webstreams current_stream_id = self.telnet_liquidsoap.get_current_stream_id() + if current_stream_id is None: + current_stream_id = "-1" + logger.debug("scheduled now webstream: %s", scheduled_now_webstream) if scheduled_now_webstream: if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]): @@ -196,21 +211,24 @@ class PypoLiquidsoap: except KeyError as exception: logger.exception("Malformed event in schedule: %s", exception) - def stop(self, queue): - self.telnet_liquidsoap.queue_remove(queue) - self.liq_queue_tracker[queue] = None + def stop(self, queue_id: str): + self.telnet_liquidsoap.queue_remove(queue_id) + self.liq_queue_tracker[queue_id] = None - def is_file(self, media_item): - return media_item["type"] == EventKind.FILE + def is_file(self, event: AnyEvent): + return event["type"] == EventKind.FILE def clear_queue_tracker(self): - for i in self.liq_queue_tracker.keys(): - self.liq_queue_tracker[i] = None + for queue_id in self.liq_queue_tracker: + self.liq_queue_tracker[queue_id] = None - def modify_cue_point(self, link): + def modify_cue_point(self, link: FileEvent): assert self.is_file(link) - lateness = seconds_between(link["start"], datetime.utcnow()) + lateness = seconds_between( + event_key_to_datetime(link["start"]), + datetime.utcnow(), + ) if lateness > 0: logger.debug("media item was supposed to start %ss ago", lateness) diff --git a/playout/libretime_playout/player/liquidsoap_gateway.py b/playout/libretime_playout/player/liquidsoap_gateway.py index 597249db4..993362fa2 100644 --- a/playout/libretime_playout/player/liquidsoap_gateway.py +++ b/playout/libretime_playout/player/liquidsoap_gateway.py @@ -3,44 +3,41 @@ from typing import List from ..liquidsoap.client import LiquidsoapClient from ..timeout import ls_timeout +from .events import FileEvent logger = logging.getLogger(__name__) -def create_liquidsoap_annotation(media): +def create_liquidsoap_annotation(file_event: FileEvent) -> str: # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. - - filename = media["dst"] - annotation = ( - 'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",' - + 'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",' - + 'schedule_table_id="%s",replay_gain="%s dB"' - ) % ( - media["id"], - float(media["fade_in"]) / 1000, - float(media["fade_out"]) / 1000, - float(media["cue_in"]), - float(media["cue_out"]), - media["row_id"], - media["replay_gain"], - ) + annotations = { + "media_id": file_event["id"], + "liq_start_next": "0", + "liq_fade_in": float(file_event["fade_in"]) / 1000, + "liq_fade_out": float(file_event["fade_out"]) / 1000, + "liq_cue_in": float(file_event["cue_in"]), + "liq_cue_out": float(file_event["cue_out"]), + "schedule_table_id": file_event["row_id"], + "replay_gain": f"{file_event['replay_gain']} dB", + } # Override the the artist/title that Liquidsoap extracts from a file's metadata # with the metadata we get from Airtime. (You can modify metadata in Airtime's library, # which doesn't get saved back to the file.) - if "metadata" in media: - if "artist_name" in media["metadata"]: - artist_name = media["metadata"]["artist_name"] - if isinstance(artist_name, str): - annotation += ',artist="%s"' % (artist_name.replace('"', '\\"')) - if "track_title" in media["metadata"]: - track_title = media["metadata"]["track_title"] - if isinstance(track_title, str): - annotation += ',title="%s"' % (track_title.replace('"', '\\"')) + if "metadata" in file_event: + if "artist_name" in file_event["metadata"]: + artist_name = file_event["metadata"]["artist_name"] + if artist_name: + annotations["artist"] = artist_name.replace('"', '\\"') - annotation += ":" + filename + if "track_title" in file_event["metadata"]: + track_title = file_event["metadata"]["track_title"] + if track_title: + annotations["title"] = track_title.replace('"', '\\"') - return annotation + annotations_str = ",".join(f'{key}="{value}"' for key, value in annotations.items()) + + return "annotate:" + annotations_str + ":" + file_event["dst"] class TelnetLiquidsoap: diff --git a/playout/libretime_playout/player/push.py b/playout/libretime_playout/player/push.py index 7d1beb132..43c0ac506 100644 --- a/playout/libretime_playout/player/push.py +++ b/playout/libretime_playout/player/push.py @@ -4,9 +4,10 @@ import time from datetime import datetime from queue import Queue from threading import Thread -from typing import Any, Dict +from typing import List, Tuple from ..config import PUSH_INTERVAL, Config +from .events import AnyEvent, EventKind, Events, event_key_to_datetime from .liquidsoap import PypoLiquidsoap from .queue import PypoLiqQueue @@ -27,7 +28,7 @@ class PypoPush(Thread): def __init__( self, - push_queue: Queue[Dict[str, Any]], + push_queue: Queue[Events], pypo_liquidsoap: PypoLiquidsoap, config: Config, ): @@ -36,11 +37,7 @@ class PypoPush(Thread): self.config = config - self.pushed_objects = {} - self.current_prebuffering_stream_id = None - self.queue_id = 0 - - self.future_scheduled_queue: Queue[Dict[str, Any]] = Queue() + self.future_scheduled_queue: Queue[Events] = Queue() self.pypo_liquidsoap = pypo_liquidsoap self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap) @@ -50,20 +47,20 @@ class PypoPush(Thread): loops = 0 heartbeat_period = math.floor(30 / PUSH_INTERVAL) - media_schedule = None + events = None while True: try: - media_schedule = self.queue.get(block=True) + events = self.queue.get(block=True) except Exception as exception: logger.exception(exception) raise exception else: - logger.debug(media_schedule) + logger.debug(events) # separate media_schedule list into currently_playing and # scheduled_for_future lists currently_playing, scheduled_for_future = self.separate_present_future( - media_schedule + events ) self.pypo_liquidsoap.verify_correct_present_media(currently_playing) @@ -74,29 +71,31 @@ class PypoPush(Thread): loops = 0 loops += 1 - def separate_present_future(self, media_schedule): - tnow = datetime.utcnow() + def separate_present_future(self, events: Events) -> Tuple[List[AnyEvent], Events]: + now = datetime.utcnow() - present = [] - future = {} + present: List[AnyEvent] = [] + future: Events = {} - sorted_keys = sorted(media_schedule.keys()) - for mkey in sorted_keys: - media_item = media_schedule[mkey] + for key in sorted(events.keys()): + item = events[key] # Ignore track that already ended - if media_item["type"] == "file" and media_item["end"] < tnow: - logger.debug("ignoring ended media_item: %s", media_item) + if ( + item["type"] == EventKind.FILE + and event_key_to_datetime(item["end"]) < now + ): + logger.debug("ignoring ended media_item: %s", item) continue - diff_sec = (tnow - media_item["start"]).total_seconds() + diff_sec = (now - event_key_to_datetime(item["start"])).total_seconds() if diff_sec >= 0: - logger.debug("adding media_item to present: %s", media_item) - present.append(media_item) + logger.debug("adding media_item to present: %s", item) + present.append(item) else: - logger.debug("adding media_item to future: %s", media_item) - future[mkey] = media_item + logger.debug("adding media_item to future: %s", item) + future[key] = item return present, future diff --git a/playout/libretime_playout/player/schedule.py b/playout/libretime_playout/player/schedule.py index 8368ad5a0..126cbb20b 100644 --- a/playout/libretime_playout/player/schedule.py +++ b/playout/libretime_playout/player/schedule.py @@ -1,19 +1,21 @@ from datetime import datetime, time, timedelta from operator import itemgetter -from typing import Dict, Literal +from typing import Dict from dateutil.parser import isoparse from libretime_api_client.v2 import ApiClient from libretime_shared.datetime import time_in_milliseconds, time_in_seconds from ..liquidsoap.models import StreamPreferences -from .events import ActionEvent, AnyEvent, EventKind, Events, FileEvent, WebStreamEvent - -EVENT_KEY_FORMAT = "%Y-%m-%d-%H-%M-%S" - - -def datetime_to_event_key(value: datetime) -> str: - return value.strftime(EVENT_KEY_FORMAT) +from .events import ( + ActionEvent, + AnyEvent, + EventKind, + Events, + FileEvent, + WebStreamEvent, + datetime_to_event_key, +) def insert_event(events: Events, event_key: str, event: AnyEvent): diff --git a/playout/tests/player/liquidsoap_gateway_test.py b/playout/tests/player/liquidsoap_gateway_test.py new file mode 100644 index 000000000..58ad41a18 --- /dev/null +++ b/playout/tests/player/liquidsoap_gateway_test.py @@ -0,0 +1,33 @@ +from libretime_playout.player.events import EventKind, FileEvent +from libretime_playout.player.liquidsoap_gateway import create_liquidsoap_annotation + + +def test_create_liquidsoap_annotation(): + file_event: FileEvent = { + "type": EventKind.FILE, + "row_id": 1, + "start": "2022-09-05-11-00-00", + "end": "2022-09-05-11-05-02", + "uri": None, + "id": 2, + "show_name": "Show 1", + "fade_in": 500.0, + "fade_out": 500.0, + "cue_in": 13.7008, + "cue_out": 315.845, + "metadata": { + "track_title": 'My Friend the "Forest"', + "artist_name": "Nils Frahm", + "mime": "audio/flac", + }, + "replay_gain": "11.46", + "filesize": 10000, + "dst": "fake/path.flac", + } + + assert create_liquidsoap_annotation(file_event) == ( + """annotate:media_id="2",liq_start_next="0",liq_fade_in="0.5",""" + """liq_fade_out="0.5",liq_cue_in="13.7008",liq_cue_out="315.845",""" + """schedule_table_id="1",replay_gain="11.46 dB",artist="Nils Frahm",""" + """title="My Friend the \\"Forest\\"":fake/path.flac""" + )