From 7b3f9af04c319901ec8a3114693f2deb8016585c Mon Sep 17 00:00:00 2001
From: drigato
Date: Tue, 3 Feb 2015 15:55:47 -0500
Subject: [PATCH] SAAS-560: Deploy separate cloud storage config files for each development environment

Changed analyzer upstart to take the cloud storage config file as a command line option
Dropped the dev env portion from the rabbitmq-analyzer.ini filename
---
 airtime_mvc/application/configs/conf.php      |  2 +-
 airtime_mvc/application/models/RabbitMq.php   |  2 +-
 .../airtime_analyzer/airtime_analyzer.py      |  9 ++++++---
 .../airtime_analyzer/analyzer_pipeline.py     | 12 ++++--------
 .../cloud_storage_uploader.py                 | 20 ++------------------
 .../airtime_analyzer/message_listener.py      | 11 +++++++----
 .../airtime_analyzer/bin/airtime_analyzer    |  9 ++++++++-
 7 files changed, 29 insertions(+), 36 deletions(-)

diff --git a/airtime_mvc/application/configs/conf.php b/airtime_mvc/application/configs/conf.php
index dacdcc698..75a69d751 100644
--- a/airtime_mvc/application/configs/conf.php
+++ b/airtime_mvc/application/configs/conf.php
@@ -42,7 +42,7 @@ class Config {
         }

         // Parse separate conf file for cloud storage values
-        $cloudStorageConfig = "/etc/airtime-saas/".$CC_CONFIG['dev_env']."/cloud_storage_".$CC_CONFIG['dev_env'].".conf";
+        $cloudStorageConfig = "/etc/airtime-saas/".$CC_CONFIG['dev_env']."/cloud_storage.conf";
         $cloudStorageValues = parse_ini_file($cloudStorageConfig, true);

         $CC_CONFIG["supportedStorageBackends"] = array('amazon_S3');
diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php
index 30481b216..9ab7e6c22 100644
--- a/airtime_mvc/application/models/RabbitMq.php
+++ b/airtime_mvc/application/models/RabbitMq.php
@@ -89,7 +89,7 @@ class Application_Model_RabbitMq
         if (array_key_exists("dev_env", $CC_CONFIG)) {
             $devEnv = $CC_CONFIG["dev_env"];
         }
-        $config = parse_ini_file("/etc/airtime-saas/".$devEnv."/rabbitmq-analyzer-" . $devEnv . ".ini", true);
+        $config = parse_ini_file("/etc/airtime-saas/".$devEnv."/rabbitmq-analyzer.ini", true);
         $conn = new AMQPConnection($config["rabbitmq"]["host"],
                                    $config["rabbitmq"]["port"],
                                    $config["rabbitmq"]["user"],
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py
index c643f21c9..2dc78d5bf 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py
@@ -23,7 +23,7 @@ class AirtimeAnalyzerServer:
     # Variables
     _log_level = logging.INFO

-    def __init__(self, config_path, http_retry_queue_path, debug=False):
+    def __init__(self, config_path, cloud_storage_config_path, http_retry_queue_path, debug=False):

         # Dump a stacktrace with 'kill -SIGUSR2 '
         signal.signal(signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace())
@@ -31,15 +31,18 @@ class AirtimeAnalyzerServer:
         # Configure logging
         self.setup_logging(debug)

-        # Read our config file
+        # Read our rmq config file
         config = config_file.read_config_file(config_path)
+
+        # Read the cloud storage config file
+        cloud_storage_config = config_file.read_config_file(cloud_storage_config_path)

         # Start up the StatusReporter process
         StatusReporter.start_thread(http_retry_queue_path)

         # Start listening for RabbitMQ messages telling us about newly
         # uploaded files. This blocks until we recieve a shutdown signal.
-        self._msg_listener = MessageListener(config)
+        self._msg_listener = MessageListener(config, cloud_storage_config)

         StatusReporter.stop_thread()

diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
index 0779d5146..7d56f0de3 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py
@@ -21,7 +21,7 @@ class AnalyzerPipeline:
     """

     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename, file_prefix):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config):
         """Analyze and import an audio file, and put all extracted metadata into queue.

         Keyword arguments:
@@ -34,7 +34,8 @@ class AnalyzerPipeline:
                                preserve. The file at audio_file_path typically has a temporary
                                randomly generated name, which is why we want to know what the
                                original name was.
-            station_domain: The Airtime Pro account's domain name. i.e. bananas
+            file_prefix:
+            cloud_storage_config: ConfigParser object containing the cloud storage configuration settings
         """
         # It is super critical to initialize a separate log file here so that we
         # don't inherit logging/locks from the parent process. Supposedly
@@ -42,7 +43,6 @@ class AnalyzerPipeline:
         AnalyzerPipeline.python_logger_deadlock_workaround()

         try:
-            logging.info("111")
             if not isinstance(queue, multiprocessing.queues.Queue):
                 raise TypeError("queue must be a multiprocessing.Queue()")
             if not isinstance(audio_file_path, unicode):
@@ -62,18 +62,15 @@ class AnalyzerPipeline:
             metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
             metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
             metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
-            logging.info("222")

-            csu = CloudStorageUploader()
+            csu = CloudStorageUploader(cloud_storage_config)

             if csu.enabled():
-                logging.info("333")
                 metadata = csu.upload_obj(audio_file_path, metadata)
             else:
                 metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)

             metadata["import_status"] = 0 # Successfully imported
-            logging.info("444")

             # Note that the queue we're putting the results into is our interprocess communication
             # back to the main process.
@@ -81,7 +78,6 @@ class AnalyzerPipeline:
             # Pass all the file metadata back to the main analyzer process, which then passes
             # it back to the Airtime web application.
             queue.put(metadata)
-            logging.info("555")
         except UnplayableFileError as e:
             logging.exception(e)
             metadata["import_status"] = 2
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py
index e47a6b004..eda3aabee 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py
@@ -1,11 +1,9 @@
 import os
 import logging
 import uuid
-import config_file
 from boto.s3.connection import S3Connection
 from boto.s3.key import Key

-AIRTIME_CONFIG_PATH = '/etc/airtime/airtime.conf'
 STORAGE_BACKEND_FILE = "file"

 class CloudStorageUploader:
@@ -22,17 +20,7 @@ class CloudStorageUploader:
         _api_key_secret: Secret access key to objects on Amazon S3.
     """

-    def __init__(self):
-
-        airtime_config = config_file.read_config_file(AIRTIME_CONFIG_PATH)
-        dev_env = "production" # Default
-        if airtime_config.has_option("general", "dev_env"):
-            dev_env = airtime_config.get("general", "dev_env")
-
-
-        CLOUD_CONFIG_PATH = "/etc/airtime-saas/%s/cloud_storage_%s.conf" % (dev_env, dev_env)
-        logging.info(CLOUD_CONFIG_PATH)
-        config = config_file.read_config_file(CLOUD_CONFIG_PATH)
+    def __init__(self, config):

         CLOUD_STORAGE_CONFIG_SECTION = config.get("current_backend", "storage_backend")
         self._storage_backend = CLOUD_STORAGE_CONFIG_SECTION
@@ -73,7 +61,7 @@ class CloudStorageUploader:

             resource_id: The unique object name used to identify the objects on Amazon S3
         """
-        logging.info("aaa")
+
         file_base_name = os.path.basename(audio_file_path)
         file_name, extension = os.path.splitext(file_base_name)

@@ -83,7 +71,6 @@ class CloudStorageUploader:
         file_name = file_name.replace(" ", "-")

         unique_id = str(uuid.uuid4())
-        logging.info("bbb")

         # We add another prefix to the resource name with the last two characters
         # of the unique id so files are not all placed under the root folder. We
@@ -91,10 +78,8 @@ class CloudStorageUploader:
         # is done via the S3 Browser client. The client will hang if there are too
         # many files under the same folder.
         unique_id_prefix = unique_id[-2:]
-        logging.info("ccc")

         resource_id = "%s/%s/%s_%s%s" % (metadata['file_prefix'], unique_id_prefix, file_name, unique_id, extension)
-        logging.info("ddd")

         conn = S3Connection(self._api_key, self._api_key_secret, host=self._host)
         bucket = conn.get_bucket(self._bucket)
@@ -105,7 +90,6 @@ class CloudStorageUploader:
         key.set_contents_from_filename(audio_file_path)

         metadata["filesize"] = os.path.getsize(audio_file_path)
-        logging.info("eee")

         # Remove file from organize directory
         try:
diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
index 4f1e31084..76713d47f 100644
--- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
+++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py
@@ -55,12 +55,13 @@ QUEUE = "airtime-uploads"
 """

 class MessageListener:
-    def __init__(self, config):
+    def __init__(self, config, cloud_storage_config):
         ''' Start listening for file upload notification messages from
             RabbitMQ

             Keyword arguments:
                 config: A ConfigParser object containing the [rabbitmq] configuration.
+                cloud_storage_config: A ConfigParser object containing the cloud storage configuration.
         '''

         self._shutdown = False
@@ -73,6 +74,8 @@ class MessageListener:
         self._username = config.get(RMQ_CONFIG_SECTION, 'user')
         self._password = config.get(RMQ_CONFIG_SECTION, 'password')
         self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')
+
+        self.cloud_storage_config = cloud_storage_config

         # Set up a signal handler so we can shutdown gracefully
         # For some reason, this signal handler must be set up here. I'd rather
@@ -167,7 +170,7 @@ class MessageListener:
             original_filename = msg_dict["original_filename"]
             file_prefix = msg_dict["file_prefix"]

-            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix)
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix, self.cloud_storage_config)
             StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)


@@ -207,11 +210,11 @@ class MessageListener:
         channel.basic_ack(delivery_tag=method_frame.delivery_tag)

     @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix):
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config):
         ''' Spawn a child process to analyze and import a new audio file. '''
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                                    args=(q, audio_file_path, import_directory, original_filename, file_prefix))
+                                    args=(q, audio_file_path, import_directory, original_filename, file_prefix, cloud_storage_config))
         p.start()
         p.join()
         if p.exitcode == 0:
diff --git a/python_apps/airtime_analyzer/bin/airtime_analyzer b/python_apps/airtime_analyzer/bin/airtime_analyzer
index 35debbd41..154274a93 100755
--- a/python_apps/airtime_analyzer/bin/airtime_analyzer
+++ b/python_apps/airtime_analyzer/bin/airtime_analyzer
@@ -9,6 +9,7 @@ import airtime_analyzer.airtime_analyzer as aa

 VERSION = "1.0"
 DEFAULT_CONFIG_PATH = '/etc/airtime/airtime.conf'
+DEFAULT_CLOUD_STORAGE_CONFIG_PATH = '/etc/airtime-saas/production/cloud_storage.conf'
 DEFAULT_HTTP_RETRY_PATH = '/tmp/airtime_analyzer_http_retries'

 def run():
@@ -18,6 +19,7 @@ def run():
     parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
     parser.add_argument("--debug", help="log full debugging output", action="store_true")
     parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH)
+    parser.add_argument("--cloud-storage-config-file", help="specify a configuration file with cloud storage settings (default is %s)" % DEFAULT_CLOUD_STORAGE_CONFIG_PATH)
     parser.add_argument("--http-retry-queue-file", help="specify where incompleted HTTP requests will be serialized (default is %s)" % DEFAULT_HTTP_RETRY_PATH)
     args = parser.parse_args()

@@ -25,20 +27,25 @@ def run():

     #Default config file path
     config_path = DEFAULT_CONFIG_PATH
+    cloud_storage_config_path = DEFAULT_CLOUD_STORAGE_CONFIG_PATH
     http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH

     if args.rmq_config_file:
         config_path = args.rmq_config_file
+    if args.cloud_storage_config_file:
+        cloud_storage_config_path = args.cloud_storage_config_file
     if args.http_retry_queue_file:
         http_retry_queue_path = args.http_retry_queue_file

     if args.daemon:
         with daemon.DaemonContext():
-            aa.AirtimeAnalyzerServer(config_path=config_path,
+            aa.AirtimeAnalyzerServer(config_path=config_path,
+                                     cloud_storage_config_path = cloud_storage_config_path,
                                      http_retry_queue_path=http_retry_queue_path,
                                      debug=args.debug)
     else:
         # Run without daemonizing
         aa.AirtimeAnalyzerServer(config_path=config_path,
+                                 cloud_storage_config_path = cloud_storage_config_path,
                                  http_retry_queue_path=http_retry_queue_path,
                                  debug=args.debug)
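
How the pieces fit together after this patch, as a minimal sketch for reviewers (illustrative only, not part of the diff): the analyzer no longer derives the cloud storage config path from airtime.conf's dev_env setting; the path comes from the command line, defaults to /etc/airtime-saas/production/cloud_storage.conf, and the parsed config is handed down to CloudStorageUploader. The snippet assumes it runs from inside the airtime_analyzer package so the old-style imports resolve, and that the config file contains at least the [current_backend] section read in cloud_storage_uploader.py; any other keys the uploader needs are not visible in this diff.

    # Illustrative wiring only -- mirrors what bin/airtime_analyzer and analyzer_pipeline.py do above
    import config_file                                   # existing helper module in airtime_analyzer
    from cloud_storage_uploader import CloudStorageUploader

    # Both files are read with the same helper; neither path depends on dev_env anymore.
    # rmq_config would be passed to MessageListener(config, cloud_storage_config) by AirtimeAnalyzerServer.
    rmq_config = config_file.read_config_file("/etc/airtime/airtime.conf")
    cloud_storage_config = config_file.read_config_file("/etc/airtime-saas/production/cloud_storage.conf")

    # The uploader now receives the parsed ConfigParser object instead of locating
    # /etc/airtime-saas/<dev_env>/cloud_storage_<dev_env>.conf itself.
    csu = CloudStorageUploader(cloud_storage_config)
    print("cloud uploads enabled: %s" % csu.enabled())

The equivalent daemon invocation with the new option (both paths shown are the defaults, so they could be omitted):

    airtime_analyzer --rmq-config-file /etc/airtime/airtime.conf \
                     --cloud-storage-config-file /etc/airtime-saas/production/cloud_storage.conf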