From 1e57c12ce7e1fabb5c0893f4835c770c9efb5e29 Mon Sep 17 00:00:00 2001 From: Martin Konecny Date: Wed, 15 Aug 2012 16:43:01 -0400 Subject: [PATCH] CC-1665: Scheduled stream rebroadcasting and recording -pre buffer streams part 1 --- airtime_mvc/application/models/Schedule.php | 18 ++++++- .../pypo/liquidsoap_scripts/ls_lib.liq | 6 +-- .../pypo/liquidsoap_scripts/ls_script.liq | 50 +++++++++++++++---- python_apps/pypo/pypopush.py | 32 +++++++++--- 4 files changed, 85 insertions(+), 21 deletions(-) diff --git a/airtime_mvc/application/models/Schedule.php b/airtime_mvc/application/models/Schedule.php index 863417751..07de1f4b8 100644 --- a/airtime_mvc/application/models/Schedule.php +++ b/airtime_mvc/application/models/Schedule.php @@ -698,6 +698,22 @@ SQL; ); if ($type == "stream") { + //create an event to start stream buffering 5 seconds ahead of the streams actual time. + $buffer_start = new DateTime($item["start"], new DateTimeZone('UTC')); + $buffer_start->sub(new DateInterval("PT5S")); + + $stream_buffer_start = Application_Model_Schedule::AirtimeTimeToPypoTime($buffer_start->format("Y-m-d H:i:s")); + + //TODO: Make sure no other media is being overwritten! + $data["media"][$stream_buffer_start] = array( + 'start' => $stream_buffer_start, + 'end' => $stream_buffer_start, + 'uri' => $uri, + 'type' => 'stream_buffer_start', + 'independent_event' => true + ); + + //since a stream never ends we have to insert an additional "kick stream" event. The "start" //time of this event is the "end" time of the stream minus 1 second. $dt = new DateTime($item["end"], new DateTimeZone('UTC')); @@ -920,7 +936,7 @@ SQL; $hValue = trim(substr($data["add_show_duration"], 0, $hPos)); } if ($mPos !== false) { - $hPos = $hPos === FALSE ? 0 : $hPos+1; + $hPos = $hPos === false ? 0 : $hPos+1; $mValue = trim(substr($data["add_show_duration"], $hPos, -1 )); } diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq index ed5085e12..feb303e68 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq @@ -1,5 +1,5 @@ def notify(m) - current_media_id := string_of(m['schedule_table_id']) + #current_media_id := string_of(m['schedule_table_id']) command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --data='#{!pypo_data}' --media-id=#{m['schedule_table_id']} &" log(command) system(command) @@ -7,7 +7,7 @@ end def notify_stream(m) json_str = string.replace(pattern="\n",(fun (s) -> ""), json_of(m)) - command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --webstream='#{json_str}' --media-id=#{!current_media_id} &" + command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --webstream='#{json_str}' --media-id=#{!current_dyn_id} &" log(command) system(command) end @@ -253,7 +253,7 @@ end # Function to create a playlist source and output it. def create_dynamic_source(uri) = # The playlist source - s = input.http(uri) + s = input.http(buffer=4., max=12., uri) # The output active_dyn_out = dyn_out(s) diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq index 540bd29fa..7d3c1d56e 100644 --- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq +++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq @@ -8,6 +8,7 @@ set("server.telnet.port", 1234) #Dynamic source list dyn_sources = ref [] +webstream_enabled = ref false time = ref string_of(gettimeofday()) @@ -27,7 +28,7 @@ stream_metadata_type = ref 0 default_dj_fade = ref 0. station_name = ref '' show_name = ref '' -current_media_id = ref '' +#current_media_id = ref '' s1_connected = ref '' s2_connected = ref '' @@ -47,16 +48,43 @@ queue = map_metadata(append_title, queue) # the crossfade function controls fade in/out queue = crossfade(queue) -queue = fallback([web_stream, queue]) +queue = switch(track_sensitive=false, [({!webstream_enabled},web_stream), ({true}, queue)]) + ignore(output.dummy(queue, fallible=true)) -server.register(namespace="vars", "pypo_data", fun (s) -> begin pypo_data := s "Done" end) -server.register(namespace="vars", "stream_metadata_type", fun (s) -> begin stream_metadata_type := int_of_string(s) s end) -server.register(namespace="vars", "show_name", fun (s) -> begin show_name := s s end) -server.register(namespace="vars", "station_name", fun (s) -> begin station_name := s s end) -server.register(namespace="vars", "bootup_time", fun (s) -> begin time := s s end) -server.register(namespace="streams", "connection_status", fun (s) -> begin "1:#{!s1_connected},2:#{!s2_connected},3:#{!s3_connected}" end) -server.register(namespace="vars", "default_dj_fade", fun (s) -> begin default_dj_fade := float_of_string(s) s end) +server.register(namespace="vars", + "pypo_data", + fun (s) -> begin pypo_data := s "Done" end) +server.register(namespace="vars", + "stream_metadata_type", + fun (s) -> begin stream_metadata_type := int_of_string(s) s end) +server.register(namespace="vars", + "show_name", + fun (s) -> begin show_name := s s end) +server.register(namespace="vars", + "station_name", + fun (s) -> begin station_name := s s end) +server.register(namespace="vars", + "bootup_time", + fun (s) -> begin time := s s end) +server.register(namespace="streams", + "connection_status", + fun (s) -> begin "1:#{!s1_connected},2:#{!s2_connected},3:#{!s3_connected}" end) +server.register(namespace="vars", + "default_dj_fade", + fun (s) -> begin default_dj_fade := float_of_string(s) s end) + +server.register(namespace="dynamic_source", + description="Enable webstream output", + usage='start', + "output_start", + fun (s) -> begin webstream_enabled := true "enabled" end) +server.register(namespace="dynamic_source", + description="Enable webstream output", + usage='stop', + "output_stop", + fun (s) -> begin webstream_enabled := false "disabled" end) + server.register(namespace="dynamic_source", description="Set the cc_schedule row id", usage="id ", @@ -65,12 +93,12 @@ server.register(namespace="dynamic_source", server.register(namespace="dynamic_source", description="Start a new dynamic source.", usage="start ", - "start", + "read_start", create_dynamic_source) server.register(namespace="dynamic_source", description="Stop a dynamic source.", usage="stop ", - "stop", + "read_stop", destroy_dynamic_source) default = amplify(id="silence_src", 0.00001, noise()) diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py index 09dd7de37..09c1c89a1 100644 --- a/python_apps/pypo/pypopush.py +++ b/python_apps/pypo/pypopush.py @@ -386,17 +386,33 @@ class PypoPush(Thread): PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj") elif media_item['event_type'] == "switch_off": PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off") + elif media_item['type'] == 'stream_buffer_start': + self.start_web_stream_buffer(media_item) elif media_item['type'] == "stream": - """ - Source is a stream that we need to being downloading to a file. Then we may simply - point Liquidsoap to play this file. - """ self.start_web_stream(media_item) elif media_item['type'] == "stream_end": self.stop_web_stream(media_item) except Exception, e: self.logger.error('Pypo Push Exception: %s', e) + def start_web_stream_buffer(self, media_item): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(LS_HOST, LS_PORT) + + msg = 'dynamic_source.read_start\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + + def start_web_stream(self, media_item): try: self.telnet_lock.acquire() @@ -410,7 +426,7 @@ class PypoPush(Thread): msg = 'streams.scheduled_play_start\n' tn.write(msg) - msg = 'dynamic_source.start %s\n' % media_item['uri'].encode('latin-1') + msg = 'dynamic_source.output_start\n' self.logger.debug(msg) tn.write(msg) @@ -429,7 +445,11 @@ class PypoPush(Thread): tn = telnetlib.Telnet(LS_HOST, LS_PORT) #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 - msg = 'dynamic_source.stop %s\n' % media_item['uri'].encode('latin-1') + msg = 'dynamic_source.read_stop %s\n' % media_item['uri'].encode('latin-1') + self.logger.debug(msg) + tn.write(msg) + + msg = 'dynamic_source.output_stop\n' self.logger.debug(msg) tn.write(msg)