132 lines
3.5 KiB
Python
132 lines
3.5 KiB
Python
import logging
|
|
from pathlib import Path
|
|
from socket import AF_UNIX, SOCK_STREAM, create_connection, socket
|
|
from threading import Lock
|
|
from typing import Optional
|
|
|
|
# Logger named after this module; used for the debug traces emitted below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class InvalidConnection(Exception):
    """
    Raised when an operation is attempted on a connection that is not usable,
    for example reading or writing while no socket is open.
    """
|
|
|
|
|
|
class LiquidsoapConnection:
    """
    Connection to a Liquidsoap server, over TCP or over a unix socket.

    Messages are sent as newline terminated lines, and a response is read up
    to the ``END`` marker. An internal lock is acquired by `connect` and
    released by `close`: the lock is held exactly while a socket is open, so a
    second `connect` blocks until the connection has been closed.

    The connection can be used as a context manager, which connects on entry
    and closes on exit.
    """

    _host: str
    _port: int
    _path: Optional[Path] = None
    _timeout: int

    _lock: Lock
    _sock: Optional[socket] = None
    # Marker terminating a response from the server.
    _eof = b"END"

    def __init__(
        self,
        host: str = "localhost",
        port: int = 0,
        path: Optional[Path] = None,
        timeout: int = 5,
    ):
        """
        Create a connection to a Liquidsoap server.

        Args:
            host: Host of the Liquidsoap server. Defaults to "localhost".
            port: Port of the Liquidsoap server. Defaults to 0.
            path: Unix socket path of the Liquidsoap server. If defined, use a unix
                socket instead of the host and port address. Defaults to None.
            timeout: Socket timeout. Defaults to 5.
        """
        self._lock = Lock()
        self._path = path
        self._host = host
        self._port = port
        self._timeout = timeout

    def address(self) -> str:
        """
        Return the address of the server, for logging purposes.

        Returns:
            The unix socket path if one is defined, "host:port" otherwise.
        """
        return f"{self._host}:{self._port}" if self._path is None else str(self._path)

    def __enter__(self):
        """
        Open the connection and return it.

        Raises:
            OSError: The connection could not be established. The lock has
                already been released by `connect` in that case.
        """
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, _traceback):
        """
        Close the connection when leaving the context.
        """
        self.close()

    def connect(self):
        """
        Acquire the connection lock and open the socket.

        Blocks until the lock is available. If opening the socket fails, the
        half-opened socket is closed and the lock is released before the
        error is propagated, so a failed attempt never blocks the next one.

        Raises:
            OSError: The socket could not be connected.
        """
        logger.debug("trying to acquire lock")
        # pylint: disable=consider-using-with
        self._lock.acquire()
        logger.debug("connecting to %s", self.address())

        sock: Optional[socket] = None
        try:
            if self._path is not None:
                sock = socket(AF_UNIX, SOCK_STREAM)
                sock.settimeout(self._timeout)
                sock.connect(str(self._path))
            else:
                sock = create_connection(
                    address=(self._host, self._port),
                    timeout=self._timeout,
                )
        except BaseException:
            # Undo everything done so far, the caller gets the original error.
            if sock is not None:
                sock.close()
            self._sock = None
            self._lock.release()
            raise

        # Only publish the socket once it is actually connected.
        self._sock = sock

    def close(self):
        """
        Ask the server to end the session, then close the socket and release
        the lock. Does nothing if the connection is not open.

        Raises:
            OSError: The exit handshake failed. The socket is closed and the
                lock is released nonetheless.
        """
        if self._sock is None:
            return

        logger.debug("closing connection to %s", self.address())

        try:
            self.write("exit")
            # Reading for clean exit
            while self._sock.recv(1024):
                continue
        finally:
            # Always tear down, even if the handshake failed (timeout, broken
            # pipe, ...), otherwise the lock would stay held forever.
            sock, self._sock = self._sock, None
            try:
                sock.close()
            finally:
                self._lock.release()

    def write(self, *messages: str):
        """
        Send each message to the server, as a newline terminated line.

        Args:
            messages: Messages to send, in order.

        Raises:
            InvalidConnection: The connection is not open.
        """
        if self._sock is None:
            raise InvalidConnection()

        for message in messages:
            logger.debug("sending %s", message)
            buffer = message.encode(encoding="utf-8")
            buffer += b"\n"

            self._sock.sendall(buffer)

    def read(self) -> str:
        """
        Read a response from the server.

        Data is received until the ``END`` marker is found or the server
        closes the connection. The marker and anything after it are dropped.

        NOTE(review): the marker is matched anywhere in the data, not only on
        a line of its own, so a payload containing "END" is cut short at that
        point. Confirm against the server protocol before tightening this.

        Returns:
            The decoded response, with "\\r\\n" converted to "\\n" and without
            leading or trailing newlines.

        Raises:
            InvalidConnection: The connection is not open.
        """
        if self._sock is None:
            raise InvalidConnection()

        marker = self._eof
        buffer = bytearray()
        while True:
            chunk = self._sock.recv(1024)
            if not chunk:
                break

            # Resume the search slightly before the new data, so a marker
            # split across two chunks is found without rescanning everything.
            search_from = max(0, len(buffer) - len(marker) + 1)
            buffer += chunk

            eof_index = buffer.find(marker, search_from)
            if eof_index >= 0:
                del buffer[eof_index:]
                break

        payload = bytes(buffer).replace(b"\r\n", b"\n")
        payload = payload.strip(b"\n")
        message = payload.decode("utf-8")

        logger.debug("received %s", message)
        return message
|