libretime/playout/libretime_playout/liquidsoap/client/_client.py

146 lines
4.7 KiB
Python
Raw Permalink Normal View History

2023-02-26 01:27:00 +01:00
import logging
from pathlib import Path
from time import sleep
from typing import Any, Literal, Optional, Tuple, Union
from ..models import MessageFormatKind
from ..utils import quote
from ..version import parse_liquidsoap_version
from ._connection import LiquidsoapConnection
2023-02-26 01:27:00 +01:00
# Module-level logger, named after this module through __name__, used by the
# client below to report failed variable updates and version probes.
logger = logging.getLogger(__name__)
class LiquidsoapClientError(Exception):
    """
    An error raised by the Liquidsoap client.

    Plain `Exception` subclass with no extra state; the message passed to the
    constructor is the only payload.
    """
class LiquidsoapClient:
    """
    A client to communicate with a running Liquidsoap server.

    Every public method sends its commands inside a `with self.conn:` block;
    the private helpers expect to be called from within such a block.
    """

    conn: LiquidsoapConnection

    def __init__(
        self,
        host: str = "localhost",
        port: int = 0,
        path: Optional[Path] = None,
        timeout: int = 15,
    ):
        """
        Build the underlying `LiquidsoapConnection`.

        All arguments are forwarded unchanged, as keyword arguments, to
        `LiquidsoapConnection`.
        """
        self.conn = LiquidsoapConnection(
            host=host,
            port=port,
            path=path,
            timeout=timeout,
        )

    def _quote(self, value: Any) -> str:
        """
        Quote a value by delegating to `quote(value, double=True)`.

        NOTE(review): presumably produces a double-quoted string literal for
        Liquidsoap; confirm against `..utils.quote`.
        """
        return quote(value, double=True)

    def _set_var(self, name: str, value: Any) -> None:
        """
        Send a `var.set` command and check the server's answer.

        The value is interpolated as-is, so string values must already be
        quoted by the caller (see `_quote`). If the answer does not contain
        the "Variable <name> set" acknowledgement, the answer is logged as an
        error; no exception is raised.

        Uses `self.conn` directly without entering it: the callers in this
        class invoke this helper inside their own `with self.conn:` block.
        """
        self.conn.write(f"var.set {name} = {value}")
        result = self.conn.read()
        if f"Variable {name} set" not in result:
            logger.error(result)

    def version(self) -> Tuple[int, int, int]:
        """
        Send the `version` command and return the parsed answer.

        The raw answer is handed to `parse_liquidsoap_version`.
        """
        with self.conn:
            self.conn.write("version")
            return parse_liquidsoap_version(self.conn.read())

    def wait_for_version(self, timeout: int = 30) -> Tuple[int, int, int]:
        """
        Retry `version()` until it succeeds or the attempts are exhausted.

        `timeout` is the number of attempts allowed: each attempt that fails
        with an `OSError` decrements it by one and is followed by a one second
        pause. Any other exception propagates immediately. A non-positive
        `timeout` makes no attempt at all.

        Raises:
            LiquidsoapClientError: when no attempt succeeded. The last
                `OSError`, if any, is attached as the exception cause.
        """
        last_error: Optional[OSError] = None

        while timeout > 0:
            try:
                version = self.version()
            except OSError as exception:
                logger.warning("could not get version: %s", exception)
                last_error = exception
                timeout -= 1
                # No further attempt follows the last failure, so pausing
                # there would only delay the error below.
                if timeout > 0:
                    sleep(1)
            else:
                logger.info("found version %s", version)
                return version

        # Chain the last connection error so the root cause stays visible in
        # the traceback.
        raise LiquidsoapClientError("could not get liquidsoap version") from last_error

    def queues_remove(self, *queues: int) -> None:
        """
        Send a `queues.s<id>_skip` command for every given queue id.

        The answers are not read.
        """
        with self.conn:
            for queue_id in queues:
                self.conn.write(f"queues.s{queue_id}_skip")

    def queue_push(self, queue_id: int, entry: str, show_name: str) -> None:
        """
        Push an entry on the queue `s<queue_id>` and update `show_name`.

        `entry` is sent verbatim after the `push` command, while `show_name`
        is quoted before being stored in the `show_name` variable.
        """
        with self.conn:
            self.conn.write(f"s{queue_id}.push {entry}")
            self.conn.read()  # Flush
            self._set_var("show_name", self._quote(show_name))

    def web_stream_get_id(self) -> str:
        """
        Send `web_stream.get_id` and return the first line of the answer.

        Indexing the first line raises `IndexError` if the answer contains no
        line at all.
        """
        with self.conn:
            self.conn.write("web_stream.get_id")
            return self.conn.read().splitlines()[0]

    def web_stream_start(self) -> None:
        """
        Send `sources.start_schedule` followed by `sources.start_web_stream`.
        """
        with self.conn:
            self.conn.write("sources.start_schedule")
            self.conn.write("sources.start_web_stream")

    def web_stream_start_buffer(self, schedule_id: int, uri: str) -> None:
        """
        Send `web_stream.set_id <schedule_id>` followed by `http.restart <uri>`.

        `uri` is sent verbatim.
        """
        with self.conn:
            self.conn.write(f"web_stream.set_id {schedule_id}")
            self.conn.write(f"http.restart {uri}")

    def web_stream_stop(self) -> None:
        """
        Send `sources.stop_web_stream`.
        """
        with self.conn:
            self.conn.write("sources.stop_web_stream")

    def web_stream_stop_buffer(self) -> None:
        """
        Send `http.stop` followed by `web_stream.set_id -1`.
        """
        with self.conn:
            self.conn.write("http.stop")
            self.conn.write("web_stream.set_id -1")

    def source_disconnect(self, name: Literal["master_dj", "live_dj"]) -> None:
        """
        Send the harbor stop command matching the given source name.

        Raises:
            KeyError: when `name` is not one of the supported source names.
        """
        command_map = {
            "master_dj": "master_harbor.stop",
            "live_dj": "live_dj_harbor.stop",
        }

        with self.conn:
            self.conn.write(command_map[name])

    def source_switch_status(
        self,
        name: Literal["master_dj", "live_dj", "scheduled_play"],
        streaming: bool,
    ) -> None:
        """
        Send `sources.start_<source>` or `sources.stop_<source>`.

        `streaming` selects `start` (true) or `stop` (false), and `name` is
        translated to the source identifier used in the command.

        Raises:
            KeyError: when `name` is not one of the supported source names.
        """
        name_map = {
            "master_dj": "input_main",
            "live_dj": "input_show",
            "scheduled_play": "schedule",
        }

        action = "start" if streaming else "stop"
        with self.conn:
            self.conn.write(f"sources.{action}_{name_map[name]}")

    def settings_update(
        self,
        *,
        station_name: Optional[str] = None,
        message_format: Optional[Union[MessageFormatKind, int]] = None,
        message_offline: Optional[str] = None,
        input_fade_transition: Optional[float] = None,
    ) -> None:
        """
        Update the provided settings, skipping every argument left to `None`.

        Each provided setting is stored with `var.set` in the variable of the
        same name. All values are quoted first, except `input_fade_transition`
        which is sent as-is. A `MessageFormatKind` is replaced by its `.value`
        before being quoted.
        """
        with self.conn:
            if station_name is not None:
                self._set_var("station_name", self._quote(station_name))

            if message_format is not None:
                if isinstance(message_format, MessageFormatKind):
                    message_format = message_format.value

                # Use an interactive.string until Liquidsoap have interactive.int
                # variables
                self._set_var("message_format", self._quote(message_format))

            if message_offline is not None:
                self._set_var("message_offline", self._quote(message_offline))

            if input_fade_transition is not None:
                self._set_var("input_fade_transition", input_fade_transition)