CC-2607: Abilitiy to adjust stream bitrate, type, etc from the UI

interface

- dummy page "Stream Setting" page for the test
- StreamSetting model is added
- set owner and group as 'pypo' for liquidsoap.cfg
- pypofech handle 'update_stream_setting' command
This commit is contained in:
James 2011-08-15 16:10:46 -04:00
parent e18c0903cb
commit 4f2b2dba6d
12 changed files with 174 additions and 30 deletions

View file

@ -13,6 +13,7 @@ from threading import Thread
from subprocess import Popen, PIPE
from datetime import datetime
from datetime import timedelta
import filecmp
# For RabbitMQ
from kombu.connection import BrokerConnection
@ -37,17 +38,14 @@ except Exception, e:
logger.error('Error loading config file: %s', e)
sys.exit()
# Yuk - using a global, i know!
SCHEDULE_PUSH_MSG = []
"""
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
"""
def handle_message(body, message):
"""def handle_message(body, message):
logger = logging.getLogger('fetch')
global SCHEDULE_PUSH_MSG
logger.info("Received schedule from RabbitMQ: " + message.body)
logger.info("Received event from RabbitMQ: " + message.body)
m = json.loads(message.body)
command = m['event_type']
@ -59,10 +57,10 @@ def handle_message(body, message):
logger.info("Setting timezone to %s", m['timezone'])
os.environ['TZ'] = m['timezone']
time.tzset()
elif (command == 'update_stream_setting'):
logger.info("Updating stream setting: %s", m['setting'])
# ACK the message to take it off the queue
message.ack()
message.ack()"""
class PypoFetch(Thread):
def __init__(self, q):
@ -71,26 +69,103 @@ class PypoFetch(Thread):
self.api_client = api_client.api_client_factory(config)
self.set_export_source('scheduler')
self.queue = q
self.schedule_data = []
logger.info("PypoFetch: init complete")
def init_rabbit_mq(self):
logger = logging.getLogger('fetch')
logger.info("Initializing RabbitMQ stuff")
try:
schedule_exchange = Exchange("airtime-schedule", "direct", durable=True, auto_delete=True)
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
self.connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], "/")
channel = self.connection.channel()
consumer = Consumer(channel, schedule_queue)
consumer.register_callback(handle_message)
consumer.register_callback(self.handle_message)
consumer.consume()
except Exception, e:
logger.error(e)
return False
return True
"""
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
"""
def handle_message(self, body, message):
logger = logging.getLogger('fetch')
logger.info("Received event from RabbitMQ: " + message.body)
m = json.loads(message.body)
command = m['event_type']
logger.info("Handling command: " + command)
if(command == 'update_schedule'):
self.schedule_data = m['schedule']
self.process_schedule(self.schedule_data, "scheduler", False)
elif (command == 'update_timezone'):
logger.info("Setting timezone to %s", m['timezone'])
os.environ['TZ'] = m['timezone']
time.tzset()
elif (command == 'update_stream_setting'):
logger.info("Updating stream setting: %s", m['setting'])
self.regenerateLiquidsoapConf(m['setting'])
# ACK the message to take it off the queue
message.ack()
def regenerateLiquidsoapConf(self, setting):
logger = logging.getLogger('fetch')
existing = {}
# create a temp file
fh = open('/etc/airtime/liquidsoap.cfg', 'r')
# read existing conf file and build dict
while 1:
line = fh.readline()
if not line:
break;
key, value = line.split('=')
key = key.strip()
value = value.strip()
value = value.replace('"', '')
if value == "dummy_string" or value == "0":
value = ''
existing[key] = value
fh.close()
# flag for any change in cofig
change = False
# look for changes
for s in setting:
if not s[u'value'] == existing[s[u'keyname']]:
logger.info("Keyname: %s, Curent value: %s, New Value: %s", s[u'keyname'], s[u'value'], existing[s[u'keyname']])
change = True
# rewrite
if change:
fh = open('/etc/airtime/liquidsoap.cfg', 'w')
logger.info("Rewriting liquidsoap.cfg...")
for d in setting:
buffer = d[u'keyname'] + " = "
if(d[u'type'] == 'string'):
temp = d[u'value']
if(temp == ""):
temp = "dummy_string"
buffer += "\"" + temp + "\""
else:
temp = d[u'value']
if(temp == ""):
temp = "0"
buffer += temp
buffer += "\n"
fh.write(buffer)
fh.close()
# restart playout
logger.info("Restarting airtime-playout...")
p = Popen("/etc/init.d/airtime-playout restart >/dev/null 2>&1", shell=True)
sts = os.waitpid(p.pid, 0)[1]
else:
logger.info("No change detected in setting...")
def set_export_source(self, export_source):
logger = logging.getLogger('fetch')
self.export_source = export_source
@ -379,9 +454,9 @@ class PypoFetch(Thread):
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
status, schedule_data = self.api_client.get_schedule()
status, self.schedule_data = self.api_client.get_schedule()
if status == 1:
self.process_schedule(schedule_data, "scheduler", True)
self.process_schedule(self.schedule_data , "scheduler", True)
logger.info("Bootstrap complete: got initial copy of the schedule")
loops = 1
@ -391,15 +466,11 @@ class PypoFetch(Thread):
# Wait for messages from RabbitMQ. Timeout if we
# dont get any after POLL_INTERVAL.
self.connection.drain_events(timeout=POLL_INTERVAL)
# Hooray for globals!
schedule_data = SCHEDULE_PUSH_MSG
status = 1
except:
except Exception, e:
# We didnt get a message for a while, so poll the server
# to get an updated schedule.
status, schedule_data = self.api_client.get_schedule()
if status == 1:
self.process_schedule(schedule_data, "scheduler", False)
logger.info("Exception %s", e)
status, self.schedule_data = self.api_client.get_schedule()
self.process_schedule(self.schedule_data, "scheduler", False)
loops += 1