clean up indentation + imports for python apps

This commit is contained in:
Martin Konecny 2012-06-26 22:41:11 -04:00
parent b6483cd952
commit 0a3c9c7351
7 changed files with 183 additions and 212 deletions

View File

@ -109,20 +109,6 @@ try:
print e
sys.exit(1)
"""
logging.basicConfig(format='%(message)s')
#generate liquidsoap config file
#access the DB and generate liquidsoap.cfg under /etc/airtime/
ac = api_client.api_client_factory(config, logging.getLogger())
ss = ac.get_stream_setting()
if ss is not None:
generate_liquidsoap_config(ss)
else:
print "Unable to connect to the Airtime server."
"""
#initialize init.d scripts
subprocess.call("update-rc.d airtime-playout defaults >/dev/null 2>&1", shell=True)

View File

@ -3,12 +3,10 @@ Python part of radio playout (pypo)
"""
import time
from optparse import *
from optparse import OptionParser
import sys
import signal
import logging
import logging.config
import logging.handlers
import locale
import os
from Queue import Queue
@ -53,11 +51,11 @@ parser.add_option("-c", "--check", help="Check the cached schedule and exit", de
def configure_locale():
logger.debug("Before %s", locale.nl_langinfo(locale.CODESET))
current_locale = locale.getlocale()
if current_locale[1] is None:
logger.debug("No locale currently set. Attempting to get default locale.")
default_locale = locale.getdefaultlocale()
if default_locale[1] is None:
logger.debug("No default locale exists. Let's try loading from /etc/default/locale")
if os.path.exists("/etc/default/locale"):
@ -69,17 +67,17 @@ def configure_locale():
sys.exit(1)
else:
new_locale = default_locale
logger.info("New locale set to: %s", locale.setlocale(locale.LC_ALL, new_locale))
reload(sys)
sys.setdefaultencoding("UTF-8")
current_locale_encoding = locale.getlocale()[1].lower()
logger.debug("sys default encoding %s", sys.getdefaultencoding())
logger.debug("After %s", locale.nl_langinfo(locale.CODESET))
if current_locale_encoding not in ['utf-8', 'utf8']:
logger.error("Need a UTF-8 locale. Currently '%s'. Exiting..." % current_locale_encoding)
sys.exit(1)
@ -92,7 +90,7 @@ try:
except Exception, e:
print "Couldn't configure logging"
sys.exit()
configure_locale()
# loading config file
@ -105,11 +103,11 @@ except Exception, e:
class Global:
def __init__(self):
self.api_client = api_client.api_client_factory(config)
def selfcheck(self):
self.api_client = api_client.api_client_factory(config)
return self.api_client.is_server_compatible()
def test_api(self):
self.api_client.test()
@ -160,7 +158,7 @@ if __name__ == '__main__':
g = Global()
while not g.selfcheck(): time.sleep(5)
logger = logging.getLogger()
if options.test:
@ -173,9 +171,9 @@ if __name__ == '__main__':
pypoFetch_q = Queue()
recorder_q = Queue()
pypoPush_q = Queue()
telnet_lock = Lock()
"""
This queue is shared between pypo-fetch and pypo-file, where pypo-file
is the receiver. Pypo-fetch will send every schedule it gets to pypo-file
@ -183,19 +181,19 @@ if __name__ == '__main__':
priority, and will retrieve it.
"""
media_q = Queue()
pmh = PypoMessageHandler(pypoFetch_q, recorder_q)
pmh.daemon = True
pmh.start()
pfile = PypoFile(media_q)
pfile.daemon = True
pfile.start()
pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock)
pf.daemon = True
pf.start()
pp = PypoPush(pypoPush_q, telnet_lock)
pp.daemon = True
pp.start()
@ -210,7 +208,7 @@ if __name__ == '__main__':
#recorder.join()
#pp.join()
pf.join()
logger.info("pypo fetch exit")
sys.exit()
"""

View File

@ -3,9 +3,7 @@
import os
import sys
import time
import logging
import logging.config
import shutil
import json
import telnetlib
import copy
@ -44,11 +42,11 @@ class PypoFetch(Thread):
self.media_prepare_queue = media_q
self.last_update_schedule_timestamp = time.time()
self.listener_timeout = 3600
self.telnet_lock = telnet_lock
self.logger = logging.getLogger();
self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
self.logger.debug("Cache dir %s", self.cache_dir)
@ -63,24 +61,24 @@ class PypoFetch(Thread):
os.makedirs(dir)
except Exception, e:
pass
self.schedule_data = []
self.logger.info("PypoFetch: init complete")
"""
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
"""
def handle_message(self, message):
try:
try:
self.logger.info("Received event from Pypo Message Handler: %s" % message)
m = json.loads(message)
m = json.loads(message)
command = m['event_type']
self.logger.info("Handling command: " + command)
if command == 'update_schedule':
self.schedule_data = m['schedule']
self.schedule_data = m['schedule']
self.process_schedule(self.schedule_data)
elif command == 'update_stream_setting':
self.logger.info("Updating stream setting...")
@ -100,7 +98,7 @@ class PypoFetch(Thread):
elif command == 'disconnect_source':
self.logger.info("disconnect_on_source show command received...")
self.disconnect_source(self.logger, self.telnet_lock, m['sourcename'])
# update timeout value
if command == 'update_schedule':
self.listener_timeout = 3600
@ -115,7 +113,7 @@ class PypoFetch(Thread):
self.logger.error('Exception: %s', e)
self.logger.error("traceback: %s", top)
self.logger.error("Exception in handling Message Handler message: %s", e)
@staticmethod
def disconnect_source(logger, lock, sourcename):
logger.debug('Disconnecting source: %s', sourcename)
@ -124,7 +122,7 @@ class PypoFetch(Thread):
command += "master_harbor.kick\n"
elif(sourcename == "live_dj"):
command += "live_dj_harbor.kick\n"
lock.acquire()
try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
@ -135,7 +133,7 @@ class PypoFetch(Thread):
logger.error(str(e))
finally:
lock.release()
@staticmethod
def switch_source(logger, lock, sourcename, status):
logger.debug('Switching source: %s to "%s" status', sourcename, status)
@ -146,12 +144,12 @@ class PypoFetch(Thread):
command += "live_dj_"
elif(sourcename == "scheduled_play"):
command += "scheduled_play_"
if(status == "on"):
command += "start\n"
else:
command += "stop\n"
lock.acquire()
try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
@ -162,7 +160,7 @@ class PypoFetch(Thread):
logger.error(str(e))
finally:
lock.release()
"""
grabs some information that are needed to be set on bootstrap time
and configures them
@ -174,13 +172,13 @@ class PypoFetch(Thread):
self.logger.error('Unable to get bootstrap info.. Existing pypo...')
sys.exit(0)
else:
self.logger.debug('info:%s',info)
self.logger.debug('info:%s', info)
for k, v in info['switch_status'].iteritems():
self.switch_source(self.logger, self.telnet_lock, k, v)
self.update_liquidsoap_stream_format(info['stream_label'])
self.update_liquidsoap_station_name(info['station_name'])
self.update_liquidsoap_transition_fade(info['transition_fade'])
def write_liquidsoap_config(self, setting):
fh = open('/etc/airtime/liquidsoap.cfg', 'w')
self.logger.info("Rewriting liquidsoap.cfg...")
@ -197,7 +195,7 @@ class PypoFetch(Thread):
if temp == "":
temp = "0"
buffer_str += temp
buffer_str += "\n"
fh.write(api_client.encode_to(buffer_str))
fh.write("log_file = \"/var/log/airtime/pypo-liquidsoap/<script>.log\"\n");
@ -206,18 +204,18 @@ class PypoFetch(Thread):
# we could just restart liquidsoap but it take more time somehow.
self.logger.info("Restarting pypo...")
sys.exit(0)
def regenerateLiquidsoapConf(self, setting):
existing = {}
# create a temp file
setting = sorted(setting.items())
try:
fh = open('/etc/airtime/liquidsoap.cfg', 'r')
except IOError, e:
#file does not exist
self.write_liquidsoap_config(setting)
self.logger.info("Reading existing config...")
# read existing conf file and build dict
while True:
@ -226,9 +224,9 @@ class PypoFetch(Thread):
# empty line means EOF
if not line:
break
line = line.strip()
if line[0] == "#":
continue
@ -243,7 +241,7 @@ class PypoFetch(Thread):
value = ''
existing[key] = value
fh.close()
# dict flag for any change in cofig
change = {}
# this flag is to detect disable -> disable change
@ -251,7 +249,7 @@ class PypoFetch(Thread):
state_change_restart = {}
#restart flag
restart = False
self.logger.info("Looking for changes...")
# look for changes
for k, s in setting:
@ -267,13 +265,13 @@ class PypoFetch(Thread):
self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
restart = True;
else:
stream, dump = s[u'keyname'].split('_',1)
stream, dump = s[u'keyname'].split('_', 1)
if "_output" in s[u'keyname']:
if (existing[s[u'keyname']] != s[u'value']):
self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname'])
restart = True;
state_change_restart[stream] = True
elif ( s[u'value'] != 'disabled'):
elif (s[u'value'] != 'disabled'):
state_change_restart[stream] = True
else:
state_change_restart[stream] = False
@ -284,10 +282,10 @@ class PypoFetch(Thread):
if not (s[u'value'] == existing[s[u'keyname']]):
self.logger.info("Keyname: %s, Curent value: %s, New Value: %s", s[u'keyname'], existing[s[u'keyname']], s[u'value'])
change[stream] = True
# set flag change for sound_device alway True
self.logger.info("Change:%s, State_Change:%s...", change, state_change_restart)
for k, v in state_change_restart.items():
if k == "sound_device" and v:
restart = True
@ -306,7 +304,7 @@ class PypoFetch(Thread):
updates the status of liquidsoap connection to the streaming server
This fucntion updates the bootup time variable in liquidsoap script
"""
self.telnet_lock.acquire()
try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
@ -314,25 +312,25 @@ class PypoFetch(Thread):
# we are manually adjusting the bootup time variable so the status msg will get
# updated.
current_time = time.time()
boot_up_time_command = "vars.bootup_time "+str(current_time)+"\n"
boot_up_time_command = "vars.bootup_time " + str(current_time) + "\n"
tn.write(boot_up_time_command)
tn.write("streams.connection_status\n")
tn.write('exit\n')
output = tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
output_list = output.split("\r\n")
stream_info = output_list[2]
# streamin info is in the form of:
# eg. s1:true,2:true,3:false
streams = stream_info.split(",")
self.logger.info(streams)
fake_time = current_time + 1
for s in streams:
info = s.split(':')
@ -340,7 +338,7 @@ class PypoFetch(Thread):
status = info[1]
if(status == "true"):
self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time))
def update_liquidsoap_stream_format(self, stream_format):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
@ -356,7 +354,7 @@ class PypoFetch(Thread):
self.logger.error("Exception %s", e)
finally:
self.telnet_lock.release()
def update_liquidsoap_transition_fade(self, fade):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
@ -372,14 +370,14 @@ class PypoFetch(Thread):
self.logger.error("Exception %s", e)
finally:
self.telnet_lock.release()
def update_liquidsoap_station_name(self, station_name):
# Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try:
self.logger.info(LS_HOST)
self.logger.info(LS_PORT)
self.telnet_lock.acquire()
try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
@ -387,7 +385,7 @@ class PypoFetch(Thread):
self.logger.info(command)
tn.write(command)
tn.write('exit\n')
tn.read_all()
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
@ -403,7 +401,7 @@ class PypoFetch(Thread):
to the cache dir (Folder-structure: cache/YYYY-MM-DD-hh-mm-ss)
- runs the cleanup routine, to get rid of unused cached files
"""
def process_schedule(self, schedule_data):
def process_schedule(self, schedule_data):
self.last_update_schedule_timestamp = time.time()
self.logger.debug(schedule_data)
media = schedule_data["media"]
@ -411,7 +409,7 @@ class PypoFetch(Thread):
# Download all the media and put playlists in liquidsoap "annotate" format
try:
"""
Make sure cache_dir exists
"""
@ -420,15 +418,15 @@ class PypoFetch(Thread):
os.makedirs(download_dir)
except Exception, e:
pass
for key in media:
media_item = media[key]
if(media_item['type'] == 'file'):
fileExt = os.path.splitext(media_item['uri'])[1]
dst = os.path.join(download_dir, media_item['id']+fileExt)
dst = os.path.join(download_dir, media_item['id'] + fileExt)
media_item['dst'] = dst
media_filtered[key] = media_item
self.media_prepare_queue.put(copy.copy(media_filtered))
except Exception, e: self.logger.error("%s", e)
@ -440,7 +438,7 @@ class PypoFetch(Thread):
# cleanup
try: self.cache_cleanup(media)
except Exception, e: self.logger.error("%s", e)
def cache_cleanup(self, media):
"""
Get list of all files in the cache dir and remove them if they aren't being used anymore.
@ -449,18 +447,18 @@ class PypoFetch(Thread):
"""
cached_file_set = set(os.listdir(self.cache_dir))
scheduled_file_set = set()
for mkey in media:
media_item = media[mkey]
fileExt = os.path.splitext(media_item['uri'])[1]
scheduled_file_set.add(media_item["id"] + fileExt)
unneeded_files = cached_file_set - scheduled_file_set
self.logger.debug("Files to remove " + str(unneeded_files))
for file in unneeded_files:
self.logger.debug("Removing %s" % os.path.join(self.cache_dir, file))
os.remove(os.path.join(self.cache_dir, file))
for f in unneeded_files:
self.logger.debug("Removing %s" % os.path.join(self.cache_dir, f))
os.remove(os.path.join(self.cache_dir, f))
def main(self):
# Bootstrap: since we are just starting up, we need to grab the
@ -471,10 +469,10 @@ class PypoFetch(Thread):
self.process_schedule(self.schedule_data)
self.set_bootstrap_variables()
loops = 1
loops = 1
while True:
self.logger.info("Loop #%s", loops)
try:
try:
"""
our simple_queue.get() requires a timeout, in which case we
fetch the Airtime schedule manually. It is important to fetch
@ -488,8 +486,8 @@ class PypoFetch(Thread):
Currently we are checking every 3600 seconds (1 hour)
"""
message = self.fetch_queue.get(block=True, timeout=self.listener_timeout)
self.handle_message(message)
except Exception, e:
@ -497,7 +495,7 @@ class PypoFetch(Thread):
top = traceback.format_exc()
self.logger.error('Exception: %s', e)
self.logger.error("traceback: %s", top)
success, self.schedule_data = self.api_client.get_schedule()
if success:
self.process_schedule(self.schedule_data)

View File

@ -5,7 +5,6 @@ from Queue import Empty
from configobj import ConfigObj
import logging
import logging.config
import shutil
import os
import sys

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import logging
import logging.config
import sys
from configobj import ConfigObj
from threading import Thread
@ -39,7 +38,7 @@ class PypoMessageHandler(Thread):
self.logger = logging.getLogger('message_h')
self.pypo_queue = pq
self.recorder_queue = rq
def init_rabbit_mq(self):
self.logger.info("Initializing RabbitMQ stuff")
try:
@ -51,21 +50,21 @@ class PypoMessageHandler(Thread):
except Exception, e:
self.logger.error(e)
return False
return True
"""
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
"""
def handle_message(self, message):
try:
try:
self.logger.info("Received event from RabbitMQ: %s" % message)
m = json.loads(message)
m = json.loads(message)
command = m['event_type']
self.logger.info("Handling command: " + command)
if command == 'update_schedule':
self.logger.info("Updating schdule...")
self.pypo_queue.put(message)
@ -121,15 +120,13 @@ class PypoMessageHandler(Thread):
while loop and eat all the CPU
"""
time.sleep(5)
"""
There is a problem with the RabbitMq messenger service. Let's
log the error and get the schedule via HTTP polling
"""
import traceback
top = traceback.format_exc()
self.logger.error('Exception: %s', e)
self.logger.error("traceback: %s", top)
self.logger.error("traceback: %s", traceback.format_exc())
loops += 1

View File

@ -5,11 +5,9 @@ from datetime import timedelta
import sys
import time
import logging
import logging.config
import telnetlib
import calendar
import json
import math
from pypofetch import PypoFetch
@ -51,34 +49,34 @@ class PypoPush(Thread):
self.pushed_objects = {}
self.logger = logging.getLogger('push')
def main(self):
loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL)
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
next_media_item_chain = None
media_schedule = None
time_until_next_play = None
chains = None
while True:
try:
if time_until_next_play is None:
media_schedule = self.queue.get(block=True)
else:
media_schedule = self.queue.get(block=True, timeout=time_until_next_play)
chains = self.get_all_chains(media_schedule)
chains = self.get_all_chains(media_schedule)
#We get to the following lines only if a schedule was received.
liquidsoap_queue_approx = self.get_queue_items_from_liquidsoap()
current_event_chain = self.get_current_chain(chains)
current_event_chain = self.get_current_chain(chains)
if len(current_event_chain) > 0 and len(liquidsoap_queue_approx) == 0:
#Something is scheduled but Liquidsoap is not playing anything!
#Need to schedule it immediately..this might happen if Liquidsoap crashed.
chains.remove(current_event_chain)
self.modify_cue_point(current_event_chain[0])
next_media_item_chain = current_event_chain
time_until_next_play = 0
@ -88,10 +86,10 @@ class PypoPush(Thread):
else:
media_chain = filter(lambda item: (item["type"] == "file"), current_event_chain)
self.handle_new_media_schedule(media_schedule, liquidsoap_queue_approx, media_chain)
next_media_item_chain = self.get_next_schedule_chain(chains)
self.logger.debug("Next schedule chain: %s", next_media_item_chain)
self.logger.debug("Next schedule chain: %s", next_media_item_chain)
if next_media_item_chain is not None:
chains.remove(next_media_item_chain)
tnow = datetime.utcnow()
@ -104,7 +102,7 @@ class PypoPush(Thread):
except Empty, e:
#We only get here when a new chain of tracks are ready to be played.
self.push_to_liquidsoap(next_media_item_chain)
next_media_item_chain = self.get_next_schedule_chain(chains)
if next_media_item_chain is not None:
tnow = datetime.utcnow()
@ -114,7 +112,7 @@ class PypoPush(Thread):
else:
self.logger.debug("Blocking indefinitely since no show scheduled next")
time_until_next_play = None
if loops % heartbeat_period == 0:
self.logger.info("heartbeat")
loops = 0
@ -127,7 +125,7 @@ class PypoPush(Thread):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
msg = 'queue.queue\n'
tn.write(msg)
response = tn.read_until("\r\n").strip(" \r\n")
@ -138,14 +136,14 @@ class PypoPush(Thread):
response = []
finally:
self.telnet_lock.release()
liquidsoap_queue_approx = []
if len(response) > 0:
items_in_queue = response.split(" ")
self.logger.debug("items_in_queue: %s", items_in_queue)
for item in items_in_queue:
if item in self.pushed_objects:
liquidsoap_queue_approx.append(self.pushed_objects[item])
@ -158,9 +156,9 @@ class PypoPush(Thread):
self.clear_liquidsoap_queue()
liquidsoap_queue_approx = []
break
return liquidsoap_queue_approx
def handle_new_media_schedule(self, media_schedule, liquidsoap_queue_approx, media_chain):
"""
This function's purpose is to gracefully handle situations where
@ -169,27 +167,27 @@ class PypoPush(Thread):
call other functions that will connect to Liquidsoap and alter its
queue.
"""
problem_at_iteration, problem_start_time = self.find_removed_items(media_schedule, liquidsoap_queue_approx)
problem_at_iteration = self.find_removed_items(media_schedule, liquidsoap_queue_approx)
if problem_at_iteration is not None:
#Items that are in Liquidsoap's queue aren't scheduled anymore. We need to connect
#and remove these items.
self.logger.debug("Change in link %s of current chain", problem_at_iteration)
self.remove_from_liquidsoap_queue(problem_at_iteration, liquidsoap_queue_approx[problem_at_iteration:])
if problem_at_iteration is None and len(media_chain) > len(liquidsoap_queue_approx):
self.logger.debug("New schedule has longer current chain.")
problem_at_iteration = len(liquidsoap_queue_approx)
if problem_at_iteration is not None:
self.logger.debug("Change in chain at link %s", problem_at_iteration)
chain_to_push = media_chain[problem_at_iteration:]
if len(chain_to_push) > 0:
self.modify_cue_point(chain_to_push[0])
self.push_to_liquidsoap(chain_to_push)
"""
Compare whats in the liquidsoap_queue to the new schedule we just
received in media_schedule. This function only iterates over liquidsoap_queue_approx
@ -201,7 +199,6 @@ class PypoPush(Thread):
#see if they are the same as the newly received schedule
iteration = 0
problem_at_iteration = None
problem_start_time = None
for queue_item in liquidsoap_queue_approx:
if queue_item['start'] in media_schedule.keys():
media_item = media_schedule[queue_item['start']]
@ -211,33 +208,30 @@ class PypoPush(Thread):
pass
else:
problem_at_iteration = iteration
problem_start_time = queue_item['start']
break
break
else:
#A different item has been scheduled at the same time! Need to remove
#all tracks from the Liquidsoap queue starting at this point, and re-add
#them.
problem_at_iteration = iteration
problem_start_time = queue_item['start']
break
else:
#There are no more items scheduled for this time! The user has shortened
#the playlist, so we simply need to remove tracks from the queue.
problem_at_iteration = iteration
problem_start_time = queue_item['start']
break
iteration+=1
return (problem_at_iteration, problem_start_time)
iteration += 1
return problem_at_iteration
def get_all_chains(self, media_schedule):
chains = []
current_chain = []
sorted_keys = sorted(media_schedule.keys())
for mkey in sorted_keys:
media_item = media_schedule[mkey]
if media_item['type'] == "event":
@ -251,26 +245,26 @@ class PypoPush(Thread):
#Start a new one instead
chains.append(current_chain)
current_chain = [media_item]
if len(current_chain) > 0:
chains.append(current_chain)
return chains
def modify_cue_point(self, link):
tnow = datetime.utcnow()
link_start = datetime.strptime(link['start'], "%Y-%m-%d-%H-%M-%S")
diff_td = tnow - link_start
diff_sec = self.date_interval_to_seconds(diff_td)
if diff_sec > 0:
self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
original_cue_in_td = timedelta(seconds=float(link['cue_in']))
link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec
def get_current_chain(self, chains):
tnow = datetime.utcnow()
current_chain = []
@ -280,22 +274,22 @@ class PypoPush(Thread):
for link in chain:
link_start = datetime.strptime(link['start'], "%Y-%m-%d-%H-%M-%S")
link_end = datetime.strptime(link['end'], "%Y-%m-%d-%H-%M-%S")
self.logger.debug("tnow %s, chain_start %s", tnow, link_start)
if link_start <= tnow and tnow < link_end:
current_chain = chain[iteration:]
break
iteration += 1
return current_chain
"""
The purpose of this function is to take a look at the last received schedule from
pypo-fetch and return the next chain of media_items. A chain is defined as a sequence
of media_items where the end time of media_item 'n' is the start time of media_item
'n+1'
"""
def get_next_schedule_chain(self, chains):
def get_next_schedule_chain(self, chains):
#all media_items are now divided into chains. Let's find the one that
#starts closest in the future.
tnow = datetime.utcnow()
@ -307,15 +301,15 @@ class PypoPush(Thread):
if (closest_start == None or chain_start < closest_start) and chain_start > tnow:
closest_start = chain_start
closest_chain = chain
return closest_chain
def date_interval_to_seconds(self, interval):
return (interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10**6) / float(10**6)
return (interval.microseconds + (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
def push_to_liquidsoap(self, event_chain):
try:
for media_item in event_chain:
if media_item['type'] == "file":
@ -327,27 +321,27 @@ class PypoPush(Thread):
PypoFetch.switch_source(self.logger, self.telnet_lock, "live_dj", "off")
except Exception, e:
self.logger.error('Pypo Push Exception: %s', e)
def clear_liquidsoap_queue(self):
self.logger.debug("Clearing Liquidsoap queue")
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
msg = "source.skip\n"
tn.write(msg)
tn.write(msg)
tn.write("exit\n")
tn.read_all()
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
self.telnet_lock.release()
def remove_from_liquidsoap_queue(self, problem_at_iteration, liquidsoap_queue_approx):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
if problem_at_iteration == 0:
msg = "source.skip\n"
self.logger.debug(msg)
@ -355,13 +349,13 @@ class PypoPush(Thread):
else:
# Remove things in reverse order.
queue_copy = liquidsoap_queue_approx[::-1]
for queue_item in queue_copy:
msg = "queue.remove %s\n" % queue_item['queue_id']
self.logger.debug(msg)
tn.write(msg)
response = tn.read_until("\r\n").strip("\r\n")
if "No such request in my queue" in response:
"""
Cannot remove because Liquidsoap started playing the item. Need
@ -370,33 +364,33 @@ class PypoPush(Thread):
msg = "source.skip\n"
self.logger.debug(msg)
tn.write(msg)
msg = "queue.queue\n"
self.logger.debug(msg)
tn.write(msg)
tn.write("exit\n")
self.logger.debug(tn.read_all())
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
def sleep_until_start(self, media_item):
"""
The purpose of this function is to look at the difference between
"now" and when the media_item starts, and sleep for that period of time.
After waking from sleep, this function returns.
"""
mi_start = media_item['start'][0:19]
#strptime returns struct_time in local time
epoch_start = calendar.timegm(time.strptime(mi_start, '%Y-%m-%d-%H-%M-%S'))
#Return the time as a floating point number expressed in seconds since the epoch, in UTC.
epoch_now = time.time()
self.logger.debug("Epoch start: %s" % epoch_start)
self.logger.debug("Epoch now: %s" % epoch_now)
@ -417,43 +411,43 @@ class PypoPush(Thread):
try:
self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT)
#tn.write(("vars.pypo_data %s\n"%liquidsoap_data["schedule_id"]).encode('utf-8'))
annotation = self.create_liquidsoap_annotation(media_item)
msg = 'queue.push %s\n' % annotation.encode('utf-8')
self.logger.debug(msg)
tn.write(msg)
queue_id = tn.read_until("\r\n").strip("\r\n")
#remember the media_item's queue id which we may use
#later if we need to remove it from the queue.
media_item['queue_id'] = queue_id
#add media_item to the end of our queue
self.pushed_objects[queue_id] = media_item
show_name = media_item['show_name']
msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
tn.write(msg)
self.logger.debug(msg)
tn.write("exit\n")
self.logger.debug(tn.read_all())
except Exception, e:
self.logger.error(str(e))
finally:
self.telnet_lock.release()
def create_liquidsoap_annotation(self, media):
def create_liquidsoap_annotation(self, media):
# we need lia_start_next value in the annotate. That is the value that controlls overlap duration of crossfade.
return 'annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",schedule_table_id="%s":%s' \
% (media['id'], float(media['fade_in'])/1000, float(media['fade_out'])/1000, float(media['cue_in']), float(media['cue_out']), media['row_id'], media['dst'])
% (media['id'], float(media['fade_in']) / 1000, float(media['fade_out']) / 1000, float(media['cue_in']), float(media['cue_out']), media['row_id'], media['dst'])
def run(self):
try: self.main()
except Exception, e:
import traceback
top = traceback.format_exc()
self.logger.error('Pypo Push Exception: %s', top)

View File

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import logging
import logging.config
import json
import time
import datetime
@ -55,7 +54,7 @@ class ShowRecorder(Thread):
self.p = None
def record_show(self):
length = str(self.filelength)+".0"
length = str(self.filelength) + ".0"
filename = self.start_time
filename = filename.replace(" ", "-")
@ -128,7 +127,7 @@ class ShowRecorder(Thread):
time = md[1].replace(":", "-")
self.logger.info("time: %s" % time)
name = time+"-"+self.show_name
name = time + "-" + self.show_name
artist = "Airtime Show Recorder"
#set some metadata for our file daemon
@ -181,7 +180,7 @@ class Recorder(Thread):
def handle_message(self):
if not self.queue.empty():
message = self.queue.get()
msg = json.loads(message)
msg = json.loads(message)
command = msg["event_type"]
self.logger.info("Received msg from Pypo Message Handler: %s", msg)
if command == 'cancel_recording':
@ -190,10 +189,10 @@ class Recorder(Thread):
else:
self.process_recorder_schedule(msg)
self.loops = 0
if self.shows_to_record:
self.start_record()
def process_recorder_schedule(self, m):
self.logger.info("Parsing recording show schedules...")
temp_shows_to_record = {}
@ -217,7 +216,7 @@ class Recorder(Thread):
delta = next_show - tnow
s = '%s.%s' % (delta.seconds, delta.microseconds)
out = float(s)
if out < 5:
self.logger.debug("Shows %s", self.shows_to_record)
self.logger.debug("Next show %s", next_show)
@ -231,26 +230,26 @@ class Recorder(Thread):
if delta < 5:
self.logger.debug("sleeping %s seconds until show", delta)
time.sleep(delta)
sorted_show_keys = sorted(self.shows_to_record.keys())
start_time = sorted_show_keys[0]
show_length = self.shows_to_record[start_time][0]
show_instance = self.shows_to_record[start_time][1]
show_name = self.shows_to_record[start_time][2]
server_timezone = self.shows_to_record[start_time][3]
T = pytz.timezone(server_timezone)
start_time_on_UTC = getDateTimeObj(start_time)
start_time_on_server = start_time_on_UTC.replace(tzinfo=pytz.utc).astimezone(T)
start_time_formatted = '%(year)d-%(month)02d-%(day)02d %(hour)02d:%(min)02d:%(sec)02d' % \
{'year': start_time_on_server.year, 'month': start_time_on_server.month, 'day': start_time_on_server.day,\
{'year': start_time_on_server.year, 'month': start_time_on_server.month, 'day': start_time_on_server.day, \
'hour': start_time_on_server.hour, 'min': start_time_on_server.minute, 'sec': start_time_on_server.second}
self.sr = ShowRecorder(show_instance, show_name, show_length.seconds, start_time_formatted)
self.sr.start()
#remove show from shows to record.
del self.shows_to_record[start_time]
#self.time_till_next_show = self.get_time_till_next_show()
except Exception,e :
except Exception, e :
import traceback
top = traceback.format_exc()
self.logger.error('Exception: %s', e)
@ -273,12 +272,12 @@ class Recorder(Thread):
self.logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception, e:
self.logger.error(e)
self.logger.info("Bootstrap complete: got initial copy of the schedule")
self.loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL)
heartbeat_period = math.floor(30 / PUSH_INTERVAL)
while True:
if self.loops % heartbeat_period == 0:
self.logger.info("heartbeat")
@ -299,7 +298,7 @@ class Recorder(Thread):
self.logger.error('Pypo Recorder Exception: %s', e)
time.sleep(PUSH_INTERVAL)
self.loops += 1
except Exception,e :
except Exception, e :
import traceback
top = traceback.format_exc()
self.logger.error('Exception: %s', e)