diff --git a/python_apps/airtime_analyzer/MANIFEST.in b/python_apps/airtime_analyzer/MANIFEST.in
new file mode 100644
index 000000000..9561fb106
--- /dev/null
+++ b/python_apps/airtime_analyzer/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/python_apps/airtime_analyzer/README.rst b/python_apps/airtime_analyzer/README.rst
new file mode 100644
index 000000000..a7704a2a5
--- /dev/null
+++ b/python_apps/airtime_analyzer/README.rst
@@ -0,0 +1,30 @@
+
+Temporary installation instructions
+
+Set up a virtualenv
+Activate it
+pip install mutagen python-magic pika nose python-daemon requests
+
+You will need to allow the "airtime" RabbitMQ user to access the airtime-uploads exchange and queue:
+
+    sudo rabbitmqctl set_permissions -p /airtime airtime airtime-uploads airtime-uploads airtime-uploads
+
+
+Developers
+==========
+
+For development, you want to install airtime_analyzer system-wide but with everything symlinked back to the source
+directory (for convenience), so run:
+
+    $ sudo python setup.py develop
+
+
+
+Unit Tests
+==========
+
+To run the unit tests, execute:
+
+    $ nosetests
+
+
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/__init__.py b/python_apps/airtime_analyzer/airtime_analyzer/__init__.py
new file mode 100644
index 000000000..11fe8aa91
--- /dev/null
+++ b/python_apps/airtime_analyzer/airtime_analyzer/__init__.py
@@ -0,0 +1,2 @@
+from airtime_analyzer import AirtimeAnalyzerServer
+
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py
new file mode 100644
index 000000000..eea4622b7
--- /dev/null
+++ b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py
@@ -0,0 +1,47 @@
+import ConfigParser
+from metadata_analyzer import MetadataAnalyzer
+from replaygain_analyzer import ReplayGainAnalyzer
+from message_listener import MessageListener
+
+
+class AirtimeAnalyzerServer:
+    ''' Reads the Airtime config file and hands it to a MessageListener. '''
+
+    _CONFIG_PATH = '/etc/airtime/airtime.conf'
+
+    def __init__(self):
+
+        # Read our config file
+        rabbitmq_config = self.read_config_file()
+
+        # Start listening for RabbitMQ messages telling us about newly
+        # uploaded files.
+        self._msg_listener = MessageListener(rabbitmq_config)
+
+    def read_config_file(self):
+        ''' Parse the file at _CONFIG_PATH and return the ConfigParser object.
+            Prints an error and exits if the file can't be opened or parsed. '''
+        config = ConfigParser.SafeConfigParser()
+        config_path = AirtimeAnalyzerServer._CONFIG_PATH
+        try:
+            # Close the file handle as soon as it has been parsed.
+            with open(config_path) as config_file:
+                config.readfp(config_file)
+        except IOError as e:
+            print "Failed to open config file at " + config_path + ": " + e.strerror
+            exit(-1)
+        except Exception as e:
+            print "Failed to parse config file at " + config_path + ": " + str(e)
+            exit(-1)
+
+        return config
+
+
+''' When being run from the command line, analyze a file passed
+    as an argument. '''
+if __name__ == "__main__":
+    import sys
+    from analyzer_pipeline import AnalyzerPipeline
+    analyzers = AnalyzerPipeline()
+
+
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer.py
new file mode 100644
index 000000000..de23a8e68
--- /dev/null
+++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer.py
@@ -0,0 +1,13 @@
+
+class Analyzer:
+    ''' Base class for analyzers. Subclasses override analyze(). '''
+
+    @staticmethod
+    def analyze(filename):
+        raise NotImplementedError
+
+class AnalyzerError(Exception):
+    ''' Raised when an analyzer process fails. '''
+    def __init__(self):
+        # Initialize the Exception base class (no message).
+        Exception.__init__(self)
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
new file mode 100644
index 000000000..da3fbbc71
--- /dev/null
+++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
@@ -0,0 +1,17 @@
+from metadata_analyzer import MetadataAnalyzer
+
+class AnalyzerPipeline:
+
+    def __init__(self):
+        pass
+
+    #TODO: Take a JSON message and perform the necessary analysis.
+    #TODO: Document this thoroughly.
+    @staticmethod
+    def run_analysis(json_msg, queue):
+        # TODO: Pass the JSON along to each analyzer. (json_msg is unused and the filename is a placeholder.)
+        #print MetadataAnalyzer.analyze("foo.mp3")
+        #print ReplayGainAnalyzer.analyze("foo.mp3")
+        #raise Exception("Test Crash")
+        queue.put(MetadataAnalyzer.analyze("foo.mp3"))
+
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
new file mode 100644
index 000000000..e256fd7cc
--- /dev/null
+++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
@@ -0,0 +1,114 @@
+import sys
+import pika
+import multiprocessing
+from analyzer import AnalyzerError
+from analyzer_pipeline import AnalyzerPipeline
+
+EXCHANGE = "airtime-uploads"
+EXCHANGE_TYPE = "topic"
+ROUTING_KEY = "" #"airtime.analyzer.tasks"
+QUEUE = "airtime-uploads"
+
+
+''' TODO: Document me
+    - round robin messaging
+    - acking
+    - why we use the multiprocess architecture
+'''
+class MessageListener:
+    ''' Consumes messages from the RabbitMQ upload queue and runs the analyzer
+        pipeline in a separate worker process for each message. '''
+
+    def __init__(self, config):
+        ''' Connect to RabbitMQ using the "rabbitmq" section of config (a
+            ConfigParser object) and consume messages. This call blocks in
+            start_consuming() until it is interrupted by a KeyboardInterrupt. '''
+
+        RMQ_CONFIG_SECTION = "rabbitmq"
+        if not config.has_section(RMQ_CONFIG_SECTION):
+            print "Error: rabbitmq section not found in config file."
+            exit(-1)
+
+        self._host = config.get(RMQ_CONFIG_SECTION, 'host')
+        self._port = config.getint(RMQ_CONFIG_SECTION, 'port')
+        self._username = config.get(RMQ_CONFIG_SECTION, 'user')
+        self._password = config.get(RMQ_CONFIG_SECTION, 'password')
+        self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')
+
+        self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._host,
+            port=self._port, virtual_host=self._vhost,
+            credentials=pika.credentials.PlainCredentials(self._username, self._password)))
+        self._channel = self._connection.channel()
+        self._channel.exchange_declare(exchange=EXCHANGE, type=EXCHANGE_TYPE)
+        self._channel.queue_declare(queue=QUEUE, durable=True)
+
+        self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
+
+        print " Listening for messages..."
+        self._channel.basic_consume(MessageListener.msg_received_callback,
+            queue=QUEUE, no_ack=False)
+
+        try:
+            self._channel.start_consuming()
+        except KeyboardInterrupt:
+            self._channel.stop_consuming()
+
+        self._connection.close()
+
+    # consume callback function
+    @staticmethod
+    def msg_received_callback(channel, method_frame, header_frame, body):
+        ''' Analyze the file described by a message, then ACK the message on
+            success or NACK it (without requeueing) on failure. '''
+        print " - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key)
+
+        # Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
+        # to pass objects between the processes so that if the analyzer process crashes, it does not
+        # take down the rest of the daemon and we NACK that message so that it doesn't get
+        # propagated to other airtime_analyzer daemons (eg. running on other servers).
+        # We avoid cascading failure this way.
+        try:
+            MessageListener.spawn_analyzer_process(body)
+        except Exception:
+            #If ANY exception happens while processing a file, we're going to NACK to the
+            #messaging server and tell it to remove the message from the queue.
+            #(NACK is a negative acknowledgement. We could use ACK instead, but this might come
+            # in handy in the future.)
+            #Exceptions in this context are unexpected, unhandled errors. We try to recover
+            #from as many errors as possible in AnalyzerPipeline, but we're safeguarding ourselves
+            #here from any catastrophic or genuinely unexpected errors:
+            channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False,
+                requeue=False) #Important that it doesn't requeue the message
+
+            #TODO: Report this as a failed upload to the File Upload REST API.
+            #
+            #
+
+        else:
+            # ACK at the very end, after the message has been successfully processed.
+            # If we don't ack, then RabbitMQ will redeliver a message in the future.
+            channel.basic_ack(delivery_tag=method_frame.delivery_tag)
+
+        # Anything else could happen here:
+        # Send an email alert, send an XMPP message, trigger another process, etc
+
+    @staticmethod
+    def spawn_analyzer_process(json_msg):
+        ''' Run AnalyzerPipeline.run_analysis() in a child process and print its
+            results. Raises AnalyzerError if the child exits with a non-zero code. '''
+
+        q = multiprocessing.Queue()
+        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, args=(json_msg, q))
+        p.start()
+        # NOTE: The multiprocessing docs warn that joining a process before draining
+        # its queue can deadlock if the child has put a large object on the queue.
+        p.join()
+        if p.exitcode == 0:
+            results = q.get()
+            print "Server received results: "
+            print results
+        else:
+            print "Analyzer process terminated unexpectedly."
+            raise AnalyzerError()
+
+
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/metadata_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/metadata_analyzer.py
new file mode 100644
index 000000000..af7d81745
--- /dev/null
+++ b/python_apps/airtime_analyzer/airtime_analyzer/metadata_analyzer.py
@@ -0,0 +1,100 @@
+import mutagen
+import magic # For MIME type detection
+from analyzer import Analyzer
+
+class MetadataAnalyzer(Analyzer):
+    ''' Extracts tags and audio properties from a file using mutagen and python-magic. '''
+
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def analyze(filename):
+        ''' Return a dict of metadata for the audio file at filename. Tags that
+            are missing from the file are left out of the dict. '''
+
+        metadata = dict()
+        #Extract metadata from an audio file using mutagen
+        #NOTE: mutagen.File() returns None if it can't identify the file type, in
+        #which case reading audio_file.info below raises an AttributeError.
+        audio_file = mutagen.File(filename, easy=True)
+
+        #Grab other file information that isn't encoded in a tag, but instead usually
+        #in the file header. Mutagen breaks that out into a separate "info" object:
+        info = audio_file.info
+        metadata["sample_rate"] = info.sample_rate
+        metadata["length_seconds"] = info.length
+        metadata["bitrate"] = info.bitrate
+
+        #Use the python-magic module to get the MIME type.
+        mime_magic = magic.Magic(mime=True)
+        metadata["mime_type"] = mime_magic.from_file(filename)
+
+        #Map mutagen's tag names (keys) to our own metadata field names (values), so
+        #that if mutagen's names change we only need to update this table.
+        mutagen_to_analyzer_mapping = {
+            'title':         'title',
+            'artist':        'artist',
+            'album':         'album',
+            'bpm':           'bpm',
+            'composer':      'composer',
+            'conductor':     'conductor',
+            'copyright':     'copyright',
+            'encoder':       'encoded_by',
+            'genre':         'genre',
+            'isrc':          'isrc',
+            'label':         'label',
+            'language':      'language',
+            'last_modified': 'last_modified',
+            'mood':          'mood',
+            'replaygain':    'replay_gain',
+            'tracknumber':   'track_number',
+            'tracktotal':    'track_total',
+            'website':       'website',
+            'year':          'year',
+        }
+
+        for mutagen_tag, analyzer_tag in mutagen_to_analyzer_mapping.iteritems():
+            try:
+                metadata[analyzer_tag] = audio_file[mutagen_tag]
+
+                # Some tags are returned as lists because there could be multiple values.
+                # This is unusual so we're going to always just take the first item in the list.
+                if isinstance(metadata[analyzer_tag], list):
+                    metadata[analyzer_tag] = metadata[analyzer_tag][0]
+
+            except KeyError:
+                pass
+
+        return metadata
+
+
+
+'''
+For reference, the Airtime metadata fields are:
+    title
+    artist ("Creator" in Airtime)
+    album
+    bit rate
+    BPM
+    composer
+    conductor
+    copyright
+    cue in
+    cue out
+    encoded by
+    genre
+    ISRC
+    label
+    language
+    last modified
+    length
+    mime
+    mood
+    owner
+    replay gain
+    sample rate
+    track number
+    website
+    year
+'''
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/replaygain_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/replaygain_analyzer.py
new file mode 100644
index 000000000..cf10c0a44
--- /dev/null
+++ b/python_apps/airtime_analyzer/airtime_analyzer/replaygain_analyzer.py
@@ -0,0 +1,12 @@
+from analyzer import Analyzer
+
+''' TODO: everything '''
+class ReplayGainAnalyzer(Analyzer):
+
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def analyze(filename):
+        pass
+
diff --git a/python_apps/airtime_analyzer/bin/airtime_analyzer b/python_apps/airtime_analyzer/bin/airtime_analyzer
new file mode 100755
index 000000000..3b163ec49
--- /dev/null
+++ b/python_apps/airtime_analyzer/bin/airtime_analyzer
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+
+import daemon
+import argparse
+import airtime_analyzer as aa
+
+VERSION = "1.0"
+
+print "Airtime Analyzer " + VERSION
+
+parser = argparse.ArgumentParser()
+parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
+args = parser.parse_args()
+
+if args.daemon:
+    with daemon.DaemonContext():
+        analyzer = aa.AirtimeAnalyzerServer()
+else:
+    # Run without daemonizing
+    analyzer = aa.AirtimeAnalyzerServer()
+
diff --git a/python_apps/airtime_analyzer/setup.py b/python_apps/airtime_analyzer/setup.py
new file mode 100644
index 000000000..259696b8e
--- /dev/null
+++ b/python_apps/airtime_analyzer/setup.py
@@ -0,0 +1,20 @@
+from setuptools import setup
+
+setup(name='airtime_analyzer',
+      version='0.1',
+      description='Airtime Analyzer Worker and File Importer',
+      url='http://github.com/sourcefabric/Airtime',
+      author='Albert Santoni',
+      author_email='albert.santoni@sourcefabric.org',
+      license='MIT',
+      packages=['airtime_analyzer'],
+      scripts=['bin/airtime_analyzer'],
+      install_requires=[
+          'mutagen',
+          'python-magic',
+          'pika',
+          'nose',
+          'python-daemon',
+          'requests',
+      ],
+      zip_safe=False)
diff --git a/python_apps/airtime_analyzer/tests/__init__.py b/python_apps/airtime_analyzer/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/python_apps/airtime_analyzer/tests/airtime_analyzer_queue_tests.py b/python_apps/airtime_analyzer/tests/airtime_analyzer_queue_tests.py
new file mode 100644
index 000000000..949033a5c
--- /dev/null
+++ b/python_apps/airtime_analyzer/tests/airtime_analyzer_queue_tests.py
@@ -0,0 +1,12 @@
+from nose.tools import *
+import airtime_analyzer
+
+def setup():
+    print "SETUP!"
+
+def teardown():
+    print "TEAR DOWN!"
+
+def test_basic():
+    print "I RAN!"
+
diff --git a/python_apps/airtime_analyzer/tools/message_sender.php b/python_apps/airtime_analyzer/tools/message_sender.php
new file mode 100644
index 000000000..d5c9171c1
--- /dev/null
+++ b/python_apps/airtime_analyzer/tools/message_sender.php
@@ -0,0 +1,47 @@
+channel();
+
+// declare/create the queue
+$channel->queue_declare($queue, false, true, false, false);
+
+// declare/create the exchange as a topic exchange.
+$channel->exchange_declare($exchange, $exchangeType, false, false, false);
+
+$msg = new AMQPMessage($message, array("content_type" => "text/plain"));
+
+$channel->basic_publish($msg, $exchange, $routingKey);
+print "Sent $message ($routingKey)\n";
+$channel->close();
+$connection->close();
+
diff --git a/python_apps/airtime_analyzer/tools/php-amqplib b/python_apps/airtime_analyzer/tools/php-amqplib
new file mode 120000
index 000000000..68ef0b223
--- /dev/null
+++ b/python_apps/airtime_analyzer/tools/php-amqplib
@@ -0,0 +1 @@
+../../../airtime_mvc/library/php-amqplib
\ No newline at end of file