diff --git a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py index bb76ba465..567c31c98 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/airtime_analyzer.py @@ -23,7 +23,7 @@ class AirtimeAnalyzerServer: # Variables _log_level = logging.INFO - def __init__(self, rmq_config_path, http_retry_queue_path, debug=False): + def __init__(self, config_path, http_retry_queue_path, debug=False): # Dump a stacktrace with 'kill -SIGUSR2 ' signal.signal(signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace()) @@ -32,14 +32,14 @@ class AirtimeAnalyzerServer: self.setup_logging(debug) # Read our config file - rabbitmq_config = self.read_config_file(rmq_config_path) + config = self.read_config_file(config_path) # Start up the StatusReporter process StatusReporter.start_thread(http_retry_queue_path) # Start listening for RabbitMQ messages telling us about newly # uploaded files. This blocks until we recieve a shutdown signal. - self._msg_listener = MessageListener(rabbitmq_config) + self._msg_listener = MessageListener(config) StatusReporter.stop_thread() diff --git a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py index 39c558bac..89513fd4c 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/analyzer_pipeline.py @@ -17,7 +17,8 @@ class AnalyzerPipeline: """ @staticmethod - def run_analysis(queue, audio_file_path, import_directory, original_filename): + def run_analysis(queue, audio_file_path, import_directory, original_filename, + s3_bucket, s3_api_key, s3_api_key_secret): """Analyze and import an audio file, and put all extracted metadata into queue. Keyword arguments: @@ -51,7 +52,8 @@ class AnalyzerPipeline: # First, we extract the ID3 tags and other metadata: metadata = dict() metadata = MetadataAnalyzer.analyze(audio_file_path, metadata) - metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata) + metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata, + s3_bucket, s3_api_key, s3_api_key_secret) metadata["import_status"] = 0 # imported # Note that the queue we're putting the results into is our interprocess communication diff --git a/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py b/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py index de296e092..8d899152f 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/filemover_analyzer.py @@ -18,7 +18,8 @@ class FileMoverAnalyzer(Analyzer): raise Exception("Use FileMoverAnalyzer.move() instead.") @staticmethod - def move(audio_file_path, import_directory, original_filename, metadata): + def move(audio_file_path, import_directory, original_filename, metadata, + s3_bucket, s3_api_key, s3_api_key_secret): """Move the file at audio_file_path over into the import_directory/import, renaming it to original_filename. @@ -41,10 +42,13 @@ class FileMoverAnalyzer(Analyzer): #Import the file over to it's final location. # TODO: Also, handle the case where the move fails and write some code # to possibly move the file to problem_files. - + + #cloud storage doesn't need this + ''' max_dir_len = 32 max_file_len = 32 final_file_path = import_directory + if metadata.has_key("artist_name"): final_file_path += "/" + metadata["artist_name"][0:max_dir_len] # truncating with array slicing if metadata.has_key("album_title"): @@ -60,6 +64,7 @@ class FileMoverAnalyzer(Analyzer): #the wrong information for the file we just overwrote (eg. the song length would be wrong!) #If the final file path is the same as the file we've been told to import (which #you often do when you're debugging), then don't move the file at all. + if os.path.exists(final_file_path): if os.path.samefile(audio_file_path, final_file_path): metadata["full_path"] = final_file_path @@ -74,12 +79,48 @@ class FileMoverAnalyzer(Analyzer): #Ensure the full path to the file exists mkdir_p(os.path.dirname(final_file_path)) + ''' + file_base_name = os.path.basename(audio_file_path) + file_name, extension = os.path.splitext(file_base_name) + object_name = "%s_%s%s" % (file_name, str(uuid.uuid4()), extension) + + from libcloud.storage.types import Provider, ContainerDoesNotExistError + from libcloud.storage.providers import get_driver + + cls = get_driver(Provider.S3) + driver = cls(s3_api_key, s3_api_key_secret) + + try: + container = driver.get_container(s3_bucket) + except ContainerDoesNotExistError: + container = driver.create_container(s3_bucket) + + extra = {'meta_data': {'filename': file_base_name}} + #libcloud complains when float objects are in metadata + #extra = {'meta_data': metadata} + + with open(audio_file_path, 'rb') as iterator: + obj = driver.upload_object_via_stream(iterator=iterator, + container=container, + object_name=object_name, + extra=extra) + + #remove file from organize directory + try: + os.remove(audio_file_path) + except OSError: + pass + + #cloud storage doesn't need this + ''' #Move the file into its final destination directory logging.debug("Moving %s to %s" % (audio_file_path, final_file_path)) shutil.move(audio_file_path, final_file_path) metadata["full_path"] = final_file_path + ''' + metadata["s3_object_name"] = object_name return metadata def mkdir_p(path): diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index 97613e81f..9dd25a436 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -73,6 +73,12 @@ class MessageListener: self._password = config.get(RMQ_CONFIG_SECTION, 'password') self._vhost = config.get(RMQ_CONFIG_SECTION, 'vhost') + # Read the S3 API setting from the config file + S3_CONFIG_SECTION = "s3" + self._s3_bucket = config.get(S3_CONFIG_SECTION, 'bucket') + self._s3_api_key = config.get(S3_CONFIG_SECTION, 'api_key') + self._s3_api_key_secret = config.get(S3_CONFIG_SECTION, 'api_key_secret') + # Set up a signal handler so we can shutdown gracefully # For some reason, this signal handler must be set up here. I'd rather # put it in AirtimeAnalyzerServer, but it doesn't work there (something to do @@ -111,7 +117,7 @@ class MessageListener: self._channel.queue_bind(exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY) logging.info(" Listening for messages...") - self._channel.basic_consume(MessageListener.msg_received_callback, + self._channel.basic_consume(self.msg_received_callback, queue=QUEUE, no_ack=False) def wait_for_messages(self): @@ -128,8 +134,8 @@ class MessageListener: self._shutdown = True self.disconnect_from_messaging_server() - @staticmethod - def msg_received_callback(channel, method_frame, header_frame, body): + #@staticmethod + def msg_received_callback(self, channel, method_frame, header_frame, body): ''' A callback method that runs when a RabbitMQ message is received. Here we parse the message, spin up an analyzer process, and report the @@ -160,7 +166,7 @@ class MessageListener: callback_url = msg_dict["callback_url"] api_key = msg_dict["api_key"] - audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename) + audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) except KeyError as e: @@ -198,12 +204,13 @@ class MessageListener: # If we don't ack, then RabbitMQ will redeliver the message in the future. channel.basic_ack(delivery_tag=method_frame.delivery_tag) - @staticmethod - def spawn_analyzer_process(audio_file_path, import_directory, original_filename): + #@staticmethod + def spawn_analyzer_process(self, audio_file_path, import_directory, original_filename): ''' Spawn a child process to analyze and import a new audio file. ''' q = multiprocessing.Queue() p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, - args=(q, audio_file_path, import_directory, original_filename)) + args=(q, audio_file_path, import_directory, original_filename, + self._s3_bucket, self._s3_api_key, self._s3_api_key_secret)) p.start() p.join() if p.exitcode == 0: diff --git a/python_apps/airtime_analyzer/bin/airtime_analyzer b/python_apps/airtime_analyzer/bin/airtime_analyzer index 6e8eab367..ff17fc2f2 100755 --- a/python_apps/airtime_analyzer/bin/airtime_analyzer +++ b/python_apps/airtime_analyzer/bin/airtime_analyzer @@ -17,28 +17,28 @@ def run(): parser = argparse.ArgumentParser() parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true") parser.add_argument("--debug", help="log full debugging output", action="store_true") - parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH) + parser.add_argument("--config-file", help="specify a configuration file with RabbitMQ settings (default is %s)" % DEFAULT_CONFIG_PATH) parser.add_argument("--http-retry-queue-file", help="specify where incompleted HTTP requests will be serialized (default is %s)" % DEFAULT_HTTP_RETRY_PATH) args = parser.parse_args() check_if_media_monitor_is_running() #Default config file path - rmq_config_path = DEFAULT_CONFIG_PATH + config_path = DEFAULT_CONFIG_PATH http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH - if args.rmq_config_file: - rmq_config_path = args.rmq_config_file + if args.config_file: + config_path = args.config_file if args.http_retry_queue_file: http_retry_queue_path = args.http_retry_queue_file if args.daemon: with daemon.DaemonContext(): - aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path, + aa.AirtimeAnalyzerServer(config_path=config_path, http_retry_queue_path=http_retry_queue_path, debug=args.debug) else: # Run without daemonizing - aa.AirtimeAnalyzerServer(rmq_config_path=rmq_config_path, + aa.AirtimeAnalyzerServer(config_path=config_path, http_retry_queue_path=http_retry_queue_path, debug=args.debug)