Made AirtimeAnalyzerServer.read_config_file static so CloudStorageUploader can use it.

Reverted MessageListener.msg_received_callback and MessageListener.spawn_analyzer_process back to static methods.
Moved cloud storage settings out of MessageListener and into CloudStorageUploader
This commit is contained in:
drigato 2014-11-05 16:24:03 -05:00
parent 2f16385b53
commit 6adb33e89b
6 changed files with 30 additions and 35 deletions

View file

@ -25,13 +25,16 @@ class Config {
$filename = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf"; $filename = isset($_SERVER['AIRTIME_CONF']) ? $_SERVER['AIRTIME_CONF'] : "/etc/airtime/airtime.conf";
} }
// Parse separate conf file for Amazon S3 values
$amazonFilename = "/etc/airtime-saas/amazon.conf";
$amazonValues = parse_ini_file($amazonFilename, true);
$CC_CONFIG['cloud_storage'] = $amazonValues['cloud_storage'];
$values = parse_ini_file($filename, true); $values = parse_ini_file($filename, true);
// Name of the web server user // Name of the web server user
$CC_CONFIG['webServerUser'] = $values['general']['web_server_user']; $CC_CONFIG['webServerUser'] = $values['general']['web_server_user'];
$CC_CONFIG['rabbitmq'] = $values['rabbitmq']; $CC_CONFIG['rabbitmq'] = $values['rabbitmq'];
$CC_CONFIG['cloud_storage'] = $values['cloud_storage'];
$CC_CONFIG['baseDir'] = $values['general']['base_dir']; $CC_CONFIG['baseDir'] = $values['general']['base_dir'];
$CC_CONFIG['baseUrl'] = $values['general']['base_url']; $CC_CONFIG['baseUrl'] = $values['general']['base_url'];

View file

@ -29,10 +29,4 @@ monit_password = airtime
[soundcloud] [soundcloud]
connection_retries = 3 connection_retries = 3
time_between_retries = 60 time_between_retries = 60
[cloud_storage]
provider = S3
bucket =
api_key =
api_key_secret =

View file

@ -32,7 +32,7 @@ class AirtimeAnalyzerServer:
self.setup_logging(debug) self.setup_logging(debug)
# Read our config file # Read our config file
config = self.read_config_file(config_path) config = AirtimeAnalyzerServer.read_config_file(config_path)
# Start up the StatusReporter process # Start up the StatusReporter process
StatusReporter.start_thread(http_retry_queue_path) StatusReporter.start_thread(http_retry_queue_path)
@ -73,7 +73,8 @@ class AirtimeAnalyzerServer:
rootLogger.addHandler(consoleHandler) rootLogger.addHandler(consoleHandler)
def read_config_file(self, config_path): @staticmethod
def read_config_file(config_path):
"""Parse the application's config file located at config_path.""" """Parse the application's config file located at config_path."""
config = ConfigParser.SafeConfigParser() config = ConfigParser.SafeConfigParser()
try: try:

View file

@ -18,8 +18,7 @@ class AnalyzerPipeline:
""" """
@staticmethod @staticmethod
def run_analysis(queue, audio_file_path, import_directory, original_filename, def run_analysis(queue, audio_file_path, import_directory, original_filename):
cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret):
"""Analyze and import an audio file, and put all extracted metadata into queue. """Analyze and import an audio file, and put all extracted metadata into queue.
Keyword arguments: Keyword arguments:
@ -54,7 +53,7 @@ class AnalyzerPipeline:
metadata = dict() metadata = dict()
metadata = MetadataAnalyzer.analyze(audio_file_path, metadata) metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
#metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata) #metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
csu = CloudStorageUploader(cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret) csu = CloudStorageUploader()
metadata = csu.upload_obj(audio_file_path, metadata) metadata = csu.upload_obj(audio_file_path, metadata)
metadata["import_status"] = 0 # imported metadata["import_status"] = 0 # imported

View file

@ -1,10 +1,13 @@
import os import os
import logging import logging
import uuid import uuid
import airtime_analyzer as aa
from libcloud.storage.providers import get_driver from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError
CONFIG_PATH = '/etc/airtime-saas/amazon.conf'
class CloudStorageUploader: class CloudStorageUploader:
""" A class that uses Apache Libcloud's Storage API to upload objects into """ A class that uses Apache Libcloud's Storage API to upload objects into
a cloud storage backend. For this implementation all files will be uploaded a cloud storage backend. For this implementation all files will be uploaded
@ -21,11 +24,14 @@ class CloudStorageUploader:
_api_key_secret: Secret access key to objects on the provider's storage backend. _api_key_secret: Secret access key to objects on the provider's storage backend.
""" """
def __init__(self, provider, bucket, api_key, api_key_secret): def __init__(self):
self._provider = provider config = aa.AirtimeAnalyzerServer.read_config_file(CONFIG_PATH)
self._bucket = bucket
self._api_key = api_key CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage"
self._api_key_secret = api_key_secret self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret')
def upload_obj(self, audio_file_path, metadata): def upload_obj(self, audio_file_path, metadata):
"""Uploads a file into Amazon S3 object storage. """Uploads a file into Amazon S3 object storage.

View file

@ -74,13 +74,6 @@ class MessageListener:
self._password = config.get(RMQ_CONFIG_SECTION, 'password') self._password = config.get(RMQ_CONFIG_SECTION, 'password')
self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost') self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')
# Read the S3 API setting from the config file
CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage"
self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret')
# Set up a signal handler so we can shutdown gracefully # Set up a signal handler so we can shutdown gracefully
# For some reason, this signal handler must be set up here. I'd rather # For some reason, this signal handler must be set up here. I'd rather
# put it in AirtimeAnalyzerServer, but it doesn't work there (something to do # put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
@ -119,7 +112,7 @@ class MessageListener:
self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
logging.info(" Listening for messages...") logging.info(" Listening for messages...")
self._channel.basic_consume(self.msg_received_callback, self._channel.basic_consume(MessageListener.msg_received_callback,
queue=QUEUE, no_ack=False) queue=QUEUE, no_ack=False)
def wait_for_messages(self): def wait_for_messages(self):
@ -141,8 +134,8 @@ class MessageListener:
self._shutdown = True self._shutdown = True
self.disconnect_from_messaging_server() self.disconnect_from_messaging_server()
#@staticmethod @staticmethod
def msg_received_callback(self, channel, method_frame, header_frame, body): def msg_received_callback(channel, method_frame, header_frame, body):
''' A callback method that runs when a RabbitMQ message is received. ''' A callback method that runs when a RabbitMQ message is received.
Here we parse the message, spin up an analyzer process, and report the Here we parse the message, spin up an analyzer process, and report the
@ -173,7 +166,7 @@ class MessageListener:
import_directory = msg_dict["import_directory"] import_directory = msg_dict["import_directory"]
original_filename = msg_dict["original_filename"] original_filename = msg_dict["original_filename"]
audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename) audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
except KeyError as e: except KeyError as e:
@ -211,13 +204,12 @@ class MessageListener:
# If we don't ack, then RabbitMQ will redeliver the message in the future. # If we don't ack, then RabbitMQ will redeliver the message in the future.
channel.basic_ack(delivery_tag=method_frame.delivery_tag) channel.basic_ack(delivery_tag=method_frame.delivery_tag)
#@staticmethod @staticmethod
def spawn_analyzer_process(self, audio_file_path, import_directory, original_filename): def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
''' Spawn a child process to analyze and import a new audio file. ''' ''' Spawn a child process to analyze and import a new audio file. '''
q = multiprocessing.Queue() q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, args=(q, audio_file_path, import_directory, original_filename))
self._provider, self._bucket, self._api_key, self._api_key_secret))
p.start() p.start()
p.join() p.join()
if p.exitcode == 0: if p.exitcode == 0: