From e24b4f43fdb70e1a6abf6b0ce10d96fb4d5114f1 Mon Sep 17 00:00:00 2001 From: drigato Date: Wed, 5 Nov 2014 16:24:03 -0500 Subject: [PATCH 1/2] Made AirtimeAnalyzerServer.read_config_file static so CloudStorageUploader can use it. Reverted MessageListener.msg_received_callback and MessageListener.spawn_analyzer_process back to static methods. Moved cloud storage settings out of MessageListener and into CloudStorageUploader --- airtime_mvc/application/configs/conf.php | 7 ++++-- airtime_mvc/build/airtime.conf | 8 +------ .../airtime_analyzer/airtime_analyzer.py | 5 +++-- .../airtime_analyzer/analyzer_pipeline.py | 5 ++--- .../cloud_storage_uploader.py | 18 ++++++++++----- .../airtime_analyzer/message_listener.py | 22 ++++++------------- 6 files changed, 30 insertions(+), 35 deletions(-) diff --git a/airtime_mvc/application/configs/conf.php b/airtime_mvc/application/configs/conf.php index 26a35b404..30918e4de 100644 --- a/airtime_mvc/application/configs/conf.php +++ b/airtime_mvc/application/configs/conf.php @@ -25,13 +25,16 @@ class Config { $filename = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf"; } + // Parse separate conf file for Amazon S3 values + $amazonFilename = "/etc/airtime-saas/amazon.conf"; + $amazonValues = parse_ini_file($amazonFilename, true); + $CC_CONFIG['cloud_storage'] = $amazonValues['cloud_storage']; + $values = parse_ini_file($filename, true); // Name of the web server user $CC_CONFIG['webServerUser'] = $values['general']['web_server_user']; $CC_CONFIG['rabbitmq'] = $values['rabbitmq']; - - $CC_CONFIG['cloud_storage'] = $values['cloud_storage']; $CC_CONFIG['baseDir'] = $values['general']['base_dir']; $CC_CONFIG['baseUrl'] = $values['general']['base_url']; diff --git a/airtime_mvc/build/airtime.conf b/airtime_mvc/build/airtime.conf index 10963f552..60633ed2b 100644 --- a/airtime_mvc/build/airtime.conf +++ b/airtime_mvc/build/airtime.conf @@ -29,10 +29,4 @@ monit_password = airtime [soundcloud] connection_retries = 3 -time_between_retries = 60 - -[cloud_storage] -provider = S3 -bucket = -api_key = -api_key_secret = \ No newline at end of file +time_between_retries = 60 \ No newline at end of file diff --git a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py index 567c31c98..dae271960 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py @@ -32,7 +32,7 @@ class AirtimeAnalyzerServer: self.setup_logging(debug) # Read our config file - config = self.read_config_file(config_path) + config = AirtimeAnalyzerServer.read_config_file(config_path) # Start up the StatusReporter process StatusReporter.start_thread(http_retry_queue_path) @@ -73,7 +73,8 @@ class AirtimeAnalyzerServer: rootLogger.addHandler(consoleHandler) - def read_config_file(self, config_path): + @staticmethod + def read_config_file(config_path): """Parse the application's config file located at config_path.""" config = ConfigParser.SafeConfigParser() try: diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py index 0202c2687..f17b1711d 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py @@ -18,8 +18,7 @@ class AnalyzerPipeline: """ @staticmethod - def run_analysis(queue, audio_file_path, import_directory, original_filename, - cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret): + def run_analysis(queue, audio_file_path, import_directory, original_filename): """Analyze and import an audio file, and put all extracted metadata into queue. Keyword arguments: @@ -54,7 +53,7 @@ class AnalyzerPipeline: metadata = dict() metadata = MetadataAnalyzer.analyze(audio_file_path, metadata) #metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata) - csu = CloudStorageUploader(cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret) + csu = CloudStorageUploader() metadata = csu.upload_obj(audio_file_path, metadata) metadata["import_status"] = 0 # imported diff --git a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py index f4c03de48..b52ac0113 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py @@ -1,10 +1,13 @@ import os import logging import uuid - +import airtime_analyzer as aa from libcloud.storage.providers import get_driver from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError + +CONFIG_PATH = '/etc/airtime-saas/amazon.conf' + class CloudStorageUploader: """ A class that uses Apache Libcloud's Storage API to upload objects into a cloud storage backend. For this implementation all files will be uploaded @@ -21,11 +24,14 @@ class CloudStorageUploader: _api_key_secret: Secret access key to objects on the provider's storage backend. """ - def __init__(self, provider, bucket, api_key, api_key_secret): - self._provider = provider - self._bucket = bucket - self._api_key = api_key - self._api_key_secret = api_key_secret + def __init__(self): + config = aa.AirtimeAnalyzerServer.read_config_file(CONFIG_PATH) + + CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage" + self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider') + self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket') + self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key') + self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret') def upload_obj(self, audio_file_path, metadata): """Uploads a file into Amazon S3 object storage. diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index 71211192a..acf21f471 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -74,13 +74,6 @@ class MessageListener: self._password = config.get(RMQ_CONFIG_SECTION, 'password') self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost') - # Read the S3 API setting from the config file - CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage" - self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider') - self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket') - self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key') - self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret') - # Set up a signal handler so we can shutdown gracefully # For some reason, this signal handler must be set up here. I'd rather # put it in AirtimeAnalyzerServer, but it doesn't work there (something to do @@ -119,7 +112,7 @@ class MessageListener: self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) logging.info(" Listening for messages...") - self._channel.basic_consume(self.msg_received_callback, + self._channel.basic_consume(MessageListener.msg_received_callback, queue=QUEUE, no_ack=False) def wait_for_messages(self): @@ -141,8 +134,8 @@ class MessageListener: self._shutdown = True self.disconnect_from_messaging_server() - #@staticmethod - def msg_received_callback(self, channel, method_frame, header_frame, body): + @staticmethod + def msg_received_callback(channel, method_frame, header_frame, body): ''' A callback method that runs when a RabbitMQ message is received. Here we parse the message, spin up an analyzer process, and report the @@ -173,7 +166,7 @@ class MessageListener: import_directory = msg_dict["import_directory"] original_filename = msg_dict["original_filename"] - audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename) + audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) except KeyError as e: @@ -211,13 +204,12 @@ class MessageListener: # If we don't ack, then RabbitMQ will redeliver the message in the future. channel.basic_ack(delivery_tag=method_frame.delivery_tag) - #@staticmethod - def spawn_analyzer_process(self, audio_file_path, import_directory, original_filename): + @staticmethod + def spawn_analyzer_process(audio_file_path, import_directory, original_filename): ''' Spawn a child process to analyze and import a new audio file. ''' q = multiprocessing.Queue() p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, - args=(q, audio_file_path, import_directory, original_filename, - self._provider, self._bucket, self._api_key, self._api_key_secret)) + args=(q, audio_file_path, import_directory, original_filename)) p.start() p.join() if p.exitcode == 0: From 1dc72d5ebec05a341775d0242aa71983e54745ad Mon Sep 17 00:00:00 2001 From: drigato Date: Fri, 7 Nov 2014 11:51:11 -0500 Subject: [PATCH 2/2] Changed Amazon S3 download URL format so it works with S3_EU_WEST region. Updated amazon config file path in the downloader class --- airtime_mvc/application/models/airtime/CloudFile.php | 2 +- python_apps/pypo/cloud_storage_downloader.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airtime_mvc/application/models/airtime/CloudFile.php b/airtime_mvc/application/models/airtime/CloudFile.php index a31660403..a30eedc6c 100644 --- a/airtime_mvc/application/models/airtime/CloudFile.php +++ b/airtime_mvc/application/models/airtime/CloudFile.php @@ -41,7 +41,7 @@ class CloudFile extends BaseCloudFile $scheme = $endpoint->getScheme(); $host = $endpoint->getHost(); $s3_bucket = $amazon_s3->getBucket(); - return "$scheme://$host/$s3_bucket/".utf8_encode($resource_id); + return "$scheme://$s3_bucket.$host/".utf8_encode($resource_id); } /** diff --git a/python_apps/pypo/cloud_storage_downloader.py b/python_apps/pypo/cloud_storage_downloader.py index 0a129f1d0..4565985e9 100644 --- a/python_apps/pypo/cloud_storage_downloader.py +++ b/python_apps/pypo/cloud_storage_downloader.py @@ -7,7 +7,7 @@ import hashlib from libcloud.storage.types import Provider, ObjectDoesNotExistError from libcloud.storage.providers import get_driver -CONFIG_PATH = '/etc/airtime/airtime.conf' +CONFIG_PATH = '/etc/airtime-saas/amazon.conf' class CloudStorageDownloader: """ A class that uses Apache Libcloud's Storage API to download objects from