Merge branch 'cc-5709-airtime-analyzer-cloud-storage' into cc-5709-airtime-analyzer-cloud-storage-saas

This commit is contained in:
drigato 2014-12-17 14:14:03 -05:00
commit ec1c8669c8
6 changed files with 68 additions and 14 deletions

View File

@ -20,6 +20,9 @@ class ProxyStorageBackend extends StorageBackend
{ {
$CC_CONFIG = Config::getConfig(); $CC_CONFIG = Config::getConfig();
//The storage backend in the airtime.conf directly corresponds to
//the name of the class that implements it (eg. Amazon_S3), so we
//can easily create the right backend object dynamically:
$this->storageBackend = new $storageBackend($CC_CONFIG[$storageBackend]); $this->storageBackend = new $storageBackend($CC_CONFIG[$storageBackend]);
} }

View File

@ -34,6 +34,10 @@ class Config {
$CC_CONFIG[$backend] = $cloudStorageValues[$backend]; $CC_CONFIG[$backend] = $cloudStorageValues[$backend];
} }
// Tells us where file uploads will be uploaded to.
// It will either be set to a cloud storage backend or local file storage.
$CC_CONFIG["current_backend"] = $cloudStorageValues["current_backend"]["storage_backend"];
$values = parse_ini_file($filename, true); $values = parse_ini_file($filename, true);
// Name of the web server user // Name of the web server user

View File

@ -80,7 +80,7 @@ class Application_Model_RabbitMq
} }
public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename, public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename,
$callbackUrl, $apiKey) $callbackUrl, $apiKey, $currentStorageBackend)
{ {
//Hack for Airtime Pro. The RabbitMQ settings for communicating with airtime_analyzer are global //Hack for Airtime Pro. The RabbitMQ settings for communicating with airtime_analyzer are global
//and shared between all instances on Airtime Pro. //and shared between all instances on Airtime Pro.
@ -101,6 +101,7 @@ class Application_Model_RabbitMq
$queue = 'airtime-uploads'; $queue = 'airtime-uploads';
$autoDeleteExchange = false; $autoDeleteExchange = false;
$data['tmp_file_path'] = $tmpFilePath; $data['tmp_file_path'] = $tmpFilePath;
$data['current_storage_backend'] = $currentStorageBackend;
$data['import_directory'] = $importedStorageDirectory; $data['import_directory'] = $importedStorageDirectory;
$data['original_filename'] = $originalFilename; $data['original_filename'] = $originalFilename;
$data['callback_url'] = $callbackUrl; $data['callback_url'] = $callbackUrl;

View File

@ -228,6 +228,42 @@ class Rest_MediaController extends Zend_Rest_Controller
$file->setDbMtime($now); $file->setDbMtime($now);
$file->save(); $file->save();
$this->getResponse()
->setHttpResponseCode(200)
->appendBody(json_encode(CcFiles::sanitizeResponse($file)));
} else if ($file) {
//local file storage
$file->setDbDirectory(self::MUSIC_DIRS_STOR_PK);
$file->fromArray($whiteList, BasePeer::TYPE_FIELDNAME);
//Our RESTful API takes "full_path" as a field, which we then split and translate to match
//our internal schema. Internally, file path is stored relative to a directory, with the directory
//as a foreign key to cc_music_dirs.
if (isset($requestData["full_path"])) {
$fileSizeBytes = filesize($requestData["full_path"]);
if (!isset($fileSizeBytes) || $fileSizeBytes === false)
{
$file->setDbImportStatus(self::IMPORT_STATUS_FAILED)->save();
$this->fileNotFoundResponse();
return;
}
Application_Model_Preference::updateDiskUsage($fileSizeBytes);
$fullPath = $requestData["full_path"];
$storDir = Application_Model_MusicDir::getStorDir()->getDirectory();
$pos = strpos($fullPath, $storDir);
if ($pos !== FALSE)
{
assert($pos == 0); //Path must start with the stor directory path
$filePathRelativeToStor = substr($fullPath, strlen($storDir));
$file->setDbFilepath($filePathRelativeToStor);
}
}
$now = new DateTime("now", new DateTimeZone("UTC"));
$file->setDbMtime($now);
$file->save();
$this->getResponse() $this->getResponse()
->setHttpResponseCode(200) ->setHttpResponseCode(200)
->appendBody(json_encode(CcFiles::sanitizeResponse($file))); ->appendBody(json_encode(CcFiles::sanitizeResponse($file)));
@ -427,8 +463,11 @@ class Rest_MediaController extends Zend_Rest_Controller
//TODO: Remove uploadFileAction from ApiController.php **IMPORTANT** - It's used by the recorder daemon... //TODO: Remove uploadFileAction from ApiController.php **IMPORTANT** - It's used by the recorder daemon...
$storDir = Application_Model_MusicDir::getStorDir(); $importedStorageDirectory = "";
$importedStorageDirectory = $storDir->getDirectory() . "/imported/" . $ownerId; if ($CC_CONFIG["current_backend"] == "file") {
$storDir = Application_Model_MusicDir::getStorDir();
$importedStorageDirectory = $storDir->getDirectory() . "/imported/" . $ownerId;
}
try { try {
//Copy the temporary file over to the "organize" folder so that it's off our webserver //Copy the temporary file over to the "organize" folder so that it's off our webserver
@ -444,7 +483,7 @@ class Rest_MediaController extends Zend_Rest_Controller
//notifying it that there's a new upload to process! //notifying it that there's a new upload to process!
Application_Model_RabbitMq::SendMessageToAnalyzer($newTempFilePath, Application_Model_RabbitMq::SendMessageToAnalyzer($newTempFilePath,
$importedStorageDirectory, basename($originalFilename), $importedStorageDirectory, basename($originalFilename),
$callbackUrl, $apiKey); $callbackUrl, $apiKey, $CC_CONFIG["current_backend"]);
} }
private function getOwnerId() private function getOwnerId()

View File

@ -21,7 +21,7 @@ class AnalyzerPipeline:
""" """
@staticmethod @staticmethod
def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain): def run_analysis(queue, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend):
"""Analyze and import an audio file, and put all extracted metadata into queue. """Analyze and import an audio file, and put all extracted metadata into queue.
Keyword arguments: Keyword arguments:
@ -55,15 +55,19 @@ class AnalyzerPipeline:
# Analyze the audio file we were told to analyze: # Analyze the audio file we were told to analyze:
# First, we extract the ID3 tags and other metadata: # First, we extract the ID3 tags and other metadata:
metadata = dict() metadata = dict()
metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
metadata["station_domain"] = station_domain metadata["station_domain"] = station_domain
metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
metadata = CuePointAnalyzer.analyze(audio_file_path, metadata) metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata) metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata) metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
csu = CloudStorageUploader() if current_storage_backend == "file":
metadata = csu.upload_obj(audio_file_path, metadata) metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
else:
csu = CloudStorageUploader()
metadata = csu.upload_obj(audio_file_path, metadata)
metadata["import_status"] = 0 # Successfully imported metadata["import_status"] = 0 # Successfully imported
# Note that the queue we're putting the results into is our interprocess communication # Note that the queue we're putting the results into is our interprocess communication

View File

@ -150,6 +150,8 @@ class MessageListener:
original_filename = "" original_filename = ""
callback_url = "" callback_url = ""
api_key = "" api_key = ""
station_domain = ""
current_storage_backend = ""
''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue ''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
to pass objects between the processes so that if the analyzer process crashes, it does not to pass objects between the processes so that if the analyzer process crashes, it does not
@ -166,8 +168,9 @@ class MessageListener:
audio_file_path = msg_dict["tmp_file_path"] audio_file_path = msg_dict["tmp_file_path"]
import_directory = msg_dict["import_directory"] import_directory = msg_dict["import_directory"]
original_filename = msg_dict["original_filename"] original_filename = msg_dict["original_filename"]
current_storage_backend = msg_dict["current_storage_backend"]
audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain) audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend)
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
except KeyError as e: except KeyError as e:
@ -206,11 +209,11 @@ class MessageListener:
channel.basic_ack(delivery_tag=method_frame.delivery_tag) channel.basic_ack(delivery_tag=method_frame.delivery_tag)
@staticmethod @staticmethod
def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain): def spawn_analyzer_process(audio_file_path, import_directory, original_filename, station_domain, current_storage_backend):
''' Spawn a child process to analyze and import a new audio file. ''' ''' Spawn a child process to analyze and import a new audio file. '''
q = multiprocessing.Queue() q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, station_domain)) args=(q, audio_file_path, import_directory, original_filename, station_domain, current_storage_backend))
p.start() p.start()
p.join() p.join()
if p.exitcode == 0: if p.exitcode == 0: