chore(playout): restructure modules (#1971)

This commit is contained in:
Jonas L 2022-07-18 15:11:47 +02:00 committed by GitHub
parent 57046e2a9d
commit 7e2f2d60f0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 20 additions and 20 deletions

View file

@ -0,0 +1,10 @@
from enum import Enum
class EventKind(str, Enum):
FILE = "file"
EVENT = "event"
STREAM_BUFFER_START = "stream_buffer_start"
STREAM_OUTPUT_START = "stream_output_start"
STREAM_BUFFER_END = "stream_buffer_end"
STREAM_OUTPUT_END = "stream_output_end"

View file

@ -0,0 +1,536 @@
import copy
import json
import mimetypes
import os
import signal
import subprocess
import sys
import telnetlib
import time
from datetime import datetime
from queue import Empty
from subprocess import PIPE, Popen
from threading import Thread, Timer
from libretime_api_client import version1 as v1_api_client
from libretime_api_client import version2 as api_client
from loguru import logger
from ..config import CACHE_DIR, POLL_INTERVAL, Config
from ..timeout import ls_timeout
from .schedule import get_schedule
def keyboardInterruptHandler(signum, frame):
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)
class PypoFetch(Thread):
def __init__(
self,
pypoFetch_q,
pypoPush_q,
media_q,
telnet_lock,
pypo_liquidsoap,
config: Config,
):
Thread.__init__(self)
# Hacky...
PypoFetch.ref = self
self.v1_api_client = v1_api_client.AirtimeApiClient()
self.api_client = api_client.AirtimeApiClient()
self.fetch_queue = pypoFetch_q
self.push_queue = pypoPush_q
self.media_prepare_queue = media_q
self.last_update_schedule_timestamp = time.time()
self.config = config
self.listener_timeout = POLL_INTERVAL
self.telnet_lock = telnet_lock
self.pypo_liquidsoap = pypo_liquidsoap
self.cache_dir = CACHE_DIR
logger.debug("Cache dir %s", self.cache_dir)
self.schedule_data = []
logger.info("PypoFetch: init complete")
# Handle a message from RabbitMQ, put it into our yucky global var.
# Hopefully there is a better way to do this.
def handle_message(self, message):
try:
logger.info("Received event from Pypo Message Handler: %s" % message)
try:
message = message.decode()
except (UnicodeDecodeError, AttributeError):
pass
m = json.loads(message)
command = m["event_type"]
logger.info("Handling command: " + command)
if command == "update_schedule":
self.schedule_data = m["schedule"]
self.process_schedule(self.schedule_data)
elif command == "reset_liquidsoap_bootstrap":
self.set_bootstrap_variables()
elif command == "update_stream_setting":
logger.info("Updating stream setting...")
self.regenerate_liquidsoap_conf(m["setting"])
elif command == "update_stream_format":
logger.info("Updating stream format...")
self.update_liquidsoap_stream_format(m["stream_format"])
elif command == "update_station_name":
logger.info("Updating station name...")
self.update_liquidsoap_station_name(m["station_name"])
elif command == "update_transition_fade":
logger.info("Updating transition_fade...")
self.update_liquidsoap_transition_fade(m["transition_fade"])
elif command == "switch_source":
logger.info("switch_on_source show command received...")
self.pypo_liquidsoap.get_telnet_dispatcher().switch_source(
m["sourcename"], m["status"]
)
elif command == "disconnect_source":
logger.info("disconnect_on_source show command received...")
self.pypo_liquidsoap.get_telnet_dispatcher().disconnect_source(
m["sourcename"]
)
else:
logger.info("Unknown command: %s" % command)
# update timeout value
if command == "update_schedule":
self.listener_timeout = POLL_INTERVAL
else:
self.listener_timeout = (
self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL
)
if self.listener_timeout < 0:
self.listener_timeout = 0
logger.info("New timeout: %s" % self.listener_timeout)
except Exception as e:
logger.exception("Exception in handling Message Handler message")
def switch_source_temp(self, sourcename, status):
logger.debug('Switching source: %s to "%s" status', sourcename, status)
command = "streams."
if sourcename == "master_dj":
command += "master_dj_"
elif sourcename == "live_dj":
command += "live_dj_"
elif sourcename == "scheduled_play":
command += "scheduled_play_"
if status == "on":
command += "start\n"
else:
command += "stop\n"
return command
# Initialize Liquidsoap environment
def set_bootstrap_variables(self):
logger.debug("Getting information needed on bootstrap from Airtime")
try:
info = self.v1_api_client.get_bootstrap_info()
except Exception as e:
logger.exception("Unable to get bootstrap info.. Exiting pypo...")
logger.debug("info:%s", info)
commands = []
for k, v in info["switch_status"].items():
commands.append(self.switch_source_temp(k, v))
stream_format = info["stream_label"]
station_name = info["station_name"]
fade = info["transition_fade"]
commands.append(
("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8")
)
commands.append(("vars.station_name %s\n" % station_name).encode("utf-8"))
commands.append(("vars.default_dj_fade %s\n" % fade).encode("utf-8"))
self.pypo_liquidsoap.get_telnet_dispatcher().telnet_send(commands)
self.pypo_liquidsoap.clear_all_queues()
self.pypo_liquidsoap.clear_queue_tracker()
def restart_liquidsoap(self):
try:
"""do not block - if we receive the lock then good - no other thread
will try communicating with Liquidsoap. If we don't receive, it may
mean some thread blocked and is still holding the lock. Restarting
Liquidsoap will cause that thread to release the lock as an Exception
will be thrown."""
self.telnet_lock.acquire(False)
logger.info("Restarting Liquidsoap")
subprocess.call(
"kill -9 `pidof libretime-liquidsoap`", shell=True, close_fds=True
)
# Wait here and poll Liquidsoap until it has started up
logger.info("Waiting for Liquidsoap to start")
while True:
try:
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
tn.write(b"exit\n")
tn.read_all()
logger.info("Liquidsoap is up and running")
break
except Exception as e:
# sleep 0.5 seconds and try again
time.sleep(0.5)
except Exception as e:
logger.exception(e)
finally:
if self.telnet_lock.locked():
self.telnet_lock.release()
# NOTE: This function is quite short after it was refactored.
def regenerate_liquidsoap_conf(self, setting):
self.restart_liquidsoap()
self.update_liquidsoap_connection_status()
@ls_timeout
def update_liquidsoap_connection_status(self):
"""
updates the status of Liquidsoap connection to the streaming server
This function updates the bootup time variable in Liquidsoap script
"""
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
# update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
current_time = time.time()
boot_up_time_command = (
"vars.bootup_time " + str(current_time) + "\n"
).encode("utf-8")
logger.info(boot_up_time_command)
tn.write(boot_up_time_command)
connection_status = b"streams.connection_status\n"
logger.info(connection_status)
tn.write(connection_status)
tn.write(b"exit\n")
output = tn.read_all()
except Exception as e:
logger.exception(e)
finally:
self.telnet_lock.release()
output_list = output.split("\r\n")
stream_info = output_list[2]
# streamin info is in the form of:
# eg. s1:true,2:true,3:false
streams = stream_info.split(",")
logger.info(streams)
fake_time = current_time + 1
for s in streams:
info = s.split(":")
stream_id = info[0]
status = info[1]
if status == "true":
self.v1_api_client.notify_liquidsoap_status(
"OK", stream_id, str(fake_time)
)
@ls_timeout
def update_liquidsoap_stream_format(self, stream_format):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
command = ("vars.stream_metadata_type %s\n" % stream_format).encode("utf-8")
logger.info(command)
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as e:
logger.exception(e)
finally:
self.telnet_lock.release()
@ls_timeout
def update_liquidsoap_transition_fade(self, fade):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
command = ("vars.default_dj_fade %s\n" % fade).encode("utf-8")
logger.info(command)
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as e:
logger.exception(e)
finally:
self.telnet_lock.release()
@ls_timeout
def update_liquidsoap_station_name(self, station_name):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
command = ("vars.station_name %s\n" % station_name).encode("utf-8")
logger.info(command)
tn.write(command)
tn.write(b"exit\n")
tn.read_all()
except Exception as e:
logger.exception(e)
finally:
self.telnet_lock.release()
except Exception as e:
logger.exception(e)
# Process the schedule
# - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
# - Saves a serialized file of the schedule
# - playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied
# to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
# - runs the cleanup routine, to get rid of unused cached files
def process_schedule(self, schedule_data):
self.last_update_schedule_timestamp = time.time()
logger.debug(schedule_data)
media = schedule_data["media"]
media_filtered = {}
# Download all the media and put playlists in liquidsoap "annotate" format
try:
# Make sure cache_dir exists
download_dir = self.cache_dir
try:
os.makedirs(download_dir)
except Exception as e:
pass
media_copy = {}
for key in media:
media_item = media[key]
if media_item["type"] == "file":
fileExt = self.sanity_check_media_item(media_item)
dst = os.path.join(download_dir, f'{media_item["id"]}{fileExt}')
media_item["dst"] = dst
media_item["file_ready"] = False
media_filtered[key] = media_item
media_item["start"] = datetime.strptime(
media_item["start"], "%Y-%m-%d-%H-%M-%S"
)
media_item["end"] = datetime.strptime(
media_item["end"], "%Y-%m-%d-%H-%M-%S"
)
media_copy[key] = media_item
self.media_prepare_queue.put(copy.copy(media_filtered))
except Exception as e:
logger.exception(e)
# Send the data to pypo-push
logger.debug("Pushing to pypo-push")
self.push_queue.put(media_copy)
# cleanup
try:
self.cache_cleanup(media)
except Exception as e:
logger.exception(e)
# do basic validation of file parameters. Useful for debugging
# purposes
def sanity_check_media_item(self, media_item):
start = datetime.strptime(media_item["start"], "%Y-%m-%d-%H-%M-%S")
end = datetime.strptime(media_item["end"], "%Y-%m-%d-%H-%M-%S")
mime = media_item["metadata"]["mime"]
mimetypes.init(["%s/mime.types" % os.path.dirname(os.path.realpath(__file__))])
mime_ext = mimetypes.guess_extension(mime, strict=False)
length1 = (end - start).total_seconds()
length2 = media_item["cue_out"] - media_item["cue_in"]
if abs(length2 - length1) > 1:
logger.error("end - start length: %s", length1)
logger.error("cue_out - cue_in length: %s", length2)
logger.error("Two lengths are not equal!!!")
media_item["file_ext"] = mime_ext
return mime_ext
def is_file_opened(self, path):
# Capture stderr to avoid polluting py-interpreter.log
proc = Popen(["lsof", path], stdout=PIPE, stderr=PIPE)
out = proc.communicate()[0].strip()
return bool(out)
def cache_cleanup(self, media):
"""
Get list of all files in the cache dir and remove them if they aren't being used anymore.
Input dict() media, lists all files that are scheduled or currently playing. Not being in this
dict() means the file is safe to remove.
"""
cached_file_set = set(os.listdir(self.cache_dir))
scheduled_file_set = set()
for mkey in media:
media_item = media[mkey]
if media_item["type"] == "file":
if "file_ext" not in media_item.keys():
media_item["file_ext"] = mimetypes.guess_extension(
media_item["metadata"]["mime"], strict=False
)
scheduled_file_set.add(
"{}{}".format(media_item["id"], media_item["file_ext"])
)
expired_files = cached_file_set - scheduled_file_set
logger.debug("Files to remove " + str(expired_files))
for f in expired_files:
try:
path = os.path.join(self.cache_dir, f)
logger.debug("Removing %s" % path)
# check if this file is opened (sometimes Liquidsoap is still
# playing the file due to our knowledge of the track length
# being incorrect!)
if not self.is_file_opened(path):
os.remove(path)
logger.info("File '%s' removed" % path)
else:
logger.info("File '%s' not removed. Still busy!" % path)
except Exception as e:
logger.exception("Problem removing file '%s'" % f)
def manual_schedule_fetch(self):
try:
self.schedule_data = get_schedule(self.api_client)
logger.debug(f"Received event from API client: {self.schedule_data}")
self.process_schedule(self.schedule_data)
return True
except Exception as e:
logger.error("Unable to fetch schedule")
logger.exception(e)
return False
def persistent_manual_schedule_fetch(self, max_attempts=1):
success = False
num_attempts = 0
while not success and num_attempts < max_attempts:
success = self.manual_schedule_fetch()
num_attempts += 1
return success
# This function makes a request to Airtime to see if we need to
# push metadata to TuneIn. We have to do this because TuneIn turns
# off metadata if it does not receive a request every 5 minutes.
def update_metadata_on_tunein(self):
self.v1_api_client.update_metadata_on_tunein()
Timer(120, self.update_metadata_on_tunein).start()
def main(self):
# Make sure all Liquidsoap queues are empty. This is important in the
# case where we've just restarted the pypo scheduler, but Liquidsoap still
# is playing tracks. In this case let's just restart everything from scratch
# so that we can repopulate our dictionary that keeps track of what
# Liquidsoap is playing much more easily.
self.pypo_liquidsoap.clear_all_queues()
self.set_bootstrap_variables()
self.update_metadata_on_tunein()
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we fetch the schedule every 8
# minutes or wait for schedule updates to get pushed.
success = self.persistent_manual_schedule_fetch(max_attempts=5)
if success:
logger.info("Bootstrap schedule received: %s", self.schedule_data)
loops = 1
while True:
logger.info(f"Loop #{loops}")
manual_fetch_needed = False
try:
# our simple_queue.get() requires a timeout, in which case we
# fetch the Airtime schedule manually. It is important to fetch
# the schedule periodically because if we didn't, we would only
# get schedule updates via RabbitMq if the user was constantly
# using the Airtime interface.
# If the user is not using the interface, RabbitMq messages are not
# sent, and we will have very stale (or non-existent!) data about the
# schedule.
# Currently we are checking every POLL_INTERVAL seconds
message = self.fetch_queue.get(
block=True, timeout=self.listener_timeout
)
manual_fetch_needed = False
self.handle_message(message)
except Empty as e:
logger.info("Queue timeout. Fetching schedule manually")
manual_fetch_needed = True
except Exception as e:
logger.exception(e)
try:
if manual_fetch_needed:
self.persistent_manual_schedule_fetch(max_attempts=5)
except Exception as e:
logger.exception("Failed to manually fetch the schedule.")
loops += 1
def run(self):
"""
Entry point of the thread
"""
self.main()

View file

@ -0,0 +1,188 @@
import hashlib
import os
import stat
import time
import traceback
from queue import Empty
from threading import Thread
from libretime_api_client import version2 as api_client
from loguru import logger
from requests.exceptions import ConnectionError, Timeout
class PypoFile(Thread):
def __init__(self, schedule_queue):
Thread.__init__(self)
self.media_queue = schedule_queue
self.media = None
self.api_client = api_client.AirtimeApiClient()
def copy_file(self, media_item):
"""
Copy media_item from local library directory to local cache directory.
"""
src = media_item["uri"]
dst = media_item["dst"]
src_size = media_item["filesize"]
dst_exists = True
try:
dst_size = os.path.getsize(dst)
if dst_size == 0:
dst_exists = False
except Exception as e:
dst_exists = False
do_copy = False
if dst_exists:
# TODO: Check if the locally cached variant of the file is sane.
# This used to be a filesize check that didn't end up working.
# Once we have watched folders updated files from them might
# become an issue here... This needs proper cache management.
# https://github.com/libretime/libretime/issues/756#issuecomment-477853018
# https://github.com/libretime/libretime/pull/845
logger.debug(
"file %s already exists in local cache as %s, skipping copying..."
% (src, dst)
)
else:
do_copy = True
media_item["file_ready"] = not do_copy
if do_copy:
logger.info(f"copying from {src} to local cache {dst}")
try:
with open(dst, "wb") as handle:
logger.info(media_item)
response = self.api_client.services.file_download_url(
id=media_item["id"]
)
if not response.ok:
logger.error(response)
raise Exception(
"%s - Error occurred downloading file"
% response.status_code
)
for chunk in response.iter_content(chunk_size=1024):
handle.write(chunk)
# make file world readable and owner writable
os.chmod(dst, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
if media_item["filesize"] == 0:
file_size = self.report_file_size_and_md5_to_api(
dst, media_item["id"]
)
media_item["filesize"] = file_size
media_item["file_ready"] = True
except Exception as e:
logger.error(f"Could not copy from {src} to {dst}")
logger.error(e)
def report_file_size_and_md5_to_api(self, file_path, file_id):
try:
file_size = os.path.getsize(file_path)
with open(file_path, "rb") as fh:
m = hashlib.md5()
while True:
data = fh.read(8192)
if not data:
break
m.update(data)
md5_hash = m.hexdigest()
except OSError as e:
file_size = 0
logger.error(
"Error getting file size and md5 hash for file id %s" % file_id
)
logger.error(e)
# Make PUT request to LibreTime to update the file size and hash
error_msg = (
"Could not update media file %s with file size and md5 hash:" % file_id
)
try:
payload = {"filesize": file_size, "md5": md5_hash}
response = self.api_client.update_file(file_id, payload)
except (ConnectionError, Timeout):
logger.error(error_msg)
except Exception as e:
logger.error(error_msg)
logger.error(e)
return file_size
def get_highest_priority_media_item(self, schedule):
"""
Get highest priority media_item in the queue. Currently the highest
priority is decided by how close the start time is to "now".
"""
if schedule is None or len(schedule) == 0:
return None
sorted_keys = sorted(schedule.keys())
if len(sorted_keys) == 0:
return None
highest_priority = sorted_keys[0]
media_item = schedule[highest_priority]
logger.debug("Highest priority item: %s" % highest_priority)
# Remove this media_item from the dictionary. On the next iteration
# (from the main function) we won't consider it for prioritization
# anymore. If on the next iteration we have received a new schedule,
# it is very possible we will have to deal with the same media_items
# again. In this situation, the worst possible case is that we try to
# copy the file again and realize we already have it (thus aborting the copy).
del schedule[highest_priority]
return media_item
def main(self):
while True:
try:
if self.media is None or len(self.media) == 0:
# We have no schedule, so we have nothing else to do. Let's
# do a blocked wait on the queue
self.media = self.media_queue.get(block=True)
else:
# We have a schedule we need to process, but we also want
# to check if a newer schedule is available. In this case
# do a non-blocking queue.get and in either case (we get something
# or we don't), get back to work on preparing getting files.
try:
self.media = self.media_queue.get_nowait()
except Empty as e:
pass
media_item = self.get_highest_priority_media_item(self.media)
if media_item is not None:
self.copy_file(media_item)
except Exception as e:
import traceback
top = traceback.format_exc()
logger.error(str(e))
logger.error(top)
raise
def run(self):
"""
Entry point of the thread
"""
try:
self.main()
except Exception as e:
top = traceback.format_exc()
logger.error("PypoFile Exception: %s", top)
time.sleep(5)
logger.info("PypoFile thread exiting")

View file

@ -0,0 +1,229 @@
import time
from datetime import datetime, timedelta
from loguru import logger
from ..utils import seconds_between
from .events import EventKind
from .liquidsoap_gateway import TelnetLiquidsoap
class PypoLiquidsoap:
def __init__(self, telnet_lock, host, port):
self.liq_queue_tracker = {
"s0": None,
"s1": None,
"s2": None,
"s3": None,
"s4": None,
}
self.telnet_liquidsoap = TelnetLiquidsoap(
telnet_lock, host, port, list(self.liq_queue_tracker.keys())
)
def get_telnet_dispatcher(self):
return self.telnet_liquidsoap
def play(self, media_item):
if media_item["type"] == EventKind.FILE:
self.handle_file_type(media_item)
elif media_item["type"] == EventKind.EVENT:
self.handle_event_type(media_item)
elif media_item["type"] == EventKind.STREAM_BUFFER_START:
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
elif media_item["type"] == EventKind.STREAM_OUTPUT_START:
if (
media_item["row_id"]
!= self.telnet_liquidsoap.current_prebuffering_stream_id
):
# this is called if the stream wasn't scheduled sufficiently ahead of time
# so that the prebuffering stage could take effect. Let's do the prebuffering now.
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
self.telnet_liquidsoap.start_web_stream(media_item)
elif media_item["type"] == EventKind.STREAM_BUFFER_END:
self.telnet_liquidsoap.stop_web_stream_buffer()
elif media_item["type"] == EventKind.STREAM_OUTPUT_END:
self.telnet_liquidsoap.stop_web_stream_output()
else:
raise UnknownMediaItemType(str(media_item))
def handle_file_type(self, media_item):
"""
Wait 200 seconds (2000 iterations) for file to become ready,
otherwise give up on it.
"""
iter_num = 0
while not media_item["file_ready"] and iter_num < 2000:
time.sleep(0.1)
iter_num += 1
if media_item["file_ready"]:
available_queue = self.find_available_queue()
try:
self.telnet_liquidsoap.queue_push(available_queue, media_item)
self.liq_queue_tracker[available_queue] = media_item
except Exception as e:
logger.error(e)
raise
else:
logger.warning(
"File %s did not become ready in less than 5 seconds. Skipping...",
media_item["dst"],
)
def handle_event_type(self, media_item):
if media_item["event_type"] == "kick_out":
self.telnet_liquidsoap.disconnect_source("live_dj")
elif media_item["event_type"] == "switch_off":
self.telnet_liquidsoap.switch_source("live_dj", "off")
def is_media_item_finished(self, media_item):
if media_item is None:
return True
else:
return datetime.utcnow() > media_item["end"]
def find_available_queue(self):
available_queue = None
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi == None or self.is_media_item_finished(mi):
# queue "i" is available. Push to this queue
available_queue = i
if available_queue == None:
raise NoQueueAvailableException()
return available_queue
def verify_correct_present_media(self, scheduled_now):
"""
verify whether Liquidsoap is currently playing the correct files.
if we find an item that Liquidsoap is not playing, then push it
into one of Liquidsoap's queues. If Liquidsoap is already playing
it do nothing. If Liquidsoap is playing a track that isn't in
currently_playing then stop it.
Check for Liquidsoap media we should source.skip
get liquidsoap items for each queue. Since each queue can only have one
item, we should have a max of 8 items.
2013-03-21-22-56-00_0: {
id: 1,
type: "stream_output_start",
row_id: 41,
uri: "http://stream2.radioblackout.org:80/blackout.ogg",
start: "2013-03-21-22-56-00",
end: "2013-03-21-23-26-00",
show_name: "Untitled Show",
independent_event: true
},
"""
try:
scheduled_now_files = [
x for x in scheduled_now if x["type"] == EventKind.FILE
]
scheduled_now_webstream = [
x for x in scheduled_now if x["type"] in (EventKind.STREAM_OUTPUT_START)
]
schedule_ids = {x["row_id"] for x in scheduled_now_files}
row_id_map = {}
liq_queue_ids = set()
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if not self.is_media_item_finished(mi):
liq_queue_ids.add(mi["row_id"])
row_id_map[mi["row_id"]] = mi
to_be_removed = set()
to_be_added = set()
# Iterate over the new files, and compare them to currently scheduled
# tracks. If already in liquidsoap queue still need to make sure they don't
# have different attributes
# if replay gain changes, it shouldn't change the amplification of the currently playing song
for i in scheduled_now_files:
if i["row_id"] in row_id_map:
mi = row_id_map[i["row_id"]]
correct = (
mi["start"] == i["start"]
and mi["end"] == i["end"]
and mi["row_id"] == i["row_id"]
)
if not correct:
# need to re-add
logger.info("Track %s found to have new attr." % i)
to_be_removed.add(i["row_id"])
to_be_added.add(i["row_id"])
to_be_removed.update(liq_queue_ids - schedule_ids)
to_be_added.update(schedule_ids - liq_queue_ids)
if to_be_removed:
logger.info("Need to remove items from Liquidsoap: %s" % to_be_removed)
# remove files from Liquidsoap's queue
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi is not None and mi["row_id"] in to_be_removed:
self.stop(i)
if to_be_added:
logger.info("Need to add items to Liquidsoap *now*: %s" % to_be_added)
for i in scheduled_now_files:
if i["row_id"] in to_be_added:
self.modify_cue_point(i)
self.play(i)
# handle webstreams
current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
logger.debug(f"scheduled now webstream: {scheduled_now_webstream}")
if scheduled_now_webstream:
if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]):
self.play(scheduled_now_webstream[0])
elif current_stream_id != "-1":
# something is playing and it shouldn't be.
self.telnet_liquidsoap.stop_web_stream_buffer()
self.telnet_liquidsoap.stop_web_stream_output()
except KeyError as e:
logger.error("Error: Malformed event in schedule. " + str(e))
def stop(self, queue):
self.telnet_liquidsoap.queue_remove(queue)
self.liq_queue_tracker[queue] = None
def is_file(self, media_item):
return media_item["type"] == EventKind.FILE
def clear_queue_tracker(self):
for i in self.liq_queue_tracker.keys():
self.liq_queue_tracker[i] = None
def modify_cue_point(self, link):
assert self.is_file(link)
lateness = seconds_between(link["start"], datetime.utcnow())
if lateness > 0:
logger.debug(f"media item was supposed to start {lateness}s ago")
cue_in_orig = timedelta(seconds=float(link["cue_in"]))
link["cue_in"] = cue_in_orig.total_seconds() + lateness
def clear_all_queues(self):
self.telnet_liquidsoap.queue_clear_all()
class UnknownMediaItemType(Exception):
pass
class NoQueueAvailableException(Exception):
pass

View file

@ -0,0 +1,343 @@
import telnetlib
import traceback
from loguru import logger
from ..timeout import ls_timeout
def create_liquidsoap_annotation(media):
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.
filename = media["dst"]
annotation = (
'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",'
+ 'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",'
+ 'schedule_table_id="%s",replay_gain="%s dB"'
) % (
media["id"],
float(media["fade_in"]) / 1000,
float(media["fade_out"]) / 1000,
float(media["cue_in"]),
float(media["cue_out"]),
media["row_id"],
media["replay_gain"],
)
# Override the the artist/title that Liquidsoap extracts from a file's metadata
# with the metadata we get from Airtime. (You can modify metadata in Airtime's library,
# which doesn't get saved back to the file.)
if "metadata" in media:
if "artist_name" in media["metadata"]:
artist_name = media["metadata"]["artist_name"]
if isinstance(artist_name, str):
annotation += ',artist="%s"' % (artist_name.replace('"', '\\"'))
if "track_title" in media["metadata"]:
track_title = media["metadata"]["track_title"]
if isinstance(track_title, str):
annotation += ',title="%s"' % (track_title.replace('"', '\\"'))
annotation += ":" + filename
return annotation
class TelnetLiquidsoap:
def __init__(self, telnet_lock, ls_host, ls_port, queues):
self.telnet_lock = telnet_lock
self.ls_host = ls_host
self.ls_port = ls_port
self.queues = queues
self.current_prebuffering_stream_id = None
def __connect(self):
return telnetlib.Telnet(self.ls_host, self.ls_port)
def __is_empty(self, queue_id):
return True
connection = self.__connect()
msg = "%s.queue\nexit\n" % queue_id
connection.write(msg.encode("utf-8"))
output = connection.read_all().decode("utf-8").splitlines()
if len(output) == 3:
return len(output[0]) == 0
else:
raise Exception("Unexpected list length returned: %s" % output)
@ls_timeout
def queue_clear_all(self):
try:
self.telnet_lock.acquire()
connection = self.__connect()
for i in self.queues:
msg = "queues.%s_skip\n" % i
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception:
raise
finally:
self.telnet_lock.release()
@ls_timeout
def queue_remove(self, queue_id):
try:
self.telnet_lock.acquire()
connection = self.__connect()
msg = "queues.%s_skip\n" % queue_id
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception:
raise
finally:
self.telnet_lock.release()
@ls_timeout
def queue_push(self, queue_id, media_item):
try:
self.telnet_lock.acquire()
if not self.__is_empty(queue_id):
raise QueueNotEmptyException()
connection = self.__connect()
annotation = create_liquidsoap_annotation(media_item)
msg = f"{queue_id}.push {annotation}\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
show_name = media_item["show_name"]
msg = "vars.show_name %s\n" % show_name
connection.write(msg.encode("utf-8"))
logger.debug(msg)
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception:
raise
finally:
self.telnet_lock.release()
@ls_timeout
def stop_web_stream_buffer(self):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
# dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = "http.stop\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
msg = "dynamic_source.id -1\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ls_timeout
def stop_web_stream_output(self):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
# dynamic_source.stop http://87.230.101.24:80/top100station.mp3
msg = "dynamic_source.output_stop\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ls_timeout
def start_web_stream(self, media_item):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
# TODO: DO we need this?
msg = "streams.scheduled_play_start\n"
connection.write(msg.encode("utf-8"))
msg = "dynamic_source.output_start\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
self.current_prebuffering_stream_id = None
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ls_timeout
def start_web_stream_buffer(self, media_item):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = "dynamic_source.id %s\n" % media_item["row_id"]
logger.debug(msg)
connection.write(msg.encode("utf-8"))
msg = "http.restart %s\n" % media_item["uri"]
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
logger.debug(connection.read_all().decode("utf-8"))
self.current_prebuffering_stream_id = media_item["row_id"]
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ls_timeout
def get_current_stream_id(self):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = "dynamic_source.get_id\n"
logger.debug(msg)
connection.write(msg.encode("utf-8"))
connection.write(b"exit\n")
stream_id = connection.read_all().decode("utf-8").splitlines()[0]
logger.debug("stream_id: %s" % stream_id)
return stream_id
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ls_timeout
def disconnect_source(self, sourcename):
logger.debug("Disconnecting source: %s", sourcename)
command = ""
if sourcename == "master_dj":
command += "master_harbor.stop\n"
elif sourcename == "live_dj":
command += "live_dj_harbor.stop\n"
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
logger.info(command)
connection.write(command.encode("utf-8"))
connection.write(b"exit\n")
connection.read_all().decode("utf-8")
except Exception:
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
@ls_timeout
def telnet_send(self, commands):
try:
self.telnet_lock.acquire()
connection = telnetlib.Telnet(self.ls_host, self.ls_port)
for line in commands:
logger.info(line)
if type(line) is str:
line = line.encode("utf-8")
connection.write(line)
connection.write(b"exit\n")
connection.read_all().decode("utf-8")
except Exception as exception:
logger.error(str(exception))
logger.error(traceback.format_exc())
finally:
self.telnet_lock.release()
def switch_source(self, sourcename, status):
logger.debug('Switching source: %s to "%s" status', sourcename, status)
command = "streams."
if sourcename == "master_dj":
command += "master_dj_"
elif sourcename == "live_dj":
command += "live_dj_"
elif sourcename == "scheduled_play":
command += "scheduled_play_"
if status == "on":
command += "start\n"
else:
command += "stop\n"
self.telnet_send([command])
class DummyTelnetLiquidsoap:
def __init__(self, telnet_lock):
self.telnet_lock = telnet_lock
self.liquidsoap_mock_queues = {}
for index in range(4):
self.liquidsoap_mock_queues["s" + str(index)] = []
@ls_timeout
def queue_push(self, queue_id, media_item):
try:
self.telnet_lock.acquire()
logger.info(f"Pushing {media_item} to queue {queue_id}")
from datetime import datetime
print(f"Time now: {datetime.utcnow():s}")
annotation = create_liquidsoap_annotation(media_item)
self.liquidsoap_mock_queues[queue_id].append(annotation)
except Exception:
raise
finally:
self.telnet_lock.release()
@ls_timeout
def queue_remove(self, queue_id):
try:
self.telnet_lock.acquire()
logger.info("Purging queue %s" % queue_id)
from datetime import datetime
print(f"Time now: {datetime.utcnow():s}")
except Exception:
raise
finally:
self.telnet_lock.release()
class QueueNotEmptyException(Exception):
pass

View file

@ -0,0 +1,18 @@
audio/ogg ogg
application/ogg ogg
audio/vorbis ogg
audio/mp3 mp3
audio/mpeg mp3
audio/mpeg3 mp3
audio/x-aac aac
audio/aac aac
audio/aacp aac
audio/mp4 m4a
audio/x-flac flac
audio/flac flac
audio/wav wav
audio/x-wav wav
audio/mp2 mp2
audio/mp1 mp1
audio/x-ms-wma wma
audio/basic au

View file

@ -0,0 +1,137 @@
import math
import telnetlib
import time
import traceback
from datetime import datetime
from queue import Queue
from threading import Thread
from libretime_api_client import version1 as api_client
from loguru import logger
from ..config import PUSH_INTERVAL, Config
from ..timeout import ls_timeout
from .queue import PypoLiqQueue
def is_stream(media_item):
return media_item["type"] == "stream_output_start"
def is_file(media_item):
return media_item["type"] == "file"
class PypoPush(Thread):
def __init__(self, q, telnet_lock, pypo_liquidsoap, config: Config):
Thread.__init__(self)
self.api_client = api_client.AirtimeApiClient()
self.queue = q
self.telnet_lock = telnet_lock
self.config = config
self.pushed_objects = {}
self.current_prebuffering_stream_id = None
self.queue_id = 0
self.future_scheduled_queue = Queue()
self.pypo_liquidsoap = pypo_liquidsoap
self.plq = PypoLiqQueue(self.future_scheduled_queue, self.pypo_liquidsoap)
self.plq.daemon = True
self.plq.start()
def main(self):
loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
media_schedule = None
while True:
try:
media_schedule = self.queue.get(block=True)
except Exception as e:
logger.error(str(e))
raise
else:
logger.debug(media_schedule)
# separate media_schedule list into currently_playing and
# scheduled_for_future lists
currently_playing, scheduled_for_future = self.separate_present_future(
media_schedule
)
self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
self.future_scheduled_queue.put(scheduled_for_future)
if loops % heartbeat_period == 0:
logger.info("heartbeat")
loops = 0
loops += 1
def separate_present_future(self, media_schedule):
tnow = datetime.utcnow()
present = []
future = {}
sorted_keys = sorted(media_schedule.keys())
for mkey in sorted_keys:
media_item = media_schedule[mkey]
# Ignore track that already ended
if media_item["type"] == "file" and media_item["end"] < tnow:
logger.debug(f"ignoring ended media_item: {media_item}")
continue
diff_sec = (tnow - media_item["start"]).total_seconds()
if diff_sec >= 0:
logger.debug(f"adding media_item to present: {media_item}")
present.append(media_item)
else:
logger.debug(f"adding media_item to future: {media_item}")
future[mkey] = media_item
return present, future
@ls_timeout
def stop_web_stream_all(self):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(
self.config.playout.liquidsoap_host,
self.config.playout.liquidsoap_port,
)
# msg = 'dynamic_source.read_stop_all xxx\n'
msg = "http.stop\n"
logger.debug(msg)
tn.write(msg)
msg = "dynamic_source.output_stop\n"
logger.debug(msg)
tn.write(msg)
msg = "dynamic_source.id -1\n"
logger.debug(msg)
tn.write(msg)
tn.write("exit\n")
logger.debug(tn.read_all())
except Exception as e:
logger.error(str(e))
finally:
self.telnet_lock.release()
def run(self):
while True:
try:
self.main()
except Exception as e:
top = traceback.format_exc()
logger.error("Pypo Push Exception: %s", top)
time.sleep(5)
logger.info("PypoPush thread exiting")

View file

@ -0,0 +1,79 @@
import signal
import sys
import traceback
from collections import deque
from datetime import datetime
from queue import Empty
from threading import Thread
from loguru import logger
from ..utils import seconds_between
def keyboardInterruptHandler(signum, frame):
logger.info("\nKeyboard Interrupt\n")
sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)
class PypoLiqQueue(Thread):
def __init__(self, q, pypo_liquidsoap):
Thread.__init__(self)
self.queue = q
self.pypo_liquidsoap = pypo_liquidsoap
def main(self):
time_until_next_play = None
schedule_deque = deque()
media_schedule = None
while True:
try:
if time_until_next_play is None:
logger.info("waiting indefinitely for schedule")
media_schedule = self.queue.get(block=True)
else:
logger.info(
"waiting %ss until next scheduled item" % time_until_next_play
)
media_schedule = self.queue.get(
block=True, timeout=time_until_next_play
)
except Empty as e:
# Time to push a scheduled item.
media_item = schedule_deque.popleft()
self.pypo_liquidsoap.play(media_item)
if len(schedule_deque):
time_until_next_play = seconds_between(
datetime.utcnow(),
schedule_deque[0]["start"],
)
else:
time_until_next_play = None
else:
logger.info("New schedule received")
# new schedule received. Replace old one with this.
schedule_deque.clear()
keys = sorted(media_schedule.keys())
for i in keys:
schedule_deque.append(media_schedule[i])
if len(keys):
time_until_next_play = seconds_between(
datetime.utcnow(),
media_schedule[keys[0]]["start"],
)
else:
time_until_next_play = None
def run(self):
try:
self.main()
except Exception as e:
logger.error("PypoLiqQueue Exception: %s", traceback.format_exc())

View file

@ -0,0 +1,153 @@
from datetime import datetime, timedelta
from typing import Dict
from dateutil.parser import isoparse
from libretime_api_client.version2 import AirtimeApiClient as ApiClient
from libretime_shared.datetime import (
time_fromisoformat,
time_in_milliseconds,
time_in_seconds,
)
from .events import EventKind
EVENT_KEY_FORMAT = "%Y-%m-%d-%H-%M-%S"
def datetime_to_event_key(value: datetime) -> str:
return value.strftime(EVENT_KEY_FORMAT)
def get_schedule(api_client: ApiClient):
current_time = datetime.utcnow()
end_time = current_time + timedelta(days=1)
current_time_str = current_time.isoformat(timespec="seconds")
end_time_str = end_time.isoformat(timespec="seconds")
schedule = api_client.services.schedule_url(
params={
"ends_after": f"{current_time_str}Z",
"ends_before": f"{end_time_str}Z",
"overbooked": False,
"position_status__gt": 0,
}
)
events = {}
for item in schedule:
item["starts_at"] = isoparse(item["starts_at"])
item["ends_at"] = isoparse(item["ends_at"])
show_instance = api_client.services.show_instance_url(id=item["instance_id"])
show = api_client.services.show_url(id=show_instance["show_id"])
if item["file"]:
file = api_client.services.file_url(id=item["file_id"])
events.update(generate_file_events(item, file, show))
elif item["stream"]:
webstream = api_client.services.webstream_url(id=item["stream_id"])
events.update(generate_webstream_events(item, webstream, show))
return {"media": events}
def generate_file_events(
schedule: dict,
file: dict,
show: dict,
) -> Dict[str, dict]:
"""
Generate events for a scheduled file.
"""
events = {}
schedule_start_event_key = datetime_to_event_key(schedule["starts_at"])
schedule_end_event_key = datetime_to_event_key(schedule["ends_at"])
events[schedule_start_event_key] = {
"type": EventKind.FILE,
"independent_event": False,
"row_id": schedule["id"],
"start": schedule_start_event_key,
"end": schedule_end_event_key,
"uri": file["url"],
"id": file["id"],
# Show data
"show_name": show["name"],
# Extra data
"fade_in": time_in_milliseconds(time_fromisoformat(schedule["fade_in"])),
"fade_out": time_in_milliseconds(time_fromisoformat(schedule["fade_out"])),
"cue_in": time_in_seconds(time_fromisoformat(schedule["cue_in"])),
"cue_out": time_in_seconds(time_fromisoformat(schedule["cue_out"])),
"metadata": {
"track_title": file["track_title"],
"artist_name": file["artist_name"],
"mime": file["mime"],
},
"replay_gain": file["replay_gain"],
"filesize": file["size"],
}
return events
def generate_webstream_events(
schedule: dict,
webstream: dict,
show: dict,
) -> Dict[str, dict]:
"""
Generate events for a scheduled webstream.
"""
events = {}
schedule_start_event_key = datetime_to_event_key(schedule["starts_at"])
schedule_end_event_key = datetime_to_event_key(schedule["ends_at"])
events[schedule_start_event_key] = {
"type": EventKind.STREAM_BUFFER_START,
"independent_event": True,
"row_id": schedule["id"],
"start": datetime_to_event_key(schedule["starts_at"] - timedelta(seconds=5)),
"end": datetime_to_event_key(schedule["starts_at"] - timedelta(seconds=5)),
"uri": webstream["url"],
"id": webstream["id"],
}
events[f"{schedule_start_event_key}_0"] = {
"type": EventKind.STREAM_OUTPUT_START,
"independent_event": True,
"row_id": schedule["id"],
"start": schedule_start_event_key,
"end": schedule_end_event_key,
"uri": webstream["url"],
"id": webstream["id"],
# Show data
"show_name": show["name"],
}
# NOTE: stream_*_end were previously triggered 1 second before
# the schedule end.
events[schedule_end_event_key] = {
"type": EventKind.STREAM_BUFFER_END,
"independent_event": True,
"row_id": schedule["id"],
"start": schedule_end_event_key,
"end": schedule_end_event_key,
"uri": webstream["url"],
"id": webstream["id"],
}
events[f"{schedule_end_event_key}_0"] = {
"type": EventKind.STREAM_OUTPUT_END,
"independent_event": True,
"row_id": schedule["id"],
"start": schedule_end_event_key,
"end": schedule_end_event_key,
"uri": webstream["url"],
"id": webstream["id"],
}
return events