CC-4633: Use David's new method of Liquidsoap webstream switching
-fixed
This commit is contained in:
parent
7efa7169ca
commit
8c73731c41
|
@ -376,13 +376,6 @@ def add_skip_command(s)
|
||||||
"skip",fun(s) -> begin log("source.skip") skip(s) end)
|
"skip",fun(s) -> begin log("source.skip") skip(s) end)
|
||||||
end
|
end
|
||||||
|
|
||||||
dyn_out = output.icecast(%wav,
|
|
||||||
host="localhost",
|
|
||||||
port=8999,
|
|
||||||
password=stream_harbor_pass,
|
|
||||||
mount="test-harbor",
|
|
||||||
fallible=true)
|
|
||||||
|
|
||||||
def set_dynamic_source_id(id) =
|
def set_dynamic_source_id(id) =
|
||||||
current_dyn_id := id
|
current_dyn_id := id
|
||||||
string_of(!current_dyn_id)
|
string_of(!current_dyn_id)
|
||||||
|
@ -392,123 +385,158 @@ def get_dynamic_source_id() =
|
||||||
string_of(!current_dyn_id)
|
string_of(!current_dyn_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
#cc-4633
|
||||||
|
|
||||||
# Function to create a playlist source and output it.
|
|
||||||
def create_dynamic_source(uri) =
|
|
||||||
# The playlist source
|
|
||||||
s = audio_to_stereo(input.http(buffer=2., max=12., uri))
|
|
||||||
|
|
||||||
# The output
|
# NOTE
|
||||||
active_dyn_out = dyn_out(s)
|
# A few values are hardcoded and may be dependent:
|
||||||
|
# - the delay in gracetime is linked with the buffer duration of input.http
|
||||||
|
# (delay should be a bit less than buffer)
|
||||||
|
# - crossing duration should be less than buffer length
|
||||||
|
# (at best, a higher duration will be ineffective)
|
||||||
|
|
||||||
# We register both source and output
|
# HTTP input with "restart" command that waits for "stop" to be effected
|
||||||
# in the list of sources
|
# before "start" command is issued. Optionally it takes a new URL to play,
|
||||||
dyn_sources :=
|
# which makes it a convenient replacement for "url".
|
||||||
list.append([(!current_dyn_id, s),(!current_dyn_id, active_dyn_out)], !dyn_sources)
|
# In the future, this may become a core feature of the HTTP input.
|
||||||
|
# TODO If we stop and restart quickly several times in a row,
|
||||||
|
# the data bursts accumulate and create buffer overflow.
|
||||||
|
# Flushing the buffer on restart could be a good idea, but
|
||||||
|
# it would also create an interruptions while the buffer is
|
||||||
|
# refilling... on the other hand, this would avoid having to
|
||||||
|
# fade using both cross() and switch().
|
||||||
|
def input.http_restart(~id,~initial_url="http://dummy/url")
|
||||||
|
|
||||||
|
source = input.http(buffer=5.,max=15.,id=id,autostart=false,initial_url)
|
||||||
|
|
||||||
|
def stopped()
|
||||||
|
"stopped" == list.hd(server.execute("#{id}.status"))
|
||||||
|
end
|
||||||
|
|
||||||
|
server.register(namespace=id,
|
||||||
|
"restart",
|
||||||
|
usage="restart [url]",
|
||||||
|
fun (url) -> begin
|
||||||
|
if url != "" then
|
||||||
|
log(string_of(server.execute("#{id}.url #{url}")))
|
||||||
|
end
|
||||||
|
log(string_of(server.execute("#{id}.stop")))
|
||||||
|
add_timeout(0.5,
|
||||||
|
{ if stopped() then
|
||||||
|
log(string_of(server.execute("#{id}.start"))) ;
|
||||||
|
(-1.)
|
||||||
|
else 0.5 end})
|
||||||
|
"OK"
|
||||||
|
end)
|
||||||
|
|
||||||
|
# Dummy output should be useless if HTTP stream is meant
|
||||||
|
# to be listened to immediately. Otherwise, apply it.
|
||||||
|
#
|
||||||
|
# output.dummy(fallible=true,source)
|
||||||
|
|
||||||
|
source
|
||||||
|
|
||||||
notify([("schedule_table_id", !current_dyn_id)])
|
|
||||||
"Done!"
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Transitions between URL changes in HTTP streams.
|
||||||
|
def cross_http(~debug=true,~http_input_id,source)
|
||||||
|
|
||||||
# A function to destroy a dynamic source
|
id = http_input_id
|
||||||
def destroy_dynamic_source(id) =
|
last_url = ref ""
|
||||||
# We need to find the source in the list,
|
change = ref false
|
||||||
# remove it and destroy it. Currently, the language
|
|
||||||
# lacks some nice operators for that so we do it
|
|
||||||
# the functional way
|
|
||||||
|
|
||||||
# This function is executed on every item in the list
|
def on_m(m)
|
||||||
# of dynamic sources
|
changed = m["source_url"] != !last_url
|
||||||
def parse_list(ret, current_element) =
|
log("URL now #{m['source_url']} (change: #{changed})")
|
||||||
# ret is of the form: (matching_sources, remaining_sources)
|
if changed then
|
||||||
# We extract those two:
|
if !last_url != "" then change := true end
|
||||||
matching_sources = fst(ret)
|
last_url := m["source_url"]
|
||||||
remaining_sources = snd(ret)
|
|
||||||
|
|
||||||
# current_element is of the form: ("uri", source) so
|
|
||||||
# we check the first element
|
|
||||||
current_id = fst(current_element)
|
|
||||||
if current_id == id then
|
|
||||||
# In this case, we add the source to the list of
|
|
||||||
# matched sources
|
|
||||||
(list.append( [snd(current_element)],
|
|
||||||
matching_sources),
|
|
||||||
remaining_sources)
|
|
||||||
else
|
|
||||||
# In this case, we put the element in the list of remaining
|
|
||||||
# sources
|
|
||||||
(matching_sources,
|
|
||||||
list.append([current_element],
|
|
||||||
remaining_sources))
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Now we execute the function:
|
|
||||||
result = list.fold(parse_list, ([], []), !dyn_sources)
|
|
||||||
matching_sources = fst(result)
|
|
||||||
remaining_sources = snd(result)
|
|
||||||
|
|
||||||
# We store the remaining sources in dyn_sources
|
# We use both metadata and status to know about the current URL.
|
||||||
dyn_sources := remaining_sources
|
# Using only metadata may be more precise is crazy corner cases,
|
||||||
|
# but it's also asking too much: the metadata may not pass through
|
||||||
|
# before the crosser is instantiated.
|
||||||
|
# Using only status in crosser misses some info, eg. on first URL.
|
||||||
|
source = on_metadata(on_m,source)
|
||||||
|
|
||||||
# If no source matched, we return an error
|
cross_d = 3.
|
||||||
if list.length(matching_sources) == 0 then
|
|
||||||
"Error: no matching sources!"
|
def crosser(a,b)
|
||||||
else
|
url = list.hd(server.execute('#{id}.url'))
|
||||||
# We stop all sources
|
status = list.hd(server.execute('#{id}.status'))
|
||||||
list.iter(source.shutdown, matching_sources)
|
on_m([("source_url",url)])
|
||||||
# And return
|
if debug then
|
||||||
"Done!"
|
log("New track inside HTTP stream")
|
||||||
|
log(" status: #{status}")
|
||||||
|
log(" need to cross: #{!change}")
|
||||||
|
log(" remaining #{source.remaining(a)} sec before, \
|
||||||
|
#{source.remaining(b)} sec after")
|
||||||
|
end
|
||||||
|
if !change then
|
||||||
|
change := false
|
||||||
|
# In principle one should avoid crossing on a live stream
|
||||||
|
# it'd be okay to do it here (eg. use add instead of sequence)
|
||||||
|
# because it's only once per URL, but be cautious.
|
||||||
|
sequence([fade.out(duration=cross_d,a),fade.in(b)])
|
||||||
|
else
|
||||||
|
# This is done on tracks inside a single stream.
|
||||||
|
# Do NOT cross here or you'll gradually empty the buffer!
|
||||||
|
sequence([a,b])
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Setting conservative=true would mess with the delayed switch below
|
||||||
|
cross(duration=cross_d,conservative=false,crosser,source)
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Custom fallback between http and default source with fading of
|
||||||
|
# beginning and end of HTTP stream.
|
||||||
|
# It does not take potential URL changes into account, as long as
|
||||||
|
# they do not interrupt streaming (thanks to the HTTP buffer).
|
||||||
|
def http_fallback(~http_input_id,~http,~default)
|
||||||
|
|
||||||
|
id = http_input_id
|
||||||
|
|
||||||
|
# We use a custom switching predicate to trigger switching (and thus,
|
||||||
|
# transitions) before the end of a track (rather, end of HTTP stream).
|
||||||
|
# It is complexified because we don't want to trigger switching when
|
||||||
|
# HTTP disconnects for just an instant, when changing URL: for that
|
||||||
|
# we use gracetime below.
|
||||||
|
|
||||||
# A function to destroy a dynamic source
|
def gracetime(~delay=3.,f)
|
||||||
def destroy_dynamic_source_all() =
|
last_true = ref 0.
|
||||||
# We need to find the source in the list,
|
{ if f() then
|
||||||
# remove it and destroy it. Currently, the language
|
last_true := gettimeofday()
|
||||||
# lacks some nice operators for that so we do it
|
true
|
||||||
# the functional way
|
else
|
||||||
|
gettimeofday() < !last_true+delay
|
||||||
# This function is executed on every item in the list
|
end }
|
||||||
# of dynamic sources
|
|
||||||
def parse_list(ret, current_element) =
|
|
||||||
# ret is of the form: (matching_sources, remaining_sources)
|
|
||||||
# We extract those two:
|
|
||||||
matching_sources = fst(ret)
|
|
||||||
remaining_sources = snd(ret)
|
|
||||||
|
|
||||||
# current_element is of the form: ("uri", source) so
|
|
||||||
# we check the first element
|
|
||||||
current_uri = fst(current_element)
|
|
||||||
# in this case, we add the source to the list of
|
|
||||||
# matched sources
|
|
||||||
(list.append( [snd(current_element)],
|
|
||||||
matching_sources),
|
|
||||||
remaining_sources)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# now we execute the function:
|
|
||||||
result = list.fold(parse_list, ([], []), !dyn_sources)
|
|
||||||
matching_sources = fst(result)
|
|
||||||
remaining_sources = snd(result)
|
|
||||||
|
|
||||||
# we store the remaining sources in dyn_sources
|
def connected()
|
||||||
dyn_sources := remaining_sources
|
status = list.hd(server.execute("#{id}.status"))
|
||||||
|
not(list.mem(status,["polling","stopped"]))
|
||||||
# if no source matched, we return an error
|
|
||||||
if list.length(matching_sources) == 0 then
|
|
||||||
"error: no matching sources!"
|
|
||||||
else
|
|
||||||
# we stop all sources
|
|
||||||
list.iter(source.shutdown, matching_sources)
|
|
||||||
# And return
|
|
||||||
"Done!"
|
|
||||||
end
|
end
|
||||||
|
connected = gracetime(connected)
|
||||||
|
|
||||||
|
def to_live(a,b) =
|
||||||
|
log("TRANSITION to live")
|
||||||
|
add(normalize=false,
|
||||||
|
[fade.initial(b),fade.final(a)])
|
||||||
|
end
|
||||||
|
def to_static(a,b) =
|
||||||
|
log("TRANSITION to static")
|
||||||
|
sequence([fade.out(a),fade.initial(b)])
|
||||||
|
end
|
||||||
|
|
||||||
|
switch(
|
||||||
|
track_sensitive=false,
|
||||||
|
transitions=[to_live,to_static],
|
||||||
|
[(# make sure it is connected, and not buffering
|
||||||
|
{connected() and source.is_ready(http) and !webstream_enabled}, http),
|
||||||
|
({true},default)])
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -7,11 +7,11 @@ set("server.telnet", true)
|
||||||
set("server.telnet.port", 1234)
|
set("server.telnet.port", 1234)
|
||||||
|
|
||||||
#Dynamic source list
|
#Dynamic source list
|
||||||
dyn_sources = ref []
|
#dyn_sources = ref []
|
||||||
webstream_enabled = ref false
|
webstream_enabled = ref false
|
||||||
|
|
||||||
time = ref string_of(gettimeofday())
|
time = ref string_of(gettimeofday())
|
||||||
queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5))
|
queue = audio_to_stereo(id="queue_src", mksafe(request.equeue(id="queue", length=0.5)))
|
||||||
queue = cue_cut(queue)
|
queue = cue_cut(queue)
|
||||||
queue = amplify(1., override="replay_gain", queue)
|
queue = amplify(1., override="replay_gain", queue)
|
||||||
|
|
||||||
|
@ -35,15 +35,19 @@ s2_namespace = ref ''
|
||||||
s3_namespace = ref ''
|
s3_namespace = ref ''
|
||||||
just_switched = ref false
|
just_switched = ref false
|
||||||
|
|
||||||
stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20'))
|
#stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20'))
|
||||||
|
|
||||||
%include "ls_lib.liq"
|
%include "ls_lib.liq"
|
||||||
|
|
||||||
web_stream = input.harbor("test-harbor", port=8999, password=stream_harbor_pass)
|
#web_stream = input.harbor("test-harbor", port=8999, password=stream_harbor_pass)
|
||||||
web_stream = on_metadata(notify_stream, web_stream)
|
#web_stream = on_metadata(notify_stream, web_stream)
|
||||||
output.dummy(fallible=true, web_stream)
|
#output.dummy(fallible=true, web_stream)
|
||||||
|
|
||||||
|
|
||||||
|
http = input.http_restart(id="http")
|
||||||
|
http = cross_http(http_input_id="http",http)
|
||||||
|
stream_queue = http_fallback(http_input_id="http",http=http,default=queue)
|
||||||
|
|
||||||
# the crossfade function controls fade in/out
|
# the crossfade function controls fade in/out
|
||||||
queue = crossfade_airtime(queue)
|
queue = crossfade_airtime(queue)
|
||||||
queue = on_metadata(notify, queue)
|
queue = on_metadata(notify, queue)
|
||||||
|
@ -51,10 +55,10 @@ queue = map_metadata(update=false, append_title, queue)
|
||||||
output.dummy(fallible=true, queue)
|
output.dummy(fallible=true, queue)
|
||||||
|
|
||||||
|
|
||||||
stream_queue = switch(id="stream_queue_switch", track_sensitive=false,
|
#stream_queue = switch(id="stream_queue_switch", track_sensitive=false,
|
||||||
transitions=[transition, transition],
|
# transitions=[transition, transition],
|
||||||
[({!webstream_enabled},web_stream),
|
# [({!webstream_enabled},web_stream),
|
||||||
({true}, queue)])
|
# ({true}, queue)])
|
||||||
|
|
||||||
ignore(output.dummy(stream_queue, fallible=true))
|
ignore(output.dummy(stream_queue, fallible=true))
|
||||||
|
|
||||||
|
@ -92,32 +96,33 @@ server.register(namespace="dynamic_source",
|
||||||
fun (s) -> begin log("dynamic_source.output_stop") webstream_enabled := false "disabled" end)
|
fun (s) -> begin log("dynamic_source.output_stop") webstream_enabled := false "disabled" end)
|
||||||
|
|
||||||
server.register(namespace="dynamic_source",
|
server.register(namespace="dynamic_source",
|
||||||
description="Set the cc_schedule row id",
|
description="Set the streams cc_schedule row id",
|
||||||
usage="id <id>",
|
usage="id <id>",
|
||||||
"id",
|
"id",
|
||||||
fun (s) -> begin log("dynamic_source.id") set_dynamic_source_id(s) end)
|
fun (s) -> begin log("dynamic_source.id") set_dynamic_source_id(s) end)
|
||||||
|
|
||||||
server.register(namespace="dynamic_source",
|
server.register(namespace="dynamic_source",
|
||||||
description="Get the cc_schedule row id",
|
description="Get the streams cc_schedule row id",
|
||||||
usage="get_id",
|
usage="get_id",
|
||||||
"get_id",
|
"get_id",
|
||||||
fun (s) -> begin log("dynamic_source.get_id") get_dynamic_source_id() end)
|
fun (s) -> begin log("dynamic_source.get_id") get_dynamic_source_id() end)
|
||||||
|
|
||||||
server.register(namespace="dynamic_source",
|
#server.register(namespace="dynamic_source",
|
||||||
description="Start a new dynamic source.",
|
# description="Start a new dynamic source.",
|
||||||
usage="start <uri>",
|
# usage="start <uri>",
|
||||||
"read_start",
|
# "read_start",
|
||||||
fun (uri) -> begin log("dynamic_source.read_start") create_dynamic_source(uri) end)
|
# fun (uri) -> begin log("dynamic_source.read_start") begin_stream_read(uri) end)
|
||||||
server.register(namespace="dynamic_source",
|
#server.register(namespace="dynamic_source",
|
||||||
description="Stop a dynamic source.",
|
# description="Stop a dynamic source.",
|
||||||
usage="stop <id>",
|
# usage="stop <id>",
|
||||||
"read_stop",
|
# "read_stop",
|
||||||
fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source(s) end)
|
# fun (s) -> begin log("dynamic_source.read_stop") stop_stream_read(s) end)
|
||||||
server.register(namespace="dynamic_source",
|
|
||||||
description="Stop a dynamic source.",
|
#server.register(namespace="dynamic_source",
|
||||||
usage="stop <id>",
|
# description="Stop a dynamic source.",
|
||||||
"read_stop_all",
|
# usage="stop <id>",
|
||||||
fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source_all() end)
|
# "read_stop_all",
|
||||||
|
# fun (s) -> begin log("dynamic_source.read_stop") destroy_dynamic_source_all() end)
|
||||||
|
|
||||||
default = amplify(id="silence_src", 0.00001, noise())
|
default = amplify(id="silence_src", 0.00001, noise())
|
||||||
default = rewrite_metadata([("artist","Airtime"), ("title", "offline")], default)
|
default = rewrite_metadata([("artist","Airtime"), ("title", "offline")], default)
|
||||||
|
|
|
@ -478,8 +478,8 @@ class PypoPush(Thread):
|
||||||
self.logger.debug(msg)
|
self.logger.debug(msg)
|
||||||
tn.write(msg)
|
tn.write(msg)
|
||||||
|
|
||||||
#example: dynamic_source.read_start http://87.230.101.24:80/top100station.mp3
|
#msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1')
|
||||||
msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1')
|
msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1')
|
||||||
self.logger.debug(msg)
|
self.logger.debug(msg)
|
||||||
tn.write(msg)
|
tn.write(msg)
|
||||||
|
|
||||||
|
@ -520,7 +520,8 @@ class PypoPush(Thread):
|
||||||
self.telnet_lock.acquire()
|
self.telnet_lock.acquire()
|
||||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||||
|
|
||||||
msg = 'dynamic_source.read_stop_all xxx\n'
|
#msg = 'dynamic_source.read_stop_all xxx\n'
|
||||||
|
msg = 'http.stop\n'
|
||||||
self.logger.debug(msg)
|
self.logger.debug(msg)
|
||||||
tn.write(msg)
|
tn.write(msg)
|
||||||
|
|
||||||
|
@ -546,7 +547,8 @@ class PypoPush(Thread):
|
||||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||||
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
|
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
|
||||||
|
|
||||||
msg = 'dynamic_source.read_stop %s\n' % media_item['row_id']
|
#msg = 'dynamic_source.read_stop %s\n' % media_item['row_id']
|
||||||
|
msg = 'http.stop\n'
|
||||||
self.logger.debug(msg)
|
self.logger.debug(msg)
|
||||||
tn.write(msg)
|
tn.write(msg)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue