From c868136d26b916c5b8fdeec3a187800262cbef47 Mon Sep 17 00:00:00 2001
From: Albert Santoni
Date: Fri, 20 Feb 2015 18:21:49 -0500
Subject: [PATCH] Make airtime_analyzer respect the storage_backend setting
 passed to it by Airtime

---
 airtime_mvc/application/models/RabbitMq.php |  4 ++--
 .../airtime_analyzer/analyzer_pipeline.py   |  6 +++---
 .../airtime_analyzer/message_listener.py    | 10 ++++------
 3 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php
index 6918abc06..2ff57d2ed 100644
--- a/airtime_mvc/application/models/RabbitMq.php
+++ b/airtime_mvc/application/models/RabbitMq.php
@@ -80,12 +80,12 @@ class Application_Model_RabbitMq
     }
 
     public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename,
-                                                 $callbackUrl, $apiKey, $currentStorageBackend)
+                                                 $callbackUrl, $apiKey, $storageBackend)
     {
         $exchange = 'airtime-uploads';
 
         $data['tmp_file_path'] = $tmpFilePath;
-        $data['current_storage_backend'] = $currentStorageBackend;
+        $data['storage_backend'] = $storageBackend;
         $data['import_directory'] = $importedStorageDirectory;
         $data['original_filename'] = $originalFilename;
         $data['callback_url'] = $callbackUrl;
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
index f3cf04180..f25994921 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
@@ -21,7 +21,7 @@ class AnalyzerPipeline:
     """
 
     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename, cloud_storage_enabled):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename, storage_backend):
         """Analyze and import an audio file, and put all extracted metadata into queue.
 
         Keyword arguments:
@@ -34,7 +34,7 @@ class AnalyzerPipeline:
                                preserve. The file at audio_file_path typically has a
                                temporary randomly generated name, which is why we want
                                to know what the original name was.
-            cloud_storage_enabled: Whether to store the file in the cloud or on the local disk.
+            storage_backend: String indicating the storage backend (amazon_s3 or file)
         """
         # It is super critical to initialize a separate log file here so that we
         # don't inherit logging/locks from the parent process. Supposedly
@@ -62,7 +62,7 @@ class AnalyzerPipeline:
         metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
         metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
 
-        if cloud_storage_enabled:
+        if storage_backend.lower() == "amazon_s3":
             csu = CloudStorageUploader()
             metadata = csu.upload_obj(audio_file_path, metadata)
         else:
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
index 8ed5fa782..a304929dc 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
@@ -165,8 +165,9 @@ class MessageListener:
             audio_file_path = msg_dict["tmp_file_path"]
             import_directory = msg_dict["import_directory"]
             original_filename = msg_dict["original_filename"]
+            storage_backend = msg_dict["storage_backend"]
 
-            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend)
             StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
 
         except KeyError as e:
@@ -205,15 +206,12 @@ class MessageListener:
         channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 
     @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend):
         ''' Spawn a child process to analyze and import a new audio file. '''
 
-        csu = CloudStorageUploader()
-        cloud_storage_enabled = csu.enabled()
-
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                args=(q, audio_file_path, import_directory, original_filename, cloud_storage_enabled))
+                args=(q, audio_file_path, import_directory, original_filename, storage_backend))
         p.start()
         p.join()
         if p.exitcode == 0:
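
Note on the message contract: below is a minimal sketch of the payload that
SendMessageToAnalyzer() publishes to the "airtime-uploads" exchange after this
change. Only the key names and the exchange name come from the diff above; the
values, the use of pika (the PHP side publishes through its own AMQP client),
and the exchange type are illustrative assumptions, not code from this patch.

    import json
    import pika  # assumption: pika stands in for the real AMQP client here

    # Payload mirroring the fields RabbitMq.php now publishes.
    # Only the keys are taken from the patch; every value is made up.
    message = {
        "tmp_file_path": "/tmp/plupload/o_1abc.mp3",
        "storage_backend": "amazon_S3",  # or "file" for local-disk imports
        "import_directory": "/srv/airtime/stor/imported/1",
        "original_filename": "show_intro.mp3",
        "callback_url": "http://airtime.example/rest/media/1",
    }

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # "airtime-uploads" is named in the diff; the exchange type is a guess.
    channel.exchange_declare(exchange="airtime-uploads", exchange_type="topic")
    channel.basic_publish(exchange="airtime-uploads", routing_key="",
                          body=json.dumps(message))
    connection.close()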
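
Consumer-side note: the branch in AnalyzerPipeline.run_analysis only works if
the literal is written in lowercase, since storage_backend.lower() can never
equal a string containing an uppercase "S". A self-contained restatement of
the dispatch, with a hypothetical helper name:

    # Hypothetical helper isolating the branch from AnalyzerPipeline.run_analysis.
    # The literal must be "amazon_s3" (all lowercase) because the incoming
    # value is lowercased before the comparison.
    def uses_cloud_storage(storage_backend):
        return storage_backend.lower() == "amazon_s3"

    assert uses_cloud_storage("amazon_S3")   # case of the incoming value is irrelevant
    assert uses_cloud_storage("AMAZON_S3")
    assert not uses_cloud_storage("file")    # local-disk imports take the else branch

Passing the backend name through the message, rather than having the analyzer
call CloudStorageUploader().enabled() itself, keeps Airtime's configuration
the single source of truth; that is exactly the pair of lines this patch
deletes from spawn_analyzer_process().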