diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php index 15ad912e0..a9f699841 100644 --- a/airtime_mvc/application/models/RabbitMq.php +++ b/airtime_mvc/application/models/RabbitMq.php @@ -89,6 +89,11 @@ class Application_Model_RabbitMq $data['original_filename'] = $originalFilename; $data['callback_url'] = $callbackUrl; $data['api_key'] = $apiKey; + // Pass station name to the analyzer so we can set it with the file's metadata + // before uploading it to the cloud. This isn't a requirement for cloud storage, + // but put there as a safeguard, since all Airtime Pro stations will share the + // same bucket. + $data['station_domain'] = $stationDomain = Application_Model_Preference::GetStationName(); $jsonData = json_encode($data); self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads'); diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py index f17b1711d..39ab70400 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py @@ -18,7 +18,7 @@ class AnalyzerPipeline: """ @staticmethod - def run_analysis(queue, audio_file_path, import_directory, original_filename): + def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain): """Analyze and import an audio file, and put all extracted metadata into queue. Keyword arguments: @@ -31,6 +31,7 @@ class AnalyzerPipeline: preserve. The file at audio_file_path typically has a temporary randomly generated name, which is why we want to know what the original name was. + station_domain: The Airtime Pro account's domain name. i.e. bananas """ # It is super critical to initialize a separate log file here so that we # don't inherit logging/locks from the parent process. Supposedly @@ -52,6 +53,7 @@ class AnalyzerPipeline: # First, we extract the ID3 tags and other metadata: metadata = dict() metadata = MetadataAnalyzer.analyze(audio_file_path, metadata) + metadata["station_domain"] = station_domain #metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata) csu = CloudStorageUploader() metadata = csu.upload_obj(audio_file_path, metadata) diff --git a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py index 6ecb2a06d..d060f3bcc 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py @@ -71,7 +71,8 @@ class CloudStorageUploader: except ContainerDoesNotExistError: container = driver.create_container(self._bucket) - extra = {'meta_data': {'filename': file_base_name}} + extra = {'meta_data': {'filename': file_base_name, + 'station_domain': metadata["station_domain"]}} obj = driver.upload_object(file_path=audio_file_path, container=container, diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index f106258e1..b61c2133e 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -161,12 +161,13 @@ class MessageListener: msg_dict = json.loads(body) api_key = msg_dict["api_key"] callback_url = msg_dict["callback_url"] + station_domain = msg_dict["station_domain"] audio_file_path = msg_dict["tmp_file_path"] import_directory = msg_dict["import_directory"] original_filename = msg_dict["original_filename"] - audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename) + audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) except KeyError as e: @@ -205,11 +206,11 @@ class MessageListener: channel.basic_ack(delivery_tag=method_frame.delivery_tag) @staticmethod - def spawn_analyzer_process(audio_file_path, import_directory, original_filename): + def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain): ''' Spawn a child process to analyze and import a new audio file. ''' q = multiprocessing.Queue() p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, - args=(q, audio_file_path, import_directory, original_filename)) + args=(q, audio_file_path, import_directory, original_filename, station_domain)) p.start() p.join() if p.exitcode == 0: