CC-5709: Airtime Analyzer

* File import via the Add Media page now works!
* Reworked StoredFile::copyFileToStor() because it's still needed.
* Reworked the way file paths are reported in airtime_analyzer
  and in the RESTful media API
This commit is contained in:
Albert Santoni 2014-03-17 10:19:39 -04:00
parent 6a68967f81
commit 50a42f12bb
5 changed files with 144 additions and 54 deletions

View File

@ -78,12 +78,14 @@ class Application_Model_RabbitMq
self::sendMessage($exchange, 'direct', true, $data); self::sendMessage($exchange, 'direct', true, $data);
} }
public static function SendMessageToAnalyzer($tmpFilePath, $finalDirectory, $callbackUrl, $apiKey) public static function SendMessageToAnalyzer($tmpFilePath, $importedStorageDirectory, $originalFilename,
$callbackUrl, $apiKey)
{ {
$exchange = 'airtime-uploads'; $exchange = 'airtime-uploads';
$data['tmp_file_path'] = $tmpFilePath; $data['tmp_file_path'] = $tmpFilePath;
$data['final_directory'] = $finalDirectory; $data['import_directory'] = $importedStorageDirectory;
$data['original_filename'] = $originalFilename;
$data['callback_url'] = $callbackUrl; $data['callback_url'] = $callbackUrl;
$data['api_key'] = $apiKey; $data['api_key'] = $apiKey;

View File

@ -988,48 +988,69 @@ SQL;
return $freeSpace >= $fileSize; return $freeSpace >= $fileSize;
} }
public static function copyFileToStor($p_targetDir, $fileName, $tempname) /**
* Copy a newly uploaded audio file from its temporary upload directory
* on the local disk (like /tmp) over to Airtime's "stor" directory,
* which is where all ingested music/media live.
*
* This is done in PHP here on the web server rather than in airtime_analyzer because
* the airtime_analyzer might be running on a different physical computer than the web server,
* and it probably won't have access to the web server's /tmp folder. The stor/organize directory
* is, however, both accessible to the machines running airtime_analyzer and the web server
* on Airtime Pro.
*
* The file is actually copied to "stor/organize", which is a staging directory where files go
* before they're processed by airtime_analyzer, which then moves them to "stor/imported" in the final
* step.
*
* TODO: Implement better error handling here...
*
* @param string $tempFilePath
* @param string $originalFilename
* @throws Exception
* @return Ambigous <unknown, string>
*/
public static function copyFileToStor($tempFilePath, $originalFilename)
{ {
$audio_file = $p_targetDir . DIRECTORY_SEPARATOR . $tempname; $audio_file = $tempFilePath;
Logging::info('copyFileToStor: moving file '.$audio_file); Logging::info('copyFileToStor: moving file '.$audio_file);
$storDir = Application_Model_MusicDir::getStorDir(); $storDir = Application_Model_MusicDir::getStorDir();
$stor = $storDir->getDirectory(); $stor = $storDir->getDirectory();
// check if "organize" dir exists and if not create one // check if "organize" dir exists and if not create one
if (!file_exists($stor."/organize")) { if (!file_exists($stor."/organize")) {
if (!mkdir($stor."/organize", 0777)) { if (!mkdir($stor."/organize", 0777)) {
return array( throw new Exception("Failed to create organize directory.");
"code" => 109,
"message" => _("Failed to create 'organize' directory."));
} }
} }
if (chmod($audio_file, 0644) === false) { if (chmod($audio_file, 0644) === false) {
Logging::info("Warning: couldn't change permissions of $audio_file to 0644"); Logging::info("Warning: couldn't change permissions of $audio_file to 0644");
} }
// Check if we have enough space before copying // Check if we have enough space before copying
if (!self::isEnoughDiskSpaceToCopy($stor, $audio_file)) { if (!self::isEnoughDiskSpaceToCopy($stor, $audio_file)) {
$freeSpace = disk_free_space($stor); $freeSpace = disk_free_space($stor);
$fileSize = filesize($audio_file); $fileSize = filesize($audio_file);
return array("code" => 107, throw new Exception(sprintf(_("The file was not uploaded, there is "
"message" => sprintf(_("The file was not uploaded, there is " ."%s MB of disk space left and the file you are "
."%s MB of disk space left and the file you are " ."uploading has a size of %s MB."), $freeSpace, $fileSize));
."uploading has a size of %s MB."), $freeSpace, $fileSize));
} }
// Check if liquidsoap can play this file // Check if liquidsoap can play this file
// TODO: Move this to airtime_analyzer
if (!self::liquidsoapFilePlayabilityTest($audio_file)) { if (!self::liquidsoapFilePlayabilityTest($audio_file)) {
return array( return array(
"code" => 110, "code" => 110,
"message" => _("This file appears to be corrupted and will not " "message" => _("This file appears to be corrupted and will not "
."be added to media library.")); ."be added to media library."));
} }
// Did all the checks for real, now trying to copy // Did all the checks for real, now trying to copy
$audio_stor = Application_Common_OsPath::join($stor, "organize", $audio_stor = Application_Common_OsPath::join($stor, "organize",
$fileName); $originalFilename);
$user = Application_Model_User::getCurrentUser(); $user = Application_Model_User::getCurrentUser();
if (is_null($user)) { if (is_null($user)) {
$uid = Application_Model_User::getFirstAdminId(); $uid = Application_Model_User::getFirstAdminId();
@ -1044,7 +1065,7 @@ SQL;
written)"); written)");
} else { } else {
Logging::info("Successfully written identification file for Logging::info("Successfully written identification file for
uploaded '$audio_stor'"); uploaded '$audio_stor'");
} }
//if the uploaded file is not UTF-8 encoded, let's encode it. Assuming source //if the uploaded file is not UTF-8 encoded, let's encode it. Assuming source
//encoding is ISO-8859-1 //encoding is ISO-8859-1
@ -1059,18 +1080,14 @@ SQL;
//is enough disk space . //is enough disk space .
unlink($audio_file); //remove the file after failed rename unlink($audio_file); //remove the file after failed rename
unlink($id_file); // Also remove the identifier file unlink($id_file); // Also remove the identifier file
return array( throw new Exception("The file was not uploaded, this error can occur if the computer "
"code" => 108, ."hard drive does not have enough disk space or the stor "
"message" => _("The file was not uploaded, this error can occur if the computer " ."directory does not have correct write permissions.");
."hard drive does not have enough disk space or the stor "
."directory does not have correct write permissions."));
} }
// Now that we successfully added this file, we will add another tag return $audio_stor;
// file that will identify the user that owns it
return null;
} }
/* /*
* Pass the file through Liquidsoap and test if it is readable. Return True if readable, and False otherwise. * Pass the file through Liquidsoap and test if it is readable. Return True if readable, and False otherwise.
*/ */

View File

@ -73,7 +73,7 @@ class Rest_MediaController extends Zend_Rest_Controller
$file->save(); $file->save();
$callbackUrl = $this->getRequest()->getScheme() . '://' . $this->getRequest()->getHttpHost() . $this->getRequest()->getRequestUri() . "/" . $file->getPrimaryKey(); $callbackUrl = $this->getRequest()->getScheme() . '://' . $this->getRequest()->getHttpHost() . $this->getRequest()->getRequestUri() . "/" . $file->getPrimaryKey();
$this->processUploadedFile($callbackUrl); $this->processUploadedFile($callbackUrl, $_FILES["file"]["name"], $this->getOwnerId());
$this->getResponse() $this->getResponse()
->setHttpResponseCode(201) ->setHttpResponseCode(201)
@ -95,7 +95,27 @@ class Rest_MediaController extends Zend_Rest_Controller
{ {
//TODO: Strip or sanitize the JSON output //TODO: Strip or sanitize the JSON output
$file->fromArray(json_decode($this->getRequest()->getRawBody(), true), BasePeer::TYPE_FIELDNAME); $fileFromJson = json_decode($this->getRequest()->getRawBody(), true);
//Our RESTful API takes "full_path" as a field, which we then split and translate to match
//our internal schema. Internally, file path is stored relative to a directory, with the directory
//as a foreign key to cc_music_dirs.
if ($fileFromJson["full_path"]) {
$fullPath = $fileFromJson["full_path"];
$storDir = Application_Model_MusicDir::getStorDir()->getDirectory();
$pos = strpos($fullPath, $storDir);
if ($pos !== FALSE)
{
assert($pos == 0); //Path must start with the stor directory path
$filePathRelativeToStor = substr($fullPath, strlen($storDir));
$fileFromJson["filepath"] = $filePathRelativeToStor;
$fileFromJson["directory"] = 1; //1 corresponds to the default stor/imported directory.
}
}
$file->fromArray($fileFromJson, BasePeer::TYPE_FIELDNAME);
$file->save(); $file->save();
$this->getResponse() $this->getResponse()
->setHttpResponseCode(200) ->setHttpResponseCode(200)
@ -181,7 +201,7 @@ class Rest_MediaController extends Zend_Rest_Controller
$resp->appendBody("ERROR: Media not found."); $resp->appendBody("ERROR: Media not found.");
} }
private function processUploadedFile($callbackUrl) private function processUploadedFile($callbackUrl, $originalFilename, $ownerId)
{ {
$CC_CONFIG = Config::getConfig(); $CC_CONFIG = Config::getConfig();
$apiKey = $CC_CONFIG["apiKey"][0]; $apiKey = $CC_CONFIG["apiKey"][0];
@ -192,14 +212,32 @@ class Rest_MediaController extends Zend_Rest_Controller
//TODO: Remove copyFileToStor from StoredFile... //TODO: Remove copyFileToStor from StoredFile...
$storDir = Application_Model_MusicDir::getStorDir(); //TODO: Remove uploadFileAction from ApiController.php **IMPORTANT** - It's used by the recorder daemon?
$finalDestinationDir = $storDir->getDirectory() . "/organize";
$upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload";
$tempFilePath = $upload_dir . "/" . $tempFileName;
$storDir = Application_Model_MusicDir::getStorDir();
//$finalFullFilePath = $storDir->getDirectory() . "/imported/" . $ownerId . "/" . $originalFilename;
$importedStorageDirectory = $storDir->getDirectory() . "/imported/" . $ownerId;
try {
//Copy the temporary file over to the "organize" folder so that it's off our webserver
//and accessible by airtime_analyzer which could be running on a different machine.
$newTempFilePath = Application_Model_StoredFile::copyFileToStor($tempFilePath, $originalFilename);
} catch (Exception $e) {
Logging::error($e->getMessage());
}
//Logging::info("New temporary file path: " . $newTempFilePath);
//Logging::info("Final file path: " . $finalFullFilePath);
//Dispatch a message to airtime_analyzer through RabbitMQ, //Dispatch a message to airtime_analyzer through RabbitMQ,
//notifying it that there's a new upload to process! //notifying it that there's a new upload to process!
Application_Model_RabbitMq::SendMessageToAnalyzer($tempFilePath, Application_Model_RabbitMq::SendMessageToAnalyzer($newTempFilePath,
$finalDestinationDir, $callbackUrl, $apiKey); $importedStorageDirectory, $originalFilename,
$callbackUrl, $apiKey);
} }
private function getOwnerId() private function getOwnerId()

View File

@ -1,7 +1,7 @@
import logging import logging
import multiprocessing import multiprocessing
import shutil import shutil
import os import os, errno
from metadata_analyzer import MetadataAnalyzer from metadata_analyzer import MetadataAnalyzer
class AnalyzerPipeline: class AnalyzerPipeline:
@ -12,28 +12,59 @@ class AnalyzerPipeline:
# Take message dictionary and perform the necessary analysis. # Take message dictionary and perform the necessary analysis.
@staticmethod @staticmethod
def run_analysis(queue, audio_file_path, final_directory): def run_analysis(queue, audio_file_path, import_directory, original_filename):
if not isinstance(queue, multiprocessing.queues.Queue): if not isinstance(queue, multiprocessing.queues.Queue):
raise TypeError("queue must be a multiprocessing.Queue()") raise TypeError("queue must be a multiprocessing.Queue()")
if not isinstance(audio_file_path, unicode): if not isinstance(audio_file_path, unicode):
raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__ + " instead.") raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__ + " instead.")
if not isinstance(final_directory, unicode): if not isinstance(import_directory, unicode):
raise TypeError("final_directory must be unicode. Was of type " + type(final_directory).__name__ + " instead.") raise TypeError("import_directory must be unicode. Was of type " + type(import_directory).__name__ + " instead.")
if not isinstance(original_filename, unicode):
raise TypeError("original_filename must be unicode. Was of type " + type(original_filename).__name__ + " instead.")
#print ReplayGainAnalyzer.analyze("foo.mp3")
# Analyze the audio file we were told to analyze: # Analyze the audio file we were told to analyze:
# First, we extract the ID3 tags and other metadata: # First, we extract the ID3 tags and other metadata:
queue.put(MetadataAnalyzer.analyze(audio_file_path)) results = MetadataAnalyzer.analyze(audio_file_path)
# Note that the queue we're putting the results into is our interprocess communication # Note that the queue we're putting the results into is our interprocess communication
# back to the main process. # back to the main process.
#print ReplayGainAnalyzer.analyze("foo.mp3") #Import the file over to it's final location.
final_file_path = import_directory
if results.has_key("artist_name"):
final_file_path += "/" + results["artist_name"]
if results.has_key("album"):
final_file_path += "/" + results["album"]
final_file_path += "/" + original_filename
final_audio_file_path = final_directory + os.sep + os.path.basename(audio_file_path) #Ensure any redundant slashes are stripped
if os.path.exists(final_audio_file_path) and not os.path.samefile(audio_file_path, final_audio_file_path): final_file_path = os.path.normpath(final_file_path)
os.remove(final_audio_file_path)
shutil.move(audio_file_path, final_audio_file_path) #final_audio_file_path = final_directory + os.sep + os.path.basename(audio_file_path)
if os.path.exists(final_file_path) and not os.path.samefile(audio_file_path, final_file_path):
raise Exception("File exists and will not be overwritten.") # by design
#Overwriting a file would mean Airtime's database has the wrong information...
#Ensure the full path to the file exists
mkdir_p(os.path.dirname(final_file_path))
#Move the file into its final destination directory
shutil.move(audio_file_path, final_file_path)
#Pass the full path back to Airtime
results["full_path"] = final_file_path
queue.put(results)
def mkdir_p(path):
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise

View File

@ -83,11 +83,13 @@ class MessageListener:
try: try:
msg_dict = json.loads(body) msg_dict = json.loads(body)
audio_file_path = msg_dict["tmp_file_path"] audio_file_path = msg_dict["tmp_file_path"]
final_directory = msg_dict["final_directory"] #final_file_path = msg_dict["final_file_path"]
import_directory = msg_dict["import_directory"]
original_filename = msg_dict["original_filename"]
callback_url = msg_dict["callback_url"] callback_url = msg_dict["callback_url"]
api_key = msg_dict["api_key"] api_key = msg_dict["api_key"]
audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, final_directory) audio_metadata = MessageListener.spawn_analyzer_process(audio_file_path, import_directory, original_filename)
StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata) StatusReporter.report_success_to_callback_url(callback_url, api_key, audio_metadata)
except KeyError as e: except KeyError as e:
@ -123,11 +125,11 @@ class MessageListener:
channel.basic_ack(delivery_tag=method_frame.delivery_tag) channel.basic_ack(delivery_tag=method_frame.delivery_tag)
@staticmethod @staticmethod
def spawn_analyzer_process(audio_file_path, final_directory): def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
q = multiprocessing.Queue() q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis, p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, final_directory)) args=(q, audio_file_path, import_directory, original_filename))
p.start() p.start()
p.join() p.join()
if p.exitcode == 0: if p.exitcode == 0: