SAAS-560: Deploy separate cloud storage config files for each development environment

Changed the analyzer upstart job to take the cloud storage config file as a command-line option
Dropped the dev environment portion from the rabbitmq-analyzer.ini filename
drigato 2015-02-03 15:55:47 -05:00
parent 7808fd4708
commit 7b3f9af04c
7 changed files with 29 additions and 36 deletions
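
Note on the new layout: both the web app and the analyzer now expect a fixed filename, cloud_storage.conf, inside the per-environment directory /etc/airtime-saas/<dev_env>/, instead of encoding the environment in the filename itself. A minimal Python 2 sketch of how such a file parses is below; the section and key names are inferred from this diff (current_backend/storage_backend, the amazon_S3 backend, and CloudStorageUploader's _api_key/_api_key_secret/_bucket/_host attributes) and should be treated as assumptions rather than the canonical file contents.

# Sketch only: illustrates the per-environment cloud_storage.conf layout this
# commit assumes. Section and key names are inferred from the diff, not confirmed.
import ConfigParser  # Python 2, matching the analyzer codebase
import io

SAMPLE_CLOUD_STORAGE_CONF = u"""
[current_backend]
storage_backend = amazon_S3

[amazon_S3]
api_key = YOUR_KEY
api_key_secret = YOUR_SECRET
bucket = your-bucket
host = s3.amazonaws.com
"""

config = ConfigParser.SafeConfigParser()
config.readfp(io.StringIO(SAMPLE_CLOUD_STORAGE_CONF))

# Mirrors what CloudStorageUploader.__init__ does with the injected config:
backend = config.get("current_backend", "storage_backend")
print(backend)                          # amazon_S3
print(config.get(backend, "bucket"))    # your-bucket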

View File

@@ -42,7 +42,7 @@ class Config {
         }
         // Parse separate conf file for cloud storage values
-        $cloudStorageConfig = "/etc/airtime-saas/".$CC_CONFIG['dev_env']."/cloud_storage_".$CC_CONFIG['dev_env'].".conf";
+        $cloudStorageConfig = "/etc/airtime-saas/".$CC_CONFIG['dev_env']."/cloud_storage.conf";
         $cloudStorageValues = parse_ini_file($cloudStorageConfig, true);
         $CC_CONFIG["supportedStorageBackends"] = array('amazon_S3');

View File

@@ -89,7 +89,7 @@ class Application_Model_RabbitMq
         if (array_key_exists("dev_env", $CC_CONFIG)) {
             $devEnv = $CC_CONFIG["dev_env"];
         }
-        $config = parse_ini_file("/etc/airtime-saas/".$devEnv."/rabbitmq-analyzer-" . $devEnv . ".ini", true);
+        $config = parse_ini_file("/etc/airtime-saas/".$devEnv."/rabbitmq-analyzer.ini", true);
         $conn = new AMQPConnection($config["rabbitmq"]["host"],
                                    $config["rabbitmq"]["port"],
                                    $config["rabbitmq"]["user"],

View File

@@ -23,7 +23,7 @@ class AirtimeAnalyzerServer:
     # Variables
     _log_level = logging.INFO
-    def __init__(self, config_path, http_retry_queue_path, debug=False):
+    def __init__(self, config_path, cloud_storage_config_path, http_retry_queue_path, debug=False):
         # Dump a stacktrace with 'kill -SIGUSR2 <PID>'
         signal.signal(signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace())
@@ -31,15 +31,18 @@ class AirtimeAnalyzerServer:
         # Configure logging
         self.setup_logging(debug)
-        # Read our config file
+        # Read our rmq config file
         config = config_file.read_config_file(config_path)
+        # Read the cloud storage config file
+        cloud_storage_config = config_file.read_config_file(cloud_storage_config_path)
         # Start up the StatusReporter process
         StatusReporter.start_thread(http_retry_queue_path)
         # Start listening for RabbitMQ messages telling us about newly
         # uploaded files. This blocks until we recieve a shutdown signal.
-        self._msg_listener = MessageListener(config)
+        self._msg_listener = MessageListener(config, cloud_storage_config)
         StatusReporter.stop_thread()

View File

@@ -21,7 +21,7 @@ class AnalyzerPipeline:
     """
     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename, file_prefix):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config):
         """Analyze and import an audio file, and put all extracted metadata into queue.
         Keyword arguments:
@@ -34,7 +34,8 @@
                           preserve. The file at audio_file_path typically has a
                           temporary randomly generated name, which is why we want
                           to know what the original name was.
-            station_domain: The Airtime Pro account's domain name. i.e. bananas
+            file_prefix:
+            cloud_storage_config: ConfigParser object containing the cloud storage configuration settings
         """
         # It is super critical to initialize a separate log file here so that we
         # don't inherit logging/locks from the parent process. Supposedly
@@ -42,7 +43,6 @@
         AnalyzerPipeline.python_logger_deadlock_workaround()
         try:
-            logging.info("111")
             if not isinstance(queue, multiprocessing.queues.Queue):
                 raise TypeError("queue must be a multiprocessing.Queue()")
             if not isinstance(audio_file_path, unicode):
@@ -62,18 +62,15 @@
             metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
             metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
             metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
-            logging.info("222")
-            csu = CloudStorageUploader()
+            csu = CloudStorageUploader(cloud_storage_config)
             if csu.enabled():
-                logging.info("333")
                 metadata = csu.upload_obj(audio_file_path, metadata)
             else:
                 metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
             metadata["import_status"] = 0 # Successfully imported
-            logging.info("444")
             # Note that the queue we're putting the results into is our interprocess communication
             # back to the main process.
@@ -81,7 +78,6 @@
             # Pass all the file metadata back to the main analyzer process, which then passes
             # it back to the Airtime web application.
             queue.put(metadata)
-            logging.info("555")
         except UnplayableFileError as e:
             logging.exception(e)
             metadata["import_status"] = 2

View File

@@ -1,11 +1,9 @@
 import os
 import logging
 import uuid
-import config_file
 from boto.s3.connection import S3Connection
 from boto.s3.key import Key
-AIRTIME_CONFIG_PATH = '/etc/airtime/airtime.conf'
 STORAGE_BACKEND_FILE = "file"
 class CloudStorageUploader:
@@ -22,17 +20,7 @@ class CloudStorageUploader:
         _api_key_secret: Secret access key to objects on Amazon S3.
     """
-    def __init__(self):
-        airtime_config = config_file.read_config_file(AIRTIME_CONFIG_PATH)
-        dev_env = "production" # Default
-        if airtime_config.has_option("general", "dev_env"):
-            dev_env = airtime_config.get("general", "dev_env")
-        CLOUD_CONFIG_PATH = "/etc/airtime-saas/%s/cloud_storage_%s.conf" % (dev_env, dev_env)
-        logging.info(CLOUD_CONFIG_PATH)
-        config = config_file.read_config_file(CLOUD_CONFIG_PATH)
+    def __init__(self, config):
         CLOUD_STORAGE_CONFIG_SECTION = config.get("current_backend", "storage_backend")
         self._storage_backend = CLOUD_STORAGE_CONFIG_SECTION
@@ -73,7 +61,7 @@ class CloudStorageUploader:
             resource_id: The unique object name used to identify the objects
                          on Amazon S3
         """
-        logging.info("aaa")
         file_base_name = os.path.basename(audio_file_path)
         file_name, extension = os.path.splitext(file_base_name)
@@ -83,7 +71,6 @@ class CloudStorageUploader:
         file_name = file_name.replace(" ", "-")
         unique_id = str(uuid.uuid4())
-        logging.info("bbb")
         # We add another prefix to the resource name with the last two characters
         # of the unique id so files are not all placed under the root folder. We
@@ -91,10 +78,8 @@ class CloudStorageUploader:
         # is done via the S3 Browser client. The client will hang if there are too
         # many files under the same folder.
         unique_id_prefix = unique_id[-2:]
-        logging.info("ccc")
         resource_id = "%s/%s/%s_%s%s" % (metadata['file_prefix'], unique_id_prefix, file_name, unique_id, extension)
-        logging.info("ddd")
         conn = S3Connection(self._api_key, self._api_key_secret, host=self._host)
         bucket = conn.get_bucket(self._bucket)
@@ -105,7 +90,6 @@ class CloudStorageUploader:
         key.set_contents_from_filename(audio_file_path)
         metadata["filesize"] = os.path.getsize(audio_file_path)
-        logging.info("eee")
         # Remove file from organize directory
         try:
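
Design note: CloudStorageUploader no longer reads /etc/airtime/airtime.conf to discover dev_env and locate its own config; the caller parses the cloud storage config once and injects the ConfigParser object. A rough sketch of the caller-side wiring after this change, using the project's config_file.read_config_file helper (visible in this diff) and an assumed import path for the uploader:

# Sketch of the new construction path; the import path for CloudStorageUploader
# is assumed, and the config path is just an example following the commit's
# default layout.
import config_file
from cloud_storage_uploader import CloudStorageUploader

cloud_storage_config = config_file.read_config_file(
    "/etc/airtime-saas/production/cloud_storage.conf")

# The same ConfigParser object is handed through MessageListener and
# AnalyzerPipeline.run_analysis down to the uploader:
csu = CloudStorageUploader(cloud_storage_config)
if csu.enabled():
    print("files will be uploaded to the configured cloud storage backend")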

View File

@@ -55,12 +55,13 @@ QUEUE = "airtime-uploads"
 """
 class MessageListener:
-    def __init__(self, config):
+    def __init__(self, config, cloud_storage_config):
         ''' Start listening for file upload notification messages
             from RabbitMQ
            Keyword arguments:
                config: A ConfigParser object containing the [rabbitmq] configuration.
+               cloud_storage_config: A ConfigParser object containing the cloud storage configuration.
         '''
         self._shutdown = False
@@ -74,6 +75,8 @@ class MessageListener:
         self._password = config.get(RMQ_CONFIG_SECTION, 'password')
         self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')
+        self.cloud_storage_config = cloud_storage_config
         # Set up a signal handler so we can shutdown gracefully
         # For some reason, this signal handler must be set up here. I'd rather
         # put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
@@ -167,7 +170,7 @@ class MessageListener:
                 original_filename = msg_dict["original_filename"]
                 file_prefix = msg_dict["file_prefix"]
-                audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix)
+                audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix, self.cloud_storage_config)
                 StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
@@ -207,11 +210,11 @@ class MessageListener:
             channel.basic_ack(delivery_tag=method_frame.delivery_tag)
     @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix):
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config):
         ''' Spawn a child process to analyze and import a new audio file. '''
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                        args=(q, audio_file_path, import_directory, original_filename, file_prefix))
+                        args=(q, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config))
         p.start()
         p.join()
         if p.exitcode == 0:

View File

@@ -9,6 +9,7 @@ import airtime_analyzer.airtime_analyzer as aa
 VERSION = "1.0"
 DEFAULT_CONFIG_PATH = '/etc/airtime/airtime.conf'
+DEFAULT_CLOUD_STORAGE_CONFIG_PATH = '/etc/airtime-saas/production/cloud_storage.conf'
 DEFAULT_HTTP_RETRY_PATH = '/tmp/airtime_analyzer_http_retries'
 def run():
@@ -18,6 +19,7 @@ def run():
     parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
     parser.add_argument("--debug", help="log full debugging output", action="store_true")
     parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH)
+    parser.add_argument("--cloud-storage-config-file", help="specify a configuration file with cloud storage settings (default is %s)" % DEFAULT_CLOUD_STORAGE_CONFIG_PATH)
     parser.add_argument("--http-retry-queue-file", help="specify where incompleted HTTP requests will be serialized (default is %s)" % DEFAULT_HTTP_RETRY_PATH)
     args = parser.parse_args()
@@ -25,20 +27,25 @@ def run():
     #Default config file path
     config_path = DEFAULT_CONFIG_PATH
+    cloud_storage_config_path = DEFAULT_CLOUD_STORAGE_CONFIG_PATH
     http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH
     if args.rmq_config_file:
         config_path = args.rmq_config_file
+    if args.cloud_storage_config_file:
+        cloud_storage_config_path = args.cloud_storage_config_file
     if args.http_retry_queue_file:
         http_retry_queue_path = args.http_retry_queue_file
     if args.daemon:
         with daemon.DaemonContext():
             aa.AirtimeAnalyzerServer(config_path=config_path,
+                                     cloud_storage_config_path = cloud_storage_config_path,
                                      http_retry_queue_path=http_retry_queue_path,
                                      debug=args.debug)
     else:
         # Run without daemonizing
         aa.AirtimeAnalyzerServer(config_path=config_path,
+                                 cloud_storage_config_path = cloud_storage_config_path,
                                  http_retry_queue_path=http_retry_queue_path,
                                  debug=args.debug)
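
With the --cloud-storage-config-file flag in place, the per-environment upstart job can point the analyzer at that environment's own files, e.g. something along the lines of: airtime_analyzer --rmq-config-file /etc/airtime-saas/production/rabbitmq-analyzer.ini --cloud-storage-config-file /etc/airtime-saas/production/cloud_storage.conf. The paths follow the defaults and patterns introduced in this commit; the actual upstart invocation is not part of this diff.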