From 1e57c12ce7e1fabb5c0893f4835c770c9efb5e29 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Wed, 15 Aug 2012 16:43:01 -0400 Subject: [PATCH 1/6] CC-1665: Scheduled stream rebroadcasting and recording -pre buffer streams part 1 --- airtime_mvc/application/models/Schedule.php | 18 ++++++- .../pypo/liquidsoap_scripts/ls_lib.liq | 6 +-- .../pypo/liquidsoap_scripts/ls_script.liq | 50 +++++++++++++++---- python_apps/pypo/pypopush.py | 32 +++++++++--- 4 files changed, 85 insertions(+), 21 deletions(-) diff --git a/airtime_mvc/application/models/Schedule.php b/airtime_mvc/application/models/Schedule.php index 863417751..07de1f4b8 100644 --- a/airtime_mvc/application/models/Schedule.php +++ b/airtime_mvc/application/models/Schedule.php @@ -698,6 +698,22 @@ SQL; ); if ($type == "stream") { + //create an event to start stream buffering 5 seconds ahead of the streams actual time. + $buffer_start = new DateTime($item["start"], new DateTimeZone('UTC')); + $buffer_start->sub(new DateInterval("PT5S")); + + $stream_buffer_start = Application_Model_Schedule::AirtimeTimeToPypoTime($buffer_start->format("Y-m-d H:i:s")); + + //TODO: Make sure no other media is being overwritten! + $data["media"][$stream_buffer_start] = array( + 'start' => $stream_buffer_start, + 'end' => $stream_buffer_start, + 'uri' => $uri, + 'type' => 'stream_buffer_start', + 'independent_event' => true + ); + + //since a stream never ends we have to insert an additional "kick stream" event. The "start" //time of this event is the "end" time of the stream minus 1 second. $dt = new DateTime($item["end"], new DateTimeZone('UTC')); @@ -920,7 +936,7 @@ SQL; $hValue = trim(substr($data["add_show_duration"], 0, $hPos)); } if ($mPos !== false) { - $hPos = $hPos === FALSE ? 0 : $hPos+1; + $hPos = $hPos === false ? 0 : $hPos+1; $mValue = trim(substr($data["add_show_duration"], $hPos, -1 )); } diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq index ed5085e12..feb303e68 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq @@ -1,5 +1,5 @@ def notify(m) - current_media_id := string_of(m['schedule_table_id']) + #current_media_id := string_of(m['schedule_table_id']) command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --data='#{!pypo_data}' --media-id=#{m['schedule_table_id']} &" log(command) system(command) @@ -7,7 +7,7 @@ end def notify_stream(m) json_str = string.replace(pattern="\n",(fun (s) -> ""), json_of(m)) - command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --webstream='#{json_str}' --media-id=#{!current_media_id} &" + command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --webstream='#{json_str}' --media-id=#{!current_dyn_id} &" log(command) system(command) end @@ -253,7 +253,7 @@ end # Function to create a playlist source and output it. def create_dynamic_source(uri) = # The playlist source - s = input.http(uri) + s = input.http(buffer=4., max=12., uri) # The output active_dyn_out = dyn_out(s) diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 540bd29fa..7d3c1d56e 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -8,6 +8,7 @@ set("server.telnet.port", 1234) #Dynamic source list dyn_sources = ref [] +webstream_enabled = ref false time = ref string_of(gettimeofday()) @@ -27,7 +28,7 @@ stream_metadata_type = ref 0 default_dj_fade = ref 0. station_name = ref '' show_name = ref '' -current_media_id = ref '' +#current_media_id = ref '' s1_connected = ref '' s2_connected = ref '' @@ -47,16 +48,43 @@ queue = map_metadata(append_title, queue) # the crossfade function controls fade in/out queue = crossfade(queue) -queue = fallback([web_stream, queue]) +queue = switch(track_sensitive=false, [({!webstream_enabled},web_stream), ({true}, queue)]) + ignore(output.dummy(queue, fallible=true)) -server.register(namespace="vars", "pypo_data", fun (s) -> begin pypo_data := s "Done" end) -server.register(namespace="vars", "stream_metadata_type", fun (s) -> begin stream_metadata_type := int_of_string(s) s end) -server.register(namespace="vars", "show_name", fun (s) -> begin show_name := s s end) -server.register(namespace="vars", "station_name", fun (s) -> begin station_name := s s end) -server.register(namespace="vars", "bootup_time", fun (s) -> begin time := s s end) -server.register(namespace="streams", "connection_status", fun (s) -> begin "1:#{!s1_connected},2:#{!s2_connected},3:#{!s3_connected}" end) -server.register(namespace="vars", "default_dj_fade", fun (s) -> begin default_dj_fade := float_of_string(s) s end) +server.register(namespace="vars", + "pypo_data", + fun (s) -> begin pypo_data := s "Done" end) +server.register(namespace="vars", + "stream_metadata_type", + fun (s) -> begin stream_metadata_type := int_of_string(s) s end) +server.register(namespace="vars", + "show_name", + fun (s) -> begin show_name := s s end) +server.register(namespace="vars", + "station_name", + fun (s) -> begin station_name := s s end) +server.register(namespace="vars", + "bootup_time", + fun (s) -> begin time := s s end) +server.register(namespace="streams", + "connection_status", + fun (s) -> begin "1:#{!s1_connected},2:#{!s2_connected},3:#{!s3_connected}" end) +server.register(namespace="vars", + "default_dj_fade", + fun (s) -> begin default_dj_fade := float_of_string(s) s end) + +server.register(namespace="dynamic_source", + description="Enable webstream output", + usage='start', + "output_start", + fun (s) -> begin webstream_enabled := true "enabled" end) +server.register(namespace="dynamic_source", + description="Enable webstream output", + usage='stop', + "output_stop", + fun (s) -> begin webstream_enabled := false "disabled" end) + server.register(namespace="dynamic_source", description="Set the cc_schedule row id", usage="id ", @@ -65,12 +93,12 @@ server.register(namespace="dynamic_source", server.register(namespace="dynamic_source", description="Start a new dynamic source.", usage="start ", - "start", + "read_start", create_dynamic_source) server.register(namespace="dynamic_source", description="Stop a dynamic source.", usage="stop ", - "stop", + "read_stop", destroy_dynamic_source) default = amplify(id="silence_src", 0.00001, noise()) diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 09dd7de37..09c1c89a1 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -386,17 +386,33 @@ class PypoPush(Thread): PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj") elif media_item['event_type'] == "switch_off": PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off") + elif media_item['type'] == 'stream_buffer_start': + self.start_web_stream_buffer(media_item) elif media_item['type'] == "stream": - """ - Source is a stream that we need to being downloading to a file. Then we may simply - point Liquidsoap to play this file. - """ self.start_web_stream(media_item) elif media_item['type'] == "stream_end": self.stop_web_stream(media_item) except Exception, e: self.logger.error('Pypo Push Exception: %s', e) + def start_web_stream_buffer(self, media_item): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + + msg = 'dynamic_source.read_start\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + + def start_web_stream(self, media_item): try: self.telnet_lock.acquire() @@ -410,7 +426,7 @@ class PypoPush(Thread): msg = 'streams.scheduled_play_start\n' tn.write(msg) - msg = 'dynamic_source.start %s\n' % media_item['uri'].encode('latin-1') + msg = 'dynamic_source.output_start\n' self.logger.debug(msg) tn.write(msg) @@ -429,7 +445,11 @@ class PypoPush(Thread): tn = telnetlib.Telnet(LS_HOST, LS_PORT) #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - msg = 'dynamic_source.stop %s\n' % media_item['uri'].encode('latin-1') + msg = 'dynamic_source.read_stop %s\n' % media_item['uri'].encode('latin-1') + self.logger.debug(msg) + tn.write(msg) + + msg = 'dynamic_source.output_stop\n' self.logger.debug(msg) tn.write(msg) From 4ee0c368ac14e1566da31388438aa3f09bb06fe8 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Wed, 15 Aug 2012 17:03:14 -0400 Subject: [PATCH 2/6] CC-1665: Scheduled stream rebroadcasting and recording -pre buffer streams part 2 --- airtime_mvc/application/models/Schedule.php | 1 + python_apps/pypo/pypopush.py | 11 +++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airtime_mvc/application/models/Schedule.php b/airtime_mvc/application/models/Schedule.php index 07de1f4b8..f7f518f4c 100644 --- a/airtime_mvc/application/models/Schedule.php +++ b/airtime_mvc/application/models/Schedule.php @@ -709,6 +709,7 @@ SQL; 'start' => $stream_buffer_start, 'end' => $stream_buffer_start, 'uri' => $uri, + 'row_id' => $item["id"], 'type' => 'stream_buffer_start', 'independent_event' => true ); diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 09c1c89a1..b04c60d82 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -400,7 +400,11 @@ class PypoPush(Thread): self.telnet_lock.acquire() tn = telnetlib.Telnet(LS_HOST, LS_PORT) - msg = 'dynamic_source.read_start\n' + msg = 'dynamic_source.id %s\n' % media_item['row_id'] + tn.write(msg) + + #example: dynamic_source.read_start http://87.230.101.24:80/top100station.mp3 + msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1') self.logger.debug(msg) tn.write(msg) @@ -412,15 +416,10 @@ class PypoPush(Thread): self.telnet_lock.release() - def start_web_stream(self, media_item): try: self.telnet_lock.acquire() tn = telnetlib.Telnet(LS_HOST, LS_PORT) - #dynamic_source.start http://87.230.101.24:80/top100station.mp3 - - msg = 'dynamic_source.id %s\n' % media_item['row_id'] - tn.write(msg) #TODO: DO we need this? msg = 'streams.scheduled_play_start\n' From 5c1c836f11d1b3420d3fb2e9adac6cc7550eee6a Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Wed, 15 Aug 2012 18:30:12 -0400 Subject: [PATCH 3/6] CC-1665: Scheduled stream rebroadcasting and recording -pre buffer streams part 3 -finished --- python_apps/pypo/liquidsoap_scripts/ls_lib.liq | 2 +- python_apps/pypo/liquidsoap_scripts/ls_script.liq | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq index feb303e68..172ba77b1 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq @@ -253,7 +253,7 @@ end # Function to create a playlist source and output it. def create_dynamic_source(uri) = # The playlist source - s = input.http(buffer=4., max=12., uri) + s = input.http(buffer=2., max=12., uri) # The output active_dyn_out = dyn_out(s) diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 7d3c1d56e..ddc7ae1a1 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -11,7 +11,6 @@ dyn_sources = ref [] webstream_enabled = ref false time = ref string_of(gettimeofday()) - queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5)) queue = cue_cut(queue) queue = amplify(1., override="replay_gain", queue) @@ -40,7 +39,7 @@ just_switched = ref false %include "ls_lib.liq" -web_stream = input.harbor("test-harbor",port=8999,password="hackme") +web_stream = mksafe(input.harbor("test-harbor", port=8999, password="hackme")) web_stream = on_metadata(notify_stream, web_stream) queue = on_metadata(notify, queue) @@ -48,7 +47,7 @@ queue = map_metadata(append_title, queue) # the crossfade function controls fade in/out queue = crossfade(queue) -queue = switch(track_sensitive=false, [({!webstream_enabled},web_stream), ({true}, queue)]) +queue = switch(id="stream_queue_switch", track_sensitive=false, [({!webstream_enabled},web_stream), ({true}, queue)]) ignore(output.dummy(queue, fallible=true)) From a87a790ce6e167f11f900bd75bf09b8d5016d5cd Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Thu, 16 Aug 2012 10:26:57 -0400 Subject: [PATCH 4/6] cc-4235: removed some magic to find bug --- python_apps/media-monitor/media-monitor.cfg | 2 +- .../media/monitor/eventcontractor.py | 1 + .../media-monitor2/media/monitor/listeners.py | 3 ++- .../media-monitor2/media/monitor/organizer.py | 20 +++++++++---------- .../media/monitor/watchersyncer.py | 5 +++-- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/python_apps/media-monitor/media-monitor.cfg b/python_apps/media-monitor/media-monitor.cfg index 546230b3d..b1167f56b 100644 --- a/python_apps/media-monitor/media-monitor.cfg +++ b/python_apps/media-monitor/media-monitor.cfg @@ -25,7 +25,7 @@ check_airtime_events = 30 #how long to queue metadata input from airtime. touch_interval = 5 chunking_number = 450 request_max_wait = 3.0 -rmq_event_wait = 0.5 +rmq_event_wait = 0.1 logpath = '/var/log/airtime/media-monitor/media-monitor.log' index_path = '/var/tmp/airtime/media-monitor/last_index' diff --git a/python_apps/media-monitor2/media/monitor/eventcontractor.py b/python_apps/media-monitor2/media/monitor/eventcontractor.py index cbc1f5c90..75fb384df 100644 --- a/python_apps/media-monitor2/media/monitor/eventcontractor.py +++ b/python_apps/media-monitor2/media/monitor/eventcontractor.py @@ -47,5 +47,6 @@ class EventContractor(Loggable): return True # We actually added something, hence we return true. def __unregister(self, evt): + self.logger.info("Unregistering. Left: '%d'" % len(self.store.keys())) try: del self.store[evt.path] except Exception as e: self.unexpected_exception(e) diff --git a/python_apps/media-monitor2/media/monitor/listeners.py b/python_apps/media-monitor2/media/monitor/listeners.py index ecc2b793a..6b959f250 100644 --- a/python_apps/media-monitor2/media/monitor/listeners.py +++ b/python_apps/media-monitor2/media/monitor/listeners.py @@ -66,9 +66,11 @@ class BaseListener(object): class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable): def process_IN_CLOSE_WRITE(self, event): + self.logger.info("===> IN_CLOSE_WRITE : '%s'" % event.pathname) self.process_to_organize(event) # got cookie def process_IN_MOVED_TO(self, event): + self.logger.info("===> IN_MOVED_TO : '%s'" % event.pathname) self.process_to_organize(event) def flush_events(self, path): @@ -85,7 +87,6 @@ class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable): flushed += 1 self.logger.info("Flushed organized directory with %d files" % flushed) - @mediate_ignored @IncludeOnly(mmp.supported_extensions) def process_to_organize(self, event): dispatcher.send(signal=self.signal, sender=self, diff --git a/python_apps/media-monitor2/media/monitor/organizer.py b/python_apps/media-monitor2/media/monitor/organizer.py index e5d0247de..7bab09fd5 100644 --- a/python_apps/media-monitor2/media/monitor/organizer.py +++ b/python_apps/media-monitor2/media/monitor/organizer.py @@ -15,16 +15,16 @@ class Organizer(ReportHandler,Loggable): StoreWatchListener) """ - _instance = None - def __new__(cls, channel, target_path, recorded_path): - if cls._instance: - cls._instance.channel = channel - cls._instance.target_path = target_path - cls._instance.recorded_path = recorded_path - else: - cls._instance = super(Organizer, cls).__new__( cls, channel, - target_path, recorded_path) - return cls._instance + #_instance = None + #def __new__(cls, channel, target_path, recorded_path): + #if cls._instance: + #cls._instance.channel = channel + #cls._instance.target_path = target_path + #cls._instance.recorded_path = recorded_path + #else: + #cls._instance = super(Organizer, cls).__new__( cls, channel, + #target_path, recorded_path) + #return cls._instance def __init__(self, channel, target_path, recorded_path): self.channel = channel diff --git a/python_apps/media-monitor2/media/monitor/watchersyncer.py b/python_apps/media-monitor2/media/monitor/watchersyncer.py index c400ad6d5..c1c7a610c 100644 --- a/python_apps/media-monitor2/media/monitor/watchersyncer.py +++ b/python_apps/media-monitor2/media/monitor/watchersyncer.py @@ -124,8 +124,9 @@ class WatchSyncer(ReportHandler,Loggable): try: # If there is a strange bug anywhere in the code the next line # should be a suspect - if self.contractor.register( event ): - self.push_queue( event ) + #if self.contractor.register( event ): + #self.push_queue( event ) + self.push_queue( event ) except BadSongFile as e: self.fatal_exception("Received bas song file '%s'" % e.path, e) except Exception as e: From ecbde40979fafbaf0aca607b509b8af37dd307a6 Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Thu, 16 Aug 2012 10:42:28 -0400 Subject: [PATCH 5/6] fixed failing unit test --- python_apps/media-monitor2/tests/test_notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_apps/media-monitor2/tests/test_notifier.py b/python_apps/media-monitor2/tests/test_notifier.py index a42a21e12..fbab67148 100644 --- a/python_apps/media-monitor2/tests/test_notifier.py +++ b/python_apps/media-monitor2/tests/test_notifier.py @@ -18,7 +18,7 @@ class TestReceiver(unittest.TestCase): def test_supported(self): # Every supported message should fire something - for event_type in self.amr.dispatch_tables.keys(): + for event_type in self.amr.dispatch_table.keys(): msg = { 'event_type' : event_type, 'extra_param' : 123 } filtered = filter_ev(msg) # There should be a better way to test the following without From d50c4ded6bee57fce28325e363a22b880b2512d5 Mon Sep 17 00:00:00 2001 From: Rudi Grinberg Date: Thu, 16 Aug 2012 11:23:01 -0400 Subject: [PATCH 6/6] cc-4235: added debugging statements --- python_apps/media-monitor2/media/monitor/listeners.py | 8 ++++---- python_apps/media-monitor2/media/monitor/manager.py | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python_apps/media-monitor2/media/monitor/listeners.py b/python_apps/media-monitor2/media/monitor/listeners.py index 6b959f250..b41eff9be 100644 --- a/python_apps/media-monitor2/media/monitor/listeners.py +++ b/python_apps/media-monitor2/media/monitor/listeners.py @@ -61,18 +61,18 @@ def mediate_ignored(fn): return wrapped class BaseListener(object): - def my_init(self, signal): - self.signal = signal + def my_init(self, signal): self.signal = signal class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable): def process_IN_CLOSE_WRITE(self, event): - self.logger.info("===> IN_CLOSE_WRITE : '%s'" % event.pathname) self.process_to_organize(event) # got cookie def process_IN_MOVED_TO(self, event): - self.logger.info("===> IN_MOVED_TO : '%s'" % event.pathname) self.process_to_organize(event) + def process_default(self, event): + self.logger.info("===> Not handling: '%s'" % str(event)) + def flush_events(self, path): """ organize the whole directory at path. (pretty much by doing what diff --git a/python_apps/media-monitor2/media/monitor/manager.py b/python_apps/media-monitor2/media/monitor/manager.py index 1988568bb..11dc6fa9a 100644 --- a/python_apps/media-monitor2/media/monitor/manager.py +++ b/python_apps/media-monitor2/media/monitor/manager.py @@ -67,6 +67,8 @@ class Manager(Loggable): del(self.__wd_path[path]) def __add_watch(self,path,listener): + self.logger.info("Adding listener '%s' to '%s'" % + ( listener.__class__.__name__, path) ) wd = self.wm.add_watch(path, pyinotify.ALL_EVENTS, rec=True, auto_add=True, proc_fun=listener) self.__wd_path[path] = wd.values()[0]