Removed cloud file deletion from Airtime Analyzer. Deletion is done from PHP now.
This commit is contained in:
parent
879e776c8d
commit
ca62086e16
|
@ -79,12 +79,11 @@ class Application_Model_RabbitMq
|
||||||
self::sendMessage($exchange, 'direct', true, $data);
|
self::sendMessage($exchange, 'direct', true, $data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function SendUploadMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename,
|
public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename,
|
||||||
$callbackUrl, $apiKey, $messageType)
|
$callbackUrl, $apiKey)
|
||||||
{
|
{
|
||||||
$exchange = 'airtime-uploads';
|
$exchange = 'airtime-uploads';
|
||||||
|
|
||||||
$data['message_type'] = $messageType;
|
|
||||||
$data['tmp_file_path'] = $tmpFilePath;
|
$data['tmp_file_path'] = $tmpFilePath;
|
||||||
$data['import_directory'] = $importedStorageDirectory;
|
$data['import_directory'] = $importedStorageDirectory;
|
||||||
$data['original_filename'] = $originalFilename;
|
$data['original_filename'] = $originalFilename;
|
||||||
|
@ -94,17 +93,4 @@ class Application_Model_RabbitMq
|
||||||
$jsonData = json_encode($data);
|
$jsonData = json_encode($data);
|
||||||
self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads');
|
self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads');
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function SendDeleteMessageToAnalyzer($callbackUrl, $objectName, $apiKey, $messageType)
|
|
||||||
{
|
|
||||||
$exchange = 'airtime-uploads';
|
|
||||||
|
|
||||||
$data['message_type'] = $messageType;
|
|
||||||
$data['api_key'] = $apiKey;
|
|
||||||
$data['object_name'] = $objectName;
|
|
||||||
$data['callback_url'] = $callbackUrl;
|
|
||||||
|
|
||||||
$jsonData = json_encode($data);
|
|
||||||
self::sendMessage($exchange, 'topic', false, $jsonData, 'airtime-uploads');
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -463,9 +463,9 @@ class Rest_MediaController extends Zend_Rest_Controller
|
||||||
|
|
||||||
//Dispatch a message to airtime_analyzer through RabbitMQ,
|
//Dispatch a message to airtime_analyzer through RabbitMQ,
|
||||||
//notifying it that there's a new upload to process!
|
//notifying it that there's a new upload to process!
|
||||||
Application_Model_RabbitMq::SendUploadMessageToAnalyzer($newTempFilePath,
|
Application_Model_RabbitMq::SendMessageToAnalyzer($newTempFilePath,
|
||||||
$importedStorageDirectory, basename($originalFilename),
|
$importedStorageDirectory, basename($originalFilename),
|
||||||
$callbackUrl, $apiKey, 'upload');
|
$callbackUrl, $apiKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function getOwnerId()
|
private function getOwnerId()
|
||||||
|
|
|
@ -48,16 +48,3 @@ class CloudStorageUploader:
|
||||||
metadata["resource_id"] = object_name
|
metadata["resource_id"] = object_name
|
||||||
return metadata
|
return metadata
|
||||||
|
|
||||||
def delete_obj(self, obj_name):
|
|
||||||
cls = get_driver(getattr(Provider, self._provider))
|
|
||||||
driver = cls(self._api_key, self._api_key_secret)
|
|
||||||
|
|
||||||
try:
|
|
||||||
cloud_obj = driver.get_object(container_name=self._bucket,
|
|
||||||
object_name=obj_name)
|
|
||||||
filesize = getattr(cloud_obj, 'size')
|
|
||||||
driver.delete_object(obj=cloud_obj)
|
|
||||||
return filesize
|
|
||||||
except ObjectDoesNotExistError:
|
|
||||||
raise Exception("Could not find object on %s" % self._provider)
|
|
||||||
|
|
||||||
|
|
|
@ -152,7 +152,6 @@ class MessageListener:
|
||||||
original_filename = ""
|
original_filename = ""
|
||||||
callback_url = ""
|
callback_url = ""
|
||||||
api_key = ""
|
api_key = ""
|
||||||
message_type = ""
|
|
||||||
|
|
||||||
''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
|
''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
|
||||||
to pass objects between the processes so that if the analyzer process crashes, it does not
|
to pass objects between the processes so that if the analyzer process crashes, it does not
|
||||||
|
@ -163,24 +162,14 @@ class MessageListener:
|
||||||
try:
|
try:
|
||||||
msg_dict = json.loads(body)
|
msg_dict = json.loads(body)
|
||||||
api_key = msg_dict["api_key"]
|
api_key = msg_dict["api_key"]
|
||||||
message_type = msg_dict["message_type"]
|
|
||||||
callback_url = msg_dict["callback_url"]
|
callback_url = msg_dict["callback_url"]
|
||||||
|
|
||||||
if message_type == "upload":
|
audio_file_path = msg_dict["tmp_file_path"]
|
||||||
audio_file_path = msg_dict["tmp_file_path"]
|
import_directory = msg_dict["import_directory"]
|
||||||
import_directory = msg_dict["import_directory"]
|
original_filename = msg_dict["original_filename"]
|
||||||
original_filename = msg_dict["original_filename"]
|
|
||||||
|
audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
|
||||||
audio_metadata = self.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
|
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
|
||||||
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
|
|
||||||
elif message_type == "delete":
|
|
||||||
object_name = msg_dict["object_name"]
|
|
||||||
csu = CloudStorageUploader(self._provider, self._bucket, self._api_key, self._api_key_secret)
|
|
||||||
filesize = csu.delete_obj(object_name)
|
|
||||||
return_data = dict()
|
|
||||||
return_data["filesize"] = filesize
|
|
||||||
return_data["import_status"] = 1
|
|
||||||
StatusReporter.report_success_to_callback_url(callback_url, api_key, return_data)
|
|
||||||
|
|
||||||
except KeyError as e:
|
except KeyError as e:
|
||||||
# A field in msg_dict that we needed was missing (eg. audio_file_path)
|
# A field in msg_dict that we needed was missing (eg. audio_file_path)
|
||||||
|
|
Loading…
Reference in New Issue