more lintian fixes
-mostly lines with length > 79
This commit is contained in:
parent
d0a0487840
commit
e51e046dd0
11 changed files with 162 additions and 113 deletions
|
@ -82,8 +82,8 @@ def calculate_replay_gain(file_path):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
"""
|
"""
|
||||||
Making a duplicate is required because the ReplayGain extraction utilities we use
|
Making a duplicate is required because the ReplayGain extraction
|
||||||
make unwanted modifications to the file.
|
utilities we use make unwanted modifications to the file.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
search = None
|
search = None
|
||||||
|
@ -95,16 +95,18 @@ def calculate_replay_gain(file_path):
|
||||||
if file_type:
|
if file_type:
|
||||||
if file_type == 'mp3':
|
if file_type == 'mp3':
|
||||||
if run_process(['which', 'mp3gain']) == 0:
|
if run_process(['which', 'mp3gain']) == 0:
|
||||||
command = ['nice', '-n', nice_level, 'mp3gain', '-q', temp_file_path]
|
command = ['nice', '-n', nice_level, 'mp3gain', '-q',
|
||||||
|
temp_file_path]
|
||||||
out = get_process_output(command)
|
out = get_process_output(command)
|
||||||
search = re.search(r'Recommended "Track" dB change: (.*)', \
|
search = re.search(r'Recommended "Track" dB change: (.*)',
|
||||||
out)
|
out)
|
||||||
else:
|
else:
|
||||||
logger.warn("mp3gain not found")
|
logger.warn("mp3gain not found")
|
||||||
elif file_type == 'vorbis':
|
elif file_type == 'vorbis':
|
||||||
if run_process(['which', 'ogginfo']) == 0 and \
|
if run_process(['which', 'ogginfo']) == 0 and \
|
||||||
run_process(['which', 'vorbisgain']) == 0:
|
run_process(['which', 'vorbisgain']) == 0:
|
||||||
command = ['nice', '-n', nice_level, 'vorbisgain', '-q', '-f', temp_file_path]
|
command = ['nice', '-n', nice_level, 'vorbisgain', '-q',
|
||||||
|
'-f', temp_file_path]
|
||||||
run_process(command)
|
run_process(command)
|
||||||
|
|
||||||
out = get_process_output(['ogginfo', temp_file_path])
|
out = get_process_output(['ogginfo', temp_file_path])
|
||||||
|
|
|
@ -50,16 +50,19 @@ class ReplayGainUpdater(Thread):
|
||||||
# return a list of pairs where the first value is the
|
# return a list of pairs where the first value is the
|
||||||
# file's database row id and the second value is the
|
# file's database row id and the second value is the
|
||||||
# filepath
|
# filepath
|
||||||
files = self.api_client.get_files_without_replay_gain_value(dir_id)
|
files = self.api_client.\
|
||||||
|
get_files_without_replay_gain_value(dir_id)
|
||||||
processed_data = []
|
processed_data = []
|
||||||
for f in files:
|
for f in files:
|
||||||
full_path = os.path.join(dir_path, f['fp'])
|
full_path = os.path.join(dir_path, f['fp'])
|
||||||
processed_data.append((f['id'], replaygain.calculate_replay_gain(full_path)))
|
processed_data.append((f['id'],
|
||||||
|
replaygain.calculate_replay_gain(full_path)))
|
||||||
total += 1
|
total += 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if len(processed_data):
|
if len(processed_data):
|
||||||
self.api_client.update_replay_gain_values(processed_data)
|
self.api_client.\
|
||||||
|
update_replay_gain_values(processed_data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(e)
|
self.logger.error(e)
|
||||||
self.logger.debug(traceback.format_exc())
|
self.logger.debug(traceback.format_exc())
|
||||||
|
@ -77,7 +80,8 @@ class ReplayGainUpdater(Thread):
|
||||||
self.main()
|
self.main()
|
||||||
# Sleep for 5 minutes in case new files have been added
|
# Sleep for 5 minutes in case new files have been added
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error('ReplayGainUpdater Exception: %s', traceback.format_exc())
|
self.logger.error('ReplayGainUpdater Exception: %s',
|
||||||
|
traceback.format_exc())
|
||||||
self.logger.error(e)
|
self.logger.error(e)
|
||||||
time.sleep(60 * 5)
|
time.sleep(60 * 5)
|
||||||
|
|
||||||
|
|
|
@ -42,18 +42,23 @@ class SilanAnalyzer(Thread):
|
||||||
# silence detect(set default queue in and out)
|
# silence detect(set default queue in and out)
|
||||||
try:
|
try:
|
||||||
data = {}
|
data = {}
|
||||||
command = ['nice', '-n', '19', 'silan', '-b', '-f', 'JSON', full_path]
|
command = ['nice', '-n', '19', 'silan', '-b', '-f', 'JSON',
|
||||||
|
full_path]
|
||||||
try:
|
try:
|
||||||
proc = subprocess.Popen(command, stdout=subprocess.PIPE)
|
proc = subprocess.Popen(command,
|
||||||
|
stdout=subprocess.PIPE)
|
||||||
comm = proc.communicate()
|
comm = proc.communicate()
|
||||||
if len(comm):
|
if len(comm):
|
||||||
out = comm[0].strip('\r\n')
|
out = comm[0].strip('\r\n')
|
||||||
info = json.loads(out)
|
info = json.loads(out)
|
||||||
try: data['length'] = str('{0:f}'.format(info['file duration']))
|
try: data['length'] = \
|
||||||
|
str('{0:f}'.format(info['file duration']))
|
||||||
except: pass
|
except: pass
|
||||||
try: data['cuein'] = str('{0:f}'.format(info['sound'][0][0]))
|
try: data['cuein'] = \
|
||||||
|
str('{0:f}'.format(info['sound'][0][0]))
|
||||||
except: pass
|
except: pass
|
||||||
try: data['cueout'] = str('{0:f}'.format(info['sound'][-1][1]))
|
try: data['cueout'] = \
|
||||||
|
str('{0:f}'.format(info['sound'][-1][1]))
|
||||||
except: pass
|
except: pass
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error(str(command))
|
self.logger.error(str(command))
|
||||||
|
@ -61,7 +66,9 @@ class SilanAnalyzer(Thread):
|
||||||
processed_data.append((f['id'], data))
|
processed_data.append((f['id'], data))
|
||||||
total += 1
|
total += 1
|
||||||
if total % 5 == 0:
|
if total % 5 == 0:
|
||||||
self.logger.info("Total %s / %s files has been processed.." % (total, total_files))
|
self.logger.info("Total %s / %s files has been" +
|
||||||
|
"processed..",
|
||||||
|
total, total_files)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error(e)
|
self.logger.error(e)
|
||||||
self.logger.error(traceback.format_exc())
|
self.logger.error(traceback.format_exc())
|
||||||
|
@ -80,7 +87,8 @@ class SilanAnalyzer(Thread):
|
||||||
self.logger.info("Running Silan analyzer")
|
self.logger.info("Running Silan analyzer")
|
||||||
self.main()
|
self.main()
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error('Silan Analyzer Exception: %s', traceback.format_exc())
|
self.logger.error('Silan Analyzer Exception: %s',
|
||||||
|
traceback.format_exc())
|
||||||
self.logger.error(e)
|
self.logger.error(e)
|
||||||
self.logger.info("Sleeping for 5...")
|
self.logger.info("Sleeping for 5...")
|
||||||
time.sleep(60 * 5)
|
time.sleep(60 * 5)
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import traceback
|
import traceback
|
||||||
import sys
|
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
import time
|
import time
|
||||||
# For RabbitMQ
|
# For RabbitMQ
|
||||||
|
@ -34,8 +33,11 @@ class PypoMessageHandler(Thread):
|
||||||
def init_rabbit_mq(self):
|
def init_rabbit_mq(self):
|
||||||
self.logger.info("Initializing RabbitMQ stuff")
|
self.logger.info("Initializing RabbitMQ stuff")
|
||||||
try:
|
try:
|
||||||
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
|
schedule_exchange = \
|
||||||
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
|
Exchange("airtime-pypo", "direct",
|
||||||
|
durable=True, auto_delete=True)
|
||||||
|
schedule_queue = \
|
||||||
|
Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
|
||||||
connection = BrokerConnection(self.config["rabbitmq_host"], \
|
connection = BrokerConnection(self.config["rabbitmq_host"], \
|
||||||
self.config["rabbitmq_user"], \
|
self.config["rabbitmq_user"], \
|
||||||
self.config["rabbitmq_password"], \
|
self.config["rabbitmq_password"], \
|
||||||
|
@ -96,7 +98,8 @@ class PypoMessageHandler(Thread):
|
||||||
|
|
||||||
def main(self):
|
def main(self):
|
||||||
while not self.init_rabbit_mq():
|
while not self.init_rabbit_mq():
|
||||||
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
|
self.logger.error("Error connecting to RabbitMQ Server. " +
|
||||||
|
"Trying again in few seconds")
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
loops = 1
|
loops = 1
|
||||||
|
@ -111,7 +114,8 @@ class PypoMessageHandler(Thread):
|
||||||
self.logger.error('Exception: %s', e)
|
self.logger.error('Exception: %s', e)
|
||||||
self.logger.error("traceback: %s", traceback.format_exc())
|
self.logger.error("traceback: %s", traceback.format_exc())
|
||||||
while not self.init_rabbit_mq():
|
while not self.init_rabbit_mq():
|
||||||
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
|
self.logger.error("Error connecting to RabbitMQ Server. " +
|
||||||
|
"Trying again in few seconds")
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -12,16 +12,19 @@
|
||||||
import re
|
import re
|
||||||
|
|
||||||
def version_cmp(version1, version2):
|
def version_cmp(version1, version2):
|
||||||
|
"""Compare version strings such as 1.1.1, and 1.1.2. Returns the same as
|
||||||
|
Python built-in cmp. That is return value is negative if x < y, zero if
|
||||||
|
x == y and strictly positive if x > y."""
|
||||||
def normalize(v):
|
def normalize(v):
|
||||||
return [int(x) for x in re.sub(r'(\.0+)*$','', v).split(".")]
|
return [int(x) for x in re.sub(r'(\.0+)*$','', v).split(".")]
|
||||||
return cmp(normalize(version1), normalize(version2))
|
return cmp(normalize(version1), normalize(version2))
|
||||||
|
|
||||||
|
|
||||||
def date_interval_to_seconds(interval):
|
def date_interval_to_seconds(interval):
|
||||||
"""
|
"""Convert timedelta object into int representing the number of seconds. If
|
||||||
Convert timedelta object into int representing the number of seconds. If
|
number of seconds is less than 0, then return 0."""
|
||||||
number of seconds is less than 0, then return 0.
|
seconds = ((interval.microseconds +
|
||||||
"""
|
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6)
|
||||||
seconds = (interval.microseconds + \
|
/ float(10 ** 6))
|
||||||
(interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
|
|
||||||
|
|
||||||
return seconds
|
return seconds
|
||||||
|
|
|
@ -52,7 +52,8 @@ signal.signal(signal.SIGINT, keyboardInterruptHandler)
|
||||||
POLL_INTERVAL = 1800
|
POLL_INTERVAL = 1800
|
||||||
|
|
||||||
class PypoFetch(Thread):
|
class PypoFetch(Thread):
|
||||||
def __init__(self, pypoFetch_q, pypoPush_q, media_q, pypo_liquidsoap, config):
|
def __init__(self, pypoFetch_q, pypoPush_q, media_q, pypo_liquidsoap,
|
||||||
|
config):
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
|
|
||||||
self.api_client = api_client.AirtimeApiClient()
|
self.api_client = api_client.AirtimeApiClient()
|
||||||
|
@ -90,7 +91,8 @@ class PypoFetch(Thread):
|
||||||
"""
|
"""
|
||||||
def handle_message(self, message):
|
def handle_message(self, message):
|
||||||
try:
|
try:
|
||||||
self.logger.info("Received event from Pypo Message Handler: %s" % message)
|
self.logger.info("Received event from Pypo Message Handler: %s",
|
||||||
|
message)
|
||||||
|
|
||||||
m = json.loads(message)
|
m = json.loads(message)
|
||||||
command = m['event_type']
|
command = m['event_type']
|
||||||
|
@ -135,7 +137,10 @@ class PypoFetch(Thread):
|
||||||
if command == 'update_schedule':
|
if command == 'update_schedule':
|
||||||
self.listener_timeout = POLL_INTERVAL
|
self.listener_timeout = POLL_INTERVAL
|
||||||
else:
|
else:
|
||||||
self.listener_timeout = self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL
|
self.listener_timeout = \
|
||||||
|
(self.last_update_schedule_timestamp -
|
||||||
|
time.time() +
|
||||||
|
POLL_INTERVAL)
|
||||||
if self.listener_timeout < 0:
|
if self.listener_timeout < 0:
|
||||||
self.listener_timeout = 0
|
self.listener_timeout = 0
|
||||||
self.logger.info("New timeout: %s" % self.listener_timeout)
|
self.logger.info("New timeout: %s" % self.listener_timeout)
|
||||||
|
@ -143,11 +148,12 @@ class PypoFetch(Thread):
|
||||||
top = traceback.format_exc()
|
top = traceback.format_exc()
|
||||||
self.logger.error('Exception: %s', e)
|
self.logger.error('Exception: %s', e)
|
||||||
self.logger.error("traceback: %s", top)
|
self.logger.error("traceback: %s", top)
|
||||||
self.logger.error("Exception in handling Message Handler message: %s", e)
|
|
||||||
|
|
||||||
|
|
||||||
def switch_source_temp(self, sourcename, status):
|
def switch_source_temp(self, sourcename, status):
|
||||||
self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
|
self.logger.debug('Setting source %s to "%s" status', sourcename,
|
||||||
|
status)
|
||||||
|
|
||||||
command = "streams."
|
command = "streams."
|
||||||
if sourcename == "master_dj":
|
if sourcename == "master_dj":
|
||||||
command += "master_dj_"
|
command += "master_dj_"
|
||||||
|
@ -167,7 +173,7 @@ class PypoFetch(Thread):
|
||||||
Initialize Liquidsoap environment
|
Initialize Liquidsoap environment
|
||||||
"""
|
"""
|
||||||
def set_bootstrap_variables(self):
|
def set_bootstrap_variables(self):
|
||||||
self.logger.debug('Getting information needed on bootstrap from Airtime')
|
self.logger.debug('Getting information needed on bootstrap from DB')
|
||||||
try:
|
try:
|
||||||
info = self.api_client.get_bootstrap_info()
|
info = self.api_client.get_bootstrap_info()
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
|
@ -183,8 +189,12 @@ class PypoFetch(Thread):
|
||||||
station_name = info['station_name']
|
station_name = info['station_name']
|
||||||
fade = info['transition_fade']
|
fade = info['transition_fade']
|
||||||
|
|
||||||
commands.append(('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8'))
|
commands.append(('vars.stream_metadata_type %s\n' % stream_format).\
|
||||||
commands.append(('vars.station_name %s\n' % station_name).encode('utf-8'))
|
encode('utf-8'))
|
||||||
|
|
||||||
|
commands.append(('vars.station_name %s\n' % station_name).\
|
||||||
|
encode('utf-8'))
|
||||||
|
|
||||||
commands.append(('vars.default_dj_fade %s\n' % fade).encode('utf-8'))
|
commands.append(('vars.default_dj_fade %s\n' % fade).encode('utf-8'))
|
||||||
self.pypo_liquidsoap.get_telnet_dispatcher().telnet_send(commands)
|
self.pypo_liquidsoap.get_telnet_dispatcher().telnet_send(commands)
|
||||||
|
|
||||||
|
@ -288,7 +298,8 @@ class PypoFetch(Thread):
|
||||||
change[stream] = True
|
change[stream] = True
|
||||||
|
|
||||||
# set flag change for sound_device alway True
|
# set flag change for sound_device alway True
|
||||||
self.logger.info("Change:%s, State_Change:%s...", change, state_change_restart)
|
self.logger.info("Change:%s, State_Change:%s...", change,
|
||||||
|
state_change_restart)
|
||||||
|
|
||||||
for k, v in state_change_restart.items():
|
for k, v in state_change_restart.items():
|
||||||
if k == "sound_device" and v:
|
if k == "sound_device" and v:
|
||||||
|
@ -327,7 +338,8 @@ class PypoFetch(Thread):
|
||||||
stream_id = info[0]
|
stream_id = info[0]
|
||||||
status = info[1]
|
status = info[1]
|
||||||
if(status == "true"):
|
if(status == "true"):
|
||||||
self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time))
|
self.api_client.notify_liquidsoap_status("OK", stream_id,
|
||||||
|
str(fake_time))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -338,10 +350,11 @@ class PypoFetch(Thread):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Process the schedule
|
Process the schedule
|
||||||
- Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
|
- Reads the scheduled entries of a given range (actual time +/-
|
||||||
- Saves a serialized file of the schedule
|
"prepare_ahead" / "cache_for")
|
||||||
- playlists are prepared. (brought to liquidsoap format) and, if not mounted via nsf, files are copied
|
- playlists are prepared. (brought to liquidsoap format) and, if not
|
||||||
to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
|
mounted via nsf, files are copied to the cache dir (Folder-structure:
|
||||||
|
cache/YYYY-MM-DD-hh-mm-ss)
|
||||||
- runs the cleanup routine, to get rid of unused cached files
|
- runs the cleanup routine, to get rid of unused cached files
|
||||||
"""
|
"""
|
||||||
def process_schedule(self, schedule_data):
|
def process_schedule(self, schedule_data):
|
||||||
|
@ -350,9 +363,7 @@ class PypoFetch(Thread):
|
||||||
media = schedule_data["media"]
|
media = schedule_data["media"]
|
||||||
media_filtered = {}
|
media_filtered = {}
|
||||||
|
|
||||||
# Download all the media and put playlists in liquidsoap "annotate" format
|
|
||||||
try:
|
try:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Make sure cache_dir exists
|
Make sure cache_dir exists
|
||||||
"""
|
"""
|
||||||
|
@ -368,16 +379,18 @@ class PypoFetch(Thread):
|
||||||
if (media_item['type'] == 'file'):
|
if (media_item['type'] == 'file'):
|
||||||
self.sanity_check_media_item(media_item)
|
self.sanity_check_media_item(media_item)
|
||||||
fileExt = os.path.splitext(media_item['uri'])[1]
|
fileExt = os.path.splitext(media_item['uri'])[1]
|
||||||
dst = os.path.join(download_dir, unicode(media_item['id']) + fileExt)
|
dst = os.path.join(download_dir,
|
||||||
|
unicode(media_item['id']) + fileExt)
|
||||||
media_item['dst'] = dst
|
media_item['dst'] = dst
|
||||||
media_item['file_ready'] = False
|
media_item['file_ready'] = False
|
||||||
media_filtered[key] = media_item
|
media_filtered[key] = media_item
|
||||||
|
|
||||||
media_item['start'] = datetime.strptime(media_item['start'], "%Y-%m-%d-%H-%M-%S")
|
media_item['start'] = datetime.strptime(media_item['start'],
|
||||||
media_item['end'] = datetime.strptime(media_item['end'], "%Y-%m-%d-%H-%M-%S")
|
"%Y-%m-%d-%H-%M-%S")
|
||||||
|
media_item['end'] = datetime.strptime(media_item['end'],
|
||||||
|
"%Y-%m-%d-%H-%M-%S")
|
||||||
media_copy[media_item['start']] = media_item
|
media_copy[media_item['start']] = media_item
|
||||||
|
|
||||||
|
|
||||||
self.media_prepare_queue.put(copy.copy(media_filtered))
|
self.media_prepare_queue.put(copy.copy(media_filtered))
|
||||||
except Exception, e: self.logger.error("%s", e)
|
except Exception, e: self.logger.error("%s", e)
|
||||||
|
|
||||||
|
@ -411,11 +424,10 @@ class PypoFetch(Thread):
|
||||||
return bool(out)
|
return bool(out)
|
||||||
|
|
||||||
def cache_cleanup(self, media):
|
def cache_cleanup(self, media):
|
||||||
"""
|
"""Get list of all files in the cache dir and remove them if they aren't
|
||||||
Get list of all files in the cache dir and remove them if they aren't being used anymore.
|
being used anymore. Input dict() media, lists all files that are
|
||||||
Input dict() media, lists all files that are scheduled or currently playing. Not being in this
|
scheduled or currently playing. Not being in this dict() means the
|
||||||
dict() means the file is safe to remove.
|
file is safe to remove."""
|
||||||
"""
|
|
||||||
cached_file_set = set(os.listdir(self.cache_dir))
|
cached_file_set = set(os.listdir(self.cache_dir))
|
||||||
scheduled_file_set = set()
|
scheduled_file_set = set()
|
||||||
|
|
||||||
|
@ -440,7 +452,8 @@ class PypoFetch(Thread):
|
||||||
os.remove(path)
|
os.remove(path)
|
||||||
self.logger.info("File '%s' removed" % path)
|
self.logger.info("File '%s' removed" % path)
|
||||||
else:
|
else:
|
||||||
self.logger.info("File '%s' not removed. Still busy!" % path)
|
self.logger.info("File '%s' not removed. Still busy!",
|
||||||
|
path)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error("Problem removing file '%s'" % f)
|
self.logger.error("Problem removing file '%s'" % f)
|
||||||
self.logger.error(traceback.format_exc())
|
self.logger.error(traceback.format_exc())
|
||||||
|
@ -462,11 +475,11 @@ class PypoFetch(Thread):
|
||||||
|
|
||||||
|
|
||||||
def main(self):
|
def main(self):
|
||||||
#Make sure all Liquidsoap queues are empty. This is important in the
|
"""Make sure all Liquidsoap queues are empty. This is important in the
|
||||||
#case where we've just restarted the pypo scheduler, but Liquidsoap still
|
case where we've just restarted the pypo scheduler, but Liquidsoap
|
||||||
#is playing tracks. In this case let's just restart everything from scratch
|
still is playing tracks. In this case let's just restart everything
|
||||||
#so that we can repopulate our dictionary that keeps track of what
|
from scratch so that we can repopulate our dictionary that keeps track
|
||||||
#Liquidsoap is playing much more easily.
|
of what Liquidsoap is playing much more easily."""
|
||||||
self.pypo_liquidsoap.clear_all_queues()
|
self.pypo_liquidsoap.clear_all_queues()
|
||||||
|
|
||||||
self.set_bootstrap_variables()
|
self.set_bootstrap_variables()
|
||||||
|
@ -476,7 +489,8 @@ class PypoFetch(Thread):
|
||||||
success = self.persistent_manual_schedule_fetch(max_attempts=5)
|
success = self.persistent_manual_schedule_fetch(max_attempts=5)
|
||||||
|
|
||||||
if success:
|
if success:
|
||||||
self.logger.info("Bootstrap schedule received: %s", self.schedule_data)
|
self.logger.info("Bootstrap schedule received: %s",
|
||||||
|
self.schedule_data)
|
||||||
|
|
||||||
loops = 1
|
loops = 1
|
||||||
while True:
|
while True:
|
||||||
|
@ -489,15 +503,16 @@ class PypoFetch(Thread):
|
||||||
get schedule updates via RabbitMq if the user was constantly
|
get schedule updates via RabbitMq if the user was constantly
|
||||||
using the Airtime interface.
|
using the Airtime interface.
|
||||||
|
|
||||||
If the user is not using the interface, RabbitMq messages are not
|
If the user is not using the interface, RabbitMq messages are
|
||||||
sent, and we will have very stale (or non-existent!) data about the
|
not sent, and we will have very stale (or non-existent!) data
|
||||||
schedule.
|
about the schedule.
|
||||||
|
|
||||||
Currently we are checking every POLL_INTERVAL seconds
|
Currently we are checking every POLL_INTERVAL seconds.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
message = self.fetch_queue.get(block=True, timeout=self.listener_timeout)
|
message = self.fetch_queue.get(block=True,
|
||||||
|
timeout=self.listener_timeout)
|
||||||
self.handle_message(message)
|
self.handle_message(message)
|
||||||
except Empty, e:
|
except Empty, e:
|
||||||
self.logger.info("Queue timeout. Fetching schedule manually")
|
self.logger.info("Queue timeout. Fetching schedule manually")
|
||||||
|
@ -510,7 +525,4 @@ class PypoFetch(Thread):
|
||||||
loops += 1
|
loops += 1
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""
|
|
||||||
Entry point of the thread
|
|
||||||
"""
|
|
||||||
self.main()
|
self.main()
|
||||||
|
|
|
@ -18,6 +18,7 @@ import shutil
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import stat
|
import stat
|
||||||
|
import traceback
|
||||||
|
|
||||||
from std_err_override import LogWriter
|
from std_err_override import LogWriter
|
||||||
|
|
||||||
|
@ -64,7 +65,8 @@ class PypoFile(Thread):
|
||||||
if src_size != dst_size:
|
if src_size != dst_size:
|
||||||
do_copy = True
|
do_copy = True
|
||||||
else:
|
else:
|
||||||
self.logger.debug("file %s already exists in local cache as %s, skipping copying..." % (src, dst))
|
self.logger.debug("file %s already exists in local cache as " +
|
||||||
|
"%s, skipping copying...", src, dst)
|
||||||
else:
|
else:
|
||||||
do_copy = True
|
do_copy = True
|
||||||
|
|
||||||
|
@ -75,10 +77,10 @@ class PypoFile(Thread):
|
||||||
try:
|
try:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
List file as "ready" before it starts copying because by the time
|
List file as "ready" before it starts copying because by the
|
||||||
Liquidsoap is ready to play this file, it should have at least started
|
time Liquidsoap is ready to play this file, it should have at
|
||||||
copying (and can continue copying while Liquidsoap reads from the beginning
|
least started copying (and can continue copying while
|
||||||
of the file)
|
Liquidsoap reads from the beginning of the file)
|
||||||
"""
|
"""
|
||||||
media_item['file_ready'] = True
|
media_item['file_ready'] = True
|
||||||
|
|
||||||
|
@ -117,7 +119,8 @@ class PypoFile(Thread):
|
||||||
anymore. If on the next iteration we have received a new schedule,
|
anymore. If on the next iteration we have received a new schedule,
|
||||||
it is very possible we will have to deal with the same media_items
|
it is very possible we will have to deal with the same media_items
|
||||||
again. In this situation, the worst possible case is that we try to
|
again. In this situation, the worst possible case is that we try to
|
||||||
copy the file again and realize we already have it (thus aborting the copy).
|
copy the file again and realize we already have it (thus aborting the
|
||||||
|
copy).
|
||||||
"""
|
"""
|
||||||
del schedule[highest_priority]
|
del schedule[highest_priority]
|
||||||
|
|
||||||
|
@ -137,8 +140,9 @@ class PypoFile(Thread):
|
||||||
"""
|
"""
|
||||||
We have a schedule we need to process, but we also want
|
We have a schedule we need to process, but we also want
|
||||||
to check if a newer schedule is available. In this case
|
to check if a newer schedule is available. In this case
|
||||||
do a non-blocking queue.get and in either case (we get something
|
do a non-blocking queue.get and in either case (we get
|
||||||
or we don't), get back to work on preparing getting files.
|
something or we don't), get back to work on preparing
|
||||||
|
getting files.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
self.media = self.media_queue.get_nowait()
|
self.media = self.media_queue.get_nowait()
|
||||||
|
@ -150,14 +154,10 @@ class PypoFile(Thread):
|
||||||
if media_item is not None:
|
if media_item is not None:
|
||||||
self.copy_file(media_item)
|
self.copy_file(media_item)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
import traceback
|
|
||||||
top = traceback.format_exc()
|
top = traceback.format_exc()
|
||||||
self.logger.error(str(e))
|
|
||||||
self.logger.error(top)
|
self.logger.error(top)
|
||||||
|
self.logger.error(str(e))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""
|
|
||||||
Entry point of the thread
|
|
||||||
"""
|
|
||||||
self.main()
|
self.main()
|
||||||
|
|
|
@ -47,7 +47,7 @@ class PypoLiqQueue(Thread):
|
||||||
self.logger.info("waiting indefinitely for schedule")
|
self.logger.info("waiting indefinitely for schedule")
|
||||||
media_schedule = self.queue.get(block=True)
|
media_schedule = self.queue.get(block=True)
|
||||||
else:
|
else:
|
||||||
self.logger.info("waiting %ss until next scheduled item" % \
|
self.logger.info("waiting %ss until next scheduled item" %
|
||||||
time_until_next_play)
|
time_until_next_play)
|
||||||
media_schedule = self.queue.get(block=True, \
|
media_schedule = self.queue.get(block=True, \
|
||||||
timeout=time_until_next_play)
|
timeout=time_until_next_play)
|
||||||
|
@ -58,7 +58,7 @@ class PypoLiqQueue(Thread):
|
||||||
if len(schedule_deque):
|
if len(schedule_deque):
|
||||||
time_until_next_play = \
|
time_until_next_play = \
|
||||||
pure.date_interval_to_seconds(
|
pure.date_interval_to_seconds(
|
||||||
schedule_deque[0]['start'] - datetime.utcnow())
|
schedule_deque[0]['start'] - datetime.utcnow())
|
||||||
if time_until_next_play < 0:
|
if time_until_next_play < 0:
|
||||||
time_until_next_play = 0
|
time_until_next_play = 0
|
||||||
else:
|
else:
|
||||||
|
@ -82,7 +82,8 @@ class PypoLiqQueue(Thread):
|
||||||
def run(self):
|
def run(self):
|
||||||
try: self.main()
|
try: self.main()
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc())
|
self.logger.error('PypoLiqQueue Exception: %s',
|
||||||
|
traceback.format_exc())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -51,9 +51,11 @@ class PypoLiquidsoap():
|
||||||
elif media_item["type"] == eventtypes.STREAM_BUFFER_START:
|
elif media_item["type"] == eventtypes.STREAM_BUFFER_START:
|
||||||
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
|
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
|
||||||
elif media_item["type"] == eventtypes.STREAM_OUTPUT_START:
|
elif media_item["type"] == eventtypes.STREAM_OUTPUT_START:
|
||||||
if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id:
|
if media_item['row_id'] != \
|
||||||
#this is called if the stream wasn't scheduled sufficiently ahead of time
|
self.telnet_liquidsoap.current_prebuffering_stream_id:
|
||||||
#so that the prebuffering stage could take effect. Let's do the prebuffering now.
|
#this is called if the stream wasn't scheduled sufficiently
|
||||||
|
#ahead of time so that the prebuffering stage could take
|
||||||
|
#effect. Let's do the prebuffering now.
|
||||||
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
|
self.telnet_liquidsoap.start_web_stream_buffer(media_item)
|
||||||
self.telnet_liquidsoap.start_web_stream(media_item)
|
self.telnet_liquidsoap.start_web_stream(media_item)
|
||||||
elif media_item['type'] == eventtypes.STREAM_BUFFER_END:
|
elif media_item['type'] == eventtypes.STREAM_BUFFER_END:
|
||||||
|
@ -82,7 +84,8 @@ class PypoLiquidsoap():
|
||||||
self.logger.error(e)
|
self.logger.error(e)
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
self.logger.warn("File %s did not become ready in less than 5 seconds. Skipping...", media_item['dst'])
|
self.logger.warn("File %s did not become ready in less than 5 " +
|
||||||
|
"seconds. Skipping...", media_item['dst'])
|
||||||
|
|
||||||
def handle_event_type(self, media_item):
|
def handle_event_type(self, media_item):
|
||||||
if media_item['event_type'] == "kick_out":
|
if media_item['event_type'] == "kick_out":
|
||||||
|
@ -119,8 +122,8 @@ class PypoLiquidsoap():
|
||||||
#currently_playing then stop it.
|
#currently_playing then stop it.
|
||||||
|
|
||||||
#Check for Liquidsoap media we should source.skip
|
#Check for Liquidsoap media we should source.skip
|
||||||
#get liquidsoap items for each queue. Since each queue can only have one
|
#get liquidsoap items for each queue. Since each queue can only have
|
||||||
#item, we should have a max of 8 items.
|
#one item, we should have a max of 8 items.
|
||||||
|
|
||||||
#2013-03-21-22-56-00_0: {
|
#2013-03-21-22-56-00_0: {
|
||||||
#id: 1,
|
#id: 1,
|
||||||
|
@ -138,7 +141,7 @@ class PypoLiquidsoap():
|
||||||
filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now)
|
filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now)
|
||||||
|
|
||||||
scheduled_now_webstream = \
|
scheduled_now_webstream = \
|
||||||
filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \
|
filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START,
|
||||||
scheduled_now)
|
scheduled_now)
|
||||||
|
|
||||||
schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files))
|
schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files))
|
||||||
|
@ -155,8 +158,8 @@ class PypoLiquidsoap():
|
||||||
to_be_added = set()
|
to_be_added = set()
|
||||||
|
|
||||||
#Iterate over the new files, and compare them to currently scheduled
|
#Iterate over the new files, and compare them to currently scheduled
|
||||||
#tracks. If already in liquidsoap queue still need to make sure they don't
|
#tracks. If already in liquidsoap queue still need to make sure they
|
||||||
#have different attributes such replay_gain etc.
|
#don't have different attributes such replay_gain etc.
|
||||||
for i in scheduled_now_files:
|
for i in scheduled_now_files:
|
||||||
if i["row_id"] in row_id_map:
|
if i["row_id"] in row_id_map:
|
||||||
mi = row_id_map[i["row_id"]]
|
mi = row_id_map[i["row_id"]]
|
||||||
|
@ -176,7 +179,7 @@ class PypoLiquidsoap():
|
||||||
to_be_added.update(schedule_ids - liq_queue_ids)
|
to_be_added.update(schedule_ids - liq_queue_ids)
|
||||||
|
|
||||||
if to_be_removed:
|
if to_be_removed:
|
||||||
self.logger.info("Need to remove items from Liquidsoap: %s" % \
|
self.logger.info("Need to remove items from Liquidsoap: %s" %
|
||||||
to_be_removed)
|
to_be_removed)
|
||||||
|
|
||||||
#remove files from Liquidsoap's queue
|
#remove files from Liquidsoap's queue
|
||||||
|
@ -186,7 +189,7 @@ class PypoLiquidsoap():
|
||||||
self.stop(i)
|
self.stop(i)
|
||||||
|
|
||||||
if to_be_added:
|
if to_be_added:
|
||||||
self.logger.info("Need to add items to Liquidsoap *now*: %s" % \
|
self.logger.info("Need to add items to Liquidsoap *now*: %s" %
|
||||||
to_be_added)
|
to_be_added)
|
||||||
|
|
||||||
for i in scheduled_now:
|
for i in scheduled_now:
|
||||||
|
@ -227,9 +230,11 @@ class PypoLiquidsoap():
|
||||||
diff_sec = pure.date_interval_to_seconds(diff_td)
|
diff_sec = pure.date_interval_to_seconds(diff_td)
|
||||||
|
|
||||||
if diff_sec > 0:
|
if diff_sec > 0:
|
||||||
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
|
self.logger.debug("media item was supposed to start %s ago. " +
|
||||||
|
"Preparing to start..", diff_sec)
|
||||||
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
|
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
|
||||||
link['cue_in'] = pure.date_interval_to_seconds(original_cue_in_td) + diff_sec
|
link['cue_in'] = \
|
||||||
|
pure.date_interval_to_seconds(original_cue_in_td) + diff_sec
|
||||||
|
|
||||||
def clear_all_queues(self):
|
def clear_all_queues(self):
|
||||||
self.telnet_liquidsoap.queue_clear_all()
|
self.telnet_liquidsoap.queue_clear_all()
|
||||||
|
@ -243,23 +248,30 @@ class PypoLiquidsoap():
|
||||||
|
|
||||||
def liquidsoap_startup_test(self):
|
def liquidsoap_startup_test(self):
|
||||||
liquidsoap_version_string = \
|
liquidsoap_version_string = \
|
||||||
self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info())
|
self.get_liquidsoap_version(
|
||||||
|
self.telnet_liquidsoap.liquidsoap_get_info())
|
||||||
while not liquidsoap_version_string:
|
while not liquidsoap_version_string:
|
||||||
self.logger.warning("Liquidsoap doesn't appear to be running!, " +
|
self.logger.warning("Liquidsoap doesn't appear to be running!, " +
|
||||||
"Sleeping and trying again")
|
"Sleeping and trying again")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
liquidsoap_version_string = \
|
liquidsoap_version_string = \
|
||||||
self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info())
|
self.get_liquidsoap_version(
|
||||||
|
self.telnet_liquidsoap.liquidsoap_get_info())
|
||||||
|
|
||||||
while pure.version_cmp(liquidsoap_version_string, constants.LIQUIDSOAP_MIN_VERSION) < 0:
|
while pure.version_cmp(
|
||||||
self.logger.warning("Liquidsoap is running but in incorrect version! " +
|
liquidsoap_version_string,
|
||||||
"Make sure you have at least Liquidsoap %s installed" %
|
constants.LIQUIDSOAP_MIN_VERSION) < 0:
|
||||||
|
self.logger.warning("Liquidsoap is running but in incorrect " +
|
||||||
|
"version! Make sure you have at least Liquidsoap %s " +
|
||||||
|
"installed",
|
||||||
constants.LIQUIDSOAP_MIN_VERSION)
|
constants.LIQUIDSOAP_MIN_VERSION)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
liquidsoap_version_string = \
|
liquidsoap_version_string = \
|
||||||
self.get_liquidsoap_version(self.telnet_liquidsoap.liquidsoap_get_info())
|
self.get_liquidsoap_version(
|
||||||
|
self.telnet_liquidsoap.liquidsoap_get_info())
|
||||||
|
|
||||||
self.logger.info("Liquidsoap version string found %s" % liquidsoap_version_string)
|
self.logger.info("Liquidsoap version string found %s",
|
||||||
|
liquidsoap_version_string)
|
||||||
|
|
||||||
|
|
||||||
class UnknownMediaItemType(Exception):
|
class UnknownMediaItemType(Exception):
|
||||||
|
|
|
@ -68,7 +68,8 @@ class PypoPush(Thread):
|
||||||
currently_playing, scheduled_for_future = \
|
currently_playing, scheduled_for_future = \
|
||||||
self.separate_present_future(media_schedule)
|
self.separate_present_future(media_schedule)
|
||||||
|
|
||||||
self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
|
self.pypo_liquidsoap.verify_correct_present_media(
|
||||||
|
currently_playing)
|
||||||
self.future_scheduled_queue.put(scheduled_for_future)
|
self.future_scheduled_queue.put(scheduled_for_future)
|
||||||
|
|
||||||
def separate_present_future(self, media_schedule):
|
def separate_present_future(self, media_schedule):
|
||||||
|
|
|
@ -11,7 +11,8 @@ import telnetlib
|
||||||
from threading import Lock
|
from threading import Lock
|
||||||
|
|
||||||
def create_liquidsoap_annotation(media):
|
def create_liquidsoap_annotation(media):
|
||||||
# We need liq_start_next value in the annotate. That is the value that controls overlap duration of crossfade.
|
# We need liq_start_next value in the annotate. That is the value that
|
||||||
|
# controls overlap duration of crossfade.
|
||||||
return ('annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",' + \
|
return ('annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",' + \
|
||||||
'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",' + \
|
'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",' + \
|
||||||
'schedule_table_id="%s",replay_gain="%s dB":%s') % \
|
'schedule_table_id="%s",replay_gain="%s dB":%s') % \
|
||||||
|
@ -241,7 +242,8 @@ class TelnetLiquidsoap:
|
||||||
|
|
||||||
|
|
||||||
def switch_source(self, sourcename, status):
|
def switch_source(self, sourcename, status):
|
||||||
self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
|
self.logger.debug('Switching source: %s to "%s" status', sourcename,
|
||||||
|
status)
|
||||||
command = "streams."
|
command = "streams."
|
||||||
if sourcename == "master_dj":
|
if sourcename == "master_dj":
|
||||||
command += "master_dj_"
|
command += "master_dj_"
|
||||||
|
@ -280,7 +282,8 @@ class TelnetLiquidsoap:
|
||||||
try:
|
try:
|
||||||
self.telnet_lock.acquire()
|
self.telnet_lock.acquire()
|
||||||
tn = telnetlib.Telnet(self.host, self.port)
|
tn = telnetlib.Telnet(self.host, self.port)
|
||||||
command = ('vars.station_name %s\n' % station_name).encode('utf-8')
|
command = ('vars.station_name %s\n' %
|
||||||
|
station_name).encode('utf-8')
|
||||||
self.logger.info(command)
|
self.logger.info(command)
|
||||||
tn.write(command)
|
tn.write(command)
|
||||||
tn.write('exit\n')
|
tn.write('exit\n')
|
||||||
|
@ -297,9 +300,9 @@ class TelnetLiquidsoap:
|
||||||
try:
|
try:
|
||||||
self.telnet_lock.acquire()
|
self.telnet_lock.acquire()
|
||||||
tn = telnetlib.Telnet(self.host, self.port)
|
tn = telnetlib.Telnet(self.host, self.port)
|
||||||
# update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
|
# update the boot up time of Liquidsoap. Since Liquidsoap is not
|
||||||
# we are manually adjusting the bootup time variable so the status msg will get
|
# restarting, we are manually adjusting the bootup time variable
|
||||||
# updated.
|
# so the status msg will get updated.
|
||||||
boot_up_time_command = "vars.bootup_time %s\n" % str(current_time)
|
boot_up_time_command = "vars.bootup_time %s\n" % str(current_time)
|
||||||
self.logger.info(boot_up_time_command)
|
self.logger.info(boot_up_time_command)
|
||||||
tn.write(boot_up_time_command)
|
tn.write(boot_up_time_command)
|
||||||
|
@ -322,7 +325,8 @@ class TelnetLiquidsoap:
|
||||||
try:
|
try:
|
||||||
self.telnet_lock.acquire()
|
self.telnet_lock.acquire()
|
||||||
tn = telnetlib.Telnet(self.host, self.port)
|
tn = telnetlib.Telnet(self.host, self.port)
|
||||||
command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
|
command = ('vars.stream_metadata_type %s\n' %
|
||||||
|
stream_format).encode('utf-8')
|
||||||
self.logger.info(command)
|
self.logger.info(command)
|
||||||
tn.write(command)
|
tn.write(command)
|
||||||
tn.write('exit\n')
|
tn.write('exit\n')
|
||||||
|
@ -384,7 +388,5 @@ class DummyTelnetLiquidsoap:
|
||||||
finally:
|
finally:
|
||||||
self.telnet_lock.release()
|
self.telnet_lock.release()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class QueueNotEmptyException(Exception):
|
class QueueNotEmptyException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue