CC-5709: Airtime Analyzer

* Remove awful StoredFile::uploadFile() function
* Massive airtime_analyzer commenting and cleanup
* Cleaned up the upload code
* Temporarily disabled the liquidsoap playability test.
Albert Santoni 2014-04-03 16:13:26 -04:00
parent cb62850558
commit 95b369c54d
13 changed files with 204 additions and 191 deletions

View File

@@ -453,6 +453,8 @@ class ApiController extends Zend_Controller_Action
    public function uploadFileAction()
    {
        Logging::error("FIXME: Change the show recorder to use the File Upload API and remove this function."); // Albert - April 3, 2014
        /**
        $upload_dir = ini_get("upload_tmp_dir");
        $tempFilePath = Application_Model_StoredFile::uploadFile($upload_dir);
        $tempFileName = basename($tempFilePath);
@@ -464,7 +466,7 @@ class ApiController extends Zend_Controller_Action
        $this->_helper->json->sendJson(
            array("jsonrpc" => "2.0", "error" => array("code" => $result['code'], "message" => $result['message']))
        );
        }
        }*/
    }

    public function uploadRecordedAction()

View File

@@ -899,107 +899,6 @@ SQL;
        return $results;
    }

    public static function uploadFile($p_targetDir)
    {
        // HTTP headers for no cache etc
        header('Content-type: text/plain; charset=UTF-8');
        header("Expires: Mon, 26 Jul 1997 05:00:00 GMT");
        header("Last-Modified: " . gmdate("D, d M Y H:i:s") . " GMT");
        header("Cache-Control: no-store, no-cache, must-revalidate");
        header("Cache-Control: post-check=0, pre-check=0", false);
        header("Pragma: no-cache");

        // Settings
        $cleanupTargetDir = false; // Remove old files
        $maxFileAge = 60 * 60; // Temp file age in seconds

        // 5 minutes execution time
        @set_time_limit(5 * 60);
        // usleep(5000);

        // Get parameters
        $chunk = isset($_REQUEST["chunk"]) ? $_REQUEST["chunk"] : 0;
        $chunks = isset($_REQUEST["chunks"]) ? $_REQUEST["chunks"] : 0;
        $fileName = isset($_REQUEST["name"]) ? $_REQUEST["name"] : '';

        // TODO: should not log __FILE__ itself; there is general logging for this
        Logging::info(__FILE__.":uploadFile(): filename=$fileName to $p_targetDir");

        // Clean the fileName for security reasons
        // This needs fixing for songs not in ASCII.
        //$fileName = preg_replace('/[^\w\._]+/', '', $fileName);

        // Create target dir
        if (!file_exists($p_targetDir))
            @mkdir($p_targetDir);

        // Remove old temp files
        if (is_dir($p_targetDir) && ($dir = opendir($p_targetDir))) {
            while (($file = readdir($dir)) !== false) {
                $filePath = $p_targetDir . DIRECTORY_SEPARATOR . $file;
                // Remove temp files if they are older than the max age
                if (preg_match('/\.tmp$/', $file) && (filemtime($filePath) < time() - $maxFileAge))
                    @unlink($filePath);
            }
            closedir($dir);
        } else
            die('{"jsonrpc" : "2.0", "error" : {"code": 100, "message": _("Failed to open temp directory.")}, "id" : "id"}');

        // Look for the content type header
        if (isset($_SERVER["HTTP_CONTENT_TYPE"]))
            $contentType = $_SERVER["HTTP_CONTENT_TYPE"];
        if (isset($_SERVER["CONTENT_TYPE"]))
            $contentType = $_SERVER["CONTENT_TYPE"];

        // Create temp file name (CC-3086).
        // We are not using the mktemp command anymore;
        // plupload supports the unique_name feature.
        $tempFilePath = $p_targetDir . DIRECTORY_SEPARATOR . $fileName;

        // Old IBM code...
        if (strpos($contentType, "multipart") !== false) {
            if (isset($_FILES['file']['tmp_name']) && is_uploaded_file($_FILES['file']['tmp_name'])) {
                // Open temp file
                $out = fopen($tempFilePath, $chunk == 0 ? "wb" : "ab");
                if ($out) {
                    // Read binary input stream and append it to temp file
                    $in = fopen($_FILES['file']['tmp_name'], "rb");
                    if ($in) {
                        while (($buff = fread($in, 4096)))
                            fwrite($out, $buff);
                    } else
                        die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": _("Failed to open input stream.")}, "id" : "id"}');
                    fclose($out);
                    unlink($_FILES['file']['tmp_name']);
                } else
                    die('{"jsonrpc" : "2.0", "error" : {"code": 102, "message": _("Failed to open output stream.")}, "id" : "id"}');
            } else
                die('{"jsonrpc" : "2.0", "error" : {"code": 103, "message": _("Failed to move uploaded file.")}, "id" : "id"}');
        } else {
            // Open temp file
            $out = fopen($tempFilePath, $chunk == 0 ? "wb" : "ab");
            if ($out) {
                // Read binary input stream and append it to temp file
                $in = fopen("php://input", "rb");
                if ($in) {
                    while (($buff = fread($in, 4096)))
                        fwrite($out, $buff);
                } else
                    die('{"jsonrpc" : "2.0", "error" : {"code": 101, "message": _("Failed to open input stream.")}, "id" : "id"}');
                fclose($out);
            } else
                die('{"jsonrpc" : "2.0", "error" : {"code": 102, "message": _("Failed to open output stream.")}, "id" : "id"}');
        }

        return $tempFilePath;
    }
    /**
     * Check, using disk_free_space, the space available in the $destination_folder folder to see if it has
     * enough space to move the $audio_file into and report back to the user if not.
@@ -1065,12 +964,13 @@ SQL;
        // Check if liquidsoap can play this file
        // TODO: Move this to airtime_analyzer
        /*
        if (!self::liquidsoapFilePlayabilityTest($audio_file)) {
            return array(
                "code" => 110,
                "message" => _("This file appears to be corrupted and will not "
                    ."be added to media library."));
        }
        }*/

View File

@@ -333,22 +333,19 @@ class Rest_MediaController extends Zend_Rest_Controller
        $CC_CONFIG = Config::getConfig();
        $apiKey = $CC_CONFIG["apiKey"][0];

        $upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload";
        $tempFilePath = Application_Model_StoredFile::uploadFile($upload_dir);
        //$upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload";
        //$tempFilePath = Application_Model_StoredFile::uploadFile($upload_dir);
        $tempFilePath = $_FILES['file']['tmp_name'];
        $tempFileName = basename($tempFilePath);

        //TODO: Remove copyFileToStor from StoredFile...
        //TODO: Remove uploadFileAction from ApiController.php **IMPORTANT** - It's used by the recorder daemon?

        $upload_dir = ini_get("upload_tmp_dir") . DIRECTORY_SEPARATOR . "plupload";
        $tempFilePath = $upload_dir . "/" . $tempFileName;

        $storDir = Application_Model_MusicDir::getStorDir();
        //$finalFullFilePath = $storDir->getDirectory() . "/imported/" . $ownerId . "/" . $originalFilename;
        $importedStorageDirectory = $storDir->getDirectory() . "/imported/" . $ownerId;

        try {
            //Copy the temporary file over to the "organize" folder so that it's off our webserver
            //and accessible by airtime_analyzer which could be running on a different machine.
@@ -357,8 +354,8 @@ class Rest_MediaController extends Zend_Rest_Controller
            Logging::error($e->getMessage());
        }

        //Logging::info("New temporary file path: " . $newTempFilePath);
        //Logging::info("Final file path: " . $finalFullFilePath);
        Logging::info($newTempFilePath);
        //Logging::info("Old temp file path: " . $tempFilePath);

        //Dispatch a message to airtime_analyzer through RabbitMQ,
        //notifying it that there's a new upload to process!
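
The message dispatched here has to carry everything airtime_analyzer needs to do the import. A hypothetical example payload, written as a Python dict (only the tmp_file_path key is confirmed by msg_received_callback in message_listener.py; the other key names and all values are illustrative assumptions):

    # Hypothetical upload notification payload (key names/values are examples):
    msg = {
        "tmp_file_path": u"/srv/airtime/stor/organize/random1234.mp3",  # where the upload was copied
        "import_directory": u"/srv/airtime/stor/imported/1",            # final library destination
        "original_filename": u"song.mp3",                               # name to restore on import
        "callback_url": u"http://example.com/rest/media/42",            # File Upload API endpoint
        "api_key": u"XXXXXX",                                           # auth for the callback
    }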

View File

@@ -1,5 +1,14 @@
airtime_analyzer
==========

Ghetto temporary installation instructions

airtime_analyzer is a daemon that processes Airtime file uploads as background jobs.
It performs metadata extraction using Mutagen and moves uploads into Airtime's
music library directory (stor/imported).

airtime_analyzer uses process isolation to make it resilient to crashes, and it can
run in a multi-tenant environment with no modifications.

Installation
==========

$ sudo python setup.py install
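
Usage
==========

A quick smoke test after installation (this assumes setup.py installs an
airtime_analyzer entry point; the flags are the ones defined in main.py):

$ airtime_analyzer --debug --rmq-config-file /etc/airtime/airtime.conf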
@@ -71,3 +80,9 @@ a test, run:

To run the unit tests and generate a code coverage report, run:

$ nosetests --with-coverage --cover-package=airtime_analyzer

History and Design Motivation
===========

View File

@@ -1,3 +1,5 @@
"""Contains the main application class for airtime_analyzer.
"""
import ConfigParser
import logging
import logging.handlers
@@ -8,6 +10,8 @@ from message_listener import MessageListener

class AirtimeAnalyzerServer:
    """A server for importing uploads to Airtime as background jobs.
    """

    # Constants
    _LOG_PATH = "/var/log/airtime/airtime_analyzer.log"
@@ -29,7 +33,12 @@ class AirtimeAnalyzerServer:

    def setup_logging(self, debug):
        """Set up nicely formatted logging and log rotation.

        Keyword arguments:
            debug -- a boolean indicating whether to enable super verbose logging
                     to the screen and disk.
        """
        if debug:
            self._log_level = logging.DEBUG
        else:
@@ -37,8 +46,6 @@ class AirtimeAnalyzerServer:
        pika_logger = logging.getLogger('pika')
        pika_logger.setLevel(logging.CRITICAL)
        #self.log = logging.getLogger(__name__)

        # Set up logging
        logFormatter = logging.Formatter("%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s")
        rootLogger = logging.getLogger()
@@ -55,6 +62,7 @@ class AirtimeAnalyzerServer:

    def read_config_file(self, config_path):
        """Parse the application's config file located at config_path."""
        config = ConfigParser.SafeConfigParser()
        try:
            config.readfp(open(config_path))
@@ -66,12 +74,4 @@ class AirtimeAnalyzerServer:
            exit(-1)
        return config

''' When being run from the command line, analyze a file passed
    as an argument. '''
if __name__ == "__main__":
    import sys

    analyzers = AnalyzerPipeline()
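
The middle of setup_logging is elided by the hunks; presumably it attaches a rotating file handler plus a console handler to the root logger, roughly like this sketch (the maxBytes/backupCount values are made-up assumptions):

    # Sketch: attach rotating-file and console handlers (sizes are assumptions).
    fileHandler = logging.handlers.RotatingFileHandler(
            filename=self._LOG_PATH, maxBytes=1024*1024*30, backupCount=8)
    fileHandler.setFormatter(logFormatter)
    rootLogger.addHandler(fileHandler)

    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(logFormatter)
    rootLogger.addHandler(consoleHandler)
    rootLogger.setLevel(self._log_level)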

View File

@@ -1,12 +1,13 @@
class Analyzer:
    """ Abstract base class for all "analyzers".
    """
    @staticmethod
    def analyze(filename, metadata):
        raise NotImplementedError

'''
class AnalyzerError(Exception):
class AnalyzerError(Error):
    def __init__(self):
        super(AnalyzerError, self).__init__()
'''

View File

@@ -1,14 +1,35 @@
""" Analyzes and imports an audio file into the Airtime library.
"""
import logging
import multiprocessing
from metadata_analyzer import MetadataAnalyzer
from filemover_analyzer import FileMoverAnalyzer

class AnalyzerPipeline:
    # Take message dictionary and perform the necessary analysis.
    """ Analyzes and imports an audio file into the Airtime library.

        This currently performs metadata extraction (eg. gets the ID3 tags from an MP3),
        then moves the file to the Airtime music library (stor/imported), and returns
        the results back to the parent process. This class is used in an isolated process
        so that if it crashes, it does not kill the entire airtime_analyzer daemon and
        the failure to import can be reported back to the web application.
    """

    @staticmethod
    def run_analysis(queue, audio_file_path, import_directory, original_filename):
        """Analyze and import an audio file, and put all extracted metadata into queue.

        Keyword arguments:
            queue: A multiprocessing.queues.Queue which will be used to pass the
                   extracted metadata back to the parent process.
            audio_file_path: Path on disk to the audio file to analyze.
            import_directory: Path to the final Airtime "import" directory where
                              we will move the file.
            original_filename: The original filename of the file, which we'll try to
                               preserve. The file at audio_file_path typically has a
                               temporary, randomly generated name, which is why we want
                               to know what the original name was.
        """
        if not isinstance(queue, multiprocessing.queues.Queue):
            raise TypeError("queue must be a multiprocessing.Queue()")
        if not isinstance(audio_file_path, unicode):
@@ -18,8 +39,6 @@ class AnalyzerPipeline:
        if not isinstance(original_filename, unicode):
            raise TypeError("original_filename must be unicode. Was of type " + type(original_filename).__name__ + " instead.")
        #print ReplayGainAnalyzer.analyze("foo.mp3")

        # Analyze the audio file we were told to analyze:
        # First, we extract the ID3 tags and other metadata:
        metadata = dict()
@@ -30,8 +49,8 @@ class AnalyzerPipeline:
        # Note that the queue we're putting the results into is our interprocess communication
        # back to the main process.

        #Pass all the file metadata back to the main analyzer process, which then passes
        #it back to the Airtime web application.
        # Pass all the file metadata back to the main analyzer process, which then passes
        # it back to the Airtime web application.
        queue.put(metadata)
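
The hunks elide the middle of run_analysis, but the imports and comments imply a simple chain in which each analyzer receives the shared metadata dictionary and returns it enriched. A sketch of that chain (the exact calls are assumptions, based on the analyze/move signatures documented elsewhere in this commit):

    # Sketch: each stage takes the metadata dict and returns it enriched.
    metadata = dict()
    metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
    metadata = FileMoverAnalyzer.move(audio_file_path, import_directory,
                                      original_filename, metadata)
    queue.put(metadata)  # hand the results back to the parent process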

View File

@@ -9,13 +9,26 @@ import uuid
from analyzer import Analyzer

class FileMoverAnalyzer(Analyzer):
    """This analyzer copies a file over from a temporary directory (stor/organize)
       into the Airtime library (stor/imported).
    """
    @staticmethod
    def analyze(audio_file_path, metadata):
        """Dummy method because we need more info than analyze gets passed to it"""
        raise Exception("Use FileMoverAnalyzer.move() instead.")

    @staticmethod
    def move(audio_file_path, import_directory, original_filename, metadata):
        """Move the file at audio_file_path over into the import_directory/import,
           renaming it to original_filename.

        Keyword arguments:
            audio_file_path: Path to the file to be imported.
            import_directory: Path to the "import" directory inside the Airtime stor directory.
                              (eg. /srv/airtime/stor/import)
            original_filename: The filename of the file when it was uploaded to Airtime.
            metadata: A dictionary where the "full_path" of where the file is moved to will be added.
        """
        if not isinstance(audio_file_path, unicode):
            raise TypeError("audio_file_path must be unicode. Was of type " + type(audio_file_path).__name__)
        if not isinstance(import_directory, unicode):
@@ -68,6 +81,7 @@ class FileMoverAnalyzer(Analyzer):
        return metadata

def mkdir_p(path):
    """ Make all directories in a tree (like mkdir -p)"""
    if path == "":
        return
    try:
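
The hunk cuts off inside mkdir_p; the conventional Python 2 idiom it presumably follows looks like this (a sketch, not necessarily the exact body):

    import errno
    import os

    def mkdir_p(path):
        """ Make all directories in a tree (like mkdir -p)"""
        if path == "":
            return
        try:
            os.makedirs(path)
        except OSError as exc:
            # An already-existing directory is fine; anything else is a real error.
            if exc.errno == errno.EEXIST and os.path.isdir(path):
                pass
            else:
                raise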

View File

@@ -9,19 +9,56 @@ from status_reporter import StatusReporter

EXCHANGE = "airtime-uploads"
EXCHANGE_TYPE = "topic"
ROUTING_KEY = "" #"airtime.analyzer.tasks"
ROUTING_KEY = ""
QUEUE = "airtime-uploads"

''' TODO: Document me
    - round robin messaging
    - acking
    - why we use the multiprocess architecture
    - in general, how it works and why it works this way
'''
""" A message listener class that waits for messages from Airtime through RabbitMQ
    notifying us about new uploads.

    This is probably the most important class in this application. It connects
    to RabbitMQ (or an AMQP server) and listens for messages that notify us
    when a user uploads a new file to Airtime, either through the web interface
    or via FTP (on Airtime Pro). When we get a notification, we spawn a child
    process that extracts the uploaded audio file's metadata and moves it into
    Airtime's music library directory. Lastly, the extracted metadata is
    reported back to the Airtime web application.

    There are a couple of Very Important technical details and constraints that you
    need to know if you're going to work on this code:

    1) airtime_analyzer is designed so it doesn't have to run on the same
       computer as the web server. It just needs access to your Airtime library
       folder (stor).
    2) airtime_analyzer is multi-tenant - one process can be used for many
       Airtime instances. It's designed NOT to know whether it's running
       in a single-tenant or multi-tenant environment. All the information it
       needs to import a file into an Airtime instance is passed in via the
       RabbitMQ messages.
    3) We're using a "topic exchange" for the new upload notification RabbitMQ
       messages. This means that if we run several airtime_analyzer processes on
       different computers, RabbitMQ will do round-robin dispatching of the
       file notifications. This gives us cheap, easy load balancing and
       redundancy. You can even run multiple airtime_analyzer processes
       on one machine if you want.
    4) We run the actual work (metadata analysis and file moving) in a separate
       child process so that if it crashes, we can stop RabbitMQ from resending
       the file notification message to another airtime_analyzer process (NACK),
       which would otherwise cause cascading failure. We also do this so that we
       can report the problem file to the Airtime web interface ("import failed").

    So that is a quick overview of the design constraints for this application, and
    why airtime_analyzer is written this way.
"""

class MessageListener:
    def __init__(self, config):
        ''' Start listening for file upload notification messages
            from RabbitMQ.

        Keyword arguments:
            config: A ConfigParser object containing the [rabbitmq] configuration.
        '''
        # Read the RabbitMQ connection settings from the config file.
        # The exceptions thrown here by default give good error messages.
@@ -49,7 +86,7 @@ class MessageListener:

    def connect_to_messaging_server(self):
        '''Connect to the RabbitMQ server and start listening for messages.'''
        self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._host,
            port=self._port, virtual_host=self._vhost,
            credentials=pika.credentials.PlainCredentials(self._username, self._password)))
@@ -64,15 +101,21 @@ class MessageListener:
            queue=QUEUE, no_ack=False)

    def wait_for_messages(self):
        '''Wait until we've received a RabbitMQ message.'''
        self._channel.start_consuming()

    def disconnect_from_messaging_server(self):
        '''Stop consuming RabbitMQ messages and disconnect.'''
        self._channel.stop_consuming()

    # consume callback function
    @staticmethod
    def msg_received_callback(channel, method_frame, header_frame, body):
        ''' A callback method that runs when a RabbitMQ message is received.

            Here we parse the message, spin up an analyzer process, and report the
            metadata back to the Airtime web application (or report an error).
        '''
        logging.info(" - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key))

        # Declare all variables here so they exist in the exception handlers below, no matter what.
@@ -83,11 +126,12 @@ class MessageListener:
        callback_url = ""
        api_key = ""

        # Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
        # to pass objects between the processes so that if the analyzer process crashes, it does not
        # take down the rest of the daemon and we NACK that message so that it doesn't get
        # propagated to other airtime_analyzer daemons (eg. running on other servers).
        # We avoid cascading failure this way.
        ''' Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
            to pass objects between the processes so that if the analyzer process crashes, it does not
            take down the rest of the daemon and we NACK that message so that it doesn't get
            propagated to other airtime_analyzer daemons (eg. running on other servers).
            We avoid cascading failure this way.
        '''
        try:
            msg_dict = json.loads(body)
            audio_file_path = msg_dict["tmp_file_path"]
@@ -109,13 +153,14 @@ class MessageListener:
        except Exception as e:
            logging.exception(e)

            # If ANY exception happens while processing a file, we're going to NACK to the
            # messaging server and tell it to remove the message from the queue.
            # (NACK is a negative acknowledgement. We could use ACK instead, but this might come
            # in handy in the future.)
            # Exceptions in this context are unexpected, unhandled errors. We try to recover
            # from as many errors as possible in AnalyzerPipeline, but we're safeguarding ourselves
            # here from any catastrophic or genuinely unexpected errors:
            ''' If ANY exception happens while processing a file, we're going to NACK to the
                messaging server and tell it to remove the message from the queue.
                (NACK is a negative acknowledgement. We could use ACK instead, but this might come
                in handy in the future.)
                Exceptions in this context are unexpected, unhandled errors. We try to recover
                from as many errors as possible in AnalyzerPipeline, but we're safeguarding ourselves
                here from any catastrophic or genuinely unexpected errors:
            '''
            channel.basic_nack(delivery_tag=method_frame.delivery_tag, multiple=False,
                requeue=False) #Important that it doesn't requeue the message
@@ -136,7 +181,7 @@ class MessageListener:

    @staticmethod
    def spawn_analyzer_process(audio_file_path, import_directory, original_filename):
        ''' Spawn a child process to analyze and import a new audio file. '''
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
                args=(q, audio_file_path, import_directory, original_filename))
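
spawn_analyzer_process is cut off right after the Process is created; the parent side of this pattern presumably finishes roughly like this (a sketch; the exit-code handling is an assumption):

    p.start()
    p.join()                # wait for the child to finish its analysis
    if p.exitcode == 0:
        results = q.get()   # the child put its extracted metadata in the queue
    else:
        raise Exception("Analyzer process exited with code " + str(p.exitcode))
    return results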

View File

@@ -8,6 +8,12 @@ class MetadataAnalyzer(Analyzer):

    @staticmethod
    def analyze(filename, metadata):
        ''' Extract audio metadata from tags embedded in the file (eg. ID3 tags)

        Keyword arguments:
            filename: The path to the audio file to extract metadata from.
            metadata: A dictionary that the extracted metadata will be added to.
        '''
        if not isinstance(filename, unicode):
            raise TypeError("filename must be unicode. Was of type " + type(filename).__name__)
        if not isinstance(metadata, dict):
@@ -25,7 +31,6 @@ class MetadataAnalyzer(Analyzer):
        track_length = datetime.timedelta(seconds=info.length)
        metadata["length"] = str(track_length) #time.strftime("%H:%M:%S.%f", track_length)
        metadata["bit_rate"] = info.bitrate
        #metadata["channels"] = info.channels

        # Use the python-magic module to get the MIME type.
        mime_magic = magic.Magic(mime=True)
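
Only fragments of the Mutagen usage survive the hunking; the extraction presumably starts along these lines (a sketch; easy=True and the "mime" key name are assumptions):

    import mutagen

    audio_file = mutagen.File(filename, easy=True)  # parses tags for most common formats
    info = audio_file.info
    track_length = datetime.timedelta(seconds=info.length)
    metadata["length"] = str(track_length)
    metadata["bit_rate"] = info.bitrate

    # Use the python-magic module to get the MIME type, as above:
    mime_magic = magic.Magic(mime=True)
    metadata["mime"] = mime_magic.from_file(filename.encode("utf-8"))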

View File

@@ -1,6 +1,8 @@
from analyzer import Analyzer

''' TODO: everything '''
''' TODO: ReplayGain is currently calculated by pypo but it should
    be done here in the analyzer.
'''

class ReplayGainAnalyzer(Analyzer):
    def __init__(self):

View File

@@ -3,15 +3,18 @@ import json
import logging

class StatusReporter():
    ''' Reports the extracted audio file metadata and job status back to the
        Airtime web application.
    '''
    _HTTP_REQUEST_TIMEOUT = 30

    # Report the extracted metadata and status of the successfully imported file
    # to the callback URL (which should be the Airtime File Upload API)
    @classmethod
    def report_success_to_callback_url(self, callback_url, api_key, audio_metadata):
        # encode the audio metadata as json and post it back to the callback_url
        ''' Report the extracted metadata and status of the successfully imported file
            to the callback URL (which should be the Airtime File Upload API).
        '''
        # Encode the audio metadata as JSON and send it back to the callback_url
        put_payload = json.dumps(audio_metadata)
        logging.debug("sending http put with payload: " + put_payload)
        r = requests.put(callback_url, data=put_payload,
@@ -20,13 +23,14 @@ class StatusReporter():
        logging.debug("HTTP request returned status: " + str(r.status_code))
        logging.debug(r.text) # log the response body

        #todo: queue up failed requests and try them again later.
        r.raise_for_status() # raise an exception if there was an http error code returned
        #TODO: Queue up failed requests and try them again later.
        r.raise_for_status() # Raise an exception if there was an HTTP error code returned

    @classmethod
    def report_failure_to_callback_url(self, callback_url, api_key, import_status, reason):
        # TODO: Make sure import_status is an int?
        if not isinstance(import_status, (int, long)):
            raise TypeError("import_status must be an integer. Was of type " + type(import_status).__name__)

        logging.debug("Reporting import failure to Airtime REST API...")
        audio_metadata = dict()
        audio_metadata["import_status"] = import_status

View File

@@ -1,3 +1,5 @@
"""Runs the airtime_analyzer application.
"""
#!/usr/bin/env python
import daemon
@@ -8,19 +10,37 @@ import airtime_analyzer.airtime_analyzer as aa

VERSION = "1.0"
DEFAULT_CONFIG_PATH = '/etc/airtime/airtime.conf'

print "Airtime Analyzer " + VERSION

def run():
    '''Entry-point for this application'''
    print "Airtime Analyzer " + VERSION
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
    parser.add_argument("--debug", help="log full debugging output", action="store_true")
    parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is /etc/airtime/airtime.conf)")
    args = parser.parse_args()

    check_if_media_monitor_is_running()

    # Default config file path
    config_path = DEFAULT_CONFIG_PATH
    if args.rmq_config_file:
        config_path = args.rmq_config_file

    if args.daemon:
        with daemon.DaemonContext():
            aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug)
    else:
        # Run without daemonizing
        aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug)

parser = argparse.ArgumentParser()
parser.add_argument("-d", "--daemon", help="run as a daemon", action="store_true")
parser.add_argument("--debug", help="log full debugging output", action="store_true")
parser.add_argument("--rmq-config-file", help="specify a configuration file with RabbitMQ settings (default is /etc/airtime/airtime.conf)")
args = parser.parse_args()

''' Ensure media_monitor isn't running before we start, because it'll move newly uploaded
    files into the library on us and screw up the operation of airtime_analyzer.
    media_monitor is deprecated.
'''
def check_if_media_monitor_is_running():
    """Ensure media_monitor isn't running before we start.

    We do this because media_monitor will move newly uploaded
    files into the library on us and screw up the operation of airtime_analyzer.
    media_monitor is deprecated.
    """
    pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
    for pid in pids:
@@ -33,17 +53,6 @@ def check_if_media_monitor_is_running():
        except IOError: # proc has already terminated
            continue

check_if_media_monitor_is_running()
run()

# Default config file path
config_path = DEFAULT_CONFIG_PATH
if args.rmq_config_file:
    config_path = args.rmq_config_file

if args.daemon:
    with daemon.DaemonContext():
        analyzer = aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug)
else:
    # Run without daemonizing
    analyzer = aa.AirtimeAnalyzerServer(config_path=config_path, debug=args.debug)
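
The body of the /proc scan is elided between the two hunks above; it presumably reads each process's command line and bails out if media_monitor shows up, along these lines (a sketch; the exact match string and error message are assumptions):

    for pid in pids:
        try:
            cmdline = open(os.path.join("/proc", pid, "cmdline"), "rb").read()
            if "media_monitor" in cmdline:
                print "Error: media_monitor is running. Stop it before starting airtime_analyzer."
                exit(1)
        except IOError: # proc has already terminated
            continue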