CC-3346: Recorder: Merge recorder with pypo

- Pypo fech works as msg listner for recroder now.
- recorder is part of pypo and all it does is waiting for msg from pypo
fetch and spwan a show recorder thread.
- added new parameter logger to api client. This way apiclient will log
into specific log file instead of grabbing current log file.
- show recoder is removed from all check system/status page
This commit is contained in:
James 2012-02-24 13:12:50 -05:00
parent 54f81ca5de
commit 695535ae64
30 changed files with 419 additions and 856 deletions

View file

@ -60,6 +60,8 @@ try:
create_dir(config['cache_dir'])
create_dir(config['file_dir'])
create_dir(config['tmp_dir'])
create_dir(config["base_recorded_files"])
#copy files to bin dir
copy_dir("%s/.."%current_script_dir, config["bin_dir"]+"/bin/")
@ -72,6 +74,7 @@ try:
os.system("chmod 755 "+os.path.join(config["bin_dir"], "bin/liquidsoap_scripts/notify.sh"))
os.system("chown -R pypo:pypo "+config["bin_dir"])
os.system("chown -R pypo:pypo "+config["cache_base_dir"])
os.system("chown -R pypo:pypo "+config["base_recorded_files"])
#copy init.d script
shutil.copy(config["bin_dir"]+"/bin/airtime-playout-init-d", "/etc/init.d/airtime-playout")

View file

@ -1,34 +1,46 @@
[loggers]
keys=root,fetch,push
keys=root,fetch,push,recorder
[handlers]
keys=fileOutHandler
keys=pypo,recorder
[formatters]
keys=simpleFormatter
[logger_root]
level=DEBUG
handlers=fileOutHandler
handlers=pypo
[logger_fetch]
level=DEBUG
handlers=fileOutHandler
handlers=pypo
qualname=fetch
propagate=0
[logger_push]
level=DEBUG
handlers=fileOutHandler
handlers=pypo
qualname=push
propagate=0
[handler_fileOutHandler]
[logger_recorder]
level=DEBUG
handlers=recorder
qualname=recorder
propagate=0
[handler_pypo]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=("/var/log/airtime/pypo/pypo.log", 'a', 1000000, 5,)
[handler_recorder]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=simpleFormatter
args=("/var/log/airtime/pypo/show-recorder.log", 'a', 1000000, 5,)
[formatter_simpleFormatter]
format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s
datefmt=

View file

@ -15,6 +15,7 @@ from Queue import Queue
from pypopush import PypoPush
from pypofetch import PypoFetch
from recorder import Recorder
from configobj import ConfigObj
@ -127,12 +128,18 @@ if __name__ == '__main__':
api_client.register_component("pypo")
q = Queue()
recorder_q = Queue()
pp = PypoPush(q)
pp.daemon = True
pp.start()
pf = PypoFetch(q)
recorder = Recorder(recorder_q)
recorder.daemon = True
recorder.start()
pf = PypoFetch(q, recorder_q)
pf.daemon = True
pf.start()

View file

@ -71,3 +71,17 @@ push_interval = 1 # in seconds
# while 'otf' (on the fly) cues while loading into ls
# (needs the post_processor patch)
cue_style = 'pre'
############################################
# Recorded Audio settings #
############################################
record_bitrate = 256
record_samplerate = 44100
record_channels = 2
record_sample_size = 16
#can be either ogg|mp3, mp3 recording requires installation of the package "lame"
record_file_type = 'ogg'
# base path to store recordered shows at
base_recorded_files = '/var/tmp/airtime/show-recorder/'

View file

@ -43,11 +43,12 @@ except Exception, e:
sys.exit()
class PypoFetch(Thread):
def __init__(self, q):
def __init__(self, q, recorder_q):
Thread.__init__(self)
self.api_client = api_client.api_client_factory(config)
self.set_export_source('scheduler')
self.queue = q
self.recorder_queue = recorder_q
self.schedule_data = []
logger = logging.getLogger('fetch')
logger.info("PypoFetch: init complete")
@ -95,6 +96,12 @@ class PypoFetch(Thread):
elif command == 'cancel_current_show':
logger.info("Cancel current show command received...")
self.stop_current_show()
elif command == 'update_recorder_schedule':
temp = m
if temp is not None:
self.parse_shows(temp)
elif command == 'cancel_recording':
self.recorder_queue.put('cancel_recording')
except Exception, e:
logger.error("Exception in handling RabbitMQ message: %s", e)
@ -313,6 +320,30 @@ class PypoFetch(Thread):
# cleanup
try: self.cleanup(self.export_source)
except Exception, e: logger.error("%s", e)
def getDateTimeObj(self,time):
timeinfo = time.split(" ")
date = timeinfo[0].split("-")
time = timeinfo[1].split(":")
date = map(int, date)
time = map(int, time)
return datetime(date[0], date[1], date[2], time[0], time[1], time[2], 0, None)
def parse_shows(self, m):
logger = logging.getLogger('fetch')
logger.info("Parsing recording show schedules...")
shows_to_record = {}
shows = m['shows']
for show in shows:
show_starts = self.getDateTimeObj(show[u'starts'])
show_end = self.getDateTimeObj(show[u'ends'])
time_delta = show_end - show_starts
shows_to_record[show[u'starts']] = [time_delta, show[u'instance_id'], show[u'name'], m['server_timezone']]
self.recorder_queue.put(shows_to_record)
logger.info(shows_to_record)
"""
@ -488,6 +519,17 @@ class PypoFetch(Thread):
if status == 1:
logger.info("Bootstrap schedule received: %s", self.schedule_data)
self.process_schedule(self.schedule_data, "scheduler", True)
# Bootstrap: since we are just starting up, we need to grab the
# most recent schedule. After that we can just wait for updates.
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.parse_shows(temp)
logger.info("Bootstrap recorder schedule received: %s", temp)
except Exception, e:
logger.error(e)
logger.info("Bootstrap complete: got initial copy of the schedule")
@ -542,8 +584,18 @@ class PypoFetch(Thread):
status, self.schedule_data = self.api_client.get_schedule()
if status == 1:
self.process_schedule(self.schedule_data, "scheduler", False)
"""
Fetch recorder schedule
"""
try:
temp = self.api_client.get_shows_to_record()
if temp is not None:
self.parse_shows(temp)
logger.info("updated recorder schedule received: %s", temp)
except Exception, e:
logger.error(e)
loops += 1
loops += 1
"""
Main loop of the thread:

View file

@ -0,0 +1,270 @@
import urllib
import logging
import logging.config
import json
import time
import datetime
import os
import sys
import shutil
import socket
import pytz
import signal
import math
from configobj import ConfigObj
from poster.encode import multipart_encode
from poster.streaminghttp import register_openers
import urllib2
from subprocess import Popen
from threading import Thread
import mutagen
from api_clients import api_client
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
# loading config file
try:
config = ConfigObj('/etc/airtime/pypo.cfg')
except Exception, e:
self.logger.error('Error loading config file: %s', e)
sys.exit()
def getDateTimeObj(time):
timeinfo = time.split(" ")
date = timeinfo[0].split("-")
time = timeinfo[1].split(":")
date = map(int, date)
time = map(int, time)
return datetime.datetime(date[0], date[1], date[2], time[0], time[1], time[2], 0, None)
PUSH_INTERVAL = 2
class ShowRecorder(Thread):
def __init__ (self, show_instance, show_name, filelength, start_time):
Thread.__init__(self)
self.logger = logging.getLogger('recorder')
self.api_client = api_client.api_client_factory(config, self.logger)
self.filelength = filelength
self.start_time = start_time
self.show_instance = show_instance
self.show_name = show_name
self.p = None
def record_show(self):
length = str(self.filelength)+".0"
filename = self.start_time
filename = filename.replace(" ", "-")
if config["record_file_type"] in ["mp3", "ogg"]:
filetype = config["record_file_type"]
else:
filetype = "ogg";
filepath = "%s%s.%s" % (config["base_recorded_files"], filename, filetype)
br = config["record_bitrate"]
sr = config["record_samplerate"]
c = config["record_channels"]
ss = config["record_sample_size"]
#-f:16,2,44100
#-b:256
command = "ecasound -f:%s,%s,%s -i alsa -o %s,%s000 -t:%s" % (ss, c, sr, filepath, br, length)
args = command.split(" ")
self.logger.info("starting record")
self.logger.info("command " + command)
self.p = Popen(args)
#blocks at the following line until the child process
#quits
code = self.p.wait()
self.logger.info("finishing record, return code %s", self.p.returncode)
code = self.p.returncode
self.p = None
return code, filepath
def cancel_recording(self):
#add 3 second delay before actually cancelling the show. The reason
#for this is because it appears that ecasound starts 1 second later than
#it should, and therefore this method is sometimes incorrectly called 1
#second before the show ends.
#time.sleep(3)
#send signal interrupt (2)
self.logger.info("Show manually cancelled!")
if (self.p is not None):
self.p.send_signal(signal.SIGINT)
#if self.p is defined, then the child process ecasound is recording
def is_recording(self):
return (self.p is not None)
def upload_file(self, filepath):
filename = os.path.split(filepath)[1]
# Register the streaming http handlers with urllib2
register_openers()
# headers contains the necessary Content-Type and Content-Length
# datagen is a generator object that yields the encoded parameters
datagen, headers = multipart_encode({"file": open(filepath, "rb"), 'name': filename, 'show_instance': self.show_instance})
self.api_client.upload_recorded_show(datagen, headers)
def set_metadata_and_save(self, filepath):
try:
date = self.start_time
md = date.split(" ")
time = md[1].replace(":", "-")
self.logger.info("time: %s" % time)
name = time+"-"+self.show_name
artist = api_client.encode_to("Airtime Show Recorder",'utf-8')
#set some metadata for our file daemon
recorded_file = mutagen.File(filepath, easy=True)
recorded_file['title'] = name
recorded_file['artist'] = artist
recorded_file['date'] = md[0]
recorded_file['tracknumber'] = self.show_instance
recorded_file.save()
except Exception, e:
self.logger.error("Exception: %s", e)
def run(self):
code, filepath = self.record_show()
if code == 0:
try:
self.logger.info("Preparing to upload %s" % filepath)
self.set_metadata_and_save(filepath)
self.upload_file(filepath)
os.remove(filepath)
except Exception, e:
self.logger.error(e)
else:
self.logger.info("problem recording show")
os.remove(filepath)
class Recorder(Thread):
def __init__(self, q):
Thread.__init__(self)
self.logger = logging.getLogger('recorder')
self.api_client = api_client.api_client_factory(config)
self.api_client.register_component("show-recorder")
self.sr = None
self.shows_to_record = {}
self.server_timezone = ''
self.queue = q
self.logger.info("RecorderFetch: init complete")
def handle_message(self):
if not self.queue.empty():
msg = self.queue.get()
self.logger.info("Receivied msg from Pypo Fetch: %s", msg)
if msg == 'cancel_recording':
if self.sr is not None and self.sr.is_recording():
self.sr.cancel_recording()
else:
self.shows_to_record = msg
if self.shows_to_record:
self.start_record()
def get_time_till_next_show(self):
if len(self.shows_to_record) != 0:
tnow = datetime.datetime.utcnow()
sorted_show_keys = sorted(self.shows_to_record.keys())
start_time = sorted_show_keys[0]
next_show = getDateTimeObj(start_time)
delta = next_show - tnow
out = delta.seconds
if out < 5:
self.logger.debug("Shows %s", self.shows_to_record)
self.logger.debug("Next show %s", next_show)
self.logger.debug("Now %s", tnow)
return out
def start_record(self):
if len(self.shows_to_record) != 0:
try:
delta = self.get_time_till_next_show()
if delta < 5:
self.logger.debug("sleeping %s seconds until show", delta)
time.sleep(delta)
sorted_show_keys = sorted(self.shows_to_record.keys())
start_time = sorted_show_keys[0]
show_length = self.shows_to_record[start_time][0]
show_instance = self.shows_to_record[start_time][1]
show_name = self.shows_to_record[start_time][2]
server_timezone = self.shows_to_record[start_time][3]
T = pytz.timezone(server_timezone)
start_time_on_UTC = getDateTimeObj(start_time)
start_time_on_server = start_time_on_UTC.replace(tzinfo=pytz.utc).astimezone(T)
start_time_formatted = '%(year)d-%(month)02d-%(day)02d %(hour)02d:%(min)02d:%(sec)02d' % \
{'year': start_time_on_server.year, 'month': start_time_on_server.month, 'day': start_time_on_server.day,\
'hour': start_time_on_server.hour, 'min': start_time_on_server.minute, 'sec': start_time_on_server.second}
self.sr = ShowRecorder(show_instance, show_name, show_length.seconds, start_time_formatted)
self.sr.start()
#remove show from shows to record.
del self.shows_to_record[start_time]
#self.time_till_next_show = self.get_time_till_next_show()
except Exception,e :
import traceback
top = traceback.format_exc()
self.logger.error('Exception: %s', e)
self.logger.error("traceback: %s", top)
"""
Main loop of the thread:
Wait for schedule updates from RabbitMQ, but in case there arent any,
poll the server to get the upcoming schedule.
"""
def run(self):
try:
self.logger.info("Started...")
recording = False
loops = 0
heartbeat_period = math.floor(30/PUSH_INTERVAL)
while True:
if loops % heartbeat_period == 0:
self.logger.info("heartbeat")
loops = 0
try: self.handle_message()
except Exception, e:
self.logger.error('Pypo Recorder Exception: %s', e)
time.sleep(PUSH_INTERVAL)
loops += 1
except Exception,e :
import traceback
top = traceback.format_exc()
self.logger.error('Exception: %s', e)
self.logger.error("traceback: %s", top)