import logging import time from datetime import datetime, timedelta from typing import Dict, List, Optional, Set from ..liquidsoap.client import LiquidsoapClient from ..utils import seconds_between from .events import ActionEvent, AnyEvent, EventKind, FileEvent, WebStreamEvent from .liquidsoap_gateway import TelnetLiquidsoap logger = logging.getLogger(__name__) class PypoLiquidsoap: def __init__(self, liq_client: LiquidsoapClient): self.liq_queue_tracker: Dict[int, Optional[FileEvent]] = { 0: None, 1: None, 2: None, 3: None, } self.liq_client = liq_client self.telnet_liquidsoap = TelnetLiquidsoap( liq_client, list(self.liq_queue_tracker.keys()), ) def play(self, event: AnyEvent) -> None: if isinstance(event, FileEvent): self.handle_file_type(event) elif isinstance(event, ActionEvent): self.handle_event_type(event) elif isinstance(event, WebStreamEvent): self.handle_web_stream_type(event) else: raise UnknownEvent(str(event)) def handle_file_type(self, file_event: FileEvent) -> None: """ Wait 200 seconds (2000 iterations) for file to become ready, otherwise give up on it. """ iter_num = 0 while not file_event.file_ready and iter_num < 2000: time.sleep(0.1) iter_num += 1 if file_event.file_ready: available_queue = self.find_available_queue() try: self.telnet_liquidsoap.queue_push(available_queue, file_event) self.liq_queue_tracker[available_queue] = file_event except Exception as exception: logger.exception(exception) raise exception else: logger.warning( "File %s did not become ready in less than 5 seconds. Skipping...", file_event.local_filepath, ) def handle_web_stream_type(self, event: WebStreamEvent) -> None: if event.type == EventKind.WEB_STREAM_BUFFER_START: self.telnet_liquidsoap.start_web_stream_buffer(event) elif event.type == EventKind.WEB_STREAM_OUTPUT_START: if event.row_id != self.telnet_liquidsoap.current_prebuffering_stream_id: # this is called if the stream wasn't scheduled sufficiently ahead of # time so that the prebuffering stage could take effect. Let's do the # prebuffering now. self.telnet_liquidsoap.start_web_stream_buffer(event) self.telnet_liquidsoap.start_web_stream() elif event.type == EventKind.WEB_STREAM_BUFFER_END: self.telnet_liquidsoap.stop_web_stream_buffer() elif event.type == EventKind.WEB_STREAM_OUTPUT_END: self.telnet_liquidsoap.stop_web_stream_output() def handle_event_type(self, event: ActionEvent) -> None: if event.event_type == "kick_out": self.telnet_liquidsoap.disconnect_source("live_dj") elif event.event_type == "switch_off": self.telnet_liquidsoap.switch_source("live_dj", "off") def is_media_item_finished(self, media_item: Optional[AnyEvent]) -> bool: if media_item is None: return True return datetime.utcnow() > media_item.end def find_available_queue(self) -> int: available_queue = None for queue_id, item in self.liq_queue_tracker.items(): if item is None or self.is_media_item_finished(item): # queue "i" is available. Push to this queue available_queue = queue_id if available_queue is None: raise NoQueueAvailableException() return available_queue # pylint: disable=too-many-branches def verify_correct_present_media(self, scheduled_now: List[AnyEvent]) -> None: """ verify whether Liquidsoap is currently playing the correct files. if we find an item that Liquidsoap is not playing, then push it into one of Liquidsoap's queues. If Liquidsoap is already playing it do nothing. If Liquidsoap is playing a track that isn't in currently_playing then stop it. Check for Liquidsoap media we should source.skip get liquidsoap items for each queue. Since each queue can only have one item, we should have a max of 8 items. 2013-03-21-22-56-00_0: { id: 1, type: "stream_output_start", row_id: 41, uri: "http://stream2.radioblackout.org:80/blackout.ogg", start: "2013-03-21-22-56-00", end: "2013-03-21-23-26-00", show_name: "Untitled Show" }, """ try: scheduled_now_files: List[FileEvent] = [ x for x in scheduled_now if x.type == EventKind.FILE # type: ignore ] scheduled_now_webstream: List[WebStreamEvent] = [ x # type: ignore for x in scheduled_now if x.type == EventKind.WEB_STREAM_OUTPUT_START ] schedule_ids: Set[int] = {x.row_id for x in scheduled_now_files} row_id_map: Dict[int, FileEvent] = {} liq_queue_ids: Set[int] = set() for queue_item in self.liq_queue_tracker.values(): if queue_item is not None and not self.is_media_item_finished( queue_item ): liq_queue_ids.add(queue_item.row_id) row_id_map[queue_item.row_id] = queue_item to_be_removed: Set[int] = set() to_be_added: Set[int] = set() # Iterate over the new files, and compare them to currently scheduled # tracks. If already in liquidsoap queue still need to make sure they don't # have different attributes. Ff replay gain changes, it shouldn't change the # amplification of the currently playing song for item in scheduled_now_files: if item.row_id in row_id_map: queue_item = row_id_map[item.row_id] if not ( queue_item.start == item.start and queue_item.end == item.end and queue_item.row_id == item.row_id ): # need to re-add logger.info("Track %s found to have new attr.", item) to_be_removed.add(item.row_id) to_be_added.add(item.row_id) to_be_removed.update(liq_queue_ids - schedule_ids) to_be_added.update(schedule_ids - liq_queue_ids) if to_be_removed: logger.info("Need to remove items from Liquidsoap: %s", to_be_removed) # remove files from Liquidsoap's queue for queue_id, queue_item in self.liq_queue_tracker.items(): if queue_item is not None and queue_item.row_id in to_be_removed: self.stop(queue_id) if to_be_added: logger.info("Need to add items to Liquidsoap *now*: %s", to_be_added) for item in scheduled_now_files: if item.row_id in to_be_added: self.modify_cue_point(item) self.play(item) # handle webstreams current_stream_id = self.telnet_liquidsoap.get_current_stream_id() if current_stream_id is None: current_stream_id = "-1" logger.debug("scheduled now webstream: %s", scheduled_now_webstream) if scheduled_now_webstream: if int(current_stream_id) != int(scheduled_now_webstream[0].row_id): self.play(scheduled_now_webstream[0]) elif current_stream_id != "-1": # something is playing and it shouldn't be. self.telnet_liquidsoap.stop_web_stream_buffer() self.telnet_liquidsoap.stop_web_stream_output() except KeyError as exception: logger.exception("Malformed event in schedule: %s", exception) def stop(self, queue_id: int) -> None: self.telnet_liquidsoap.queue_remove(queue_id) self.liq_queue_tracker[queue_id] = None def clear_queue_tracker(self) -> None: for queue_id in self.liq_queue_tracker: self.liq_queue_tracker[queue_id] = None def modify_cue_point(self, file_event: FileEvent) -> None: assert file_event.type == EventKind.FILE lateness = seconds_between(file_event.start, datetime.utcnow()) if lateness > 0: logger.debug("media item was supposed to start %ss ago", lateness) cue_in_orig = timedelta(seconds=file_event.cue_in) file_event.cue_in = cue_in_orig.total_seconds() + lateness def clear_all_queues(self) -> None: self.telnet_liquidsoap.queue_clear_all() class UnknownEvent(Exception): pass class NoQueueAvailableException(Exception): pass