diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php
index f8bfa6f0c..4acc74584 100644
--- a/airtime_mvc/application/models/RabbitMq.php
+++ b/airtime_mvc/application/models/RabbitMq.php
@@ -107,13 +107,16 @@ class Application_Model_RabbitMq
         $data['original_filename'] = $originalFilename;
         $data['callback_url'] = $callbackUrl;
         $data['api_key'] = $apiKey;
-        // Pass station name to the analyzer so we can set it with the file's metadata
-        // and prefix the object name with it before uploading it to the cloud. This
-        // isn't a requirement for cloud storage, but put there as a safeguard, since
-        // all Airtime Pro stations will share the same bucket.
+        // Pass station name to the analyzer so we can set it with the file's
+        // metadata before uploading it to the cloud. This isn't a requirement
+        // for cloud storage, but put there as a safeguard, since all Airtime
+        // Pro stations will share the same bucket.
         $data['station_domain'] = $stationDomain = Application_Model_Preference::GetStationName();
-        Logging::info(BillingController::getClientCurrentAirtimeProduct());
-        $data['file_prefix'] = BillingController::getClientCurrentAirtimeProduct();
+
+        // Each file uploaded to cloud storage is prefixed with the station's
+        // hosting id.
+        $clientCurrentAirtimeProduct = BillingController::getClientCurrentAirtimeProduct();
+        $data['file_prefix'] = $clientCurrentAirtimeProduct["id"];
 
         $jsonData = json_encode($data);
         //self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads');
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
index a11248066..b418c927d 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
@@ -21,7 +21,7 @@ class AnalyzerPipeline:
     """
 
     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend, file_prefix):
         """Analyze and import an audio file, and put all extracted metadata into queue.
 
         Keyword arguments:
@@ -56,6 +56,7 @@ class AnalyzerPipeline:
         # First, we extract the ID3 tags and other metadata:
         metadata = dict()
         metadata["station_domain"] = station_domain
+        metadata["file_prefix"] = file_prefix
 
         metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
         metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py
index b9178709c..780f6ecd6 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py
@@ -61,7 +61,7 @@ class CloudStorageUploader:
         # in the object name. URL encoding the object name doesn't solve the
         # problem. As a solution we will replace spaces with dashes.
         file_name = file_name.replace(" ", "-")
-        object_name = "%s/%s_%s%s" % (metadata["station_domain"], file_name, str(uuid.uuid4()), extension)
+        object_name = "%s/%s_%s%s" % (metadata["file_prefix"], file_name, str(uuid.uuid4()), extension)
 
         provider_driver_class = get_driver(getattr(Provider, self._provider))
         driver = provider_driver_class(self._api_key, self._api_key_secret)
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
index 245a74118..ecb943dba 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
@@ -152,6 +152,7 @@ class MessageListener:
         api_key = ""
         station_domain = ""
         current_storage_backend = ""
+        file_prefix = ""
 
         ''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
             to pass objects between the processes so that if the analyzer process crashes, it does not
@@ -169,8 +170,9 @@ class MessageListener:
             import_directory = msg_dict["import_directory"]
             original_filename = msg_dict["original_filename"]
             current_storage_backend = msg_dict["current_storage_backend"]
-
-            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend)
+            file_prefix = msg_dict["file_prefix"]
+
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend, file_prefix)
             StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
 
         except KeyError as e:
@@ -209,11 +211,11 @@ class MessageListener:
             channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 
     @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend):
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend, file_prefix):
         ''' Spawn a child process to analyze and import a new audio file. '''
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                args=(q, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend))
+                args=(q, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend, file_prefix))
         p.start()
         p.join()
         if p.exitcode == 0:
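For context, the RabbitMq.php hunk defines the producer side of the message contract that message_listener.py consumes. Below is a sketch of the decoded payload as the analyzer would see it, limited to the keys visible in this diff; all values are hypothetical placeholders, and the real message carries additional fields that are elided here:

```python
# Hypothetical payload; only keys that appear in this diff are shown.
msg_dict = {
    "original_filename": "My Song.mp3",
    "callback_url": "http://example.com/rest/media/1",  # placeholder
    "api_key": "XXXXXXXX",                              # placeholder
    "import_directory": "/srv/airtime/stor/imported",   # placeholder
    "current_storage_backend": "cloud",                 # placeholder
    "station_domain": "My Station",  # Application_Model_Preference::GetStationName()
    "file_prefix": "1234",  # hosting id: getClientCurrentAirtimeProduct()["id"]
}
```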
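The object-name change in cloud_storage_uploader.py composes the cloud object key as `<file_prefix>/<name>_<uuid><ext>`, with spaces replaced by dashes. Here is a minimal sketch of that scheme, assuming file_name and extension are split with os.path.splitext (in the real uploader they are derived earlier in the method); the helper name is illustrative, not part of the patch:

```python
import os
import uuid

def build_object_name(file_prefix, original_filename):
    # Illustrative helper mirroring the "%s/%s_%s%s" format in the diff.
    file_name, extension = os.path.splitext(original_filename)
    # Some providers reject spaces in object names, and URL encoding
    # doesn't help, so spaces become dashes (per the comment in the diff).
    file_name = file_name.replace(" ", "-")
    return "%s/%s_%s%s" % (file_prefix, file_name, str(uuid.uuid4()), extension)

# build_object_name("1234", "My Song.mp3")
# -> "1234/My-Song_<uuid4>.mp3"
```

The uuid4 suffix keeps two uploads of the same filename from colliding, even within a single station's prefix.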
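The spawn_analyzer_process change keeps the existing crash-isolation pattern: the pipeline runs in a child process, results come back over a multiprocessing.Queue, and the exit code decides success. A stripped-down sketch of the pattern (the worker body and file path are placeholders for AnalyzerPipeline.run_analysis and a real upload):

```python
import multiprocessing

def run_analysis(queue, audio_file_path):
    # Placeholder: the real pipeline analyzes the file and puts the
    # resulting metadata dict on the queue. A crash here (e.g. in a
    # native tag-parsing library) kills only this child process.
    queue.put({"audio_file_path": audio_file_path})

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=run_analysis, args=(q, "/tmp/example.mp3"))
    p.start()
    p.join()
    if p.exitcode == 0:
        metadata = q.get()
```

One caveat with this pattern: calling join() before get() can deadlock if the child puts a large object on the queue, because the child only exits once its queue buffer is flushed; the metadata dict here is small, so the pattern holds in practice.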