sintonia/playout/libretime_playout/player/push.py

98 lines
2.8 KiB
Python

import logging
import math
import time
from datetime import datetime
from queue import Queue
from threading import Thread
from typing import List, Tuple
from ..config import PUSH_INTERVAL, Config
from .events import AnyEvent, Events, FileEvent
from .liquidsoap import Liquidsoap
from .queue import PypoLiqQueue
logger = logging.getLogger(__name__)
class PypoPush(Thread):
name = "push"
daemon = True
def __init__(
self,
push_queue: "Queue[Events]",
liquidsoap: Liquidsoap,
config: Config,
):
Thread.__init__(self)
self.queue = push_queue
self.config = config
self.future_scheduled_queue: "Queue[Events]" = Queue()
self.liquidsoap = liquidsoap
self.plq = PypoLiqQueue(self.future_scheduled_queue, self.liquidsoap)
self.plq.start()
def main(self) -> None:
loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
events = None
while True:
try:
events = self.queue.get(block=True)
except Exception as exception: # pylint: disable=broad-exception-caught
logger.exception(exception)
raise exception
logger.debug(events)
# separate media_schedule list into currently_playing and
# scheduled_for_future lists
currently_playing, scheduled_for_future = self.separate_present_future(
events
)
self.liquidsoap.verify_correct_present_media(currently_playing)
self.future_scheduled_queue.put(scheduled_for_future)
if loops % heartbeat_period == 0:
logger.info("heartbeat")
loops = 0
loops += 1
def separate_present_future(self, events: Events) -> Tuple[List[AnyEvent], Events]:
now = datetime.utcnow()
present: List[AnyEvent] = []
future: Events = {}
for key in sorted(events.keys()):
item = events[key]
# Ignore track that already ended
if isinstance(item, FileEvent) and item.end < now:
logger.debug("ignoring ended media_item: %s", item)
continue
diff_sec = (now - item.start).total_seconds()
if diff_sec >= 0:
logger.debug("adding media_item to present: %s", item)
present.append(item)
else:
logger.debug("adding media_item to future: %s", item)
future[key] = item
return present, future
def run(self) -> None:
while True:
try:
self.main()
except Exception as exception: # pylint: disable=broad-exception-caught
logger.exception(exception)
time.sleep(5)