From 8c73731c415109f6844ec52004269002e5b5376d Mon Sep 17 00:00:00 2001
From: Martin Konecny <martin.konecny@gmail.com>
Date: Thu, 15 Nov 2012 16:43:22 -0500
Subject: [PATCH] CC-4633: Use David's new method of Liquidsoap webstream
 switching

-fixed
---
 .../pypo/liquidsoap_scripts/ls_lib.liq        | 238 ++++++++++--------
 .../pypo/liquidsoap_scripts/ls_script.liq     |  59 +++--
 python_apps/pypo/pypopush.py                  |  10 +-
 3 files changed, 171 insertions(+), 136 deletions(-)

diff --git a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq
index 0305a72e4..bebfb076c 100644
--- a/python_apps/pypo/liquidsoap_scripts/ls_lib.liq
+++ b/python_apps/pypo/liquidsoap_scripts/ls_lib.liq
@@ -376,13 +376,6 @@ def add_skip_command(s)
                  "skip",fun(s) -> begin log("source.skip") skip(s) end)
 end
 
-dyn_out = output.icecast(%wav,
-                     host="localhost",
-                     port=8999,
-                     password=stream_harbor_pass,
-                     mount="test-harbor",
-                     fallible=true)
-
 def set_dynamic_source_id(id) =
     current_dyn_id := id 
     string_of(!current_dyn_id)
@@ -392,123 +385,158 @@ def get_dynamic_source_id() =
     string_of(!current_dyn_id)
 end
 
+#cc-4633
 
-# Function to create a playlist source and output it.
-def create_dynamic_source(uri) =
-    # The playlist source 
-    s = audio_to_stereo(input.http(buffer=2., max=12., uri))
 
-    # The output
-    active_dyn_out = dyn_out(s)
+# NOTE
+# A few values are hardcoded and may be dependent:
+#  - the delay in gracetime is linked with the buffer duration of input.http
+#    (delay should be a bit less than buffer)
+#  - crossing duration should be less than buffer length
+#    (at best, a higher duration will be ineffective)
 
-    # We register both source and output 
-    # in the list of sources
-    dyn_sources := 
-      list.append([(!current_dyn_id, s),(!current_dyn_id, active_dyn_out)], !dyn_sources)
+# HTTP input with "restart" command that waits for "stop" to be effected
+# before "start" command is issued. Optionally it takes a new URL to play,
+# which makes it a convenient replacement for "url".
+# In the future, this may become a core feature of the HTTP input.
+# TODO If we stop and restart quickly several times in a row,
+#   the data bursts accumulate and create buffer overflow.
+#   Flushing the buffer on restart could be a good idea, but
+#   it would also create an interruptions while the buffer is
+#   refilling... on the other hand, this would avoid having to
+#   fade using both cross() and switch().
+def input.http_restart(~id,~initial_url="http://dummy/url")
+
+  source = input.http(buffer=5.,max=15.,id=id,autostart=false,initial_url)
+
+  def stopped()
+    "stopped" == list.hd(server.execute("#{id}.status"))
+  end
+
+  server.register(namespace=id,
+                  "restart",
+                  usage="restart [url]",
+                  fun (url) -> begin
+                    if url != "" then
+                      log(string_of(server.execute("#{id}.url #{url}")))
+                    end
+                    log(string_of(server.execute("#{id}.stop")))
+                    add_timeout(0.5,
+                      { if stopped() then
+                          log(string_of(server.execute("#{id}.start"))) ;
+                          (-1.)
+                        else 0.5 end})
+                    "OK"
+                  end)
+
+  # Dummy output should be useless if HTTP stream is meant
+  # to be listened to immediately. Otherwise, apply it.
+  #
+  # output.dummy(fallible=true,source)
+
+  source
 
-    notify([("schedule_table_id", !current_dyn_id)])
-    "Done!"
 end
 
+# Transitions between URL changes in HTTP streams.
+def cross_http(~debug=true,~http_input_id,source)
 
-# A function to destroy a dynamic source
-def destroy_dynamic_source(id) = 
-  # We need to find the source in the list,
-  # remove it and destroy it. Currently, the language
-  # lacks some nice operators for that so we do it
-  # the functional way
+  id = http_input_id
+  last_url = ref ""
+  change = ref false
 
-  # This function is executed on every item in the list
-  # of dynamic sources
-  def parse_list(ret, current_element) = 
-    # ret is of the form: (matching_sources, remaining_sources)
-    # We extract those two:
-    matching_sources = fst(ret)
-    remaining_sources = snd(ret)
-
-    # current_element is of the form: ("uri", source) so 
-    # we check the first element
-    current_id = fst(current_element)
-    if current_id == id then
-      # In this case, we add the source to the list of
-      # matched sources
-      (list.append( [snd(current_element)], 
-                     matching_sources),
-       remaining_sources)
-    else
-       # In this case, we put the element in the list of remaining
-       # sources
-      (matching_sources,
-       list.append([current_element], 
-                    remaining_sources))
+  def on_m(m)
+    changed = m["source_url"] != !last_url
+    log("URL now #{m['source_url']} (change: #{changed})")
+    if changed then
+      if !last_url != "" then change := true end
+      last_url := m["source_url"]
     end
   end
-    
-  # Now we execute the function:
-  result = list.fold(parse_list, ([], []), !dyn_sources)
-  matching_sources = fst(result)
-  remaining_sources = snd(result)
 
-  # We store the remaining sources in dyn_sources
-  dyn_sources := remaining_sources
+  # We use both metadata and status to know about the current URL.
+  # Using only metadata may be more precise is crazy corner cases,
+  # but it's also asking too much: the metadata may not pass through
+  # before the crosser is instantiated.
+  # Using only status in crosser misses some info, eg. on first URL.
+  source = on_metadata(on_m,source)
 
-  # If no source matched, we return an error
-  if list.length(matching_sources) == 0 then
-    "Error: no matching sources!"
-  else
-    # We stop all sources
-    list.iter(source.shutdown, matching_sources)
-    # And return
-    "Done!"
+  cross_d = 3.
+
+  def crosser(a,b)
+    url = list.hd(server.execute('#{id}.url'))
+    status = list.hd(server.execute('#{id}.status'))
+    on_m([("source_url",url)])
+    if debug then
+      log("New track inside HTTP stream")
+      log("  status: #{status}")
+      log("  need to cross: #{!change}")
+      log("  remaining #{source.remaining(a)} sec before, \
+             #{source.remaining(b)} sec after")
+    end
+    if !change then
+      change := false
+      # In principle one should avoid crossing on a live stream
+      # it'd be okay to do it here (eg. use add instead of sequence)
+      # because it's only once per URL, but be cautious.
+      sequence([fade.out(duration=cross_d,a),fade.in(b)])
+    else
+      # This is done on tracks inside a single stream.
+      # Do NOT cross here or you'll gradually empty the buffer!
+      sequence([a,b])
+    end
   end
+
+  # Setting conservative=true would mess with the delayed switch below
+  cross(duration=cross_d,conservative=false,crosser,source)
+
 end
 
+# Custom fallback between http and default source with fading of
+# beginning and end of HTTP stream.
+# It does not take potential URL changes into account, as long as
+# they do not interrupt streaming (thanks to the HTTP buffer).
+def http_fallback(~http_input_id,~http,~default)
 
+  id = http_input_id
 
+  # We use a custom switching predicate to trigger switching (and thus,
+  # transitions) before the end of a track (rather, end of HTTP stream).
+  # It is complexified because we don't want to trigger switching when
+  # HTTP disconnects for just an instant, when changing URL: for that
+  # we use gracetime below.
 
-# A function to destroy a dynamic source
-def destroy_dynamic_source_all() = 
-  # We need to find the source in the list,
-  # remove it and destroy it. Currently, the language
-  # lacks some nice operators for that so we do it
-  # the functional way
-
-  # This function is executed on every item in the list
-  # of dynamic sources
-  def parse_list(ret, current_element) = 
-    # ret is of the form: (matching_sources, remaining_sources)
-    # We extract those two:
-    matching_sources = fst(ret)
-    remaining_sources = snd(ret)
-
-    # current_element is of the form: ("uri", source) so 
-    # we check the first element
-    current_uri = fst(current_element)
-    # in this case, we add the source to the list of
-    # matched sources
-    (list.append( [snd(current_element)], 
-                    matching_sources),
-    remaining_sources)
+  def gracetime(~delay=3.,f)
+    last_true = ref 0.
+    { if f() then
+        last_true := gettimeofday()
+        true
+      else
+        gettimeofday() < !last_true+delay
+      end }
   end
-    
-  # now we execute the function:
-  result = list.fold(parse_list, ([], []), !dyn_sources)
-  matching_sources = fst(result)
-  remaining_sources = snd(result)
 
-  # we store the remaining sources in dyn_sources
-  dyn_sources := remaining_sources
-
-  # if no source matched, we return an error
-  if list.length(matching_sources) == 0 then
-    "error: no matching sources!"
-  else
-    # we stop all sources
-    list.iter(source.shutdown, matching_sources)
-    # And return
-    "Done!"
+  def connected()
+    status = list.hd(server.execute("#{id}.status"))
+    not(list.mem(status,["polling","stopped"]))
   end
+  connected = gracetime(connected)
+
+  def to_live(a,b) =
+    log("TRANSITION to live")
+    add(normalize=false,
+        [fade.initial(b),fade.final(a)])
+  end
+  def to_static(a,b) =
+    log("TRANSITION to static")
+    sequence([fade.out(a),fade.initial(b)])
+  end
+
+  switch(
+    track_sensitive=false,
+    transitions=[to_live,to_static],
+    [(# make sure it is connected, and not buffering
+      {connected() and source.is_ready(http) and !webstream_enabled}, http),
+     ({true},default)])
+
 end
-
-
-
diff --git a/python_apps/pypo/liquidsoap_scripts/ls_script.liq b/python_apps/pypo/liquidsoap_scripts/ls_script.liq
index 34e56786e..962b20b2c 100644
--- a/python_apps/pypo/liquidsoap_scripts/ls_script.liq
+++ b/python_apps/pypo/liquidsoap_scripts/ls_script.liq
@@ -7,11 +7,11 @@ set("server.telnet", true)
 set("server.telnet.port", 1234)
 
 #Dynamic source list
-dyn_sources = ref []
+#dyn_sources = ref []
 webstream_enabled = ref false
 
 time = ref string_of(gettimeofday())
-queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5))
+queue = audio_to_stereo(id="queue_src", mksafe(request.equeue(id="queue", length=0.5)))
 queue = cue_cut(queue)
 queue = amplify(1., override="replay_gain", queue)
 
@@ -35,15 +35,19 @@ s2_namespace = ref ''
 s3_namespace = ref ''
 just_switched = ref false
 
-stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20'))
+#stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20'))
 
 %include "ls_lib.liq"
 
-web_stream = input.harbor("test-harbor", port=8999, password=stream_harbor_pass)
-web_stream = on_metadata(notify_stream, web_stream)
-output.dummy(fallible=true, web_stream)
+#web_stream = input.harbor("test-harbor", port=8999, password=stream_harbor_pass)
+#web_stream = on_metadata(notify_stream, web_stream)
+#output.dummy(fallible=true, web_stream)
 
 
+http = input.http_restart(id="http")
+http = cross_http(http_input_id="http",http)
+stream_queue = http_fallback(http_input_id="http",http=http,default=queue)
+
 # the crossfade function controls fade in/out
 queue = crossfade_airtime(queue)
 queue = on_metadata(notify, queue)
@@ -51,10 +55,10 @@ queue = map_metadata(update=false, append_title, queue)
 output.dummy(fallible=true, queue)
 
 
-stream_queue = switch(id="stream_queue_switch", track_sensitive=false,
-    transitions=[transition, transition],
-    [({!webstream_enabled},web_stream),
-    ({true}, queue)])
+#stream_queue = switch(id="stream_queue_switch", track_sensitive=false,
+#    transitions=[transition, transition],
+#    [({!webstream_enabled},web_stream),
+#    ({true}, queue)])
 
 ignore(output.dummy(stream_queue, fallible=true))
 
@@ -92,32 +96,33 @@ server.register(namespace="dynamic_source",
                 fun (s) -> begin log("dynamic_source.output_stop") webstream_enabled := false "disabled" end)
 
 server.register(namespace="dynamic_source",
-                description="Set the cc_schedule row id",
+                description="Set the streams cc_schedule row id",
                 usage="id <id>",
                 "id",
                 fun (s) -> begin log("dynamic_source.id") set_dynamic_source_id(s) end)
 
 server.register(namespace="dynamic_source",
-                description="Get the cc_schedule row id",
+                description="Get the streams cc_schedule row id",
                 usage="get_id",
                 "get_id",
                 fun (s) -> begin log("dynamic_source.get_id") get_dynamic_source_id() end)
 
-server.register(namespace="dynamic_source",
-                description="Start a new dynamic source.",
-                usage="start <uri>",
-                "read_start",
-                fun (uri) -> begin log("dynamic_source.read_start") create_dynamic_source(uri) end)
-server.register(namespace="dynamic_source",
-                description="Stop a dynamic source.",
-                usage="stop <id>",
-                "read_stop",
-                fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source(s) end)
-server.register(namespace="dynamic_source",
-                description="Stop a dynamic source.",
-                usage="stop <id>",
-                "read_stop_all",
-                fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source_all() end)
+#server.register(namespace="dynamic_source",
+#                description="Start a new dynamic source.",
+#                usage="start <uri>",
+#                "read_start",
+#                fun (uri) -> begin log("dynamic_source.read_start") begin_stream_read(uri) end)
+#server.register(namespace="dynamic_source",
+#                description="Stop a dynamic source.",
+#                usage="stop <id>",
+#                "read_stop",
+#                fun (s) -> begin log("dynamic_source.read_stop") stop_stream_read(s) end)
+
+#server.register(namespace="dynamic_source",
+#                description="Stop a dynamic source.",
+#                usage="stop <id>",
+#                "read_stop_all",
+#                fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source_all() end)
 
 default = amplify(id="silence_src", 0.00001, noise())
 default = rewrite_metadata([("artist","Airtime"), ("title", "offline")], default)
diff --git a/python_apps/pypo/pypopush.py b/python_apps/pypo/pypopush.py
index 0382b5e06..c0bb36ff9 100644
--- a/python_apps/pypo/pypopush.py
+++ b/python_apps/pypo/pypopush.py
@@ -478,8 +478,8 @@ class PypoPush(Thread):
             self.logger.debug(msg)
             tn.write(msg)
 
-            #example: dynamic_source.read_start http://87.230.101.24:80/top100station.mp3
-            msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1')
+            #msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1')
+            msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1')
             self.logger.debug(msg)
             tn.write(msg)
 
@@ -520,7 +520,8 @@ class PypoPush(Thread):
             self.telnet_lock.acquire()
             tn = telnetlib.Telnet(LS_HOST, LS_PORT)
 
-            msg = 'dynamic_source.read_stop_all xxx\n'
+            #msg = 'dynamic_source.read_stop_all xxx\n'
+            msg = 'http.stop\n'
             self.logger.debug(msg)
             tn.write(msg)
 
@@ -546,7 +547,8 @@ class PypoPush(Thread):
             tn = telnetlib.Telnet(LS_HOST, LS_PORT)
             #dynamic_source.stop http://87.230.101.24:80/top100station.mp3
 
-            msg = 'dynamic_source.read_stop %s\n' % media_item['row_id']
+            #msg = 'dynamic_source.read_stop %s\n' % media_item['row_id']
+            msg = 'http.stop\n'
             self.logger.debug(msg)
             tn.write(msg)