CC-3346: Recorder: Merge recorder with pypo

- separated the RabbitMQ listener out of pypoFetch and created pypomessagehandler.py

Conflicts:

	python_apps/pypo/pypofetch.py
James 2012-02-28 14:44:39 -05:00
parent 91ef7f7669
commit 5aabe89069
5 changed files with 238 additions and 112 deletions
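
For orientation, the core of this change is that the new PypoMessageHandler thread owns the RabbitMQ consumer and routes each JSON event either to the PypoFetch queue or to the Recorder queue based on its event_type. A minimal, self-contained sketch of that routing (the RabbitMQ consumer is replaced here by an in-memory Queue so the example runs standalone; the class and variable names are illustrative stand-ins, not the real ones):

    # Sketch only: MessageRouter stands in for PypoMessageHandler, source_q for
    # the kombu SimpleQueue the real handler reads from. Python 2, as used by pypo.
    import json
    import threading
    from Queue import Queue

    class MessageRouter(threading.Thread):
        RECORDER_EVENTS = ('update_recorder_schedule', 'cancel_recording')

        def __init__(self, source_q, fetch_q, recorder_q):
            threading.Thread.__init__(self)
            self.daemon = True
            self.source_q = source_q      # incoming RabbitMQ messages
            self.fetch_q = fetch_q        # consumed by PypoFetch
            self.recorder_q = recorder_q  # consumed by Recorder

        def run(self):
            while True:
                raw = self.source_q.get()
                event = json.loads(raw)
                if event['event_type'] in self.RECORDER_EVENTS:
                    self.recorder_q.put(raw)  # recording commands go to the recorder
                else:
                    self.fetch_q.put(raw)     # everything else goes to PypoFetch

    if __name__ == '__main__':
        source_q, fetch_q, recorder_q = Queue(), Queue(), Queue()
        MessageRouter(source_q, fetch_q, recorder_q).start()
        source_q.put(json.dumps({'event_type': 'update_schedule', 'schedule': []}))
        source_q.put(json.dumps({'event_type': 'cancel_recording'}))
        print fetch_q.get(timeout=2)     # schedule event, routed to PypoFetch
        print recorder_q.get(timeout=2)  # recording event, routed to Recorder

Like the real handler, the sketch forwards the raw message string and leaves JSON decoding to the consuming thread, which is why Recorder.handle_message() below now calls json.loads() itself.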


@@ -1,8 +1,8 @@
[loggers]
keys=root,fetch,push,recorder
keys=root,fetch,push,recorder,message_h
[handlers]
keys=pypo,recorder
keys=pypo,recorder,message_h
[formatters]
keys=simpleFormatter
@@ -29,6 +29,12 @@ handlers=recorder
qualname=recorder
propagate=0
[logger_message_h]
level=DEBUG
handlers=message_h
qualname=message_h
propagate=0
[handler_pypo]
class=logging.handlers.RotatingFileHandler
level=DEBUG
@@ -41,6 +47,12 @@ level=DEBUG
formatter=simpleFormatter
args=("/var/log/airtime/pypo/show-recorder.log", 'a', 1000000, 5,)
[handler_message_h]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=("/var/log/airtime/pypo/message-handler.log", 'a', 1000000, 5,)
[formatter_simpleFormatter]
format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s
datefmt=
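
The new [logger_message_h] and [handler_message_h] sections are consumed the same way as the existing fetch/push/recorder loggers. A minimal sketch, assuming logging.cfg and the /var/log/airtime/pypo directory are in place:

    import logging
    import logging.config

    # Loads the config above, which registers the 'message_h' logger and its rotating
    # file handler writing to /var/log/airtime/pypo/message-handler.log.
    logging.config.fileConfig("logging.cfg")
    logger = logging.getLogger('message_h')  # matches qualname=message_h
    logger.info("message handler logging configured")

This is exactly what the new pypomessagehandler.py (added below) does to obtain its logger.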


@@ -16,6 +16,7 @@ from Queue import Queue
from pypopush import PypoPush
from pypofetch import PypoFetch
from recorder import Recorder
from pypomessagehandler import PypoMessageHandler
from configobj import ConfigObj
@@ -127,11 +128,19 @@ if __name__ == '__main__':
api_client = api_client.api_client_factory(config)
api_client.register_component("pypo")
q = Queue()
pypoFetch_q = Queue()
recorder_q = Queue()
pp = PypoPush(q)
pypoPush_q = Queue()
pmh = PypoMessageHandler(pypoFetch_q, recorder_q)
pmh.daemon = True
pmh.start()
pf = PypoFetch(pypoFetch_q, pypoPush_q)
pf.daemon = True
pf.start()
pp = PypoPush(pypoPush_q)
pp.daemon = True
pp.start()
@@ -139,10 +148,6 @@ if __name__ == '__main__':
recorder.daemon = True
recorder.start()
pf = PypoFetch(q, recorder_q)
pf.daemon = True
pf.start()
#pp.join()
pf.join()
logger.info("pypo fetch exit")


@@ -15,13 +15,6 @@ from subprocess import Popen, PIPE
from datetime import datetime
from datetime import timedelta
import filecmp
import thread
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
from kombu.exceptions import MessageStateError
from kombu.simple import SimpleQueue
from api_clients import api_client
@@ -43,30 +36,15 @@ except Exception, e:
sys.exit()
class PypoFetch(Thread):
def __init__(self, q, recorder_q):
def __init__(self, pypoFetch_q, pypoPush_q):
Thread.__init__(self)
self.api_client = api_client.api_client_factory(config)
self.set_export_source('scheduler')
self.queue = q
self.recorder_queue = recorder_q
self.fetch_queue = pypoFetch_q
self.push_queue = pypoPush_q
self.schedule_data = []
logger = logging.getLogger('fetch')
logger.info("PypoFetch: init complete")
def init_rabbit_mq(self):
logger = logging.getLogger('fetch')
logger.info("Initializing RabbitMQ stuff")
try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"])
channel = connection.channel()
self.simple_queue = SimpleQueue(channel, schedule_queue)
except Exception, e:
logger.error(e)
return False
return True
"""
Handle a message from RabbitMQ, put it into our yucky global var.
@@ -83,7 +61,7 @@ class PypoFetch(Thread):
if command == 'update_schedule':
self.schedule_data = m['schedule']
thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", False))
self.process_schedule(self.schedule_data, "scheduler", False)
elif command == 'update_stream_setting':
logger.info("Updating stream setting...")
self.regenerateLiquidsoapConf(m['setting'])
@@ -96,13 +74,11 @@ class PypoFetch(Thread):
elif command == 'cancel_current_show':
logger.info("Cancel current show command received...")
self.stop_current_show()
elif command == 'update_recorder_schedule':
temp = m
if temp is not None:
self.process_recorder_schedule(temp)
elif command == 'cancel_recording':
self.recorder_queue.put('cancel_recording')
except Exception, e:
import traceback
top = traceback.format_exc()
logger.error('Exception: %s', e)
logger.error("traceback: %s", top)
logger.error("Exception in handling RabbitMQ message: %s", e)
def stop_current_show(self):
@@ -315,36 +291,11 @@ class PypoFetch(Thread):
scheduled_data = dict()
scheduled_data['liquidsoap_playlists'] = liquidsoap_playlists
scheduled_data['schedule'] = playlists
self.queue.put(scheduled_data)
self.push_queue.put(scheduled_data)
# cleanup
try: self.cleanup(self.export_source)
except Exception, e: logger.error("%s", e)
def getDateTimeObj(self,time):
timeinfo = time.split(" ")
date = timeinfo[0].split("-")
time = timeinfo[1].split(":")
date = map(int, date)
time = map(int, time)
return datetime(date[0], date[1], date[2], time[0], time[1], time[2], 0, None)
def process_recorder_schedule(self, m):
logger = logging.getLogger('fetch')
logger.info("Parsing recording show schedules...")
shows_to_record = {}
shows = m['shows']
for show in shows:
show_starts = self.getDateTimeObj(show[u'starts'])
show_end = self.getDateTimeObj(show[u'ends'])
time_delta = show_end - show_starts
shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name'], m['server_timezone']]
self.recorder_queue.put(shows_to_record)
logger.info(shows_to_record)
"""
In this function every audio file is cut as necessary (cue_in/cue_out != 0)
@@ -518,36 +469,46 @@ class PypoFetch(Thread):
status, self.schedule_data = self.api_client.get_schedule()
if status == 1:
logger.info("Bootstrap schedule received: %s", self.schedule_data)
thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", True))
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception, e:
logger.error(e)
logger.info("Bootstrap complete: got initial copy of the schedule")
while not self.init_rabbit_mq():
logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
self.process_schedule(self.schedule_data, "scheduler", True)
loops = 1
while True:
logger.info("Loop #%s", loops)
try:
try:
message = self.simple_queue.get(block=True)
self.handle_message(message.payload)
# ACK the message to take it off the queue
message.ack()
except MessageStateError, m:
logger.error("Message ACK error: %s", m)
"""
our simple_queue.get() requires a timeout, in which case we
fetch the Airtime schedule manually. It is important to fetch
the schedule periodically because if we didn't, we would only
get schedule updates via RabbitMq if the user was constantly
using the Airtime interface.
If the user is not using the interface, RabbitMq messages are not
sent, and we will have very stale (or non-existent!) data about the
schedule.
Currently we are checking every 3600 seconds (1 hour)
"""
message = self.fetch_queue.get(block=True, timeout=3600)
self.handle_message(message)
except Empty, e:
"""
Queue timeout. Fetching data manually
"""
raise
except Exception, e:
"""
sleep 5 seconds so that we don't spin inside this
while loop and eat all the CPU
"""
time.sleep(5)
"""
There is a problem with the RabbitMq messenger service. Let's
log the error and get the schedule via HTTP polling
"""
logger.error("Exception, %s", e)
raise
except Exception, e:
"""
There is a problem with the RabbitMq messenger service. Let's
@@ -557,17 +518,7 @@ class PypoFetch(Thread):
status, self.schedule_data = self.api_client.get_schedule()
if status == 1:
thread.start_new_thread(self.process_schedule, (self.schedule_data, "scheduler", False))
"""
Fetch recorder schedule
"""
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
logger.info("updated recorder schedule received: %s", temp)
except Exception, e:
logger.error(e)
self.process_schedule(self.schedule_data, "scheduler", False)
loops += 1
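
Note that the comment block above still mentions simple_queue.get(); after this change PypoFetch no longer talks to RabbitMQ directly and instead blocks on the in-process fetch_queue with a one-hour timeout, falling back to polling the Airtime API when nothing arrives. A simplified sketch of that loop (handle_message, get_schedule and process_schedule are hypothetical stand-ins for the corresponding PypoFetch and api_client methods, and the nested error handling of the real code is collapsed):

    from Queue import Empty
    import time

    def fetch_loop(fetch_q, handle_message, get_schedule, process_schedule):
        while True:
            try:
                message = fetch_q.get(block=True, timeout=3600)  # wait up to an hour
                handle_message(message)
            except Empty:
                # No RabbitMQ traffic for an hour: poll the schedule over HTTP so we
                # never run with stale (or non-existent) data.
                status, schedule = get_schedule()
                if status == 1:
                    process_schedule(schedule, "scheduler", False)
            except Exception, e:
                time.sleep(5)  # don't spin and eat CPU on persistent errors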


@@ -0,0 +1,120 @@
import logging
import logging.config
import sys
from configobj import ConfigObj
from threading import Thread
import time
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
from kombu.exceptions import MessageStateError
from kombu.simple import SimpleQueue
import json
# configure logging
logging.config.fileConfig("logging.cfg")
# loading config file
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
LS_HOST = config['ls_host']
LS_PORT = config['ls_port']
POLL_INTERVAL = int(config['poll_interval'])
except Exception, e:
logger = logging.getLogger('message_h')
logger.error('Error loading config file: %s', e)
sys.exit()
class PypoMessageHandler(Thread):
def __init__(self, pq, rq):
Thread.__init__(self)
self.logger = logging.getLogger('message_h')
self.pypo_queue = pq
self.recorder_queue = rq
def init_rabbit_mq(self):
self.logger.info("Initializing RabbitMQ stuff")
try:
schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
connection = BrokerConnection(config["rabbitmq_host"], config["rabbitmq_user"], config["rabbitmq_password"], config["rabbitmq_vhost"])
channel = connection.channel()
self.simple_queue = SimpleQueue(channel, schedule_queue)
except Exception, e:
self.logger.error(e)
return False
return True
"""
Handle a message from RabbitMQ, put it into our yucky global var.
Hopefully there is a better way to do this.
"""
def handle_message(self, message):
try:
self.logger.info("Received event from RabbitMQ: %s" % message)
m = json.loads(message)
command = m['event_type']
self.logger.info("Handling command: " + command)
if command == 'update_schedule':
self.logger.info("Updating schdule...")
self.pypo_queue.put(message)
elif command == 'update_stream_setting':
self.logger.info("Updating stream setting...")
self.pypo_queue.put(message)
elif command == 'update_stream_format':
self.logger.info("Updating stream format...")
self.pypo_queue.put(message)
elif command == 'update_station_name':
self.logger.info("Updating station name...")
self.pypo_queue.put(message)
elif command == 'cancel_current_show':
self.logger.info("Cancel current show command received...")
self.pypo_queue.put(message)
elif command == 'update_recorder_schedule':
self.recorder_queue.put(message)
elif command == 'cancel_recording':
self.recorder_queue.put(message)
except Exception, e:
self.logger.error("Exception in handling RabbitMQ message: %s", e)
def main(self):
while not self.init_rabbit_mq():
self.logger.error("Error connecting to RabbitMQ Server. Trying again in few seconds")
time.sleep(5)
loops = 1
while True:
self.logger.info("Loop #%s", loops)
try:
message = self.simple_queue.get(block=True)
self.handle_message(message.payload)
# ACK the message to take it off the queue
message.ack()
except Exception, e:
"""
sleep 5 seconds so that we don't spin inside this
while loop and eat all the CPU
"""
time.sleep(5)
"""
There is a problem with the RabbitMq messenger service. Let's
log the error and get the schedule via HTTP polling
"""
self.logger.error("Exception, %s", e)
loops += 1
"""
Main loop of the thread:
Wait for schedule updates from RabbitMQ, but in case there arent any,
poll the server to get the upcoming schedule.
"""
def run(self):
while True:
self.main()


@@ -176,19 +176,35 @@ class Recorder(Thread):
self.server_timezone = ''
self.queue = q
self.logger.info("RecorderFetch: init complete")
self.loops = 0
def handle_message(self):
if not self.queue.empty():
msg = self.queue.get()
self.logger.info("Receivied msg from Pypo Fetch: %s", msg)
if msg == 'cancel_recording':
message = self.queue.get()
msg = json.loads(message)
command = msg["event_type"]
self.logger.info("Received msg from Pypo Fetch: %s", msg)
if command == 'cancel_recording':
if self.sr is not None and self.sr.is_recording():
self.sr.cancel_recording()
else:
self.shows_to_record = msg
self.process_recorder_schedule(msg)
self.loops = 0
if self.shows_to_record:
self.start_record()
def process_recorder_schedule(self, m):
self.logger.info("Parsing recording show schedules...")
temp_shows_to_record = {}
shows = m['shows']
for show in shows:
show_starts = getDateTimeObj(show[u'starts'])
show_end = getDateTimeObj(show[u'ends'])
time_delta = show_end - show_starts
temp_shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name'], m['server_timezone']]
self.shows_to_record = temp_shows_to_record
def get_time_till_next_show(self):
if len(self.shows_to_record) != 0:
@@ -247,21 +263,43 @@ class Recorder(Thread):
def run(self):
try:
self.logger.info("Started...")
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
self.logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception, e:
self.logger.error(e)
self.logger.info("Bootstrap complete: got initial copy of the schedule")
recording = False
loops = 0
self.loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL)
while True:
if loops % heartbeat_period == 0:
if self.loops % heartbeat_period == 0:
self.logger.info("heartbeat")
loops = 0
if self.loops * PUSH_INTERVAL > 3600:
self.loops = 0
"""
Fetch recorder schedule
"""
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.process_recorder_schedule(temp)
self.logger.info("updated recorder schedule received: %s", temp)
except Exception, e:
self.logger.error(e)
try: self.handle_message()
except Exception, e:
self.logger.error('Pypo Recorder Exception: %s', e)
time.sleep(PUSH_INTERVAL)
loops += 1
self.loops += 1
except Exception,e :
import traceback
top = traceback.format_exc()
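
For reference, the recorder now bootstraps and refreshes its own schedule instead of relying on PypoFetch: each loop iteration sleeps PUSH_INTERVAL seconds, so self.loops * PUSH_INTERVAL approximates elapsed time, the show list is re-polled over HTTP roughly once an hour, and a heartbeat is logged about every 30 seconds (RabbitMQ updates still arrive via the recorder queue in between). A stand-alone sketch of that arithmetic, with the PUSH_INTERVAL value assumed purely for illustration:

    import math
    import time

    PUSH_INTERVAL = 2                                  # assumed example value, in seconds
    heartbeat_period = math.floor(30 / PUSH_INTERVAL)  # heartbeat roughly every 30 seconds

    loops = 0
    while True:
        if loops % heartbeat_period == 0:
            print "heartbeat"
        if loops * PUSH_INTERVAL > 3600:               # about an hour of loop time elapsed
            loops = 0
            # the real code re-fetches the recorder schedule here via
            # api_client.get_shows_to_record() and process_recorder_schedule()
        # handle_message() would drain the recorder queue at this point
        time.sleep(PUSH_INTERVAL)
        loops += 1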