CC-5709: Airtime Analyzer

* Remove the "hidden" field from the REST blacklist, the analyzer needs to set it.
* Added import_status column messages in the recent uploads table
* Auto-refresh the recent uploads table while imports are pending
* Moved the file moving stuff to its own analyzer in airtime_analyzer
* Basic error reporting to the REST API in airtime_analyzer, needs
  hardening though
* Fixed a bug with the number of recent uploads
* Prevent airtime_analyzer from running if media_monitor is running
This commit is contained in:
Albert Santoni 2014-03-22 02:12:03 -04:00
parent 8f7ecafcf6
commit 61c2c90b7e
9 changed files with 118 additions and 88 deletions

View File

@ -48,20 +48,20 @@ class PluploadController extends Zend_Controller_Action
$rowStart = isset($_GET['iDisplayStart']) ? $_GET['iDisplayStart'] : 0; $rowStart = isset($_GET['iDisplayStart']) ? $_GET['iDisplayStart'] : 0;
$recentUploadsQuery = CcFilesQuery::create()->filterByDbUtime(array('min' => time() - 30 * 24 * 60 * 60)) $recentUploadsQuery = CcFilesQuery::create()->filterByDbUtime(array('min' => time() - 30 * 24 * 60 * 60))
->orderByDbUtime(Criteria::DESC) ->orderByDbUtime(Criteria::DESC);
->offset($rowStart)
->limit($limit); $numTotalRecentUploads = $recentUploadsQuery->find()->count();
if ($filter == "pending") { if ($filter == "pending") {
$recentUploadsQuery->filterByDbImportStatus("1"); $recentUploadsQuery->filterByDbImportStatus("1");
} else if ($filter == "failed") { } else if ($filter == "failed") {
$recentUploadsQuery->filterByDbImportStatus(array('min' => 100)); $recentUploadsQuery->filterByDbImportStatus(array('min' => 100));
} }
$recentUploads = $recentUploadsQuery->find();
$recentUploads = $recentUploadsQuery->offset($rowStart)->limit($limit)->find();
$numRecentUploads = $limit; $numRecentUploads = $limit;
$numTotalRecentUploads = CcFilesQuery::create()->filterByDbUtime(array('min' => time() - 30 * 24 * 60 * 60)) //CcFilesQuery::create()->filterByDbUtime(array('min' => time() - 30 * 24 * 60 * 60))
->count();
//$this->_helper->json->sendJson(array("jsonrpc" => "2.0", "tempfilepath" => $tempFileName)); //$this->_helper->json->sendJson(array("jsonrpc" => "2.0", "tempfilepath" => $tempFileName));

View File

@ -7,7 +7,6 @@ class Rest_MediaController extends Zend_Rest_Controller
private $blackList = array( private $blackList = array(
'id', 'id',
'file_exists', 'file_exists',
'hidden',
'silan_check', 'silan_check',
'soundcloud_id', 'soundcloud_id',
'is_scheduled', 'is_scheduled',
@ -17,7 +16,6 @@ class Rest_MediaController extends Zend_Rest_Controller
//fields we should never expose through our RESTful API //fields we should never expose through our RESTful API
private $privateFields = array( private $privateFields = array(
'file_exists', 'file_exists',
'hidden',
'silan_check', 'silan_check',
'is_scheduled', 'is_scheduled',
'is_playlist' 'is_playlist'

View File

@ -3,6 +3,16 @@ $(document).ready(function() {
var uploader; var uploader;
var self = this; var self = this;
self.uploadFilter = "all"; self.uploadFilter = "all";
// Lookup table mapping the numeric import_status values returned by the
// server to localized, human-readable messages for the recent-uploads table.
// (0 = imported, 1 = pending, 2 = failed; UNKNOWN covers any other value.)
self.IMPORT_STATUS_CODES = {
0 : { message: $.i18n._("Successfully imported")},
1 : { message: $.i18n._("Pending import")},
2 : { message: $.i18n._("Import failed.")},
UNKNOWN : { message: $.i18n._("Unknown")}
};
// Freeze the table where the browser supports it (ES5+) so it
// behaves like a constant and cannot be mutated accidentally.
if (Object.freeze) {
Object.freeze(self.IMPORT_STATUS_CODES);
}
$("#plupload_files").pluploadQueue({ $("#plupload_files").pluploadQueue({
// General settings // General settings
@ -47,17 +57,13 @@ $(document).ready(function() {
console.log("Invalid data type for the import_status."); console.log("Invalid data type for the import_status.");
return; return;
} }
var statusStr = $.i18n._("Unknown"); var statusStr = self.IMPORT_STATUS_CODES.UNKNOWN.message;
if (data == 0) var importStatusCode = data;
{ if (self.IMPORT_STATUS_CODES[importStatusCode]) {
statusStr = $.i18n._("Successfully imported"); statusStr = self.IMPORT_STATUS_CODES[importStatusCode].message;
} };
else if (data == 1)
{
statusStr = $.i18n._("Pending import");
}
return statusStr; return statusStr;
}; };
self.renderFileActions = function ( data, type, full ) { self.renderFileActions = function ( data, type, full ) {
@ -114,6 +120,23 @@ $(document).ready(function() {
aoData.push( { "name": "uploadFilter", "value": self.uploadFilter } ); aoData.push( { "name": "uploadFilter", "value": self.uploadFilter } );
$.getJSON( sSource, aoData, function (json) { $.getJSON( sSource, aoData, function (json) {
fnCallback(json); fnCallback(json);
if (json.files) {
// Keep polling the server while at least one listed file is still
// mid-import (import_status == 1); otherwise stop the refresh timer.
var pendingImportExists = json.files.some(function (file) {
return file.import_status == 1;
});
if (pendingImportExists) {
self.startRefreshingRecentUploads();
} else {
self.stopRefreshingRecentUploads();
}
}
} ); } );
} }
}); });
@ -121,6 +144,25 @@ $(document).ready(function() {
return recentUploadsTable; return recentUploadsTable;
}; };
// Schedule a one-shot redraw of the recent-uploads table in 3 seconds.
// No-op if a refresh is already pending.
self.startRefreshingRecentUploads = function()
{
if (self.isRecentUploadsRefreshTimerActive()) { //Prevent multiple timers from running
return;
}
// Pass a function, not a string: a string argument to setTimeout is
// eval'd in the global scope, where "self" is undefined, so the
// refresh would never actually run.
self.recentUploadsRefreshTimer = setTimeout(function () {
// The one-shot timer has fired; clear the handle first so the
// next server response can schedule another refresh. (A stale id
// would make isRecentUploadsRefreshTimerActive() return true
// forever, stopping auto-refresh after a single cycle.)
self.recentUploadsRefreshTimer = null;
self.recentUploadsTable.fnDraw();
}, 3000);
};
// Returns true while a refresh is scheduled and has not yet fired.
self.isRecentUploadsRefreshTimerActive = function()
{
return (self.recentUploadsRefreshTimer != null);
};
// Cancel any scheduled refresh and clear the timer handle.
self.stopRefreshingRecentUploads = function()
{
clearTimeout(self.recentUploadsRefreshTimer);
self.recentUploadsRefreshTimer = null;
};
$("#upload_status_all").click(function() { $("#upload_status_all").click(function() {
self.uploadFilter = "all"; self.uploadFilter = "all";
self.recentUploadsTable.fnDraw(); self.recentUploadsTable.fnDraw();

View File

@ -2,7 +2,7 @@
class Analyzer: class Analyzer:
@staticmethod @staticmethod
def analyze(filename): def analyze(filename, results):
raise NotImplementedError raise NotImplementedError
class AnalyzerError(Exception): class AnalyzerError(Exception):

View File

@ -1,10 +1,7 @@
import logging import logging
import multiprocessing import multiprocessing
import shutil
import os, errno
import time
import uuid
from metadata_analyzer import MetadataAnalyzer from metadata_analyzer import MetadataAnalyzer
from filemover_analyzer import FileMoverAnalyzer
class AnalyzerPipeline: class AnalyzerPipeline:
@ -29,59 +26,16 @@ class AnalyzerPipeline:
# Analyze the audio file we were told to analyze: # Analyze the audio file we were told to analyze:
# First, we extract the ID3 tags and other metadata: # First, we extract the ID3 tags and other metadata:
results = MetadataAnalyzer.analyze(audio_file_path) metadata = dict()
metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
metadata = FileMoverAnalyzer.move(audio_file_path, import_directory, original_filename, metadata)
metadata["import_status"] = 0 # imported
# Note that the queue we're putting the results into is our interprocess communication # Note that the queue we're putting the results into is our interprocess communication
# back to the main process. # back to the main process.
#Import the file over to it's final location. #Pass all the file metadata back to the main analyzer process, which then passes
#TODO: Move all this file moving stuff to its own Analyzer class. #it back to the Airtime web application.
# Also, handle the case where the move fails and write some code queue.put(metadata)
# to possibly move the file to problem_files.
final_file_path = import_directory
if results.has_key("artist_name"):
final_file_path += "/" + results["artist_name"]
if results.has_key("album"):
final_file_path += "/" + results["album"]
final_file_path += "/" + original_filename
#Ensure any redundant slashes are stripped
final_file_path = os.path.normpath(final_file_path)
#If a file with the same name already exists in the "import" directory, then
#we add a unique string to the end of this one. We never overwrite a file on import
#because if we did that, it would mean Airtime's database would have
#the wrong information for the file we just overwrote (eg. the song length would be wrong!)
if os.path.exists(final_file_path) and not os.path.samefile(audio_file_path, final_file_path):
#If the final file path is the same as the file we've been told to import (which
#you often do when you're debugging), then don't move the file at all.
base_file_path, file_extension = os.path.splitext(final_file_path)
final_file_path = "%s_%s%s" % (base_file_path, time.strftime("%m-%d-%Y-%H-%M-%S", time.localtime()), file_extension)
#If THAT path exists, append a UUID instead:
while os.path.exists(final_file_path):
base_file_path, file_extension = os.path.splitext(final_file_path)
final_file_path = "%s_%s%s" % (base_file_path, str(uuid.uuid4()), file_extension)
#Ensure the full path to the file exists
mkdir_p(os.path.dirname(final_file_path))
#Move the file into its final destination directory
shutil.move(audio_file_path, final_file_path)
#Pass the full path back to Airtime
results["full_path"] = final_file_path
queue.put(results)
def mkdir_p(path):
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise

View File

@ -114,8 +114,8 @@ class MessageListener:
# #
# TODO: If the JSON was invalid or the web server is down, # TODO: If the JSON was invalid or the web server is down,
# then don't report that failure to the REST API # then don't report that failure to the REST API
#TODO: Catch exceptions from this HTTP request too:
StatusReporter.report_failure_to_callback_url(callback_url, api_key, error_status=1, StatusReporter.report_failure_to_callback_url(callback_url, api_key, import_status=2,
reason=u'An error occurred while importing this file') reason=u'An error occurred while importing this file')
logging.exception(e) logging.exception(e)

View File

@ -10,9 +10,12 @@ class MetadataAnalyzer(Analyzer):
pass pass
@staticmethod @staticmethod
def analyze(filename): def analyze(filename, metadata):
if not isinstance(filename, unicode):
raise TypeError("filename must be unicode. Was of type " + type(filename).__name__)
if not isinstance(metadata, dict):
raise TypeError("metadata must be a dict. Was of type " + type(metadata).__name__)
metadata = dict()
#Extract metadata from an audio file using mutagen #Extract metadata from an audio file using mutagen
audio_file = mutagen.File(filename, easy=True) audio_file = mutagen.File(filename, easy=True)

View File

@ -11,20 +11,33 @@ class StatusReporter():
@classmethod @classmethod
def report_success_to_callback_url(self, callback_url, api_key, audio_metadata): def report_success_to_callback_url(self, callback_url, api_key, audio_metadata):
# Encode the audio metadata as JSON and post it back to the callback_url # encode the audio metadata as json and post it back to the callback_url
put_payload = json.dumps(audio_metadata) put_payload = json.dumps(audio_metadata)
logging.debug("Sending HTTP PUT with payload: " + put_payload) logging.debug("sending http put with payload: " + put_payload)
r = requests.put(callback_url, data=put_payload, r = requests.put(callback_url, data=put_payload,
auth=requests.auth.HTTPBasicAuth(api_key, ''), auth=requests.auth.httpbasicauth(api_key, ''),
timeout=StatusReporter._HTTP_REQUEST_TIMEOUT) timeout=statusreporter._http_request_timeout)
logging.debug("HTTP request returned status: " + str(r.status_code)) logging.debug("http request returned status: " + str(r.status_code))
logging.debug(r.text) # Log the response body logging.debug(r.text) # log the response body
#TODO: Queue up failed requests and try them again later. #todo: queue up failed requests and try them again later.
r.raise_for_status() # Raise an exception if there was an HTTP error code returned r.raise_for_status() # raise an exception if there was an http error code returned
@classmethod @classmethod
def report_failure_to_callback_url(self, callback_url, api_key, error_status, reason): def report_failure_to_callback_url(self, callback_url, api_key, import_status, reason):
# TODO: Make error_status is an int? # TODO: Make import_status is an int?
pass
logging.debug("Reporting import failure to Airtime REST API...")
audio_metadata["import_status"] = import_status
audio_metadata["comment"] = reason # hack attack
put_payload = json.dumps(audio_metadata)
logging.debug("sending http put with payload: " + put_payload)
r = requests.put(callback_url, data=put_payload,
auth=requests.auth.httpbasicauth(api_key, ''),
timeout=statusreporter._http_request_timeout)
logging.debug("http request returned status: " + str(r.status_code))
logging.debug(r.text) # log the response body
#todo: queue up failed requests and try them again later.
r.raise_for_status() # raise an exception if there was an http error code returned

View File

@ -2,6 +2,7 @@
import daemon import daemon
import argparse import argparse
import os
import airtime_analyzer.airtime_analyzer as aa import airtime_analyzer.airtime_analyzer as aa
VERSION = "1.0" VERSION = "1.0"
@ -13,6 +14,25 @@ parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true
parser.add_argument("--debug", help="log full debugging output", action="store_true") parser.add_argument("--debug", help="log full debugging output", action="store_true")
args = parser.parse_args() args = parser.parse_args()
def check_if_media_monitor_is_running():
    """Abort startup if a media_monitor.py process is running.

    media_monitor (deprecated) moves newly uploaded files into the library,
    which would conflict with airtime_analyzer's own file handling, so we
    print an error and exit(1) if it is found.

    Returns None when no conflicting process is detected.
    """
    # Enumerate candidate PIDs by scanning /proc (Linux-specific).
    pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
    for pid in pids:
        try:
            # 'with' guarantees the handle is closed even if read() fails;
            # the original leaked one open file descriptor per process.
            with open(os.path.join('/proc', pid, 'cmdline'), 'rb') as f:
                process_name = f.read()
            # Compare bytes against bytes so this works whether the file
            # was read on Python 2 (str) or Python 3 (bytes).
            if b'media_monitor.py' in process_name:
                print("Error: This process conflicts with media_monitor, and media_monitor is running.")
                print("       Please terminate the running media_monitor.py process and try again.")
                exit(1)
        except IOError:  # process terminated between listdir() and open()
            continue

check_if_media_monitor_is_running()
if args.daemon: if args.daemon:
with daemon.DaemonContext(): with daemon.DaemonContext():
analyzer = aa.AirtimeAnalyzerServer(debug=args.debug) analyzer = aa.AirtimeAnalyzerServer(debug=args.debug)