refactor(playout): rename vars and add typing

This commit is contained in:
jo 2022-08-21 11:28:57 +02:00 committed by Kyle Robbertze
parent 1d59310156
commit 5505222df6
7 changed files with 79 additions and 50 deletions

View File

@ -8,7 +8,7 @@ import time
from datetime import datetime
from pathlib import Path
from queue import Queue
from typing import Optional
from typing import Any, Dict, Optional
import click
from libretime_api_client.v1 import ApiClient as LegacyClient
@ -93,52 +93,52 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
if not LIQUIDSOAP_MIN_VERSION <= liq_version:
raise Exception(f"Invalid liquidsoap version {liq_version}")
pypoFetch_q = Queue()
recorder_q = Queue()
pypoPush_q = Queue()
pypo_liquidsoap = PypoLiquidsoap(liq_client)
fetch_queue: Queue[Dict[str, Any]] = Queue()
recorder_queue: Queue[Dict[str, Any]] = Queue()
push_queue: Queue[Dict[str, Any]] = Queue()
# This queue is shared between pypo-fetch and pypo-file, where pypo-file
# is the consumer. Pypo-fetch will send every schedule it gets to pypo-file
# and pypo will parse this schedule to determine which file has the highest
# priority, and retrieve it.
media_q = Queue()
file_queue: Queue[Dict[str, Any]] = Queue()
# Pass only the configuration sections needed; PypoMessageHandler only needs rabbitmq settings
pmh = PypoMessageHandler(pypoFetch_q, recorder_q, config.rabbitmq)
pmh.daemon = True
pmh.start()
pypo_liquidsoap = PypoLiquidsoap(liq_client)
pfile = PypoFile(media_q, api_client)
pfile.daemon = True
pfile.start()
# Pass only the configuration sections needed; PypoMessageHandler only
# needs rabbitmq settings
message_handler = PypoMessageHandler(fetch_queue, recorder_queue, config.rabbitmq)
message_handler.daemon = True
message_handler.start()
pf = PypoFetch(
pypoFetch_q,
pypoPush_q,
media_q,
file_thread = PypoFile(file_queue, api_client)
file_thread.daemon = True
file_thread.start()
fetch_thread = PypoFetch(
fetch_queue,
push_queue,
file_queue,
liq_client,
pypo_liquidsoap,
config,
api_client,
legacy_client,
)
pf.daemon = True
pf.start()
fetch_thread.daemon = True
fetch_thread.start()
pp = PypoPush(pypoPush_q, pypo_liquidsoap, config)
pp.daemon = True
pp.start()
push_thread = PypoPush(push_queue, pypo_liquidsoap, config)
push_thread.daemon = True
push_thread.start()
recorder = Recorder(recorder_q, config, legacy_client)
recorder.daemon = True
recorder.start()
recorder_thread = Recorder(recorder_queue, config, legacy_client)
recorder_thread.daemon = True
recorder_thread.start()
stats_collector = StatsCollectorThread(legacy_client)
stats_collector.start()
stats_collector_thread = StatsCollectorThread(legacy_client)
stats_collector_thread.start()
# Just sleep the main thread, instead of blocking on pf.join().
# Just sleep the main thread, instead of blocking on fetch_thread.join().
# This allows CTRL-C to work!
while True:
time.sleep(1)

View File

@ -1,6 +1,8 @@
import json
import time
from queue import Queue as ThreadQueue
from threading import Thread
from typing import Any, Dict
# For RabbitMQ
from kombu.connection import Connection
@ -29,10 +31,15 @@ class RabbitConsumer(ConsumerMixin):
class PypoMessageHandler(Thread):
name = "message_handler"
def __init__(self, pq, rq, config: RabbitMQConfig):
def __init__(
self,
fetch_queue: ThreadQueue[Dict[str, Any]],
recorder_queue: ThreadQueue[Dict[str, Any]],
config: RabbitMQConfig,
):
Thread.__init__(self)
self.pypo_queue = pq
self.recorder_queue = rq
self.pypo_queue = fetch_queue
self.recorder_queue = recorder_queue
self.config = config
def init_rabbit_mq(self):

View File

@ -6,9 +6,10 @@ import signal
import sys
import time
from datetime import datetime
from queue import Empty
from queue import Empty, Queue
from subprocess import PIPE, Popen
from threading import Thread, Timer
from typing import Any, Dict
from libretime_api_client.v1 import ApiClient as LegacyClient
from libretime_api_client.v2 import ApiClient
@ -34,9 +35,9 @@ class PypoFetch(Thread):
def __init__(
self,
pypoFetch_q,
pypoPush_q,
media_q,
fetch_queue: Queue[Dict[str, Any]],
push_queue: Queue[Dict[str, Any]],
file_queue: Queue[Dict[str, Any]],
liq_client: LiquidsoapClient,
pypo_liquidsoap: PypoLiquidsoap,
config: Config,
@ -50,9 +51,9 @@ class PypoFetch(Thread):
self.api_client = api_client
self.legacy_client = legacy_client
self.fetch_queue = pypoFetch_q
self.push_queue = pypoPush_q
self.media_prepare_queue = media_q
self.fetch_queue = fetch_queue
self.push_queue = push_queue
self.media_prepare_queue = file_queue
self.last_update_schedule_timestamp = time.time()
self.config = config
self.listener_timeout = POLL_INTERVAL

View File

@ -2,8 +2,9 @@ import hashlib
import os
import stat
import time
from queue import Empty
from queue import Empty, Queue
from threading import Thread
from typing import Any, Dict
from libretime_api_client.v2 import ApiClient
from loguru import logger
@ -13,9 +14,13 @@ from requests.exceptions import ConnectionError, HTTPError, Timeout
class PypoFile(Thread):
name = "file"
def __init__(self, schedule_queue, api_client: ApiClient):
def __init__(
self,
file_queue: Queue[Dict[str, Any]],
api_client: ApiClient,
):
Thread.__init__(self)
self.media_queue = schedule_queue
self.media_queue = file_queue
self.media = None
self.api_client = api_client

View File

@ -44,7 +44,11 @@ def create_liquidsoap_annotation(media):
class TelnetLiquidsoap:
def __init__(self, liq_client: LiquidsoapClient, queues: List[str]):
def __init__(
self,
liq_client: LiquidsoapClient,
queues: List[str],
):
self.liq_client = liq_client
self.queues = queues
self.current_prebuffering_stream_id = None

View File

@ -3,6 +3,7 @@ import time
from datetime import datetime
from queue import Queue
from threading import Thread
from typing import Any, Dict
from loguru import logger
@ -22,9 +23,14 @@ def is_file(media_item):
class PypoPush(Thread):
name = "push"
def __init__(self, q, pypo_liquidsoap: PypoLiquidsoap, config: Config):
def __init__(
self,
push_queue: Queue[Dict[str, Any]],
pypo_liquidsoap: PypoLiquidsoap,
config: Config,
):
Thread.__init__(self)
self.queue = q
self.queue = push_queue
self.config = config
@ -32,7 +38,7 @@ class PypoPush(Thread):
self.current_prebuffering_stream_id = None
self.queue_id = 0
self.future_scheduled_queue = Queue()
self.future_scheduled_queue: Queue[Dict[str, Any]] = Queue()
self.pypo_liquidsoap = pypo_liquidsoap
self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap)

View File

@ -2,12 +2,14 @@ import signal
import sys
from collections import deque
from datetime import datetime
from queue import Empty
from queue import Empty, Queue
from threading import Thread
from typing import Any, Dict
from loguru import logger
from ..utils import seconds_between
from .liquidsoap import PypoLiquidsoap
def keyboardInterruptHandler(signum, frame):
@ -21,9 +23,13 @@ signal.signal(signal.SIGINT, keyboardInterruptHandler)
class PypoLiqQueue(Thread):
name = "liquidsoap_queue"
def __init__(self, q, pypo_liquidsoap):
def __init__(
self,
future_queue: Queue[Dict[str, Any]],
pypo_liquidsoap: PypoLiquidsoap,
):
Thread.__init__(self)
self.queue = q
self.queue = future_queue
self.pypo_liquidsoap = pypo_liquidsoap
def main(self):