From 039a51121b74be5f985d7465fa3d858ca5923621 Mon Sep 17 00:00:00 2001 From: drigato Date: Fri, 11 Jul 2014 16:16:30 -0400 Subject: [PATCH] CC-5885: Factor out cloud storage code into separate class --- airtime_mvc/application/configs/conf.php | 2 +- airtime_mvc/application/models/StoredFile.php | 2 +- .../airtime_analyzer/analyzer_pipeline.py | 8 ++-- .../cloud_storage_uploader.py | 43 +++++++++++++++++++ .../airtime_analyzer/filemover_analyzer.py | 41 +----------------- .../airtime_analyzer/message_listener.py | 11 ++--- python_apps/pypo/cloud_storage_downloader.py | 20 ++++----- 7 files changed, 67 insertions(+), 60 deletions(-) create mode 100644 python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py diff --git a/airtime_mvc/application/configs/conf.php b/airtime_mvc/application/configs/conf.php index 2c11755cf..26a35b404 100644 --- a/airtime_mvc/application/configs/conf.php +++ b/airtime_mvc/application/configs/conf.php @@ -31,7 +31,7 @@ class Config { $CC_CONFIG['webServerUser'] = $values['general']['web_server_user']; $CC_CONFIG['rabbitmq'] = $values['rabbitmq']; - $CC_CONFIG['s3'] = $values['s3']; + $CC_CONFIG['cloud_storage'] = $values['cloud_storage']; $CC_CONFIG['baseDir'] = $values['general']['base_dir']; $CC_CONFIG['baseUrl'] = $values['general']['base_url']; diff --git a/airtime_mvc/application/models/StoredFile.php b/airtime_mvc/application/models/StoredFile.php index 01ccdd0a6..bb2d040a3 100644 --- a/airtime_mvc/application/models/StoredFile.php +++ b/airtime_mvc/application/models/StoredFile.php @@ -573,7 +573,7 @@ SQL; public function getCloudUrl() { $CC_CONFIG = Config::getConfig(); - return $CC_CONFIG["s3"]["host"]."/".$CC_CONFIG["s3"]["bucket"]."/" . urlencode($this->getResourceId()); + return $CC_CONFIG["cloud_storage"]["host"]."/".$CC_CONFIG["cloud_storage"]["bucket"]."/" . urlencode($this->getResourceId()); } public function getResourceId() diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py index 89513fd4c..0202c2687 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py @@ -5,6 +5,7 @@ import threading import multiprocessing from metadata_analyzer import MetadataAnalyzer from filemover_analyzer import FileMoverAnalyzer +from cloud_storage_uploader import CloudStorageUploader class AnalyzerPipeline: """ Analyzes and imports an audio file into the Airtime library. @@ -18,7 +19,7 @@ class AnalyzerPipeline: @staticmethod def run_analysis(queue, audio_file_path, import_directory, original_filename, - s3_bucket, s3_api_key, s3_api_key_secret): + cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret): """Analyze and import an audio file, and put all extracted metadata into queue. Keyword arguments: @@ -52,8 +53,9 @@ class AnalyzerPipeline: # First, we extract the ID3 tags and other metadata: metadata = dict() metadata = MetadataAnalyzer.analyze(audio_file_path, metadata) - metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata, - s3_bucket, s3_api_key, s3_api_key_secret) + #metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata) + csu = CloudStorageUploader(cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret) + metadata = csu.upload_obj(audio_file_path, metadata) metadata["import_status"] = 0 # imported # Note that the queue we're putting the results into is our interprocess communication diff --git a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py new file mode 100644 index 000000000..4fa2da0b7 --- /dev/null +++ b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py @@ -0,0 +1,43 @@ +import os +import logging +import uuid + +from libcloud.storage.providers import get_driver +from libcloud.storage.types import Provider, ContainerDoesNotExistError + +class CloudStorageUploader: + def __init__(self, provider, bucket, api_key, api_key_secret): + self._provider = provider + self._bucket = bucket + self._api_key = api_key + self._api_key_secret = api_key_secret + + def upload_obj(self, audio_file_path, metadata): + file_base_name = os.path.basename(audio_file_path) + file_name, extension = os.path.splitext(file_base_name) + object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension) + + cls = get_driver(getattr(Provider, self._provider)) + driver = cls(self._api_key, self._api_key_secret) + + try: + container = driver.get_container(self._bucket) + except ContainerDoesNotExistError: + container = driver.create_container(self._bucket) + + extra = {'meta_data': {'filename': file_base_name}} + + with open(audio_file_path, 'rb') as iterator: + obj = driver.upload_object_via_stream(iterator=iterator, + container=container, + object_name=object_name, + extra=extra) + + '''remove file from organize directory''' + try: + os.remove(audio_file_path) + except OSError: + logging.info("Could not remove %s" % audio_file_path) + + metadata["s3_object_name"] = object_name + return metadata diff --git a/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py index 8d899152f..b759ecab7 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py @@ -18,8 +18,7 @@ class FileMoverAnalyzer(Analyzer): raise Exception("Use FileMoverAnalyzer.move() instead.") @staticmethod - def move(audio_file_path, import_directory, original_filename, metadata, - s3_bucket, s3_api_key, s3_api_key_secret): + def move(audio_file_path, import_directory, original_filename, metadata): """Move the file at audio_file_path over into the import_directory/import, renaming it to original_filename. @@ -43,8 +42,6 @@ class FileMoverAnalyzer(Analyzer): # TODO: Also, handle the case where the move fails and write some code # to possibly move the file to problem_files. - #cloud storage doesn't need this - ''' max_dir_len = 32 max_file_len = 32 final_file_path = import_directory @@ -79,48 +76,12 @@ class FileMoverAnalyzer(Analyzer): #Ensure the full path to the file exists mkdir_p(os.path.dirname(final_file_path)) - ''' - file_base_name = os.path.basename(audio_file_path) - file_name, extension = os.path.splitext(file_base_name) - object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension) - - from libcloud.storage.types import Provider, ContainerDoesNotExistError - from libcloud.storage.providers import get_driver - - cls = get_driver(Provider.S3) - driver = cls(s3_api_key, s3_api_key_secret) - - try: - container = driver.get_container(s3_bucket) - except ContainerDoesNotExistError: - container = driver.create_container(s3_bucket) - - extra = {'meta_data': {'filename': file_base_name}} - #libcloud complains when float objects are in metadata - #extra = {'meta_data': metadata} - - with open(audio_file_path, 'rb') as iterator: - obj = driver.upload_object_via_stream(iterator=iterator, - container=container, - object_name=object_name, - extra=extra) - - #remove file from organize directory - try: - os.remove(audio_file_path) - except OSError: - pass - - #cloud storage doesn't need this - ''' #Move the file into its final destination directory logging.debug("Moving %s to %s" % (audio_file_path, final_file_path)) shutil.move(audio_file_path, final_file_path) metadata["full_path"] = final_file_path - ''' - metadata["s3_object_name"] = object_name return metadata def mkdir_p(path): diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index 9dd25a436..e723542d7 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -74,10 +74,11 @@ class MessageListener: self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost') # Read the S3 API setting from the config file - S3_CONFIG_SECTION = "s3" - self._s3_bucket = config.get(S3_CONFIG_SECTION, 'bucket') - self._s3_api_key = config.get(S3_CONFIG_SECTION, 'api_key') - self._s3_api_key_secret = config.get(S3_CONFIG_SECTION, 'api_key_secret') + CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage" + self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider') + self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket') + self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key') + self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret') # Set up a signal handler so we can shutdown gracefully # For some reason, this signal handler must be set up here. I'd rather @@ -210,7 +211,7 @@ class MessageListener: q = multiprocessing.Queue() p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, args=(q, audio_file_path, import_directory, original_filename, - self._s3_bucket, self._s3_api_key, self._s3_api_key_secret)) + self._provider, self._bucket, self._api_key, self._api_key_secret)) p.start() p.join() if p.exitcode == 0: diff --git a/python_apps/pypo/cloud_storage_downloader.py b/python_apps/pypo/cloud_storage_downloader.py index 059b31275..2ae6931a9 100644 --- a/python_apps/pypo/cloud_storage_downloader.py +++ b/python_apps/pypo/cloud_storage_downloader.py @@ -1,9 +1,8 @@ import os import logging import ConfigParser -import urllib2 -from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError +from libcloud.storage.types import Provider, ObjectDoesNotExistError from libcloud.storage.providers import get_driver CONFIG_PATH = '/etc/airtime/airtime.conf' @@ -12,17 +11,18 @@ class CloudStorageDownloader: def __init__(self): config = self.read_config_file(CONFIG_PATH) - S3_CONFIG_SECTION = "s3" - self._s3_bucket = config.get(S3_CONFIG_SECTION, 'bucket') - self._s3_api_key = config.get(S3_CONFIG_SECTION, 'api_key') - self._s3_api_key_secret = config.get(S3_CONFIG_SECTION, 'api_key_secret') + CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage" + self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider') + self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket') + self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key') + self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret') def download_obj(self, dst, obj_name): - cls = get_driver(Provider.S3) - driver = cls(self._s3_api_key, self._s3_api_key_secret) - #object_name = os.path.basename(urllib2.unquote(obj_url).decode('utf8')) + cls = get_driver(getattr(Provider, self._provider)) + driver = cls(self._api_key, self._api_key_secret) + try: - cloud_obj = driver.get_object(container_name=self._s3_bucket, + cloud_obj = driver.get_object(container_name=self._bucket, object_name=obj_name) except ObjectDoesNotExistError: logging.info("Could not find object: %s" % obj_name)