SAAS-502: Analyzer -> Set the station id and domain in the cloud object's metadata

Set the domain name in the cloud object's metadata
This commit is contained in:
drigato 2014-11-27 16:54:22 -05:00
parent 92feacd46f
commit 432245b18e
4 changed files with 14 additions and 5 deletions

View file

@@ -89,6 +89,11 @@ class Application_Model_RabbitMq
         $data['original_filename'] = $originalFilename;
         $data['callback_url'] = $callbackUrl;
         $data['api_key'] = $apiKey;
+        // Pass station name to the analyzer so we can set it with the file's metadata
+        // before uploading it to the cloud. This isn't a requirement for cloud storage,
+        // but put there as a safeguard, since all Airtime Pro stations will share the
+        // same bucket.
+        $data['station_domain'] = $stationDomain = Application_Model_Preference::GetStationName();
         $jsonData = json_encode($data);
         self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads');

View file

@@ -18,7 +18,7 @@ class AnalyzerPipeline:
     """
     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain):
        """Analyze and import an audio file, and put all extracted metadata into queue.

        Keyword arguments:
@@ -31,6 +31,7 @@ class AnalyzerPipeline:
                      preserve. The file at audio_file_path typically has a
                      temporary randomly generated name, which is why we want
                      to know what the original name was.
+            station_domain: The Airtime Pro account's domain name. i.e. bananas
        """
        # It is super critical to initialize a separate log file here so that we
        # don't inherit logging/locks from the parent process. Supposedly
@@ -52,6 +53,7 @@ class AnalyzerPipeline:
        # First, we extract the ID3 tags and other metadata:
        metadata = dict()
        metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
+        metadata["station_domain"] = station_domain
        #metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
        csu = CloudStorageUploader()
        metadata = csu.upload_obj(audio_file_path, metadata)

View file

@@ -71,7 +71,8 @@ class CloudStorageUploader:
        except ContainerDoesNotExistError:
            container = driver.create_container(self._bucket)
-        extra = {'meta_data': {'filename': file_base_name}}
+        extra = {'meta_data': {'filename': file_base_name,
+                               'station_domain': metadata["station_domain"]}}
        obj = driver.upload_object(file_path=audio_file_path,
                                   container=container,

View file

@@ -161,12 +161,13 @@ class MessageListener:
            msg_dict = json.loads(body)
            api_key = msg_dict["api_key"]
            callback_url = msg_dict["callback_url"]
+            station_domain = msg_dict["station_domain"]
            audio_file_path = msg_dict["tmp_file_path"]
            import_directory = msg_dict["import_directory"]
            original_filename = msg_dict["original_filename"]
-            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain)
            StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
        except KeyError as e:
@@ -205,11 +206,11 @@ class MessageListener:
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain):
        ''' Spawn a child process to analyze and import a new audio file. '''
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                                    args=(q, audio_file_path, import_directory, original_filename))
+                                    args=(q, audio_file_path, import_directory, original_filename, station_domain))
        p.start()
        p.join()
        if p.exitcode == 0: