From 91d1243554554cee9c9fc571eaf2fa78b0d0e743 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Thu, 14 Mar 2013 16:50:55 -0400 Subject: [PATCH 1/7] CC-1469: Crossfading support (non-equal power) -initial commit --- .../pypo/liquidsoap_scripts/ls_script.liq | 22 +- python_apps/pypo/pypocli.py | 15 +- python_apps/pypo/pypofetch.py | 16 +- python_apps/pypo/pypoliqqueue.py | 104 +++++++++ python_apps/pypo/pypopush.py | 199 +++++++++++++----- python_apps/pypo/telnetliquidsoap.py | 74 +++++++ python_apps/pypo/testpypoliqqueue.py | 98 +++++++++ 7 files changed, 463 insertions(+), 65 deletions(-) create mode 100644 python_apps/pypo/pypoliqqueue.py create mode 100644 python_apps/pypo/telnetliquidsoap.py create mode 100644 python_apps/pypo/testpypoliqqueue.py diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 566df0d9e..75935a6dd 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -35,7 +35,27 @@ just_switched = ref false %include "ls_lib.liq" -queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5)) +sources = ref [] +source_id = ref 0 + +def create_source() + sources := list.append([request.equeue(id="s#{!source_id}", length=0.5)], !sources) + source_id := !source_id + 1 +end + +create_source() +create_source() +create_source() +create_source() + +create_source() +create_source() +create_source() +create_source() + +queue = add(!sources) + +queue = audio_to_stereo(id="queue_src", queue) queue = cue_cut(queue) queue = amplify(1., override="replay_gain", queue) diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 941f3610a..9fed011fe 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -13,11 +13,12 @@ import signal import logging import locale import os -from Queue import Queue +from Queue import Queue from threading import Lock from pypopush import PypoPush +from pypoliqqueue import PypoLiqQueue from pypofetch import PypoFetch from pypofile import PypoFile from recorder import Recorder @@ -63,7 +64,7 @@ try: LogWriter.override_std_err(logger) except Exception, e: print "Couldn't configure logging" - sys.exit() + sys.exit(1) def configure_locale(): logger.debug("Before %s", locale.nl_langinfo(locale.CODESET)) @@ -228,8 +229,16 @@ if __name__ == '__main__': stat.daemon = True stat.start() + pypoLiq_q = Queue() + liq_queue_tracker = dict() + telnet_liquidsoap = TelnetLiquidsoap() + plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \ + telnet_liquidsoap) + plq.daemon = True + plq.start() + # all join() are commented out because we want to exit entire pypo - # if pypofetch is exiting + # if pypofetch terminates #pmh.join() #recorder.join() #pp.join() diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index f10c8c958..06552765e 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -7,16 +7,16 @@ import logging.config import json import telnetlib import copy -from threading import Thread import subprocess +import datetime from Queue import Empty +from threading import Thread +from subprocess import Popen, PIPE +from configobj import ConfigObj from api_clients import api_client from std_err_override import LogWriter -from subprocess import Popen, PIPE - -from configobj import ConfigObj # configure logging logging_cfg = os.path.join(os.path.dirname(__file__), "logging.cfg") @@ -481,6 +481,7 @@ class PypoFetch(Thread): except Exception, e: pass + media_copy = {} for key in media: media_item = media[key] if (media_item['type'] == 'file'): @@ -490,12 +491,17 @@ class PypoFetch(Thread): media_item['file_ready'] = False media_filtered[key] = media_item + media_item['start'] = datetime.strptime(media_item['start'], "%Y-%m-%d-%H-%M-%S") + media_item['end'] = datetime.strptime(media_item['end'], "%Y-%m-%d-%H-%M-%S") + media_copy[media_item['start']] = media_item + + self.media_prepare_queue.put(copy.copy(media_filtered)) except Exception, e: self.logger.error("%s", e) # Send the data to pypo-push self.logger.debug("Pushing to pypo-push") - self.push_queue.put(media) + self.push_queue.put(media_copy) # cleanup diff --git a/python_apps/pypo/pypoliqqueue.py b/python_apps/pypo/pypoliqqueue.py new file mode 100644 index 000000000..53e9bbeec --- /dev/null +++ b/python_apps/pypo/pypoliqqueue.py @@ -0,0 +1,104 @@ +from threading import Thread +from collections import deque +from datetime import datetime + +import traceback +import sys + +from Queue import Empty + +import signal +def keyboardInterruptHandler(signum, frame): + logger = logging.getLogger() + logger.info('\nKeyboard Interrupt\n') + sys.exit(0) +signal.signal(signal.SIGINT, keyboardInterruptHandler) + +class PypoLiqQueue(Thread): + def __init__(self, q, telnet_lock, logger, liq_queue_tracker, \ + telnet_liquidsoap): + Thread.__init__(self) + self.queue = q + self.telnet_lock = telnet_lock + self.logger = logger + self.liq_queue_tracker = liq_queue_tracker + self.telnet_liquidsoap = telnet_liquidsoap + + def main(self): + time_until_next_play = None + schedule_deque = deque() + media_schedule = None + + while True: + try: + if time_until_next_play is None: + media_schedule = self.queue.get(block=True) + else: + media_schedule = self.queue.get(block=True, timeout=time_until_next_play) + except Empty, e: + #Time to push a scheduled item. + media_item = schedule_deque.popleft() + self.telnet_to_liquidsoap(media_item) + if len(schedule_deque): + time_until_next_play = \ + self.date_interval_to_seconds( + schedule_deque[0]['start'] - datetime.utcnow()) + else: + time_until_next_play = None + else: + #new schedule received. Replace old one with this. + schedule_deque.clear() + + keys = sorted(media_schedule.keys()) + for i in keys: + schedule_deque.append(media_schedule[i]) + + time_until_next_play = self.date_interval_to_seconds(\ + keys[0] - datetime.utcnow()) + + def is_media_item_finished(self, media_item): + return datetime.utcnow() > media_item['end'] + + def telnet_to_liquidsoap(self, media_item): + """ + telnets to liquidsoap and pushes the media_item to its queue. Push the + show name of every media_item as well, just to keep Liquidsoap up-to-date + about which show is playing. + """ + + available_queue = None + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi == None or self.is_media_item_finished(mi): + #queue "i" is available. Push to this queue + available_queue = i + + if available_queue == None: + raise NoQueueAvailableException() + + try: + self.telnet_liquidsoap.queue_push(available_queue, media_item) + self.liq_queue_tracker[available_queue] = media_item + except Exception as e: + self.logger.error(e) + raise + + def date_interval_to_seconds(self, interval): + """ + Convert timedelta object into int representing the number of seconds. If + number of seconds is less than 0, then return 0. + """ + seconds = (interval.microseconds + \ + (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) + if seconds < 0: seconds = 0 + + return seconds + + + def run(self): + try: self.main() + except Exception, e: + self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc()) + +class NoQueueAvailableException(Exception): + pass diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index f438b3bb1..ea51d0724 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -9,9 +9,15 @@ import logging.config import telnetlib import calendar import math +import traceback import os -from pypofetch import PypoFetch +from pypofetch import PypoFetch +from telnetliquidsoap import TelnetLiquidsoap +from pypoliqqueue import PypoLiqQueue + + +import Queue from Queue import Empty from threading import Thread @@ -58,6 +64,27 @@ class PypoPush(Thread): self.pushed_objects = {} self.logger = logging.getLogger('push') self.current_prebuffering_stream_id = None + self.queue_id = 0 + self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ + self.logger,\ + LS_HOST,\ + LS_PORT\ + ) + + liq_queue_tracker = { + "s0": None, + "s1": None, + "s2": None, + "s3": None, + } + + self.pypoLiq_q = Queue() + self.plq = PypoLiqQueue(self.pypoLiq_q, \ + telnet_lock, \ + liq_queue_tracker, \ + self.telnet_liquidsoap) + plq.daemon = True + plq.start() def main(self): loops = 0 @@ -74,50 +101,9 @@ class PypoPush(Thread): media_schedule = self.queue.get(block=True) else: media_schedule = self.queue.get(block=True, timeout=time_until_next_play) - - chains = self.get_all_chains(media_schedule) - - #We get to the following lines only if a schedule was received. - liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() - liquidsoap_stream_id = self.get_current_stream_id_from_liquidsoap() - - tnow = datetime.utcnow() - current_event_chain, original_chain = self.get_current_chain(chains, tnow) - - if len(current_event_chain) > 0: - try: - chains.remove(original_chain) - except ValueError, e: - self.logger.error(str(e)) - - #At this point we know that Liquidsoap is playing something, and that something - #is scheduled. We need to verify whether the schedule we just received matches - #what Liquidsoap is playing, and if not, correct it. - - self.handle_new_schedule(media_schedule, liquidsoap_queue_approx, liquidsoap_stream_id, current_event_chain) - - - #At this point everything in the present has been taken care of and Liquidsoap - #is playing whatever is scheduled. - #Now we need to prepare ourselves for future scheduled events. - # - next_media_item_chain = self.get_next_schedule_chain(chains, tnow) - - self.logger.debug("Next schedule chain: %s", next_media_item_chain) - if next_media_item_chain is not None: - try: - chains.remove(next_media_item_chain) - except ValueError, e: - self.logger.error(str(e)) - - chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") - time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) - self.logger.debug("Blocking %s seconds until show start", time_until_next_play) - else: - self.logger.debug("Blocking indefinitely since no show scheduled") - time_until_next_play = None except Empty, e: #We only get here when a new chain of tracks are ready to be played. + #"timeout" has parameter has been reached. self.push_to_liquidsoap(next_media_item_chain) next_media_item_chain = self.get_next_schedule_chain(chains, datetime.utcnow()) @@ -126,7 +112,8 @@ class PypoPush(Thread): chains.remove(next_media_item_chain) except ValueError, e: self.logger.error(str(e)) - chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") + + chain_start = next_media_item_chain[0]['start'] time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) self.logger.debug("Blocking %s seconds until show start", time_until_next_play) else: @@ -134,12 +121,111 @@ class PypoPush(Thread): time_until_next_play = None except Exception, e: self.logger.error(str(e)) + else: + #separate media_schedule list into currently_playing and + #scheduled_for_future lists + currently_playing, scheduled_for_future = \ + self.separate_present_future(media_schedule) + + self.verify_correct_present_media(currently_playing) + + self.future_scheduled_queue.put(scheduled_for_future) + + self.pypoLiq_q.put(scheduled_for_future) + + """ + #queue.get timeout never had a chance to expire. Instead a new + #schedule was received. Let's parse this schedule and generate + #a new timeout. + try: + chains = self.get_all_chains(media_schedule) + + #We get to the following lines only if a schedule was received. + liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() + liquidsoap_stream_id = self.get_current_stream_id_from_liquidsoap() + + tnow = datetime.utcnow() + current_event_chain, original_chain = \ + self.get_current_chain(chains, tnow) + + if len(current_event_chain) > 0: + try: + chains.remove(original_chain) + except ValueError, e: + self.logger.error(str(e)) + + #At this point we know that Liquidsoap is playing something, and that something + #is scheduled. We need to verify whether the schedule we just received matches + #what Liquidsoap is playing, and if not, correct it. + self.handle_new_schedule(media_schedule, \ + liquidsoap_queue_approx, \ + liquidsoap_stream_id, \ + current_event_chain) + + #At this point everything in the present has been taken care of and Liquidsoap + #is playing whatever is scheduled. + #Now we need to prepare ourselves for future scheduled events. + next_media_item_chain = self.get_next_schedule_chain(chains, tnow) + + self.logger.debug("Next schedule chain: %s", next_media_item_chain) + if next_media_item_chain is not None: + try: + chains.remove(next_media_item_chain) + except ValueError, e: + self.logger.error(str(e)) + + chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") + time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) + self.logger.debug("Blocking %s seconds until show start", time_until_next_play) + else: + self.logger.debug("Blocking indefinitely since no show scheduled") + time_until_next_play = None + except Exception, e: + self.logger.error(str(e)) + """ if loops % heartbeat_period == 0: self.logger.info("heartbeat") loops = 0 loops += 1 + + def separate_present_future(self, media_schedule): + tnow = datetime.utcnow() + + present = {} + future = {} + + sorted_keys = sorted(media_schedule.keys()) + for mkey in sorted_keys: + media_item = media_schedule[mkey] + + media_item_start = media_item['start'] + diff_td = tnow - media_item_start + diff_sec = self.date_interval_to_seconds(diff_td) + + if diff_sec >= 0: + present[media_item['start']] = media_item + else: + future[media_item['start']] = media_item + + return present, future + + def verify_correct_present_media(self, currently_playing): + #verify whether Liquidsoap is currently playing the correct items. + #if we find an item that Liquidsoap is not playing, then push it + #into one of Liquidsoap's queues. If Liquidsoap is already playing + #it do nothing. If Liquidsoap is playing a track that isn't in + #currently_playing then stop it. + + #Check for Liquidsoap media we should source.skip + #get liquidsoap items for each queue. Since each queue can only have one + #item, we should have a max of 8 items. + #TODO + + #Check for media Liquidsoap should start playing + #TODO + def get_current_stream_id_from_liquidsoap(self): response = "-1" try: @@ -167,7 +253,7 @@ class PypoPush(Thread): self.telnet_lock.acquire() tn = telnetlib.Telnet(LS_HOST, LS_PORT) - msg = 'queue.queue\n' + msg = 's0.queue\n' tn.write(msg) response = tn.read_until("\r\n").strip(" \r\n") tn.write('exit\n') @@ -355,7 +441,7 @@ class PypoPush(Thread): def modify_cue_point(self, link): tnow = datetime.utcnow() - link_start = datetime.strptime(link['start'], "%Y-%m-%d-%H-%M-%S") + link_start = link['start'] diff_td = tnow - link_start diff_sec = self.date_interval_to_seconds(diff_td) @@ -399,8 +485,8 @@ class PypoPush(Thread): for chain in chains: iteration = 0 for link in chain: - link_start = datetime.strptime(link['start'], "%Y-%m-%d-%H-%M-%S") - link_end = datetime.strptime(link['end'], "%Y-%m-%d-%H-%M-%S") + link_start = link['start'] + link_end = link['end'] self.logger.debug("tnow %s, chain_start %s", tnow, link_start) if link_start <= tnow and tnow < link_end: @@ -423,10 +509,12 @@ class PypoPush(Thread): closest_start = None closest_chain = None for chain in chains: - chain_start = datetime.strptime(chain[0]['start'], "%Y-%m-%d-%H-%M-%S") - chain_end = datetime.strptime(chain[-1]['end'], "%Y-%m-%d-%H-%M-%S") + chain_start = chain[0]['start'] + chain_end = chain[-1]['end'] self.logger.debug("tnow %s, chain_start %s", tnow, chain_start) - if (closest_start == None or chain_start < closest_start) and (chain_start > tnow or (chain_start < tnow and chain_end > tnow)): + if (closest_start == None or chain_start < closest_start) and \ + (chain_start > tnow or \ + (chain_start < tnow and chain_end > tnow)): closest_start = chain_start closest_chain = chain @@ -482,6 +570,8 @@ class PypoPush(Thread): self.stop_web_stream_output(media_item) except Exception, e: self.logger.error('Pypo Push Exception: %s', e) + finally: + self.queue_id = (self.queue_id + 1) % 8 def start_web_stream_buffer(self, media_item): @@ -640,7 +730,7 @@ class PypoPush(Thread): self.logger.debug(msg) tn.write(msg) - msg = "queue.queue\n" + msg = "s0.queue\n" self.logger.debug(msg) tn.write(msg) @@ -687,10 +777,8 @@ class PypoPush(Thread): self.telnet_lock.acquire() tn = telnetlib.Telnet(LS_HOST, LS_PORT) - #tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8')) - annotation = self.create_liquidsoap_annotation(media_item) - msg = 'queue.push %s\n' % annotation.encode('utf-8') + msg = 's%s.push %s\n' % (self.queue_id, annotation.encode('utf-8')) self.logger.debug(msg) tn.write(msg) queue_id = tn.read_until("\r\n").strip("\r\n") @@ -722,7 +810,6 @@ class PypoPush(Thread): def run(self): try: self.main() except Exception, e: - import traceback top = traceback.format_exc() self.logger.error('Pypo Push Exception: %s', top) diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py new file mode 100644 index 000000000..5131d2c62 --- /dev/null +++ b/python_apps/pypo/telnetliquidsoap.py @@ -0,0 +1,74 @@ +import telnetlib + +def create_liquidsoap_annotation(media): + # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. + return 'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s",replay_gain="%s dB":%s' \ + % (media['id'], float(media['fade_in']) / 1000, float(media['fade_out']) / 1000, float(media['cue_in']), float(media['cue_out']), media['row_id'], media['replay_gain'], media['dst']) + +class TelnetLiquidsoap: + + def __init__(self, telnet_lock, logger, ls_host, ls_port): + self.telnet_lock = telnet_lock + self.ls_host = ls_host + self.ls_port = ls_port + self.logger = logger + + def __connect(self): + return telnetlib.Telnet(self.ls_host, self.ls_port) + + def __is_empty(self, tn, queue_id): + return True + + + def queue_push(self, queue_id, media_item): + try: + self.telnet_lock.acquire() + tn = self.__connect() + + if not self.__is_empty(tn, queue_id): + raise QueueNotEmptyException() + + annotation = create_liquidsoap_annotation(media_item) + msg = '%s.push %s\n' % (queue_id, annotation.encode('utf-8')) + self.logger.debug(msg) + tn.write(msg) + + show_name = media_item['show_name'] + msg = 'vars.show_name %s\n' % show_name.encode('utf-8') + tn.write(msg) + self.logger.debug(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception: + raise + finally: + self.telnet_lock.release() + +class DummyTelnetLiquidsoap: + + def __init__(self, telnet_lock, logger): + self.telnet_lock = telnet_lock + self.liquidsoap_mock_queues = {} + self.logger = logger + + for i in range(4): + self.liquidsoap_mock_queues["s"+str(i)] = [] + + def queue_push(self, queue_id, media_item): + try: + self.telnet_lock.acquire() + + self.logger.info("Pushing %s to queue %s" % (media_item, queue_id)) + from datetime import datetime + print "Time now: %s" % datetime.utcnow() + + annotation = create_liquidsoap_annotation(media_item) + self.liquidsoap_mock_queues[queue_id].append(annotation) + except Exception: + raise + finally: + self.telnet_lock.release() + +class QueueNotEmptyException(Exception): + pass diff --git a/python_apps/pypo/testpypoliqqueue.py b/python_apps/pypo/testpypoliqqueue.py new file mode 100644 index 000000000..f1847b34f --- /dev/null +++ b/python_apps/pypo/testpypoliqqueue.py @@ -0,0 +1,98 @@ +from pypoliqqueue import PypoLiqQueue +from telnetliquidsoap import DummyTelnetLiquidsoap, TelnetLiquidsoap + + +from Queue import Queue +from threading import Lock + +import sys +import signal +import logging +from datetime import datetime +from datetime import timedelta + +def keyboardInterruptHandler(signum, frame): + logger = logging.getLogger() + logger.info('\nKeyboard Interrupt\n') + sys.exit(0) +signal.signal(signal.SIGINT, keyboardInterruptHandler) + +# configure logging +format = '%(levelname)s - %(pathname)s - %(lineno)s - %(asctime)s - %(message)s' +logging.basicConfig(level=logging.DEBUG, format=format) +logging.captureWarnings(True) + +telnet_lock = Lock() +pypoPush_q = Queue() + + +pypoLiq_q = Queue() +liq_queue_tracker = { + "s0": None, + "s1": None, + "s2": None, + "s3": None, + } + +#dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock, logging) +dummy_telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, logging, \ + "localhost", \ + 1234) + +plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logging, liq_queue_tracker, \ + dummy_telnet_liquidsoap) +plq.daemon = True +plq.start() + + +print "Time now: %s" % datetime.utcnow() + +media_schedule = {} + +start_dt = datetime.utcnow() + timedelta(seconds=1) +end_dt = datetime.utcnow() + timedelta(seconds=6) + +media_schedule[start_dt] = {"id": 5, \ + "type":"file", \ + "row_id":9, \ + "uri":"", \ + "dst":"/home/martin/Music/ipod/Hot Chocolate - You Sexy Thing.mp3", \ + "fade_in":0, \ + "fade_out":0, \ + "cue_in":0, \ + "cue_out":300, \ + "start": start_dt, \ + "end": end_dt, \ + "show_name":"Untitled", \ + "replay_gain": 0, \ + "independent_event": True \ + } + + + +start_dt = datetime.utcnow() + timedelta(seconds=2) +end_dt = datetime.utcnow() + timedelta(seconds=6) + +media_schedule[start_dt] = {"id": 5, \ + "type":"file", \ + "row_id":9, \ + "uri":"", \ + "dst":"/home/martin/Music/ipod/Good Charlotte - bloody valentine.mp3", \ + "fade_in":0, \ + "fade_out":0, \ + "cue_in":0, \ + "cue_out":300, \ + "start": start_dt, \ + "end": end_dt, \ + "show_name":"Untitled", \ + "replay_gain": 0, \ + "independent_event": True \ + } +pypoLiq_q.put(media_schedule) + +plq.join() + + + + + From 2b7ebafa94e45de557a04ad774baed790dbbb5bd Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Thu, 14 Mar 2013 18:29:52 -0400 Subject: [PATCH 2/7] CC-1469: Crossfading support (non-equal power) -further implementation.. --- python_apps/pypo/pypoliqqueue.py | 21 ++++++++---- python_apps/pypo/pypopush.py | 48 +++++++++++++++++++++------- python_apps/pypo/telnetliquidsoap.py | 15 +++++++++ 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/python_apps/pypo/pypoliqqueue.py b/python_apps/pypo/pypoliqqueue.py index 53e9bbeec..027a2cd5f 100644 --- a/python_apps/pypo/pypoliqqueue.py +++ b/python_apps/pypo/pypoliqqueue.py @@ -43,6 +43,8 @@ class PypoLiqQueue(Thread): time_until_next_play = \ self.date_interval_to_seconds( schedule_deque[0]['start'] - datetime.utcnow()) + if time_until_next_play < 0: + time_until_next_play = 0 else: time_until_next_play = None else: @@ -59,13 +61,7 @@ class PypoLiqQueue(Thread): def is_media_item_finished(self, media_item): return datetime.utcnow() > media_item['end'] - def telnet_to_liquidsoap(self, media_item): - """ - telnets to liquidsoap and pushes the media_item to its queue. Push the - show name of every media_item as well, just to keep Liquidsoap up-to-date - about which show is playing. - """ - + def find_available_queue(self): available_queue = None for i in self.liq_queue_tracker: mi = self.liq_queue_tracker[i] @@ -76,6 +72,17 @@ class PypoLiqQueue(Thread): if available_queue == None: raise NoQueueAvailableException() + return available_queue + + def telnet_to_liquidsoap(self, media_item): + """ + telnets to liquidsoap and pushes the media_item to its queue. Push the + show name of every media_item as well, just to keep Liquidsoap up-to-date + about which show is playing. + """ + + available_queue = self.find_available_queue() + try: self.telnet_liquidsoap.queue_push(available_queue, media_item) self.liq_queue_tracker[available_queue] = media_item diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index ea51d0724..b6f83623e 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -71,15 +71,15 @@ class PypoPush(Thread): LS_PORT\ ) - liq_queue_tracker = { + self.liq_queue_tracker = { "s0": None, "s1": None, "s2": None, "s3": None, } - self.pypoLiq_q = Queue() - self.plq = PypoLiqQueue(self.pypoLiq_q, \ + self.future_scheduled_queue = Queue() + self.plq = PypoLiqQueue(self.future_scheduled_queue, \ telnet_lock, \ liq_queue_tracker, \ self.telnet_liquidsoap) @@ -131,8 +131,6 @@ class PypoPush(Thread): self.future_scheduled_queue.put(scheduled_for_future) - self.pypoLiq_q.put(scheduled_for_future) - """ #queue.get timeout never had a chance to expire. Instead a new #schedule was received. Let's parse this schedule and generate @@ -193,7 +191,7 @@ class PypoPush(Thread): def separate_present_future(self, media_schedule): tnow = datetime.utcnow() - present = {} + present = [] future = {} sorted_keys = sorted(media_schedule.keys()) @@ -205,13 +203,13 @@ class PypoPush(Thread): diff_sec = self.date_interval_to_seconds(diff_td) if diff_sec >= 0: - present[media_item['start']] = media_item + present.append(media_item) else: future[media_item['start']] = media_item return present, future - def verify_correct_present_media(self, currently_playing): + def verify_correct_present_media(self, scheduled_now): #verify whether Liquidsoap is currently playing the correct items. #if we find an item that Liquidsoap is not playing, then push it #into one of Liquidsoap's queues. If Liquidsoap is already playing @@ -221,10 +219,38 @@ class PypoPush(Thread): #Check for Liquidsoap media we should source.skip #get liquidsoap items for each queue. Since each queue can only have one #item, we should have a max of 8 items. - #TODO - #Check for media Liquidsoap should start playing - #TODO + schedule_ids = set() + for i in scheduled_now: + schedule_ids.add(i["row_id"]) + + liq_queue_ids = set() + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if not self.plq.is_media_item_finished(mi): + liq_queue_ids.add(mi["row_id"]) + + to_be_added = schedule_ids - liq_queue_ids + to_be_removed = liq_queue_ids - schedule_ids + + if len(to_be_removed): + self.logger.info("Need to remove items from Liquidsoap: %s" % \ + to_be_removed) + + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi["row_id"] in to_be_removed: + self.telnet_liquidsoap.queue_remove(i) + + + if len(to_be_added): + self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ + to_be_added) + + for i in scheduled_now: + if i["row_id"] in to_be_added: + queue_id = self.plq.find_available_queue() + self.telnet_liquidsoap.queue_push(queue_id) def get_current_stream_id_from_liquidsoap(self): response = "-1" diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index 5131d2c62..2896c982a 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -19,6 +19,21 @@ class TelnetLiquidsoap: def __is_empty(self, tn, queue_id): return True + def queue_remove(self, queue_id): + try: + self.telnet_lock.acquire() + tn = self.__connect() + + + #TODO: Need a source.skip for each queue + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception: + raise + finally: + self.telnet_lock.release() + def queue_push(self, queue_id, media_item): try: From dd7fc61e23994e80f32b968d902c8340014a8249 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Fri, 15 Mar 2013 12:50:23 -0400 Subject: [PATCH 3/7] CC-1469: Crossfading support (non-equal power) -further implementation.. --- .../pypo/liquidsoap_scripts/ls_lib.liq | 46 ++++++++++--------- .../pypo/liquidsoap_scripts/ls_script.liq | 11 ++++- python_apps/pypo/pypocli.py | 14 +++--- python_apps/pypo/pypofetch.py | 2 +- python_apps/pypo/pypoliqqueue.py | 16 +++++-- python_apps/pypo/pypopush.py | 23 +++++----- python_apps/pypo/telnetliquidsoap.py | 20 ++++++-- 7 files changed, 83 insertions(+), 49 deletions(-) diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq index 5aa77cac8..ccb37026f 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq @@ -354,28 +354,32 @@ end # Add a skip function to a source # when it does not have one # by default -def add_skip_command(s) - # A command to skip - def skip(_) - # get playing (active) queue and flush it - l = list.hd(server.execute("queue.secondary_queue")) - l = string.split(separator=" ",l) - list.iter(fun (rid) -> ignore(server.execute("queue.remove #{rid}")), l) +#def add_skip_command(s) +# # A command to skip +# def skip(_) +# # get playing (active) queue and flush it +# l = list.hd(server.execute("queue.secondary_queue")) +# l = string.split(separator=" ",l) +# list.iter(fun (rid) -> ignore(server.execute("queue.remove #{rid}")), l) +# +# l = list.hd(server.execute("queue.primary_queue")) +# l = string.split(separator=" ", l) +# if list.length(l) > 0 then +# source.skip(s) +# "Skipped" +# else +# "Not skipped" +# end +# end +# # Register the command: +# server.register(namespace="source", +# usage="skip", +# description="Skip the current song.", +# "skip",fun(s) -> begin log("source.skip") skip(s) end) +#end - l = list.hd(server.execute("queue.primary_queue")) - l = string.split(separator=" ", l) - if list.length(l) > 0 then - source.skip(s) - "Skipped" - else - "Not skipped" - end - end - # Register the command: - server.register(namespace="source", - usage="skip", - description="Skip the current song.", - "skip",fun(s) -> begin log("source.skip") skip(s) end) +def clear_queue(s) + source.skip(s) end def set_dynamic_source_id(id) = diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 75935a6dd..353b9c2c5 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -39,7 +39,14 @@ sources = ref [] source_id = ref 0 def create_source() - sources := list.append([request.equeue(id="s#{!source_id}", length=0.5)], !sources) + l = request.equeue(id="s#{!source_id}", length=0.5) + sources := list.append([l], !sources) + server.register(namespace="queues", + "s#{!source_id}_skip", + fun (s) -> begin log("queues.s#{!source_id}_skip") + clear_queue(l) + "Done" + end) source_id := !source_id + 1 end @@ -268,7 +275,7 @@ end # Attach a skip command to the source s: -add_skip_command(s) +#add_skip_command(s) server.register(namespace="streams", description="Stop Master DJ source.", diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 9fed011fe..02dfe6c46 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -229,13 +229,13 @@ if __name__ == '__main__': stat.daemon = True stat.start() - pypoLiq_q = Queue() - liq_queue_tracker = dict() - telnet_liquidsoap = TelnetLiquidsoap() - plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \ - telnet_liquidsoap) - plq.daemon = True - plq.start() + #pypoLiq_q = Queue() + #liq_queue_tracker = dict() + #telnet_liquidsoap = TelnetLiquidsoap() + #plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \ + #telnet_liquidsoap) + #plq.daemon = True + #plq.start() # all join() are commented out because we want to exit entire pypo # if pypofetch terminates diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 06552765e..3804b3458 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -8,7 +8,7 @@ import json import telnetlib import copy import subprocess -import datetime +from datetime import datetime from Queue import Empty from threading import Thread diff --git a/python_apps/pypo/pypoliqqueue.py b/python_apps/pypo/pypoliqqueue.py index 027a2cd5f..87458d4df 100644 --- a/python_apps/pypo/pypoliqqueue.py +++ b/python_apps/pypo/pypoliqqueue.py @@ -32,9 +32,13 @@ class PypoLiqQueue(Thread): while True: try: if time_until_next_play is None: + self.logger.info("waiting indefinitely for schedule") media_schedule = self.queue.get(block=True) else: - media_schedule = self.queue.get(block=True, timeout=time_until_next_play) + self.logger.info("waiting %ss until next scheduled item" % \ + time_until_next_play) + media_schedule = self.queue.get(block=True, \ + timeout=time_until_next_play) except Empty, e: #Time to push a scheduled item. media_item = schedule_deque.popleft() @@ -55,11 +59,15 @@ class PypoLiqQueue(Thread): for i in keys: schedule_deque.append(media_schedule[i]) - time_until_next_play = self.date_interval_to_seconds(\ - keys[0] - datetime.utcnow()) + if len(keys): + time_until_next_play = self.date_interval_to_seconds(\ + keys[0] - datetime.utcnow()) def is_media_item_finished(self, media_item): - return datetime.utcnow() > media_item['end'] + if media_item is None: + return True + else: + return datetime.utcnow() > media_item['end'] def find_available_queue(self): available_queue = None diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index b6f83623e..0b4371042 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -17,8 +17,7 @@ from telnetliquidsoap import TelnetLiquidsoap from pypoliqqueue import PypoLiqQueue -import Queue -from Queue import Empty +from Queue import Empty, Queue from threading import Thread @@ -81,10 +80,11 @@ class PypoPush(Thread): self.future_scheduled_queue = Queue() self.plq = PypoLiqQueue(self.future_scheduled_queue, \ telnet_lock, \ - liq_queue_tracker, \ + self.logger, \ + self.liq_queue_tracker, \ self.telnet_liquidsoap) - plq.daemon = True - plq.start() + self.plq.daemon = True + self.plq.start() def main(self): loops = 0 @@ -198,8 +198,7 @@ class PypoPush(Thread): for mkey in sorted_keys: media_item = media_schedule[mkey] - media_item_start = media_item['start'] - diff_td = tnow - media_item_start + diff_td = tnow - media_item['start'] diff_sec = self.date_interval_to_seconds(diff_td) if diff_sec >= 0: @@ -230,8 +229,8 @@ class PypoPush(Thread): if not self.plq.is_media_item_finished(mi): liq_queue_ids.add(mi["row_id"]) - to_be_added = schedule_ids - liq_queue_ids to_be_removed = liq_queue_ids - schedule_ids + to_be_added = schedule_ids - liq_queue_ids if len(to_be_removed): self.logger.info("Need to remove items from Liquidsoap: %s" % \ @@ -239,8 +238,9 @@ class PypoPush(Thread): for i in self.liq_queue_tracker: mi = self.liq_queue_tracker[i] - if mi["row_id"] in to_be_removed: + if mi is not None and mi["row_id"] in to_be_removed: self.telnet_liquidsoap.queue_remove(i) + self.liq_queue_tracker[i] = None if len(to_be_added): @@ -249,8 +249,10 @@ class PypoPush(Thread): for i in scheduled_now: if i["row_id"] in to_be_added: + self.modify_cue_point(i) queue_id = self.plq.find_available_queue() - self.telnet_liquidsoap.queue_push(queue_id) + self.telnet_liquidsoap.queue_push(queue_id, i) + self.liq_queue_tracker[queue_id] = i def get_current_stream_id_from_liquidsoap(self): response = "-1" @@ -554,7 +556,6 @@ class PypoPush(Thread): """ seconds = (interval.microseconds + \ (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) - if seconds < 0: seconds = 0 return seconds diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index 2896c982a..7bc7e362d 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -24,9 +24,10 @@ class TelnetLiquidsoap: self.telnet_lock.acquire() tn = self.__connect() - - #TODO: Need a source.skip for each queue - + msg = 'queues.%s_skip\n' % queue_id + self.logger.debug(msg) + tn.write(msg) + tn.write("exit\n") self.logger.debug(tn.read_all()) except Exception: @@ -85,5 +86,18 @@ class DummyTelnetLiquidsoap: finally: self.telnet_lock.release() + def queue_remove(self, queue_id): + try: + self.telnet_lock.acquire() + + self.logger.info("Purging queue %s" % queue_id) + from datetime import datetime + print "Time now: %s" % datetime.utcnow() + + except Exception: + raise + finally: + self.telnet_lock.release() + class QueueNotEmptyException(Exception): pass From 445573dcdbedfdb1bd4048c439eec962d5af0eaf Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Fri, 15 Mar 2013 15:07:55 -0400 Subject: [PATCH 4/7] CC-1469: Crossfading support (non-equal power) -webstreams scheduled in the future are now working... --- python_apps/pypo/eventtypes.py | 6 + python_apps/pypo/pypofetch.py | 2 +- python_apps/pypo/pypoliqqueue.py | 65 ++- python_apps/pypo/pypopush.py | 612 +-------------------------- python_apps/pypo/telnetliquidsoap.py | 90 ++++ 5 files changed, 177 insertions(+), 598 deletions(-) create mode 100644 python_apps/pypo/eventtypes.py diff --git a/python_apps/pypo/eventtypes.py b/python_apps/pypo/eventtypes.py new file mode 100644 index 000000000..5f9c871db --- /dev/null +++ b/python_apps/pypo/eventtypes.py @@ -0,0 +1,6 @@ +FILE = "file" +EVENT = "event" +STREAM_BUFFER_START = "stream_buffer_start" +STREAM_OUTPUT_START = "stream_output_start" +STREAM_BUFFER_END = "stream_buffer_end" +STREAM_OUTPUT_END = "stream_output_end" diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 3804b3458..66243a15a 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -540,7 +540,7 @@ class PypoFetch(Thread): #check if this file is opened (sometimes Liquidsoap is still #playing the file due to our knowledge of the track length #being incorrect!) - if not self.is_file_opened(): + if not self.is_file_opened(path): os.remove(path) except Exception, e: self.logger.error(e) diff --git a/python_apps/pypo/pypoliqqueue.py b/python_apps/pypo/pypoliqqueue.py index 87458d4df..54d7c5d93 100644 --- a/python_apps/pypo/pypoliqqueue.py +++ b/python_apps/pypo/pypoliqqueue.py @@ -1,9 +1,13 @@ from threading import Thread from collections import deque from datetime import datetime +from pypofetch import PypoFetch + +import eventtypes import traceback import sys +import time from Queue import Empty @@ -52,6 +56,8 @@ class PypoLiqQueue(Thread): else: time_until_next_play = None else: + self.logger.info("New schedule received: %s", media_schedule) + #new schedule received. Replace old one with this. schedule_deque.clear() @@ -62,6 +68,8 @@ class PypoLiqQueue(Thread): if len(keys): time_until_next_play = self.date_interval_to_seconds(\ keys[0] - datetime.utcnow()) + else: + time_until_next_play = None def is_media_item_finished(self, media_item): if media_item is None: @@ -88,15 +96,53 @@ class PypoLiqQueue(Thread): show name of every media_item as well, just to keep Liquidsoap up-to-date about which show is playing. """ - - available_queue = self.find_available_queue() - try: - self.telnet_liquidsoap.queue_push(available_queue, media_item) - self.liq_queue_tracker[available_queue] = media_item - except Exception as e: - self.logger.error(e) - raise + if media_item["type"] == eventtypes.FILE: + self.handle_file_type(media_item) + elif media_item["type"] == eventtypes.EVENT: + self.handle_event_type(media_item) + elif media_item["type"] == eventtypes.STREAM_BUFFER_START: + self.telnet_liquidsoap.start_web_stream_buffer(media_item) + elif media_item["type"] == eventtypes.STREAM_OUTPUT_START: + if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id: + #this is called if the stream wasn't scheduled sufficiently ahead of time + #so that the prebuffering stage could take effect. Let's do the prebuffering now. + self.telnet_liquidsoap.start_web_stream_buffer(media_item) + self.telnet_liquidsoap.start_web_stream(media_item) + elif media_item['type'] == eventtypes.STREAM_BUFFER_END: + self.telnet_liquidsoap.stop_web_stream_buffer(media_item) + elif media_item['type'] == eventtypes.STREAM_OUTPUT_END: + self.telnet_liquidsoap.stop_web_stream_output(media_item) + else: raise UnknownMediaItemType(str(media_item)) + + def handle_event_type(self, media_item): + if media_item['event_type'] == "kick_out": + PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj") + elif media_item['event_type'] == "switch_off": + PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off") + + + def handle_file_type(self, media_item): + """ + Wait maximum 5 seconds (50 iterations) for file to become ready, + otherwise give up on it. + """ + iter_num = 0 + while not media_item['file_ready'] and iter_num < 50: + time.sleep(0.1) + iter_num += 1 + + if media_item['file_ready']: + available_queue = self.find_available_queue() + + try: + self.telnet_liquidsoap.queue_push(available_queue, media_item) + self.liq_queue_tracker[available_queue] = media_item + except Exception as e: + self.logger.error(e) + raise + else: + self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst']) def date_interval_to_seconds(self, interval): """ @@ -117,3 +163,6 @@ class PypoLiqQueue(Thread): class NoQueueAvailableException(Exception): pass + +class UnknownMediaItemType(Exception): + pass diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 0b4371042..6ae3e27d1 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -16,7 +16,6 @@ from pypofetch import PypoFetch from telnetliquidsoap import TelnetLiquidsoap from pypoliqqueue import PypoLiqQueue - from Queue import Empty, Queue from threading import Thread @@ -41,7 +40,6 @@ try: LS_HOST = config['ls_host'] LS_PORT = config['ls_port'] PUSH_INTERVAL = 2 - MAX_LIQUIDSOAP_QUEUE_LENGTH = 2 except Exception, e: logger.error('Error loading config file %s', e) sys.exit() @@ -90,37 +88,14 @@ class PypoPush(Thread): loops = 0 heartbeat_period = math.floor(30 / PUSH_INTERVAL) - next_media_item_chain = None media_schedule = None - time_until_next_play = None - chains = None while True: try: - if time_until_next_play is None: - media_schedule = self.queue.get(block=True) - else: - media_schedule = self.queue.get(block=True, timeout=time_until_next_play) - except Empty, e: - #We only get here when a new chain of tracks are ready to be played. - #"timeout" has parameter has been reached. - self.push_to_liquidsoap(next_media_item_chain) - - next_media_item_chain = self.get_next_schedule_chain(chains, datetime.utcnow()) - if next_media_item_chain is not None: - try: - chains.remove(next_media_item_chain) - except ValueError, e: - self.logger.error(str(e)) - - chain_start = next_media_item_chain[0]['start'] - time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) - self.logger.debug("Blocking %s seconds until show start", time_until_next_play) - else: - self.logger.debug("Blocking indefinitely since no show scheduled next") - time_until_next_play = None + media_schedule = self.queue.get(block=True) except Exception, e: self.logger.error(str(e)) + raise else: #separate media_schedule list into currently_playing and #scheduled_for_future lists @@ -128,60 +103,8 @@ class PypoPush(Thread): self.separate_present_future(media_schedule) self.verify_correct_present_media(currently_playing) - self.future_scheduled_queue.put(scheduled_for_future) - """ - #queue.get timeout never had a chance to expire. Instead a new - #schedule was received. Let's parse this schedule and generate - #a new timeout. - try: - chains = self.get_all_chains(media_schedule) - - #We get to the following lines only if a schedule was received. - liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() - liquidsoap_stream_id = self.get_current_stream_id_from_liquidsoap() - - tnow = datetime.utcnow() - current_event_chain, original_chain = \ - self.get_current_chain(chains, tnow) - - if len(current_event_chain) > 0: - try: - chains.remove(original_chain) - except ValueError, e: - self.logger.error(str(e)) - - #At this point we know that Liquidsoap is playing something, and that something - #is scheduled. We need to verify whether the schedule we just received matches - #what Liquidsoap is playing, and if not, correct it. - self.handle_new_schedule(media_schedule, \ - liquidsoap_queue_approx, \ - liquidsoap_stream_id, \ - current_event_chain) - - #At this point everything in the present has been taken care of and Liquidsoap - #is playing whatever is scheduled. - #Now we need to prepare ourselves for future scheduled events. - next_media_item_chain = self.get_next_schedule_chain(chains, tnow) - - self.logger.debug("Next schedule chain: %s", next_media_item_chain) - if next_media_item_chain is not None: - try: - chains.remove(next_media_item_chain) - except ValueError, e: - self.logger.error(str(e)) - - chain_start = datetime.strptime(next_media_item_chain[0]['start'], "%Y-%m-%d-%H-%M-%S") - time_until_next_play = self.date_interval_to_seconds(chain_start - datetime.utcnow()) - self.logger.debug("Blocking %s seconds until show start", time_until_next_play) - else: - self.logger.debug("Blocking indefinitely since no show scheduled") - time_until_next_play = None - except Exception, e: - self.logger.error(str(e)) - """ - if loops % heartbeat_period == 0: self.logger.info("heartbeat") loops = 0 @@ -209,7 +132,7 @@ class PypoPush(Thread): return present, future def verify_correct_present_media(self, scheduled_now): - #verify whether Liquidsoap is currently playing the correct items. + #verify whether Liquidsoap is currently playing the correct files. #if we find an item that Liquidsoap is not playing, then push it #into one of Liquidsoap's queues. If Liquidsoap is already playing #it do nothing. If Liquidsoap is playing a track that isn't in @@ -219,6 +142,9 @@ class PypoPush(Thread): #get liquidsoap items for each queue. Since each queue can only have one #item, we should have a max of 8 items. + #TODO: Verify start, end, replay_gain is the same + #TODO: Verify this is a file or webstream and also handle webstreams + schedule_ids = set() for i in scheduled_now: schedule_ids.add(i["row_id"]) @@ -273,198 +199,24 @@ class PypoPush(Thread): return response - def get_queue_items_from_liquidsoap(self): - """ - This function connects to Liquidsoap to find what media items are in its queue. - """ - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) + #def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id): + #correct = False + #if media_item is None: + #correct = (len(liquidsoap_queue_approx) == 0 and liquidsoap_stream_id == "-1") + #else: + #if is_file(media_item): + #if len(liquidsoap_queue_approx) == 0: + #correct = False + #else: + #correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \ + #liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \ + #liquidsoap_queue_approx[0]['end'] == media_item['end'] and \ + #liquidsoap_queue_approx[0]['replay_gain'] == media_item['replay_gain'] + #elif is_stream(media_item): + #correct = liquidsoap_stream_id == str(media_item['row_id']) - msg = 's0.queue\n' - tn.write(msg) - response = tn.read_until("\r\n").strip(" \r\n") - tn.write('exit\n') - tn.read_all() - except Exception, e: - self.logger.error("Error connecting to Liquidsoap: %s", e) - response = [] - finally: - self.telnet_lock.release() - - liquidsoap_queue_approx = [] - - if len(response) > 0: - items_in_queue = response.split(" ") - - self.logger.debug("items_in_queue: %s", items_in_queue) - - for item in items_in_queue: - if item in self.pushed_objects: - liquidsoap_queue_approx.append(self.pushed_objects[item]) - else: - """ - We should only reach here if Pypo crashed and restarted (because self.pushed_objects was reset). In this case - let's clear the entire Liquidsoap queue. - """ - self.logger.error("ID exists in liquidsoap queue that does not exist in our pushed_objects queue: " + item) - self.clear_liquidsoap_queue() - liquidsoap_queue_approx = [] - break - - return liquidsoap_queue_approx - - def is_correct_current_item(self, media_item, liquidsoap_queue_approx, liquidsoap_stream_id): - correct = False - if media_item is None: - correct = (len(liquidsoap_queue_approx) == 0 and liquidsoap_stream_id == "-1") - else: - if is_file(media_item): - if len(liquidsoap_queue_approx) == 0: - correct = False - else: - correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \ - liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \ - liquidsoap_queue_approx[0]['end'] == media_item['end'] and \ - liquidsoap_queue_approx[0]['replay_gain'] == media_item['replay_gain'] - elif is_stream(media_item): - correct = liquidsoap_stream_id == str(media_item['row_id']) - - self.logger.debug("Is current item correct?: %s", str(correct)) - return correct - - - #clear all webstreams and files from Liquidsoap - def clear_all_liquidsoap_items(self): - self.remove_from_liquidsoap_queue(0, None) - self.stop_web_stream_all() - - def handle_new_schedule(self, media_schedule, liquidsoap_queue_approx, liquidsoap_stream_id, current_event_chain): - """ - This function's purpose is to gracefully handle situations where - Liquidsoap already has a track in its queue, but the schedule - has changed. If the schedule has changed, this function's job is to - call other functions that will connect to Liquidsoap and alter its - queue. - """ - file_chain = filter(lambda item: (item["type"] == "file"), current_event_chain) - stream_chain = filter(lambda item: (item["type"] == "stream_output_start"), current_event_chain) - - self.logger.debug(current_event_chain) - - #Take care of the case where the current playing may be incorrect - if len(current_event_chain) > 0: - - current_item = current_event_chain[0] - if not self.is_correct_current_item(current_item, liquidsoap_queue_approx, liquidsoap_stream_id): - self.clear_all_liquidsoap_items() - if is_stream(current_item): - if current_item['row_id'] != self.current_prebuffering_stream_id: - #this is called if the stream wasn't scheduled sufficiently ahead of time - #so that the prebuffering stage could take effect. Let's do the prebuffering now. - self.start_web_stream_buffer(current_item) - self.start_web_stream(current_item) - if is_file(current_item): - file_chain = self.modify_first_link_cue_point(file_chain) - self.push_to_liquidsoap(file_chain) - #we've changed the queue, so let's refetch it - liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() - - elif not self.is_correct_current_item(None, liquidsoap_queue_approx, liquidsoap_stream_id): - #Liquidsoap is playing something even though it shouldn't be - self.clear_all_liquidsoap_items() - - - #If the current item scheduled is a file, then files come in chains, and - #therefore we need to make sure the entire chain is correct. - if len(current_event_chain) > 0 and is_file(current_event_chain[0]): - problem_at_iteration = self.find_removed_items(media_schedule, liquidsoap_queue_approx) - - if problem_at_iteration is not None: - #Items that are in Liquidsoap's queue aren't scheduled anymore. We need to connect - #and remove these items. - self.logger.debug("Change in link %s of current chain", problem_at_iteration) - self.remove_from_liquidsoap_queue(problem_at_iteration, liquidsoap_queue_approx[problem_at_iteration:]) - - if problem_at_iteration is None and len(file_chain) > len(liquidsoap_queue_approx): - self.logger.debug("New schedule has longer current chain.") - problem_at_iteration = len(liquidsoap_queue_approx) - - if problem_at_iteration is not None: - self.logger.debug("Change in chain at link %s", problem_at_iteration) - - chain_to_push = file_chain[problem_at_iteration:] - if len(chain_to_push) > 0: - chain_to_push = self.modify_first_link_cue_point(chain_to_push) - self.push_to_liquidsoap(chain_to_push) - - - """ - Compare whats in the liquidsoap_queue to the new schedule we just - received in media_schedule. This function only iterates over liquidsoap_queue_approx - and finds if every item in that list is still scheduled in "media_schedule". It doesn't - take care of the case where media_schedule has more items than liquidsoap_queue_approx - """ - def find_removed_items(self, media_schedule, liquidsoap_queue_approx): - #iterate through the items we got from the liquidsoap queue and - #see if they are the same as the newly received schedule - iteration = 0 - problem_at_iteration = None - for queue_item in liquidsoap_queue_approx: - if queue_item['start'] in media_schedule.keys(): - media_item = media_schedule[queue_item['start']] - if queue_item['row_id'] == media_item['row_id']: - if queue_item['end'] == media_item['end']: - #Everything OK for this iteration. - pass - else: - problem_at_iteration = iteration - break - else: - #A different item has been scheduled at the same time! Need to remove - #all tracks from the Liquidsoap queue starting at this point, and re-add - #them. - problem_at_iteration = iteration - break - else: - #There are no more items scheduled for this time! The user has shortened - #the playlist, so we simply need to remove tracks from the queue. - problem_at_iteration = iteration - break - iteration += 1 - return problem_at_iteration - - - - def get_all_chains(self, media_schedule): - chains = [] - - current_chain = [] - - sorted_keys = sorted(media_schedule.keys()) - - for mkey in sorted_keys: - media_item = media_schedule[mkey] - if media_item['independent_event']: - if len(current_chain) > 0: - chains.append(current_chain) - - chains.append([media_item]) - current_chain = [] - elif len(current_chain) == 0: - current_chain.append(media_item) - elif media_item['start'] == current_chain[-1]['end']: - current_chain.append(media_item) - else: - #current item is not a continuation of the chain. - #Start a new one instead - chains.append(current_chain) - current_chain = [media_item] - - if len(current_chain) > 0: - chains.append(current_chain) - - return chains + #self.logger.debug("Is current item correct?: %s", str(correct)) + #return correct def modify_cue_point(self, link): tnow = datetime.utcnow() @@ -479,75 +231,6 @@ class PypoPush(Thread): original_cue_in_td = timedelta(seconds=float(link['cue_in'])) link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec - def modify_first_link_cue_point(self, chain): - if not len(chain): - return [] - - first_link = chain[0] - - self.modify_cue_point(first_link) - if float(first_link['cue_in']) >= float(first_link['cue_out']): - chain = chain [1:] - - return chain - - """ - Returns two chains, original chain and current_chain. current_chain is a subset of - original_chain but can also be equal to original chain. - - We return original chain because the user of this function may want to clean - up the input 'chains' list - - chain, original = get_current_chain(chains) - - and - chains.remove(chain) can throw a ValueError exception - - but - chains.remove(original) won't - """ - def get_current_chain(self, chains, tnow): - current_chain = [] - original_chain = None - - for chain in chains: - iteration = 0 - for link in chain: - link_start = link['start'] - link_end = link['end'] - - self.logger.debug("tnow %s, chain_start %s", tnow, link_start) - if link_start <= tnow and tnow < link_end: - current_chain = chain[iteration:] - original_chain = chain - break - iteration += 1 - - return current_chain, original_chain - - """ - The purpose of this function is to take a look at the last received schedule from - pypo-fetch and return the next chain of media_items. A chain is defined as a sequence - of media_items where the end time of media_item 'n' is the start time of media_item - 'n+1' - """ - def get_next_schedule_chain(self, chains, tnow): - #all media_items are now divided into chains. Let's find the one that - #starts closest in the future. - closest_start = None - closest_chain = None - for chain in chains: - chain_start = chain[0]['start'] - chain_end = chain[-1]['end'] - self.logger.debug("tnow %s, chain_start %s", tnow, chain_start) - if (closest_start == None or chain_start < closest_start) and \ - (chain_start > tnow or \ - (chain_start < tnow and chain_end > tnow)): - closest_start = chain_start - closest_chain = chain - - return closest_chain - def date_interval_to_seconds(self, interval): """ @@ -559,94 +242,6 @@ class PypoPush(Thread): return seconds - def push_to_liquidsoap(self, event_chain): - - try: - for media_item in event_chain: - if media_item['type'] == "file": - - """ - Wait maximum 5 seconds (50 iterations) for file to become ready, otherwise - give up on it. - """ - iter_num = 0 - while not media_item['file_ready'] and iter_num < 50: - time.sleep(0.1) - iter_num += 1 - - if media_item['file_ready']: - self.telnet_to_liquidsoap(media_item) - else: - self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst']) - elif media_item['type'] == "event": - if media_item['event_type'] == "kick_out": - PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj") - elif media_item['event_type'] == "switch_off": - PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off") - elif media_item['type'] == 'stream_buffer_start': - self.start_web_stream_buffer(media_item) - elif media_item['type'] == "stream_output_start": - if media_item['row_id'] != self.current_prebuffering_stream_id: - #this is called if the stream wasn't scheduled sufficiently ahead of time - #so that the prebuffering stage could take effect. Let's do the prebuffering now. - self.start_web_stream_buffer(media_item) - self.start_web_stream(media_item) - elif media_item['type'] == "stream_buffer_end": - self.stop_web_stream_buffer(media_item) - elif media_item['type'] == "stream_output_end": - self.stop_web_stream_output(media_item) - except Exception, e: - self.logger.error('Pypo Push Exception: %s', e) - finally: - self.queue_id = (self.queue_id + 1) % 8 - - - def start_web_stream_buffer(self, media_item): - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - - msg = 'dynamic_source.id %s\n' % media_item['row_id'] - self.logger.debug(msg) - tn.write(msg) - - #msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1') - msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1') - self.logger.debug(msg) - tn.write(msg) - - tn.write("exit\n") - self.logger.debug(tn.read_all()) - - self.current_prebuffering_stream_id = media_item['row_id'] - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - - - def start_web_stream(self, media_item): - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - - #TODO: DO we need this? - msg = 'streams.scheduled_play_start\n' - tn.write(msg) - - msg = 'dynamic_source.output_start\n' - self.logger.debug(msg) - tn.write(msg) - - tn.write("exit\n") - self.logger.debug(tn.read_all()) - - self.current_prebuffering_stream_id = None - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - def stop_web_stream_all(self): try: self.telnet_lock.acquire() @@ -673,167 +268,6 @@ class PypoPush(Thread): finally: self.telnet_lock.release() - def stop_web_stream_buffer(self, media_item): - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - - #msg = 'dynamic_source.read_stop %s\n' % media_item['row_id'] - msg = 'http.stop\n' - self.logger.debug(msg) - tn.write(msg) - - msg = 'dynamic_source.id -1\n' - self.logger.debug(msg) - tn.write(msg) - - tn.write("exit\n") - self.logger.debug(tn.read_all()) - - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - - def stop_web_stream_output(self, media_item): - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - - msg = 'dynamic_source.output_stop\n' - self.logger.debug(msg) - tn.write(msg) - - tn.write("exit\n") - self.logger.debug(tn.read_all()) - - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - - def clear_liquidsoap_queue(self): - self.logger.debug("Clearing Liquidsoap queue") - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - msg = "source.skip\n" - tn.write(msg) - tn.write("exit\n") - tn.read_all() - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - - def remove_from_liquidsoap_queue(self, problem_at_iteration, liquidsoap_queue_approx): - - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - - if problem_at_iteration == 0: - msg = "source.skip\n" - self.logger.debug(msg) - tn.write(msg) - else: - # Remove things in reverse order. - queue_copy = liquidsoap_queue_approx[::-1] - - for queue_item in queue_copy: - msg = "queue.remove %s\n" % queue_item['queue_id'] - self.logger.debug(msg) - tn.write(msg) - response = tn.read_until("\r\n").strip("\r\n") - - if "No such request in my queue" in response: - """ - Cannot remove because Liquidsoap started playing the item. Need - to use source.skip instead - """ - msg = "source.skip\n" - self.logger.debug(msg) - tn.write(msg) - - msg = "s0.queue\n" - self.logger.debug(msg) - tn.write(msg) - - tn.write("exit\n") - self.logger.debug(tn.read_all()) - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - - def sleep_until_start(self, media_item): - """ - The purpose of this function is to look at the difference between - "now" and when the media_item starts, and sleep for that period of time. - After waking from sleep, this function returns. - """ - - mi_start = media_item['start'][0:19] - - #strptime returns struct_time in local time - epoch_start = calendar.timegm(time.strptime(mi_start, '%Y-%m-%d-%H-%M-%S')) - - #Return the time as a floating point number expressed in seconds since the epoch, in UTC. - epoch_now = time.time() - - self.logger.debug("Epoch start: %s" % epoch_start) - self.logger.debug("Epoch now: %s" % epoch_now) - - sleep_time = epoch_start - epoch_now - - if sleep_time < 0: - sleep_time = 0 - - self.logger.debug('sleeping for %s s' % (sleep_time)) - time.sleep(sleep_time) - - def telnet_to_liquidsoap(self, media_item): - """ - telnets to liquidsoap and pushes the media_item to its queue. Push the - show name of every media_item as well, just to keep Liquidsoap up-to-date - about which show is playing. - """ - try: - self.telnet_lock.acquire() - tn = telnetlib.Telnet(LS_HOST, LS_PORT) - - annotation = self.create_liquidsoap_annotation(media_item) - msg = 's%s.push %s\n' % (self.queue_id, annotation.encode('utf-8')) - self.logger.debug(msg) - tn.write(msg) - queue_id = tn.read_until("\r\n").strip("\r\n") - - #remember the media_item's queue id which we may use - #later if we need to remove it from the queue. - media_item['queue_id'] = queue_id - - #add media_item to the end of our queue - self.pushed_objects[queue_id] = media_item - - show_name = media_item['show_name'] - msg = 'vars.show_name %s\n' % show_name.encode('utf-8') - tn.write(msg) - self.logger.debug(msg) - - tn.write("exit\n") - self.logger.debug(tn.read_all()) - except Exception, e: - self.logger.error(str(e)) - finally: - self.telnet_lock.release() - - def create_liquidsoap_annotation(self, media): - # We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade. - return 'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s",replay_gain="%s dB":%s' \ - % (media['id'], float(media['fade_in']) / 1000, float(media['fade_out']) / 1000, float(media['cue_in']), float(media['cue_out']), media['row_id'], media['replay_gain'], media['dst']) - def run(self): try: self.main() except Exception, e: diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index 7bc7e362d..019a62b29 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -12,6 +12,7 @@ class TelnetLiquidsoap: self.ls_host = ls_host self.ls_port = ls_port self.logger = logger + self.current_prebuffering_stream_id = None def __connect(self): return telnetlib.Telnet(self.ls_host, self.ls_port) @@ -61,6 +62,95 @@ class TelnetLiquidsoap: finally: self.telnet_lock.release() + + def stop_web_stream_buffer(self, media_item): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 + + #msg = 'dynamic_source.read_stop %s\n' % media_item['row_id'] + msg = 'http.stop\n' + self.logger.debug(msg) + tn.write(msg) + + msg = 'dynamic_source.id -1\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + def stop_web_stream_output(self, media_item): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 + + msg = 'dynamic_source.output_stop\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + def start_web_stream(self, media_item): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + + #TODO: DO we need this? + msg = 'streams.scheduled_play_start\n' + tn.write(msg) + + msg = 'dynamic_source.output_start\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + self.current_prebuffering_stream_id = None + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + def start_web_stream_buffer(self, media_item): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + + msg = 'dynamic_source.id %s\n' % media_item['row_id'] + self.logger.debug(msg) + tn.write(msg) + + #msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1') + msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1') + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + #TODO.. + self.current_prebuffering_stream_id = media_item['row_id'] + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + class DummyTelnetLiquidsoap: def __init__(self, telnet_lock, logger): From 6dae7d3973b76e89d2ad5e22b4d001ee8189fb4c Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Fri, 15 Mar 2013 17:00:36 -0400 Subject: [PATCH 5/7] CC-1469: Crossfading support (non-equal power) -cleanup --- python_apps/pypo/pypopush.py | 4 ++++ python_apps/pypo/telnetliquidsoap.py | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 6ae3e27d1..8e85a8e01 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -168,6 +168,8 @@ class PypoPush(Thread): self.telnet_liquidsoap.queue_remove(i) self.liq_queue_tracker[i] = None + #liquidsoap.stop_play(mi) + if len(to_be_added): self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ @@ -180,6 +182,8 @@ class PypoPush(Thread): self.telnet_liquidsoap.queue_push(queue_id, i) self.liq_queue_tracker[queue_id] = i + #liquidsoap.start_play(i) + def get_current_stream_id_from_liquidsoap(self): response = "-1" try: diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index 019a62b29..bfd49ec99 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -135,7 +135,6 @@ class TelnetLiquidsoap: self.logger.debug(msg) tn.write(msg) - #msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1') msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1') self.logger.debug(msg) tn.write(msg) From 33ca2e7c9c1fba64abe2bb07b0e0dc57eaca3de8 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Fri, 22 Mar 2013 12:16:17 -0400 Subject: [PATCH 6/7] CC-1469: Crossfading support (non-equal power) -refactor --- python_apps/pypo/airtime-playout | 10 +- python_apps/pypo/pypocli.py | 9 -- python_apps/pypo/pypofetch.py | 9 +- python_apps/pypo/pypoliqqueue.py | 91 +----------- python_apps/pypo/pypoliquidsoap.py | 214 +++++++++++++++++++++++++++ python_apps/pypo/pypopush.py | 94 ++---------- python_apps/pypo/recorder.py | 2 - python_apps/pypo/telnetliquidsoap.py | 26 +++- 8 files changed, 261 insertions(+), 194 deletions(-) create mode 100644 python_apps/pypo/pypoliquidsoap.py diff --git a/python_apps/pypo/airtime-playout b/python_apps/pypo/airtime-playout index 56aa587cd..5521c91ed 100755 --- a/python_apps/pypo/airtime-playout +++ b/python_apps/pypo/airtime-playout @@ -3,14 +3,14 @@ virtualenv_bin="/usr/lib/airtime/airtime_virtualenv/bin/" . ${virtualenv_bin}activate -pypo_user="pypo" +# Absolute path to this script +SCRIPT=`readlink -f $0` +# Absolute directory this script is in +pypo_path=`dirname $SCRIPT` -# Location of pypo_cli.py Python script -pypo_path="/usr/lib/airtime/pypo/bin/" api_client_path="/usr/lib/airtime/" pypo_script="pypocli.py" cd ${pypo_path} -exec 2>&1 set +e cat /etc/default/locale | grep -i "LANG=.*UTF-\?8" @@ -26,6 +26,6 @@ export LC_ALL=`cat /etc/default/locale | grep "LANG=" | cut -d= -f2 | tr -d "\n\ export TERM=xterm -exec python ${pypo_path}${pypo_script} > /var/log/airtime/pypo/py-interpreter.log 2>&1 +exec python ${pypo_path}/${pypo_script} > /var/log/airtime/pypo/py-interpreter.log 2>&1 # EOF diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 02dfe6c46..de30d65dd 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -18,7 +18,6 @@ from Queue import Queue from threading import Lock from pypopush import PypoPush -from pypoliqqueue import PypoLiqQueue from pypofetch import PypoFetch from pypofile import PypoFile from recorder import Recorder @@ -229,14 +228,6 @@ if __name__ == '__main__': stat.daemon = True stat.start() - #pypoLiq_q = Queue() - #liq_queue_tracker = dict() - #telnet_liquidsoap = TelnetLiquidsoap() - #plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \ - #telnet_liquidsoap) - #plq.daemon = True - #plq.start() - # all join() are commented out because we want to exit entire pypo # if pypofetch terminates #pmh.join() diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 66243a15a..17d5e5898 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -8,6 +8,7 @@ import json import telnetlib import copy import subprocess +import signal from datetime import datetime from Queue import Empty @@ -24,6 +25,12 @@ logging.config.fileConfig(logging_cfg) logger = logging.getLogger() LogWriter.override_std_err(logger) +def keyboardInterruptHandler(signum, frame): + logger = logging.getLogger() + logger.info('\nKeyboard Interrupt\n') + sys.exit(0) +signal.signal(signal.SIGINT, keyboardInterruptHandler) + #need to wait for Python 2.7 for this.. #logging.captureWarnings(True) @@ -34,8 +41,6 @@ try: LS_PORT = config['ls_port'] #POLL_INTERVAL = int(config['poll_interval']) POLL_INTERVAL = 1800 - - except Exception, e: logger.error('Error loading config file: %s', e) sys.exit() diff --git a/python_apps/pypo/pypoliqqueue.py b/python_apps/pypo/pypoliqqueue.py index 54d7c5d93..439255704 100644 --- a/python_apps/pypo/pypoliqqueue.py +++ b/python_apps/pypo/pypoliqqueue.py @@ -1,14 +1,12 @@ from threading import Thread from collections import deque from datetime import datetime -from pypofetch import PypoFetch - -import eventtypes import traceback import sys import time + from Queue import Empty import signal @@ -19,14 +17,11 @@ def keyboardInterruptHandler(signum, frame): signal.signal(signal.SIGINT, keyboardInterruptHandler) class PypoLiqQueue(Thread): - def __init__(self, q, telnet_lock, logger, liq_queue_tracker, \ - telnet_liquidsoap): + def __init__(self, q, pypo_liquidsoap, logger): Thread.__init__(self) self.queue = q - self.telnet_lock = telnet_lock self.logger = logger - self.liq_queue_tracker = liq_queue_tracker - self.telnet_liquidsoap = telnet_liquidsoap + self.pypo_liquidsoap = pypo_liquidsoap def main(self): time_until_next_play = None @@ -46,7 +41,7 @@ class PypoLiqQueue(Thread): except Empty, e: #Time to push a scheduled item. media_item = schedule_deque.popleft() - self.telnet_to_liquidsoap(media_item) + self.pypo_liquidsoap.push_item(media_item) if len(schedule_deque): time_until_next_play = \ self.date_interval_to_seconds( @@ -71,78 +66,6 @@ class PypoLiqQueue(Thread): else: time_until_next_play = None - def is_media_item_finished(self, media_item): - if media_item is None: - return True - else: - return datetime.utcnow() > media_item['end'] - - def find_available_queue(self): - available_queue = None - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if mi == None or self.is_media_item_finished(mi): - #queue "i" is available. Push to this queue - available_queue = i - - if available_queue == None: - raise NoQueueAvailableException() - - return available_queue - - def telnet_to_liquidsoap(self, media_item): - """ - telnets to liquidsoap and pushes the media_item to its queue. Push the - show name of every media_item as well, just to keep Liquidsoap up-to-date - about which show is playing. - """ - - if media_item["type"] == eventtypes.FILE: - self.handle_file_type(media_item) - elif media_item["type"] == eventtypes.EVENT: - self.handle_event_type(media_item) - elif media_item["type"] == eventtypes.STREAM_BUFFER_START: - self.telnet_liquidsoap.start_web_stream_buffer(media_item) - elif media_item["type"] == eventtypes.STREAM_OUTPUT_START: - if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id: - #this is called if the stream wasn't scheduled sufficiently ahead of time - #so that the prebuffering stage could take effect. Let's do the prebuffering now. - self.telnet_liquidsoap.start_web_stream_buffer(media_item) - self.telnet_liquidsoap.start_web_stream(media_item) - elif media_item['type'] == eventtypes.STREAM_BUFFER_END: - self.telnet_liquidsoap.stop_web_stream_buffer(media_item) - elif media_item['type'] == eventtypes.STREAM_OUTPUT_END: - self.telnet_liquidsoap.stop_web_stream_output(media_item) - else: raise UnknownMediaItemType(str(media_item)) - - def handle_event_type(self, media_item): - if media_item['event_type'] == "kick_out": - PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj") - elif media_item['event_type'] == "switch_off": - PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off") - - - def handle_file_type(self, media_item): - """ - Wait maximum 5 seconds (50 iterations) for file to become ready, - otherwise give up on it. - """ - iter_num = 0 - while not media_item['file_ready'] and iter_num < 50: - time.sleep(0.1) - iter_num += 1 - - if media_item['file_ready']: - available_queue = self.find_available_queue() - - try: - self.telnet_liquidsoap.queue_push(available_queue, media_item) - self.liq_queue_tracker[available_queue] = media_item - except Exception as e: - self.logger.error(e) - raise - else: - self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst']) def date_interval_to_seconds(self, interval): """ @@ -155,14 +78,10 @@ class PypoLiqQueue(Thread): return seconds - def run(self): try: self.main() except Exception, e: self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc()) -class NoQueueAvailableException(Exception): - pass -class UnknownMediaItemType(Exception): - pass + diff --git a/python_apps/pypo/pypoliquidsoap.py b/python_apps/pypo/pypoliquidsoap.py new file mode 100644 index 000000000..97390b93f --- /dev/null +++ b/python_apps/pypo/pypoliquidsoap.py @@ -0,0 +1,214 @@ +from pypofetch import PypoFetch +from telnetliquidsoap import TelnetLiquidsoap + +from datetime import datetime +from datetime import timedelta + +import eventtypes +import time + +class PypoLiquidsoap(): + def __init__(self, logger, telnet_lock, host, port): + self.logger = logger + self.liq_queue_tracker = { + "s0": None, + "s1": None, + "s2": None, + "s3": None, + } + + self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ + logger,\ + host,\ + port) + + + def push_item(self, media_item): + if media_item["type"] == eventtypes.FILE: + self.handle_file_type(media_item) + elif media_item["type"] == eventtypes.EVENT: + self.handle_event_type(media_item) + elif media_item["type"] == eventtypes.STREAM_BUFFER_START: + self.telnet_liquidsoap.start_web_stream_buffer(media_item) + elif media_item["type"] == eventtypes.STREAM_OUTPUT_START: + if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id: + #this is called if the stream wasn't scheduled sufficiently ahead of time + #so that the prebuffering stage could take effect. Let's do the prebuffering now. + self.telnet_liquidsoap.start_web_stream_buffer(media_item) + self.telnet_liquidsoap.start_web_stream(media_item) + elif media_item['type'] == eventtypes.STREAM_BUFFER_END: + self.telnet_liquidsoap.stop_web_stream_buffer() + elif media_item['type'] == eventtypes.STREAM_OUTPUT_END: + self.telnet_liquidsoap.stop_web_stream_output() + else: raise UnknownMediaItemType(str(media_item)) + + def handle_file_type(self, media_item): + """ + Wait maximum 5 seconds (50 iterations) for file to become ready, + otherwise give up on it. + """ + iter_num = 0 + while not media_item['file_ready'] and iter_num < 50: + time.sleep(0.1) + iter_num += 1 + + if media_item['file_ready']: + available_queue = self.find_available_queue() + + try: + self.telnet_liquidsoap.queue_push(available_queue, media_item) + self.liq_queue_tracker[available_queue] = media_item + except Exception as e: + self.logger.error(e) + raise + else: + self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst']) + + def handle_event_type(self, media_item): + if media_item['event_type'] == "kick_out": + PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj") + elif media_item['event_type'] == "switch_off": + PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off") + + + def is_media_item_finished(self, media_item): + if media_item is None: + return True + else: + return datetime.utcnow() > media_item['end'] + + def find_available_queue(self): + available_queue = None + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi == None or self.is_media_item_finished(mi): + #queue "i" is available. Push to this queue + available_queue = i + + if available_queue == None: + raise NoQueueAvailableException() + + return available_queue + + def get_queues(): + return self.liq_queue_tracker + + + def verify_correct_present_media(self, scheduled_now): + #verify whether Liquidsoap is currently playing the correct files. + #if we find an item that Liquidsoap is not playing, then push it + #into one of Liquidsoap's queues. If Liquidsoap is already playing + #it do nothing. If Liquidsoap is playing a track that isn't in + #currently_playing then stop it. + + #Check for Liquidsoap media we should source.skip + #get liquidsoap items for each queue. Since each queue can only have one + #item, we should have a max of 8 items. + + #TODO: Verify start, end, replay_gain is the same + #TODO: Verify this is a file or webstream and also handle webstreams + #TODO: Do webstreams span a time or do they have an instantaneous point? + + #2013-03-21-22-56-00_0: { + #id: 1, + #type: "stream_output_start", + #row_id: 41, + #uri: "http://stream2.radioblackout.org:80/blackout.ogg", + #start: "2013-03-21-22-56-00", + #end: "2013-03-21-23-26-00", + #show_name: "Untitled Show", + #independent_event: true + #}, + + + scheduled_now_files = \ + filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now) + + scheduled_now_webstream = \ + filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \ + scheduled_now) + + schedule_ids = set(map(lambda x: i["row_id"], scheduled_now_files)) + + + liq_queue_ids = set() + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if not self.is_media_item_finished(mi): + liq_queue_ids.add(mi["row_id"]) + + to_be_removed = liq_queue_ids - schedule_ids + to_be_added = schedule_ids - liq_queue_ids + + if len(to_be_removed): + self.logger.info("Need to remove items from Liquidsoap: %s" % \ + to_be_removed) + + #remove files from Liquidsoap's queue + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi is not None and mi["row_id"] in to_be_removed: + self.stop(i) + + #stop any webstreams + if self.telnet_liquidsoap.get_current_stream_id() in to_be_removed: + self.telnet_liquidsoap.stop_web_stream_buffer() + self.telnet_liquidsoap.stop_web_stream_output() + + if len(to_be_added): + self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ + to_be_added) + + for i in scheduled_now: + if i["row_id"] in to_be_added: + self.modify_cue_point(i) + self.play(i) + + current_stream_id = self.telnet_liquidsoap.get_current_stream_id() + #if len(scheduled_now_webstream): + #if current_stream_id != scheduled_now_webstream[0]: + #self.play(scheduled_now_webstream[0]) + + def play(self, media_item): + self.telnet_liquidsoap.push_item(media_item) + + def stop(self, queue): + self.telnet_liquidsoap.queue_remove(queue) + self.liq_queue_tracker[queue] = None + + def is_file(self, media_item): + return media_item["type"] == eventtypes.FILE + + def modify_cue_point(self, link): + if not self.is_file(link): + return + + tnow = datetime.utcnow() + + link_start = link['start'] + + diff_td = tnow - link_start + diff_sec = self.date_interval_to_seconds(diff_td) + + if diff_sec > 0: + self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec) + original_cue_in_td = timedelta(seconds=float(link['cue_in'])) + link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec + + def date_interval_to_seconds(self, interval): + """ + Convert timedelta object into int representing the number of seconds. If + number of seconds is less than 0, then return 0. + """ + seconds = (interval.microseconds + \ + (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) + if seconds < 0: seconds = 0 + + return seconds + + +class UnknownMediaItemType(Exception): + pass + +class NoQueueAvailableException(Exception): + pass diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 8e85a8e01..4d6c21964 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -13,8 +13,8 @@ import traceback import os from pypofetch import PypoFetch -from telnetliquidsoap import TelnetLiquidsoap from pypoliqqueue import PypoLiqQueue +from pypoliquidsoap import PypoLiquidsoap from Queue import Empty, Queue @@ -62,28 +62,18 @@ class PypoPush(Thread): self.logger = logging.getLogger('push') self.current_prebuffering_stream_id = None self.queue_id = 0 - self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ - self.logger,\ - LS_HOST,\ - LS_PORT\ - ) - - self.liq_queue_tracker = { - "s0": None, - "s1": None, - "s2": None, - "s3": None, - } self.future_scheduled_queue = Queue() + self.pypo_liquidsoap = PypoLiquidsoap(self.logger, telnet_lock,\ + LS_HOST, LS_PORT) + self.plq = PypoLiqQueue(self.future_scheduled_queue, \ - telnet_lock, \ - self.logger, \ - self.liq_queue_tracker, \ - self.telnet_liquidsoap) + self.pypo_liquidsoap, \ + self.logger) self.plq.daemon = True self.plq.start() + def main(self): loops = 0 heartbeat_period = math.floor(30 / PUSH_INTERVAL) @@ -97,12 +87,13 @@ class PypoPush(Thread): self.logger.error(str(e)) raise else: + self.logger.debug(media_schedule) #separate media_schedule list into currently_playing and #scheduled_for_future lists currently_playing, scheduled_for_future = \ self.separate_present_future(media_schedule) - self.verify_correct_present_media(currently_playing) + self.pypo_liquidsoap.verify_correct_present_media(currently_playing) self.future_scheduled_queue.put(scheduled_for_future) if loops % heartbeat_period == 0: @@ -131,59 +122,6 @@ class PypoPush(Thread): return present, future - def verify_correct_present_media(self, scheduled_now): - #verify whether Liquidsoap is currently playing the correct files. - #if we find an item that Liquidsoap is not playing, then push it - #into one of Liquidsoap's queues. If Liquidsoap is already playing - #it do nothing. If Liquidsoap is playing a track that isn't in - #currently_playing then stop it. - - #Check for Liquidsoap media we should source.skip - #get liquidsoap items for each queue. Since each queue can only have one - #item, we should have a max of 8 items. - - #TODO: Verify start, end, replay_gain is the same - #TODO: Verify this is a file or webstream and also handle webstreams - - schedule_ids = set() - for i in scheduled_now: - schedule_ids.add(i["row_id"]) - - liq_queue_ids = set() - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if not self.plq.is_media_item_finished(mi): - liq_queue_ids.add(mi["row_id"]) - - to_be_removed = liq_queue_ids - schedule_ids - to_be_added = schedule_ids - liq_queue_ids - - if len(to_be_removed): - self.logger.info("Need to remove items from Liquidsoap: %s" % \ - to_be_removed) - - for i in self.liq_queue_tracker: - mi = self.liq_queue_tracker[i] - if mi is not None and mi["row_id"] in to_be_removed: - self.telnet_liquidsoap.queue_remove(i) - self.liq_queue_tracker[i] = None - - #liquidsoap.stop_play(mi) - - - if len(to_be_added): - self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ - to_be_added) - - for i in scheduled_now: - if i["row_id"] in to_be_added: - self.modify_cue_point(i) - queue_id = self.plq.find_available_queue() - self.telnet_liquidsoap.queue_push(queue_id, i) - self.liq_queue_tracker[queue_id] = i - - #liquidsoap.start_play(i) - def get_current_stream_id_from_liquidsoap(self): response = "-1" try: @@ -222,20 +160,6 @@ class PypoPush(Thread): #self.logger.debug("Is current item correct?: %s", str(correct)) #return correct - def modify_cue_point(self, link): - tnow = datetime.utcnow() - - link_start = link['start'] - - diff_td = tnow - link_start - diff_sec = self.date_interval_to_seconds(diff_td) - - if diff_sec > 0: - self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec) - original_cue_in_td = timedelta(seconds=float(link['cue_in'])) - link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec - - def date_interval_to_seconds(self, interval): """ Convert timedelta object into int representing the number of seconds. If diff --git a/python_apps/pypo/recorder.py b/python_apps/pypo/recorder.py index 70d99144d..b3818f32d 100644 --- a/python_apps/pypo/recorder.py +++ b/python_apps/pypo/recorder.py @@ -303,8 +303,6 @@ class Recorder(Thread): heartbeat_period = math.floor(30 / PUSH_INTERVAL) while True: - if self.loops % heartbeat_period == 0: - self.logger.info("heartbeat") if self.loops * PUSH_INTERVAL > 3600: self.loops = 0 """ diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index bfd49ec99..28e034ef8 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -63,13 +63,12 @@ class TelnetLiquidsoap: self.telnet_lock.release() - def stop_web_stream_buffer(self, media_item): + def stop_web_stream_buffer(self): try: self.telnet_lock.acquire() tn = telnetlib.Telnet(self.ls_host, self.ls_port) #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - #msg = 'dynamic_source.read_stop %s\n' % media_item['row_id'] msg = 'http.stop\n' self.logger.debug(msg) tn.write(msg) @@ -86,7 +85,7 @@ class TelnetLiquidsoap: finally: self.telnet_lock.release() - def stop_web_stream_output(self, media_item): + def stop_web_stream_output(self): try: self.telnet_lock.acquire() tn = telnetlib.Telnet(self.ls_host, self.ls_port) @@ -142,13 +141,30 @@ class TelnetLiquidsoap: tn.write("exit\n") self.logger.debug(tn.read_all()) - #TODO.. self.current_prebuffering_stream_id = media_item['row_id'] except Exception, e: self.logger.error(str(e)) finally: self.telnet_lock.release() - + + def get_current_stream_id(): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + + msg = 'dynamic_source.get_id\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + stream_id = tn.read_all().splitlines()[0] + self.logger.debug("stream_id: %s" % stream_id) + + return stream_id + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() class DummyTelnetLiquidsoap: From 8a5c82d71a78a02679da1e2e4d110722cc3fa17d Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Fri, 22 Mar 2013 15:21:28 -0400 Subject: [PATCH 7/7] CC-1469: Crossfading support (non-equal power) -fix few bugs --- python_apps/pypo/pypoliquidsoap.py | 22 ++++++++++------------ python_apps/pypo/telnetliquidsoap.py | 4 ++-- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/python_apps/pypo/pypoliquidsoap.py b/python_apps/pypo/pypoliquidsoap.py index 97390b93f..762d20091 100644 --- a/python_apps/pypo/pypoliquidsoap.py +++ b/python_apps/pypo/pypoliquidsoap.py @@ -23,7 +23,7 @@ class PypoLiquidsoap(): port) - def push_item(self, media_item): + def play(self, media_item): if media_item["type"] == eventtypes.FILE: self.handle_file_type(media_item) elif media_item["type"] == eventtypes.EVENT: @@ -128,7 +128,7 @@ class PypoLiquidsoap(): filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \ scheduled_now) - schedule_ids = set(map(lambda x: i["row_id"], scheduled_now_files)) + schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files)) liq_queue_ids = set() @@ -150,10 +150,7 @@ class PypoLiquidsoap(): if mi is not None and mi["row_id"] in to_be_removed: self.stop(i) - #stop any webstreams - if self.telnet_liquidsoap.get_current_stream_id() in to_be_removed: - self.telnet_liquidsoap.stop_web_stream_buffer() - self.telnet_liquidsoap.stop_web_stream_output() + if len(to_be_added): self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ @@ -165,12 +162,13 @@ class PypoLiquidsoap(): self.play(i) current_stream_id = self.telnet_liquidsoap.get_current_stream_id() - #if len(scheduled_now_webstream): - #if current_stream_id != scheduled_now_webstream[0]: - #self.play(scheduled_now_webstream[0]) - - def play(self, media_item): - self.telnet_liquidsoap.push_item(media_item) + if len(scheduled_now_webstream): + if current_stream_id != scheduled_now_webstream[0]: + self.play(scheduled_now_webstream[0]) + elif current_stream_id != "-1": + #something is playing and it shouldn't be. + self.telnet_liquidsoap.stop_web_stream_buffer() + self.telnet_liquidsoap.stop_web_stream_output() def stop(self, queue): self.telnet_liquidsoap.queue_remove(queue) diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index 28e034ef8..223bc475e 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -147,8 +147,8 @@ class TelnetLiquidsoap: finally: self.telnet_lock.release() - def get_current_stream_id(): - try: + def get_current_stream_id(self): + try: self.telnet_lock.acquire() tn = telnetlib.Telnet(self.ls_host, self.ls_port)