diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq index 5aa77cac8..ccb37026f 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq @@ -354,28 +354,32 @@ end # Add a skip function to a source # when it does not have one # by default -def add_skip_command(s) - # A command to skip - def skip(_) - # get playing (active) queue and flush it - l = list.hd(server.execute("queue.secondary_queue")) - l = string.split(separator=" ",l) - list.iter(fun (rid) -> ignore(server.execute("queue.remove #{rid}")), l) +#def add_skip_command(s) +# # A command to skip +# def skip(_) +# # get playing (active) queue and flush it +# l = list.hd(server.execute("queue.secondary_queue")) +# l = string.split(separator=" ",l) +# list.iter(fun (rid) -> ignore(server.execute("queue.remove #{rid}")), l) +# +# l = list.hd(server.execute("queue.primary_queue")) +# l = string.split(separator=" ", l) +# if list.length(l) > 0 then +# source.skip(s) +# "Skipped" +# else +# "Not skipped" +# end +# end +# # Register the command: +# server.register(namespace="source", +# usage="skip", +# description="Skip the current song.", +# "skip",fun(s) -> begin log("source.skip") skip(s) end) +#end - l = list.hd(server.execute("queue.primary_queue")) - l = string.split(separator=" ", l) - if list.length(l) > 0 then - source.skip(s) - "Skipped" - else - "Not skipped" - end - end - # Register the command: - server.register(namespace="source", - usage="skip", - description="Skip the current song.", - "skip",fun(s) -> begin log("source.skip") skip(s) end) +def clear_queue(s) + source.skip(s) end def set_dynamic_source_id(id) = diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 75935a6dd..353b9c2c5 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -39,7 +39,14 @@ sources = ref [] source_id = ref 0 def create_source() - sources := list.append([request.equeue(id="s#{!source_id}", length=0.5)], !sources) + l = request.equeue(id="s#{!source_id}", length=0.5) + sources := list.append([l], !sources) + server.register(namespace="queues", + "s#{!source_id}_skip", + fun (s) -> begin log("queues.s#{!source_id}_skip") + clear_queue(l) + "Done" + end) source_id := !source_id + 1 end @@ -268,7 +275,7 @@ end # Attach a skip command to the source s: -add_skip_command(s) +#add_skip_command(s) server.register(namespace="streams", description="Stop Master DJ source.", diff --git a/python_apps/pypo/pypocli.py b/python_apps/pypo/pypocli.py index 9fed011fe..02dfe6c46 100644 --- a/python_apps/pypo/pypocli.py +++ b/python_apps/pypo/pypocli.py @@ -229,13 +229,13 @@ if __name__ == '__main__': stat.daemon = True stat.start() - pypoLiq_q = Queue() - liq_queue_tracker = dict() - telnet_liquidsoap = TelnetLiquidsoap() - plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \ - telnet_liquidsoap) - plq.daemon = True - plq.start() + #pypoLiq_q = Queue() + #liq_queue_tracker = dict() + #telnet_liquidsoap = TelnetLiquidsoap() + #plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logger, liq_queue_tracker, \ + #telnet_liquidsoap) + #plq.daemon = True + #plq.start() # all join() are commented out because we want to exit entire pypo # if pypofetch terminates diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py index 06552765e..3804b3458 100644 --- a/python_apps/pypo/pypofetch.py +++ b/python_apps/pypo/pypofetch.py @@ -8,7 +8,7 @@ import json import telnetlib import copy import subprocess -import datetime +from datetime import datetime from Queue import Empty from threading import Thread diff --git a/python_apps/pypo/pypoliqqueue.py b/python_apps/pypo/pypoliqqueue.py index 027a2cd5f..87458d4df 100644 --- a/python_apps/pypo/pypoliqqueue.py +++ b/python_apps/pypo/pypoliqqueue.py @@ -32,9 +32,13 @@ class PypoLiqQueue(Thread): while True: try: if time_until_next_play is None: + self.logger.info("waiting indefinitely for schedule") media_schedule = self.queue.get(block=True) else: - media_schedule = self.queue.get(block=True, timeout=time_until_next_play) + self.logger.info("waiting %ss until next scheduled item" % \ + time_until_next_play) + media_schedule = self.queue.get(block=True, \ + timeout=time_until_next_play) except Empty, e: #Time to push a scheduled item. media_item = schedule_deque.popleft() @@ -55,11 +59,15 @@ class PypoLiqQueue(Thread): for i in keys: schedule_deque.append(media_schedule[i]) - time_until_next_play = self.date_interval_to_seconds(\ - keys[0] - datetime.utcnow()) + if len(keys): + time_until_next_play = self.date_interval_to_seconds(\ + keys[0] - datetime.utcnow()) def is_media_item_finished(self, media_item): - return datetime.utcnow() > media_item['end'] + if media_item is None: + return True + else: + return datetime.utcnow() > media_item['end'] def find_available_queue(self): available_queue = None diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index b6f83623e..0b4371042 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -17,8 +17,7 @@ from telnetliquidsoap import TelnetLiquidsoap from pypoliqqueue import PypoLiqQueue -import Queue -from Queue import Empty +from Queue import Empty, Queue from threading import Thread @@ -81,10 +80,11 @@ class PypoPush(Thread): self.future_scheduled_queue = Queue() self.plq = PypoLiqQueue(self.future_scheduled_queue, \ telnet_lock, \ - liq_queue_tracker, \ + self.logger, \ + self.liq_queue_tracker, \ self.telnet_liquidsoap) - plq.daemon = True - plq.start() + self.plq.daemon = True + self.plq.start() def main(self): loops = 0 @@ -198,8 +198,7 @@ class PypoPush(Thread): for mkey in sorted_keys: media_item = media_schedule[mkey] - media_item_start = media_item['start'] - diff_td = tnow - media_item_start + diff_td = tnow - media_item['start'] diff_sec = self.date_interval_to_seconds(diff_td) if diff_sec >= 0: @@ -230,8 +229,8 @@ class PypoPush(Thread): if not self.plq.is_media_item_finished(mi): liq_queue_ids.add(mi["row_id"]) - to_be_added = schedule_ids - liq_queue_ids to_be_removed = liq_queue_ids - schedule_ids + to_be_added = schedule_ids - liq_queue_ids if len(to_be_removed): self.logger.info("Need to remove items from Liquidsoap: %s" % \ @@ -239,8 +238,9 @@ class PypoPush(Thread): for i in self.liq_queue_tracker: mi = self.liq_queue_tracker[i] - if mi["row_id"] in to_be_removed: + if mi is not None and mi["row_id"] in to_be_removed: self.telnet_liquidsoap.queue_remove(i) + self.liq_queue_tracker[i] = None if len(to_be_added): @@ -249,8 +249,10 @@ class PypoPush(Thread): for i in scheduled_now: if i["row_id"] in to_be_added: + self.modify_cue_point(i) queue_id = self.plq.find_available_queue() - self.telnet_liquidsoap.queue_push(queue_id) + self.telnet_liquidsoap.queue_push(queue_id, i) + self.liq_queue_tracker[queue_id] = i def get_current_stream_id_from_liquidsoap(self): response = "-1" @@ -554,7 +556,6 @@ class PypoPush(Thread): """ seconds = (interval.microseconds + \ (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) - if seconds < 0: seconds = 0 return seconds diff --git a/python_apps/pypo/telnetliquidsoap.py b/python_apps/pypo/telnetliquidsoap.py index 2896c982a..7bc7e362d 100644 --- a/python_apps/pypo/telnetliquidsoap.py +++ b/python_apps/pypo/telnetliquidsoap.py @@ -24,9 +24,10 @@ class TelnetLiquidsoap: self.telnet_lock.acquire() tn = self.__connect() - - #TODO: Need a source.skip for each queue - + msg = 'queues.%s_skip\n' % queue_id + self.logger.debug(msg) + tn.write(msg) + tn.write("exit\n") self.logger.debug(tn.read_all()) except Exception: @@ -85,5 +86,18 @@ class DummyTelnetLiquidsoap: finally: self.telnet_lock.release() + def queue_remove(self, queue_id): + try: + self.telnet_lock.acquire() + + self.logger.info("Purging queue %s" % queue_id) + from datetime import datetime + print "Time now: %s" % datetime.utcnow() + + except Exception: + raise + finally: + self.telnet_lock.release() + class QueueNotEmptyException(Exception): pass