Merge pull request #91 from sourcefabric/saas-cloud-storage-config-per-dev-env

Saas cloud storage config per dev env
Denise Rigato 2015-02-05 15:02:05 -05:00
commit ae3c9c1f5a
9 changed files with 80 additions and 63 deletions

View File

@@ -25,19 +25,6 @@ class Config {
$filename = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
}
// Parse separate conf file for cloud storage values
$cloudStorageConfig = isset($_SERVER['CLOUD_STORAGE_CONF']) ? $_SERVER['CLOUD_STORAGE_CONF'] : "/etc/airtime-saas/cloud_storage.conf";
$cloudStorageValues = parse_ini_file($cloudStorageConfig, true);
$CC_CONFIG["supportedStorageBackends"] = array('amazon_S3');
foreach ($CC_CONFIG["supportedStorageBackends"] as $backend) {
$CC_CONFIG[$backend] = $cloudStorageValues[$backend];
}
// Tells us where file uploads will be uploaded to.
// It will either be set to a cloud storage backend or local file storage.
$CC_CONFIG["current_backend"] = $cloudStorageValues["current_backend"]["storage_backend"];
$values = parse_ini_file($filename, true);
// Name of the web server user
@@ -54,6 +41,24 @@ class Config {
$CC_CONFIG['dev_env'] = 'production';
}
// Parse separate conf file for cloud storage values
$cloudStorageConfig = "/etc/airtime-saas/".$CC_CONFIG['dev_env']."/cloud_storage.conf";
if (!file_exists($cloudStorageConfig)) {
// If the dev env specific cloud_storage.conf doesn't exist default
// to the production cloud_storage.conf
$cloudStorageConfig = "/etc/airtime-saas/production/cloud_storage.conf";
}
$cloudStorageValues = parse_ini_file($cloudStorageConfig, true);
$CC_CONFIG["supportedStorageBackends"] = array('amazon_S3');
foreach ($CC_CONFIG["supportedStorageBackends"] as $backend) {
$CC_CONFIG[$backend] = $cloudStorageValues[$backend];
}
// Tells us where file uploads will be uploaded to.
// It will either be set to a cloud storage backend or local file storage.
$CC_CONFIG["current_backend"] = $cloudStorageValues["current_backend"]["storage_backend"];
$CC_CONFIG['cache_ahead_hours'] = $values['general']['cache_ahead_hours'];
$CC_CONFIG['monit_user'] = $values['monit']['monit_user'];
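
Note: the lookup above assumes a per-environment directory layout under /etc/airtime-saas/ (a production/ directory plus one directory per dev_env). A minimal sketch of what such a cloud_storage.conf could contain, using only the section and key names visible in this diff; the [amazon_S3] keys and all values are illustrative placeholders, not real settings:

    ; /etc/airtime-saas/<dev_env>/cloud_storage.conf (hypothetical example)
    [current_backend]
    ; "amazon_S3" to upload to S3, or "file" to keep uploads on local disk
    storage_backend = amazon_S3

    [amazon_S3]
    ; placeholder credentials, for illustration only
    bucket = example-airtime-bucket
    api_key = EXAMPLEKEY
    api_key_secret = EXAMPLESECRET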

View File

@@ -89,7 +89,13 @@ class Application_Model_RabbitMq
if (array_key_exists("dev_env", $CC_CONFIG)) {
$devEnv = $CC_CONFIG["dev_env"];
}
$config = parse_ini_file("/etc/airtime-saas/rabbitmq-analyzer-" . $devEnv . ".ini", true);
$rmq_config_path = "/etc/airtime-saas/".$devEnv."/rabbitmq-analyzer.ini";
if (!file_exists($rmq_config_path)) {
// If the dev env specific rabbitmq-analyzer.ini doesn't exist default
// to the production rabbitmq-analyzer.ini
$rmq_config_path = "/etc/airtime-saas/production/rabbitmq-analyzer.ini";
}
$config = parse_ini_file($rmq_config_path, true);
$conn = new AMQPConnection($config["rabbitmq"]["host"],
$config["rabbitmq"]["port"],
$config["rabbitmq"]["user"],

View File

@@ -23,7 +23,7 @@ class AirtimeAnalyzerServer:
# Variables
_log_level = logging.INFO
def __init__(self, config_path, http_retry_queue_path, debug=False):
def __init__(self, rmq_config_path, cloud_storage_config_path, http_retry_queue_path, debug=False):
# Dump a stacktrace with 'kill -SIGUSR2 <PID>'
signal.signal(signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace())
@@ -31,15 +31,18 @@ class AirtimeAnalyzerServer:
# Configure logging
self.setup_logging(debug)
# Read our config file
config = config_file.read_config_file(config_path)
# Read our rmq config file
rmq_config = config_file.read_config_file(rmq_config_path)
# Read the cloud storage config file
cloud_storage_config = config_file.read_config_file(cloud_storage_config_path)
# Start up the StatusReporter process
StatusReporter.start_thread(http_retry_queue_path)
# Start listening for RabbitMQ messages telling us about newly
# uploaded files. This blocks until we receive a shutdown signal.
self._msg_listener = MessageListener(config)
self._msg_listener = MessageListener(rmq_config, cloud_storage_config)
StatusReporter.stop_thread()
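
Both paths are parsed with config_file.read_config_file, which (judging by the SafeConfigParser type check later in this diff) returns a ConfigParser object. A minimal sketch of such a helper, assuming it simply parses the ini file and bails out when the file cannot be opened; the actual implementation in the repo may differ:

    import ConfigParser
    import sys

    def read_config_file(config_path):
        """Parse an ini-style config file into a SafeConfigParser (sketch)."""
        config = ConfigParser.SafeConfigParser()
        try:
            config.readfp(open(config_path))
        except IOError as e:
            print "Error loading config file %s: %s" % (config_path, e.strerror)
            sys.exit(1)
        return config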

View File

@@ -2,7 +2,8 @@
"""
import logging
import threading
import multiprocessing
import ConfigParser
from metadata_analyzer import MetadataAnalyzer
from filemover_analyzer import FileMoverAnalyzer
from cloud_storage_uploader import CloudStorageUploader
@@ -21,7 +22,7 @@ class AnalyzerPipeline:
"""
@staticmethod
def run_analysis(queue, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_enabled):
def run_analysis(queue, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config):
"""Analyze and import an audio file, and put all extracted metadata into queue.
Keyword arguments:
@@ -33,9 +34,9 @@ class AnalyzerPipeline:
original_filename: The original filename of the file, which we'll try to
preserve. The file at audio_file_path typically has a
temporary randomly generated name, which is why we want
to know what the original name was.
file_prefix: A prefix for any files storage in the cloud (treated as a directory name on S3)
cloud_storage_enabled: Whether to store the file in the cloud or on the local disk.
file_prefix:
cloud_storage_config: ConfigParser object containing the cloud storage configuration settings
"""
# It is super critical to initialize a separate log file here so that we
# don't inherit logging/locks from the parent process. Supposedly
@@ -53,8 +54,8 @@ class AnalyzerPipeline:
raise TypeError("original_filename must be unicode. Was of type " + type(original_filename).__name__ + " instead.")
if not isinstance(file_prefix, unicode):
raise TypeError("file_prefix must be unicode. Was of type " + type(file_prefix).__name__ + " instead.")
if not isinstance(cloud_storage_enabled, bool):
raise TypeError("cloud_storage_enabled must be a boolean. Was of type " + type(cloud_storage_enabled).__name__ + " instead.")
if not isinstance(cloud_storage_config, ConfigParser.SafeConfigParser):
raise TypeError("cloud_storage_config must be a SafeConfigParser. Was of type " + type(cloud_storage_config).__name__ + " instead.")
# Analyze the audio file we were told to analyze:
@@ -67,8 +68,8 @@ class AnalyzerPipeline:
metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
if cloud_storage_enabled:
csu = CloudStorageUploader()
csu = CloudStorageUploader(cloud_storage_config)
if csu.enabled():
metadata = csu.upload_obj(audio_file_path, metadata)
else:
metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
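
csu.enabled() itself is not part of this diff; presumably it just reports whether the configured backend is something other than local file storage, roughly along these lines (a sketch using the STORAGE_BACKEND_FILE constant from cloud_storage_uploader.py below, not the actual implementation):

    def enabled(self):
        # Assumed behaviour: any backend other than plain "file" storage
        # means uploads should go to the cloud.
        return self._storage_backend != STORAGE_BACKEND_FILE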

View File

@@ -1,12 +1,9 @@
import os
import logging
import uuid
import ConfigParser
from boto.s3.connection import S3Connection
from boto.s3.key import Key
CLOUD_CONFIG_PATH = '/etc/airtime-saas/cloud_storage.conf'
STORAGE_BACKEND_FILE = "file"
class CloudStorageUploader:
@@ -23,17 +20,11 @@ class CloudStorageUploader:
_api_key_secret: Secret access key to objects on Amazon S3.
"""
def __init__(self):
def __init__(self, config):
try:
config = ConfigParser.SafeConfigParser()
config.readfp(open(CLOUD_CONFIG_PATH))
cloud_storage_config_section = config.get("current_backend", "storage_backend")
self._storage_backend = cloud_storage_config_section
except IOError as e:
print "Failed to open config file at " + CLOUD_CONFIG_PATH + ": " + e.strerror
print "Defaulting to file storage"
self._storage_backend = STORAGE_BACKEND_FILE
except Exception as e:
print e
print "Defaulting to file storage"
@@ -77,7 +68,7 @@ class CloudStorageUploader:
resource_id: The unique object name used to identify the objects
on Amazon S3
"""
file_base_name = os.path.basename(audio_file_path)
file_name, extension = os.path.splitext(file_base_name)
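
Because the uploader no longer opens /etc/airtime-saas/cloud_storage.conf itself, it can now be driven from any ConfigParser instance, including one built in memory; a hypothetical sketch:

    import ConfigParser
    import StringIO

    from airtime_analyzer.cloud_storage_uploader import CloudStorageUploader

    config = ConfigParser.SafeConfigParser()
    config.readfp(StringIO.StringIO("[current_backend]\nstorage_backend = file\n"))

    # With the "file" backend configured, uploads stay on local disk.
    csu = CloudStorageUploader(config)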

View File

@@ -55,24 +55,27 @@ QUEUE = "airtime-uploads"
"""
class MessageListener:
def __init__(self, config):
def __init__(self, rmq_config, cloud_storage_config):
''' Start listening for file upload notification messages
from RabbitMQ
Keyword arguments:
config: A ConfigParser object containing the [rabbitmq] configuration.
rmq_config: A ConfigParser object containing the [rabbitmq] configuration.
cloud_storage_config: A ConfigParser object containing the cloud storage configuration.
'''
self._shutdown = False
# Read the RabbitMQ connection settings from the config file
# Read the RabbitMQ connection settings from the rmq_config file
# The exceptions thrown here by default give good error messages.
RMQ_CONFIG_SECTION = "rabbitmq"
self._host = config.get(RMQ_CONFIG_SECTION, 'host')
self._port = config.getint(RMQ_CONFIG_SECTION, 'port')
self._username = config.get(RMQ_CONFIG_SECTION, 'user')
self._password = config.get(RMQ_CONFIG_SECTION, 'password')
self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')
self._host = rmq_config.get(RMQ_CONFIG_SECTION, 'host')
self._port = rmq_config.getint(RMQ_CONFIG_SECTION, 'port')
self._username = rmq_config.get(RMQ_CONFIG_SECTION, 'user')
self._password = rmq_config.get(RMQ_CONFIG_SECTION, 'password')
self._vhost = rmq_config.get(RMQ_CONFIG_SECTION, 'vhost')
self.cloud_storage_config = cloud_storage_config
# Set up a signal handler so we can shutdown gracefully
# For some reason, this signal handler must be set up here. I'd rather
@@ -167,7 +170,7 @@ class MessageListener:
original_filename = msg_dict["original_filename"]
file_prefix = msg_dict["file_prefix"]
audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix)
audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix, self.cloud_storage_config)
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
@@ -207,15 +210,11 @@ class MessageListener:
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
@staticmethod
def spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix):
def spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config):
''' Spawn a child process to analyze and import a new audio file. '''
csu = CloudStorageUploader()
cloud_storage_enabled = csu.enabled()
q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_enabled))
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config))
p.start()
p.join()
if p.exitcode == 0:

View File

@@ -8,7 +8,8 @@ import os
import airtime_analyzer.airtime_analyzer as aa
VERSION = "1.0"
DEFAULT_CONFIG_PATH = '/etc/airtime/airtime.conf'
DEFAULT_RMQ_CONFIG_PATH = '/etc/airtime/airtime.conf'
DEFAULT_CLOUD_STORAGE_CONFIG_PATH = '/etc/airtime-saas/production/cloud_storage.conf'
DEFAULT_HTTP_RETRY_PATH = '/tmp/airtime_analyzer_http_retries'
def run():
@@ -17,28 +18,34 @@ def run():
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
parser.add_argument("--debug", help="log full debugging output", action="store_true")
parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH)
parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_RMQ_CONFIG_PATH)
parser.add_argument("--cloud-storage-config-file", help="specify a configuration file with cloud storage settings (default is %s)" % DEFAULT_CLOUD_STORAGE_CONFIG_PATH)
parser.add_argument("--http-retry-queue-file", help="specify where incompleted HTTP requests will be serialized (default is %s)" % DEFAULT_HTTP_RETRY_PATH)
args = parser.parse_args()
check_if_media_monitor_is_running()
#Default config file path
config_path = DEFAULT_CONFIG_PATH
rmq_config_path = DEFAULT_RMQ_CONFIG_PATH
cloud_storage_config_path = DEFAULT_CLOUD_STORAGE_CONFIG_PATH
http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH
if args.rmq_config_file:
config_path = args.rmq_config_file
rmq_config_path = args.rmq_config_file
if args.cloud_storage_config_file:
cloud_storage_config_path = args.cloud_storage_config_file
if args.http_retry_queue_file:
http_retry_queue_path = args.http_retry_queue_file
if args.daemon:
with daemon.DaemonContext():
aa.AirtimeAnalyzerServer(config_path=config_path,
aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path,
cloud_storage_config_path = cloud_storage_config_path,
http_retry_queue_path=http_retry_queue_path,
debug=args.debug)
else:
# Run without daemonizing
aa.AirtimeAnalyzerServer(config_path=config_path,
aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path,
cloud_storage_config_path = cloud_storage_config_path,
http_retry_queue_path=http_retry_queue_path,
debug=args.debug)
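
Assuming the entry point is installed as airtime_analyzer, a hypothetical invocation that points the analyzer at an environment-specific cloud storage config (paths and the environment name are examples):

    airtime_analyzer --rmq-config-file /etc/airtime/airtime.conf \
                     --cloud-storage-config-file /etc/airtime-saas/staging/cloud_storage.conf \
                     --debug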

View File

@@ -5,6 +5,7 @@ import multiprocessing
import Queue
import datetime
from airtime_analyzer.analyzer_pipeline import AnalyzerPipeline
from airtime_analyzer import config_file
DEFAULT_AUDIO_FILE = u'tests/test_data/44100Hz-16bit-mono.mp3'
DEFAULT_IMPORT_DEST = u'Test Artist/Test Album/44100Hz-16bit-mono.mp3'
@@ -20,10 +21,11 @@ def teardown():
def test_basic():
filename = os.path.basename(DEFAULT_AUDIO_FILE)
q = multiprocessing.Queue()
cloud_storage_enabled = False
cloud_storage_config_path = '/etc/airtime-saas/production/cloud_storage.conf'
cloud_storage_config = config_file.read_config_file(cloud_storage_config_path)
file_prefix = u''
#This actually imports the file into the "./Test Artist" directory.
AnalyzerPipeline.run_analysis(q, DEFAULT_AUDIO_FILE, u'.', filename, file_prefix, cloud_storage_enabled)
AnalyzerPipeline.run_analysis(q, DEFAULT_AUDIO_FILE, u'.', filename, file_prefix, cloud_storage_config)
metadata = q.get()
assert metadata['track_title'] == u'Test Title'
assert metadata['artist_name'] == u'Test Artist'

View File

@@ -1,6 +1,7 @@
from nose.tools import *
from airtime_analyzer.cloud_storage_uploader import CloudStorageUploader
from airtime_analyzer.airtime_analyzer import AirtimeAnalyzerServer
from airtime_analyzer import config_file
def setup():
pass
@@ -9,5 +10,7 @@ def teardown():
pass
def test_analyze():
cl = CloudStorageUploader()
cloud_storage_config_path = '/etc/airtime-saas/production/cloud_storage.conf'
cloud_storage_config = config_file.read_config_file(cloud_storage_config_path)
cl = CloudStorageUploader(cloud_storage_config)
cl._storage_backend = "file"