CC-3346: Recorder: Merge recorder with pypo

- separated rabitMQ listener part out from pypoFetch and created
pypomessagehandler.py
This commit is contained in:
James 2012-02-27 16:18:10 -05:00
parent 38c16df138
commit f030cf4f67
5 changed files with 207 additions and 110 deletions

View File

@ -1,8 +1,8 @@
[loggers] [loggers]
keys=root,fetch,push,recorder keys=root,fetch,push,recorder,message_h
[handlers] [handlers]
keys=pypo,recorder keys=pypo,recorder,message_h
[formatters] [formatters]
keys=simpleFormatter keys=simpleFormatter
@ -29,6 +29,12 @@ handlers=recorder
qualname=recorder qualname=recorder
propagate=0 propagate=0
[logger_message_h]
level=DEBUG
handlers=message_h
qualname=message_h
propagate=0
[handler_pypo] [handler_pypo]
class=logging.handlers.RotatingFileHandler class=logging.handlers.RotatingFileHandler
level=DEBUG level=DEBUG
@ -41,6 +47,12 @@ level=DEBUG
formatter=simpleFormatter formatter=simpleFormatter
args=("/var/log/airtime/pypo/show-recorder.log", 'a', 1000000, 5,) args=("/var/log/airtime/pypo/show-recorder.log", 'a', 1000000, 5,)
[handler_message_h]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=("/var/log/airtime/pypo/message-handler.log", 'a', 1000000, 5,)
[formatter_simpleFormatter] [formatter_simpleFormatter]
format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s
datefmt= datefmt=

View File

@ -16,6 +16,7 @@ from Queue import Queue
from pypopush import PypoPush from pypopush import PypoPush
from pypofetch import PypoFetch from pypofetch import PypoFetch
from recorder import Recorder from recorder import Recorder
from pypomessagehandler import PypoMessageHandler
from configobj import ConfigObj from configobj import ConfigObj
@ -127,11 +128,19 @@ if __name__ == '__main__':
api_client = api_client.api_client_factory(config) api_client = api_client.api_client_factory(config)
api_client.register_component("pypo") api_client.register_component("pypo")
q = Queue() pypoFetch_q = Queue()
recorder_q = Queue() recorder_q = Queue()
pypoPush_q = Queue()
pp = PypoPush(q)
pmh = PypoMessageHandler(pypoFetch_q, recorder_q)
pmh.daemon = True
pmh.start()
pf = PypoFetch(pypoFetch_q, pypoPush_q)
pf.daemon = True
pf.start()
pp = PypoPush(pypoPush_q)
pp.daemon = True pp.daemon = True
pp.start() pp.start()
@ -139,10 +148,6 @@ if __name__ == '__main__':
recorder.daemon = True recorder.daemon = True
recorder.start() recorder.start()
pf = PypoFetch(q, recorder_q)
pf.daemon = True
pf.start()
#pp.join() #pp.join()
pf.join() pf.join()
logger.info("pypo fetch exit") logger.info("pypo fetch exit")

View File

@ -16,13 +16,6 @@ from datetime import datetime
from datetime import timedelta from datetime import timedelta
from Queue import Empty from Queue import Empty
import filecmp import filecmp
import thread
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
from kombu.exceptions import MessageStateError
from kombu.simple import SimpleQueue
from api_clients import api_client from api_clients import api_client
@ -44,30 +37,15 @@ except Exception, e:
sys.exit() sys.exit()
class PypoFetch(Thread): class PypoFetch(Thread):
def __init__(self, q, recorder_q): def __init__(self, pypoFetch_q, pypoPush_q):
Thread.__init__(self) Thread.__init__(self)
self.api_client = api_client.api_client_factory(config) self.api_client = api_client.api_client_factory(config)
self.set_export_source('scheduler') self.set_export_source('scheduler')
self.queue = q self.fetch_queue = pypoFetch_q
self.recorder_queue = recorder_q self.push_queue = pypoPush_q
self.schedule_data = [] self.schedule_data = []
logger = logging.getLogger('fetch') logger = logging.getLogger('fetch')
logger.info("PypoFetch: init complete") logger.info("PypoFetch: init complete")
def init_rabbit_mq(self):
logger = logging.getLogger('fetch')
logger.info("Initializing RabbitMQ stuff")
try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"])
channel = connection.channel()
self.simple_queue = SimpleQueue(channel, schedule_queue)
except Exception, e:
logger.error(e)
return False
return True
""" """
Handle a message from RabbitMQ, put it into our yucky global var. Handle a message from RabbitMQ, put it into our yucky global var.
@ -84,7 +62,7 @@ class PypoFetch(Thread):
if command == 'update_schedule': if command == 'update_schedule':
self.schedule_data = m['schedule'] self.schedule_data = m['schedule']
thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", False)) self.process_schedule(self.schedule_data, "scheduler", False)
elif command == 'update_stream_setting': elif command == 'update_stream_setting':
logger.info("Updating stream setting...") logger.info("Updating stream setting...")
self.regenerateLiquidsoapConf(m['setting']) self.regenerateLiquidsoapConf(m['setting'])
@ -97,13 +75,11 @@ class PypoFetch(Thread):
elif command == 'cancel_current_show': elif command == 'cancel_current_show':
logger.info("Cancel current show command received...") logger.info("Cancel current show command received...")
self.stop_current_show() self.stop_current_show()
elif command == 'update_recorder_schedule':
temp = m
if temp is not None:
self.process_recorder_schedule(temp)
elif command == 'cancel_recording':
self.recorder_queue.put('cancel_recording')
except Exception, e: except Exception, e:
import traceback
top = traceback.format_exc()
logger.error('Exception: %s', e)
logger.error("traceback: %s", top)
logger.error("Exception in handling RabbitMQ message: %s", e) logger.error("Exception in handling RabbitMQ message: %s", e)
def stop_current_show(self): def stop_current_show(self):
@ -316,36 +292,11 @@ class PypoFetch(Thread):
scheduled_data = dict() scheduled_data = dict()
scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists
scheduled_data['schedule'] = playlists scheduled_data['schedule'] = playlists
self.queue.put(scheduled_data) self.push_queue.put(scheduled_data)
# cleanup # cleanup
try: self.cleanup(self.export_source) try: self.cleanup(self.export_source)
except Exception, e: logger.error("%s", e) except Exception, e: logger.error("%s", e)
def getDateTimeObj(self,time):
timeinfo = time.split(" ")
date = timeinfo[0].split("-")
time = timeinfo[1].split(":")
date = map(int, date)
time = map(int, time)
return datetime(date[0], date[1], date[2], time[0], time[1], time[2], 0, None)
def process_recorder_schedule(self, m):
logger = logging.getLogger('fetch')
logger.info("Parsing recording show schedules...")
shows_to_record = {}
shows = m['shows']
for show in shows:
show_starts = self.getDateTimeObj(show[u'starts'])
show_end = self.getDateTimeObj(show[u'ends'])
time_delta = show_end - show_starts
shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name'], m['server_timezone']]
self.recorder_queue.put(shows_to_record)
logger.info(shows_to_record)
""" """
In this function every audio file is cut as necessary (cue_in/cue_out != 0) In this function every audio file is cut as necessary (cue_in/cue_out != 0)
@ -519,24 +470,7 @@ class PypoFetch(Thread):
status, self.schedule_data = self.api_client.get_schedule() status, self.schedule_data = self.api_client.get_schedule()
if status == 1: if status == 1:
logger.info("Bootstrap schedule received: %s", self.schedule_data) logger.info("Bootstrap schedule received: %s", self.schedule_data)
thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", True)) self.process_schedule(self.schedule_data, "scheduler", True)
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception, e:
logger.error(e)
logger.info("Bootstrap complete: got initial copy of the schedule")
while not self.init_rabbit_mq():
logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
loops = 1 loops = 1
while True: while True:
@ -556,10 +490,8 @@ class PypoFetch(Thread):
Currently we are checking every 3600 seconds (1 hour) Currently we are checking every 3600 seconds (1 hour)
""" """
message = self.simple_queue.get(block=True, timeout=3600) message = self.fetch_queue.get(block=True, timeout=3600)
self.handle_message(message.payload) self.handle_message(message)
# ACK the message to take it off the queue
message.ack()
except Empty, e: except Empty, e:
""" """
Queue timeout. Fetching data manually Queue timeout. Fetching data manually
@ -584,17 +516,7 @@ class PypoFetch(Thread):
""" """
status, self.schedule_data = self.api_client.get_schedule() status, self.schedule_data = self.api_client.get_schedule()
if status == 1: if status == 1:
thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", False)) self.process_schedule(self.schedule_data, "scheduler", False)
"""
Fetch recorder schedule
"""
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
logger.info("updated recorder schedule received: %s", temp)
except Exception, e:
logger.error(e)
loops += 1 loops += 1

View File

@ -0,0 +1,120 @@
import logging
import logging.config
import sys
from configobj import ConfigObj
from threading import Thread
import time
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
from kombu.exceptions import MessageStateError
from kombu.simple import SimpleQueue
import json
# configure logging
logging.config.fileConfig("logging.cfg")
# loading config file
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
LS_HOST = config['ls_host']
LS_PORT = config['ls_port']
POLL_INTERVAL = int(config['poll_interval'])
except Exception, e:
logger = logging.getLogger('message_h')
logger.error('Error loading config file: %s', e)
sys.exit()
class PypoMessageHandler(Thread):
def __init__(self, pq, rq):
Thread.__init__(self)
self.logger = logging.getLogger('message_h')
self.pypo_queue = pq
self.recorder_queue = rq
def init_rabbit_mq(self):
self.logger.info("Initializing RabbitMQ stuff")
try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"])
channel = connection.channel()
self.simple_queue = SimpleQueue(channel, schedule_queue)
except Exception, e:
self.logger.error(e)
return False
return True
"""
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
"""
def handle_message(self, message):
try:
self.logger.info("Received event from RabbitMQ: %s" % message)
m = json.loads(message)
command = m['event_type']
self.logger.info("Handling command: " + command)
if command == 'update_schedule':
self.logger.info("Updating schdule...")
self.pypo_queue.put(message)
elif command == 'update_stream_setting':
self.logger.info("Updating stream setting...")
self.pypo_queue.put(message)
elif command == 'update_stream_format':
self.logger.info("Updating stream format...")
self.pypo_queue.put(message)
elif command == 'update_station_name':
self.logger.info("Updating station name...")
self.pypo_queue.put(message)
elif command == 'cancel_current_show':
self.logger.info("Cancel current show command received...")
self.pypo_queue.put(message)
elif command == 'update_recorder_schedule':
self.recorder_queue.put(message)
elif command == 'cancel_recording':
self.recorder_queue.put(message)
except Exception, e:
self.logger.error("Exception in handling RabbitMQ message: %s", e)
def main(self):
while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
loops = 1
while True:
self.logger.info("Loop #%s", loops)
try:
message = self.simple_queue.get(block=True)
self.handle_message(message.payload)
# ACK the message to take it off the queue
message.ack()
except Exception, e:
"""
sleep 5 seconds so that we don't spin inside this
while loop and eat all the CPU
"""
time.sleep(5)
"""
There is a problem with the RabbitMq messenger service. Let's
log the error and get the schedule via HTTP polling
"""
self.logger.error("Exception, %s", e)
loops += 1
"""
Main loop of the thread:
Wait for schedule updates from RabbitMQ, but in case there arent any,
poll the server to get the upcoming schedule.
"""
def run(self):
while True:
self.main()

View File

@ -176,19 +176,35 @@ class Recorder(Thread):
self.server_timezone = '' self.server_timezone = ''
self.queue = q self.queue = q
self.logger.info("RecorderFetch: init complete") self.logger.info("RecorderFetch: init complete")
self.loops = 0
def handle_message(self): def handle_message(self):
if not self.queue.empty(): if not self.queue.empty():
msg = self.queue.get() message = self.queue.get()
self.logger.info("Receivied msg from Pypo Fetch: %s", msg) msg = json.loads(message)
if msg == 'cancel_recording': command = msg["event_type"]
self.logger.info("Received msg from Pypo Fetch: %s", msg)
if command == 'cancel_recording':
if self.sr is not None and self.sr.is_recording(): if self.sr is not None and self.sr.is_recording():
self.sr.cancel_recording() self.sr.cancel_recording()
else: else:
self.shows_to_record = msg self.process_recorder_schedule(msg)
self.loops = 0
if self.shows_to_record: if self.shows_to_record:
self.start_record() self.start_record()
def process_recorder_schedule(self, m):
self.logger.info("Parsing recording show schedules...")
temp_shows_to_record = {}
shows = m['shows']
for show in shows:
show_starts = getDateTimeObj(show[u'starts'])
show_end = getDateTimeObj(show[u'ends'])
time_delta = show_end - show_starts
temp_shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name'], m['server_timezone']]
self.shows_to_record = temp_shows_to_record
def get_time_till_next_show(self): def get_time_till_next_show(self):
if len(self.shows_to_record) != 0: if len(self.shows_to_record) != 0:
@ -247,21 +263,43 @@ class Recorder(Thread):
def run(self): def run(self):
try: try:
self.logger.info("Started...") self.logger.info("Started...")
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
self.logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception, e:
self.logger.error(e)
self.logger.info("Bootstrap complete: got initial copy of the schedule")
recording = False recording = False
loops = 0 self.loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL) heartbeat_period = math.floor(30/PUSH_INTERVAL)
while True: while True:
if loops % heartbeat_period == 0: if self.loops % heartbeat_period == 0:
self.logger.info("heartbeat") self.logger.info("heartbeat")
loops = 0 if self.loops * PUSH_INTERVAL > 3600:
self.loops = 0
"""
Fetch recorder schedule
"""
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
self.logger.info("updated recorder schedule received: %s", temp)
except Exception, e:
self.logger.error(e)
try: self.handle_message() try: self.handle_message()
except Exception, e: except Exception, e:
self.logger.error('Pypo Recorder Exception: %s', e) self.logger.error('Pypo Recorder Exception: %s', e)
time.sleep(PUSH_INTERVAL) time.sleep(PUSH_INTERVAL)
loops += 1 self.loops += 1
except Exception,e : except Exception,e :
import traceback import traceback
top = traceback.format_exc() top = traceback.format_exc()