CC-5885: Factor out cloud storage code into separate class

drigato committed 2014-07-11 16:16:30 -04:00
commit 039a51121b
parent e7dfc08128
7 changed files with 67 additions and 60 deletions

View file

@@ -31,7 +31,7 @@ class Config {
         $CC_CONFIG['webServerUser'] = $values['general']['web_server_user'];
         $CC_CONFIG['rabbitmq'] = $values['rabbitmq'];
-        $CC_CONFIG['s3'] = $values['s3'];
+        $CC_CONFIG['cloud_storage'] = $values['cloud_storage'];
         $CC_CONFIG['baseDir'] = $values['general']['base_dir'];
         $CC_CONFIG['baseUrl'] = $values['general']['base_url'];
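
For reference, the matching [cloud_storage] section in airtime.conf would look roughly like this — a sketch with placeholder values, not part of this commit. The provider value must name a libcloud Provider constant (e.g. S3), the four credential keys are the ones the Python services read below, and host is the key getCloudUrl() reads in the next file:

[cloud_storage]
provider = S3
bucket = my-airtime-bucket
api_key = MY_API_KEY
api_key_secret = MY_API_KEY_SECRET
host = https://s3.amazonaws.com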

View file

@@ -573,7 +573,7 @@ SQL;
     public function getCloudUrl()
     {
         $CC_CONFIG = Config::getConfig();
-        return $CC_CONFIG["s3"]["host"]."/".$CC_CONFIG["s3"]["bucket"]."/" . urlencode($this->getResourceId());
+        return $CC_CONFIG["cloud_storage"]["host"]."/".$CC_CONFIG["cloud_storage"]["bucket"]."/" . urlencode($this->getResourceId());
     }

     public function getResourceId()

View file

@@ -5,6 +5,7 @@ import threading
 import multiprocessing
 from metadata_analyzer import MetadataAnalyzer
 from filemover_analyzer import FileMoverAnalyzer
+from cloud_storage_uploader import CloudStorageUploader

 class AnalyzerPipeline:
     """ Analyzes and imports an audio file into the Airtime library.
@@ -18,7 +19,7 @@ class AnalyzerPipeline:
     @staticmethod
     def run_analysis(queue, audio_file_path, import_directory, original_filename,
-                     s3_bucket, s3_api_key, s3_api_key_secret):
+                     cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret):
         """Analyze and import an audio file, and put all extracted metadata into queue.

         Keyword arguments:
@@ -52,8 +53,9 @@ class AnalyzerPipeline:
         # First, we extract the ID3 tags and other metadata:
         metadata = dict()
         metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
-        metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata,
-                                          s3_bucket, s3_api_key, s3_api_key_secret)
+        #metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
+        csu = CloudStorageUploader(cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret)
+        metadata = csu.upload_obj(audio_file_path, metadata)
         metadata["import_status"] = 0 # imported

         # Note that the queue we're putting the results into is our interprocess communication
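
For context, the parent process spawns run_analysis() in a child process and reads the results back over the queue. A minimal sketch mirroring the message_listener.py hunk further down; the file paths, bucket, and credentials here are placeholders:

import multiprocessing
from analyzer_pipeline import AnalyzerPipeline

q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
        args=(q, "/srv/airtime/organize/track.mp3", "/srv/airtime/imported",
              "track.mp3", "S3", "my-bucket", "MY_API_KEY", "MY_API_KEY_SECRET"))
p.start()
p.join()
if p.exitcode == 0:
    metadata = q.get()  # extracted metadata, including "s3_object_name" set by the uploader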

View file

@@ -0,0 +1,43 @@
+import os
+import logging
+import uuid
+
+from libcloud.storage.providers import get_driver
+from libcloud.storage.types import Provider, ContainerDoesNotExistError
+
+class CloudStorageUploader:
+
+    def __init__(self, provider, bucket, api_key, api_key_secret):
+        self._provider = provider
+        self._bucket = bucket
+        self._api_key = api_key
+        self._api_key_secret = api_key_secret
+
+    def upload_obj(self, audio_file_path, metadata):
+        file_base_name = os.path.basename(audio_file_path)
+        file_name, extension = os.path.splitext(file_base_name)
+        object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension)
+
+        cls = get_driver(getattr(Provider, self._provider))
+        driver = cls(self._api_key, self._api_key_secret)
+
+        try:
+            container = driver.get_container(self._bucket)
+        except ContainerDoesNotExistError:
+            container = driver.create_container(self._bucket)
+
+        extra = {'meta_data': {'filename': file_base_name}}
+
+        with open(audio_file_path, 'rb') as iterator:
+            obj = driver.upload_object_via_stream(iterator=iterator,
+                                                  container=container,
+                                                  object_name=object_name,
+                                                  extra=extra)
+
+        # Remove the file from the organize directory
+        try:
+            os.remove(audio_file_path)
+        except OSError:
+            logging.info("Could not remove %s" % audio_file_path)
+
+        metadata["s3_object_name"] = object_name
+        return metadata
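
A minimal usage sketch for the new class. The bucket name, credentials, and file path are placeholders; the provider string must match a libcloud Provider attribute such as "S3":

from cloud_storage_uploader import CloudStorageUploader

csu = CloudStorageUploader("S3", "my-bucket", "MY_API_KEY", "MY_API_KEY_SECRET")
metadata = csu.upload_obj("/srv/airtime/organize/track.mp3", dict())
print metadata["s3_object_name"]  # e.g. "track_<uuid4>.mp3"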

View file

@@ -18,8 +18,7 @@ class FileMoverAnalyzer(Analyzer):
         raise Exception("Use FileMoverAnalyzer.move() instead.")

     @staticmethod
-    def move(audio_file_path, import_directory, original_filename, metadata,
-             s3_bucket, s3_api_key, s3_api_key_secret):
+    def move(audio_file_path, import_directory, original_filename, metadata):
         """Move the file at audio_file_path over into the import_directory/import,
            renaming it to original_filename.
@@ -43,8 +42,6 @@ class FileMoverAnalyzer(Analyzer):
         # TODO: Also, handle the case where the move fails and write some code
         # to possibly move the file to problem_files.

-        #cloud storage doesn't need this
-        '''
         max_dir_len = 32
         max_file_len = 32
         final_file_path = import_directory
@@ -79,48 +76,12 @@ class FileMoverAnalyzer(Analyzer):
         #Ensure the full path to the file exists
         mkdir_p(os.path.dirname(final_file_path))
-        '''
-
-        file_base_name = os.path.basename(audio_file_path)
-        file_name, extension = os.path.splitext(file_base_name)
-        object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension)
-
-        from libcloud.storage.types import Provider, ContainerDoesNotExistError
-        from libcloud.storage.providers import get_driver
-        cls = get_driver(Provider.S3)
-        driver = cls(s3_api_key, s3_api_key_secret)
-
-        try:
-            container = driver.get_container(s3_bucket)
-        except ContainerDoesNotExistError:
-            container = driver.create_container(s3_bucket)
-
-        extra = {'meta_data': {'filename': file_base_name}}
-        #libcloud complains when float objects are in metadata
-        #extra = {'meta_data': metadata}
-
-        with open(audio_file_path, 'rb') as iterator:
-            obj = driver.upload_object_via_stream(iterator=iterator,
-                                                  container=container,
-                                                  object_name=object_name,
-                                                  extra=extra)
-
-        #remove file from organize directory
-        try:
-            os.remove(audio_file_path)
-        except OSError:
-            pass
-
-        #cloud storage doesn't need this
-        '''
         #Move the file into its final destination directory
         logging.debug("Moving %s to %s" % (audio_file_path, final_file_path))
         shutil.move(audio_file_path, final_file_path)

         metadata["full_path"] = final_file_path
-        '''
-        metadata["s3_object_name"] = object_name
         return metadata

 def mkdir_p(path):

View file

@@ -74,10 +74,11 @@ class MessageListener:
         self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')

         # Read the S3 API setting from the config file
-        S3_CONFIG_SECTION = "s3"
-        self._s3_bucket = config.get(S3_CONFIG_SECTION, 'bucket')
-        self._s3_api_key = config.get(S3_CONFIG_SECTION, 'api_key')
-        self._s3_api_key_secret = config.get(S3_CONFIG_SECTION, 'api_key_secret')
+        CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage"
+        self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
+        self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
+        self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
+        self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret')

         # Set up a signal handler so we can shutdown gracefully
         # For some reason, this signal handler must be set up here. I'd rather
@@ -210,7 +211,7 @@ class MessageListener:
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
                 args=(q, audio_file_path, import_directory, original_filename,
-                      self._s3_bucket, self._s3_api_key, self._s3_api_key_secret))
+                      self._provider, self._bucket, self._api_key, self._api_key_secret))
         p.start()
         p.join()
         if p.exitcode == 0:

View file

@@ -1,9 +1,8 @@
 import os
 import logging
 import ConfigParser
-import urllib2

-from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError
+from libcloud.storage.types import Provider, ObjectDoesNotExistError
 from libcloud.storage.providers import get_driver

 CONFIG_PATH = '/etc/airtime/airtime.conf'
@@ -12,17 +11,18 @@ class CloudStorageDownloader:
     def __init__(self):
         config = self.read_config_file(CONFIG_PATH)

-        S3_CONFIG_SECTION = "s3"
-        self._s3_bucket = config.get(S3_CONFIG_SECTION, 'bucket')
-        self._s3_api_key = config.get(S3_CONFIG_SECTION, 'api_key')
-        self._s3_api_key_secret = config.get(S3_CONFIG_SECTION, 'api_key_secret')
+        CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage"
+        self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
+        self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
+        self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
+        self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret')

     def download_obj(self, dst, obj_name):
-        cls = get_driver(Provider.S3)
-        driver = cls(self._s3_api_key, self._s3_api_key_secret)
-        #object_name = os.path.basename(urllib2.unquote(obj_url).decode('utf8'))
+        cls = get_driver(getattr(Provider, self._provider))
+        driver = cls(self._api_key, self._api_key_secret)
         try:
-            cloud_obj = driver.get_object(container_name=self._s3_bucket,
+            cloud_obj = driver.get_object(container_name=self._bucket,
                                           object_name=obj_name)
         except ObjectDoesNotExistError:
             logging.info("Could not find object: %s" % obj_name)