refactor(playout): add typings and fix linting errors

move EVENT_KEY_FORMAT to events module
properly type fetch queue
event start/end can be str or datetime
This commit is contained in:
jo 2023-02-26 00:11:49 +01:00 committed by Jonas L
parent 3fba7c73d3
commit e88e843b65
11 changed files with 286 additions and 213 deletions

View File

@ -12,8 +12,8 @@ from pydantic import BaseModel
CACHE_DIR = Path.cwd() / "scheduler" CACHE_DIR = Path.cwd() / "scheduler"
RECORD_DIR = Path.cwd() / "recorder" RECORD_DIR = Path.cwd() / "recorder"
PUSH_INTERVAL = 2 PUSH_INTERVAL: float = 2.0
POLL_INTERVAL = 400 POLL_INTERVAL: float = 400.0
class PlayoutConfig(BaseModel): class PlayoutConfig(BaseModel):

View File

@ -8,7 +8,7 @@ import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from queue import Queue from queue import Queue
from typing import Any, Dict, Optional from typing import Any, Dict, Optional, Union
import click import click
from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v1 import ApiClient as LegacyClient
@ -22,6 +22,7 @@ from .history.stats import StatsCollectorThread
from .liquidsoap.client import LiquidsoapClient from .liquidsoap.client import LiquidsoapClient
from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION
from .message_handler import MessageListener from .message_handler import MessageListener
from .player.events import Events, FileEvents
from .player.fetch import PypoFetch from .player.fetch import PypoFetch
from .player.file import PypoFile from .player.file import PypoFile
from .player.liquidsoap import PypoLiquidsoap from .player.liquidsoap import PypoLiquidsoap
@ -87,14 +88,14 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
if not LIQUIDSOAP_MIN_VERSION <= liq_version: if not LIQUIDSOAP_MIN_VERSION <= liq_version:
raise RuntimeError(f"Invalid liquidsoap version {liq_version}") raise RuntimeError(f"Invalid liquidsoap version {liq_version}")
fetch_queue: Queue[Dict[str, Any]] = Queue() fetch_queue: Queue[Union[str, bytes]] = Queue()
recorder_queue: Queue[Dict[str, Any]] = Queue() recorder_queue: Queue[Dict[str, Any]] = Queue()
push_queue: Queue[Dict[str, Any]] = Queue() push_queue: Queue[Events] = Queue()
# This queue is shared between pypo-fetch and pypo-file, where pypo-file # This queue is shared between pypo-fetch and pypo-file, where pypo-file
# is the consumer. Pypo-fetch will send every schedule it gets to pypo-file # is the consumer. Pypo-fetch will send every schedule it gets to pypo-file
# and pypo will parse this schedule to determine which file has the highest # and pypo will parse this schedule to determine which file has the highest
# priority, and retrieve it. # priority, and retrieve it.
file_queue: Queue[Dict[str, Any]] = Queue() file_queue: Queue[FileEvents] = Queue()
pypo_liquidsoap = PypoLiquidsoap(liq_client) pypo_liquidsoap = PypoLiquidsoap(liq_client)

View File

@ -3,10 +3,11 @@ import logging
from queue import Queue as ThreadQueue from queue import Queue as ThreadQueue
from signal import SIGTERM, signal from signal import SIGTERM, signal
from time import sleep from time import sleep
from typing import Any, Dict from typing import Any, Dict, Union
# For RabbitMQ # For RabbitMQ
from kombu.connection import Connection from kombu.connection import Connection
from kombu.message import Message
from kombu.messaging import Exchange, Queue from kombu.messaging import Exchange, Queue
from kombu.mixins import ConsumerMixin from kombu.mixins import ConsumerMixin
@ -35,7 +36,7 @@ class MessageHandler(ConsumerMixin):
Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]), Consumer(queues, callbacks=[self.on_message], accept=["text/plain"]),
] ]
def on_message(self, body, message): def on_message(self, body, message: Message):
logger.debug("received message: %s", body) logger.debug("received message: %s", body)
try: try:
try: try:
@ -78,7 +79,7 @@ class MessageListener:
def __init__( def __init__(
self, self,
config: Config, config: Config,
fetch_queue: ThreadQueue[Dict[str, Any]], fetch_queue: ThreadQueue[Union[str, bytes]],
recorder_queue: ThreadQueue[Dict[str, Any]], recorder_queue: ThreadQueue[Dict[str, Any]],
) -> None: ) -> None:
self.config = config self.config = config

View File

@ -1,8 +1,23 @@
from datetime import datetime
from enum import Enum from enum import Enum
from typing import Dict, Literal, TypedDict, Union from typing import Dict, Literal, Optional, TypedDict, Union
from typing_extensions import NotRequired from typing_extensions import NotRequired
EVENT_KEY_FORMAT = "%Y-%m-%d-%H-%M-%S"
def event_key_to_datetime(value: Union[str, datetime]) -> datetime:
if isinstance(value, datetime):
return value
return datetime.strptime(value, EVENT_KEY_FORMAT)
def datetime_to_event_key(value: Union[str, datetime]) -> str:
if isinstance(value, str):
return value
return value.strftime(EVENT_KEY_FORMAT)
class EventKind(str, Enum): class EventKind(str, Enum):
FILE = "file" FILE = "file"
@ -14,9 +29,9 @@ class EventKind(str, Enum):
class BaseEvent(TypedDict): class BaseEvent(TypedDict):
# TODO: Convert start/end to datetime # TODO: Only use datetime
start: str start: Union[str, datetime]
end: str end: Union[str, datetime]
class FileEventMetadata(TypedDict): class FileEventMetadata(TypedDict):
@ -30,7 +45,7 @@ class FileEvent(BaseEvent):
# Schedule # Schedule
row_id: int row_id: int
uri: str uri: Optional[str]
id: int id: int
# Show data # Show data
@ -48,6 +63,11 @@ class FileEvent(BaseEvent):
replay_gain: float replay_gain: float
filesize: int filesize: int
# Runtime
dst: NotRequired[str]
file_ready: NotRequired[bool]
file_ext: NotRequired[str]
class WebStreamEvent(BaseEvent): class WebStreamEvent(BaseEvent):
type: Literal[ type: Literal[
@ -78,4 +98,6 @@ class ActionEvent(BaseEvent):
AnyEvent = Union[FileEvent, WebStreamEvent, ActionEvent] AnyEvent = Union[FileEvent, WebStreamEvent, ActionEvent]
FileEvents = Dict[str, FileEvent]
Events = Dict[str, AnyEvent] Events = Dict[str, AnyEvent]

View File

@ -4,11 +4,10 @@ import logging
import mimetypes import mimetypes
import os import os
import time import time
from datetime import datetime
from queue import Empty, Queue from queue import Empty, Queue
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from threading import Thread, Timer from threading import Thread, Timer
from typing import Any, Dict from typing import Union
from libretime_api_client.v1 import ApiClient as LegacyClient from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_api_client.v2 import ApiClient from libretime_api_client.v2 import ApiClient
@ -18,7 +17,7 @@ from ..config import CACHE_DIR, POLL_INTERVAL, Config
from ..liquidsoap.client import LiquidsoapClient from ..liquidsoap.client import LiquidsoapClient
from ..liquidsoap.models import Info, StreamPreferences, StreamState from ..liquidsoap.models import Info, StreamPreferences, StreamState
from ..timeout import ls_timeout from ..timeout import ls_timeout
from .events import Events from .events import EventKind, Events, FileEvent, FileEvents, event_key_to_datetime
from .liquidsoap import PypoLiquidsoap from .liquidsoap import PypoLiquidsoap
from .schedule import get_schedule from .schedule import get_schedule
@ -31,9 +30,9 @@ class PypoFetch(Thread):
def __init__( def __init__(
self, self,
fetch_queue: Queue[Dict[str, Any]], fetch_queue: Queue[Union[str, bytes]],
push_queue: Queue[Dict[str, Any]], push_queue: Queue[Events],
file_queue: Queue[Dict[str, Any]], file_queue: Queue[FileEvents],
liq_client: LiquidsoapClient, liq_client: LiquidsoapClient,
pypo_liquidsoap: PypoLiquidsoap, pypo_liquidsoap: PypoLiquidsoap,
config: Config, config: Config,
@ -63,15 +62,13 @@ class PypoFetch(Thread):
# Handle a message from RabbitMQ, put it into our yucky global var. # Handle a message from RabbitMQ, put it into our yucky global var.
# Hopefully there is a better way to do this. # Hopefully there is a better way to do this.
def handle_message(self, message): def handle_message(self, message: Union[bytes, str]):
try: try:
logger.info("Received event from Pypo Message Handler: %s", message) logger.info("Received event from Pypo Message Handler: %s", message)
if isinstance(message, bytes):
try:
message = message.decode() message = message.decode()
except (UnicodeDecodeError, AttributeError): m: dict = json.loads(message)
pass
m = json.loads(message)
command = m["event_type"] command = m["event_type"]
logger.info("Handling command: %s", command) logger.info("Handling command: %s", command)
@ -200,62 +197,57 @@ class PypoFetch(Thread):
def process_schedule(self, events: Events): def process_schedule(self, events: Events):
self.last_update_schedule_timestamp = time.time() self.last_update_schedule_timestamp = time.time()
logger.debug(events) logger.debug(events)
media = events file_events: FileEvents = {}
media_filtered = {} all_events: Events = {}
# Download all the media and put playlists in liquidsoap "annotate" format # Download all the media and put playlists in liquidsoap "annotate" format
try: try:
media_copy = {} for key in events:
for key in media: item = events[key]
media_item = media[key] if item["type"] == EventKind.FILE:
if media_item["type"] == "file": file_ext = self.sanity_check_media_item(item)
fileExt = self.sanity_check_media_item(media_item) dst = os.path.join(self.cache_dir, f'{item["id"]}{file_ext}')
dst = os.path.join(self.cache_dir, f'{media_item["id"]}{fileExt}') item["dst"] = dst
media_item["dst"] = dst item["file_ready"] = False
media_item["file_ready"] = False file_events[key] = item
media_filtered[key] = media_item
media_item["start"] = datetime.strptime( item["start"] = event_key_to_datetime(item["start"])
media_item["start"], "%Y-%m-%d-%H-%M-%S" item["end"] = event_key_to_datetime(item["end"])
) all_events[key] = item
media_item["end"] = datetime.strptime(
media_item["end"], "%Y-%m-%d-%H-%M-%S"
)
media_copy[key] = media_item
self.media_prepare_queue.put(copy.copy(media_filtered)) self.media_prepare_queue.put(copy.copy(file_events))
except Exception as exception: except Exception as exception:
logger.exception(exception) logger.exception(exception)
# Send the data to pypo-push # Send the data to pypo-push
logger.debug("Pushing to pypo-push") logger.debug("Pushing to pypo-push")
self.push_queue.put(media_copy) self.push_queue.put(all_events)
# cleanup # cleanup
try: try:
self.cache_cleanup(media) self.cache_cleanup(events)
except Exception as exception: except Exception as exception:
logger.exception(exception) logger.exception(exception)
# do basic validation of file parameters. Useful for debugging # do basic validation of file parameters. Useful for debugging
# purposes # purposes
def sanity_check_media_item(self, media_item): def sanity_check_media_item(self, event: FileEvent):
start = datetime.strptime(media_item["start"], "%Y-%m-%d-%H-%M-%S") start = event_key_to_datetime(event["start"])
end = datetime.strptime(media_item["end"], "%Y-%m-%d-%H-%M-%S") end = event_key_to_datetime(event["end"])
mime = media_item["metadata"]["mime"] mime = event["metadata"]["mime"]
mimetypes.init(["%s/mime.types" % os.path.dirname(os.path.realpath(__file__))]) mimetypes.init(["%s/mime.types" % os.path.dirname(os.path.realpath(__file__))])
mime_ext = mimetypes.guess_extension(mime, strict=False) mime_ext = mimetypes.guess_extension(mime, strict=False)
length1 = (end - start).total_seconds() length1 = (end - start).total_seconds()
length2 = media_item["cue_out"] - media_item["cue_in"] length2 = event["cue_out"] - event["cue_in"]
if abs(length2 - length1) > 1: if abs(length2 - length1) > 1:
logger.error("end - start length: %s", length1) logger.error("end - start length: %s", length1)
logger.error("cue_out - cue_in length: %s", length2) logger.error("cue_out - cue_in length: %s", length2)
logger.error("Two lengths are not equal!!!") logger.error("Two lengths are not equal!!!")
media_item["file_ext"] = mime_ext event["file_ext"] = mime_ext
return mime_ext return mime_ext
@ -265,7 +257,7 @@ class PypoFetch(Thread):
out = proc.communicate()[0].strip() out = proc.communicate()[0].strip()
return bool(out) return bool(out)
def cache_cleanup(self, media): def cache_cleanup(self, events: Events):
""" """
Get list of all files in the cache dir and remove them if they aren't being used anymore. Get list of all files in the cache dir and remove them if they aren't being used anymore.
Input dict() media, lists all files that are scheduled or currently playing. Not being in this Input dict() media, lists all files that are scheduled or currently playing. Not being in this
@ -274,35 +266,35 @@ class PypoFetch(Thread):
cached_file_set = set(os.listdir(self.cache_dir)) cached_file_set = set(os.listdir(self.cache_dir))
scheduled_file_set = set() scheduled_file_set = set()
for mkey in media: for key in events:
media_item = media[mkey] item = events[key]
if media_item["type"] == "file": if item["type"] == EventKind.FILE:
if "file_ext" not in media_item.keys(): if "file_ext" not in item.keys():
media_item["file_ext"] = mimetypes.guess_extension( item["file_ext"] = mimetypes.guess_extension(
media_item["metadata"]["mime"], strict=False item["metadata"]["mime"], strict=False
) )
scheduled_file_set.add( scheduled_file_set.add("{}{}".format(item["id"], item["file_ext"]))
"{}{}".format(media_item["id"], media_item["file_ext"])
)
expired_files = cached_file_set - scheduled_file_set expired_files = cached_file_set - scheduled_file_set
logger.debug("Files to remove %s", str(expired_files)) logger.debug("Files to remove %s", str(expired_files))
for f in expired_files: for expired_file in expired_files:
try: try:
path = os.path.join(self.cache_dir, f) expired_filepath = os.path.join(self.cache_dir, expired_file)
logger.debug("Removing %s", path) logger.debug("Removing %s", expired_filepath)
# check if this file is opened (sometimes Liquidsoap is still # check if this file is opened (sometimes Liquidsoap is still
# playing the file due to our knowledge of the track length # playing the file due to our knowledge of the track length
# being incorrect!) # being incorrect!)
if not self.is_file_opened(path): if not self.is_file_opened(expired_filepath):
os.remove(path) os.remove(expired_filepath)
logger.info("File '%s' removed", path) logger.info("File '%s' removed", expired_filepath)
else: else:
logger.info("File '%s' not removed. Still busy!", path) logger.info("File '%s' not removed. Still busy!", expired_filepath)
except Exception as exception: except Exception as exception:
logger.exception("Problem removing file '%s': %s", f, exception) logger.exception(
"Problem removing file '%s': %s", expired_file, exception
)
def manual_schedule_fetch(self): def manual_schedule_fetch(self):
try: try:

View File

@ -5,11 +5,13 @@ import stat
import time import time
from queue import Empty, Queue from queue import Empty, Queue
from threading import Thread from threading import Thread
from typing import Any, Dict from typing import Optional
from libretime_api_client.v2 import ApiClient from libretime_api_client.v2 import ApiClient
from requests.exceptions import ConnectionError, HTTPError, Timeout from requests.exceptions import ConnectionError, HTTPError, Timeout
from .events import FileEvent, FileEvents
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -17,22 +19,25 @@ class PypoFile(Thread):
name = "file" name = "file"
daemon = True daemon = True
file_events_queue: Queue[FileEvents]
file_events: FileEvents
def __init__( def __init__(
self, self,
file_queue: Queue[Dict[str, Any]], file_queue: Queue[FileEvents],
api_client: ApiClient, api_client: ApiClient,
): ):
Thread.__init__(self) Thread.__init__(self)
self.media_queue = file_queue self.file_events_queue = file_queue
self.media = None self.file_events = {}
self.api_client = api_client self.api_client = api_client
def copy_file(self, media_item): def copy_file(self, file_event: FileEvent):
""" """
Copy media_item from local library directory to local cache directory. Copy file_event from local library directory to local cache directory.
""" """
file_id = media_item["id"] file_id = file_event["id"]
dst = media_item["dst"] dst = file_event["dst"]
dst_exists = True dst_exists = True
try: try:
@ -54,13 +59,13 @@ class PypoFile(Thread):
else: else:
do_copy = True do_copy = True
media_item["file_ready"] = not do_copy file_event["file_ready"] = not do_copy
if do_copy: if do_copy:
logger.info("copying file %s to cache %s", file_id, dst) logger.info("copying file %s to cache %s", file_id, dst)
try: try:
with open(dst, "wb") as handle: with open(dst, "wb") as handle:
logger.info(media_item) logger.info(file_event)
try: try:
response = self.api_client.download_file(file_id, stream=True) response = self.api_client.download_file(file_id, stream=True)
for chunk in response.iter_content(chunk_size=2048): for chunk in response.iter_content(chunk_size=2048):
@ -68,19 +73,19 @@ class PypoFile(Thread):
except HTTPError as exception: except HTTPError as exception:
raise RuntimeError( raise RuntimeError(
f"could not download file {media_item['id']}" f"could not download file {file_event['id']}"
) from exception ) from exception
# make file world readable and owner writable # make file world readable and owner writable
os.chmod(dst, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) os.chmod(dst, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
if media_item["filesize"] == 0: if file_event["filesize"] == 0:
file_size = self.report_file_size_and_md5_to_api( file_size = self.report_file_size_and_md5_to_api(
dst, media_item["id"] dst, file_event["id"]
) )
media_item["filesize"] = file_size file_event["filesize"] = file_size
media_item["file_ready"] = True file_event["file_ready"] = True
except Exception as exception: except Exception as exception:
logger.exception( logger.exception(
"could not copy file %s to %s: %s", "could not copy file %s to %s: %s",
@ -89,18 +94,18 @@ class PypoFile(Thread):
exception, exception,
) )
def report_file_size_and_md5_to_api(self, file_path, file_id): def report_file_size_and_md5_to_api(self, file_path: str, file_id: int) -> int:
try: try:
file_size = os.path.getsize(file_path) file_size = os.path.getsize(file_path)
with open(file_path, "rb") as fh: with open(file_path, "rb") as fh:
m = hashlib.md5() hasher = hashlib.md5(usedforsecurity=False)
while True: while True:
data = fh.read(8192) data = fh.read(8192)
if not data: if not data:
break break
m.update(data) hasher.update(data)
md5_hash = m.hexdigest() md5_hash = hasher.hexdigest()
except OSError as exception: except OSError as exception:
file_size = 0 file_size = 0
logger.exception( logger.exception(
@ -123,21 +128,24 @@ class PypoFile(Thread):
return file_size return file_size
def get_highest_priority_media_item(self, schedule): def get_highest_priority_file_event(
self,
file_events: FileEvents,
) -> Optional[FileEvent]:
""" """
Get highest priority media_item in the queue. Currently the highest Get highest priority file event in the queue. Currently the highest
priority is decided by how close the start time is to "now". priority is decided by how close the start time is to "now".
""" """
if schedule is None or len(schedule) == 0: if file_events is None or len(file_events) == 0:
return None return None
sorted_keys = sorted(schedule.keys()) sorted_keys = sorted(file_events.keys())
if len(sorted_keys) == 0: if len(sorted_keys) == 0:
return None return None
highest_priority = sorted_keys[0] highest_priority = sorted_keys[0]
media_item = schedule[highest_priority] file_event = file_events[highest_priority]
logger.debug("Highest priority item: %s", highest_priority) logger.debug("Highest priority item: %s", highest_priority)
@ -147,30 +155,30 @@ class PypoFile(Thread):
# it is very possible we will have to deal with the same media_items # it is very possible we will have to deal with the same media_items
# again. In this situation, the worst possible case is that we try to # again. In this situation, the worst possible case is that we try to
# copy the file again and realize we already have it (thus aborting the copy). # copy the file again and realize we already have it (thus aborting the copy).
del schedule[highest_priority] del file_events[highest_priority]
return media_item return file_event
def main(self): def main(self):
while True: while True:
try: try:
if self.media is None or len(self.media) == 0: if self.file_events is None or len(self.file_events) == 0:
# We have no schedule, so we have nothing else to do. Let's # We have no schedule, so we have nothing else to do. Let's
# do a blocked wait on the queue # do a blocked wait on the queue
self.media = self.media_queue.get(block=True) self.file_events = self.file_events_queue.get(block=True)
else: else:
# We have a schedule we need to process, but we also want # We have a schedule we need to process, but we also want
# to check if a newer schedule is available. In this case # to check if a newer schedule is available. In this case
# do a non-blocking queue.get and in either case (we get something # do a non-blocking queue.get and in either case (we get something
# or we don't), get back to work on preparing getting files. # or we don't), get back to work on preparing getting files.
try: try:
self.media = self.media_queue.get_nowait() self.file_events = self.file_events_queue.get_nowait()
except Empty: except Empty:
pass pass
media_item = self.get_highest_priority_media_item(self.media) file_event = self.get_highest_priority_file_event(self.file_events)
if media_item is not None: if file_event is not None:
self.copy_file(media_item) self.copy_file(file_event)
except Exception as exception: except Exception as exception:
logger.exception(exception) logger.exception(exception)
raise exception raise exception

View File

@ -1,10 +1,18 @@
import logging import logging
import time import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, List, Optional
from ..liquidsoap.client import LiquidsoapClient from ..liquidsoap.client import LiquidsoapClient
from ..utils import seconds_between from ..utils import seconds_between
from .events import EventKind from .events import (
ActionEvent,
AnyEvent,
EventKind,
FileEvent,
WebStreamEvent,
event_key_to_datetime,
)
from .liquidsoap_gateway import TelnetLiquidsoap from .liquidsoap_gateway import TelnetLiquidsoap
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -12,7 +20,7 @@ logger = logging.getLogger(__name__)
class PypoLiquidsoap: class PypoLiquidsoap:
def __init__(self, liq_client: LiquidsoapClient): def __init__(self, liq_client: LiquidsoapClient):
self.liq_queue_tracker = { self.liq_queue_tracker: Dict[str, Optional[FileEvent]] = {
"s0": None, "s0": None,
"s1": None, "s1": None,
"s2": None, "s2": None,
@ -25,7 +33,7 @@ class PypoLiquidsoap:
list(self.liq_queue_tracker.keys()), list(self.liq_queue_tracker.keys()),
) )
def play(self, media_item): def play(self, media_item: AnyEvent):
if media_item["type"] == EventKind.FILE: if media_item["type"] == EventKind.FILE:
self.handle_file_type(media_item) self.handle_file_type(media_item)
elif media_item["type"] == EventKind.ACTION: elif media_item["type"] == EventKind.ACTION:
@ -37,8 +45,9 @@ class PypoLiquidsoap:
media_item["row_id"] media_item["row_id"]
!= self.telnet_liquidsoap.current_prebuffering_stream_id != self.telnet_liquidsoap.current_prebuffering_stream_id
): ):
# this is called if the stream wasn't scheduled sufficiently ahead of time # this is called if the stream wasn't scheduled sufficiently ahead of
# so that the prebuffering stage could take effect. Let's do the prebuffering now. # time so that the prebuffering stage could take effect. Let's do the
# prebuffering now.
self.telnet_liquidsoap.start_web_stream_buffer(media_item) self.telnet_liquidsoap.start_web_stream_buffer(media_item)
self.telnet_liquidsoap.start_web_stream(media_item) self.telnet_liquidsoap.start_web_stream(media_item)
elif media_item["type"] == EventKind.WEB_STREAM_BUFFER_END: elif media_item["type"] == EventKind.WEB_STREAM_BUFFER_END:
@ -48,17 +57,17 @@ class PypoLiquidsoap:
else: else:
raise UnknownMediaItemType(str(media_item)) raise UnknownMediaItemType(str(media_item))
def handle_file_type(self, media_item): def handle_file_type(self, media_item: FileEvent):
""" """
Wait 200 seconds (2000 iterations) for file to become ready, Wait 200 seconds (2000 iterations) for file to become ready,
otherwise give up on it. otherwise give up on it.
""" """
iter_num = 0 iter_num = 0
while not media_item["file_ready"] and iter_num < 2000: while not media_item.get("file_ready", False) and iter_num < 2000:
time.sleep(0.1) time.sleep(0.1)
iter_num += 1 iter_num += 1
if media_item["file_ready"]: if media_item.get("file_ready", False):
available_queue = self.find_available_queue() available_queue = self.find_available_queue()
try: try:
@ -73,19 +82,18 @@ class PypoLiquidsoap:
media_item["dst"], media_item["dst"],
) )
def handle_event_type(self, media_item): def handle_event_type(self, media_item: ActionEvent):
if media_item["event_type"] == "kick_out": if media_item["event_type"] == "kick_out":
self.telnet_liquidsoap.disconnect_source("live_dj") self.telnet_liquidsoap.disconnect_source("live_dj")
elif media_item["event_type"] == "switch_off": elif media_item["event_type"] == "switch_off":
self.telnet_liquidsoap.switch_source("live_dj", "off") self.telnet_liquidsoap.switch_source("live_dj", "off")
def is_media_item_finished(self, media_item): def is_media_item_finished(self, media_item: Optional[AnyEvent]):
if media_item is None: if media_item is None:
return True return True
else: return datetime.utcnow() > event_key_to_datetime(media_item["end"])
return datetime.utcnow() > media_item["end"]
def find_available_queue(self): def find_available_queue(self) -> str:
available_queue = None available_queue = None
for queue_id, item in self.liq_queue_tracker.items(): for queue_id, item in self.liq_queue_tracker.items():
if item is None or self.is_media_item_finished(item): if item is None or self.is_media_item_finished(item):
@ -97,7 +105,7 @@ class PypoLiquidsoap:
return available_queue return available_queue
def verify_correct_present_media(self, scheduled_now): def verify_correct_present_media(self, scheduled_now: List[AnyEvent]):
""" """
verify whether Liquidsoap is currently playing the correct files. verify whether Liquidsoap is currently playing the correct files.
if we find an item that Liquidsoap is not playing, then push it if we find an item that Liquidsoap is not playing, then push it
@ -121,47 +129,51 @@ class PypoLiquidsoap:
""" """
try: try:
scheduled_now_files = [ scheduled_now_files: List[FileEvent] = [
x for x in scheduled_now if x["type"] == EventKind.FILE x for x in scheduled_now if x["type"] == EventKind.FILE
] ]
scheduled_now_webstream = [ scheduled_now_webstream: List[WebStreamEvent] = [
x x
for x in scheduled_now for x in scheduled_now
if x["type"] in (EventKind.WEB_STREAM_OUTPUT_START) if x["type"] == EventKind.WEB_STREAM_OUTPUT_START
] ]
schedule_ids = {x["row_id"] for x in scheduled_now_files} schedule_ids = {x["row_id"] for x in scheduled_now_files}
row_id_map = {} row_id_map = {}
liq_queue_ids = set() liq_queue_ids = set()
for i in self.liq_queue_tracker: for queue_id in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i] queue_item = self.liq_queue_tracker[queue_id]
if not self.is_media_item_finished(mi): if queue_item is not None and not self.is_media_item_finished(
liq_queue_ids.add(mi["row_id"]) queue_item
row_id_map[mi["row_id"]] = mi ):
liq_queue_ids.add(queue_item["row_id"])
row_id_map[queue_item["row_id"]] = queue_item
to_be_removed = set() to_be_removed = set()
to_be_added = set() to_be_added = set()
# Iterate over the new files, and compare them to currently scheduled # Iterate over the new files, and compare them to currently scheduled
# tracks. If already in liquidsoap queue still need to make sure they don't # tracks. If already in liquidsoap queue still need to make sure they don't
# have different attributes # have different attributes. Ff replay gain changes, it shouldn't change the
# if replay gain changes, it shouldn't change the amplification of the currently playing song # amplification of the currently playing song
for i in scheduled_now_files: for item in scheduled_now_files:
if i["row_id"] in row_id_map: if item["row_id"] in row_id_map:
mi = row_id_map[i["row_id"]] queue_item = row_id_map[item["row_id"]]
assert queue_item is not None
correct = ( correct = (
mi["start"] == i["start"] queue_item["start"] == item["start"]
and mi["end"] == i["end"] and queue_item["end"] == item["end"]
and mi["row_id"] == i["row_id"] and queue_item["row_id"] == item["row_id"]
) )
if not correct: if not correct:
# need to re-add # need to re-add
logger.info("Track %s found to have new attr.", i) logger.info("Track %s found to have new attr.", item)
to_be_removed.add(i["row_id"]) to_be_removed.add(item["row_id"])
to_be_added.add(i["row_id"]) to_be_added.add(item["row_id"])
to_be_removed.update(liq_queue_ids - schedule_ids) to_be_removed.update(liq_queue_ids - schedule_ids)
to_be_added.update(schedule_ids - liq_queue_ids) to_be_added.update(schedule_ids - liq_queue_ids)
@ -170,21 +182,24 @@ class PypoLiquidsoap:
logger.info("Need to remove items from Liquidsoap: %s", to_be_removed) logger.info("Need to remove items from Liquidsoap: %s", to_be_removed)
# remove files from Liquidsoap's queue # remove files from Liquidsoap's queue
for i in self.liq_queue_tracker: for queue_id in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i] queue_item = self.liq_queue_tracker[queue_id]
if mi is not None and mi["row_id"] in to_be_removed: if queue_item is not None and queue_item["row_id"] in to_be_removed:
self.stop(i) self.stop(queue_id)
if to_be_added: if to_be_added:
logger.info("Need to add items to Liquidsoap *now*: %s", to_be_added) logger.info("Need to add items to Liquidsoap *now*: %s", to_be_added)
for i in scheduled_now_files: for item in scheduled_now_files:
if i["row_id"] in to_be_added: if item["row_id"] in to_be_added:
self.modify_cue_point(i) self.modify_cue_point(item)
self.play(i) self.play(item)
# handle webstreams # handle webstreams
current_stream_id = self.telnet_liquidsoap.get_current_stream_id() current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
if current_stream_id is None:
current_stream_id = "-1"
logger.debug("scheduled now webstream: %s", scheduled_now_webstream) logger.debug("scheduled now webstream: %s", scheduled_now_webstream)
if scheduled_now_webstream: if scheduled_now_webstream:
if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]): if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]):
@ -196,21 +211,24 @@ class PypoLiquidsoap:
except KeyError as exception: except KeyError as exception:
logger.exception("Malformed event in schedule: %s", exception) logger.exception("Malformed event in schedule: %s", exception)
def stop(self, queue): def stop(self, queue_id: str):
self.telnet_liquidsoap.queue_remove(queue) self.telnet_liquidsoap.queue_remove(queue_id)
self.liq_queue_tracker[queue] = None self.liq_queue_tracker[queue_id] = None
def is_file(self, media_item): def is_file(self, event: AnyEvent):
return media_item["type"] == EventKind.FILE return event["type"] == EventKind.FILE
def clear_queue_tracker(self): def clear_queue_tracker(self):
for i in self.liq_queue_tracker.keys(): for queue_id in self.liq_queue_tracker:
self.liq_queue_tracker[i] = None self.liq_queue_tracker[queue_id] = None
def modify_cue_point(self, link): def modify_cue_point(self, link: FileEvent):
assert self.is_file(link) assert self.is_file(link)
lateness = seconds_between(link["start"], datetime.utcnow()) lateness = seconds_between(
event_key_to_datetime(link["start"]),
datetime.utcnow(),
)
if lateness > 0: if lateness > 0:
logger.debug("media item was supposed to start %ss ago", lateness) logger.debug("media item was supposed to start %ss ago", lateness)

View File

@ -3,44 +3,41 @@ from typing import List
from ..liquidsoap.client import LiquidsoapClient from ..liquidsoap.client import LiquidsoapClient
from ..timeout import ls_timeout from ..timeout import ls_timeout
from .events import FileEvent
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def create_liquidsoap_annotation(media): def create_liquidsoap_annotation(file_event: FileEvent) -> str:
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.
annotations = {
filename = media["dst"] "media_id": file_event["id"],
annotation = ( "liq_start_next": "0",
'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",' "liq_fade_in": float(file_event["fade_in"]) / 1000,
+ 'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",' "liq_fade_out": float(file_event["fade_out"]) / 1000,
+ 'schedule_table_id="%s",replay_gain="%s dB"' "liq_cue_in": float(file_event["cue_in"]),
) % ( "liq_cue_out": float(file_event["cue_out"]),
media["id"], "schedule_table_id": file_event["row_id"],
float(media["fade_in"]) / 1000, "replay_gain": f"{file_event['replay_gain']} dB",
float(media["fade_out"]) / 1000, }
float(media["cue_in"]),
float(media["cue_out"]),
media["row_id"],
media["replay_gain"],
)
# Override the the artist/title that Liquidsoap extracts from a file's metadata # Override the the artist/title that Liquidsoap extracts from a file's metadata
# with the metadata we get from Airtime. (You can modify metadata in Airtime's library, # with the metadata we get from Airtime. (You can modify metadata in Airtime's library,
# which doesn't get saved back to the file.) # which doesn't get saved back to the file.)
if "metadata" in media: if "metadata" in file_event:
if "artist_name" in media["metadata"]: if "artist_name" in file_event["metadata"]:
artist_name = media["metadata"]["artist_name"] artist_name = file_event["metadata"]["artist_name"]
if isinstance(artist_name, str): if artist_name:
annotation += ',artist="%s"' % (artist_name.replace('"', '\\"')) annotations["artist"] = artist_name.replace('"', '\\"')
if "track_title" in media["metadata"]:
track_title = media["metadata"]["track_title"]
if isinstance(track_title, str):
annotation += ',title="%s"' % (track_title.replace('"', '\\"'))
annotation += ":" + filename if "track_title" in file_event["metadata"]:
track_title = file_event["metadata"]["track_title"]
if track_title:
annotations["title"] = track_title.replace('"', '\\"')
return annotation annotations_str = ",".join(f'{key}="{value}"' for key, value in annotations.items())
return "annotate:" + annotations_str + ":" + file_event["dst"]
class TelnetLiquidsoap: class TelnetLiquidsoap:

View File

@ -4,9 +4,10 @@ import time
from datetime import datetime from datetime import datetime
from queue import Queue from queue import Queue
from threading import Thread from threading import Thread
from typing import Any, Dict from typing import List, Tuple
from ..config import PUSH_INTERVAL, Config from ..config import PUSH_INTERVAL, Config
from .events import AnyEvent, EventKind, Events, event_key_to_datetime
from .liquidsoap import PypoLiquidsoap from .liquidsoap import PypoLiquidsoap
from .queue import PypoLiqQueue from .queue import PypoLiqQueue
@ -27,7 +28,7 @@ class PypoPush(Thread):
def __init__( def __init__(
self, self,
push_queue: Queue[Dict[str, Any]], push_queue: Queue[Events],
pypo_liquidsoap: PypoLiquidsoap, pypo_liquidsoap: PypoLiquidsoap,
config: Config, config: Config,
): ):
@ -36,11 +37,7 @@ class PypoPush(Thread):
self.config = config self.config = config
self.pushed_objects = {} self.future_scheduled_queue: Queue[Events] = Queue()
self.current_prebuffering_stream_id = None
self.queue_id = 0
self.future_scheduled_queue: Queue[Dict[str, Any]] = Queue()
self.pypo_liquidsoap = pypo_liquidsoap self.pypo_liquidsoap = pypo_liquidsoap
self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap) self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap)
@ -50,20 +47,20 @@ class PypoPush(Thread):
loops = 0 loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL) heartbeat_period = math.floor(30 / PUSH_INTERVAL)
media_schedule = None events = None
while True: while True:
try: try:
media_schedule = self.queue.get(block=True) events = self.queue.get(block=True)
except Exception as exception: except Exception as exception:
logger.exception(exception) logger.exception(exception)
raise exception raise exception
else: else:
logger.debug(media_schedule) logger.debug(events)
# separate media_schedule list into currently_playing and # separate media_schedule list into currently_playing and
# scheduled_for_future lists # scheduled_for_future lists
currently_playing, scheduled_for_future = self.separate_present_future( currently_playing, scheduled_for_future = self.separate_present_future(
media_schedule events
) )
self.pypo_liquidsoap.verify_correct_present_media(currently_playing) self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
@ -74,29 +71,31 @@ class PypoPush(Thread):
loops = 0 loops = 0
loops += 1 loops += 1
def separate_present_future(self, media_schedule): def separate_present_future(self, events: Events) -> Tuple[List[AnyEvent], Events]:
tnow = datetime.utcnow() now = datetime.utcnow()
present = [] present: List[AnyEvent] = []
future = {} future: Events = {}
sorted_keys = sorted(media_schedule.keys()) for key in sorted(events.keys()):
for mkey in sorted_keys: item = events[key]
media_item = media_schedule[mkey]
# Ignore track that already ended # Ignore track that already ended
if media_item["type"] == "file" and media_item["end"] < tnow: if (
logger.debug("ignoring ended media_item: %s", media_item) item["type"] == EventKind.FILE
and event_key_to_datetime(item["end"]) < now
):
logger.debug("ignoring ended media_item: %s", item)
continue continue
diff_sec = (tnow - media_item["start"]).total_seconds() diff_sec = (now - event_key_to_datetime(item["start"])).total_seconds()
if diff_sec >= 0: if diff_sec >= 0:
logger.debug("adding media_item to present: %s", media_item) logger.debug("adding media_item to present: %s", item)
present.append(media_item) present.append(item)
else: else:
logger.debug("adding media_item to future: %s", media_item) logger.debug("adding media_item to future: %s", item)
future[mkey] = media_item future[key] = item
return present, future return present, future

View File

@ -1,19 +1,21 @@
from datetime import datetime, time, timedelta from datetime import datetime, time, timedelta
from operator import itemgetter from operator import itemgetter
from typing import Dict, Literal from typing import Dict
from dateutil.parser import isoparse from dateutil.parser import isoparse
from libretime_api_client.v2 import ApiClient from libretime_api_client.v2 import ApiClient
from libretime_shared.datetime import time_in_milliseconds, time_in_seconds from libretime_shared.datetime import time_in_milliseconds, time_in_seconds
from ..liquidsoap.models import StreamPreferences from ..liquidsoap.models import StreamPreferences
from .events import ActionEvent, AnyEvent, EventKind, Events, FileEvent, WebStreamEvent from .events import (
ActionEvent,
EVENT_KEY_FORMAT = "%Y-%m-%d-%H-%M-%S" AnyEvent,
EventKind,
Events,
def datetime_to_event_key(value: datetime) -> str: FileEvent,
return value.strftime(EVENT_KEY_FORMAT) WebStreamEvent,
datetime_to_event_key,
)
def insert_event(events: Events, event_key: str, event: AnyEvent): def insert_event(events: Events, event_key: str, event: AnyEvent):

View File

@ -0,0 +1,33 @@
from libretime_playout.player.events import EventKind, FileEvent
from libretime_playout.player.liquidsoap_gateway import create_liquidsoap_annotation
def test_create_liquidsoap_annotation():
file_event: FileEvent = {
"type": EventKind.FILE,
"row_id": 1,
"start": "2022-09-05-11-00-00",
"end": "2022-09-05-11-05-02",
"uri": None,
"id": 2,
"show_name": "Show 1",
"fade_in": 500.0,
"fade_out": 500.0,
"cue_in": 13.7008,
"cue_out": 315.845,
"metadata": {
"track_title": 'My Friend the "Forest"',
"artist_name": "Nils Frahm",
"mime": "audio/flac",
},
"replay_gain": "11.46",
"filesize": 10000,
"dst": "fake/path.flac",
}
assert create_liquidsoap_annotation(file_event) == (
"""annotate:media_id="2",liq_start_next="0",liq_fade_in="0.5","""
"""liq_fade_out="0.5",liq_cue_in="13.7008",liq_cue_out="315.845","""
"""schedule_table_id="1",replay_gain="11.46 dB",artist="Nils Frahm","""
"""title="My Friend the \\"Forest\\"":fake/path.flac"""
)