feat(playout): integrate new liquisoap client

This commit is contained in:
jo 2022-08-16 13:34:02 +02:00 committed by Kyle Robbertze
parent 624a60c4af
commit da6458caea
5 changed files with 97 additions and 387 deletions

View File

@ -4,13 +4,11 @@ Python part of radio playout (pypo)
import signal
import sys
import telnetlib
import time
from datetime import datetime
from pathlib import Path
from queue import Queue
from threading import Lock
from typing import Optional, Tuple
from typing import Optional
import click
from libretime_api_client.v1 import ApiClient as LegacyClient
@ -22,14 +20,14 @@ from loguru import logger
from .config import CACHE_DIR, RECORD_DIR, Config
from .history.stats import StatsCollectorThread
from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION, parse_liquidsoap_version
from .liquidsoap.client import LiquidsoapClient
from .liquidsoap.version import LIQUIDSOAP_MIN_VERSION
from .message_handler import PypoMessageHandler
from .player.fetch import PypoFetch
from .player.file import PypoFile
from .player.liquidsoap import PypoLiquidsoap
from .player.push import PypoPush
from .recorder import Recorder
from .timeout import ls_timeout
class Global:
@ -45,56 +43,6 @@ def keyboardInterruptHandler(signum, frame):
sys.exit(0)
@ls_timeout
def liquidsoap_get_info(telnet_lock, host, port) -> Optional[Tuple[int, int, int]]:
logger.debug("Checking to see if Liquidsoap is running")
try:
telnet_lock.acquire()
tn = telnetlib.Telnet(host, port)
msg = "version\n"
tn.write(msg.encode("utf-8"))
tn.write(b"exit\n")
response = tn.read_all().decode("utf-8")
except Exception as e:
logger.error(e)
return None
finally:
telnet_lock.release()
return parse_liquidsoap_version(response)
def liquidsoap_startup_test(telnet_lock, liquidsoap_host, liquidsoap_port):
liquidsoap_version = liquidsoap_get_info(
telnet_lock,
liquidsoap_host,
liquidsoap_port,
)
while not liquidsoap_version:
logger.warning("Liquidsoap doesn't appear to be running! Trying again later...")
time.sleep(1)
liquidsoap_version = liquidsoap_get_info(
telnet_lock,
liquidsoap_host,
liquidsoap_port,
)
while not LIQUIDSOAP_MIN_VERSION <= liquidsoap_version:
logger.warning(
f"Found invalid Liquidsoap version! "
f"Liquidsoap<={LIQUIDSOAP_MIN_VERSION} is required!"
)
time.sleep(5)
liquidsoap_version = liquidsoap_get_info(
telnet_lock,
liquidsoap_host,
liquidsoap_port,
)
logger.info(f"Liquidsoap version {liquidsoap_version}")
@click.command(context_settings={"auto_envvar_prefix": DEFAULT_ENV_PREFIX})
@cli_logging_options()
@cli_config_options()
@ -144,18 +92,21 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
logger.exception(exception)
time.sleep(10)
telnet_lock = Lock()
liq_client = LiquidsoapClient(
host=config.playout.liquidsoap_host,
port=config.playout.liquidsoap_port,
)
liquidsoap_host = config.playout.liquidsoap_host
liquidsoap_port = config.playout.liquidsoap_port
liquidsoap_startup_test(telnet_lock, liquidsoap_host, liquidsoap_port)
logger.debug("Checking if Liquidsoap is running")
liq_version = liq_client.wait_for_version()
if not LIQUIDSOAP_MIN_VERSION <= liq_version:
raise Exception(f"Invalid liquidsoap version {liq_version}")
pypoFetch_q = Queue()
recorder_q = Queue()
pypoPush_q = Queue()
pypo_liquidsoap = PypoLiquidsoap(telnet_lock, liquidsoap_host, liquidsoap_port)
pypo_liquidsoap = PypoLiquidsoap(liq_client)
# This queue is shared between pypo-fetch and pypo-file, where pypo-file
# is the consumer. Pypo-fetch will send every schedule it gets to pypo-file
@ -176,7 +127,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
pypoFetch_q,
pypoPush_q,
media_q,
telnet_lock,
liq_client,
pypo_liquidsoap,
config,
api_client,
@ -185,7 +136,7 @@ def cli(log_level: str, log_filepath: Optional[Path], config_filepath: Optional[
pf.daemon = True
pf.start()
pp = PypoPush(pypoPush_q, telnet_lock, pypo_liquidsoap, config)
pp = PypoPush(pypoPush_q, pypo_liquidsoap, config)
pp.daemon = True
pp.start()

View File

@ -3,9 +3,7 @@ import json
import mimetypes
import os
import signal
import subprocess
import sys
import telnetlib
import time
from datetime import datetime
from queue import Empty
@ -17,7 +15,9 @@ from libretime_api_client.v2 import ApiClient
from loguru import logger
from ..config import CACHE_DIR, POLL_INTERVAL, Config
from ..liquidsoap.client import LiquidsoapClient
from ..timeout import ls_timeout
from .liquidsoap import PypoLiquidsoap
from .schedule import get_schedule
@ -37,8 +37,8 @@ class PypoFetch(Thread):
pypoFetch_q,
pypoPush_q,
media_q,
telnet_lock,
pypo_liquidsoap,
liq_client: LiquidsoapClient,
pypo_liquidsoap: PypoLiquidsoap,
config: Config,
api_client: ApiClient,
legacy_client: LegacyClient,
@ -57,8 +57,7 @@ class PypoFetch(Thread):
self.config = config
self.listener_timeout = POLL_INTERVAL
self.telnet_lock = telnet_lock
self.liq_client = liq_client
self.pypo_liquidsoap = pypo_liquidsoap
self.cache_dir = CACHE_DIR
@ -101,12 +100,12 @@ class PypoFetch(Thread):
self.update_liquidsoap_transition_fade(m["transition_fade"])
elif command == "switch_source":
logger.info("switch_on_source show command received...")
self.pypo_liquidsoap.get_telnet_dispatcher().switch_source(
self.pypo_liquidsoap.telnet_liquidsoap.switch_source(
m["sourcename"], m["status"]
)
elif command == "disconnect_source":
logger.info("disconnect_on_source show command received...")
self.pypo_liquidsoap.get_telnet_dispatcher().disconnect_source(
self.pypo_liquidsoap.telnet_liquidsoap.disconnect_source(
m["sourcename"]
)
else:
@ -125,25 +124,7 @@ class PypoFetch(Thread):
except Exception as exception:
logger.exception(exception)
def switch_source_temp(self, sourcename, status):
logger.debug('Switching source: %s to "%s" status', sourcename, status)
command = "streams."
if sourcename == "master_dj":
command += "master_dj_"
elif sourcename == "live_dj":
command += "live_dj_"
elif sourcename == "scheduled_play":
command += "scheduled_play_"
if status == "on":
command += "start\n"
else:
command += "stop\n"
return command
# Initialize Liquidsoap environment
def set_bootstrap_variables(self):
logger.debug("Getting information needed on bootstrap from Airtime")
try:
@ -152,59 +133,33 @@ class PypoFetch(Thread):
logger.exception(f"Unable to get bootstrap info: {exception}")
logger.debug("info:%s", info)
commands = []
for k, v in info["switch_status"].items():
commands.append(self.switch_source_temp(k, v))
stream_format = info["stream_label"]
station_name = info["station_name"]
fade = info["transition_fade"]
try:
for source_name, source_status in info["switch_status"].items():
self.pypo_liquidsoap.liq_client.source_switch_status(
name=source_name,
streaming=source_status == "on",
)
commands.append(
("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8")
)
commands.append(("vars.station_name %s\n" % station_name).encode("utf-8"))
commands.append(("vars.default_dj_fade %s\n" % fade).encode("utf-8"))
self.pypo_liquidsoap.get_telnet_dispatcher().telnet_send(commands)
self.pypo_liquidsoap.liq_client.settings_update(
station_name=info["station_name"],
message_format=info["stream_label"],
input_fade_transition=info["transition_fade"],
)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
self.pypo_liquidsoap.clear_all_queues()
self.pypo_liquidsoap.clear_queue_tracker()
def restart_liquidsoap(self):
try:
# do not block - if we receive the lock then good - no other thread
# will try communicating with Liquidsoap. If we don't receive, it may
# mean some thread blocked and is still holding the lock. Restarting
# Liquidsoap will cause that thread to release the lock as an Exception
# will be thrown.
self.telnet_lock.acquire(False)
logger.info("Restarting Liquidsoap")
subprocess.call(
"kill -9 `pidof libretime-liquidsoap`", shell=True, close_fds=True
)
# Wait here and poll Liquidsoap until it has started up
logger.info("Waiting for Liquidsoap to start")
while True:
try:
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
tn.write(b"exit\n")
tn.read_all()
logger.info("Liquidsoap is up and running")
break
except Exception:
# sleep 0.5 seconds and try again
time.sleep(0.5)
self.liq_client.restart()
logger.info("Liquidsoap is up and running")
except Exception as exception:
logger.exception(exception)
finally:
if self.telnet_lock.locked():
self.telnet_lock.release()
# NOTE: This function is quite short after it was refactored.
@ -220,35 +175,18 @@ class PypoFetch(Thread):
"""
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
# update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
current_time = time.time()
boot_up_time_command = (
"vars.bootup_time " + str(current_time) + "\n"
).encode("utf-8")
logger.info(boot_up_time_command)
tn.write(boot_up_time_command)
with self.liq_client.conn:
# update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
current_time = time.time()
self.liq_client.conn.write(f"vars.bootup_time {str(current_time)}")
self.liq_client.conn.read()
connection_status = b"streams.connection_status\n"
logger.info(connection_status)
tn.write(connection_status)
tn.write(b"exit\n")
output = tn.read_all()
except Exception as exception:
self.liq_client.conn.write("streams.connection_status")
stream_info = self.liq_client.conn.read().splitlines()[0]
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
output_list = output.split("\r\n")
stream_info = output_list[2]
# streamin info is in the form of:
# eg. s1:true,2:true,3:false
@ -267,65 +205,23 @@ class PypoFetch(Thread):
@ls_timeout
def update_liquidsoap_stream_format(self, stream_format):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
command = ("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8")
logger.info(command)
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as exception:
self.liq_client.settings_update(message_format=stream_format)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ls_timeout
def update_liquidsoap_transition_fade(self, fade):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
command = ("vars.default_dj_fade %s\n" % fade).encode("utf-8")
logger.info(command)
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as exception:
self.liq_client.settings_update(input_fade_transition=fade)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ls_timeout
def update_liquidsoap_station_name(self, station_name):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
command = ("vars.station_name %s\n" % station_name).encode("utf-8")
logger.info(command)
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
except Exception as exception:
self.liq_client.settings_update(station_name=station_name)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
# Process the schedule

View File

@ -3,13 +3,14 @@ from datetime import datetime, timedelta
from loguru import logger
from ..liquidsoap.client import LiquidsoapClient
from ..utils import seconds_between
from .events import EventKind
from .liquidsoap_gateway import TelnetLiquidsoap
class PypoLiquidsoap:
def __init__(self, telnet_lock, host, port):
def __init__(self, liq_client: LiquidsoapClient):
self.liq_queue_tracker = {
"s0": None,
"s1": None,
@ -18,13 +19,12 @@ class PypoLiquidsoap:
"s4": None,
}
self.liq_client = liq_client
self.telnet_liquidsoap = TelnetLiquidsoap(
telnet_lock, host, port, list(self.liq_queue_tracker.keys())
liq_client,
list(self.liq_queue_tracker.keys()),
)
def get_telnet_dispatcher(self):
return self.telnet_liquidsoap
def play(self, media_item):
if media_item["type"] == EventKind.FILE:
self.handle_file_type(media_item)

View File

@ -1,7 +1,8 @@
import telnetlib
from typing import List
from loguru import logger
from ..liquidsoap.client import LiquidsoapClient
from ..timeout import ls_timeout
@ -43,229 +44,91 @@ def create_liquidsoap_annotation(media):
class TelnetLiquidsoap:
def __init__(self, telnet_lock, ls_host, ls_port, queues):
self.telnet_lock = telnet_lock
self.ls_host = ls_host
self.ls_port = ls_port
def __init__(self, liq_client: LiquidsoapClient, queues: List[str]):
self.liq_client = liq_client
self.queues = queues
self.current_prebuffering_stream_id = None
def __connect(self):
return telnetlib.Telnet(self.ls_host, self.ls_port)
@ls_timeout
def queue_clear_all(self):
try:
self.telnet_lock.acquire()
connection = self.__connect()
for i in self.queues:
msg = "queues.%s_skip\n" % i
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
finally:
self.telnet_lock.release()
self.liq_client.queues_remove(*self.queues)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
@ls_timeout
def queue_remove(self, queue_id):
try:
self.telnet_lock.acquire()
connection = self.__connect()
msg = "queues.%s_skip\n" % queue_id
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
finally:
self.telnet_lock.release()
self.liq_client.queues_remove(queue_id)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
@ls_timeout
def queue_push(self, queue_id, media_item):
try:
self.telnet_lock.acquire()
connection = self.__connect()
annotation = create_liquidsoap_annotation(media_item)
msg = f"{queue_id}.push {annotation}\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
show_name = media_item["show_name"]
msg = "vars.show_name %s\n" % show_name
connection.write(msg.encode("utf-8"))
logger.debug(msg)
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
finally:
self.telnet_lock.release()
self.liq_client.queue_push(queue_id, annotation, media_item["show_name"])
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
@ls_timeout
def stop_web_stream_buffer(self):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
# dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = "http.stop\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
msg = "dynamic_source.id -1\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception as exception:
self.liq_client.web_stream_stop_buffer()
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ls_timeout
def stop_web_stream_output(self):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
# dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = "dynamic_source.output_stop\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception as exception:
self.liq_client.web_stream_stop()
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ls_timeout
def start_web_stream(self, media_item):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
# TODO: DO we need this?
msg = "streams.scheduled_play_start\n"
connection.write(msg.encode("utf-8"))
msg = "dynamic_source.output_start\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
self.liq_client.web_stream_start()
self.current_prebuffering_stream_id = None
except Exception as exception:
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ls_timeout
def start_web_stream_buffer(self, media_item):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = "dynamic_source.id %s\n" % media_item["row_id"]
logger.debug(msg)
connection.write(msg.encode("utf-8"))
msg = "http.restart %s\n" % media_item["uri"]
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
self.liq_client.web_stream_start_buffer(
media_item["row_id"],
media_item["uri"],
)
self.current_prebuffering_stream_id = media_item["row_id"]
except Exception as exception:
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ls_timeout
def get_current_stream_id(self):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = "dynamic_source.get_id\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
stream_id = connection.read_all().decode("utf-8").splitlines()[0]
logger.debug("stream_id: %s" % stream_id)
return stream_id
except Exception as exception:
return self.liq_client.web_stream_get_id()
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ls_timeout
def disconnect_source(self, sourcename):
logger.debug("Disconnecting source: %s", sourcename)
command = ""
if sourcename == "master_dj":
command += "master_harbor.stop\n"
elif sourcename == "live_dj":
command += "live_dj_harbor.stop\n"
if sourcename not in ("master_dj", "live_dj"):
raise ValueError(f"invalid source name: {sourcename}")
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
logger.info(command)
connection.write(command.encode("utf-8"))
connection.write(b"exit\n")
connection.read_all().decode("utf-8")
except Exception as exception:
logger.debug("Disconnecting source: %s", sourcename)
self.liq_client.source_disconnect(sourcename)
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
@ls_timeout
def telnet_send(self, commands):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
for line in commands:
logger.info(line)
if type(line) is str:
line = line.encode("utf-8")
connection.write(line)
connection.write(b"exit\n")
connection.read_all().decode("utf-8")
except Exception as exception:
logger.exception(exception)
finally:
self.telnet_lock.release()
def switch_source(self, sourcename, status):
logger.debug('Switching source: %s to "%s" status', sourcename, status)
command = "streams."
if sourcename == "master_dj":
command += "master_dj_"
elif sourcename == "live_dj":
command += "live_dj_"
elif sourcename == "scheduled_play":
command += "scheduled_play_"
if sourcename not in ("master_dj", "live_dj", "scheduled_play"):
raise ValueError(f"invalid source name: {sourcename}")
if status == "on":
command += "start\n"
else:
command += "stop\n"
self.telnet_send([command])
try:
logger.debug('Switching source: %s to "%s" status', sourcename, status)
self.liq_client.source_switch_status(sourcename, status == "on")
except (ConnectionError, TimeoutError) as exception:
logger.exception(exception)

View File

@ -7,6 +7,7 @@ from threading import Thread
from loguru import logger
from ..config import PUSH_INTERVAL, Config
from .liquidsoap import PypoLiquidsoap
from .queue import PypoLiqQueue
@ -21,11 +22,10 @@ def is_file(media_item):
class PypoPush(Thread):
name = "push"
def __init__(self, q, telnet_lock, pypo_liquidsoap, config: Config):
def __init__(self, q, pypo_liquidsoap: PypoLiquidsoap, config: Config):
Thread.__init__(self)
self.queue = q
self.telnet_lock = telnet_lock
self.config = config
self.pushed_objects = {}