From 624a60c4af7db7ab56ef63540f4c4a71b1409b80 Mon Sep 17 00:00:00 2001 From: jo Date: Sat, 13 Aug 2022 20:27:39 +0200 Subject: [PATCH] feat(playout): create liquidsoap client --- .../liquidsoap/client/__init__.py | 2 + .../liquidsoap/client/_client.py | 142 +++++++++++++++ .../liquidsoap/client/_connection.py | 130 ++++++++++++++ .../tests/liquidsoap/client/client_test.py | 9 + .../liquidsoap/client/connection_test.py | 167 ++++++++++++++++++ 5 files changed, 450 insertions(+) create mode 100644 playout/libretime_playout/liquidsoap/client/__init__.py create mode 100644 playout/libretime_playout/liquidsoap/client/_client.py create mode 100644 playout/libretime_playout/liquidsoap/client/_connection.py create mode 100644 playout/tests/liquidsoap/client/client_test.py create mode 100644 playout/tests/liquidsoap/client/connection_test.py diff --git a/playout/libretime_playout/liquidsoap/client/__init__.py b/playout/libretime_playout/liquidsoap/client/__init__.py new file mode 100644 index 000000000..20f29a033 --- /dev/null +++ b/playout/libretime_playout/liquidsoap/client/__init__.py @@ -0,0 +1,2 @@ +from ._client import LiquidsoapClient +from ._connection import LiquidsoapConnection diff --git a/playout/libretime_playout/liquidsoap/client/_client.py b/playout/libretime_playout/liquidsoap/client/_client.py new file mode 100644 index 000000000..4c0e5714b --- /dev/null +++ b/playout/libretime_playout/liquidsoap/client/_client.py @@ -0,0 +1,142 @@ +from pathlib import Path +from subprocess import CalledProcessError, check_output, run +from time import sleep +from typing import Optional, Tuple + +from loguru import logger +from typing_extensions import Literal + +from ..version import parse_liquidsoap_version +from ._connection import LiquidsoapConnection + + +class LiquidsoapClientError(Exception): + """ + A Liquidsoap client error + """ + + +class LiquidsoapClient: + """ + A client to communicate with a running Liquidsoap server. + """ + + conn: LiquidsoapConnection + + def __init__( + self, + host: str = "localhost", + port: int = 0, + path: Optional[Path] = None, + timeout: int = 15, + ): + self.conn = LiquidsoapConnection( + host=host, + port=port, + path=path, + timeout=timeout, + ) + + def version(self) -> Tuple[int, int, int]: + with self.conn: + self.conn.write("version") + return parse_liquidsoap_version(self.conn.read()) + + def wait_for_version(self, timeout: int = 30) -> Tuple[int, int, int]: + while timeout > 0: + try: + version = self.version() + logger.info(f"found version {version}") + return version + except (ConnectionError, TimeoutError) as exception: + logger.warning(f"could not get version: {exception}") + timeout -= 1 + sleep(1) + + raise LiquidsoapClientError("could not get liquidsoap version") + + def queues_remove(self, *queues: int) -> None: + with self.conn: + for queue_id in queues: + self.conn.write(f"queues.{queue_id}_skip") + + def queue_push(self, queue_id: int, entry: str, show_name: str) -> None: + with self.conn: + self.conn.write(f"{queue_id}.push {entry}") + self.conn.write(f"vars.show_name {show_name}") + + def web_stream_get_id(self) -> str: + with self.conn: + self.conn.write("dynamic_source.get_id") + return self.conn.read().splitlines()[0] + + def web_stream_start(self) -> None: + with self.conn: + self.conn.write("streams.scheduled_play_start") + self.conn.write("dynamic_source.output_start") + + def web_stream_start_buffer(self, schedule_id: int, uri: str) -> None: + with self.conn: + self.conn.write(f"dynamic_source.id {schedule_id}") + self.conn.write(f"http.restart {uri}") + + def web_stream_stop(self) -> None: + with self.conn: + self.conn.write("dynamic_source.output_stop") + + def web_stream_stop_buffer(self) -> None: + with self.conn: + self.conn.write("http.stop") + self.conn.write("dynamic_source.id -1") + + def source_disconnect(self, name: Literal["master_dj", "live_dj"]) -> None: + command_map = { + "master_dj": "master_harbor.stop", + "live_dj": "live_dj_harbor.stop", + } + with self.conn: + self.conn.write(command_map[name]) + + def source_switch_status( + self, + name: Literal["master_dj", "live_dj", "scheduled_play"], + streaming: bool, + ) -> None: + action = "start" if streaming else "stop" + with self.conn: + self.conn.write(f"streams.{name}_{action}") + + def settings_update( + self, + *, + station_name: Optional[str] = None, + message_format: Optional[int] = None, + input_fade_transition: Optional[float] = None, + ): + with self.conn: + if station_name is not None: + self.conn.write(f"vars.station_name {station_name}") + self.conn.read() + if message_format is not None: + self.conn.write(f"vars.stream_metadata_type {message_format}") + self.conn.read() + if input_fade_transition is not None: + self.conn.write(f"vars.default_dj_fade {input_fade_transition}") + self.conn.read() + + def restart(self): + logger.warning("restarting Liquidsoap") + + try: + output = check_output(("pidof", "libretime-liquidsoap")) + liq_pid = output.strip().decode("utf-8") + logger.debug(f"found liquidsoap pid {liq_pid}") + + run(("kill", "-9", liq_pid), check=True) + except CalledProcessError as exception: + raise LiquidsoapClientError("could not restart liquidsoap") from exception + + # Wait for the previous process to shutdown. + sleep(1) + + self.wait_for_version() diff --git a/playout/libretime_playout/liquidsoap/client/_connection.py b/playout/libretime_playout/liquidsoap/client/_connection.py new file mode 100644 index 000000000..6ed3cc51d --- /dev/null +++ b/playout/libretime_playout/liquidsoap/client/_connection.py @@ -0,0 +1,130 @@ +from pathlib import Path +from socket import AF_UNIX, SOCK_STREAM, create_connection, socket +from threading import Lock +from typing import Optional + +from loguru import logger + + +class InvalidConnection(Exception): + """ + Call was made with an invalid connection + """ + + +class LiquidsoapConnection: + _host: str + _port: int + _path: Optional[Path] = None + _timeout: int + + _lock: Lock + _sock: Optional[socket] = None + _eof = b"END" + + def __init__( + self, + host: str = "localhost", + port: int = 0, + path: Optional[Path] = None, + timeout: int = 5, + ): + """ + Create a connection to a Liquidsoap server. + + Args: + host: Host of the Liquidsoap server. Defaults to "localhost". + port: Port of the Liquidsoap server. Defaults to 0. + path: Unix socket path of the Liquidsoap server. If defined, use a unix + socket instead of the host and port address. Defaults to None. + timeout: Socket timeout. Defaults to 5. + """ + self._lock = Lock() + self._path = path + self._host = host + self._port = port + self._timeout = timeout + + def address(self): + return f"{self._host}:{self._port}" if self._path is None else self._path + + def __enter__(self): + try: + self.connect() + return self + except (ConnectionError, TimeoutError) as exception: + self._sock = None + self._lock.release() + raise exception + + def __exit__(self, exc_type, exc_value, _traceback): + self.close() + + def connect(self): + logger.trace("trying to acquire lock") + # pylint: disable=consider-using-with + self._lock.acquire() + logger.debug(f"connecting to {self.address()}") + + if self._path is not None: + self._sock = socket(AF_UNIX, SOCK_STREAM) + self._sock.settimeout(self._timeout) + self._sock.connect(str(self._path)) + else: + self._sock = create_connection( + address=(self._host, self._port), + timeout=self._timeout, + ) + + def close(self): + if self._sock is not None: + logger.debug(f"closing connection to {self.address()}") + + self.write("exit") + # Reading for clean exit + while self._sock.recv(1024): + continue + + sock = self._sock + self._sock = None + sock.close() + + self._lock.release() + + def write(self, *messages: str): + if self._sock is None: + raise InvalidConnection() + + for message in messages: + logger.debug(f"sending {message}") + buffer = message.encode(encoding="utf-8") + buffer += b"\n" + + self._sock.sendall(buffer) + + def read(self) -> str: + if self._sock is None: + raise InvalidConnection() + + chunks = [] + while True: + chunk = self._sock.recv(1024) + if not chunk: + break + + eof_index = chunk.find(self._eof) + if eof_index >= 0: + chunk = chunk[:eof_index] + chunks.append(chunk) + break + + chunks.append(chunk) + + buffer = b"".join(chunks) + buffer = buffer.replace(b"\r\n", b"\n") + buffer = buffer.rstrip(b"END") + buffer = buffer.strip(b"\n") + message = buffer.decode("utf-8") + + logger.debug(f"received {message}") + return message diff --git a/playout/tests/liquidsoap/client/client_test.py b/playout/tests/liquidsoap/client/client_test.py new file mode 100644 index 000000000..7ed6074a8 --- /dev/null +++ b/playout/tests/liquidsoap/client/client_test.py @@ -0,0 +1,9 @@ +from libretime_playout.liquidsoap.client import LiquidsoapClient + + +def test_liq_client(): + assert LiquidsoapClient( + host="localhost", + port=1234, + timeout=15, + ) diff --git a/playout/tests/liquidsoap/client/connection_test.py b/playout/tests/liquidsoap/client/connection_test.py new file mode 100644 index 000000000..94953541c --- /dev/null +++ b/playout/tests/liquidsoap/client/connection_test.py @@ -0,0 +1,167 @@ +from pathlib import Path +from random import randint +from subprocess import PIPE, STDOUT, Popen +from textwrap import dedent +from time import sleep + +import pytest +from libretime_shared.logging import TRACE, setup_logger +from loguru import logger + +from libretime_playout.liquidsoap.client import LiquidsoapConnection +from libretime_playout.liquidsoap.version import get_liquidsoap_version + +setup_logger(TRACE) + +LIQ_VERSION = get_liquidsoap_version() +LIQ_VERSION_STR = ".".join(map(str, LIQ_VERSION)) + +pytestmark = pytest.mark.skipif( + LIQ_VERSION >= (2, 0, 0), + reason="unsupported liquidsoap >= 2.0.0", +) + +LIQ_SCRIPT = """ +set("log.file", false) +{settings} + +var1 = interactive.string("var1", "default") + +output.dummy(blank(id="safe_blank")) +""" + +LIQ_TELNET_SETTINGS = """ +set("server.telnet", true) +set("server.telnet.port", {telnet_port}) +""" + +LIQ_SOCKET_SETTINGS = """ +set("server.socket", true) +set("server.socket.path", "{socket_path}") +""" + + +@pytest.fixture( + name="liq_conn", + scope="session", + params=["telnet", "socket"], +) +def liq_conn_fixture(request, tmp_path_factory): + tmp_path: Path = tmp_path_factory.mktemp(__name__) + + entrypoint = tmp_path / "main.liq" + + if request.param == "telnet": + telnet_port = randint(32768, 65535) + liq_settings = LIQ_TELNET_SETTINGS.format(telnet_port=telnet_port) + elif request.param == "socket": + socket_path = entrypoint.with_name("main.sock") + liq_settings = LIQ_SOCKET_SETTINGS.format(socket_path=socket_path) + + liq_script = LIQ_SCRIPT.format(settings=liq_settings.strip()) + logger.debug(liq_script) + entrypoint.write_text(liq_script) + + # The --verbose flag seem to hang when testing on bionic in CI + with Popen( + ("liquidsoap", "--debug", str(entrypoint)), + stdout=PIPE, + stderr=STDOUT, + universal_newlines=True, + ) as process: + if request.param == "telnet": + sleep(2) + elif request.param == "socket": + while process.poll() is None and not socket_path.is_socket(): + sleep(0.1) + + if process.poll() is not None: + pytest.fail(process.stdout.read()) + + if request.param == "telnet": + conn = LiquidsoapConnection(host="localhost", port=telnet_port) + elif request.param == "socket": + conn = LiquidsoapConnection(path=socket_path) + + with conn: + yield conn + process.terminate() + + +def test_liq_conn_version(liq_conn: LiquidsoapConnection): + liq_conn.write("version") + result = liq_conn.read() + assert result == f"Liquidsoap {LIQ_VERSION_STR}" + + +def test_liq_conn_allow_reopen(liq_conn: LiquidsoapConnection): + for _ in range(2): + liq_conn.close() + liq_conn.connect() + + liq_conn.write("version") + result = liq_conn.read() + assert result == f"Liquidsoap {LIQ_VERSION_STR}" + + +def test_liq_conn_vars(liq_conn: LiquidsoapConnection): + liq_conn.write("var.get var1") + result = liq_conn.read() + assert result == '"default"' + + liq_conn.write('var.set var1 = "changed"') + result = liq_conn.read() + assert result == 'Variable var1 set (was "default").' + + liq_conn.write("var.get var1") + result = liq_conn.read() + assert result == '"changed"' + + +def test_liq_conn_help(liq_conn: LiquidsoapConnection): + expected = dedent( + """ + Available commands: + | dummy.autostart + | dummy.metadata + | dummy.remaining + | dummy.skip + | dummy.start + | dummy.status + | dummy.stop + | exit + | help [] + | list + | quit + | request.alive + | request.all + | request.metadata + | request.on_air + | request.resolving + | request.trace + | uptime + | var.get + | var.list + | var.set = + | version + + Type "help " for more information. + """ + ).strip() + liq_conn.write("help") + result = liq_conn.read() + assert result == expected + + +def test_liq_conn_raises(): + liq_conn = LiquidsoapConnection(host="localhost", port=12345) + + with pytest.raises(OSError): + with liq_conn: + pass + + liq_conn = LiquidsoapConnection(path="/somewhere/invalid") + + with pytest.raises(OSError): + with liq_conn: + pass