CC-1469: Crossfading support (non-equal power)

-refactor
This commit is contained in:
Martin Konecny 2013-03-22 12:16:17 -04:00
parent 6dae7d3973
commit 33ca2e7c9c
8 changed files with 261 additions and 194 deletions

View file

@ -3,14 +3,14 @@
virtualenv_bin="/usr/lib/airtime/airtime_virtualenv/bin/" virtualenv_bin="/usr/lib/airtime/airtime_virtualenv/bin/"
. ${virtualenv_bin}activate . ${virtualenv_bin}activate
pypo_user="pypo" # Absolute path to this script
SCRIPT=`readlink -f $0`
# Absolute directory this script is in
pypo_path=`dirname $SCRIPT`
# Location of pypo_cli.py Python script
pypo_path="/usr/lib/airtime/pypo/bin/"
api_client_path="/usr/lib/airtime/" api_client_path="/usr/lib/airtime/"
pypo_script="pypocli.py" pypo_script="pypocli.py"
cd ${pypo_path} cd ${pypo_path}
exec 2>&1
set +e set +e
cat /etc/default/locale | grep -i "LANG=.*UTF-\?8" cat /etc/default/locale | grep -i "LANG=.*UTF-\?8"
@ -26,6 +26,6 @@ export LC_ALL=`cat /etc/default/locale | grep "LANG=" | cut -d= -f2 | tr -d "\n\
export TERM=xterm export TERM=xterm
exec python ${pypo_path}${pypo_script} > /var/log/airtime/pypo/py-interpreter.log 2>&1 exec python ${pypo_path}/${pypo_script} > /var/log/airtime/pypo/py-interpreter.log 2>&1
# EOF # EOF

View file

@ -18,7 +18,6 @@ from Queue import Queue
from threading import Lock from threading import Lock
from pypopush import PypoPush from pypopush import PypoPush
from pypoliqqueue import PypoLiqQueue
from pypofetch import PypoFetch from pypofetch import PypoFetch
from pypofile import PypoFile from pypofile import PypoFile
from recorder import Recorder from recorder import Recorder
@ -229,14 +228,6 @@ if __name__ == '__main__':
stat.daemon = True stat.daemon = True
stat.start() stat.start()
#pypoLiq_q = Queue()
#liq_queue_tracker = dict()
#telnet_liquidsoap = TelnetLiquidsoap()
#plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \
#telnet_liquidsoap)
#plq.daemon = True
#plq.start()
# all join() are commented out because we want to exit entire pypo # all join() are commented out because we want to exit entire pypo
# if pypofetch terminates # if pypofetch terminates
#pmh.join() #pmh.join()

View file

@ -8,6 +8,7 @@ import json
import telnetlib import telnetlib
import copy import copy
import subprocess import subprocess
import signal
from datetime import datetime from datetime import datetime
from Queue import Empty from Queue import Empty
@ -24,6 +25,12 @@ logging.config.fileConfig(logging_cfg)
logger = logging.getLogger() logger = logging.getLogger()
LogWriter.override_std_err(logger) LogWriter.override_std_err(logger)
def keyboardInterruptHandler(signum, frame):
logger = logging.getLogger()
logger.info('\nKeyboard Interrupt\n')
sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)
#need to wait for Python 2.7 for this.. #need to wait for Python 2.7 for this..
#logging.captureWarnings(True) #logging.captureWarnings(True)
@ -34,8 +41,6 @@ try:
LS_PORT = config['ls_port'] LS_PORT = config['ls_port']
#POLL_INTERVAL = int(config['poll_interval']) #POLL_INTERVAL = int(config['poll_interval'])
POLL_INTERVAL = 1800 POLL_INTERVAL = 1800
except Exception, e: except Exception, e:
logger.error('Error loading config file: %s', e) logger.error('Error loading config file: %s', e)
sys.exit() sys.exit()

View file

@ -1,14 +1,12 @@
from threading import Thread from threading import Thread
from collections import deque from collections import deque
from datetime import datetime from datetime import datetime
from pypofetch import PypoFetch
import eventtypes
import traceback import traceback
import sys import sys
import time import time
from Queue import Empty from Queue import Empty
import signal import signal
@ -19,14 +17,11 @@ def keyboardInterruptHandler(signum, frame):
signal.signal(signal.SIGINT, keyboardInterruptHandler) signal.signal(signal.SIGINT, keyboardInterruptHandler)
class PypoLiqQueue(Thread): class PypoLiqQueue(Thread):
def __init__(self, q, telnet_lock, logger, liq_queue_tracker, \ def __init__(self, q, pypo_liquidsoap, logger):
telnet_liquidsoap):
Thread.__init__(self) Thread.__init__(self)
self.queue = q self.queue = q
self.telnet_lock = telnet_lock
self.logger = logger self.logger = logger
self.liq_queue_tracker = liq_queue_tracker self.pypo_liquidsoap = pypo_liquidsoap
self.telnet_liquidsoap = telnet_liquidsoap
def main(self): def main(self):
time_until_next_play = None time_until_next_play = None
@ -46,7 +41,7 @@ class PypoLiqQueue(Thread):
except Empty, e: except Empty, e:
#Time to push a scheduled item. #Time to push a scheduled item.
media_item = schedule_deque.popleft() media_item = schedule_deque.popleft()
self.telnet_to_liquidsoap(media_item) self.pypo_liquidsoap.push_item(media_item)
if len(schedule_deque): if len(schedule_deque):
time_until_next_play = \ time_until_next_play = \
self.date_interval_to_seconds( self.date_interval_to_seconds(
@ -71,78 +66,6 @@ class PypoLiqQueue(Thread):
else: else:
time_until_next_play = None time_until_next_play = None
def is_media_item_finished(self, media_item):
if media_item is None:
return True
else:
return datetime.utcnow() > media_item['end']
def find_available_queue(self):
available_queue = None
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi == None or self.is_media_item_finished(mi):
#queue "i" is available. Push to this queue
available_queue = i
if available_queue == None:
raise NoQueueAvailableException()
return available_queue
def telnet_to_liquidsoap(self, media_item):
"""
telnets to liquidsoap and pushes the media_item to its queue. Push the
show name of every media_item as well, just to keep Liquidsoap up-to-date
about which show is playing.
"""
if media_item["type"] == eventtypes.FILE:
self.handle_file_type(media_item)
elif media_item["type"] == eventtypes.EVENT:
self.handle_event_type(media_item)
elif media_item["type"] == eventtypes.STREAM_BUFFER_START:
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
elif media_item["type"] == eventtypes.STREAM_OUTPUT_START:
if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id:
#this is called if the stream wasn't scheduled sufficiently ahead of time
#so that the prebuffering stage could take effect. Let's do the prebuffering now.
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
self.telnet_liquidsoap.start_web_stream(media_item)
elif media_item['type'] == eventtypes.STREAM_BUFFER_END:
self.telnet_liquidsoap.stop_web_stream_buffer(media_item)
elif media_item['type'] == eventtypes.STREAM_OUTPUT_END:
self.telnet_liquidsoap.stop_web_stream_output(media_item)
else: raise UnknownMediaItemType(str(media_item))
def handle_event_type(self, media_item):
if media_item['event_type'] == "kick_out":
PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj")
elif media_item['event_type'] == "switch_off":
PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off")
def handle_file_type(self, media_item):
"""
Wait maximum 5 seconds (50 iterations) for file to become ready,
otherwise give up on it.
"""
iter_num = 0
while not media_item['file_ready'] and iter_num < 50:
time.sleep(0.1)
iter_num += 1
if media_item['file_ready']:
available_queue = self.find_available_queue()
try:
self.telnet_liquidsoap.queue_push(available_queue, media_item)
self.liq_queue_tracker[available_queue] = media_item
except Exception as e:
self.logger.error(e)
raise
else:
self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst'])
def date_interval_to_seconds(self, interval): def date_interval_to_seconds(self, interval):
""" """
@ -155,14 +78,10 @@ class PypoLiqQueue(Thread):
return seconds return seconds
def run(self): def run(self):
try: self.main() try: self.main()
except Exception, e: except Exception, e:
self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc()) self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc())
class NoQueueAvailableException(Exception):
pass
class UnknownMediaItemType(Exception):
pass

View file

@ -0,0 +1,214 @@
from pypofetch import PypoFetch
from telnetliquidsoap import TelnetLiquidsoap
from datetime import datetime
from datetime import timedelta
import eventtypes
import time
class PypoLiquidsoap():
def __init__(self, logger, telnet_lock, host, port):
self.logger = logger
self.liq_queue_tracker = {
"s0": None,
"s1": None,
"s2": None,
"s3": None,
}
self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \
logger,\
host,\
port)
def push_item(self, media_item):
if media_item["type"] == eventtypes.FILE:
self.handle_file_type(media_item)
elif media_item["type"] == eventtypes.EVENT:
self.handle_event_type(media_item)
elif media_item["type"] == eventtypes.STREAM_BUFFER_START:
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
elif media_item["type"] == eventtypes.STREAM_OUTPUT_START:
if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id:
#this is called if the stream wasn't scheduled sufficiently ahead of time
#so that the prebuffering stage could take effect. Let's do the prebuffering now.
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
self.telnet_liquidsoap.start_web_stream(media_item)
elif media_item['type'] == eventtypes.STREAM_BUFFER_END:
self.telnet_liquidsoap.stop_web_stream_buffer()
elif media_item['type'] == eventtypes.STREAM_OUTPUT_END:
self.telnet_liquidsoap.stop_web_stream_output()
else: raise UnknownMediaItemType(str(media_item))
def handle_file_type(self, media_item):
"""
Wait maximum 5 seconds (50 iterations) for file to become ready,
otherwise give up on it.
"""
iter_num = 0
while not media_item['file_ready'] and iter_num < 50:
time.sleep(0.1)
iter_num += 1
if media_item['file_ready']:
available_queue = self.find_available_queue()
try:
self.telnet_liquidsoap.queue_push(available_queue, media_item)
self.liq_queue_tracker[available_queue] = media_item
except Exception as e:
self.logger.error(e)
raise
else:
self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst'])
def handle_event_type(self, media_item):
if media_item['event_type'] == "kick_out":
PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj")
elif media_item['event_type'] == "switch_off":
PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off")
def is_media_item_finished(self, media_item):
if media_item is None:
return True
else:
return datetime.utcnow() > media_item['end']
def find_available_queue(self):
available_queue = None
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi == None or self.is_media_item_finished(mi):
#queue "i" is available. Push to this queue
available_queue = i
if available_queue == None:
raise NoQueueAvailableException()
return available_queue
def get_queues():
return self.liq_queue_tracker
def verify_correct_present_media(self, scheduled_now):
#verify whether Liquidsoap is currently playing the correct files.
#if we find an item that Liquidsoap is not playing, then push it
#into one of Liquidsoap's queues. If Liquidsoap is already playing
#it do nothing. If Liquidsoap is playing a track that isn't in
#currently_playing then stop it.
#Check for Liquidsoap media we should source.skip
#get liquidsoap items for each queue. Since each queue can only have one
#item, we should have a max of 8 items.
#TODO: Verify start, end, replay_gain is the same
#TODO: Verify this is a file or webstream and also handle webstreams
#TODO: Do webstreams span a time or do they have an instantaneous point?
#2013-03-21-22-56-00_0: {
#id: 1,
#type: "stream_output_start",
#row_id: 41,
#uri: "http://stream2.radioblackout.org:80/blackout.ogg",
#start: "2013-03-21-22-56-00",
#end: "2013-03-21-23-26-00",
#show_name: "Untitled Show",
#independent_event: true
#},
scheduled_now_files = \
filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now)
scheduled_now_webstream = \
filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \
scheduled_now)
schedule_ids = set(map(lambda x: i["row_id"], scheduled_now_files))
liq_queue_ids = set()
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if not self.is_media_item_finished(mi):
liq_queue_ids.add(mi["row_id"])
to_be_removed = liq_queue_ids - schedule_ids
to_be_added = schedule_ids - liq_queue_ids
if len(to_be_removed):
self.logger.info("Need to remove items from Liquidsoap: %s" % \
to_be_removed)
#remove files from Liquidsoap's queue
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi is not None and mi["row_id"] in to_be_removed:
self.stop(i)
#stop any webstreams
if self.telnet_liquidsoap.get_current_stream_id() in to_be_removed:
self.telnet_liquidsoap.stop_web_stream_buffer()
self.telnet_liquidsoap.stop_web_stream_output()
if len(to_be_added):
self.logger.info("Need to add items to Liquidsoap *now*: %s" % \
to_be_added)
for i in scheduled_now:
if i["row_id"] in to_be_added:
self.modify_cue_point(i)
self.play(i)
current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
#if len(scheduled_now_webstream):
#if current_stream_id != scheduled_now_webstream[0]:
#self.play(scheduled_now_webstream[0])
def play(self, media_item):
self.telnet_liquidsoap.push_item(media_item)
def stop(self, queue):
self.telnet_liquidsoap.queue_remove(queue)
self.liq_queue_tracker[queue] = None
def is_file(self, media_item):
return media_item["type"] == eventtypes.FILE
def modify_cue_point(self, link):
if not self.is_file(link):
return
tnow = datetime.utcnow()
link_start = link['start']
diff_td = tnow - link_start
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec > 0:
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec
def date_interval_to_seconds(self, interval):
"""
Convert timedelta object into int representing the number of seconds. If
number of seconds is less than 0, then return 0.
"""
seconds = (interval.microseconds + \
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
if seconds < 0: seconds = 0
return seconds
class UnknownMediaItemType(Exception):
pass
class NoQueueAvailableException(Exception):
pass

View file

@ -13,8 +13,8 @@ import traceback
import os import os
from pypofetch import PypoFetch from pypofetch import PypoFetch
from telnetliquidsoap import TelnetLiquidsoap
from pypoliqqueue import PypoLiqQueue from pypoliqqueue import PypoLiqQueue
from pypoliquidsoap import PypoLiquidsoap
from Queue import Empty, Queue from Queue import Empty, Queue
@ -62,28 +62,18 @@ class PypoPush(Thread):
self.logger = logging.getLogger('push') self.logger = logging.getLogger('push')
self.current_prebuffering_stream_id = None self.current_prebuffering_stream_id = None
self.queue_id = 0 self.queue_id = 0
self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \
self.logger,\
LS_HOST,\
LS_PORT\
)
self.liq_queue_tracker = {
"s0": None,
"s1": None,
"s2": None,
"s3": None,
}
self.future_scheduled_queue = Queue() self.future_scheduled_queue = Queue()
self.pypo_liquidsoap = PypoLiquidsoap(self.logger, telnet_lock,\
LS_HOST, LS_PORT)
self.plq = PypoLiqQueue(self.future_scheduled_queue, \ self.plq = PypoLiqQueue(self.future_scheduled_queue, \
telnet_lock, \ self.pypo_liquidsoap, \
self.logger, \ self.logger)
self.liq_queue_tracker, \
self.telnet_liquidsoap)
self.plq.daemon = True self.plq.daemon = True
self.plq.start() self.plq.start()
def main(self): def main(self):
loops = 0 loops = 0
heartbeat_period = math.floor(30 / PUSH_INTERVAL) heartbeat_period = math.floor(30 / PUSH_INTERVAL)
@ -97,12 +87,13 @@ class PypoPush(Thread):
self.logger.error(str(e)) self.logger.error(str(e))
raise raise
else: else:
self.logger.debug(media_schedule)
#separate media_schedule list into currently_playing and #separate media_schedule list into currently_playing and
#scheduled_for_future lists #scheduled_for_future lists
currently_playing, scheduled_for_future = \ currently_playing, scheduled_for_future = \
self.separate_present_future(media_schedule) self.separate_present_future(media_schedule)
self.verify_correct_present_media(currently_playing) self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
self.future_scheduled_queue.put(scheduled_for_future) self.future_scheduled_queue.put(scheduled_for_future)
if loops % heartbeat_period == 0: if loops % heartbeat_period == 0:
@ -131,59 +122,6 @@ class PypoPush(Thread):
return present, future return present, future
def verify_correct_present_media(self, scheduled_now):
#verify whether Liquidsoap is currently playing the correct files.
#if we find an item that Liquidsoap is not playing, then push it
#into one of Liquidsoap's queues. If Liquidsoap is already playing
#it do nothing. If Liquidsoap is playing a track that isn't in
#currently_playing then stop it.
#Check for Liquidsoap media we should source.skip
#get liquidsoap items for each queue. Since each queue can only have one
#item, we should have a max of 8 items.
#TODO: Verify start, end, replay_gain is the same
#TODO: Verify this is a file or webstream and also handle webstreams
schedule_ids = set()
for i in scheduled_now:
schedule_ids.add(i["row_id"])
liq_queue_ids = set()
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if not self.plq.is_media_item_finished(mi):
liq_queue_ids.add(mi["row_id"])
to_be_removed = liq_queue_ids - schedule_ids
to_be_added = schedule_ids - liq_queue_ids
if len(to_be_removed):
self.logger.info("Need to remove items from Liquidsoap: %s" % \
to_be_removed)
for i in self.liq_queue_tracker:
mi = self.liq_queue_tracker[i]
if mi is not None and mi["row_id"] in to_be_removed:
self.telnet_liquidsoap.queue_remove(i)
self.liq_queue_tracker[i] = None
#liquidsoap.stop_play(mi)
if len(to_be_added):
self.logger.info("Need to add items to Liquidsoap *now*: %s" % \
to_be_added)
for i in scheduled_now:
if i["row_id"] in to_be_added:
self.modify_cue_point(i)
queue_id = self.plq.find_available_queue()
self.telnet_liquidsoap.queue_push(queue_id, i)
self.liq_queue_tracker[queue_id] = i
#liquidsoap.start_play(i)
def get_current_stream_id_from_liquidsoap(self): def get_current_stream_id_from_liquidsoap(self):
response = "-1" response = "-1"
try: try:
@ -222,20 +160,6 @@ class PypoPush(Thread):
#self.logger.debug("Is current item correct?: %s", str(correct)) #self.logger.debug("Is current item correct?: %s", str(correct))
#return correct #return correct
def modify_cue_point(self, link):
tnow = datetime.utcnow()
link_start = link['start']
diff_td = tnow - link_start
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec > 0:
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec
def date_interval_to_seconds(self, interval): def date_interval_to_seconds(self, interval):
""" """
Convert timedelta object into int representing the number of seconds. If Convert timedelta object into int representing the number of seconds. If

View file

@ -303,8 +303,6 @@ class Recorder(Thread):
heartbeat_period = math.floor(30 / PUSH_INTERVAL) heartbeat_period = math.floor(30 / PUSH_INTERVAL)
while True: while True:
if self.loops % heartbeat_period == 0:
self.logger.info("heartbeat")
if self.loops * PUSH_INTERVAL > 3600: if self.loops * PUSH_INTERVAL > 3600:
self.loops = 0 self.loops = 0
""" """

View file

@ -63,13 +63,12 @@ class TelnetLiquidsoap:
self.telnet_lock.release() self.telnet_lock.release()
def stop_web_stream_buffer(self, media_item): def stop_web_stream_buffer(self):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port) tn = telnetlib.Telnet(self.ls_host, self.ls_port)
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3 #dynamic_source.stop http://87.230.101.24:80/top100station.mp3
#msg = 'dynamic_source.read_stop %s\n' % media_item['row_id']
msg = 'http.stop\n' msg = 'http.stop\n'
self.logger.debug(msg) self.logger.debug(msg)
tn.write(msg) tn.write(msg)
@ -86,7 +85,7 @@ class TelnetLiquidsoap:
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
def stop_web_stream_output(self, media_item): def stop_web_stream_output(self):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port) tn = telnetlib.Telnet(self.ls_host, self.ls_port)
@ -142,13 +141,30 @@ class TelnetLiquidsoap:
tn.write("exit\n") tn.write("exit\n")
self.logger.debug(tn.read_all()) self.logger.debug(tn.read_all())
#TODO..
self.current_prebuffering_stream_id = media_item['row_id'] self.current_prebuffering_stream_id = media_item['row_id']
except Exception, e: except Exception, e:
self.logger.error(str(e)) self.logger.error(str(e))
finally: finally:
self.telnet_lock.release() self.telnet_lock.release()
def get_current_stream_id():
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(self.ls_host, self.ls_port)
msg = 'dynamic_source.get_id\n'
self.logger.debug(msg)
tn.write(msg)
tn.write("exit\n")
stream_id = tn.read_all().splitlines()[0]
self.logger.debug("stream_id: %s" % stream_id)
return stream_id
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
class DummyTelnetLiquidsoap: class DummyTelnetLiquidsoap: