libretime/playout/libretime_playout/liquidsoap/client/_connection.py

132 lines
3.5 KiB
Python

import logging
from pathlib import Path
from socket import AF_UNIX, SOCK_STREAM, create_connection, socket
from threading import Lock
from typing import Optional
logger = logging.getLogger(__name__)
class InvalidConnection(Exception):
"""
Call was made with an invalid connection
"""
class LiquidsoapConnection:
_host: str
_port: int
_path: Optional[Path] = None
_timeout: int
_lock: Lock
_sock: Optional[socket] = None
_eof = b"END"
def __init__(
self,
host: str = "localhost",
port: int = 0,
path: Optional[Path] = None,
timeout: int = 5,
):
"""
Create a connection to a Liquidsoap server.
Args:
host: Host of the Liquidsoap server. Defaults to "localhost".
port: Port of the Liquidsoap server. Defaults to 0.
path: Unix socket path of the Liquidsoap server. If defined, use a unix
socket instead of the host and port address. Defaults to None.
timeout: Socket timeout. Defaults to 5.
"""
self._lock = Lock()
self._path = path
self._host = host
self._port = port
self._timeout = timeout
def address(self) -> str:
return f"{self._host}:{self._port}" if self._path is None else str(self._path)
def __enter__(self):
try:
self.connect()
return self
except OSError as exception:
self._sock = None
self._lock.release()
raise exception
def __exit__(self, exc_type, exc_value, _traceback):
self.close()
def connect(self):
logger.debug("trying to acquire lock")
# pylint: disable=consider-using-with
self._lock.acquire()
logger.debug("connecting to %s", self.address())
if self._path is not None:
self._sock = socket(AF_UNIX, SOCK_STREAM)
self._sock.settimeout(self._timeout)
self._sock.connect(str(self._path))
else:
self._sock = create_connection(
address=(self._host, self._port),
timeout=self._timeout,
)
def close(self):
if self._sock is not None:
logger.debug("closing connection to %s", self.address())
self.write("exit")
# Reading for clean exit
while self._sock.recv(1024):
continue
sock = self._sock
self._sock = None
sock.close()
self._lock.release()
def write(self, *messages: str):
if self._sock is None:
raise InvalidConnection()
for message in messages:
logger.debug("sending %s", message)
buffer = message.encode(encoding="utf-8")
buffer += b"\n"
self._sock.sendall(buffer)
def read(self) -> str:
if self._sock is None:
raise InvalidConnection()
chunks = []
while True:
chunk = self._sock.recv(1024)
if not chunk:
break
eof_index = chunk.find(self._eof)
if eof_index >= 0:
chunk = chunk[:eof_index]
chunks.append(chunk)
break
chunks.append(chunk)
buffer = b"".join(chunks)
buffer = buffer.replace(b"\r\n", b"\n")
buffer = buffer.rstrip(b"END")
buffer = buffer.strip(b"\n")
message = buffer.decode("utf-8")
logger.debug("received %s", message)
return message