From 95b369c54df86b92bd656f0b62ebb8e255581e8a Mon Sep 17 00:00:00 2001 From: Albert Santoni Date: Thu, 3 Apr 2014 16:13:26 -0400 Subject: [PATCH] CC-5709: Airtime Analyzer * Remove awful StoredFile::uploadFile() function * Massive airtime_analyzer commenting and cleanup * Cleaned up the upload code * Temporarily disabled the liquidsoap playability test. --- .../application/controllers/ApiController.php | 4 +- airtime_mvc/application/models/StoredFile.php | 104 +----------------- .../rest/controllers/MediaController.php | 17 ++- python_apps/airtime_analyzer/README.rst | 17 ++- .../airtime_analyzer/airtime_analyzer.py | 24 ++-- .../airtime_analyzer/analyzer.py | 5 +- .../airtime_analyzer/analyzer_pipeline.py | 33 ++++-- .../airtime_analyzer/filemover_analyzer.py | 16 ++- .../airtime_analyzer/message_listener.py | 89 +++++++++++---- .../airtime_analyzer/metadata_analyzer.py | 7 +- .../airtime_analyzer/replaygain_analyzer.py | 4 +- .../airtime_analyzer/status_reporter.py | 22 ++-- .../airtime_analyzer/bin/airtime_analyzer | 53 +++++---- 13 files changed, 204 insertions(+), 191 deletions(-) diff --git a/airtime_mvc/application/controllers/ApiController.php b/airtime_mvc/application/controllers/ApiController.php index 4ce7a5583..98ca2ce60 100644 --- a/airtime_mvc/application/controllers/ApiController.php +++ b/airtime_mvc/application/controllers/ApiController.php @@ -453,6 +453,8 @@ class ApiController extends Zend_Controller_Action public function uploadFileAction() { + Logging::error("FIXME: Change the show recorder to use the File Upload API and remove this function."); // Albert - April 3, 2014 + /** $upload_dir = ini_get("upload_tmp_dir"); $tempFilePath = Application_Model_StoredFile::uploadFile($upload_dir); $tempFileName = basename($tempFilePath); @@ -464,7 +466,7 @@ class ApiController extends Zend_Controller_Action $this->_helper->json->sendJson( array("jsonrpc" => "2.0", "error" => array("code" => $result['code'], "message" => $result['message'])) ); - } + }*/ } public function uploadRecordedAction() diff --git a/airtime_mvc/application/models/StoredFile.php b/airtime_mvc/application/models/StoredFile.php index 23247522c..3199d1b1c 100644 --- a/airtime_mvc/application/models/StoredFile.php +++ b/airtime_mvc/application/models/StoredFile.php @@ -899,107 +899,6 @@ SQL; return $results; } - public static function uploadFile($p_targetDir) - { - // HTTP headers for no cache etc - header('Content-type: text/plain; charset=UTF-8'); - header("Expires: Mon, 26 Jul 1997 05:00:00 GMT"); - header("Last-Modified: " . gmdate("D, d M Y H:i:s") . " GMT"); - header("Cache-Control: no-store, no-cache, must-revalidate"); - header("Cache-Control: post-check=0, pre-check=0", false); - header("Pragma: no-cache"); - - // Settings - $cleanupTargetDir = false; // Remove old files - $maxFileAge = 60 * 60; // Temp file age in seconds - - // 5 minutes execution time - @set_time_limit(5 * 60); - // usleep(5000); - - // Get parameters - $chunk = isset($_REQUEST["chunk"]) ? $_REQUEST["chunk"] : 0; - $chunks = isset($_REQUEST["chunks"]) ? $_REQUEST["chunks"] : 0; - $fileName = isset($_REQUEST["name"]) ? $_REQUEST["name"] : ''; - # TODO : should not log __FILE__ itself. there is general logging for - # this - Logging::info(__FILE__.":uploadFile(): filename=$fileName to $p_targetDir"); - // Clean the fileName for security reasons - //this needs fixing for songs not in ascii. - //$fileName = preg_replace('/[^\w\._]+/', '', $fileName); - - // Create target dir - if (!file_exists($p_targetDir)) - @mkdir($p_targetDir); - - // Remove old temp files - if (is_dir($p_targetDir) && ($dir = opendir($p_targetDir))) { - while (($file = readdir($dir)) !== false) { - $filePath = $p_targetDir . DIRECTORY_SEPARATOR . $file; - - // Remove temp files if they are older than the max age - if (preg_match('/\.tmp$/', $file) && (filemtime($filePath) < time() - $maxFileAge)) - @unlink($filePath); - } - - closedir($dir); - } else - die('{"jsonrpc" : "2.0", "error" : {"code": 100, "message": _("Failed to open temp directory.")}, "id" : "id"}'); - - // Look for the content type header - if (isset($_SERVER["HTTP_CONTENT_TYPE"])) - $contentType = $_SERVER["HTTP_CONTENT_TYPE"]; - - if (isset($_SERVER["CONTENT_TYPE"])) - $contentType = $_SERVER["CONTENT_TYPE"]; - - // create temp file name (CC-3086) - // we are not using mktemp command anymore. - // plupload support unique_name feature. - $tempFilePath= $p_targetDir . DIRECTORY_SEPARATOR . $fileName; - - // Old IBM code... - if (strpos($contentType, "multipart") !== false) { - if (isset($_FILES['file']['tmp_name']) && is_uploaded_file($_FILES['file']['tmp_name'])) { - // Open temp file - $out = fopen($tempFilePath, $chunk == 0 ? "wb" : "ab"); - if ($out) { - // Read binary input stream and append it to temp file - $in = fopen($_FILES['file']['tmp_name'], "rb"); - - if ($in) { - while (($buff = fread($in, 4096))) - fwrite($out, $buff); - } else - die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": _("Failed to open input stream.")}, "id" : "id"}'); - - fclose($out); - unlink($_FILES['file']['tmp_name']); - } else - die('{"jsonrpc" : "2.0", "error" : {"code": 102, "message": _("Failed to open output stream.")}, "id" : "id"}'); - } else - die('{"jsonrpc" : "2.0", "error" : {"code": 103, "message": _("Failed to move uploaded file.")}, "id" : "id"}'); - } else { - // Open temp file - $out = fopen($tempFilePath, $chunk == 0 ? "wb" : "ab"); - if ($out) { - // Read binary input stream and append it to temp file - $in = fopen("php://input", "rb"); - - if ($in) { - while (($buff = fread($in, 4096))) - fwrite($out, $buff); - } else - die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": _("Failed to open input stream.")}, "id" : "id"}'); - - fclose($out); - } else - die('{"jsonrpc" : "2.0", "error" : {"code": 102, "message": _("Failed to open output stream.")}, "id" : "id"}'); - } - - return $tempFilePath; - } - /** * Check, using disk_free_space, the space available in the $destination_folder folder to see if it has * enough space to move the $audio_file into and report back to the user if not. @@ -1065,12 +964,13 @@ SQL; // Check if liquidsoap can play this file // TODO: Move this to airtime_analyzer + /* if (!self::liquidsoapFilePlayabilityTest($audio_file)) { return array( "code" => 110, "message" => _("This file appears to be corrupted and will not " ."be added to media library.")); - } + }*/ // Did all the checks for real, now trying to copy diff --git a/airtime_mvc/application/modules/rest/controllers/MediaController.php b/airtime_mvc/application/modules/rest/controllers/MediaController.php index 230144ee6..812d00bcc 100644 --- a/airtime_mvc/application/modules/rest/controllers/MediaController.php +++ b/airtime_mvc/application/modules/rest/controllers/MediaController.php @@ -333,22 +333,19 @@ class Rest_MediaController extends Zend_Rest_Controller $CC_CONFIG = Config::getConfig(); $apiKey = $CC_CONFIG["apiKey"][0]; - $upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload"; - $tempFilePath = Application_Model_StoredFile::uploadFile($upload_dir); + //$upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload"; + //$tempFilePath = Application_Model_StoredFile::uploadFile($upload_dir); + + $tempFilePath = $_FILES['file']['tmp_name']; $tempFileName = basename($tempFilePath); //TODO: Remove copyFileToStor from StoredFile... //TODO: Remove uploadFileAction from ApiController.php **IMPORTANT** - It's used by the recorder daemon? - - $upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload"; - $tempFilePath = $upload_dir . "/" . $tempFileName; - + $storDir = Application_Model_MusicDir::getStorDir(); - //$finalFullFilePath = $storDir->getDirectory() . "/imported/" . $ownerId . "/" . $originalFilename; $importedStorageDirectory = $storDir->getDirectory() . "/imported/" . $ownerId; - try { //Copy the temporary file over to the "organize" folder so that it's off our webserver //and accessible by airtime_analyzer which could be running on a different machine. @@ -357,8 +354,8 @@ class Rest_MediaController extends Zend_Rest_Controller Logging::error($e->getMessage()); } - //Logging::info("New temporary file path: " . $newTempFilePath); - //Logging::info("Final file path: " . $finalFullFilePath); + Logging::info($newTempFilePath); + //Logging::info("Old temp file path: " . $tempFilePath); //Dispatch a message to airtime_analyzer through RabbitMQ, //notifying it that there's a new upload to process! diff --git a/python_apps/airtime_analyzer/README.rst b/python_apps/airtime_analyzer/README.rst index e6a5a08b9..41881992e 100644 --- a/python_apps/airtime_analyzer/README.rst +++ b/python_apps/airtime_analyzer/README.rst @@ -1,5 +1,14 @@ +airtime_analyzer +========== -Ghetto temporary installation instructions +airtime_analyzer is a daemon that processes Airtime file uploads as background jobs. +It performs metadata extraction using Mutagen and moves uploads into Airtime's +music library directory (stor/imported). + +airtime_analyzer uses process isolation to make it resilient to crashes and runs in +a multi-tenant environment with no modifications. + +Installation ========== $ sudo python setup.py install @@ -71,3 +80,9 @@ a test, run: To run the unit tests and generate a code coverage report, run: $ nosetests --with-coverage --cover-package=airtime_analyzer + + + History and Design Motivation + =========== + + \ No newline at end of file diff --git a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py index 66933f55a..a87fc2b3d 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py @@ -1,3 +1,5 @@ +"""Contains the main application class for airtime_analyzer. +""" import ConfigParser import logging import logging.handlers @@ -8,6 +10,8 @@ from message_listener import MessageListener class AirtimeAnalyzerServer: + """A server for importing uploads to Airtime as background jobs. + """ # Constants _LOG_PATH = "/var/log/airtime/airtime_analyzer.log" @@ -29,7 +33,12 @@ class AirtimeAnalyzerServer: def setup_logging(self, debug): - + """Set up nicely formatted logging and log rotation. + + Keyword arguments: + debug -- a boolean indicating whether to enable super verbose logging + to the screen and disk. + """ if debug: self._log_level = logging.DEBUG else: @@ -37,8 +46,6 @@ class AirtimeAnalyzerServer: pika_logger = logging.getLogger('pika') pika_logger.setLevel(logging.CRITICAL) - #self.log = logging.getLogger(__name__) - # Set up logging logFormatter = logging.Formatter("%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s") rootLogger = logging.getLogger() @@ -55,6 +62,7 @@ class AirtimeAnalyzerServer: def read_config_file(self, config_path): + """Parse the application's config file located at config_path.""" config = ConfigParser.SafeConfigParser() try: config.readfp(open(config_path)) @@ -66,12 +74,4 @@ class AirtimeAnalyzerServer: exit(-1) return config - - -''' When being run from the command line, analyze a file passed - as an argument. ''' -if __name__ == "__main__": - import sys - analyzers = AnalyzerPipeline() - - + \ No newline at end of file diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer.py index b47ebe0ab..4a58aa75d 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer.py @@ -1,12 +1,13 @@ class Analyzer: - + """ Abstract base class fpr all "analyzers". + """ @staticmethod def analyze(filename, metadata): raise NotImplementedError ''' -class AnalyzerError(Exception): +class AnalyzerError(Error): def __init__(self): super.__init__(self) ''' diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py index 2266fb9a2..0ba5fb22e 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py @@ -1,14 +1,35 @@ +""" Analyzes and imports an audio file into the Airtime library. +""" import logging import multiprocessing from metadata_analyzer import MetadataAnalyzer from filemover_analyzer import FileMoverAnalyzer class AnalyzerPipeline: - - # Take message dictionary and perform the necessary analysis. + """ Analyzes and imports an audio file into the Airtime library. + + This currently performs metadata extraction (eg. gets the ID3 tags from an MP3), + then moves the file to the Airtime music library (stor/imported), and returns + the results back to the parent process. This class is used in an isolated process + so that if it crashes, it does not kill the entire airtime_analyzer daemon and + the failure to import can be reported back to the web application. + """ + @staticmethod def run_analysis(queue, audio_file_path, import_directory, original_filename): - + """Analyze and import an audio file, and put all extracted metadata into queue. + + Keyword arguments: + queue: A multiprocessing.queues.Queue which will be used to pass the + extracted metadata back to the parent process. + audio_file_path: Path on disk to the audio file to analyze. + import_directory: Path to the final Airtime "import" directory where + we will move the file. + original_filename: The original filename of the file, which we'll try to + preserve. The file at audio_file_path typically has a + temporary randomly generated name, which is why we want + to know what the original name was. + """ if not isinstance(queue, multiprocessing.queues.Queue): raise TypeError("queue must be a multiprocessing.Queue()") if not isinstance(audio_file_path, unicode): @@ -18,8 +39,6 @@ class AnalyzerPipeline: if not isinstance(original_filename, unicode): raise TypeError("original_filename must be unicode. Was of type " + type(original_filename).__name__ + " instead.") - #print ReplayGainAnalyzer.analyze("foo.mp3") - # Analyze the audio file we were told to analyze: # First, we extract the ID3 tags and other metadata: metadata = dict() @@ -30,8 +49,8 @@ class AnalyzerPipeline: # Note that the queue we're putting the results into is our interprocess communication # back to the main process. - #Pass all the file metadata back to the main analyzer process, which then passes - #it back to the Airtime web application. + # Pass all the file metadata back to the main analyzer process, which then passes + # it back to the Airtime web application. queue.put(metadata) diff --git a/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py index ab12aad79..b0d94ee79 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py @@ -9,13 +9,26 @@ import uuid from analyzer import Analyzer class FileMoverAnalyzer(Analyzer): - + """This analyzer copies a file over from a temporary directory (stor/organize) + into the Airtime library (stor/imported). + """ @staticmethod def analyze(audio_file_path, metadata): + """Dummy method because we need more info than analyze gets passed to it""" raise Exception("Use FileMoverAnalyzer.move() instead.") @staticmethod def move(audio_file_path, import_directory, original_filename, metadata): + """Move the file at audio_file_path over into the import_directory/import, + renaming it to original_filename. + + Keyword arguments: + audio_file_path: Path to the file to be imported. + import_directory: Path to the "import" directory inside the Airtime stor directory. + (eg. /srv/airtime/stor/import) + original_filename: The filename of the file when it was uploaded to Airtime. + metadata: A dictionary where the "full_path" of where the file is moved to will be added. + """ if not isinstance(audio_file_path, unicode): raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__) if not isinstance(import_directory, unicode): @@ -68,6 +81,7 @@ class FileMoverAnalyzer(Analyzer): return metadata def mkdir_p(path): + """ Make all directories in a tree (like mkdir -p)""" if path == "": return try: diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index 41d6f7bd2..0e33dbd3a 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -9,19 +9,56 @@ from status_reporter import StatusReporter EXCHANGE = "airtime-uploads" EXCHANGE_TYPE = "topic" -ROUTING_KEY = "" #"airtime.analyzer.tasks" +ROUTING_KEY = "" QUEUE = "airtime-uploads" -''' TODO: Document me - - round robin messaging - - acking - - why we use the multiprocess architecture - - in general, how it works and why it works this way -''' +""" A message listener class that waits for messages from Airtime through RabbitMQ + notifying us about new uploads. + + This is probably the most important class in this application. It connects + to RabbitMQ (or an AMQP server) and listens for messages that notify us + when a user uploads a new file to Airtime, either through the web interface + or via FTP (on Airtime Pro). When we get a notification, we spawn a child + process that extracts the uploaded audio file's metadata and moves it into + Airtime's music library directory. Lastly, the extracted metadata is + reported back to the Airtime web application. + + There's a couple of Very Important technical details and contraints that you + need to know if you're going to work on this code: + + 1) airtime_analyzer is designed so it doesn't have to run on the same + computer as the web server. It just needs access to your Airtime library + folder (stor). + 2) airtime_analyzer is multi-tenant - One process can be used for many + Airtime instances. It's designed NOT to know about whether it's running + in a single tenant or multi-tenant environment. All the information it + needs to import a file into an Airtime instance is passed in via those + RabbitMQ messages. + 3) We're using a "topic exchange" for the new upload notification RabbitMQ + messages. This means if we run several airtime_analyzer processes on + different computers, RabbitMQ will do round-robin dispatching of the + file notification. This is cheap, easy load balancing and + redundancy for us. You can even run multiple airtime_analyzer processes + on one machine if you want. + 4) We run the actual work (metadata analysis and file moving) in a separate + child process so that if it crashes, we can stop RabbitMQ from resending + the file notification message to another airtime_analyzer process (NACK), + which would otherwise cause cascading failure. We also do this so that we + can report the problem file to the Airtime web interface ("import failed"). + + So that is a quick overview of the design constraints for this application, and + why airtime_analyzer is written this way. +""" class MessageListener: def __init__(self, config): + ''' Start listening for file upload notification messages + from RabbitMQ + + Keyword arguments: + config: A ConfigParser object containing the [rabbitmq] configuration. + ''' # Read the RabbitMQ connection settings from the config file # The exceptions throw here by default give good error messages. @@ -49,7 +86,7 @@ class MessageListener: def connect_to_messaging_server(self): - + '''Connect to the RabbitMQ server and start listening for messages.''' self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._host, port=self._port, virtual_host=self._vhost, credentials=pika.credentials.PlainCredentials(self._username, self._password))) @@ -64,15 +101,21 @@ class MessageListener: queue=QUEUE, no_ack=False) def wait_for_messages(self): + '''Wait until we've received a RabbitMQ message.''' self._channel.start_consuming() def disconnect_from_messaging_server(self): + '''Stop consuming RabbitMQ messages and disconnect''' self._channel.stop_consuming() - # consume callback function @staticmethod def msg_received_callback(channel, method_frame, header_frame, body): + ''' A callback method that runs when a RabbitMQ message is received. + + Here we parse the message, spin up an analyzer process, and report the + metadata back to the Airtime web application (or report an error). + ''' logging.info(" - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key)) #Declare all variables here so they exist in the exception handlers below, no matter what. @@ -83,11 +126,12 @@ class MessageListener: callback_url = "" api_key = "" - # Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue - # to pass objects between the processes so that if the analyzer process crashes, it does not - # take down the rest of the daemon and we NACK that message so that it doesn't get - # propagated to other airtime_analyzer daemons (eg. running on other servers). - # We avoid cascading failure this way. + ''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue + to pass objects between the processes so that if the analyzer process crashes, it does not + take down the rest of the daemon and we NACK that message so that it doesn't get + propagated to other airtime_analyzer daemons (eg. running on other servers). + We avoid cascading failure this way. + ''' try: msg_dict = json.loads(body) audio_file_path = msg_dict["tmp_file_path"] @@ -109,13 +153,14 @@ class MessageListener: except Exception as e: logging.exception(e) - #If ANY exception happens while processing a file, we're going to NACK to the - #messaging server and tell it to remove the message from the queue. - #(NACK is a negative acknowledgement. We could use ACK instead, but this might come - # in handy in the future.) - #Exceptions in this context are unexpected, unhandled errors. We try to recover - #from as many errors as possble in AnalyzerPipeline, but we're safeguarding ourselves - #here from any catastrophic or genuinely unexpected errors: + ''' If ANY exception happens while processing a file, we're going to NACK to the + messaging server and tell it to remove the message from the queue. + (NACK is a negative acknowledgement. We could use ACK instead, but this might come + in handy in the future.) + Exceptions in this context are unexpected, unhandled errors. We try to recover + from as many errors as possble in AnalyzerPipeline, but we're safeguarding ourselves + here from any catastrophic or genuinely unexpected errors: + ''' channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False, requeue=False) #Important that it doesn't requeue the message @@ -136,7 +181,7 @@ class MessageListener: @staticmethod def spawn_analyzer_process(audio_file_path, import_directory, original_filename): - + ''' Spawn a child process to analyze and import a new audio file. ''' q = multiprocessing.Queue() p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, args=(q, audio_file_path, import_directory, original_filename)) diff --git a/python_apps/airtime_analyzer/airtime_analyzer/metadata_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/metadata_analyzer.py index 1fa269051..471d6b592 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/metadata_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/metadata_analyzer.py @@ -8,6 +8,12 @@ class MetadataAnalyzer(Analyzer): @staticmethod def analyze(filename, metadata): + ''' Extract audio metadata from tags embedded in the file (eg. ID3 tags) + + Keyword arguments: + filename: The path to the audio file to extract metadata from. + metadata: A dictionary that the extracted metadata will be added to. + ''' if not isinstance(filename, unicode): raise TypeError("filename must be unicode. Was of type " + type(filename).__name__) if not isinstance(metadata, dict): @@ -25,7 +31,6 @@ class MetadataAnalyzer(Analyzer): track_length = datetime.timedelta(seconds=info.length) metadata["length"] = str(track_length) #time.strftime("%H:%M:%S.%f", track_length) metadata["bit_rate"] = info.bitrate - #metadata["channels"] = info.channels #Use the python-magic module to get the MIME type. mime_magic = magic.Magic(mime=True) diff --git a/python_apps/airtime_analyzer/airtime_analyzer/replaygain_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/replaygain_analyzer.py index cf10c0a44..2d02518f2 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/replaygain_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/replaygain_analyzer.py @@ -1,6 +1,8 @@ from analyzer import Analyzer -''' TODO: everything ''' +''' TODO: ReplayGain is currently calculated by pypo but it should + be done here in the analyzer. +''' class ReplayGainAnalyzer(Analyzer): def __init__(self): diff --git a/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py b/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py index 2a0963dd9..4e1dccf2c 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/status_reporter.py @@ -3,15 +3,18 @@ import json import logging class StatusReporter(): - + ''' Reports the extracted audio file metadata and job status back to the + Airtime web application. + ''' _HTTP_REQUEST_TIMEOUT = 30 - # Report the extracted metadata and status of the successfully imported file - # to the callback URL (which should be the Airtime File Upload API) @classmethod def report_success_to_callback_url(self, callback_url, api_key, audio_metadata): - - # encode the audio metadata as json and post it back to the callback_url + ''' Report the extracted metadata and status of the successfully imported file + to the callback URL (which should be the Airtime File Upload API) + ''' + + # Encode the audio metadata as json and post it back to the callback_url put_payload = json.dumps(audio_metadata) logging.debug("sending http put with payload: " + put_payload) r = requests.put(callback_url, data=put_payload, @@ -20,13 +23,14 @@ class StatusReporter(): logging.debug("HTTP request returned status: " + str(r.status_code)) logging.debug(r.text) # log the response body - #todo: queue up failed requests and try them again later. - r.raise_for_status() # raise an exception if there was an http error code returned + #TODO: queue up failed requests and try them again later. + r.raise_for_status() # Raise an exception if there was an http error code returned @classmethod def report_failure_to_callback_url(self, callback_url, api_key, import_status, reason): - # TODO: Make import_status is an int? - + if not isinstance(import_status, (int, long) ): + raise TypeError("import_status must be an integer. Was of type " + type(import_status).__name__) + logging.debug("Reporting import failure to Airtime REST API...") audio_metadata = dict() audio_metadata["import_status"] = import_status diff --git a/python_apps/airtime_analyzer/bin/airtime_analyzer b/python_apps/airtime_analyzer/bin/airtime_analyzer index 85266e9b0..8b45e775c 100755 --- a/python_apps/airtime_analyzer/bin/airtime_analyzer +++ b/python_apps/airtime_analyzer/bin/airtime_analyzer @@ -1,3 +1,5 @@ +"""Runs the airtime_analyzer application. +""" #!/usr/bin/env python import daemon @@ -8,19 +10,37 @@ import airtime_analyzer.airtime_analyzer as aa VERSION = "1.0" DEFAULT_CONFIG_PATH = '/etc/airtime/airtime.conf' -print "Airtime Analyzer " + VERSION +def run(): + '''Entry-point for this application''' + print "Airtime Analyzer " + VERSION + parser = argparse.ArgumentParser() + parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true") + parser.add_argument("--debug", help="log full debugging output", action="store_true") + parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is /etc/airtime/airtime.conf)") + args = parser.parse_args() + + check_if_media_monitor_is_running() + + #Default config file path + config_path = DEFAULT_CONFIG_PATH + if args.rmq_config_file: + config_path = args.rmq_config_file + + if args.daemon: + with daemon.DaemonContext(): + aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug) + else: + # Run without daemonizing + aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug) -parser = argparse.ArgumentParser() -parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true") -parser.add_argument("--debug", help="log full debugging output", action="store_true") -parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is /etc/airtime/airtime.conf)") -args = parser.parse_args() -'''Ensure media_monitor isn't running before we start, because it'll move newly uploaded - files into the library on us and screw up the operation of airtime_analyzer. - media_monitor is deprecated. -''' def check_if_media_monitor_is_running(): + """Ensure media_monitor isn't running before we start. + + We do this because media_monitor will move newly uploaded + files into the library on us and screw up the operation of airtime_analyzer. + media_monitor is deprecated. + """ pids = [pid for pid in os.listdir('/proc') if pid.isdigit()] for pid in pids: @@ -33,17 +53,6 @@ def check_if_media_monitor_is_running(): except IOError: # proc has already terminated continue -check_if_media_monitor_is_running() +run() -#Default config file path -config_path = DEFAULT_CONFIG_PATH -if args.rmq_config_file: - config_path = args.rmq_config_file - -if args.daemon: - with daemon.DaemonContext(): - analyzer = aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug) -else: - # Run without daemonizing - analyzer = aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug)