refactor(playout): add event dict typings

This commit is contained in:
jo 2023-02-19 18:16:07 +01:00 committed by Jonas L
parent cd0d9b6f4a
commit 0d4e2823e2
2 changed files with 86 additions and 15 deletions

View File

@ -1,4 +1,7 @@
from enum import Enum
from typing import Dict, Literal, TypedDict, Union
from typing_extensions import NotRequired
class EventKind(str, Enum):
@ -8,3 +11,71 @@ class EventKind(str, Enum):
WEB_STREAM_OUTPUT_START = "stream_output_start"
WEB_STREAM_BUFFER_END = "stream_buffer_end"
WEB_STREAM_OUTPUT_END = "stream_output_end"
class BaseEvent(TypedDict):
# TODO: Convert start/end to datetime
start: str
end: str
class FileEventMetadata(TypedDict):
track_title: str
artist_name: str
mime: str
class FileEvent(BaseEvent):
type: Literal[EventKind.FILE]
# Schedule
row_id: int
uri: str
id: int
# Show data
show_name: str
# File
fade_in: float
fade_out: float
cue_in: float
cue_out: float
# TODO: Flatten this metadata dict
metadata: FileEventMetadata
replay_gain: float
filesize: int
class WebStreamEvent(BaseEvent):
type: Literal[
EventKind.WEB_STREAM_BUFFER_START,
EventKind.WEB_STREAM_OUTPUT_START,
EventKind.WEB_STREAM_BUFFER_END,
EventKind.WEB_STREAM_OUTPUT_END,
]
# Schedule
row_id: int
uri: str
id: int
# Show data
show_name: NotRequired[str]
class ActionEventKind(str, Enum):
SWITCH_OFF = "switch_off"
KICK_OUT = "kick_out"
class ActionEvent(BaseEvent):
type: Literal[EventKind.ACTION]
# TODO: user ActionEventKind enum
event_type: str
AnyEvent = Union[FileEvent, WebStreamEvent, ActionEvent]
Events = Dict[str, AnyEvent]

View File

@ -1,13 +1,13 @@
from datetime import datetime, time, timedelta
from operator import itemgetter
from typing import Dict
from typing import Dict, Literal
from dateutil.parser import isoparse
from libretime_api_client.v2 import ApiClient
from libretime_shared.datetime import time_in_milliseconds, time_in_seconds
from ..liquidsoap.models import StreamPreferences
from .events import EventKind
from .events import ActionEvent, AnyEvent, EventKind, Events, FileEvent, WebStreamEvent
EVENT_KEY_FORMAT = "%Y-%m-%d-%H-%M-%S"
@ -16,7 +16,7 @@ def datetime_to_event_key(value: datetime) -> str:
return value.strftime(EVENT_KEY_FORMAT)
def insert_event(events: dict, event_key: str, event: dict):
def insert_event(events: Events, event_key: str, event: AnyEvent):
key = event_key
# Search for an empty slot
@ -32,7 +32,7 @@ def insert_event(events: dict, event_key: str, event: dict):
events[key] = event
def get_schedule(api_client: ApiClient):
def get_schedule(api_client: ApiClient) -> Dict[Literal["media"], Events]:
stream_preferences = StreamPreferences(**api_client.get_stream_preferences().json())
current_time = datetime.utcnow()
@ -50,7 +50,7 @@ def get_schedule(api_client: ApiClient):
}
).json()
events: Dict[str, dict] = {}
events: Dict[str, AnyEvent] = {}
for item in sorted(schedule, key=itemgetter("starts_at")):
item["starts_at"] = isoparse(item["starts_at"])
item["ends_at"] = isoparse(item["ends_at"])
@ -79,7 +79,7 @@ def get_schedule(api_client: ApiClient):
def generate_live_events(
events: dict,
events: Events,
show_instance: dict,
input_fade_transition: float,
):
@ -90,7 +90,7 @@ def generate_live_events(
# If enabled, fade the input source out
if switch_off_event_key != kick_out_event_key:
switch_off_event = {
switch_off_event: ActionEvent = {
"type": EventKind.ACTION,
"event_type": "switch_off",
"start": switch_off_event_key,
@ -99,7 +99,7 @@ def generate_live_events(
insert_event(events, switch_off_event_key, switch_off_event)
# Then kick the source out
kick_out_event = {
kick_out_event: ActionEvent = {
"type": EventKind.ACTION,
"event_type": "kick_out",
"start": kick_out_event_key,
@ -109,7 +109,7 @@ def generate_live_events(
def generate_file_events(
events: dict,
events: Events,
schedule: dict,
file: dict,
show: dict,
@ -120,7 +120,7 @@ def generate_file_events(
schedule_start_event_key = datetime_to_event_key(schedule["starts_at"])
schedule_end_event_key = datetime_to_event_key(schedule["ends_at"])
event = {
event: FileEvent = {
"type": EventKind.FILE,
"row_id": schedule["id"],
"start": schedule_start_event_key,
@ -146,7 +146,7 @@ def generate_file_events(
def generate_webstream_events(
events: dict,
events: Events,
schedule: dict,
webstream: dict,
show: dict,
@ -157,7 +157,7 @@ def generate_webstream_events(
schedule_start_event_key = datetime_to_event_key(schedule["starts_at"])
schedule_end_event_key = datetime_to_event_key(schedule["ends_at"])
stream_buffer_start_event = {
stream_buffer_start_event: WebStreamEvent = {
"type": EventKind.WEB_STREAM_BUFFER_START,
"row_id": schedule["id"],
"start": datetime_to_event_key(schedule["starts_at"] - timedelta(seconds=5)),
@ -167,7 +167,7 @@ def generate_webstream_events(
}
insert_event(events, schedule_start_event_key, stream_buffer_start_event)
stream_output_start_event = {
stream_output_start_event: WebStreamEvent = {
"type": EventKind.WEB_STREAM_OUTPUT_START,
"row_id": schedule["id"],
"start": schedule_start_event_key,
@ -181,7 +181,7 @@ def generate_webstream_events(
# NOTE: stream_*_end were previously triggered 1 second before
# the schedule end.
stream_buffer_end_event = {
stream_buffer_end_event: WebStreamEvent = {
"type": EventKind.WEB_STREAM_BUFFER_END,
"row_id": schedule["id"],
"start": schedule_end_event_key,
@ -191,7 +191,7 @@ def generate_webstream_events(
}
insert_event(events, schedule_end_event_key, stream_buffer_end_event)
stream_output_end_event = {
stream_output_end_event: WebStreamEvent = {
"type": EventKind.WEB_STREAM_OUTPUT_END,
"row_id": schedule["id"],
"start": schedule_end_event_key,