Make airtime_analyzer respect the storage_backend setting passed to it

by Airtime
This commit is contained in:
Albert Santoni 2015-02-20 18:21:49 -05:00
parent 909cbae5f4
commit c868136d26
3 changed files with 9 additions and 11 deletions

View File

@ -80,12 +80,12 @@ class Application_Model_RabbitMq
} }
public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename, public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename,
$callbackUrl, $apiKey, $currentStorageBackend) $callbackUrl, $apiKey, $storageBackend)
{ {
$exchange = 'airtime-uploads'; $exchange = 'airtime-uploads';
$data['tmp_file_path'] = $tmpFilePath; $data['tmp_file_path'] = $tmpFilePath;
$data['current_storage_backend'] = $currentStorageBackend; $data['storage_backend'] = $storageBackend;
$data['import_directory'] = $importedStorageDirectory; $data['import_directory'] = $importedStorageDirectory;
$data['original_filename'] = $originalFilename; $data['original_filename'] = $originalFilename;
$data['callback_url'] = $callbackUrl; $data['callback_url'] = $callbackUrl;

View File

@ -21,7 +21,7 @@ class AnalyzerPipeline:
""" """
@staticmethod @staticmethod
def run_analysis(queue, audio_file_path, import_directory, original_filename, cloud_storage_enabled): def run_analysis(queue, audio_file_path, import_directory, original_filename, storage_backend):
"""Analyze and import an audio file, and put all extracted metadata into queue. """Analyze and import an audio file, and put all extracted metadata into queue.
Keyword arguments: Keyword arguments:
@ -34,7 +34,7 @@ class AnalyzerPipeline:
preserve. The file at audio_file_path typically has a preserve. The file at audio_file_path typically has a
temporary randomly generated name, which is why we want temporary randomly generated name, which is why we want
to know what the original name was. to know what the original name was.
cloud_storage_enabled: Whether to store the file in the cloud or on the local disk. storage_backend: String indicating the storage backend (amazon_s3 or file)
""" """
# It is super critical to initialize a separate log file here so that we # It is super critical to initialize a separate log file here so that we
# don't inherit logging/locks from the parent process. Supposedly # don't inherit logging/locks from the parent process. Supposedly
@ -62,7 +62,7 @@ class AnalyzerPipeline:
metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata) metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata) metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
if cloud_storage_enabled: if storage_backend.lower() == "amazon_S3":
csu = CloudStorageUploader() csu = CloudStorageUploader()
metadata = csu.upload_obj(audio_file_path, metadata) metadata = csu.upload_obj(audio_file_path, metadata)
else: else:

View File

@ -165,8 +165,9 @@ class MessageListener:
audio_file_path = msg_dict["tmp_file_path"] audio_file_path = msg_dict["tmp_file_path"]
import_directory = msg_dict["import_directory"] import_directory = msg_dict["import_directory"]
original_filename = msg_dict["original_filename"] original_filename = msg_dict["original_filename"]
storage_backend = msg_dict["storage_backend"]
audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename) audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend)
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
except KeyError as e: except KeyError as e:
@ -205,15 +206,12 @@ class MessageListener:
channel.basic_ack(delivery_tag=method_frame.delivery_tag) channel.basic_ack(delivery_tag=method_frame.delivery_tag)
@staticmethod @staticmethod
def spawn_analyzer_process(audio_file_path, import_directory, original_filename): def spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend):
''' Spawn a child process to analyze and import a new audio file. ''' ''' Spawn a child process to analyze and import a new audio file. '''
csu = CloudStorageUploader()
cloud_storage_enabled = csu.enabled()
q = multiprocessing.Queue() q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, cloud_storage_enabled)) args=(q, audio_file_path, import_directory, original_filename, storage_backend))
p.start() p.start()
p.join() p.join()
if p.exitcode == 0: if p.exitcode == 0: