CC-5709 / CC-5705 : Airtime Analyzer

* Finished the skeleton of the airtime_analyzer service.
* Basic round-robin, reliable AMQP messaging works.
* Using multiprocess arch so the daemon survives analyzer crashes and
  avoids failures propagating to other nodes.
* Basic metadata extractor using Mutagen is done.
* HTTP requests to the File API are next to come...
This commit is contained in:
Albert Santoni 2014-03-05 12:15:25 -05:00
parent b6dd2e3152
commit a6a64a2b9e
15 changed files with 411 additions and 0 deletions

View file

@ -0,0 +1 @@
include README.rst

View file

@ -0,0 +1,30 @@
Temporary (quick-and-dirty) installation instructions
set up a virtualenv
activate it
pip install mutagen python-magic pika
You will need to allow the "airtime" RabbitMQ user to access the airtime-uploads exchange and queue:
sudo rabbitmqctl set_permissions -p /airtime airtime airtime-uploads airtime-uploads airtime-uploads
Developers
==========
For development, you want to install airtime_analyzer system-wide but with everything symlinked back to the source
directory (for convenience), so run:
$ sudo python setup.py develop
Unit Tests
==========
To run the unit tests, execute:
$ nosetests

View file

@ -0,0 +1,2 @@
from airtime_analyzer import AirtimeAnalyzerServer

View file

@ -0,0 +1,41 @@
import ConfigParser
from metadata_analyzer import MetadataAnalyzer
from replaygain_analyzer import ReplayGainAnalyzer
from message_listener import MessageListener
class AirtimeAnalyzerServer:
    """Top-level object of the airtime_analyzer service.

    Constructing an instance reads the Airtime config file and hands the
    parsed configuration to a MessageListener.
    """

    # Location of the config file that is parsed on start-up.
    _CONFIG_PATH = '/etc/airtime/airtime.conf'

    def __init__(self):
        # Read our config file
        rabbitmq_config = self.read_config_file()

        # Start listening for RabbitMQ messages telling us about newly
        # uploaded files.
        self._msg_listener = MessageListener(rabbitmq_config)

    def read_config_file(self):
        """Parse the config file at _CONFIG_PATH.

        Returns:
            The populated ConfigParser.SafeConfigParser instance.

        If the file cannot be opened or parsed, an error message is printed
        and the process exits with status -1.
        """
        config = ConfigParser.SafeConfigParser()
        config_path = AirtimeAnalyzerServer._CONFIG_PATH
        try:
            # Use a context manager so the file handle is always closed.
            with open(config_path) as config_file:
                config.readfp(config_file)
        except IOError as e:
            print("Failed to open config file at " + config_path + ": " + e.strerror)
            exit(-1)
        except Exception as e:
            # Bind the exception here: this branch previously referenced an
            # unbound name, which raised NameError and hid the real error.
            # Parser errors have no strerror attribute, so use str().
            print("Failed to parse config file at " + config_path + ": " + str(e))
            exit(-1)

        return config
# When being run from the command line, analyze a file passed
# as an argument.
# TODO: the command-line argument is not read yet; only the pipeline object
# is created.
if __name__ == "__main__":
    import sys
    # AnalyzerPipeline is not imported at the top of this file, so bring it
    # into scope here; without this the next line raises NameError.
    from analyzer_pipeline import AnalyzerPipeline

    analyzers = AnalyzerPipeline()

View file

@ -0,0 +1,10 @@
class Analyzer:
    """Common base class for the file analyzers.

    Subclasses are expected to override analyze(); the base implementation
    only signals that nothing has been implemented.
    """

    @staticmethod
    def analyze(filename):
        """Analyze the file at *filename*.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class AnalyzerError(Exception):
    """Exception type for errors reported by the analyzers."""

    def __init__(self, *args):
        # The original called ``super.__init__(self)``, which invokes the
        # builtin ``super`` type's descriptor on a non-super object and raises
        # TypeError, so the exception could never be constructed. Calling the
        # base class explicitly works on both Python 2 and Python 3.
        # Accepting *args stays backward-compatible with AnalyzerError() and
        # additionally allows an optional message.
        Exception.__init__(self, *args)

View file

@ -0,0 +1,17 @@
from metadata_analyzer import MetadataAnalyzer
class AnalyzerPipeline:
    """Runs the analyzers for one upload message.

    Skeleton only for now: the incoming message is not inspected yet.
    """

    def __init__(self):
        pass

    # TODO: Take a JSON message and perform the necessary analysis.
    # TODO: Document this thoroughly.
    @staticmethod
    def run_analysis(json_msg, queue):
        """Run the metadata analysis and put the result on *queue*.

        Args:
            json_msg: the message that triggered the analysis (currently
                unused).
            queue: object with a put() method that receives the result of
                MetadataAnalyzer.analyze().
        """
        # TODO: Pass the JSON along to each analyzer??
        # The filename is still a hard-coded placeholder.
        analysis_results = MetadataAnalyzer.analyze("foo.mp3")
        queue.put(analysis_results)

View file

@ -0,0 +1,102 @@
import sys
import pika
import multiprocessing
from analyzer_pipeline import AnalyzerPipeline
EXCHANGE = "airtime-uploads"
EXCHANGE_TYPE = "topic"
ROUTING_KEY = "" #"airtime.analyzer.tasks"
QUEUE = "airtime-uploads"
''' TODO: Document me
- round robin messaging
- acking
- why we use the multiprocess architecture
'''
class MessageListener:
    """Consumes messages from RabbitMQ and runs an analysis for each one.

    Messages are consumed from QUEUE with manual acknowledgement
    (no_ack=False): a message is ACKed only after its analysis succeeded and
    is NACKed without requeueing if anything went wrong. Every message is
    analyzed in a separate worker process so that a crash inside an analyzer
    cannot take down this daemon.

    Note that the constructor blocks: it connects, declares the exchange and
    queue, and then stays in start_consuming() until consuming stops.
    """

    def __init__(self, config):
        """Connect to RabbitMQ and consume messages until interrupted.

        Args:
            config: parsed configuration object exposing has_section(), get()
                and getint(); it must contain a "rabbitmq" section with the
                options host, port, user, password and vhost.

        If the "rabbitmq" section is missing, an error is printed and the
        process exits with status -1.
        """
        RMQ_CONFIG_SECTION = "rabbitmq"
        if not config.has_section(RMQ_CONFIG_SECTION):
            # The config file path is not known in this class; the original
            # message referenced an undefined name and raised NameError
            # instead of reporting the problem.
            print("Error: rabbitmq section not found in config file.")
            sys.exit(-1)

        self._host = config.get(RMQ_CONFIG_SECTION, 'host')
        self._port = config.getint(RMQ_CONFIG_SECTION, 'port')
        self._username = config.get(RMQ_CONFIG_SECTION, 'user')
        self._password = config.get(RMQ_CONFIG_SECTION, 'password')
        self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')

        self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._host,
            port=self._port, virtual_host=self._vhost,
            credentials=pika.credentials.PlainCredentials(self._username, self._password)))
        self._channel = self._connection.channel()
        self._channel.exchange_declare(exchange=EXCHANGE, type=EXCHANGE_TYPE)
        self._channel.queue_declare(queue=QUEUE, durable=True)

        self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)

        print(" Listening for messages...")
        # no_ack=False: we acknowledge every message ourselves in the callback.
        self._channel.basic_consume(MessageListener.msg_received_callback,
                                    queue=QUEUE, no_ack=False)

        try:
            self._channel.start_consuming()
        except KeyboardInterrupt:
            self._channel.stop_consuming()

        # Consuming has stopped, so release the connection.
        self._connection.close()

    # consume callback function
    @staticmethod
    def msg_received_callback(channel, method_frame, header_frame, body):
        """Handle one delivered message: analyze it, then ACK or NACK it.

        Args:
            channel: the channel the message arrived on; used to ACK/NACK.
            method_frame: delivery information (routing_key, delivery_tag).
            header_frame: message properties (unused).
            body: the message payload, passed on to the analyzer process.
        """
        print(" - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key))

        # Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
        # to pass objects between the processes so that if the analyzer process crashes, it does not
        # take down the rest of the daemon and we NACK that message so that it doesn't get
        # propagated to other airtime_analyzer daemons (eg. running on other servers).
        # We avoid cascading failure this way.
        try:
            MessageListener.spawn_analyzer_process(body)
        except Exception:
            # If ANY exception happens while processing a file, we're going to NACK to the
            # messaging server and tell it to remove the message from the queue.
            # (NACK is a negative acknowledgement. We could use ACK instead, but this might come
            # in handy in the future.)
            # Exceptions in this context are unexpected, unhandled errors. We try to recover
            # from as many errors as possible in AnalyzerPipeline, but we're safeguarding ourselves
            # here from any catastrophic or genuinely unexpected errors:
            channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False,
                               requeue=False)  # Important that it doesn't requeue the message

            # TODO: Report this as a failed upload to the File Upload REST API.
        else:
            # ACK at the very end, after the message has been successfully processed.
            # If we don't ack, then RabbitMQ will redeliver a message in the future.
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)

        # Anything else could happen here:
        # Send an email alert, send an XMPP message, trigger another process, etc

    @staticmethod
    def spawn_analyzer_process(json_msg):
        """Run AnalyzerPipeline.run_analysis in a child process and wait for it.

        Args:
            json_msg: the message payload handed to the pipeline.

        Raises:
            AnalyzerError: if the child process exits with a non-zero code.
        """
        # Imported locally because this module does not import it at the top.
        # The original raised an undefined name (AnalyzerException), which
        # only surfaced as a NameError.
        from analyzer import AnalyzerError

        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, args=(json_msg, q))
        p.start()
        # NOTE(review): the multiprocessing docs warn that joining a process
        # before draining a Queue it wrote to can deadlock once the queued
        # data exceeds the pipe buffer. The results are presumably small
        # enough today -- revisit if they grow.
        p.join()
        if p.exitcode != 0:
            print("Analyzer process terminated unexpectedly.")
            raise AnalyzerError()

        results = q.get()
        print("Server received results: ")
        print(results)

View file

@ -0,0 +1,95 @@
import mutagen
import magic # For MIME type detection
from analyzer import Analyzer
class MetadataAnalyzer(Analyzer):
    """Extracts stream information and tags from an audio file."""

    def __init__(self):
        pass

    @staticmethod
    def analyze(filename):
        """Collect metadata for the audio file at *filename*.

        Args:
            filename: path of the file to inspect.

        Returns:
            A dict that always contains "sample_rate", "length_seconds",
            "bitrate" and "mime_type", plus one entry per tag from the
            mapping below that is present in the file.
        """
        metadata = dict()
        #Extract metadata from an audio file using mutagen
        # NOTE(review): mutagen.File() returns None when it cannot identify
        # the file type, in which case the attribute access below raises
        # AttributeError. Decide how that should be reported to the caller.
        audio_file = mutagen.File(filename, easy=True)

        #Grab other file information that isn't encoded in a tag, but instead usually
        #in the file header. Mutagen breaks that out into a separate "info" object:
        info = audio_file.info
        metadata["sample_rate"] = info.sample_rate
        metadata["length_seconds"] = info.length
        metadata["bitrate"] = info.bitrate

        #Use the python-magic module to get the MIME type.
        mime_magic = magic.Magic(mime=True)
        metadata["mime_type"] = mime_magic.from_file(filename)

        #We normalize the mutagen tags slightly here, so that our own field
        #names stay stable even if mutagen's tag names change.
        #Keys are OUR field names, values are the tag names asked of mutagen.
        #The original loop unpacked these the other way round, so entries whose
        #two names differ (e.g. 'track_number' / 'tracknumber') were looked up
        #under a name mutagen does not use and were silently dropped.
        analyzer_to_mutagen_mapping = {
            'title':        'title',
            'artist':       'artist',
            'album':        'album',
            'bpm':          'bpm',
            'composer':     'composer',
            'conductor':    'conductor',
            'copyright':    'copyright',
            'encoded_by':   'encoder',
            'genre':        'genre',
            'isrc':         'isrc',
            'label':        'label',
            'language':     'language',
            'last_modified':'last_modified',
            'mood':         'mood',
            'replay_gain':  'replaygain',
            'track_number': 'tracknumber',
            'track_total':  'tracktotal',
            'website':      'website',
            'year':         'year',
        }

        for analyzer_tag, mutagen_tag in analyzer_to_mutagen_mapping.items():
            try:
                value = audio_file[mutagen_tag]
            except KeyError:
                # Tag not present in this file: leave it out of the result.
                continue

            # Some tags are returned as lists because there could be multiple values.
            # This is unusual so we're going to always just take the first item in the list.
            if isinstance(value, list):
                if not value:
                    # An empty list carries no value; skip rather than crash.
                    continue
                value = value[0]

            metadata[analyzer_tag] = value

        return metadata
'''
For reference, the Airtime metadata fields are:
title
artist ("Creator" in Airtime)
album
bit rate
BPM
composer
conductor
copyright
cue in
cue out
encoded by
genre
ISRC
label
language
last modified
length
mime
mood
owner
replay gain
sample rate
track number
website
year
'''

View file

@ -0,0 +1,12 @@
from analyzer import Analyzer
''' TODO: everything '''
class ReplayGainAnalyzer(Analyzer):
    """Placeholder analyzer; nothing is implemented yet."""

    def __init__(self):
        pass

    @staticmethod
    def analyze(filename):
        """Accept *filename* and do nothing; always returns None for now."""
        return None

View file

@ -0,0 +1,21 @@
#!/usr/bin/env python
import daemon
import argparse
import airtime_analyzer as aa
# Version string shown in the start-up banner.
VERSION = "1.0"

print("Airtime Analyzer " + VERSION)

# Command line: the only option is -d/--daemon.
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
args = parser.parse_args()

if not args.daemon:
    # Run without daemonizing
    analyzer = aa.AirtimeAnalyzerServer()
else:
    # Create the server inside a DaemonContext when -d/--daemon was given.
    with daemon.DaemonContext():
        analyzer = aa.AirtimeAnalyzerServer()

View file

@ -0,0 +1,20 @@
from setuptools import setup
# Packages passed to setuptools as install_requires.
_REQUIREMENTS = [
    'mutagen',
    'python-magic',
    'pika',
    'nose',
    'python-daemon',
    'requests',
]

# Package metadata for the airtime_analyzer distribution.
setup(
    name='airtime_analyzer',
    version='0.1',
    description='Airtime Analyzer Worker and File Importer',
    url='http://github.com/sourcefabric/Airtime',
    author='Albert Santoni',
    author_email='albert.santoni@sourcefabric.org',
    license='MIT',
    packages=['airtime_analyzer'],
    scripts=['bin/airtime_analyzer'],
    install_requires=_REQUIREMENTS,
    zip_safe=False,
)

View file

@ -0,0 +1,12 @@
from nose.tools import *
import airtime_analyzer_queue
def setup():
    """Print a marker line (presumably run by nose as a module-level fixture)."""
    print("SETUP!")
def teardown():
    """Print a marker line (presumably run by nose as a module-level fixture)."""
    print("TEAR DOWN!")
def test_basic():
    """Placeholder test: only prints a marker line and asserts nothing."""
    print("I RAN!")

View file

@ -0,0 +1,47 @@
<?php
// Command-line helper: publishes the first command-line argument as a single
// message to the "airtime-uploads" exchange.
//
// The full "<?php" open tag is used because the short "<?" form is only
// recognised when short_open_tag is enabled; with it disabled PHP would emit
// this file (including the credentials below) as plain text.

require_once('php-amqplib/amqp.inc');

//use PhpAmqpLibConnectionAMQPConnection;
//use PhpAmqpLibMessageAMQPMessage;

// NOTE(review): connection settings, including the password, are hard-coded
// and committed to version control. Consider loading them from the Airtime
// config file instead.
define('HOST', '127.0.0.1');
define('PORT', '5672');
define('USER', 'airtime');
define('PASS', 'QEFKX5GMKT4YNMOAL9R8');
define('VHOST', '/airtime');//'/airtime');

$exchange = "airtime-uploads";
$exchangeType = "topic";
$queue = "airtime-uploads";
$routingKey = ""; //"airtime.analyzer.tasks";

if ($argc <= 1)
{
    echo("Usage: " . $argv[0] . " message\n");
    // Missing argument is an error, so signal it to the calling shell.
    exit(1);
}

$message = $argv[1];

// "new" always yields an object, so the original isset() check could never
// detect a failed connection. Catch the exception the constructor is expected
// to throw on failure instead (confirm against the bundled php-amqplib).
try
{
    $connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
}
catch (Exception $e)
{
    echo "Failed to connect to the RabbitMQ server: " . $e->getMessage() . "\n";
    exit(1);
}

$channel = $connection->channel();

// declare/create the queue
$channel->queue_declare($queue, false, true, false, false);

// declare/create the exchange as a topic exchange.
$channel->exchange_declare($exchange, $exchangeType, false, false, false);

$msg = new AMQPMessage($message, array("content_type" => "text/plain"));

$channel->basic_publish($msg, $exchange, $routingKey);
print "Sent $message ($routingKey)\n";
$channel->close();
$connection->close();

View file

@ -0,0 +1 @@
../../../airtime_mvc/library/php-amqplib