SAAS-527: Allow files to be uploaded to either the cloud or on local file storage

Not quite done.
This commit is contained in:
drigato 2014-12-16 18:47:42 -05:00
parent ff0a685243
commit 1de326283e
5 changed files with 29 additions and 12 deletions

View File

@ -34,6 +34,10 @@ class Config {
$CC_CONFIG[$backend] = $cloudStorageValues[$backend]; $CC_CONFIG[$backend] = $cloudStorageValues[$backend];
} }
// Tells us where file uploads will be uploaded to.
// It will either be set to a cloud storage backend or local file storage.
$CC_CONFIG["current_backend"] = $cloudStorageValues["current_backend"]["storage_backend"];
$values = parse_ini_file($filename, true); $values = parse_ini_file($filename, true);
// Name of the web server user // Name of the web server user

View File

@ -80,11 +80,12 @@ class Application_Model_RabbitMq
} }
public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename, public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename,
$callbackUrl, $apiKey) $callbackUrl, $apiKey, $currentStorageBackend)
{ {
$exchange = 'airtime-uploads'; $exchange = 'airtime-uploads';
$data['tmp_file_path'] = $tmpFilePath; $data['tmp_file_path'] = $tmpFilePath;
$data['current_storage_backend'] = $currentStorageBackend;
$data['import_directory'] = $importedStorageDirectory; $data['import_directory'] = $importedStorageDirectory;
$data['original_filename'] = $originalFilename; $data['original_filename'] = $originalFilename;
$data['callback_url'] = $callbackUrl; $data['callback_url'] = $callbackUrl;

View File

@ -413,9 +413,12 @@ class Rest_MediaController extends Zend_Rest_Controller
} }
//TODO: Remove uploadFileAction from ApiController.php **IMPORTANT** - It's used by the recorder daemon... //TODO: Remove uploadFileAction from ApiController.php **IMPORTANT** - It's used by the recorder daemon...
$storDir = Application_Model_MusicDir::getStorDir(); $importedStorageDirectory = "";
$importedStorageDirectory = $storDir->getDirectory() . "/imported/" . $ownerId; if ($CC_CONFIG["current_backend"] == "file") {
$storDir = Application_Model_MusicDir::getStorDir();
$importedStorageDirectory = $storDir->getDirectory() . "/imported/" . $ownerId;
}
try { try {
//Copy the temporary file over to the "organize" folder so that it's off our webserver //Copy the temporary file over to the "organize" folder so that it's off our webserver
@ -426,12 +429,14 @@ class Rest_MediaController extends Zend_Rest_Controller
Logging::error($e->getMessage()); Logging::error($e->getMessage());
return; return;
} }
Logging::info($importedStorageDirectory);
//Dispatch a message to airtime_analyzer through RabbitMQ, //Dispatch a message to airtime_analyzer through RabbitMQ,
//notifying it that there's a new upload to process! //notifying it that there's a new upload to process!
Application_Model_RabbitMq::SendMessageToAnalyzer($newTempFilePath, Application_Model_RabbitMq::SendMessageToAnalyzer($newTempFilePath,
$importedStorageDirectory, basename($originalFilename), $importedStorageDirectory, basename($originalFilename),
$callbackUrl, $apiKey); $callbackUrl, $apiKey, $CC_CONFIG["current_backend"]);
} }
private function getOwnerId() private function getOwnerId()

View File

@ -21,7 +21,7 @@ class AnalyzerPipeline:
""" """
@staticmethod @staticmethod
def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain): def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend):
"""Analyze and import an audio file, and put all extracted metadata into queue. """Analyze and import an audio file, and put all extracted metadata into queue.
Keyword arguments: Keyword arguments:
@ -55,15 +55,19 @@ class AnalyzerPipeline:
# Analyze the audio file we were told to analyze: # Analyze the audio file we were told to analyze:
# First, we extract the ID3 tags and other metadata: # First, we extract the ID3 tags and other metadata:
metadata = dict() metadata = dict()
metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
metadata["station_domain"] = station_domain metadata["station_domain"] = station_domain
metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
metadata = CuePointAnalyzer.analyze(audio_file_path, metadata) metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata) metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata) metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
csu = CloudStorageUploader() if current_storage_backend == "file":
metadata = csu.upload_obj(audio_file_path, metadata) metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
else:
csu = CloudStorageUploader()
metadata = csu.upload_obj(audio_file_path, metadata)
metadata["import_status"] = 0 # Successfully imported metadata["import_status"] = 0 # Successfully imported
# Note that the queue we're putting the results into is our interprocess communication # Note that the queue we're putting the results into is our interprocess communication

View File

@ -150,6 +150,8 @@ class MessageListener:
original_filename = "" original_filename = ""
callback_url = "" callback_url = ""
api_key = "" api_key = ""
station_domain = ""
current_storage_backend = ""
''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue ''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
to pass objects between the processes so that if the analyzer process crashes, it does not to pass objects between the processes so that if the analyzer process crashes, it does not
@ -166,8 +168,9 @@ class MessageListener:
audio_file_path = msg_dict["tmp_file_path"] audio_file_path = msg_dict["tmp_file_path"]
import_directory = msg_dict["import_directory"] import_directory = msg_dict["import_directory"]
original_filename = msg_dict["original_filename"] original_filename = msg_dict["original_filename"]
current_storage_backend = msg_dict["current_storage_backend"]
audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain) audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend)
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
except KeyError as e: except KeyError as e:
@ -206,11 +209,11 @@ class MessageListener:
channel.basic_ack(delivery_tag=method_frame.delivery_tag) channel.basic_ack(delivery_tag=method_frame.delivery_tag)
@staticmethod @staticmethod
def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain): def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend):
''' Spawn a child process to analyze and import a new audio file. ''' ''' Spawn a child process to analyze and import a new audio file. '''
q = multiprocessing.Queue() q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, station_domain)) args=(q, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend))
p.start() p.start()
p.join() p.join()
if p.exitcode == 0: if p.exitcode == 0: