From ca62086e16c910dffcd9137c8a3c8deadfd99156 Mon Sep 17 00:00:00 2001 From: drigato Date: Tue, 12 Aug 2014 12:59:50 -0400 Subject: [PATCH] Removed cloud file deletion from Airtime Analyzer. Deletion is done from PHP now. --- airtime_mvc/application/models/RabbitMq.php | 18 ++------------- .../rest/controllers/MediaController.php | 4 ++-- .../cloud_storage_uploader.py | 13 ----------- .../airtime_analyzer/message_listener.py | 23 +++++-------------- 4 files changed, 10 insertions(+), 48 deletions(-) diff --git a/airtime_mvc/application/models/RabbitMq.php b/airtime_mvc/application/models/RabbitMq.php index f81055460..15ad912e0 100644 --- a/airtime_mvc/application/models/RabbitMq.php +++ b/airtime_mvc/application/models/RabbitMq.php @@ -79,12 +79,11 @@ class Application_Model_RabbitMq self::sendMessage($exchange, 'direct', true, $data); } - public static function SendUploadMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename, - $callbackUrl, $apiKey, $messageType) + public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename, + $callbackUrl, $apiKey) { $exchange = 'airtime-uploads'; - $data['message_type'] = $messageType; $data['tmp_file_path'] = $tmpFilePath; $data['import_directory'] = $importedStorageDirectory; $data['original_filename'] = $originalFilename; @@ -94,17 +93,4 @@ class Application_Model_RabbitMq $jsonData = json_encode($data); self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads'); } - - public static function SendDeleteMessageToAnalyzer($callbackUrl, $objectName, $apiKey, $messageType) - { - $exchange = 'airtime-uploads'; - - $data['message_type'] = $messageType; - $data['api_key'] = $apiKey; - $data['object_name'] = $objectName; - $data['callback_url'] = $callbackUrl; - - $jsonData = json_encode($data); - self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads'); - } } diff --git a/airtime_mvc/application/modules/rest/controllers/MediaController.php b/airtime_mvc/application/modules/rest/controllers/MediaController.php index ca334f7bf..37a7ae6ff 100644 --- a/airtime_mvc/application/modules/rest/controllers/MediaController.php +++ b/airtime_mvc/application/modules/rest/controllers/MediaController.php @@ -463,9 +463,9 @@ class Rest_MediaController extends Zend_Rest_Controller //Dispatch a message to airtime_analyzer through RabbitMQ, //notifying it that there's a new upload to process! - Application_Model_RabbitMq::SendUploadMessageToAnalyzer($newTempFilePath, + Application_Model_RabbitMq::SendMessageToAnalyzer($newTempFilePath, $importedStorageDirectory, basename($originalFilename), - $callbackUrl, $apiKey, 'upload'); + $callbackUrl, $apiKey); } private function getOwnerId() diff --git a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py index d6b8bd9b6..d331b116a 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/cloud_storage_uploader.py @@ -48,16 +48,3 @@ class CloudStorageUploader: metadata["resource_id"] = object_name return metadata - def delete_obj(self, obj_name): - cls = get_driver(getattr(Provider, self._provider)) - driver = cls(self._api_key, self._api_key_secret) - - try: - cloud_obj = driver.get_object(container_name=self._bucket, - object_name=obj_name) - filesize = getattr(cloud_obj, 'size') - driver.delete_object(obj=cloud_obj) - return filesize - except ObjectDoesNotExistError: - raise Exception("Could not find object on %s" % self._provider) - diff --git a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py index cc633cca5..83fd32259 100644 --- a/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py +++ b/python_apps/airtime_analyzer/airtime_analyzer/message_listener.py @@ -152,7 +152,6 @@ class MessageListener: original_filename = "" callback_url = "" api_key = "" - message_type = "" ''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue to pass objects between the processes so that if the analyzer process crashes, it does not @@ -163,24 +162,14 @@ class MessageListener: try: msg_dict = json.loads(body) api_key = msg_dict["api_key"] - message_type = msg_dict["message_type"] callback_url = msg_dict["callback_url"] - if message_type == "upload": - audio_file_path = msg_dict["tmp_file_path"] - import_directory = msg_dict["import_directory"] - original_filename = msg_dict["original_filename"] - - audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename) - StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) - elif message_type == "delete": - object_name = msg_dict["object_name"] - csu = CloudStorageUploader(self._provider, self._bucket, self._api_key, self._api_key_secret) - filesize = csu.delete_obj(object_name) - return_data = dict() - return_data["filesize"] = filesize - return_data["import_status"] = 1 - StatusReporter.report_success_to_callback_url(callback_url, api_key, return_data) + audio_file_path = msg_dict["tmp_file_path"] + import_directory = msg_dict["import_directory"] + original_filename = msg_dict["original_filename"] + + audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename) + StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) except KeyError as e: # A field in msg_dict that we needed was missing (eg. audio_file_path)