Modified config file variables so they describe more than just RabbitMQ settings

Removed the filemover_analyzer move-to-disk functionality and replaced it with an upload to Amazon S3
drigato 2014-07-04 16:38:58 -04:00
parent 2b704178ea
commit 1393bad014
5 changed files with 70 additions and 20 deletions
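
The renamed config file now has to carry more than RabbitMQ settings. A minimal sketch of what it would contain: the [s3] section name and its three keys are confirmed by the message_listener changes below, while the [rabbitmq] section name and its host/port/user keys are assumptions (only the 'password' and 'vhost' keys are visible in this commit).

    [rabbitmq]
    host = 127.0.0.1
    port = 5672
    user = airtime
    password = airtime
    vhost = /airtime

    [s3]
    bucket = my-airtime-uploads
    api_key = MY_AWS_ACCESS_KEY
    api_key_secret = MY_AWS_SECRET_KEY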


@@ -23,7 +23,7 @@ class AirtimeAnalyzerServer:
     # Variables
     _log_level = logging.INFO

-    def __init__(self, rmq_config_path, http_retry_queue_path, debug=False):
+    def __init__(self, config_path, http_retry_queue_path, debug=False):

         # Dump a stacktrace with 'kill -SIGUSR2 <PID>'
         signal.signal(signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace())
@@ -32,14 +32,14 @@ class AirtimeAnalyzerServer:
         self.setup_logging(debug)

         # Read our config file
-        rabbitmq_config = self.read_config_file(rmq_config_path)
+        config = self.read_config_file(config_path)

         # Start up the StatusReporter process
         StatusReporter.start_thread(http_retry_queue_path)

         # Start listening for RabbitMQ messages telling us about newly
         # uploaded files. This blocks until we receive a shutdown signal.
-        self._msg_listener = MessageListener(rabbitmq_config)
+        self._msg_listener = MessageListener(config)

         StatusReporter.stop_thread()
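
The rmq_config_path -> config_path rename above reflects that the parsed config is now shared by more than the RabbitMQ consumer. read_config_file() itself is not shown in this commit; a plausible Python 2 sketch, assuming a ConfigParser-based implementation:

    import ConfigParser

    def read_config_file(config_path):
        # Parse the INI-style file so the [rabbitmq] and [s3] sections
        # travel together in a single object (sketch, not the actual code).
        config = ConfigParser.SafeConfigParser()
        try:
            config.readfp(open(config_path))
        except IOError as e:
            print "Failed to open config file at %s: %s" % (config_path, e.strerror)
            exit(-1)
        return config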


@@ -17,7 +17,8 @@ class AnalyzerPipeline:
     """

     @staticmethod
-    def run_analysis(queue, audio_file_path, import_directory, original_filename):
+    def run_analysis(queue, audio_file_path, import_directory, original_filename,
+                     s3_bucket, s3_api_key, s3_api_key_secret):
         """Analyze and import an audio file, and put all extracted metadata into queue.

         Keyword arguments:
@@ -51,7 +52,8 @@ class AnalyzerPipeline:
         # First, we extract the ID3 tags and other metadata:
         metadata = dict()
         metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
-        metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
+        metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata,
+                                          s3_bucket, s3_api_key, s3_api_key_secret)
         metadata["import_status"] = 0 # imported

         # Note that the queue we're putting the results into is our interprocess communication
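
Since run_analysis() now takes the S3 credentials explicitly, it can be exercised in-process without RabbitMQ. A hypothetical test call (every path and S3 value below is a placeholder, not from the commit):

    import multiprocessing

    q = multiprocessing.Queue()
    AnalyzerPipeline.run_analysis(q, "/tmp/organize/song.mp3", "/srv/airtime/stor",
                                  "song.mp3", "my-airtime-uploads",
                                  "MY_AWS_ACCESS_KEY", "MY_AWS_SECRET_KEY")
    metadata = q.get()  # blocks until the pipeline queues its results dict
    print metadata["s3_object_name"]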


@@ -18,7 +18,8 @@ class FileMoverAnalyzer(Analyzer):
         raise Exception("Use FileMoverAnalyzer.move() instead.")

     @staticmethod
-    def move(audio_file_path, import_directory, original_filename, metadata):
+    def move(audio_file_path, import_directory, original_filename, metadata,
+             s3_bucket, s3_api_key, s3_api_key_secret):
         """Move the file at audio_file_path over into the import_directory/import,
         renaming it to original_filename.
@@ -42,9 +43,12 @@ class FileMoverAnalyzer(Analyzer):
         # TODO: Also, handle the case where the move fails and write some code
         # to possibly move the file to problem_files.

+        #cloud storage doesn't need this
+        '''
         max_dir_len = 32
         max_file_len = 32
         final_file_path = import_directory
         if metadata.has_key("artist_name"):
             final_file_path += "/" + metadata["artist_name"][0:max_dir_len] # truncating with array slicing
         if metadata.has_key("album_title"):
@@ -60,6 +64,7 @@ class FileMoverAnalyzer(Analyzer):
         #the wrong information for the file we just overwrote (eg. the song length would be wrong!)
         #If the final file path is the same as the file we've been told to import (which
         #you often do when you're debugging), then don't move the file at all.

         if os.path.exists(final_file_path):
             if os.path.samefile(audio_file_path, final_file_path):
                 metadata["full_path"] = final_file_path
@@ -74,12 +79,48 @@ class FileMoverAnalyzer(Analyzer):
         #Ensure the full path to the file exists
         mkdir_p(os.path.dirname(final_file_path))
+        '''
+
+        file_base_name = os.path.basename(audio_file_path)
+        file_name, extension = os.path.splitext(file_base_name)
+        object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension)
+
+        from libcloud.storage.types import Provider, ContainerDoesNotExistError
+        from libcloud.storage.providers import get_driver
+        cls = get_driver(Provider.S3)
+        driver = cls(s3_api_key, s3_api_key_secret)
+
+        try:
+            container = driver.get_container(s3_bucket)
+        except ContainerDoesNotExistError:
+            container = driver.create_container(s3_bucket)
+
+        extra = {'meta_data': {'filename': file_base_name}}
+        #libcloud complains when float objects are in metadata
+        #extra = {'meta_data': metadata}
+
+        with open(audio_file_path, 'rb') as iterator:
+            obj = driver.upload_object_via_stream(iterator=iterator,
+                                                  container=container,
+                                                  object_name=object_name,
+                                                  extra=extra)
+
+        #remove file from organize directory
+        try:
+            os.remove(audio_file_path)
+        except OSError:
+            pass
+
+        #cloud storage doesn't need this
+        '''
         #Move the file into its final destination directory
         logging.debug("Moving %s to %s" % (audio_file_path, final_file_path))
         shutil.move(audio_file_path, final_file_path)

         metadata["full_path"] = final_file_path
+        '''
+        metadata["s3_object_name"] = object_name
         return metadata

 def mkdir_p(path):
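
Not part of this commit, but for symmetry: once metadata["s3_object_name"] is stored, a consumer can pull the file back out of the bucket with the same libcloud driver. A sketch, reusing the credentials from the config:

    from libcloud.storage.types import Provider
    from libcloud.storage.providers import get_driver

    cls = get_driver(Provider.S3)
    driver = cls(s3_api_key, s3_api_key_secret)

    # Look the object up by the name saved during import, then stream
    # it down to a local path.
    obj = driver.get_object(container_name=s3_bucket,
                            object_name=metadata["s3_object_name"])
    driver.download_object(obj, destination_path="/tmp/song.mp3",
                           overwrite_existing=True, delete_on_failure=True)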


@@ -73,6 +73,12 @@ class MessageListener:
         self._password = config.get(RMQ_CONFIG_SECTION, 'password')
         self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost')

+        # Read the S3 API settings from the config file
+        S3_CONFIG_SECTION = "s3"
+        self._s3_bucket = config.get(S3_CONFIG_SECTION, 'bucket')
+        self._s3_api_key = config.get(S3_CONFIG_SECTION, 'api_key')
+        self._s3_api_key_secret = config.get(S3_CONFIG_SECTION, 'api_key_secret')
+
         # Set up a signal handler so we can shutdown gracefully
         # For some reason, this signal handler must be set up here. I'd rather
         # put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
@@ -111,7 +117,7 @@ class MessageListener:
         self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY)

         logging.info(" Listening for messages...")
-        self._channel.basic_consume(MessageListener.msg_received_callback,
+        self._channel.basic_consume(self.msg_received_callback,
                                     queue=QUEUE, no_ack=False)

     def wait_for_messages(self):
@@ -128,8 +134,8 @@ class MessageListener:
         self._shutdown = True
         self.disconnect_from_messaging_server()

-    @staticmethod
-    def msg_received_callback(channel, method_frame, header_frame, body):
+    #@staticmethod
+    def msg_received_callback(self, channel, method_frame, header_frame, body):
         ''' A callback method that runs when a RabbitMQ message is received.

         Here we parse the message, spin up an analyzer process, and report the
@@ -160,7 +166,7 @@ class MessageListener:
             callback_url = msg_dict["callback_url"]
             api_key = msg_dict["api_key"]

-            audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
+            audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
             StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)

         except KeyError as e:
@@ -198,12 +204,13 @@ class MessageListener:
         # If we don't ack, then RabbitMQ will redeliver the message in the future.
         channel.basic_ack(delivery_tag=method_frame.delivery_tag)

-    @staticmethod
-    def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
+    #@staticmethod
+    def spawn_analyzer_process(self, audio_file_path, import_directory, original_filename):
         ''' Spawn a child process to analyze and import a new audio file. '''
         q = multiprocessing.Queue()
         p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
-                args=(q, audio_file_path, import_directory, original_filename))
+                args=(q, audio_file_path, import_directory, original_filename,
+                    self._s3_bucket, self._s3_api_key, self._s3_api_key_secret))
         p.start()
         p.join()
         if p.exitcode == 0:
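
The hunk is truncated at the exit-code check; the rest of spawn_analyzer_process() presumably pulls the results dict off the interprocess queue, roughly:

    results = None
    if p.exitcode == 0:
        results = q.get()  # the metadata dict that run_analysis() queued
    else:
        raise Exception("Analyzer process died with exit code %d" % p.exitcode)
    return results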


@@ -17,28 +17,28 @@ def run():
     parser = argparse.ArgumentParser()
     parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
     parser.add_argument("--debug", help="log full debugging output", action="store_true")
-    parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH)
+    parser.add_argument("--config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH)
     parser.add_argument("--http-retry-queue-file", help="specify where incomplete HTTP requests will be serialized (default is %s)" % DEFAULT_HTTP_RETRY_PATH)
     args = parser.parse_args()

     check_if_media_monitor_is_running()

     #Default config file path
-    rmq_config_path = DEFAULT_CONFIG_PATH
+    config_path = DEFAULT_CONFIG_PATH
     http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH
-    if args.rmq_config_file:
-        rmq_config_path = args.rmq_config_file
+    if args.config_file:
+        config_path = args.config_file
     if args.http_retry_queue_file:
         http_retry_queue_path = args.http_retry_queue_file

     if args.daemon:
         with daemon.DaemonContext():
-            aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path,
+            aa.AirtimeAnalyzerServer(config_path=config_path,
                                      http_retry_queue_path=http_retry_queue_path,
                                      debug=args.debug)
     else:
         # Run without daemonizing
-        aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path,
+        aa.AirtimeAnalyzerServer(config_path=config_path,
                                  http_retry_queue_path=http_retry_queue_path,
                                  debug=args.debug)
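
With the flag renamed, starting the service looks like this (the airtime_analyzer executable name is an assumption; the flag names come straight from the diff):

    airtime_analyzer --config-file /etc/airtime/airtime.conf --debug
    airtime_analyzer -d --http-retry-queue-file /tmp/airtime_analyzer_http_retries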