Merge branch 'devel' of dev.sourcefabric.org:airtime into devel
This commit is contained in:
commit
09c66f9f94
airtime_mvc/application/models
python_apps
media-monitor
media-monitor2
media/monitor
tests
pypo
|
@ -698,6 +698,23 @@ SQL;
|
||||||
);
|
);
|
||||||
|
|
||||||
if ($type == "stream") {
|
if ($type == "stream") {
|
||||||
|
//create an event to start stream buffering 5 seconds ahead of the streams actual time.
|
||||||
|
$buffer_start = new DateTime($item["start"], new DateTimeZone('UTC'));
|
||||||
|
$buffer_start->sub(new DateInterval("PT5S"));
|
||||||
|
|
||||||
|
$stream_buffer_start = Application_Model_Schedule::AirtimeTimeToPypoTime($buffer_start->format("Y-m-d H:i:s"));
|
||||||
|
|
||||||
|
//TODO: Make sure no other media is being overwritten!
|
||||||
|
$data["media"][$stream_buffer_start] = array(
|
||||||
|
'start' => $stream_buffer_start,
|
||||||
|
'end' => $stream_buffer_start,
|
||||||
|
'uri' => $uri,
|
||||||
|
'row_id' => $item["id"],
|
||||||
|
'type' => 'stream_buffer_start',
|
||||||
|
'independent_event' => true
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
//since a stream never ends we have to insert an additional "kick stream" event. The "start"
|
//since a stream never ends we have to insert an additional "kick stream" event. The "start"
|
||||||
//time of this event is the "end" time of the stream minus 1 second.
|
//time of this event is the "end" time of the stream minus 1 second.
|
||||||
$dt = new DateTime($item["end"], new DateTimeZone('UTC'));
|
$dt = new DateTime($item["end"], new DateTimeZone('UTC'));
|
||||||
|
@ -920,7 +937,7 @@ SQL;
|
||||||
$hValue = trim(substr($data["add_show_duration"], 0, $hPos));
|
$hValue = trim(substr($data["add_show_duration"], 0, $hPos));
|
||||||
}
|
}
|
||||||
if ($mPos !== false) {
|
if ($mPos !== false) {
|
||||||
$hPos = $hPos === FALSE ? 0 : $hPos+1;
|
$hPos = $hPos === false ? 0 : $hPos+1;
|
||||||
$mValue = trim(substr($data["add_show_duration"], $hPos, -1 ));
|
$mValue = trim(substr($data["add_show_duration"], $hPos, -1 ));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ check_airtime_events = 30 #how long to queue metadata input from airtime.
|
||||||
touch_interval = 5
|
touch_interval = 5
|
||||||
chunking_number = 450
|
chunking_number = 450
|
||||||
request_max_wait = 3.0
|
request_max_wait = 3.0
|
||||||
rmq_event_wait = 0.5
|
rmq_event_wait = 0.1
|
||||||
logpath = '/var/log/airtime/media-monitor/media-monitor.log'
|
logpath = '/var/log/airtime/media-monitor/media-monitor.log'
|
||||||
index_path = '/var/tmp/airtime/media-monitor/last_index'
|
index_path = '/var/tmp/airtime/media-monitor/last_index'
|
||||||
|
|
||||||
|
|
|
@ -47,5 +47,6 @@ class EventContractor(Loggable):
|
||||||
return True # We actually added something, hence we return true.
|
return True # We actually added something, hence we return true.
|
||||||
|
|
||||||
def __unregister(self, evt):
|
def __unregister(self, evt):
|
||||||
|
self.logger.info("Unregistering. Left: '%d'" % len(self.store.keys()))
|
||||||
try: del self.store[evt.path]
|
try: del self.store[evt.path]
|
||||||
except Exception as e: self.unexpected_exception(e)
|
except Exception as e: self.unexpected_exception(e)
|
||||||
|
|
|
@ -61,8 +61,7 @@ def mediate_ignored(fn):
|
||||||
return wrapped
|
return wrapped
|
||||||
|
|
||||||
class BaseListener(object):
|
class BaseListener(object):
|
||||||
def my_init(self, signal):
|
def my_init(self, signal): self.signal = signal
|
||||||
self.signal = signal
|
|
||||||
|
|
||||||
class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable):
|
class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable):
|
||||||
def process_IN_CLOSE_WRITE(self, event):
|
def process_IN_CLOSE_WRITE(self, event):
|
||||||
|
@ -71,6 +70,9 @@ class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable):
|
||||||
def process_IN_MOVED_TO(self, event):
|
def process_IN_MOVED_TO(self, event):
|
||||||
self.process_to_organize(event)
|
self.process_to_organize(event)
|
||||||
|
|
||||||
|
def process_default(self, event):
|
||||||
|
self.logger.info("===> Not handling: '%s'" % str(event))
|
||||||
|
|
||||||
def flush_events(self, path):
|
def flush_events(self, path):
|
||||||
"""
|
"""
|
||||||
organize the whole directory at path. (pretty much by doing what
|
organize the whole directory at path. (pretty much by doing what
|
||||||
|
@ -85,7 +87,6 @@ class OrganizeListener(BaseListener, pyinotify.ProcessEvent, Loggable):
|
||||||
flushed += 1
|
flushed += 1
|
||||||
self.logger.info("Flushed organized directory with %d files" % flushed)
|
self.logger.info("Flushed organized directory with %d files" % flushed)
|
||||||
|
|
||||||
@mediate_ignored
|
|
||||||
@IncludeOnly(mmp.supported_extensions)
|
@IncludeOnly(mmp.supported_extensions)
|
||||||
def process_to_organize(self, event):
|
def process_to_organize(self, event):
|
||||||
dispatcher.send(signal=self.signal, sender=self,
|
dispatcher.send(signal=self.signal, sender=self,
|
||||||
|
|
|
@ -67,6 +67,8 @@ class Manager(Loggable):
|
||||||
del(self.__wd_path[path])
|
del(self.__wd_path[path])
|
||||||
|
|
||||||
def __add_watch(self,path,listener):
|
def __add_watch(self,path,listener):
|
||||||
|
self.logger.info("Adding listener '%s' to '%s'" %
|
||||||
|
( listener.__class__.__name__, path) )
|
||||||
wd = self.wm.add_watch(path, pyinotify.ALL_EVENTS, rec=True,
|
wd = self.wm.add_watch(path, pyinotify.ALL_EVENTS, rec=True,
|
||||||
auto_add=True, proc_fun=listener)
|
auto_add=True, proc_fun=listener)
|
||||||
self.__wd_path[path] = wd.values()[0]
|
self.__wd_path[path] = wd.values()[0]
|
||||||
|
|
|
@ -15,16 +15,16 @@ class Organizer(ReportHandler,Loggable):
|
||||||
StoreWatchListener)
|
StoreWatchListener)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
_instance = None
|
#_instance = None
|
||||||
def __new__(cls, channel, target_path, recorded_path):
|
#def __new__(cls, channel, target_path, recorded_path):
|
||||||
if cls._instance:
|
#if cls._instance:
|
||||||
cls._instance.channel = channel
|
#cls._instance.channel = channel
|
||||||
cls._instance.target_path = target_path
|
#cls._instance.target_path = target_path
|
||||||
cls._instance.recorded_path = recorded_path
|
#cls._instance.recorded_path = recorded_path
|
||||||
else:
|
#else:
|
||||||
cls._instance = super(Organizer, cls).__new__( cls, channel,
|
#cls._instance = super(Organizer, cls).__new__( cls, channel,
|
||||||
target_path, recorded_path)
|
#target_path, recorded_path)
|
||||||
return cls._instance
|
#return cls._instance
|
||||||
|
|
||||||
def __init__(self, channel, target_path, recorded_path):
|
def __init__(self, channel, target_path, recorded_path):
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
|
|
|
@ -124,8 +124,9 @@ class WatchSyncer(ReportHandler,Loggable):
|
||||||
try:
|
try:
|
||||||
# If there is a strange bug anywhere in the code the next line
|
# If there is a strange bug anywhere in the code the next line
|
||||||
# should be a suspect
|
# should be a suspect
|
||||||
if self.contractor.register( event ):
|
#if self.contractor.register( event ):
|
||||||
self.push_queue( event )
|
#self.push_queue( event )
|
||||||
|
self.push_queue( event )
|
||||||
except BadSongFile as e:
|
except BadSongFile as e:
|
||||||
self.fatal_exception("Received bas song file '%s'" % e.path, e)
|
self.fatal_exception("Received bas song file '%s'" % e.path, e)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
@ -18,7 +18,7 @@ class TestReceiver(unittest.TestCase):
|
||||||
|
|
||||||
def test_supported(self):
|
def test_supported(self):
|
||||||
# Every supported message should fire something
|
# Every supported message should fire something
|
||||||
for event_type in self.amr.dispatch_tables.keys():
|
for event_type in self.amr.dispatch_table.keys():
|
||||||
msg = { 'event_type' : event_type, 'extra_param' : 123 }
|
msg = { 'event_type' : event_type, 'extra_param' : 123 }
|
||||||
filtered = filter_ev(msg)
|
filtered = filter_ev(msg)
|
||||||
# There should be a better way to test the following without
|
# There should be a better way to test the following without
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
def notify(m)
|
def notify(m)
|
||||||
current_media_id := string_of(m['schedule_table_id'])
|
#current_media_id := string_of(m['schedule_table_id'])
|
||||||
command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --data='#{!pypo_data}' --media-id=#{m['schedule_table_id']} &"
|
command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --data='#{!pypo_data}' --media-id=#{m['schedule_table_id']} &"
|
||||||
log(command)
|
log(command)
|
||||||
system(command)
|
system(command)
|
||||||
|
@ -7,7 +7,7 @@ end
|
||||||
|
|
||||||
def notify_stream(m)
|
def notify_stream(m)
|
||||||
json_str = string.replace(pattern="\n",(fun (s) -> ""), json_of(m))
|
json_str = string.replace(pattern="\n",(fun (s) -> ""), json_of(m))
|
||||||
command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --webstream='#{json_str}' --media-id=#{!current_media_id} &"
|
command = "/usr/lib/airtime/pypo/bin/liquidsoap_scripts/notify.sh --webstream='#{json_str}' --media-id=#{!current_dyn_id} &"
|
||||||
log(command)
|
log(command)
|
||||||
system(command)
|
system(command)
|
||||||
end
|
end
|
||||||
|
@ -253,7 +253,7 @@ end
|
||||||
# Function to create a playlist source and output it.
|
# Function to create a playlist source and output it.
|
||||||
def create_dynamic_source(uri) =
|
def create_dynamic_source(uri) =
|
||||||
# The playlist source
|
# The playlist source
|
||||||
s = input.http(uri)
|
s = input.http(buffer=2., max=12., uri)
|
||||||
|
|
||||||
# The output
|
# The output
|
||||||
active_dyn_out = dyn_out(s)
|
active_dyn_out = dyn_out(s)
|
||||||
|
|
|
@ -8,9 +8,9 @@ set("server.telnet.port", 1234)
|
||||||
|
|
||||||
#Dynamic source list
|
#Dynamic source list
|
||||||
dyn_sources = ref []
|
dyn_sources = ref []
|
||||||
|
webstream_enabled = ref false
|
||||||
|
|
||||||
time = ref string_of(gettimeofday())
|
time = ref string_of(gettimeofday())
|
||||||
|
|
||||||
queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5))
|
queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5))
|
||||||
queue = cue_cut(queue)
|
queue = cue_cut(queue)
|
||||||
queue = amplify(1., override="replay_gain", queue)
|
queue = amplify(1., override="replay_gain", queue)
|
||||||
|
@ -27,7 +27,7 @@ stream_metadata_type = ref 0
|
||||||
default_dj_fade = ref 0.
|
default_dj_fade = ref 0.
|
||||||
station_name = ref ''
|
station_name = ref ''
|
||||||
show_name = ref ''
|
show_name = ref ''
|
||||||
current_media_id = ref ''
|
#current_media_id = ref ''
|
||||||
|
|
||||||
s1_connected = ref ''
|
s1_connected = ref ''
|
||||||
s2_connected = ref ''
|
s2_connected = ref ''
|
||||||
|
@ -39,7 +39,7 @@ just_switched = ref false
|
||||||
|
|
||||||
%include "ls_lib.liq"
|
%include "ls_lib.liq"
|
||||||
|
|
||||||
web_stream = input.harbor("test-harbor",port=8999,password="hackme")
|
web_stream = mksafe(input.harbor("test-harbor", port=8999, password="hackme"))
|
||||||
web_stream = on_metadata(notify_stream, web_stream)
|
web_stream = on_metadata(notify_stream, web_stream)
|
||||||
|
|
||||||
queue = on_metadata(notify, queue)
|
queue = on_metadata(notify, queue)
|
||||||
|
@ -47,16 +47,43 @@ queue = map_metadata(append_title, queue)
|
||||||
# the crossfade function controls fade in/out
|
# the crossfade function controls fade in/out
|
||||||
queue = crossfade(queue)
|
queue = crossfade(queue)
|
||||||
|
|
||||||
queue = fallback([web_stream, queue])
|
queue = switch(id="stream_queue_switch", track_sensitive=false, [({!webstream_enabled},web_stream), ({true}, queue)])
|
||||||
|
|
||||||
ignore(output.dummy(queue, fallible=true))
|
ignore(output.dummy(queue, fallible=true))
|
||||||
|
|
||||||
server.register(namespace="vars", "pypo_data", fun (s) -> begin pypo_data := s "Done" end)
|
server.register(namespace="vars",
|
||||||
server.register(namespace="vars", "stream_metadata_type", fun (s) -> begin stream_metadata_type := int_of_string(s) s end)
|
"pypo_data",
|
||||||
server.register(namespace="vars", "show_name", fun (s) -> begin show_name := s s end)
|
fun (s) -> begin pypo_data := s "Done" end)
|
||||||
server.register(namespace="vars", "station_name", fun (s) -> begin station_name := s s end)
|
server.register(namespace="vars",
|
||||||
server.register(namespace="vars", "bootup_time", fun (s) -> begin time := s s end)
|
"stream_metadata_type",
|
||||||
server.register(namespace="streams", "connection_status", fun (s) -> begin "1:#{!s1_connected},2:#{!s2_connected},3:#{!s3_connected}" end)
|
fun (s) -> begin stream_metadata_type := int_of_string(s) s end)
|
||||||
server.register(namespace="vars", "default_dj_fade", fun (s) -> begin default_dj_fade := float_of_string(s) s end)
|
server.register(namespace="vars",
|
||||||
|
"show_name",
|
||||||
|
fun (s) -> begin show_name := s s end)
|
||||||
|
server.register(namespace="vars",
|
||||||
|
"station_name",
|
||||||
|
fun (s) -> begin station_name := s s end)
|
||||||
|
server.register(namespace="vars",
|
||||||
|
"bootup_time",
|
||||||
|
fun (s) -> begin time := s s end)
|
||||||
|
server.register(namespace="streams",
|
||||||
|
"connection_status",
|
||||||
|
fun (s) -> begin "1:#{!s1_connected},2:#{!s2_connected},3:#{!s3_connected}" end)
|
||||||
|
server.register(namespace="vars",
|
||||||
|
"default_dj_fade",
|
||||||
|
fun (s) -> begin default_dj_fade := float_of_string(s) s end)
|
||||||
|
|
||||||
|
server.register(namespace="dynamic_source",
|
||||||
|
description="Enable webstream output",
|
||||||
|
usage='start',
|
||||||
|
"output_start",
|
||||||
|
fun (s) -> begin webstream_enabled := true "enabled" end)
|
||||||
|
server.register(namespace="dynamic_source",
|
||||||
|
description="Enable webstream output",
|
||||||
|
usage='stop',
|
||||||
|
"output_stop",
|
||||||
|
fun (s) -> begin webstream_enabled := false "disabled" end)
|
||||||
|
|
||||||
server.register(namespace="dynamic_source",
|
server.register(namespace="dynamic_source",
|
||||||
description="Set the cc_schedule row id",
|
description="Set the cc_schedule row id",
|
||||||
usage="id <id>",
|
usage="id <id>",
|
||||||
|
@ -65,12 +92,12 @@ server.register(namespace="dynamic_source",
|
||||||
server.register(namespace="dynamic_source",
|
server.register(namespace="dynamic_source",
|
||||||
description="Start a new dynamic source.",
|
description="Start a new dynamic source.",
|
||||||
usage="start <uri>",
|
usage="start <uri>",
|
||||||
"start",
|
"read_start",
|
||||||
create_dynamic_source)
|
create_dynamic_source)
|
||||||
server.register(namespace="dynamic_source",
|
server.register(namespace="dynamic_source",
|
||||||
description="Stop a dynamic source.",
|
description="Stop a dynamic source.",
|
||||||
usage="stop <uri>",
|
usage="stop <uri>",
|
||||||
"stop",
|
"read_stop",
|
||||||
destroy_dynamic_source)
|
destroy_dynamic_source)
|
||||||
|
|
||||||
default = amplify(id="silence_src", 0.00001, noise())
|
default = amplify(id="silence_src", 0.00001, noise())
|
||||||
|
|
|
@ -386,31 +386,46 @@ class PypoPush(Thread):
|
||||||
PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj")
|
PypoFetch.disconnect_source(self.logger, self.telnet_lock, "live_dj")
|
||||||
elif media_item['event_type'] == "switch_off":
|
elif media_item['event_type'] == "switch_off":
|
||||||
PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off")
|
PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off")
|
||||||
|
elif media_item['type'] == 'stream_buffer_start':
|
||||||
|
self.start_web_stream_buffer(media_item)
|
||||||
elif media_item['type'] == "stream":
|
elif media_item['type'] == "stream":
|
||||||
"""
|
|
||||||
Source is a stream that we need to being downloading to a file. Then we may simply
|
|
||||||
point Liquidsoap to play this file.
|
|
||||||
"""
|
|
||||||
self.start_web_stream(media_item)
|
self.start_web_stream(media_item)
|
||||||
elif media_item['type'] == "stream_end":
|
elif media_item['type'] == "stream_end":
|
||||||
self.stop_web_stream(media_item)
|
self.stop_web_stream(media_item)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error('Pypo Push Exception: %s', e)
|
self.logger.error('Pypo Push Exception: %s', e)
|
||||||
|
|
||||||
|
def start_web_stream_buffer(self, media_item):
|
||||||
|
try:
|
||||||
|
self.telnet_lock.acquire()
|
||||||
|
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||||
|
|
||||||
|
msg = 'dynamic_source.id %s\n' % media_item['row_id']
|
||||||
|
tn.write(msg)
|
||||||
|
|
||||||
|
#example: dynamic_source.read_start http://87.230.101.24:80/top100station.mp3
|
||||||
|
msg = 'dynamic_source.read_start %s\n' % media_item['uri'].encode('latin-1')
|
||||||
|
self.logger.debug(msg)
|
||||||
|
tn.write(msg)
|
||||||
|
|
||||||
|
tn.write("exit\n")
|
||||||
|
self.logger.debug(tn.read_all())
|
||||||
|
except Exception, e:
|
||||||
|
self.logger.error(str(e))
|
||||||
|
finally:
|
||||||
|
self.telnet_lock.release()
|
||||||
|
|
||||||
|
|
||||||
def start_web_stream(self, media_item):
|
def start_web_stream(self, media_item):
|
||||||
try:
|
try:
|
||||||
self.telnet_lock.acquire()
|
self.telnet_lock.acquire()
|
||||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||||
#dynamic_source.start http://87.230.101.24:80/top100station.mp3
|
|
||||||
|
|
||||||
msg = 'dynamic_source.id %s\n' % media_item['row_id']
|
|
||||||
tn.write(msg)
|
|
||||||
|
|
||||||
#TODO: DO we need this?
|
#TODO: DO we need this?
|
||||||
msg = 'streams.scheduled_play_start\n'
|
msg = 'streams.scheduled_play_start\n'
|
||||||
tn.write(msg)
|
tn.write(msg)
|
||||||
|
|
||||||
msg = 'dynamic_source.start %s\n' % media_item['uri'].encode('latin-1')
|
msg = 'dynamic_source.output_start\n'
|
||||||
self.logger.debug(msg)
|
self.logger.debug(msg)
|
||||||
tn.write(msg)
|
tn.write(msg)
|
||||||
|
|
||||||
|
@ -429,7 +444,11 @@ class PypoPush(Thread):
|
||||||
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
|
||||||
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
|
#dynamic_source.stop http://87.230.101.24:80/top100station.mp3
|
||||||
|
|
||||||
msg = 'dynamic_source.stop %s\n' % media_item['uri'].encode('latin-1')
|
msg = 'dynamic_source.read_stop %s\n' % media_item['uri'].encode('latin-1')
|
||||||
|
self.logger.debug(msg)
|
||||||
|
tn.write(msg)
|
||||||
|
|
||||||
|
msg = 'dynamic_source.output_stop\n'
|
||||||
self.logger.debug(msg)
|
self.logger.debug(msg)
|
||||||
tn.write(msg)
|
tn.write(msg)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue