CC-5709: Airtime Analyzer Queue

* Added StatusReporter class to make HTTP requests back to Airtime
* Handle even more errors now
* Added proper logging and log rotation
* Added --debug flag for increased logging.
Albert Santoni 2014-03-05 22:43:47 -05:00
parent a6a64a2b9e
commit 59535850e2
7 changed files with 185 additions and 46 deletions

@@ -1,23 +1,49 @@
Ghetto temporary installation instructions
==========
set up a virtualenv
activate it
pip install mutagen python-magic pika
$ sudo python setup.py install
You will need to allow the "airtime" RabbitMQ user to access the airtime-uploads exchange and queue:
sudo rabbitmqctl set_permissions -p /airtime airtime airtime-uploads airtime-uploads airtime-uploads
You will need to allow the "airtime" RabbitMQ user to access all exchanges and queues within the /airtime vhost:
sudo rabbitmqctl set_permissions -p /airtime airtime .* .* .*
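(The three trailing arguments to set_permissions are the configure, write, and read permission patterns, respectively.)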
Usage
==========
To print usage instructions, run:
$ airtime_analyzer --help
This application can be run as a daemon by running:
$ airtime_analyzer -d
Developers
==========
For development, you want to install AAQ system-wide but with everything symlinked back to the source
directory (for convenience), so run:
For development, you want to install airtime_analyzer system-wide but with everything symlinked back to the source
directory for convenience. This is super easy to do; just run:
$ sudo python setup.py develop
To send a test message to airtime_analyzer, you can use the message_sender.php script in the tools directory.
For example, run:
$ php tools/message_sender.php '{ "tmp_file_path" : "foo.mp3" }'
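With this commit, the analyzer expects several additional fields in the message (see message_listener.py below), so a fuller test message might look like this (the values are placeholders):
$ php tools/message_sender.php '{ "tmp_file_path" : "foo.mp3", "final_directory" : "/srv/airtime/stor", "callback_url" : "http://airtime.example.com/rest/media/1", "api_key" : "YOUR_API_KEY" }'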
Logging
==========
By default, logs are saved to:
/var/log/airtime/airtime_analyzer.log
This application takes care of rotating logs for you.
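(Rotation kicks in once the log reaches 30 MB, and up to 8 old log files are kept; see the RotatingFileHandler settings below.)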
Unit Tests

@@ -1,4 +1,7 @@
import ConfigParser
import logging
import logging.handlers
import sys
from metadata_analyzer import MetadataAnalyzer
from replaygain_analyzer import ReplayGainAnalyzer
from message_listener import MessageListener
@@ -6,16 +9,46 @@ from message_listener import MessageListener

class AirtimeAnalyzerServer:

    # Constants
    _CONFIG_PATH = '/etc/airtime/airtime.conf'
    _LOG_PATH = "/var/log/airtime/airtime_analyzer.log"

    # Variables
    _log_level = logging.INFO

    def __init__(self):
    def __init__(self, debug=False):
        # Configure logging
        self.setup_logging(debug)

        # Read our config file
        rabbitmq_config = self.read_config_file()

        # Start listening for RabbitMQ messages telling us about newly
        # uploaded files.
        self._msg_listener = MessageListener(rabbitmq_config)

    def setup_logging(self, debug):
        if debug:
            self._log_level = logging.DEBUG
        #self.log = logging.getLogger(__name__)

        # Set up logging
        logFormatter = logging.Formatter("%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s")
        rootLogger = logging.getLogger()
        rootLogger.setLevel(self._log_level)
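
        # Rotate the log file once it reaches 30 MB, keeping up to 8 old copies.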
        fileHandler = logging.handlers.RotatingFileHandler(filename=self._LOG_PATH, maxBytes=1024*1024*30,
                                                           backupCount=8)
        fileHandler.setFormatter(logFormatter)
        rootLogger.addHandler(fileHandler)

        consoleHandler = logging.StreamHandler()
        consoleHandler.setFormatter(logFormatter)
        rootLogger.addHandler(consoleHandler)

    def read_config_file(self):
        config = ConfigParser.SafeConfigParser()

@@ -1,17 +1,31 @@
import logging
import multiprocessing
from metadata_analyzer import MetadataAnalyzer

class AnalyzerPipeline:

    # Constructor
    def __init__(self):
        pass

    #TODO: Take a JSON message and perform the necessary analysis.
    #TODO: Comment the shit out of this
    # Take a message dictionary and perform the necessary analysis.
    @staticmethod
    def run_analysis(json_msg, queue):
        # TODO: Pass the JSON along to each analyzer??
        #print MetadataAnalyzer.analyze("foo.mp3")
        #print ReplayGainAnalyzer.analyze("foo.mp3")
        #raise Exception("Test Crash")
        queue.put(MetadataAnalyzer.analyze("foo.mp3"))
    def run_analysis(queue, audio_file_path, final_directory):
        if not isinstance(queue, multiprocessing.queues.Queue):
            raise TypeError("queue must be a multiprocessing.Queue()")
        if not isinstance(audio_file_path, unicode):
            raise TypeError("audio_file_path must be a unicode string. Was of type " + type(audio_file_path).__name__ + " instead.")
        if not isinstance(final_directory, unicode):
            raise TypeError("final_directory must be a unicode string. Was of type " + type(final_directory).__name__ + " instead.")

        # Analyze the audio file we were told to analyze:
        # First, we extract the ID3 tags and other metadata:
        queue.put(MetadataAnalyzer.analyze(audio_file_path))
        # Note that the queue we're putting the results into is our interprocess communication
        # back to the main process.

        #print ReplayGainAnalyzer.analyze("foo.mp3")
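
For context, a minimal sketch of how a parent process drives run_analysis over the interprocess queue (this mirrors spawn_analyzer_process in message_listener.py below; the file paths are placeholders):

q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
                            args=(q, u"/tmp/foo.mp3", u"/srv/airtime/stor"))
p.start()
p.join()           # wait for the analysis process to finish
metadata = q.get() # the results come back through the queue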

@@ -1,7 +1,11 @@
import sys
import pika
import json
import time
import logging
import multiprocessing
from analyzer_pipeline import AnalyzerPipeline
from status_reporter import StatusReporter
EXCHANGE = "airtime-uploads"
EXCHANGE_TYPE = "topic"
@@ -13,22 +17,39 @@ QUEUE = "airtime-uploads"
 - round robin messaging
 - acking
 - why we use the multiprocess architecture
 - in general, how it works and why it works this way
'''

class MessageListener:

    def __init__(self, config):
        # Read the RabbitMQ connection settings from the config file.
        # The exceptions thrown here by default give good error messages.
        RMQ_CONFIG_SECTION = "rabbitmq"
        if not config.has_section(RMQ_CONFIG_SECTION):
            print "Error: rabbitmq section not found in config file."
            exit(-1)
        self._host = config.get(RMQ_CONFIG_SECTION, 'host')
        self._port = config.getint(RMQ_CONFIG_SECTION, 'port')
        self._username = config.get(RMQ_CONFIG_SECTION, 'user')
        self._password = config.get(RMQ_CONFIG_SECTION, 'password')
        self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')

        while True:
            try:
                self.connect_to_messaging_server()
                self.wait_for_messages()
            except KeyboardInterrupt:
                self.disconnect_from_messaging_server()
                break
            except pika.exceptions.AMQPError as e:
                logging.error("Connection to message queue failed.")
                logging.error(e)
                logging.info("Retrying in 5 seconds...")
                time.sleep(5)
        self._connection.close()

    def connect_to_messaging_server(self):
        self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._host,
                port=self._port, virtual_host=self._vhost,
                credentials=pika.credentials.PlainCredentials(self._username, self._password)))
@@ -38,21 +59,21 @@ class MessageListener:
        self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)

        print " Listening for messages..."
        logging.info(" Listening for messages...")
        self._channel.basic_consume(MessageListener.msg_received_callback,
                                    queue=QUEUE, no_ack=False)
        try:
            self._channel.start_consuming()
        except KeyboardInterrupt:
            self._channel.stop_consuming()

    def wait_for_messages(self):
        self._channel.start_consuming()

    def disconnect_from_messaging_server(self):
        self._channel.stop_consuming()
        self._connection.close()

    # consume callback function
    @staticmethod
    def msg_received_callback(channel, method_frame, header_frame, body):
        print " - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key)
        logging.info(" - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key))

        # Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
        # to pass objects between the processes so that if the analyzer process crashes, it does not
@@ -60,8 +81,22 @@ class MessageListener:
        # propagated to other airtime_analyzer daemons (eg. running on other servers).
        # We avoid cascading failure this way.
        try:
            MessageListener.spawn_analyzer_process(body)
        except Exception:
            msg_dict = json.loads(body)
            audio_file_path = msg_dict["tmp_file_path"]
            final_directory = msg_dict["final_directory"]
            callback_url = msg_dict["callback_url"]
            api_key = msg_dict["api_key"]

            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, final_directory)
            StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)

        except KeyError as e:
            logging.exception("A mandatory airtime_analyzer message field was missing from the message.")
            # See the huge comment about NACK below.
            channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False,
                               requeue=False) # Important that it doesn't requeue the message

        except Exception as e:
@@ -72,31 +107,32 @@ class MessageListener:
            channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False,
                               requeue=False) # Important that it doesn't requeue the message
            #TODO: Report this as a failed upload to the File Upload REST API.
            #
            # TODO: Report this as a failed upload to the File Upload REST API.
            #
            # TODO: If the JSON was invalid, then don't report to the REST API
            StatusReporter.report_failure_to_callback_url(callback_url, api_key, error_status=1,
                                                          reason=u'An error occurred while importing this file')
            logging.error(e)
        else:
            # ACK at the very end, after the message has been successfully processed.
            # If we don't ack, then RabbitMQ will redeliver a message in the future.
            # If we don't ack, then RabbitMQ will redeliver the message in the future.
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)

        # Anything else could happen here:
        # Send an email alert, send an XMPP message, trigger another process, etc.

    @staticmethod
    def spawn_analyzer_process(json_msg):
    def spawn_analyzer_process(audio_file_path, final_directory):
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, args=(json_msg, q))
        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
                                    args=(q, audio_file_path, final_directory))
        p.start()
        p.join()
        if p.exitcode == 0:
            results = q.get()
            print "Server received results: "
            print results
            logging.info("Main process received results from child: ")
            logging.info(results)
        else:
            print "Analyzer process terminated unexpectedly."
            raise AnalyzerException()
            raise Exception("Analyzer process terminated unexpectedly.")

@@ -0,0 +1,29 @@
import requests
import json
import logging

class StatusReporter():

    _HTTP_REQUEST_TIMEOUT = 30

    # Report the extracted metadata and status of the successfully imported file
    # to the callback URL (which should be the Airtime File Upload API)
    @classmethod
    def report_success_to_callback_url(cls, callback_url, api_key, audio_metadata):
        # Encode the audio metadata as JSON and PUT it back to the callback_url
        post_payload = json.dumps(audio_metadata)
        r = requests.put(callback_url, data=post_payload,
                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
        logging.debug("HTTP request returned status: " + str(r.status_code))
        logging.debug(r.text) # Log the response body
        r.raise_for_status() # Raise an exception if an HTTP error code was returned

    #TODO: Queue up failed requests and try them again later.
    @classmethod
    def report_failure_to_callback_url(cls, callback_url, api_key, error_status, reason):
        # TODO: Ensure error_status is an int?
        pass
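
For reference, a minimal sketch of what the still-unimplemented report_failure_to_callback_url might eventually do, mirroring the success path above (the payload field names are assumptions, not part of this commit):

    @classmethod
    def report_failure_to_callback_url(cls, callback_url, api_key, error_status, reason):
        # Hypothetical sketch only; the payload keys are placeholders.
        post_payload = json.dumps({"error_status": error_status, "reason": reason})
        r = requests.put(callback_url, data=post_payload,
                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
        r.raise_for_status()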

@@ -10,12 +10,13 @@ print "Airtime Analyzer " + VERSION

parser = argparse.ArgumentParser()
parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
parser.add_argument("--debug", help="log full debugging output", action="store_true")
args = parser.parse_args()

if args.daemon:
    with daemon.DaemonContext():
        analyzer = aa.AirtimeAnalyzerServer()
        analyzer = aa.AirtimeAnalyzerServer(debug=args.debug)
else:
    # Run without daemonizing
    analyzer = aa.AirtimeAnalyzerServer()
    analyzer = aa.AirtimeAnalyzerServer(debug=args.debug)
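
For example, to run in the foreground with full debug logging:

$ airtime_analyzer --debug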

@@ -1,5 +1,5 @@
from nose.tools import *
import airtime_analyzer_queue
import airtime_analyzer

def setup():
    print "SETUP!"