Cloud storage cleanup and fixed the unit tests

Albert Santoni 2014-12-19 12:58:55 -05:00
parent cdabbc6648
commit 3f5b4faf1c
4 changed files with 39 additions and 46 deletions

airtime_analyzer.py

@@ -1,11 +1,11 @@
 """Contains the main application class for airtime_analyzer.
 """
-import ConfigParser
 import logging
 import logging.handlers
 import sys
 import signal
 import traceback
+import config_file
 from functools import partial
 from metadata_analyzer import MetadataAnalyzer
 from replaygain_analyzer import ReplayGainAnalyzer
@@ -32,7 +32,7 @@ class AirtimeAnalyzerServer:
         self.setup_logging(debug)
 
         # Read our config file
-        config = AirtimeAnalyzerServer.read_config_file(config_path)
+        config = config_file.read_config_file(config_path)
 
         # Start up the StatusReporter process
         StatusReporter.start_thread(http_retry_queue_path)
@@ -71,22 +71,6 @@ class AirtimeAnalyzerServer:
         consoleHandler = logging.StreamHandler()
         consoleHandler.setFormatter(logFormatter)
         rootLogger.addHandler(consoleHandler)
-
-    @staticmethod
-    def read_config_file(config_path):
-        """Parse the application's config file located at config_path."""
-        config = ConfigParser.SafeConfigParser()
-        try:
-            config.readfp(open(config_path))
-        except IOError as e:
-            print "Failed to open config file at " + config_path + ": " + e.strerror
-            exit(-1)
-        except Exception:
-            print e.strerror
-            exit(-1)
-        return config
-
     @classmethod
     def dump_stacktrace(stack):
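The read_config_file helper deleted above moves into the new config_file module that both call sites now import. That module is not among the files shown here; a minimal sketch of its likely contents, reconstructed from the deleted method (the module name comes from the imports, the body is an assumption). The old method's dead second except clause, which printed an unbound e, is dropped in this sketch:

    # config_file.py -- hypothetical reconstruction based on the method
    # removed from AirtimeAnalyzerServer above; not part of this diff.
    import ConfigParser

    def read_config_file(config_path):
        """Parse the application's config file located at config_path."""
        config = ConfigParser.SafeConfigParser()
        try:
            # readfp() is the Python 2 ConfigParser API used by the old code.
            config.readfp(open(config_path))
        except IOError as e:
            print "Failed to open config file at " + config_path + ": " + e.strerror
            exit(-1)
        return config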

analyzer_pipeline.py

@@ -21,7 +21,7 @@ class AnalyzerPipeline:
     """
 
     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename):
         """Analyze and import an audio file, and put all extracted metadata into queue.
 
         Keyword arguments:
@@ -55,18 +55,17 @@
         # Analyze the audio file we were told to analyze:
         # First, we extract the ID3 tags and other metadata:
         metadata = dict()
-        metadata["station_domain"] = station_domain
         metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
         metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
         metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
         metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
 
-        if current_storage_backend == "file":
-            metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
-        else:
-            csu = CloudStorageUploader()
-            metadata = csu.upload_obj(audio_file_path, metadata)
+        csu = CloudStorageUploader()
+        if csu.enabled():
+            metadata = csu.upload_obj(audio_file_path, metadata)
+        else:
+            metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
 
         metadata["import_status"] = 0 # Successfully imported

cloud_storage_uploader.py

@@ -1,12 +1,13 @@
 import os
 import logging
 import uuid
-import airtime_analyzer as aa
+import config_file
 from libcloud.storage.providers import get_driver
 from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError
 
-CONFIG_PATH = '/etc/airtime-saas/cloud_storage.conf'
+CLOUD_CONFIG_PATH = '/etc/airtime-saas/cloud_storage.conf'
+STORAGE_BACKEND_FILE = "file"
 
 class CloudStorageUploader:
     """ A class that uses Apache Libcloud's Storage API to upload objects into
@@ -25,14 +26,28 @@ class CloudStorageUploader:
     """
 
     def __init__(self):
-        config = aa.AirtimeAnalyzerServer.read_config_file(CONFIG_PATH)
+        config = config_file.read_config_file(CLOUD_CONFIG_PATH)
         CLOUD_STORAGE_CONFIG_SECTION = config.get("current_backend", "storage_backend")
         self._storage_backend = CLOUD_STORAGE_CONFIG_SECTION
-        self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
-        self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
-        self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
-        self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret')
+
+        if self._storage_backend == STORAGE_BACKEND_FILE:
+            self._provider = ""
+            self._bucket = ""
+            self._api_key = ""
+            self._api_key_secret = ""
+        else:
+            self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
+            self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
+            self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
+            self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret')
+
+    def enabled(self):
+        if self._storage_backend == "file":
+            return False
+        else:
+            return True
 
     def upload_obj(self, audio_file_path, metadata):
         """Uploads a file into Amazon S3 object storage.
@@ -61,7 +76,7 @@ class CloudStorageUploader:
         # in the object name. URL encoding the object name doesn't solve the
         # problem. As a solution we will replace spaces with dashes.
         file_name = file_name.replace(" ", "-")
-        object_name = "%s/%s_%s%s" % (metadata["station_domain"], file_name, str(uuid.uuid4()), extension)
+        object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension)
 
         provider_driver_class = get_driver(getattr(Provider, self._provider))
         driver = provider_driver_class(self._api_key, self._api_key_secret)
@@ -71,8 +86,7 @@ class CloudStorageUploader:
         except ContainerDoesNotExistError:
             container = driver.create_container(self._bucket)
 
-        extra = {'meta_data': {'filename': file_base_name,
-                               'station_domain': metadata["station_domain"]}}
+        extra = {'meta_data': {'filename': file_base_name}}
 
         obj = driver.upload_object(file_path=audio_file_path,
                                    container=container,

message_listener.py

@@ -112,7 +112,7 @@ class MessageListener:
         self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
 
         logging.info(" Listening for messages...")
-        self._channel.basic_consume(MessageListener.msg_received_callback,
+        self._channel.basic_consume(self.msg_received_callback,
                                     queue=QUEUE, no_ack=False)
 
     def wait_for_messages(self):
@@ -134,7 +134,6 @@ class MessageListener:
         self._shutdown = True
         self.disconnect_from_messaging_server()
 
-    @staticmethod
-    def msg_received_callback(channel, method_frame, header_frame, body):
+    def msg_received_callback(self, channel, method_frame, header_frame, body):
         ''' A callback method that runs when a RabbitMQ message is received.
@@ -151,7 +150,6 @@ class MessageListener:
         callback_url = ""
         api_key = ""
         station_domain = ""
-        current_storage_backend = ""
 
         ''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
         to pass objects between the processes so that if the analyzer process crashes, it does not
@@ -163,14 +161,12 @@ class MessageListener:
             msg_dict = json.loads(body)
             api_key = msg_dict["api_key"]
             callback_url = msg_dict["callback_url"]
-            station_domain = msg_dict["station_domain"]
             audio_file_path = msg_dict["tmp_file_path"]
             import_directory = msg_dict["import_directory"]
             original_filename = msg_dict["original_filename"]
-            current_storage_backend = msg_dict["current_storage_backend"]
 
-            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend)
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
             StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
 
         except KeyError as e:
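With station_domain and current_storage_backend gone, the RabbitMQ message body only needs the five keys read above; a representative payload (all values are illustrative):

    {
        "api_key": "XXXXXXXX",
        "callback_url": "http://example.com/rest/media/1",
        "tmp_file_path": "/tmp/plupload/song.mp3",
        "import_directory": "/srv/airtime/stor/imported/1",
        "original_filename": "song.mp3"
    }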
@@ -209,11 +205,11 @@ class MessageListener:
             channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 
     @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend):
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
         ''' Spawn a child process to analyze and import a new audio file. '''
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                                    args=(q, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend))
+                                    args=(q, audio_file_path, import_directory, original_filename))
         p.start()
         p.join()
         if p.exitcode == 0: