cleanup of unecessary code

-we shouldn't be loading config file multiple times.
This commit is contained in:
Martin Konecny 2013-04-25 22:11:26 -04:00
parent 8f6b583fcb
commit fb4a02faec
5 changed files with 72 additions and 89 deletions

View File

@ -33,8 +33,6 @@ from configobj import ConfigObj
from api_clients import api_client from api_clients import api_client
from std_err_override import LogWriter from std_err_override import LogWriter
PYPO_VERSION = '1.1'
# Set up command-line options # Set up command-line options
parser = OptionParser() parser = OptionParser()
@ -43,11 +41,31 @@ usage = "%prog [options]" + " - python playout system"
parser = OptionParser(usage=usage) parser = OptionParser(usage=usage)
# Options # Options
parser.add_option("-v", "--compat", help="Check compatibility with server API version", default=False, action="store_true", dest="check_compat") parser.add_option("-v", "--compat",
help="Check compatibility with server API version",
default=False,
action="store_true",
dest="check_compat")
parser.add_option("-t", "--test", help="Do a test to make sure everything is working properly.", default=False, action="store_true", dest="test") parser.add_option("-t", "--test",
parser.add_option("-b", "--cleanup", help="Cleanup", default=False, action="store_true", dest="cleanup") help="Do a test to make sure everything is working properly.",
parser.add_option("-c", "--check", help="Check the cached schedule and exit", default=False, action="store_true", dest="check") default=False,
action="store_true",
dest="test")
parser.add_option("-b",
"--cleanup",
help="Cleanup",
default=False,
action="store_true",
dest="cleanup")
parser.add_option("-c",
"--check",
help="Check the cached schedule and exit",
default=False,
action="store_true",
dest="check")
# parse options # parse options
(options, args) = parser.parse_args() (options, args) = parser.parse_args()
@ -66,6 +84,13 @@ except Exception, e:
sys.exit(1) sys.exit(1)
def configure_locale(): def configure_locale():
"""
Silly hacks to force Python 2.x to run in UTF-8 mode. Not portable at all,
however serves our purpose at the moment.
More information available here:
http://stackoverflow.com/questions/3828723/why-we-need-sys-setdefaultencodingutf-8-in-a-py-script
"""
logger.debug("Before %s", locale.nl_langinfo(locale.CODESET)) logger.debug("Before %s", locale.nl_langinfo(locale.CODESET))
current_locale = locale.getlocale() current_locale = locale.getlocale()
@ -74,20 +99,21 @@ def configure_locale():
default_locale = locale.getdefaultlocale() default_locale = locale.getdefaultlocale()
if default_locale[1] is None: if default_locale[1] is None:
logger.debug("No default locale exists. Let's try loading from /etc/default/locale") logger.debug("No default locale exists. Let's try loading from \
/etc/default/locale")
if os.path.exists("/etc/default/locale"): if os.path.exists("/etc/default/locale"):
locale_config = ConfigObj('/etc/default/locale') locale_config = ConfigObj('/etc/default/locale')
lang = locale_config.get('LANG') lang = locale_config.get('LANG')
new_locale = lang new_locale = lang
else: else:
logger.error("/etc/default/locale could not be found! Please run 'sudo update-locale' from command-line.") logger.error("/etc/default/locale could not be found! Please \
run 'sudo update-locale' from command-line.")
sys.exit(1) sys.exit(1)
else: else:
new_locale = default_locale new_locale = default_locale
logger.info("New locale set to: %s", locale.setlocale(locale.LC_ALL, new_locale)) logger.info("New locale set to: %s", \
locale.setlocale(locale.LC_ALL, new_locale))
reload(sys) reload(sys)
sys.setdefaultencoding("UTF-8") sys.setdefaultencoding("UTF-8")
@ -96,7 +122,8 @@ def configure_locale():
logger.debug("After %s", locale.nl_langinfo(locale.CODESET)) logger.debug("After %s", locale.nl_langinfo(locale.CODESET))
if current_locale_encoding not in ['utf-8', 'utf8']: if current_locale_encoding not in ['utf-8', 'utf8']:
logger.error("Need a UTF-8 locale. Currently '%s'. Exiting..." % current_locale_encoding) logger.error("Need a UTF-8 locale. Currently '%s'. Exiting..." % \
current_locale_encoding)
sys.exit(1) sys.exit(1)
@ -107,7 +134,7 @@ try:
config = ConfigObj('/etc/airtime/pypo.cfg') config = ConfigObj('/etc/airtime/pypo.cfg')
except Exception, e: except Exception, e:
logger.error('Error loading config file: %s', e) logger.error('Error loading config file: %s', e)
sys.exit() sys.exit(1)
class Global: class Global:
def __init__(self): def __init__(self):
@ -174,7 +201,7 @@ if __name__ == '__main__':
if options.test: if options.test:
g.test_api() g.test_api()
sys.exit() sys.exit(0)
api_client = api_client.AirtimeApiClient() api_client = api_client.AirtimeApiClient()
@ -194,25 +221,23 @@ if __name__ == '__main__':
recorder_q = Queue() recorder_q = Queue()
pypoPush_q = Queue() pypoPush_q = Queue()
""" """
This queue is shared between pypo-fetch and pypo-file, where pypo-file This queue is shared between pypo-fetch and pypo-file, where pypo-file
is the receiver. Pypo-fetch will send every schedule it gets to pypo-file is the consumer. Pypo-fetch will send every schedule it gets to pypo-file
and pypo will parse this schedule to determine which file has the highest and pypo will parse this schedule to determine which file has the highest
priority, and will retrieve it. priority, and retrieve it.
""" """
media_q = Queue() media_q = Queue()
pmh = PypoMessageHandler(pypoFetch_q, recorder_q) pmh = PypoMessageHandler(pypoFetch_q, recorder_q, config)
pmh.daemon = True pmh.daemon = True
pmh.start() pmh.start()
pfile = PypoFile(media_q) pfile = PypoFile(media_q, config)
pfile.daemon = True pfile.daemon = True
pfile.start() pfile.start()
pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock) pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock, config)
pf.daemon = True pf.daemon = True
pf.start() pf.start()
@ -228,12 +253,6 @@ if __name__ == '__main__':
stat.daemon = True stat.daemon = True
stat.start() stat.start()
# all join() are commented out because we want to exit entire pypo
# if pypofetch terminates
#pmh.join()
#recorder.join()
#pp.join()
pf.join() pf.join()
logger.info("pypo fetch exit") logger.info("System exit")
sys.exit()

View File

@ -15,7 +15,6 @@ import traceback
from Queue import Empty from Queue import Empty
from threading import Thread from threading import Thread
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from configobj import ConfigObj
from api_clients import api_client from api_clients import api_client
from std_err_override import LogWriter from std_err_override import LogWriter
@ -35,25 +34,17 @@ signal.signal(signal.SIGINT, keyboardInterruptHandler)
#need to wait for Python 2.7 for this.. #need to wait for Python 2.7 for this..
#logging.captureWarnings(True) #logging.captureWarnings(True)
# loading config file POLL_INTERVAL = 1800
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
LS_HOST = config['ls_host']
LS_PORT = config['ls_port']
#POLL_INTERVAL = int(config['poll_interval'])
POLL_INTERVAL = 1800
except Exception, e:
logger.error('Error loading config file: %s', e)
sys.exit()
class PypoFetch(Thread): class PypoFetch(Thread):
def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock): def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, config):
Thread.__init__(self) Thread.__init__(self)
self.api_client = api_client.AirtimeApiClient() self.api_client = api_client.AirtimeApiClient()
self.fetch_queue = pypoFetch_q self.fetch_queue = pypoFetch_q
self.push_queue = pypoPush_q self.push_queue = pypoPush_q
self.media_prepare_queue = media_q self.media_prepare_queue = media_q
self.last_update_schedule_timestamp = time.time() self.last_update_schedule_timestamp = time.time()
self.config = config
self.listener_timeout = POLL_INTERVAL self.listener_timeout = POLL_INTERVAL
self.telnet_lock = telnet_lock self.telnet_lock = telnet_lock
@ -141,7 +132,7 @@ class PypoFetch(Thread):
try: try:
lock.acquire() lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
logger.info(command) logger.info(command)
tn.write(command) tn.write(command)
tn.write('exit\n') tn.write('exit\n')
@ -156,7 +147,7 @@ class PypoFetch(Thread):
try: try:
lock.acquire() lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
for i in commands: for i in commands:
logger.info(i) logger.info(i)
tn.write(i) tn.write(i)
@ -243,7 +234,7 @@ class PypoFetch(Thread):
self.logger.info("Waiting for Liquidsoap to start") self.logger.info("Waiting for Liquidsoap to start")
while True: while True:
try: try:
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
tn.write("exit\n") tn.write("exit\n")
tn.read_all() tn.read_all()
self.logger.info("Liquidsoap is up and running") self.logger.info("Liquidsoap is up and running")
@ -368,7 +359,7 @@ class PypoFetch(Thread):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
# update the boot up time of Liquidsoap. Since Liquidsoap is not restarting, # update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
# we are manually adjusting the bootup time variable so the status msg will get # we are manually adjusting the bootup time variable so the status msg will get
# updated. # updated.
@ -411,7 +402,7 @@ class PypoFetch(Thread):
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
self.logger.info(command) self.logger.info(command)
tn.write(command) tn.write(command)
@ -427,7 +418,7 @@ class PypoFetch(Thread):
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8') command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8')
self.logger.info(command) self.logger.info(command)
tn.write(command) tn.write(command)
@ -442,12 +433,9 @@ class PypoFetch(Thread):
# Push stream metadata to liquidsoap # Push stream metadata to liquidsoap
# TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
try: try:
self.logger.info(LS_HOST)
self.logger.info(LS_PORT)
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
command = ('vars.station_name %s\n' % station_name).encode('utf-8') command = ('vars.station_name %s\n' % station_name).encode('utf-8')
self.logger.info(command) self.logger.info(command)
tn.write(command) tn.write(command)

View File

@ -2,7 +2,6 @@
from threading import Thread from threading import Thread
from Queue import Empty from Queue import Empty
from configobj import ConfigObj
import logging import logging
import shutil import shutil
@ -20,21 +19,10 @@ LogWriter.override_std_err(logger)
#need to wait for Python 2.7 for this.. #need to wait for Python 2.7 for this..
#logging.captureWarnings(True) #logging.captureWarnings(True)
# loading config file
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
LS_HOST = config['ls_host']
LS_PORT = config['ls_port']
POLL_INTERVAL = int(config['poll_interval'])
except Exception, e:
logger.error('Error loading config file: %s', e)
sys.exit(1)
class PypoFile(Thread): class PypoFile(Thread):
def __init__(self, schedule_queue): def __init__(self, schedule_queue, config):
Thread.__init__(self) Thread.__init__(self)
self.logger = logging.getLogger() self.logger = logging.getLogger()
self.media_queue = schedule_queue self.media_queue = schedule_queue

View File

@ -3,7 +3,6 @@
import logging import logging
import traceback import traceback
import sys import sys
from configobj import ConfigObj
from threading import Thread from threading import Thread
import time import time
# For RabbitMQ # For RabbitMQ
@ -23,30 +22,25 @@ LogWriter.override_std_err(logger)
#need to wait for Python 2.7 for this.. #need to wait for Python 2.7 for this..
#logging.captureWarnings(True) #logging.captureWarnings(True)
# loading config file
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
LS_HOST = config['ls_host']
LS_PORT = config['ls_port']
POLL_INTERVAL = int(config['poll_interval'])
except Exception, e:
logger.error('Error loading config file: %s', e)
sys.exit()
class PypoMessageHandler(Thread): class PypoMessageHandler(Thread):
def __init__(self, pq, rq): def __init__(self, pq, rq, config):
Thread.__init__(self) Thread.__init__(self)
self.logger = logging.getLogger('message_h') self.logger = logging.getLogger('message_h')
self.pypo_queue = pq self.pypo_queue = pq
self.recorder_queue = rq self.recorder_queue = rq
self.config = config
def init_rabbit_mq(self): def init_rabbit_mq(self):
self.logger.info("Initializing RabbitMQ stuff") self.logger.info("Initializing RabbitMQ stuff")
try: try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True) schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo") schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"]) connection = BrokerConnection(self.config["rabbitmq_host"], \
self.config["rabbitmq_user"], \
self.config["rabbitmq_password"], \
self.config["rabbitmq_vhost"])
channel = connection.channel() channel = connection.channel()
self.simple_queue = SimpleQueue(channel, schedule_queue) self.simple_queue = SimpleQueue(channel, schedule_queue)
except Exception, e: except Exception, e:

View File

@ -34,15 +34,8 @@ LogWriter.override_std_err(logger)
#need to wait for Python 2.7 for this.. #need to wait for Python 2.7 for this..
#logging.captureWarnings(True) #logging.captureWarnings(True)
# loading config file PUSH_INTERVAL = 2
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
LS_HOST = config['ls_host']
LS_PORT = config['ls_port']
PUSH_INTERVAL = 2
except Exception, e:
logger.error('Error loading config file %s', e)
sys.exit()
def is_stream(media_item): def is_stream(media_item):
return media_item['type'] == 'stream_output_start' return media_item['type'] == 'stream_output_start'
@ -51,12 +44,13 @@ def is_file(media_item):
return media_item['type'] == 'file' return media_item['type'] == 'file'
class PypoPush(Thread): class PypoPush(Thread):
def __init__(self, q, telnet_lock): def __init__(self, q, telnet_lock, config):
Thread.__init__(self) Thread.__init__(self)
self.api_client = api_client.AirtimeApiClient() self.api_client = api_client.AirtimeApiClient()
self.queue = q self.queue = q
self.telnet_lock = telnet_lock self.telnet_lock = telnet_lock
self.config = config
self.pushed_objects = {} self.pushed_objects = {}
self.logger = logging.getLogger('push') self.logger = logging.getLogger('push')
@ -65,7 +59,7 @@ class PypoPush(Thread):
self.future_scheduled_queue = Queue() self.future_scheduled_queue = Queue()
self.pypo_liquidsoap = PypoLiquidsoap(self.logger, telnet_lock,\ self.pypo_liquidsoap = PypoLiquidsoap(self.logger, telnet_lock,\
LS_HOST, LS_PORT) config['ls_host'], config['ls_port'])
self.plq = PypoLiqQueue(self.future_scheduled_queue, \ self.plq = PypoLiqQueue(self.future_scheduled_queue, \
self.pypo_liquidsoap, \ self.pypo_liquidsoap, \
@ -126,7 +120,7 @@ class PypoPush(Thread):
response = "-1" response = "-1"
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
msg = 'dynamic_source.get_id\n' msg = 'dynamic_source.get_id\n'
tn.write(msg) tn.write(msg)
@ -173,7 +167,7 @@ class PypoPush(Thread):
def stop_web_stream_all(self): def stop_web_stream_all(self):
try: try:
self.telnet_lock.acquire() self.telnet_lock.acquire()
tn = telnetlib.Telnet(LS_HOST, LS_PORT) tn = telnetlib.Telnet(self.config['LS_HOST'], self.config['LS_PORT'])
#msg = 'dynamic_source.read_stop_all xxx\n' #msg = 'dynamic_source.read_stop_all xxx\n'
msg = 'http.stop\n' msg = 'http.stop\n'