diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq index b9d003a38..9ac4a2eee 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq @@ -438,19 +438,19 @@ def destroy_dynamic_source(uri) = # current_element is of the form: ("uri", source) so # we check the first element current_uri = fst(current_element) - if current_uri == uri then + #if current_uri == uri then # In this case, we add the source to the list of # matched sources (list.append( [snd(current_element)], matching_sources), remaining_sources) - else + #else # In this case, we put the element in the list of remaining # sources - (matching_sources, - list.append([current_element], - remaining_sources)) - end + #(matching_sources, + # list.append([current_element], + # remaining_sources)) + #end end # Now we execute the function: diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 5416342ce..0cb5c8bc1 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -99,6 +99,11 @@ server.register(namespace="dynamic_source", usage="stop ", "read_stop", fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source(s) end) +server.register(namespace="dynamic_source", + description="Stop a dynamic source.", + usage="stop ", + "read_stop_all", + fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source(s) end) default = amplify(id="silence_src", 0.00001, noise()) default = rewrite_metadata([("artist","Airtime"), ("title", "offline")], default) diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 9fa508aa5..d54313beb 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -39,6 +39,12 @@ except Exception, e: logger.error('Error loading config file %s', e) sys.exit() +def is_stream(media_item): + return media_item['type'] == 'stream' + +def is_file(media_item): + return media_item['type'] == 'file' + class PypoPush(Thread): def __init__(self, q, telnet_lock): Thread.__init__(self) @@ -186,6 +192,33 @@ class PypoPush(Thread): return liquidsoap_queue_approx + def is_correct_current_item(self, media_item, liquidsoap_queue_approx): + correct = False + if media_item is None: + correct = (len(liquidsoap_queue_approx) == 0 and self.current_stream_info is None) + else: + if is_file(media_item): + if len(liquidsoap_queue_approx) == 0: + correct = False + else: + correct = liquidsoap_queue_approx[0]['start'] == media_item['start'] and \ + liquidsoap_queue_approx[0]['row_id'] == media_item['row_id'] and \ + liquidsoap_queue_approx[0]['end'] == media_item['end'] + elif is_stream(media_item): + if self.current_stream_info is None: + correct = False + else: + correct = self.current_stream_info['uri'] == media_item['uri'] + + self.logger.debug("Is current item correct?: %s", str(correct)) + return correct + + + #clear all webstreams and files from Liquidsoap + def clear_all_liquidsoap_items(self): + self.remove_from_liquidsoap_queue(0, None) + self.stop_web_stream_all() + def handle_new_schedule(self, media_schedule, liquidsoap_queue_approx, current_event_chain): """ This function's purpose is to gracefully handle situations where @@ -194,44 +227,58 @@ class PypoPush(Thread): call other functions that will connect to Liquidsoap and alter its queue. """ - media_chain = filter(lambda item: (item["type"] == "file"), current_event_chain) + file_chain = filter(lambda item: (item["type"] == "file"), current_event_chain) stream_chain = filter(lambda item: (item["type"] == "stream"), current_event_chain) self.logger.debug(self.current_stream_info) self.logger.debug(current_event_chain) + #Take care of the case where the current playing may be incorrect + if len(current_event_chain) > 0: - if self.current_stream_info: - if len(current_event_chain) > 0 and current_event_chain[0]['type'] == "stream": - if self.current_stream_info['uri'] != stream_chain[0]['uri']: - #Liquidsoap is rebroadcasting a webstream and a webstream is scheduled - #to play, but they are not the same! - self.stop_web_stream(self.current_stream_info) - self.start_web_stream(stream_chain[0]) - else: - #Liquidsoap is rebroadcasting a webstream, but there is no stream - #in the schedule. Let's stop streaming. - self.stop_web_stream(self.current_stream_info) + current_item = current_event_chain[0] + if not self.is_correct_current_item(current_item, liquidsoap_queue_approx): + self.clear_all_liquidsoap_items() + if is_stream(current_item): + if current_item['row_id'] != self.current_prebuffering_stream_id: + #this is called if the stream wasn't scheduled sufficiently ahead of time + #so that the prebuffering stage could take effect. Let's do the prebuffering now. + self.start_web_stream_buffer(current_item) + self.start_web_stream(current_item) + if is_file(current_item): + self.modify_cue_point(file_chain[0]) + self.push_to_liquidsoap(file_chain) + #we've changed the queue, so let's refetch it + liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap() - problem_at_iteration = self.find_removed_items(media_schedule, liquidsoap_queue_approx) + elif not self.is_correct_current_item(None, liquidsoap_queue_approx): + #Liquidsoap is playing something even though it shouldn't be + self.clear_all_liquidsoap_items() - if problem_at_iteration is not None: - #Items that are in Liquidsoap's queue aren't scheduled anymore. We need to connect - #and remove these items. - self.logger.debug("Change in link %s of current chain", problem_at_iteration) - self.remove_from_liquidsoap_queue(problem_at_iteration, liquidsoap_queue_approx[problem_at_iteration:]) - if problem_at_iteration is None and len(media_chain) > len(liquidsoap_queue_approx): - self.logger.debug("New schedule has longer current chain.") - problem_at_iteration = len(liquidsoap_queue_approx) + #If the current item scheduled is a file, then files come in chains, and + #therefore we need to make sure the entire chain is correct. + if len(current_event_chain) > 0 and is_file(current_event_chain[0]): + problem_at_iteration = self.find_removed_items(media_schedule, liquidsoap_queue_approx) - if problem_at_iteration is not None: - self.logger.debug("Change in chain at link %s", problem_at_iteration) + if problem_at_iteration is not None: + #Items that are in Liquidsoap's queue aren't scheduled anymore. We need to connect + #and remove these items. + self.logger.debug("Change in link %s of current chain", problem_at_iteration) + self.remove_from_liquidsoap_queue(problem_at_iteration, liquidsoap_queue_approx[problem_at_iteration:]) + + if problem_at_iteration is None and len(file_chain) > len(liquidsoap_queue_approx): + self.logger.debug("New schedule has longer current chain.") + problem_at_iteration = len(liquidsoap_queue_approx) + + if problem_at_iteration is not None: + self.logger.debug("Change in chain at link %s", problem_at_iteration) + + chain_to_push = file_chain[problem_at_iteration:] + if len(chain_to_push) > 0: + self.modify_cue_point(chain_to_push[0]) + self.push_to_liquidsoap(chain_to_push) - chain_to_push = media_chain[problem_at_iteration:] - if len(chain_to_push) > 0: - self.modify_cue_point(chain_to_push[0]) - self.push_to_liquidsoap(chain_to_push) """ Compare whats in the liquidsoap_queue to the new schedule we just @@ -457,6 +504,28 @@ class PypoPush(Thread): finally: self.telnet_lock.release() + def stop_web_stream_all(self): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + + msg = 'dynamic_source.read_stop_all xxx\n' + self.logger.debug(msg) + tn.write(msg) + + msg = 'dynamic_source.output_stop\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + self.current_stream_info = None + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + def stop_web_stream(self, media_item): try: self.telnet_lock.acquire()