Made AirtimeAnalyzerServer.read_config_file static so CloudStorageUploader can use it.

Reverted MessageListener.msg_received_callback and MessageListener.spawn_analyzer_process back to static methods.
Moved cloud storage settings out of MessageListener and into CloudStorageUploader.
drigato 2014-11-05 16:24:03 -05:00
parent 27b8f0ac13
commit e24b4f43fd
6 changed files with 30 additions and 35 deletions
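
The heart of the change, as a minimal standalone sketch (class and config names here are hypothetical, not the project's): a @staticmethod is callable through the class itself, so a second class can reuse it without holding an instance.

# Hypothetical sketch: why @staticmethod enables cross-class reuse.
class Server(object):
    @staticmethod
    def read_config_file(path):
        # Stand-in for real parsing; no instance state is touched.
        return {'path': path}

class Uploader(object):
    def __init__(self):
        # No Server() instance needed -- call straight through the class.
        self.config = Server.read_config_file('/etc/example.conf')

print(Uploader().config)   # {'path': '/etc/example.conf'}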

python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py

@@ -32,7 +32,7 @@ class AirtimeAnalyzerServer:
         self.setup_logging(debug)
 
         # Read our config file
-        config = self.read_config_file(config_path)
+        config = AirtimeAnalyzerServer.read_config_file(config_path)
 
         # Start up the StatusReporter process
         StatusReporter.start_thread(http_retry_queue_path)
@@ -73,7 +73,8 @@ class AirtimeAnalyzerServer:
         rootLogger.addHandler(consoleHandler)
 
 
-    def read_config_file(self, config_path):
+    @staticmethod
+    def read_config_file(config_path):
         """Parse the application's config file located at config_path."""
         config = ConfigParser.SafeConfigParser()
         try:
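
For reference, a minimal sketch of the now-static reader, assuming Python 2's ConfigParser as in the hunk above (the error handling shown is illustrative, not the project's):

import logging
import ConfigParser  # Python 2; configparser in Python 3

class AirtimeAnalyzerServer(object):
    @staticmethod
    def read_config_file(config_path):
        """Parse the application's config file located at config_path."""
        config = ConfigParser.SafeConfigParser()
        try:
            config.readfp(open(config_path))
        except IOError:
            logging.error("Failed to open config file at %s", config_path)
            raise
        return config

# Any caller, including CloudStorageUploader, can now do:
# config = AirtimeAnalyzerServer.read_config_file('/etc/airtime-saas/amazon.conf')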

python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py

@@ -18,8 +18,7 @@ class AnalyzerPipeline:
     """
 
     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename,
-                     cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename):
         """Analyze and import an audio file, and put all extracted metadata into queue.
 
         Keyword arguments:
@@ -54,7 +53,7 @@
         metadata = dict()
         metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
         #metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
-        csu = CloudStorageUploader(cloud_provider, cloud_bucket, cloud_api_key, cloud_api_key_secret)
+        csu = CloudStorageUploader()
         metadata = csu.upload_obj(audio_file_path, metadata)
         metadata["import_status"] = 0 # imported
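
With the credentials gone from the signature, the parent process passes only file-related arguments across the process boundary. A sketch of the call site (paths are hypothetical; the import assumes the analyzer_pipeline module name):

import multiprocessing
from analyzer_pipeline import AnalyzerPipeline  # assumed module name

q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
                            args=(q, '/tmp/uploads/song.mp3',
                                  '/srv/airtime/stor/imported', 'song.mp3'))
p.start()
p.join()
if p.exitcode == 0:
    metadata = q.get()   # extracted metadata, including import_status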

python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py

@@ -1,10 +1,13 @@
 import os
 import logging
 import uuid
+import airtime_analyzer as aa
 
 from libcloud.storage.providers import get_driver
 from libcloud.storage.types import Provider, ContainerDoesNotExistError, ObjectDoesNotExistError
 
+CONFIG_PATH = '/etc/airtime-saas/amazon.conf'
+
 class CloudStorageUploader:
     """ A class that uses Apache Libcloud's Storage API to upload objects into
     a cloud storage backend. For this implementation all files will be uploaded
@@ -21,11 +24,14 @@ class CloudStorageUploader:
         _api_key_secret: Secret access key to objects on the provider's storage backend.
     """
 
-    def __init__(self, provider, bucket, api_key, api_key_secret):
-        self._provider = provider
-        self._bucket = bucket
-        self._api_key = api_key
-        self._api_key_secret = api_key_secret
+    def __init__(self):
+        config = aa.AirtimeAnalyzerServer.read_config_file(CONFIG_PATH)
+
+        CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage"
+        self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
+        self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
+        self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
+        self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret')
 
     def upload_obj(self, audio_file_path, metadata):
         """Uploads a file into Amazon S3 object storage.

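The hardcoded CONFIG_PATH implies that /etc/airtime-saas/amazon.conf carries a [cloud_storage] section with these four keys. An illustrative example, with placeholder values only:

# /etc/airtime-saas/amazon.conf -- placeholder values, not real credentials
[cloud_storage]
provider = S3
bucket = my-airtime-bucket
api_key = EXAMPLEACCESSKEYID
api_key_secret = exampleSecretAccessKey
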
python_apps/airtime_analyzer/airtime_analyzer/message_listener.py

@@ -74,13 +74,6 @@ class MessageListener:
         self._password = config.get(RMQ_CONFIG_SECTION, 'password')
         self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')
 
-        # Read the S3 API setting from the config file
-        CLOUD_STORAGE_CONFIG_SECTION = "cloud_storage"
-        self._provider = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'provider')
-        self._bucket = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'bucket')
-        self._api_key = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key')
-        self._api_key_secret = config.get(CLOUD_STORAGE_CONFIG_SECTION, 'api_key_secret')
-
         # Set up a signal handler so we can shutdown gracefully
         # For some reason, this signal handler must be set up here. I'd rather
         # put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
@@ -119,7 +112,7 @@
         self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)
 
         logging.info(" Listening for messages...")
-        self._channel.basic_consume(self.msg_received_callback,
+        self._channel.basic_consume(MessageListener.msg_received_callback,
                                     queue=QUEUE, no_ack=False)
 
     def wait_for_messages(self):
@@ -141,8 +134,8 @@
         self._shutdown = True
         self.disconnect_from_messaging_server()
 
-    #@staticmethod
-    def msg_received_callback(self, channel, method_frame, header_frame, body):
+    @staticmethod
+    def msg_received_callback(channel, method_frame, header_frame, body):
         ''' A callback method that runs when a RabbitMQ message is received.
 
         Here we parse the message, spin up an analyzer process, and report the
@@ -173,7 +166,7 @@
             import_directory = msg_dict["import_directory"]
             original_filename = msg_dict["original_filename"]
 
-            audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
+            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
             StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
 
         except KeyError as e:
@@ -211,13 +204,12 @@
         # If we don't ack, then RabbitMQ will redeliver the message in the future.
         channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 
-    #@staticmethod
-    def spawn_analyzer_process(self, audio_file_path, import_directory, original_filename):
+    @staticmethod
+    def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
         ''' Spawn a child process to analyze and import a new audio file. '''
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                args=(q, audio_file_path, import_directory, original_filename,
-                    self._provider, self._bucket, self._api_key, self._api_key_secret))
+                args=(q, audio_file_path, import_directory, original_filename))
         p.start()
         p.join()
         if p.exitcode == 0:
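
Why the callback can be static: pika hands the consumer exactly (channel, method_frame, header_frame, body), so no bound self is required. A minimal sketch against the pre-1.0 pika API used above (connection parameters and queue name are hypothetical):

import pika

class ExampleListener(object):
    @staticmethod
    def msg_received_callback(channel, method_frame, header_frame, body):
        # All the state the handler needs arrives as arguments.
        print(" Received: %s" % body)
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='example-uploads')
channel.basic_consume(ExampleListener.msg_received_callback,
                      queue='example-uploads', no_ack=False)
channel.start_consuming()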