sintonia/python_apps/pypo/schedule/pypoliquidsoap.py

282 lines
10 KiB
Python

"""
schedule.pypoliquidsoap
~~~~~~~~~
An attempt to abstract the various different ways we need to start/stop
files and webstreams into one unified interface with play() and stop()
methods instead.
:author: (c) 2012 by Martin Konecny.
:license: GPLv3, see LICENSE for more details.
"""
from pypofetch import PypoFetch
from telnetliquidsoap import TelnetLiquidsoap
from schedule import pure
from datetime import datetime
from datetime import timedelta
import eventtypes
import constants
import time
import re
class PypoLiquidsoap(object):
    """Unified play()/stop() front end for driving Liquidsoap.

    Files, one-off events and webstreams are all dispatched through
    ``play()``.  File playback is tracked per Liquidsoap queue in
    ``liq_queue_tracker`` so the schedule can later be reconciled against
    what was pushed (see ``verify_correct_present_media``).
    """

    def __init__(self, logger, host, port):
        """Create the queue tracker and the telnet dispatcher.

        :param logger: logger object used for all diagnostics.
        :param host: passed through to ``TelnetLiquidsoap``.
        :param port: passed through to ``TelnetLiquidsoap``.
        """
        self.logger = logger

        # Queue name -> media item last pushed to that queue, or None when
        # nothing was pushed yet / the queue was stopped or cleared.
        self.liq_queue_tracker = {
            "s0": None,
            "s1": None,
            "s2": None,
            "s3": None,
        }

        # A concrete list is handed over so the dispatcher gets the same
        # kind of object whether dict.keys() returns a list or a view.
        self.telnet_liquidsoap = TelnetLiquidsoap(
            logger,
            host,
            port,
            list(self.liq_queue_tracker.keys()))

    def get_telnet_dispatcher(self):
        """Return the ``TelnetLiquidsoap`` instance created in __init__."""
        return self.telnet_liquidsoap

    def play(self, media_item):
        """Dispatch ``media_item`` according to its ``"type"`` entry.

        :raises UnknownMediaItemType: if the type matches none of the
            handled ``eventtypes`` constants.
        """
        if media_item["type"] == eventtypes.FILE:
            self.handle_file_type(media_item)
        elif media_item["type"] == eventtypes.EVENT:
            self.handle_event_type(media_item)
        elif media_item["type"] == eventtypes.STREAM_BUFFER_START:
            self.telnet_liquidsoap.start_web_stream_buffer(media_item)
        elif media_item["type"] == eventtypes.STREAM_OUTPUT_START:
            if media_item['row_id'] != \
                    self.telnet_liquidsoap.current_prebuffering_stream_id:
                # This is called if the stream wasn't scheduled sufficiently
                # ahead of time so that the prebuffering stage could take
                # effect. Let's do the prebuffering now.
                self.telnet_liquidsoap.start_web_stream_buffer(media_item)
            self.telnet_liquidsoap.start_web_stream(media_item)
        elif media_item['type'] == eventtypes.STREAM_BUFFER_END:
            self.telnet_liquidsoap.stop_web_stream_buffer()
        elif media_item['type'] == eventtypes.STREAM_OUTPUT_END:
            self.telnet_liquidsoap.stop_web_stream_output()
        else:
            raise UnknownMediaItemType(str(media_item))

    def handle_file_type(self, media_item):
        """
        Wait maximum 5 seconds (50 iterations) for file to become ready,
        otherwise give up on it.

        When the file is ready it is pushed to an available queue and
        recorded in the queue tracker.  A failing push is logged and
        re-raised; ``find_available_queue`` may raise
        ``NoQueueAvailableException``.
        """
        iter_num = 0
        # 'file_ready' is only read here; presumably another thread flips it
        # once the file has been fetched -- confirm against the fetcher.
        while not media_item['file_ready'] and iter_num < 50:
            time.sleep(0.1)
            iter_num += 1

        if not media_item['file_ready']:
            self.logger.warning(
                "File %s did not become ready in less than 5 " +
                "seconds. Skipping...", media_item['dst'])
            return

        available_queue = self.find_available_queue()
        try:
            self.telnet_liquidsoap.queue_push(available_queue, media_item)
            # Only track the item once the push succeeded.
            self.liq_queue_tracker[available_queue] = media_item
        except Exception as e:
            self.logger.error(e)
            raise

    def handle_event_type(self, media_item):
        """Handle the "kick_out" and "switch_off" events for "live_dj".

        Any other ``event_type`` value is ignored.
        """
        if media_item['event_type'] == "kick_out":
            self.telnet_liquidsoap.disconnect_source("live_dj")
        elif media_item['event_type'] == "switch_off":
            self.telnet_liquidsoap.switch_source("live_dj", "off")

    def is_media_item_finished(self, media_item):
        """Return True if ``media_item`` is None or its 'end' is in the past.

        'end' is compared against ``datetime.utcnow()``.
        """
        if media_item is None:
            return True
        return datetime.utcnow() > media_item['end']

    def find_available_queue(self):
        """Return the name of a queue that is empty or whose item finished.

        :raises NoQueueAvailableException: if every queue holds an
            unfinished item.
        """
        available_queue = None
        for queue_name, tracked in self.liq_queue_tracker.items():
            # is_media_item_finished() already treats None as finished.
            if self.is_media_item_finished(tracked):
                # No early exit: as before, the last free queue in
                # iteration order is the one that gets used.
                available_queue = queue_name

        if available_queue is None:
            raise NoQueueAvailableException()
        return available_queue

    def verify_correct_present_media(self, scheduled_now):
        """Reconcile Liquidsoap with what should be playing right now.

        Verify whether Liquidsoap is currently playing the correct files.
        If we find an item that Liquidsoap is not playing, push it into one
        of Liquidsoap's queues.  If Liquidsoap is already playing it, do
        nothing.  If Liquidsoap is playing a track that isn't scheduled,
        stop it.  Webstreams are handled afterwards.

        Each queue holds at most one item, so there are at most as many
        tracked items as there are queues (four).

        :param scheduled_now: iterable of media-item dicts, e.g.::

            {
                id: 1,
                type: "stream_output_start",
                row_id: 41,
                uri: "http://stream2.radioblackout.org:80/blackout.ogg",
                start: "2013-03-21-22-56-00",
                end: "2013-03-21-23-26-00",
                show_name: "Untitled Show",
                independent_event: true
            }
        """
        # Real lists (not filter objects): they are iterated more than once,
        # truth-tested and indexed below.
        scheduled_now_files = [
            item for item in scheduled_now
            if item["type"] == eventtypes.FILE]
        scheduled_now_webstream = [
            item for item in scheduled_now
            if item["type"] == eventtypes.STREAM_OUTPUT_START]

        self._reconcile_file_queues(scheduled_now_files)
        self._reconcile_webstream(scheduled_now_webstream)

    def _reconcile_file_queues(self, scheduled_now_files):
        """Stop stale queue items and push missing/changed scheduled files."""
        schedule_ids = set(item["row_id"] for item in scheduled_now_files)

        row_id_map = {}
        liq_queue_ids = set()
        for queue_name in self.liq_queue_tracker:
            tracked = self.liq_queue_tracker[queue_name]
            if not self.is_media_item_finished(tracked):
                liq_queue_ids.add(tracked["row_id"])
                row_id_map[tracked["row_id"]] = tracked

        to_be_removed = set()
        to_be_added = set()

        # Iterate over the new files, and compare them to currently
        # scheduled tracks. If already in liquidsoap queue still need to
        # make sure they don't have different attributes such as
        # replay_gain etc.
        for i in scheduled_now_files:
            if i["row_id"] in row_id_map:
                mi = row_id_map[i["row_id"]]
                correct = mi['start'] == i['start'] and \
                    mi['end'] == i['end'] and \
                    mi['row_id'] == i['row_id'] and \
                    mi['replay_gain'] == i['replay_gain']

                if not correct:
                    # need to re-add
                    self.logger.info("Track %s found to have new attr." % i)
                    to_be_removed.add(i["row_id"])
                    to_be_added.add(i["row_id"])

        to_be_removed.update(liq_queue_ids - schedule_ids)
        to_be_added.update(schedule_ids - liq_queue_ids)

        if to_be_removed:
            self.logger.info("Need to remove items from Liquidsoap: %s" %
                             to_be_removed)

            # remove files from Liquidsoap's queue
            for queue_name in self.liq_queue_tracker:
                tracked = self.liq_queue_tracker[queue_name]
                if tracked is not None and tracked["row_id"] in to_be_removed:
                    self.stop(queue_name)

        if to_be_added:
            self.logger.info("Need to add items to Liquidsoap *now*: %s" %
                             to_be_added)

            # Only file items are candidates: to_be_added was built from
            # file row ids exclusively.
            for i in scheduled_now_files:
                if i["row_id"] in to_be_added:
                    self.modify_cue_point(i)
                    self.play(i)

    def _reconcile_webstream(self, scheduled_now_webstream):
        """Start the scheduled webstream, or stop one that shouldn't play."""
        current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
        if scheduled_now_webstream:
            scheduled_stream = scheduled_now_webstream[0]
            # Compare ids with ids (the whole item dict never equals an id).
            # Both sides are normalised to stripped strings because the id
            # reported by Liquidsoap is compared with the string "-1" below,
            # while row_id may be an int -- TODO confirm the exact types.
            if str(current_stream_id).strip() != \
                    str(scheduled_stream["row_id"]).strip():
                self.play(scheduled_stream)
        elif current_stream_id != "-1":
            # something is playing and it shouldn't be.
            self.telnet_liquidsoap.stop_web_stream_buffer()
            self.telnet_liquidsoap.stop_web_stream_output()

    def stop(self, queue):
        """Remove ``queue``'s item in Liquidsoap and clear its tracker slot."""
        self.telnet_liquidsoap.queue_remove(queue)
        self.liq_queue_tracker[queue] = None

    def is_file(self, media_item):
        """Return True if ``media_item`` has type ``eventtypes.FILE``."""
        return media_item["type"] == eventtypes.FILE

    def clear_queue_tracker(self):
        """Reset every tracker slot to None (Liquidsoap is not contacted)."""
        for queue_name in list(self.liq_queue_tracker):
            self.liq_queue_tracker[queue_name] = None

    def modify_cue_point(self, link):
        """Advance ``link['cue_in']`` when a file item starts late.

        Non-file items are left untouched.  If ``link['start']`` lies in the
        past relative to ``datetime.utcnow()``, the elapsed seconds are
        added to the cue-in so playback resumes at the right offset.
        ``link`` is modified in place.
        """
        if not self.is_file(link):
            return

        tnow = datetime.utcnow()
        link_start = link['start']

        diff_td = tnow - link_start
        # presumably converts a timedelta to a number of seconds -- see
        # schedule.pure for the exact rounding.
        diff_sec = pure.date_interval_to_seconds(diff_td)

        if diff_sec > 0:
            self.logger.debug("media item was supposed to start %s ago. " +
                              "Preparing to start..", diff_sec)
            original_cue_in_td = timedelta(seconds=float(link['cue_in']))
            link['cue_in'] = \
                pure.date_interval_to_seconds(original_cue_in_td) + diff_sec

    def clear_all_queues(self):
        """Ask Liquidsoap to clear all queues (tracker is left unchanged)."""
        self.telnet_liquidsoap.queue_clear_all()

    def get_liquidsoap_version(self, version_string):
        """Extract "X.Y.Z" from a string starting with "Liquidsoap X.Y.Z".

        :param version_string: info text reported by Liquidsoap; may be
            None or empty.
        :returns: the dotted version as a string, or None if the input is
            empty or does not start with a well-formed version.
        """
        if not version_string:
            return None

        # Dots are escaped so that only literal "." separates the numbers.
        m = re.match(r"Liquidsoap (\d+\.\d+\.\d+)", version_string)
        if m:
            return m.group(1)
        return None

    def liquidsoap_startup_test(self):
        """Block until Liquidsoap answers with a sufficiently new version.

        Polls once per second.  Every poll is re-validated, so a Liquidsoap
        that disappears while we wait for a newer version is reported as
        "not running" rather than handing None to ``pure.version_cmp``.
        """
        while True:
            liquidsoap_version_string = self.get_liquidsoap_version(
                self.telnet_liquidsoap.liquidsoap_get_info())

            if not liquidsoap_version_string:
                self.logger.warning(
                    "Liquidsoap doesn't appear to be running!, " +
                    "Sleeping and trying again")
            elif pure.version_cmp(
                    liquidsoap_version_string,
                    constants.LIQUIDSOAP_MIN_VERSION) < 0:
                self.logger.warning(
                    "Liquidsoap is running but in incorrect " +
                    "version! Make sure you have at least Liquidsoap %s " +
                    "installed",
                    constants.LIQUIDSOAP_MIN_VERSION)
            else:
                break

            time.sleep(1)

        self.logger.info("Liquidsoap version string found %s",
                         liquidsoap_version_string)
class UnknownMediaItemType(Exception):
    """Raised by PypoLiquidsoap.play() for an unhandled media item type."""
class NoQueueAvailableException(Exception):
    """Raised by find_available_queue() when no queue is free."""