Merge branch '2.3.x' into devel

This commit is contained in:
Martin Konecny 2013-03-05 16:21:22 -05:00
commit cc69418215
29 changed files with 218 additions and 169 deletions

View file

@ -92,7 +92,11 @@ class ApiRequest(object):
raise
# Ghetto hack for now because we don't the content type we are getting
# (Pointless to look at mime since it's not being set correctly always)
return json.loads(response)
try:
return json.loads(response)
except Exception:
self.logger.error(response)
raise
def req(self, *args, **kwargs):
self.__req = lambda : self(*args, **kwargs)

View file

@ -57,14 +57,18 @@ def get_file_type(file_path):
file_type = 'vorbis'
elif re.search(r'flac$', file_path, re.IGNORECASE):
file_type = 'flac'
elif re.search(r'(mp4|m4a)$', file_path, re.IGNORECASE):
file_type = 'mp4'
else:
mime_type = get_mime_type(file_path) == "audio/mpeg"
if 'mpeg' in mime_type:
file_type = 'mp3'
elif 'ogg' in mime_type:
elif 'ogg' in mime_type or "oga" in mime_type:
file_type = 'vorbis'
elif 'flac' in mime_type:
file_type = 'flac'
elif 'mp4' in mime_type or "m4a" in mime_type:
file_type = 'mp4'
return file_type
@ -109,6 +113,12 @@ def calculate_replay_gain(file_path):
search = re.search(r'REPLAYGAIN_TRACK_GAIN=(.*) dB', out)
else:
logger.warn("metaflac not found")
elif file_type == 'mp4':
if run_process("which aacgain > /dev/null") == 0:
out = get_process_output('aacgain -q "%s" 2> /dev/null' % temp_file_path)
search = re.search(r'Recommended "Track" dB change: (.*)', out)
else:
logger.warn("aacgain not found")
else:
pass

View file

@ -170,18 +170,17 @@ def normalize_mutagen(path):
md['mime'] = m.mime[0] if len(m.mime) > 0 else u''
md['path'] = normpath(path)
# silence detect(set default queue in and out)
try:
command = ['silan', '-f', 'JSON', md['path']]
proc = subprocess.Popen(command, stdout=subprocess.PIPE)
out = proc.communicate()[0].strip('\r\n')
# silence detect(set default cue in and out)
#try:
#command = ['silan', '-b', '-f', 'JSON', md['path']]
#proc = subprocess.Popen(command, stdout=subprocess.PIPE)
#out = proc.communicate()[0].strip('\r\n')
info = json.loads(out)
md['cuein'] = info['sound'][0][0]
md['cueout'] = info['sound'][-1][1]
except Exception:
logger = logging.getLogger()
logger.info('silan is missing')
#info = json.loads(out)
#md['cuein'] = info['sound'][0][0]
#md['cueout'] = info['sound'][0][1]
#except Exception:
#self.logger.debug('silan is missing')
if 'title' not in md: md['title'] = u''
return md

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from kombu.messaging import Exchange, Queue, Consumer
from kombu.connection import BrokerConnection
from kombu.simple import SimpleQueue
from os.path import normpath
import json
@ -24,35 +25,43 @@ class AirtimeNotifier(Loggable):
"""
def __init__(self, cfg, message_receiver):
self.cfg = cfg
self.handler = message_receiver
while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
def init_rabbit_mq(self):
try:
self.handler = message_receiver
self.logger.info("Initializing RabbitMQ message consumer...")
schedule_exchange = Exchange("airtime-media-monitor", "direct",
durable=True, auto_delete=True)
schedule_queue = Queue("media-monitor", exchange=schedule_exchange,
key="filesystem")
self.connection = BrokerConnection(cfg["rabbitmq_host"],
cfg["rabbitmq_user"], cfg["rabbitmq_password"],
cfg["rabbitmq_vhost"])
self.connection = BrokerConnection(self.cfg["rabbitmq_host"],
self.cfg["rabbitmq_user"], self.cfg["rabbitmq_password"],
self.cfg["rabbitmq_vhost"])
channel = self.connection.channel()
consumer = Consumer(channel, schedule_queue)
consumer.register_callback(self.handle_message)
consumer.consume()
self.simple_queue = SimpleQueue(channel, schedule_queue)
self.logger.info("Initialized RabbitMQ consumer.")
except Exception as e:
self.logger.info("Failed to initialize RabbitMQ consumer")
self.logger.error(e)
return False
def handle_message(self, body, message):
return True
def handle_message(self, message):
"""
Messages received from RabbitMQ are handled here. These messages
instruct media-monitor of events such as a new directory being watched,
file metadata has been changed, or any other changes to the config of
media-monitor via the web UI.
"""
message.ack()
self.logger.info("Received md from RabbitMQ: %s" % str(body))
m = json.loads(message.body)
self.logger.info("Received md from RabbitMQ: %s" % str(message))
m = json.loads(message)
# TODO : normalize any other keys that could be used to pass
# directories
if 'directory' in m: m['directory'] = normpath(m['directory'])

View file

@ -1,19 +1,26 @@
import socket
import time
from media.monitor.log import Loggable
from media.monitor.toucher import RepeatTimer
from amqplib.client_0_8.exceptions import AMQPConnectionException
class EventDrainer(Loggable):
"""
Flushes events from RabbitMQ that are sent from airtime every
certain amount of time
"""
def __init__(self, connection, interval=1):
def __init__(self, airtime_notifier, interval=1):
def cb():
# TODO : make 0.3 parameter configurable
try : connection.drain_events(timeout=0.3)
except socket.timeout : pass
except Exception as e :
self.fatal_exception("Error flushing events", e)
try:
message = airtime_notifier.simple_queue.get(block=True)
airtime_notifier.handle_message(message.payload)
message.ack()
except (IOError, AttributeError, AMQPConnectionException), e:
self.logger.error('Exception: %s', e)
while not airtime_notifier.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. \
Trying again in few seconds")
time.sleep(5)
t = RepeatTimer(interval, cb)
t.daemon = True

View file

@ -166,7 +166,8 @@ def walk_supported(directory, clean_empties=False):
def file_locked(path):
proc = Popen(["lsof", path], stdout=PIPE)
#Capture stderr to avoid polluting py-interpreter.log
proc = Popen(["lsof", path], stdout=PIPE, stderr=PIPE)
out = proc.communicate()[0].strip('\r\n')
return bool(out)

View file

@ -75,7 +75,7 @@ class MM2(InstanceThread, Loggable):
airtime_receiver.new_watch({ 'directory':watch_dir }, restart=True)
else: self.logger.info("Failed to add watch on %s" % str(watch_dir))
EventDrainer(airtime_notifier.connection,
EventDrainer(airtime_notifier,
interval=float(config['rmq_event_wait']))
# Launch the toucher that updates the last time when the script was

View file

@ -6,7 +6,7 @@ virtualenv_bin="/usr/lib/airtime/airtime_virtualenv/bin/"
ls_user="pypo"
export HOME="/var/tmp/airtime/pypo/"
api_client_path="/usr/lib/airtime/"
ls_path="/usr/bin/airtime-liquidsoap --verbose -f"
ls_path="/usr/bin/airtime-liquidsoap --verbose -f -d"
ls_param="/usr/lib/airtime/pypo/bin/liquidsoap_scripts/ls_script.liq"
exec 2>&1

View file

@ -20,8 +20,11 @@ start () {
chown pypo:pypo /var/log/airtime/pypo
chown pypo:pypo /var/log/airtime/pypo-liquidsoap
start-stop-daemon --start --background --quiet --chuid $USERID:$GROUPID \
--nicelevel -15 --make-pidfile --pidfile $PIDFILE --startas $DAEMON
touch /var/run/airtime-liquidsoap.pid
chown pypo:pypo /var/run/airtime-liquidsoap.pid
start-stop-daemon --start --quiet --chuid $USERID:$GROUPID \
--pidfile /var/run/airtime-liquidsoap.pid --nicelevel -15 --startas $DAEMON
monit monitor airtime-liquidsoap >/dev/null 2>&1
}

View file

@ -6,6 +6,7 @@ import sys
import subprocess
import random
import string
import re
from configobj import ConfigObj
if os.geteuid() != 0:
@ -36,6 +37,30 @@ def get_rand_string(length=10):
def get_rand_string(length=10):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for x in range(length))
def get_monit_version():
version = 0
try:
p = subprocess.Popen(['monit', '-V'], stdout=subprocess.PIPE)
out = p.communicate()[0].strip()
search = re.search(r'This is Monit version (.*)\n', out, re.IGNORECASE)
if search:
matches = search.groups()
if len(matches) == 1:
version = matches[0]
except Exception:
print "Could not get monit version"
return version
#return 1 if version1 > version2
#return 0 if version1 == version2
#return -1 if version1 < version2
def version_compare(version1, version2):
def normalize(v):
return [int(x) for x in re.sub(r'(\.0+)*$','', v).split(".")]
return cmp(normalize(version1), normalize(version2))
PATH_INI_FILE = '/etc/airtime/pypo.cfg'
try:
@ -63,9 +88,15 @@ try:
#copy monit files
shutil.copy('%s/../../monit/monit-airtime-generic.cfg'%current_script_dir, '/etc/monit/conf.d/')
subprocess.call('sed -i "s/\$admin_pass/%s/g" /etc/monit/conf.d/monit-airtime-generic.cfg' % get_rand_string(), shell=True)
shutil.copy('%s/../../monit/monit-airtime-rabbitmq-server.cfg'%current_script_dir, '/etc/monit/conf.d/')
shutil.copy('%s/../monit-airtime-liquidsoap.cfg'%current_script_dir, '/etc/monit/conf.d/')
monit_version = get_monit_version()
if version_compare(monit_version, "5.3.0") >= 0:
shutil.copy('%s/../monit-airtime-liquidsoap.cfg' % current_script_dir, \
'/etc/monit/conf.d/monit-airtime-liquidsoap.cfg')
else:
shutil.copy('%s/../monit-pre530-airtime-liquidsoap.cfg' % current_script_dir, \
'/etc/monit/conf.d/monit-airtime-liquidsoap.cfg')
shutil.copy('%s/../monit-airtime-playout.cfg'%current_script_dir, '/etc/monit/conf.d/')
#create pypo log dir

View file

@ -49,7 +49,6 @@ try:
remove_file("/etc/monit/conf.d/monit-airtime-playout.cfg")
remove_file("/etc/monit/conf.d/monit-airtime-liquidsoap.cfg")
remove_file("/etc/monit/conf.d/monit-airtime-generic.cfg")
remove_file("/etc/monit/conf.d/monit-airtime-rabbitmq-server.cfg")
except Exception, e:
print e

View file

@ -1,10 +1,12 @@
%include "library/pervasives.liq"
%include "/etc/airtime/liquidsoap.cfg"
set("log.file.path", log_file)
set("log.stdout", true)
set("server.telnet", true)
set("server.telnet.port", 1234)
set("init.daemon.pidfile.path", "/var/run/airtime-liquidsoap.pid")
%include "library/pervasives.liq"
#Dynamic source list
#dyn_sources = ref []
@ -31,8 +33,6 @@ s2_namespace = ref ''
s3_namespace = ref ''
just_switched = ref false
#stream_harbor_pass = list.hd(get_process_lines('pwgen -s -N 1 -n 20'))
%include "ls_lib.liq"
queue = audio_to_stereo(id="queue_src", request.equeue(id="queue", length=0.5))
@ -78,8 +78,8 @@ server.register(namespace="dynamic_source",
description="Enable webstream output",
usage='start',
"output_start",
fun (s) -> begin log("dynamic_source.output_start")
notify([("schedule_table_id", !current_dyn_id)])
fun (s) -> begin log("dynamic_source.output_start")
notify([("schedule_table_id", !current_dyn_id)])
webstream_enabled := true "enabled" end)
server.register(namespace="dynamic_source",
description="Enable webstream output",
@ -195,95 +195,58 @@ def check_dj_client(user,password) =
hd == "True"
end
def append_dj_inputs(master_harbor_input_port,
master_harbor_input_mount_point,
dj_harbor_input_port,
dj_harbor_input_mount_point,
s) =
if master_harbor_input_port != 0
and master_harbor_input_mount_point != ""
and dj_harbor_input_port != 0
and dj_harbor_input_mount_point != "" then
s = switch(id="schedule_noise_switch",
track_sensitive=false,
transitions=[transition_default, transition],
[({!scheduled_play_enabled}, stream_queue), ({true}, default)]
)
master_dj = mksafe(
audio_to_stereo(
input.harbor(id="master_harbor",
master_harbor_input_mount_point,
port=master_harbor_input_port,
auth=check_master_dj_client,
max=40.,
on_connect=master_dj_connect,
on_disconnect=master_dj_disconnect)))
s = if dj_live_stream_port != 0 and dj_live_stream_mp != "" then
dj_live = mksafe(
audio_to_stereo(
input.harbor(id="live_dj_harbor",
dj_live_stream_mp,
port=dj_live_stream_port,
auth=check_dj_client,
max=40.,
on_connect=live_dj_connect,
on_disconnect=live_dj_disconnect)))
dj_live = mksafe(
audio_to_stereo(
input.harbor(id="live_dj_harbor",
dj_harbor_input_mount_point,
port=dj_harbor_input_port,
auth=check_dj_client,
max=40.,
on_connect=live_dj_connect,
on_disconnect=live_dj_disconnect)))
ignore(output.dummy(dj_live, fallible=true))
ignore(output.dummy(master_dj, fallible=true))
ignore(output.dummy(dj_live, fallible=true))
switch(id="master_dj_switch",
track_sensitive=false,
transitions=[transition, transition, transition],
[({!master_dj_enabled},master_dj),
({!live_dj_enabled},dj_live),
({true}, s)])
elsif master_harbor_input_port != 0 and master_harbor_input_mount_point != "" then
master_dj = mksafe(
audio_to_stereo(
input.harbor(id="master_harbor",
master_harbor_input_mount_point,
port=master_harbor_input_port,
auth=check_master_dj_client,
max=40.,
on_connect=master_dj_connect,
on_disconnect=master_dj_disconnect)))
ignore(output.dummy(master_dj, fallible=true))
switch(id="master_dj_switch",
track_sensitive=false,
transitions=[transition, transition],
[({!master_dj_enabled},master_dj), ({true}, s)])
elsif dj_harbor_input_port != 0 and dj_harbor_input_mount_point != "" then
dj_live = mksafe(
audio_to_stereo(
input.harbor(id="live_dj_harbor",
dj_harbor_input_mount_point,
port=dj_harbor_input_port,
auth=check_dj_client,
max=40.,
on_connect=live_dj_connect,
on_disconnect=live_dj_disconnect)))
ignore(output.dummy(dj_live, fallible=true))
switch(id="live_dj_switch",
track_sensitive=false,
transitions=[transition, transition],
[({!live_dj_enabled},dj_live), ({true}, s)])
else
s
end
switch(id="show_schedule_noise_switch",
track_sensitive=false,
transitions=[transition, transition],
[({!live_dj_enabled}, dj_live), ({true}, s)]
)
else
s
end
s = switch(id="default_switch", track_sensitive=false,
transitions=[transition_default, transition],
[({!scheduled_play_enabled}, stream_queue),({true},default)])
s = if master_live_stream_port != 0 and master_live_stream_mp != "" then
master_dj = mksafe(
audio_to_stereo(
input.harbor(id="master_harbor",
master_live_stream_mp,
port=master_live_stream_port,
auth=check_master_dj_client,
max=40.,
on_connect=master_dj_connect,
on_disconnect=master_dj_disconnect)))
ignore(output.dummy(master_dj, fallible=true))
switch(id="master_show_schedule_noise_switch",
track_sensitive=false,
transitions=[transition, transition],
[({!master_dj_enabled}, master_dj), ({true}, s)]
)
else
s
end
s = append_dj_inputs(master_live_stream_port, master_live_stream_mp,
dj_live_stream_port, dj_live_stream_mp, s)
# Attach a skip command to the source s:
add_skip_command(s)
server.register(namespace="streams",

View file

@ -102,7 +102,6 @@ class ListenerStat(Thread):
stats.append(self.get_shoutcast_stats(v))
self.update_listener_stat_error(v["mount"], 'OK')
except Exception, e:
self.logger.error('Exception: %s', e)
try:
self.update_listener_stat_error(v["mount"], str(e))
except Exception, e:
@ -126,13 +125,9 @@ class ListenerStat(Thread):
while True:
try:
stream_parameters = self.get_stream_parameters()
stats = self.get_stream_stats(stream_parameters["stream_params"])
self.logger.debug(stats)
if not stats:
self.logger.error("Not able to get listener stats")
else:
if stats:
self.push_stream_stats(stats)
except Exception, e:
self.logger.error('Exception: %s', e)

View file

@ -55,6 +55,7 @@ class ReplayGainUpdater(Thread):
for f in files:
full_path = os.path.join(dir_path, f['fp'])
processed_data.append((f['id'], replaygain.calculate_replay_gain(full_path)))
total += 1
try:
self.api_client.update_replay_gain_values(processed_data)

View file

@ -1,4 +1,4 @@
set daemon 10 # Poll at 5 second intervals
set daemon 15 # Poll at 5 second intervals
set logfile /var/log/monit.log
set httpd port 2812
@ -7,3 +7,10 @@
with pidfile "/var/run/airtime-liquidsoap.pid"
start program = "/etc/init.d/airtime-liquidsoap start" with timeout 5 seconds
stop program = "/etc/init.d/airtime-liquidsoap stop"
if mem > 600 MB for 3 cycles then restart
if failed host localhost port 1234
send "version\r\nexit\r\n"
expect "Liquidsoap"
retry 3
then restart

View file

@ -0,0 +1,9 @@
set daemon 15 # Poll at 5 second intervals
set logfile /var/log/monit.log
set httpd port 2812
check process airtime-liquidsoap
with pidfile "/var/run/airtime-liquidsoap.pid"
start program = "/etc/init.d/airtime-liquidsoap start" with timeout 5 seconds
stop program = "/etc/init.d/airtime-liquidsoap stop"

View file

@ -14,6 +14,7 @@ from Queue import Empty
from api_clients import api_client
from std_err_override import LogWriter
from subprocess import Popen, PIPE
from configobj import ConfigObj
@ -501,6 +502,12 @@ class PypoFetch(Thread):
try: self.cache_cleanup(media)
except Exception, e: self.logger.error("%s", e)
def is_file_opened(self, path):
#Capture stderr to avoid polluting py-interpreter.log
proc = Popen(["lsof", path], stdout=PIPE, stderr=PIPE)
out = proc.communicate()[0].strip()
return bool(out)
def cache_cleanup(self, media):
"""
Get list of all files in the cache dir and remove them if they aren't being used anymore.
@ -521,8 +528,14 @@ class PypoFetch(Thread):
self.logger.debug("Files to remove " + str(expired_files))
for f in expired_files:
try:
self.logger.debug("Removing %s" % os.path.join(self.cache_dir, f))
os.remove(os.path.join(self.cache_dir, f))
path = os.path.join(self.cache_dir, f)
self.logger.debug("Removing %s" % path)
#check if this file is opened (sometimes Liquidsoap is still
#playing the file due to our knowledge of the track length
#being incorrect!)
if not self.is_file_opened():
os.remove(path)
except Exception, e:
self.logger.error(e)

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import logging
import traceback
import sys
from configobj import ConfigObj
from threading import Thread
@ -9,6 +10,7 @@ import time
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue
from kombu.simple import SimpleQueue
from amqplib.client_0_8.exceptions import AMQPConnectionException
import json
from std_err_override import LogWriter
@ -111,11 +113,9 @@ class PypoMessageHandler(Thread):
self.handle_message(message.payload)
# ACK the message to take it off the queue
message.ack()
except (IOError, AttributeError), e:
import traceback
top = traceback.format_exc()
except (IOError, AttributeError, AMQPConnectionException), e:
self.logger.error('Exception: %s', e)
self.logger.error("traceback: %s", top)
self.logger.error("traceback: %s", traceback.format_exc())
while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)

View file

@ -8,5 +8,5 @@ poster==0.8.1
pytz==2011k
wsgiref==0.1.2
configobj==4.7.2
mutagen==1.20
mutagen==1.21
docopt==0.4.2

View file

@ -50,18 +50,3 @@ fi
echo -e "\n*** Installing Python Libraries ***"
/usr/lib/airtime/airtime_virtualenv/bin/pip install ${SCRIPTPATH}/airtime_virtual_env.pybundle || exit 1
PYTHON_VERSION=$(python -c "import sys; print 'python%s.%s' % (sys.version_info[0], sys.version_info[1])")
echo -e "\n*** Patching Python Libraries ***"
echo " * Patching virtualenv libraries in /usr/lib/airtime/airtime_virtualenv/lib/$PYTHON_VERSION"
PATCHES=${SCRIPTPATH}/patches/*
for file in $(find $PATCHES -print); do
if [ -d $file ]; then
DIRNAME=$(basename $file)
echo -e "\n ---Applying Patches for $DIRNAME---"
else
patch -N -p7 -i $file -d /usr/lib/airtime/airtime_virtualenv/lib/$PYTHON_VERSION
fi
done
exit 0