Merge branch 'cc-5709-airtime-analyzer-refactor' into saas-media-refactor

Conflicts:
	airtime_mvc/application/models/RabbitMq.php
	python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
	python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
This commit is contained in:
Albert Santoni 2015-02-20 18:25:38 -05:00
commit 51aa846a4c
3 changed files with 10 additions and 9 deletions

View File

@ -80,7 +80,7 @@ class Application_Model_RabbitMq
} }
public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename, public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename,
$callbackUrl, $apiKey, $currentStorageBackend, $filePrefix) $callbackUrl, $apiKey, $storageBackend, $filePrefix)
{ {
//Hack for Airtime Pro. The RabbitMQ settings for communicating with airtime_analyzer are global //Hack for Airtime Pro. The RabbitMQ settings for communicating with airtime_analyzer are global
//and shared between all instances on Airtime Pro. //and shared between all instances on Airtime Pro.
@ -107,7 +107,7 @@ class Application_Model_RabbitMq
$queue = 'airtime-uploads'; $queue = 'airtime-uploads';
$autoDeleteExchange = false; $autoDeleteExchange = false;
$data['tmp_file_path'] = $tmpFilePath; $data['tmp_file_path'] = $tmpFilePath;
$data['current_storage_backend'] = $currentStorageBackend; $data['storage_backend'] = $storageBackend;
$data['import_directory'] = $importedStorageDirectory; $data['import_directory'] = $importedStorageDirectory;
$data['original_filename'] = $originalFilename; $data['original_filename'] = $originalFilename;
$data['callback_url'] = $callbackUrl; $data['callback_url'] = $callbackUrl;

View File

@ -22,7 +22,7 @@ class AnalyzerPipeline:
""" """
@staticmethod @staticmethod
def run_analysis(queue, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config): def run_analysis(queue, audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config):
"""Analyze and import an audio file, and put all extracted metadata into queue. """Analyze and import an audio file, and put all extracted metadata into queue.
Keyword arguments: Keyword arguments:
@ -35,6 +35,7 @@ class AnalyzerPipeline:
preserve. The file at audio_file_path typically has a preserve. The file at audio_file_path typically has a
temporary randomly generated name, which is why we want temporary randomly generated name, which is why we want
to know what the original name was. to know what the original name was.
storage_backend: String indicating the storage backend (amazon_s3 or file)
file_prefix: file_prefix:
cloud_storage_config: ConfigParser object containing the cloud storage configuration settings cloud_storage_config: ConfigParser object containing the cloud storage configuration settings
""" """
@ -68,8 +69,8 @@ class AnalyzerPipeline:
metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata) metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata) metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
csu = CloudStorageUploader(cloud_storage_config) if storage_backend.lower() == "amazon_S3":
if csu.enabled(): csu = CloudStorageUploader(cloud_storage_config)
metadata = csu.upload_obj(audio_file_path, metadata) metadata = csu.upload_obj(audio_file_path, metadata)
else: else:
metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata) metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)

View File

@ -169,9 +169,9 @@ class MessageListener:
import_directory = msg_dict["import_directory"] import_directory = msg_dict["import_directory"]
original_filename = msg_dict["original_filename"] original_filename = msg_dict["original_filename"]
file_prefix = msg_dict["file_prefix"] file_prefix = msg_dict["file_prefix"]
storage_backend = msg_dict["storage_backend"]
audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix, self.cloud_storage_config) audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend, file_prefix, self.cloud_storage_config)
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
except KeyError as e: except KeyError as e:
@ -210,11 +210,11 @@ class MessageListener:
channel.basic_ack(delivery_tag=method_frame.delivery_tag) channel.basic_ack(delivery_tag=method_frame.delivery_tag)
@staticmethod @staticmethod
def spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config): def spawn_analyzer_process(audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config):
''' Spawn a child process to analyze and import a new audio file. ''' ''' Spawn a child process to analyze and import a new audio file. '''
q = multiprocessing.Queue() q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config)) args=(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix, cloud_storage_config))
p.start() p.start()
p.join() p.join()
if p.exitcode == 0: if p.exitcode == 0: