From 3f5b4faf1c593ba41ca6d938c14419371e1a0c60 Mon Sep 17 00:00:00 2001 From: Albert Santoni Date: Fri, 19 Dec 2014 12:58:55 -0500 Subject: [PATCH 1/3] Cloud storage cleanup and fixed the unit tests --- .../airtime_analyzer/airtime_analyzer.py | 20 ++--------- .../airtime_analyzer/analyzer_pipeline.py | 13 ++++--- .../cloud_storage_uploader.py | 36 +++++++++++++------ .../airtime_analyzer/message_listener.py | 16 ++++----- 4 files changed, 39 insertions(+), 46 deletions(-) diff --git a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py index dae271960..c643f21c9 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py @@ -1,11 +1,11 @@ """Contains the main application class for airtime_analyzer. """ -import ConfigParser import logging import logging.handlers import sys import signal import traceback +import config_file from functools import partial from metadata_analyzer import MetadataAnalyzer from replaygain_analyzer import ReplayGainAnalyzer @@ -32,7 +32,7 @@ class AirtimeAnalyzerServer: self.setup_logging(debug) # Read our config file - config = AirtimeAnalyzerServer.read_config_file(config_path) + config = config_file.read_config_file(config_path) # Start up the StatusReporter process StatusReporter.start_thread(http_retry_queue_path) @@ -71,22 +71,6 @@ class AirtimeAnalyzerServer: consoleHandler = logging.StreamHandler() consoleHandler.setFormatter(logFormatter) rootLogger.addHandler(consoleHandler) - - - @staticmethod - def read_config_file(config_path): - """Parse the application's config file located at config_path.""" - config = ConfigParser.SafeConfigParser() - try: - config.readfp(open(config_path)) - except IOError as e: - print "Failed to open config file at " + config_path + ": " + e.strerror - exit(-1) - except Exception: - print e.strerror - exit(-1) - - return config @classmethod def dump_stacktrace(stack): diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py index a11248066..514c1db38 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py @@ -21,7 +21,7 @@ class AnalyzerPipeline: """ @staticmethod - def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend): + def run_analysis(queue, audio_file_path, import_directory, original_filename): """Analyze and import an audio file, and put all extracted metadata into queue. Keyword arguments: @@ -55,18 +55,17 @@ class AnalyzerPipeline: # Analyze the audio file we were told to analyze: # First, we extract the ID3 tags and other metadata: metadata = dict() - metadata["station_domain"] = station_domain - metadata = MetadataAnalyzer.analyze(audio_file_path, metadata) metadata = CuePointAnalyzer.analyze(audio_file_path, metadata) metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata) metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata) - if current_storage_backend == "file": - metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata) - else: - csu = CloudStorageUploader() + + csu = CloudStorageUploader() + if csu.enabled(): metadata = csu.upload_obj(audio_file_path, metadata) + else: + metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata) metadata["import_status"] = 0 # Successfully imported diff --git a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py index b9178709c..045c719f1 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py @@ -1,12 +1,13 @@ import os import logging import uuid -import airtime_analyzer as aa +import config_file from libcloud.storage.providers import get_driver from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError -CONFIG_PATH = '/etc/airtime-saas/cloud_storage.conf' +CLOUD_CONFIG_PATH = '/etc/airtime-saas/cloud_storage.conf' +STORAGE_BACKEND_FILE = "file" class CloudStorageUploader: """ A class that uses Apache Libcloud's Storage API to upload objects into @@ -25,14 +26,28 @@ class CloudStorageUploader: """ def __init__(self): - config = aa.AirtimeAnalyzerServer.read_config_file(CONFIG_PATH) - + + config = config_file.read_config_file(CLOUD_CONFIG_PATH) + CLOUD_STORAGE_CONFIG_SECTION = config.get("current_backend", "storage_backend") self._storage_backend = CLOUD_STORAGE_CONFIG_SECTION - self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider') - self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket') - self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key') - self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret') + if self._storage_backend == STORAGE_BACKEND_FILE: + self._provider = "" + self._bucket = "" + self._api_key = "" + self._api_key_secret = "" + else: + self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider') + self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket') + self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key') + self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret') + + def enabled(self): + if self._storage_backend == "file": + return False + else: + return True + def upload_obj(self, audio_file_path, metadata): """Uploads a file into Amazon S3 object storage. @@ -61,7 +76,7 @@ class CloudStorageUploader: # in the object name. URL encoding the object name doesn't solve the # problem. As a solution we will replace spaces with dashes. file_name = file_name.replace(" ", "-") - object_name = "%s/%s_%s%s" % (metadata["station_domain"], file_name, str(uuid.uuid4()), extension) + object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension) provider_driver_class = get_driver(getattr(Provider, self._provider)) driver = provider_driver_class(self._api_key, self._api_key_secret) @@ -71,8 +86,7 @@ class CloudStorageUploader: except ContainerDoesNotExistError: container = driver.create_container(self._bucket) - extra = {'meta_data': {'filename': file_base_name, - 'station_domain': metadata["station_domain"]}} + extra = {'meta_data': {'filename': file_base_name}} obj = driver.upload_object(file_path=audio_file_path, container=container, diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index 245a74118..eb5909247 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -112,7 +112,7 @@ class MessageListener: self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) logging.info(" Listening for messages...") - self._channel.basic_consume(MessageListener.msg_received_callback, + self._channel.basic_consume(self.msg_received_callback, queue=QUEUE, no_ack=False) def wait_for_messages(self): @@ -134,7 +134,6 @@ class MessageListener: self._shutdown = True self.disconnect_from_messaging_server() - @staticmethod def msg_received_callback(channel, method_frame, header_frame, body): ''' A callback method that runs when a RabbitMQ message is received. @@ -151,7 +150,6 @@ class MessageListener: callback_url = "" api_key = "" station_domain = "" - current_storage_backend = "" ''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue to pass objects between the processes so that if the analyzer process crashes, it does not @@ -163,14 +161,12 @@ class MessageListener: msg_dict = json.loads(body) api_key = msg_dict["api_key"] callback_url = msg_dict["callback_url"] - station_domain = msg_dict["station_domain"] - + audio_file_path = msg_dict["tmp_file_path"] import_directory = msg_dict["import_directory"] original_filename = msg_dict["original_filename"] - current_storage_backend = msg_dict["current_storage_backend"] - - audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend) + + audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) except KeyError as e: @@ -209,11 +205,11 @@ class MessageListener: channel.basic_ack(delivery_tag=method_frame.delivery_tag) @staticmethod - def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend): + def spawn_analyzer_process(audio_file_path, import_directory, original_filename): ''' Spawn a child process to analyze and import a new audio file. ''' q = multiprocessing.Queue() p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, - args=(q, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend)) + args=(q, audio_file_path, import_directory, original_filename)) p.start() p.join() if p.exitcode == 0: From 1718868835cf5c21bcd33bf8be3c04216c93a5dc Mon Sep 17 00:00:00 2001 From: Albert Santoni Date: Fri, 19 Dec 2014 12:59:20 -0500 Subject: [PATCH 2/3] Stub CloudStorageAnalyzer tests --- .../tests/cloud_storage_uploader_tests.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 python_apps/airtime_analyzer/tests/cloud_storage_uploader_tests.py diff --git a/python_apps/airtime_analyzer/tests/cloud_storage_uploader_tests.py b/python_apps/airtime_analyzer/tests/cloud_storage_uploader_tests.py new file mode 100644 index 000000000..cd5b29f8d --- /dev/null +++ b/python_apps/airtime_analyzer/tests/cloud_storage_uploader_tests.py @@ -0,0 +1,12 @@ +from nose.tools import * +from airtime_analyzer.cloud_storage_uploader import CloudStorageUploader +from airtime_analyzer.airtime_analyzer import AirtimeAnalyzerServer + +def setup(): + pass + +def teardown(): + pass + +def test_analyze(): + cl = CloudStorageUploader() From 9a8b34feae76b7a02916d80da565dea69c288cb8 Mon Sep 17 00:00:00 2001 From: Albert Santoni Date: Fri, 19 Dec 2014 14:03:28 -0500 Subject: [PATCH 3/3] Unit test tweak for CloudStorage --- .../airtime_analyzer/tests/cloud_storage_uploader_tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python_apps/airtime_analyzer/tests/cloud_storage_uploader_tests.py b/python_apps/airtime_analyzer/tests/cloud_storage_uploader_tests.py index cd5b29f8d..d54e4573a 100644 --- a/python_apps/airtime_analyzer/tests/cloud_storage_uploader_tests.py +++ b/python_apps/airtime_analyzer/tests/cloud_storage_uploader_tests.py @@ -10,3 +10,4 @@ def teardown(): def test_analyze(): cl = CloudStorageUploader() + cl._storage_backend = "file" \ No newline at end of file