libretime/playout/libretime_playout/liquidsoap/client/_client.py

146 lines
4.7 KiB
Python

import logging
from pathlib import Path
from time import sleep
from typing import Any, Literal, Optional, Tuple, Union
from ..models import MessageFormatKind
from ..utils import quote
from ..version import parse_liquidsoap_version
from ._connection import LiquidsoapConnection
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class LiquidsoapClientError(Exception):
    """
    Error raised when the Liquidsoap client cannot complete a request.
    """
class LiquidsoapClient:
    """
    A client to communicate with a running Liquidsoap server.

    Commands are sent as plain text through `self.conn`. Every public command
    method enters `self.conn` as a context manager around its writes.
    """

    conn: LiquidsoapConnection

    def __init__(
        self,
        host: str = "localhost",
        port: int = 0,
        path: Optional[Path] = None,
        timeout: int = 15,
    ):
        """
        Create the client and its underlying `LiquidsoapConnection`.

        All arguments are forwarded unchanged to `LiquidsoapConnection`; see
        that class for their exact meaning.
        """
        self.conn = LiquidsoapConnection(
            host=host,
            port=port,
            path=path,
            timeout=timeout,
        )

    def _quote(self, value: Any) -> str:
        """
        Return `value` quoted by the `quote` helper with `double=True`.
        """
        return quote(value, double=True)

    def _set_var(self, name: str, value: Any) -> None:
        """
        Send a `var.set <name> = <value>` command and check the reply.

        The value is interpolated as is, so string values must already be
        quoted by the caller (see `_quote`). When the reply does not contain
        `Variable <name> set`, the reply is logged as an error; no exception
        is raised.

        This method does not enter `self.conn` itself: callers in this class
        invoke it from inside a `with self.conn:` block.
        """
        self.conn.write(f"var.set {name} = {value}")
        result = self.conn.read()
        if f"Variable {name} set" not in result:
            logger.error(result)

    def version(self) -> Tuple[int, int, int]:
        """
        Send the `version` command and return the parsed reply.
        """
        with self.conn:
            self.conn.write("version")
            return parse_liquidsoap_version(self.conn.read())

    def wait_for_version(self, timeout: int = 30) -> Tuple[int, int, int]:
        """
        Retry `version` until it succeeds and return its result.

        One attempt is made per unit of `timeout`, with a one second pause
        between attempts. Only `OSError` is considered retryable; any other
        exception propagates immediately.

        Raises `LiquidsoapClientError` when every attempt failed (or when
        `timeout` is not positive). The last `OSError`, if any, is attached as
        the cause of the raised error.
        """
        remaining = timeout
        last_error: Optional[OSError] = None

        while remaining > 0:
            try:
                version = self.version()
            except OSError as exception:
                logger.warning("could not get version: %s", exception)
                last_error = exception
            else:
                logger.info("found version %s", version)
                return version

            remaining -= 1
            # No point in waiting once there is no attempt left.
            if remaining > 0:
                sleep(1)

        raise LiquidsoapClientError(
            "could not get liquidsoap version"
        ) from last_error

    def queues_remove(self, *queues: int) -> None:
        """
        Send a `queues.s<queue_id>_skip` command for each given queue id.

        The replies are not read.
        """
        with self.conn:
            for queue_id in queues:
                self.conn.write(f"queues.s{queue_id}_skip")

    def queue_push(self, queue_id: int, entry: str, show_name: str) -> None:
        """
        Send `s<queue_id>.push <entry>`, then set the `show_name` variable.

        `entry` is sent as is; `show_name` is quoted before being sent.
        """
        with self.conn:
            self.conn.write(f"s{queue_id}.push {entry}")
            self.conn.read()  # Flush
            self._set_var("show_name", self._quote(show_name))

    def web_stream_get_id(self) -> str:
        """
        Send `web_stream.get_id` and return the first line of the reply.

        Raises `LiquidsoapClientError` when the reply contains no line.
        """
        with self.conn:
            self.conn.write("web_stream.get_id")
            lines = self.conn.read().splitlines()
            if not lines:
                # Fail with an explicit client error rather than an IndexError.
                raise LiquidsoapClientError(
                    "empty response for web_stream.get_id"
                )
            return lines[0]

    def web_stream_start(self) -> None:
        """
        Send `sources.start_schedule` followed by `sources.start_web_stream`.

        The replies are not read.
        """
        with self.conn:
            self.conn.write("sources.start_schedule")
            self.conn.write("sources.start_web_stream")

    def web_stream_start_buffer(self, schedule_id: int, uri: str) -> None:
        """
        Send `web_stream.set_id <schedule_id>` followed by
        `http.restart <uri>`.

        The replies are not read.
        """
        with self.conn:
            self.conn.write(f"web_stream.set_id {schedule_id}")
            self.conn.write(f"http.restart {uri}")

    def web_stream_stop(self) -> None:
        """
        Send `sources.stop_web_stream`.

        The reply is not read.
        """
        with self.conn:
            self.conn.write("sources.stop_web_stream")

    def web_stream_stop_buffer(self) -> None:
        """
        Send `http.stop` followed by `web_stream.set_id -1`.

        The replies are not read.
        """
        with self.conn:
            self.conn.write("http.stop")
            self.conn.write("web_stream.set_id -1")

    def source_disconnect(self, name: Literal["master_dj", "live_dj"]) -> None:
        """
        Send the harbor stop command matching the source `name`.

        Raises `KeyError` when `name` is not one of the supported sources.
        """
        command_map = {
            "master_dj": "master_harbor.stop",
            "live_dj": "live_dj_harbor.stop",
        }
        # Resolve the command first, so an unknown name fails before the
        # connection context is entered.
        command = command_map[name]
        with self.conn:
            self.conn.write(command)

    def source_switch_status(
        self,
        name: Literal["master_dj", "live_dj", "scheduled_play"],
        streaming: bool,
    ) -> None:
        """
        Send `sources.start_<source>` when `streaming` is true, otherwise
        `sources.stop_<source>`, where `<source>` is the command suffix mapped
        from `name`.

        Raises `KeyError` when `name` is not one of the supported sources.
        """
        name_map = {
            "master_dj": "input_main",
            "live_dj": "input_show",
            "scheduled_play": "schedule",
        }
        # Resolve the command first, so an unknown name fails before the
        # connection context is entered.
        source = name_map[name]
        action = "start" if streaming else "stop"
        with self.conn:
            self.conn.write(f"sources.{action}_{source}")

    def settings_update(
        self,
        *,
        station_name: Optional[str] = None,
        message_format: Optional[Union[MessageFormatKind, int]] = None,
        message_offline: Optional[str] = None,
        input_fade_transition: Optional[float] = None,
    ) -> None:
        """
        Set the variables for every setting that is not `None`.

        `station_name`, `message_format` and `message_offline` are quoted
        before being sent; `input_fade_transition` is sent unquoted. A
        `MessageFormatKind` given as `message_format` is replaced by its
        `.value` first.
        """
        with self.conn:
            if station_name is not None:
                self._set_var("station_name", self._quote(station_name))

            if message_format is not None:
                if isinstance(message_format, MessageFormatKind):
                    message_format = message_format.value
                # Use an interactive.string until Liquidsoap have
                # interactive.int variables
                self._set_var("message_format", self._quote(message_format))

            if message_offline is not None:
                self._set_var("message_offline", self._quote(message_offline))

            if input_fade_transition is not None:
                self._set_var("input_fade_transition", input_fade_transition)