From da6458caea08b84268b07142feda1f8b6b8a9e89 Mon Sep 17 00:00:00 2001 From: jo Date: Tue, 16 Aug 2022 13:34:02 +0200 Subject: [PATCH] feat(playout): integrate new liquisoap client --- playout/libretime_playout/main.py | 77 ++----- playout/libretime_playout/player/fetch.py | 180 ++++----------- .../libretime_playout/player/liquidsoap.py | 10 +- .../player/liquidsoap_gateway.py | 213 ++++-------------- playout/libretime_playout/player/push.py | 4 +- 5 files changed, 97 insertions(+), 387 deletions(-) diff --git a/playout/libretime_playout/main.py b/playout/libretime_playout/main.py index 7f96ba6af..7c79628ed 100644 --- a/playout/libretime_playout/main.py +++ b/playout/libretime_playout/main.py @@ -4,13 +4,11 @@ Python part of radio playout (pypo) import signal import sys -import telnetlib import time from datetime import datetime from pathlib import Path from queue import Queue -from threading import Lock -from typing import Optional, Tuple +from typing import Optional import click from libretime_api_client.v1 import ApiClient as LegacyClient @@ -22,14 +20,14 @@ from loguru import logger from .config import CACHE_DIR, RECORD_DIR, Config from .history.stats import StatsCollectorThread -from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION, parse_liquidsoap_version +from .liquidsoap.client import LiquidsoapClient +from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION from .message_handler import PypoMessageHandler from .player.fetch import PypoFetch from .player.file import PypoFile from .player.liquidsoap import PypoLiquidsoap from .player.push import PypoPush from .recorder import Recorder -from .timeout import ls_timeout class Global: @@ -45,56 +43,6 @@ def keyboardInterruptHandler(signum, frame): sys.exit(0) -@ls_timeout -def liquidsoap_get_info(telnet_lock, host, port) -> Optional[Tuple[int, int, int]]: - logger.debug("Checking to see if Liquidsoap is running") - try: - telnet_lock.acquire() - tn = telnetlib.Telnet(host, port) - msg = "version\n" - tn.write(msg.encode("utf-8")) - tn.write(b"exit\n") - response = tn.read_all().decode("utf-8") - except Exception as e: - logger.error(e) - return None - finally: - telnet_lock.release() - - return parse_liquidsoap_version(response) - - -def liquidsoap_startup_test(telnet_lock, liquidsoap_host, liquidsoap_port): - - liquidsoap_version = liquidsoap_get_info( - telnet_lock, - liquidsoap_host, - liquidsoap_port, - ) - while not liquidsoap_version: - logger.warning("Liquidsoap doesn't appear to be running! Trying again later...") - time.sleep(1) - liquidsoap_version = liquidsoap_get_info( - telnet_lock, - liquidsoap_host, - liquidsoap_port, - ) - - while not LIQUIDSOAP_MIN_VERSION <= liquidsoap_version: - logger.warning( - f"Found invalid Liquidsoap version! " - f"Liquidsoap<={LIQUIDSOAP_MIN_VERSION} is required!" - ) - time.sleep(5) - liquidsoap_version = liquidsoap_get_info( - telnet_lock, - liquidsoap_host, - liquidsoap_port, - ) - - logger.info(f"Liquidsoap version {liquidsoap_version}") - - @click.command(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX}) @cli_logging_options() @cli_config_options() @@ -144,18 +92,21 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ logger.exception(exception) time.sleep(10) - telnet_lock = Lock() + liq_client = LiquidsoapClient( + host=config.playout.liquidsoap_host, + port=config.playout.liquidsoap_port, + ) - liquidsoap_host = config.playout.liquidsoap_host - liquidsoap_port = config.playout.liquidsoap_port - - liquidsoap_startup_test(telnet_lock, liquidsoap_host, liquidsoap_port) + logger.debug("Checking if Liquidsoap is running") + liq_version = liq_client.wait_for_version() + if not LIQUIDSOAP_MIN_VERSION <= liq_version: + raise Exception(f"Invalid liquidsoap version {liq_version}") pypoFetch_q = Queue() recorder_q = Queue() pypoPush_q = Queue() - pypo_liquidsoap = PypoLiquidsoap(telnet_lock, liquidsoap_host, liquidsoap_port) + pypo_liquidsoap = PypoLiquidsoap(liq_client) # This queue is shared between pypo-fetch and pypo-file, where pypo-file # is the consumer. Pypo-fetch will send every schedule it gets to pypo-file @@ -176,7 +127,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ pypoFetch_q, pypoPush_q, media_q, - telnet_lock, + liq_client, pypo_liquidsoap, config, api_client, @@ -185,7 +136,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[ pf.daemon = True pf.start() - pp = PypoPush(pypoPush_q, telnet_lock, pypo_liquidsoap, config) + pp = PypoPush(pypoPush_q, pypo_liquidsoap, config) pp.daemon = True pp.start() diff --git a/playout/libretime_playout/player/fetch.py b/playout/libretime_playout/player/fetch.py index 17142a1d8..61c5a67b9 100644 --- a/playout/libretime_playout/player/fetch.py +++ b/playout/libretime_playout/player/fetch.py @@ -3,9 +3,7 @@ import json import mimetypes import os import signal -import subprocess import sys -import telnetlib import time from datetime import datetime from queue import Empty @@ -17,7 +15,9 @@ from libretime_api_client.v2 import ApiClient from loguru import logger from ..config import CACHE_DIR, POLL_INTERVAL, Config +from ..liquidsoap.client import LiquidsoapClient from ..timeout import ls_timeout +from .liquidsoap import PypoLiquidsoap from .schedule import get_schedule @@ -37,8 +37,8 @@ class PypoFetch(Thread): pypoFetch_q, pypoPush_q, media_q, - telnet_lock, - pypo_liquidsoap, + liq_client: LiquidsoapClient, + pypo_liquidsoap: PypoLiquidsoap, config: Config, api_client: ApiClient, legacy_client: LegacyClient, @@ -57,8 +57,7 @@ class PypoFetch(Thread): self.config = config self.listener_timeout = POLL_INTERVAL - self.telnet_lock = telnet_lock - + self.liq_client = liq_client self.pypo_liquidsoap = pypo_liquidsoap self.cache_dir = CACHE_DIR @@ -101,12 +100,12 @@ class PypoFetch(Thread): self.update_liquidsoap_transition_fade(m["transition_fade"]) elif command == "switch_source": logger.info("switch_on_source show command received...") - self.pypo_liquidsoap.get_telnet_dispatcher().switch_source( + self.pypo_liquidsoap.telnet_liquidsoap.switch_source( m["sourcename"], m["status"] ) elif command == "disconnect_source": logger.info("disconnect_on_source show command received...") - self.pypo_liquidsoap.get_telnet_dispatcher().disconnect_source( + self.pypo_liquidsoap.telnet_liquidsoap.disconnect_source( m["sourcename"] ) else: @@ -125,25 +124,7 @@ class PypoFetch(Thread): except Exception as exception: logger.exception(exception) - def switch_source_temp(self, sourcename, status): - logger.debug('Switching source: %s to "%s" status', sourcename, status) - command = "streams." - if sourcename == "master_dj": - command += "master_dj_" - elif sourcename == "live_dj": - command += "live_dj_" - elif sourcename == "scheduled_play": - command += "scheduled_play_" - - if status == "on": - command += "start\n" - else: - command += "stop\n" - - return command - # Initialize Liquidsoap environment - def set_bootstrap_variables(self): logger.debug("Getting information needed on bootstrap from Airtime") try: @@ -152,59 +133,33 @@ class PypoFetch(Thread): logger.exception(f"Unable to get bootstrap info: {exception}") logger.debug("info:%s", info) - commands = [] - for k, v in info["switch_status"].items(): - commands.append(self.switch_source_temp(k, v)) - stream_format = info["stream_label"] - station_name = info["station_name"] - fade = info["transition_fade"] + try: + for source_name, source_status in info["switch_status"].items(): + self.pypo_liquidsoap.liq_client.source_switch_status( + name=source_name, + streaming=source_status == "on", + ) - commands.append( - ("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8") - ) - commands.append(("vars.station_name %s\n" % station_name).encode("utf-8")) - commands.append(("vars.default_dj_fade %s\n" % fade).encode("utf-8")) - self.pypo_liquidsoap.get_telnet_dispatcher().telnet_send(commands) + self.pypo_liquidsoap.liq_client.settings_update( + station_name=info["station_name"], + message_format=info["stream_label"], + input_fade_transition=info["transition_fade"], + ) + except (ConnectionError, TimeoutError) as exception: + logger.exception(exception) self.pypo_liquidsoap.clear_all_queues() self.pypo_liquidsoap.clear_queue_tracker() def restart_liquidsoap(self): try: - # do not block - if we receive the lock then good - no other thread - # will try communicating with Liquidsoap. If we don't receive, it may - # mean some thread blocked and is still holding the lock. Restarting - # Liquidsoap will cause that thread to release the lock as an Exception - # will be thrown. - self.telnet_lock.acquire(False) - logger.info("Restarting Liquidsoap") - subprocess.call( - "kill -9 `pidof libretime-liquidsoap`", shell=True, close_fds=True - ) - - # Wait here and poll Liquidsoap until it has started up - logger.info("Waiting for Liquidsoap to start") - while True: - try: - tn = telnetlib.Telnet( - self.config.playout.liquidsoap_host, - self.config.playout.liquidsoap_port, - ) - tn.write(b"exit\n") - tn.read_all() - logger.info("Liquidsoap is up and running") - break - except Exception: - # sleep 0.5 seconds and try again - time.sleep(0.5) + self.liq_client.restart() + logger.info("Liquidsoap is up and running") except Exception as exception: logger.exception(exception) - finally: - if self.telnet_lock.locked(): - self.telnet_lock.release() # NOTE: This function is quite short after it was refactored. @@ -220,35 +175,18 @@ class PypoFetch(Thread): """ try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet( - self.config.playout.liquidsoap_host, - self.config.playout.liquidsoap_port, - ) - # update the boot up time of Liquidsoap. Since Liquidsoap is not restarting, - # we are manually adjusting the bootup time variable so the status msg will get - # updated. - current_time = time.time() - boot_up_time_command = ( - "vars.bootup_time " + str(current_time) + "\n" - ).encode("utf-8") - logger.info(boot_up_time_command) - tn.write(boot_up_time_command) + with self.liq_client.conn: + # update the boot up time of Liquidsoap. Since Liquidsoap is not restarting, + # we are manually adjusting the bootup time variable so the status msg will get + # updated. + current_time = time.time() + self.liq_client.conn.write(f"vars.bootup_time {str(current_time)}") + self.liq_client.conn.read() - connection_status = b"streams.connection_status\n" - logger.info(connection_status) - tn.write(connection_status) - - tn.write(b"exit\n") - - output = tn.read_all() - except Exception as exception: + self.liq_client.conn.write("streams.connection_status") + stream_info = self.liq_client.conn.read().splitlines()[0] + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() - - output_list = output.split("\r\n") - stream_info = output_list[2] # streamin info is in the form of: # eg. s1:true,2:true,3:false @@ -267,65 +205,23 @@ class PypoFetch(Thread): @ls_timeout def update_liquidsoap_stream_format(self, stream_format): - # Push stream metadata to liquidsoap - # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet( - self.config.playout.liquidsoap_host, - self.config.playout.liquidsoap_port, - ) - command = ("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8") - logger.info(command) - tn.write(command) - tn.write(b"exit\n") - tn.read_all() - except Exception as exception: + self.liq_client.settings_update(message_format=stream_format) + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() @ls_timeout def update_liquidsoap_transition_fade(self, fade): - # Push stream metadata to liquidsoap - # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet( - self.config.playout.liquidsoap_host, - self.config.playout.liquidsoap_port, - ) - command = ("vars.default_dj_fade %s\n" % fade).encode("utf-8") - logger.info(command) - tn.write(command) - tn.write(b"exit\n") - tn.read_all() - except Exception as exception: + self.liq_client.settings_update(input_fade_transition=fade) + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() @ls_timeout def update_liquidsoap_station_name(self, station_name): - # Push stream metadata to liquidsoap - # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! try: - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet( - self.config.playout.liquidsoap_host, - self.config.playout.liquidsoap_port, - ) - command = ("vars.station_name %s\n" % station_name).encode("utf-8") - logger.info(command) - tn.write(command) - tn.write(b"exit\n") - tn.read_all() - except Exception as exception: - logger.exception(exception) - finally: - self.telnet_lock.release() - except Exception as exception: + self.liq_client.settings_update(station_name=station_name) + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) # Process the schedule diff --git a/playout/libretime_playout/player/liquidsoap.py b/playout/libretime_playout/player/liquidsoap.py index ea625460d..8eb0c16ee 100644 --- a/playout/libretime_playout/player/liquidsoap.py +++ b/playout/libretime_playout/player/liquidsoap.py @@ -3,13 +3,14 @@ from datetime import datetime, timedelta from loguru import logger +from ..liquidsoap.client import LiquidsoapClient from ..utils import seconds_between from .events import EventKind from .liquidsoap_gateway import TelnetLiquidsoap class PypoLiquidsoap: - def __init__(self, telnet_lock, host, port): + def __init__(self, liq_client: LiquidsoapClient): self.liq_queue_tracker = { "s0": None, "s1": None, @@ -18,13 +19,12 @@ class PypoLiquidsoap: "s4": None, } + self.liq_client = liq_client self.telnet_liquidsoap = TelnetLiquidsoap( - telnet_lock, host, port, list(self.liq_queue_tracker.keys()) + liq_client, + list(self.liq_queue_tracker.keys()), ) - def get_telnet_dispatcher(self): - return self.telnet_liquidsoap - def play(self, media_item): if media_item["type"] == EventKind.FILE: self.handle_file_type(media_item) diff --git a/playout/libretime_playout/player/liquidsoap_gateway.py b/playout/libretime_playout/player/liquidsoap_gateway.py index 067d36d70..93bf2db14 100644 --- a/playout/libretime_playout/player/liquidsoap_gateway.py +++ b/playout/libretime_playout/player/liquidsoap_gateway.py @@ -1,7 +1,8 @@ -import telnetlib +from typing import List from loguru import logger +from ..liquidsoap.client import LiquidsoapClient from ..timeout import ls_timeout @@ -43,229 +44,91 @@ def create_liquidsoap_annotation(media): class TelnetLiquidsoap: - def __init__(self, telnet_lock, ls_host, ls_port, queues): - self.telnet_lock = telnet_lock - self.ls_host = ls_host - self.ls_port = ls_port + def __init__(self, liq_client: LiquidsoapClient, queues: List[str]): + self.liq_client = liq_client self.queues = queues self.current_prebuffering_stream_id = None - def __connect(self): - return telnetlib.Telnet(self.ls_host, self.ls_port) - @ls_timeout def queue_clear_all(self): try: - self.telnet_lock.acquire() - connection = self.__connect() - - for i in self.queues: - msg = "queues.%s_skip\n" % i - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - connection.write(b"exit\n") - logger.debug(connection.read_all().decode("utf-8")) - finally: - self.telnet_lock.release() + self.liq_client.queues_remove(*self.queues) + except (ConnectionError, TimeoutError) as exception: + logger.exception(exception) @ls_timeout def queue_remove(self, queue_id): try: - self.telnet_lock.acquire() - connection = self.__connect() - - msg = "queues.%s_skip\n" % queue_id - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - connection.write(b"exit\n") - logger.debug(connection.read_all().decode("utf-8")) - finally: - self.telnet_lock.release() + self.liq_client.queues_remove(queue_id) + except (ConnectionError, TimeoutError) as exception: + logger.exception(exception) @ls_timeout def queue_push(self, queue_id, media_item): try: - self.telnet_lock.acquire() - - connection = self.__connect() annotation = create_liquidsoap_annotation(media_item) - msg = f"{queue_id}.push {annotation}\n" - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - show_name = media_item["show_name"] - msg = "vars.show_name %s\n" % show_name - connection.write(msg.encode("utf-8")) - logger.debug(msg) - - connection.write(b"exit\n") - logger.debug(connection.read_all().decode("utf-8")) - finally: - self.telnet_lock.release() + self.liq_client.queue_push(queue_id, annotation, media_item["show_name"]) + except (ConnectionError, TimeoutError) as exception: + logger.exception(exception) @ls_timeout def stop_web_stream_buffer(self): try: - self.telnet_lock.acquire() - connection = telnetlib.Telnet(self.ls_host, self.ls_port) - # dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - - msg = "http.stop\n" - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - msg = "dynamic_source.id -1\n" - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - connection.write(b"exit\n") - logger.debug(connection.read_all().decode("utf-8")) - - except Exception as exception: + self.liq_client.web_stream_stop_buffer() + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() @ls_timeout def stop_web_stream_output(self): try: - self.telnet_lock.acquire() - connection = telnetlib.Telnet(self.ls_host, self.ls_port) - # dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - - msg = "dynamic_source.output_stop\n" - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - connection.write(b"exit\n") - logger.debug(connection.read_all().decode("utf-8")) - - except Exception as exception: + self.liq_client.web_stream_stop() + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() @ls_timeout def start_web_stream(self, media_item): try: - self.telnet_lock.acquire() - connection = telnetlib.Telnet(self.ls_host, self.ls_port) - - # TODO: DO we need this? - msg = "streams.scheduled_play_start\n" - connection.write(msg.encode("utf-8")) - - msg = "dynamic_source.output_start\n" - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - connection.write(b"exit\n") - logger.debug(connection.read_all().decode("utf-8")) - + self.liq_client.web_stream_start() self.current_prebuffering_stream_id = None - except Exception as exception: + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() @ls_timeout def start_web_stream_buffer(self, media_item): try: - self.telnet_lock.acquire() - connection = telnetlib.Telnet(self.ls_host, self.ls_port) - - msg = "dynamic_source.id %s\n" % media_item["row_id"] - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - msg = "http.restart %s\n" % media_item["uri"] - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - connection.write(b"exit\n") - logger.debug(connection.read_all().decode("utf-8")) - + self.liq_client.web_stream_start_buffer( + media_item["row_id"], + media_item["uri"], + ) self.current_prebuffering_stream_id = media_item["row_id"] - except Exception as exception: + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() @ls_timeout def get_current_stream_id(self): try: - self.telnet_lock.acquire() - connection = telnetlib.Telnet(self.ls_host, self.ls_port) - - msg = "dynamic_source.get_id\n" - logger.debug(msg) - connection.write(msg.encode("utf-8")) - - connection.write(b"exit\n") - stream_id = connection.read_all().decode("utf-8").splitlines()[0] - logger.debug("stream_id: %s" % stream_id) - - return stream_id - except Exception as exception: + return self.liq_client.web_stream_get_id() + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() @ls_timeout def disconnect_source(self, sourcename): - logger.debug("Disconnecting source: %s", sourcename) - command = "" - if sourcename == "master_dj": - command += "master_harbor.stop\n" - elif sourcename == "live_dj": - command += "live_dj_harbor.stop\n" + if sourcename not in ("master_dj", "live_dj"): + raise ValueError(f"invalid source name: {sourcename}") try: - self.telnet_lock.acquire() - connection = telnetlib.Telnet(self.ls_host, self.ls_port) - logger.info(command) - connection.write(command.encode("utf-8")) - connection.write(b"exit\n") - connection.read_all().decode("utf-8") - except Exception as exception: + logger.debug("Disconnecting source: %s", sourcename) + self.liq_client.source_disconnect(sourcename) + except (ConnectionError, TimeoutError) as exception: logger.exception(exception) - finally: - self.telnet_lock.release() @ls_timeout - def telnet_send(self, commands): - try: - self.telnet_lock.acquire() - - connection = telnetlib.Telnet(self.ls_host, self.ls_port) - for line in commands: - logger.info(line) - if type(line) is str: - line = line.encode("utf-8") - connection.write(line) - - connection.write(b"exit\n") - connection.read_all().decode("utf-8") - except Exception as exception: - logger.exception(exception) - finally: - self.telnet_lock.release() - def switch_source(self, sourcename, status): - logger.debug('Switching source: %s to "%s" status', sourcename, status) - command = "streams." - if sourcename == "master_dj": - command += "master_dj_" - elif sourcename == "live_dj": - command += "live_dj_" - elif sourcename == "scheduled_play": - command += "scheduled_play_" + if sourcename not in ("master_dj", "live_dj", "scheduled_play"): + raise ValueError(f"invalid source name: {sourcename}") - if status == "on": - command += "start\n" - else: - command += "stop\n" - - self.telnet_send([command]) + try: + logger.debug('Switching source: %s to "%s" status', sourcename, status) + self.liq_client.source_switch_status(sourcename, status == "on") + except (ConnectionError, TimeoutError) as exception: + logger.exception(exception) diff --git a/playout/libretime_playout/player/push.py b/playout/libretime_playout/player/push.py index 2403025f4..4d56ac84f 100644 --- a/playout/libretime_playout/player/push.py +++ b/playout/libretime_playout/player/push.py @@ -7,6 +7,7 @@ from threading import Thread from loguru import logger from ..config import PUSH_INTERVAL, Config +from .liquidsoap import PypoLiquidsoap from .queue import PypoLiqQueue @@ -21,11 +22,10 @@ def is_file(media_item): class PypoPush(Thread): name = "push" - def __init__(self, q, telnet_lock, pypo_liquidsoap, config: Config): + def __init__(self, q, pypo_liquidsoap: PypoLiquidsoap, config: Config): Thread.__init__(self) self.queue = q - self.telnet_lock = telnet_lock self.config = config self.pushed_objects = {}