diff --git a/python_apps/airtime_analyzer/README.rst b/python_apps/airtime_analyzer/README.rst
index a7704a2a5..00067d643 100644
--- a/python_apps/airtime_analyzer/README.rst
+++ b/python_apps/airtime_analyzer/README.rst
@@ -1,23 +1,49 @@
 Ghetto temporary installation instructions
+==========================================
 
-set up a virtualenv
-activate it
-pip install mutagen python-magic pika
+    $ sudo python setup.py install
 
-You will need to allow the "airtime" RabbitMQ user to access the airtime-uploads exchange and queue:
+You will need to allow the "airtime" RabbitMQ user to access all exchanges and queues within the /airtime vhost:
+
+    sudo rabbitmqctl set_permissions -p /airtime airtime .* .* .*
+
+
+Usage
+==========
+
+To print usage instructions, run:
+
+    $ airtime_analyzer --help
+
+This application can be run as a daemon by running:
+
+    $ airtime_analyzer -d
 
-    sudo rabbitmqctl set_permissions -p /airtime airtime airtime-uploads airtime-uploads airtime-uploads
 
 Developers
 ==========
 
-For development, you want to install AAQ system-wide but with everything symlinked back to the source
-directory (for convenience), so run:
+For development, you want to install airtime_analyzer system-wide but with everything symlinked back to the source
+directory for convenience. This is super easy to do; just run:
 
     $ sudo python setup.py develop
 
+To send a test message to airtime_analyzer, you can use the message_sender.php script in the tools directory.
+For example, run:
+
+    $ php tools/message_sender.php '{ "tmp_file_path" : "foo.mp3" }'
+
+
+Logging
+=========
+
+By default, logs are saved to:
+
+    /var/log/airtime/airtime_analyzer.log
+
+This application takes care of rotating logs for you.
 
 Unit Tests
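The message_sender.php script referenced above is essentially a thin wrapper around a single RabbitMQ publish. For readers without PHP handy, a rough Python equivalent using pika is sketched below; the connection settings and the routing key are assumptions, so check the [rabbitmq] section of /etc/airtime/airtime.conf and the ROUTING_KEY constant in message_listener.py for the real values.

    import json
    import pika

    # Assumed connection settings; read the real ones from the [rabbitmq]
    # section of /etc/airtime/airtime.conf.
    credentials = pika.credentials.PlainCredentials('airtime', 'airtime')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost', port=5672, virtual_host='/airtime',
        credentials=credentials))
    channel = connection.channel()

    # Publish the same kind of message as the message_sender.php example.
    # The exchange name matches the EXCHANGE constant in message_listener.py;
    # the routing key here is a placeholder.
    channel.basic_publish(exchange='airtime-uploads',
                          routing_key='airtime-uploads',
                          body=json.dumps({'tmp_file_path': 'foo.mp3'}))
    connection.close()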
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py
index eea4622b7..1bc2e0534 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py
@@ -1,4 +1,7 @@
 import ConfigParser
+import logging
+import logging.handlers
+import sys
 from metadata_analyzer import MetadataAnalyzer
 from replaygain_analyzer import ReplayGainAnalyzer
 from message_listener import MessageListener
@@ -6,16 +9,46 @@ from message_listener import MessageListener
 
 class AirtimeAnalyzerServer:
 
+    # Constants
     _CONFIG_PATH = '/etc/airtime/airtime.conf'
+    _LOG_PATH = "/var/log/airtime/airtime_analyzer.log"
+
+    # Variables
+    _log_level = logging.INFO
+
+    def __init__(self, debug=False):
+
+        # Configure logging
+        self.setup_logging(debug)
 
-    def __init__(self):
-
         # Read our config file
         rabbitmq_config = self.read_config_file()
 
         # Start listening for RabbitMQ messages telling us about newly
         # uploaded files.
         self._msg_listener = MessageListener(rabbitmq_config)
+
+
+    def setup_logging(self, debug):
+
+        if debug:
+            self._log_level = logging.DEBUG
+        #self.log = logging.getLogger(__name__)
+
+        # Set up logging
+        logFormatter = logging.Formatter("%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s")
+        rootLogger = logging.getLogger()
+        rootLogger.setLevel(self._log_level)
+
+        fileHandler = logging.handlers.RotatingFileHandler(filename=self._LOG_PATH, maxBytes=1024*1024*30,
+                                                           backupCount=8)
+        fileHandler.setFormatter(logFormatter)
+        rootLogger.addHandler(fileHandler)
+
+        consoleHandler = logging.StreamHandler()
+        consoleHandler.setFormatter(logFormatter)
+        rootLogger.addHandler(consoleHandler)
+
 
     def read_config_file(self):
         config = ConfigParser.SafeConfigParser()
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
index da3fbbc71..94700913b 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
@@ -1,17 +1,31 @@
+import logging
+import multiprocessing
 from metadata_analyzer import MetadataAnalyzer
 
 class AnalyzerPipeline:
 
+    # Constructor
     def __init__(self):
         pass
 
-    #TODO: Take a JSON message and perform the necessary analysis.
-    #TODO: Comment the shit out of this
+    # Take a message dictionary and perform the necessary analysis.
    @staticmethod
-    def run_analysis(json_msg, queue):
-        # TODO: Pass the JSON along to each analyzer??
-        #print MetadataAnalyzer.analyze("foo.mp3")
-        #print ReplayGainAnalyzer.analyze("foo.mp3")
-        #raise Exception("Test Crash")
-        queue.put(MetadataAnalyzer.analyze("foo.mp3"))
+    def run_analysis(queue, audio_file_path, final_directory):
+
+        if not isinstance(queue, multiprocessing.queues.Queue):
+            raise TypeError("queue must be a multiprocessing.Queue()")
+        if not isinstance(audio_file_path, unicode):
+            raise TypeError("audio_file_path must be a unicode string. Was of type " + type(audio_file_path).__name__ + " instead.")
+        if not isinstance(final_directory, unicode):
+            raise TypeError("final_directory must be a unicode string. Was of type " + type(final_directory).__name__ + " instead.")
+
+        # Analyze the audio file we were told to analyze:
+        # First, we extract the ID3 tags and other metadata:
+        queue.put(MetadataAnalyzer.analyze(audio_file_path))
+
+        # Note that the queue we're putting the results into is our interprocess
+        # communication back to the main process.
+
+        #print ReplayGainAnalyzer.analyze("foo.mp3")
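One subtlety in the run_analysis() type guards above: both path arguments must be unicode objects, not plain str. That works out in practice because the paths arrive via json.loads(), which decodes JSON strings to unicode under Python 2. A small sketch of how the pipeline is driven; the file paths are placeholders, so point them at a real file before running:

    import json
    import multiprocessing
    from analyzer_pipeline import AnalyzerPipeline

    # json.loads() returns unicode strings in Python 2, so fields pulled out
    # of a message body satisfy the isinstance(..., unicode) guards.
    msg = json.loads('{"tmp_file_path": "foo.mp3"}')
    print type(msg["tmp_file_path"])   # <type 'unicode'>

    # Drive run_analysis() in a child process, the same way
    # MessageListener.spawn_analyzer_process() does.
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
                                args=(queue, u"/tmp/foo.mp3", u"/srv/airtime/stor"))
    p.start()
    p.join()
    if p.exitcode == 0:
        print queue.get()   # the metadata dict from MetadataAnalyzer.analyze()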
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
index e256fd7cc..13209dc7d 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
@@ -1,7 +1,11 @@
 import sys
 import pika
+import json
+import time
+import logging
 import multiprocessing
 from analyzer_pipeline import AnalyzerPipeline
+from status_reporter import StatusReporter
 
 EXCHANGE = "airtime-uploads"
 EXCHANGE_TYPE = "topic"
@@ -13,22 +17,39 @@ QUEUE = "airtime-uploads"
     - round robin messaging
     - acking
     - why we use the multiprocess architecture
+    - in general, how it works and why it works this way
 '''
 
 class MessageListener:
 
     def __init__(self, config):
 
+        # Read the RabbitMQ connection settings from the config file.
+        # The exceptions thrown here by default give good error messages.
         RMQ_CONFIG_SECTION = "rabbitmq"
-        if not config.has_section(RMQ_CONFIG_SECTION):
-            print "Error: rabbitmq section not found in config file at " + config_path
-            exit(-1)
-
         self._host = config.get(RMQ_CONFIG_SECTION, 'host')
         self._port = config.getint(RMQ_CONFIG_SECTION, 'port')
         self._username = config.get(RMQ_CONFIG_SECTION, 'user')
         self._password = config.get(RMQ_CONFIG_SECTION, 'password')
         self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')
 
+        while True:
+            try:
+                self.connect_to_messaging_server()
+                self.wait_for_messages()
+            except KeyboardInterrupt:
+                self.disconnect_from_messaging_server()
+                break
+            except pika.exceptions.AMQPError as e:
+                logging.error("Connection to message queue failed.")
+                logging.error(e)
+                logging.info("Retrying in 5 seconds...")
+                time.sleep(5)
+
+        self._connection.close()
+
+
+    def connect_to_messaging_server(self):
+
         self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._host,
             port=self._port, virtual_host=self._vhost,
             credentials=pika.credentials.PlainCredentials(self._username, self._password)))
@@ -38,21 +59,21 @@ class MessageListener:
         self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
 
-        print " Listening for messages..."
+        logging.info(" Listening for messages...")
         self._channel.basic_consume(MessageListener.msg_received_callback,
                                     queue=QUEUE, no_ack=False)
 
-        try:
-            self._channel.start_consuming()
-        except KeyboardInterrupt:
-            self._channel.stop_consuming()
+    def wait_for_messages(self):
+        self._channel.start_consuming()
+
+    def disconnect_from_messaging_server(self):
+        self._channel.stop_consuming()
 
-        self._connection.close()
 
     # consume callback function
     @staticmethod
     def msg_received_callback(channel, method_frame, header_frame, body):
-        print " - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key)
+        logging.info(" - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key))
 
         # Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
         # to pass objects between the processes so that if the analyzer process crashes, it does not
@@ -60,8 +81,22 @@ class MessageListener:
         # propagated to other airtime_analyzer daemons (eg. running on other servers).
         # We avoid cascading failure this way.
         try:
-            MessageListener.spawn_analyzer_process(body)
-        except Exception:
+            msg_dict = json.loads(body)
+            audio_file_path = msg_dict["tmp_file_path"]
+            final_directory = msg_dict["final_directory"]
+            callback_url = msg_dict["callback_url"]
+            api_key = msg_dict["api_key"]
+
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, final_directory)
+            StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
+
+        except KeyError as e:
+            logging.exception("A mandatory airtime_analyzer message field was missing from the message.")
+            # See the huge comment about NACK below.
+            channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False,
+                               requeue=False) # Important that it doesn't requeue the message
+
+        except Exception as e:
             #If ANY exception happens while processing a file, we're going to NACK to the
             #messaging server and tell it to remove the message from the queue.
             #(NACK is a negative acknowledgement. We could use ACK instead, but this might come
@@ -72,31 +107,32 @@ class MessageListener:
             channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False,
                                requeue=False) #Important that it doesn't requeue the message
 
-            #TODO: Report this as a failed upload to the File Upload REST API.
-            #
+            # TODO: Report this as a failed upload to the File Upload REST API.
+            # TODO: If the JSON was invalid, then don't report to the REST API.
+
+            StatusReporter.report_failure_to_callback_url(callback_url, api_key, error_status=1,
+                                                          reason=u'An error occurred while importing this file')
+
+            logging.error(e)
 
         else:
             # ACK at the very end, after the message has been successfully processed.
-            # If we don't ack, then RabbitMQ will redeliver a message in the future.
+            # If we don't ack, then RabbitMQ will redeliver the message in the future.
             channel.basic_ack(delivery_tag=method_frame.delivery_tag)
-
-        # Anything else could happen here:
-        # Send an email alert, send an xmnp message, trigger another process, etc
 
     @staticmethod
-    def spawn_analyzer_process(json_msg):
+    def spawn_analyzer_process(audio_file_path, final_directory):
         q = multiprocessing.Queue()
-        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, args=(json_msg, q))
+        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
+                                    args=(q, audio_file_path, final_directory))
         p.start()
         p.join()
         if p.exitcode == 0:
             results = q.get()
-            print "Server received results: "
-            print results
+            logging.info("Main process received results from child: ")
+            logging.info(results)
         else:
-            print "Analyzer process terminated unexpectedly."
-            raise AnalyzerException()
-
+            raise Exception("Analyzer process terminated unexpectedly.")
+        return results # Pass the extracted metadata back to msg_received_callback()
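After this change a message must carry four fields, or msg_received_callback() falls into the KeyError branch and NACKs it without requeueing. For reference, a body that satisfies the contract looks roughly like this (all values are placeholders):

    import json

    # All four keys are mandatory; the values shown are placeholders.
    body = json.dumps({
        "tmp_file_path": "/tmp/plupload/foo.mp3",
        "final_directory": "/srv/airtime/stor/imported",
        "callback_url": "http://airtime.example.com/rest/media/1",
        "api_key": "YOUR_API_KEY",
    })

Note that the message_sender.php example in the README only sets tmp_file_path, so as written it exercises the KeyError/NACK path rather than a full import.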
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py b/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py
new file mode 100644
index 000000000..c8664df08
--- /dev/null
+++ b/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py
@@ -0,0 +1,29 @@
+import requests
+import json
+import logging
+
+class StatusReporter():
+
+    _HTTP_REQUEST_TIMEOUT = 30
+
+    # Report the extracted metadata and status of the successfully imported file
+    # to the callback URL (which should be the Airtime File Upload API).
+    @classmethod
+    def report_success_to_callback_url(cls, callback_url, api_key, audio_metadata):
+
+        # Encode the audio metadata as JSON and PUT it back to the callback_url
+        post_payload = json.dumps(audio_metadata)
+        r = requests.put(callback_url, data=post_payload,
+                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
+                         timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
+        logging.debug("HTTP request returned status: " + str(r.status_code))
+        logging.debug(r.text) # Log the response body
+        r.raise_for_status() # Raise an exception if an HTTP error code was returned
+
+        #TODO: Queue up failed requests and try them again later.
+
+    @classmethod
+    def report_failure_to_callback_url(cls, callback_url, api_key, error_status, reason):
+        # TODO: Make sure error_status is an int?
+        pass
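report_failure_to_callback_url() is still a stub. A minimal sketch of what an implementation might look like, mirroring the requests.put() pattern of the success path; the payload keys are assumptions, not a documented Airtime API format:

    import json
    import logging
    import requests

    # Hypothetical sketch only; the payload shape is an assumption.
    def report_failure_to_callback_url(callback_url, api_key, error_status, reason):
        payload = json.dumps({"error_status": error_status, "reason": reason})
        r = requests.put(callback_url, data=payload,
                         auth=requests.auth.HTTPBasicAuth(api_key, ''),
                         timeout=30)
        logging.debug("HTTP request returned status: " + str(r.status_code))
        r.raise_for_status()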
diff --git a/python_apps/airtime_analyzer/bin/airtime_analyzer b/python_apps/airtime_analyzer/bin/airtime_analyzer
index 3b163ec49..eb1902c2c 100755
--- a/python_apps/airtime_analyzer/bin/airtime_analyzer
+++ b/python_apps/airtime_analyzer/bin/airtime_analyzer
@@ -10,12 +10,13 @@ print "Airtime Analyzer " + VERSION
 
 parser = argparse.ArgumentParser()
 parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
+parser.add_argument("--debug", help="log full debugging output", action="store_true")
 args = parser.parse_args()
 
 if args.daemon:
     with daemon.DaemonContext():
-        analyzer = aa.AirtimeAnalyzerServer()
+        analyzer = aa.AirtimeAnalyzerServer(debug=args.debug)
 else:
     # Run without daemonizing
-    analyzer = aa.AirtimeAnalyzerServer()
+    analyzer = aa.AirtimeAnalyzerServer(debug=args.debug)
diff --git a/python_apps/airtime_analyzer/tests/airtime_analyzer_queue_tests.py b/python_apps/airtime_analyzer/tests/airtime_analyzer_tests.py
similarity index 82%
rename from python_apps/airtime_analyzer/tests/airtime_analyzer_queue_tests.py
rename to python_apps/airtime_analyzer/tests/airtime_analyzer_tests.py
index 949033a5c..f89336589 100644
--- a/python_apps/airtime_analyzer/tests/airtime_analyzer_queue_tests.py
+++ b/python_apps/airtime_analyzer/tests/airtime_analyzer_tests.py
@@ -1,5 +1,5 @@
 from nose.tools import *
-import airtime_analyzer_queue
+import airtime_analyzer
 
 def setup():
     print "SETUP!"
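With the test module renamed, the nose tests now import the airtime_analyzer package directly. An illustrative test that could be written against it, assuming the package layout shown in this diff (the assertion itself is an example, not part of this change):

    from nose.tools import *
    import airtime_analyzer.airtime_analyzer as aa

    def test_log_path():
        # Illustrative: the daemon logs to the path documented in the README.
        assert_equal(aa.AirtimeAnalyzerServer._LOG_PATH,
                     '/var/log/airtime/airtime_analyzer.log')

The suite can then be run with nosetests from the python_apps/airtime_analyzer directory.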