chore: rename airtime_analyzer dir to libretime_analyzer

jo 2021-12-23 23:51:23 +01:00 committed by Kyle Robbertze
parent bd43688757
commit 62476de478
13 changed files with 0 additions and 0 deletions

@@ -0,0 +1,89 @@
"""Contains the main application class for airtime_analyzer.
"""
import logging
import logging.handlers
import signal
import sys
import traceback
from functools import partial
from . import config_file
from .message_listener import MessageListener
from .metadata_analyzer import MetadataAnalyzer
from .replaygain_analyzer import ReplayGainAnalyzer
from .status_reporter import StatusReporter
class AirtimeAnalyzerServer:
"""A server for importing uploads to Airtime as background jobs."""
# Constants
_LOG_PATH = "/var/log/airtime/airtime_analyzer.log"
# Variables
_log_level = logging.INFO
def __init__(self, rmq_config_path, http_retry_queue_path, debug=False):
# Dump a stacktrace with 'kill -SIGUSR2 <PID>'
signal.signal(
signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace()
)
# Configure logging
self.setup_logging(debug)
# Read our rmq config file
rmq_config = config_file.read_config_file(rmq_config_path)
# Start up the StatusReporter process
StatusReporter.start_thread(http_retry_queue_path)
# Start listening for RabbitMQ messages telling us about newly
# uploaded files. This blocks until we receive a shutdown signal.
self._msg_listener = MessageListener(rmq_config)
StatusReporter.stop_thread()
def setup_logging(self, debug):
"""Set up nicely formatted logging and log rotation.
Keyword arguments:
debug -- a boolean indicating whether to enable super verbose logging
to the screen and disk.
"""
if debug:
self._log_level = logging.DEBUG
else:
# Disable most pika/rabbitmq logging:
pika_logger = logging.getLogger("pika")
pika_logger.setLevel(logging.CRITICAL)
# Set up logging
logFormatter = logging.Formatter(
"%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s"
)
rootLogger = logging.getLogger()
rootLogger.setLevel(self._log_level)
fileHandler = logging.handlers.RotatingFileHandler(
filename=self._LOG_PATH, maxBytes=1024 * 1024 * 30, backupCount=8
)
fileHandler.setFormatter(logFormatter)
rootLogger.addHandler(fileHandler)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)
@classmethod
    def dump_stacktrace(cls):
"""Dump a stacktrace for all threads"""
code = []
for threadId, stack in list(sys._current_frames().items()):
code.append("\n# ThreadID: %s" % threadId)
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
if line:
code.append(" %s" % (line.strip()))
logging.info("\n".join(code))
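
As a quick illustration, a minimal sketch of invoking the stack dump directly rather than via 'kill -SIGUSR2 <PID>', assuming the package is importable as airtime_analyzer (the name used by the CLI entrypoint below):

import logging

from airtime_analyzer.airtime_analyzer import AirtimeAnalyzerServer

logging.basicConfig(level=logging.INFO)
# Logs a formatted stack trace for every live thread to the root logger.
AirtimeAnalyzerServer.dump_stacktrace()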


@@ -0,0 +1,9 @@
# TODO: use an abstract base class (ie. import from abc ...) once we have python >=3.3 that supports @staticmethod with @abstractmethod
class Analyzer:
"""Abstract base class for all "analyzers"."""
@staticmethod
def analyze(filename, metadata):
raise NotImplementedError
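
The TODO above refers to combining @staticmethod with @abstractmethod, which works from Python 3.3 onward. A sketch of what the abc-based version might look like, assuming the same analyze signature:

from abc import ABC, abstractmethod


class Analyzer(ABC):
    """Abstract base class for all "analyzers"."""

    @staticmethod
    @abstractmethod
    def analyze(filename, metadata):
        """Analyze filename and add the results to the metadata dictionary."""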


@@ -0,0 +1,125 @@
""" Analyzes and imports an audio file into the Airtime library.
"""
import configparser
import logging
import multiprocessing
import threading
from queue import Queue
from .cuepoint_analyzer import CuePointAnalyzer
from .filemover_analyzer import FileMoverAnalyzer
from .metadata_analyzer import MetadataAnalyzer
from .playability_analyzer import PlayabilityAnalyzer, UnplayableFileError
from .replaygain_analyzer import ReplayGainAnalyzer
class AnalyzerPipeline:
"""Analyzes and imports an audio file into the Airtime library.
This currently performs metadata extraction (eg. gets the ID3 tags from an MP3),
then moves the file to the Airtime music library (stor/imported), and returns
the results back to the parent process. This class is used in an isolated process
so that if it crashes, it does not kill the entire airtime_analyzer daemon and
the failure to import can be reported back to the web application.
"""
IMPORT_STATUS_FAILED = 2
@staticmethod
def run_analysis(
queue,
audio_file_path,
import_directory,
original_filename,
storage_backend,
file_prefix,
):
"""Analyze and import an audio file, and put all extracted metadata into queue.
Keyword arguments:
queue: A multiprocessing.queues.Queue which will be used to pass the
extracted metadata back to the parent process.
audio_file_path: Path on disk to the audio file to analyze.
import_directory: Path to the final Airtime "import" directory where
we will move the file.
original_filename: The original filename of the file, which we'll try to
preserve. The file at audio_file_path typically has a
temporary randomly generated name, which is why we want
to know what the original name was.
storage_backend: String indicating the storage backend (amazon_s3 or file)
file_prefix:
"""
# It is super critical to initialize a separate log file here so that we
# don't inherit logging/locks from the parent process. Supposedly
# this can lead to Bad Things (deadlocks): http://bugs.python.org/issue6721
AnalyzerPipeline.python_logger_deadlock_workaround()
try:
if not isinstance(queue, Queue):
                raise TypeError("queue must be a queue.Queue")
if not isinstance(audio_file_path, str):
raise TypeError(
"audio_file_path must be unicode. Was of type "
+ type(audio_file_path).__name__
+ " instead."
)
if not isinstance(import_directory, str):
raise TypeError(
"import_directory must be unicode. Was of type "
+ type(import_directory).__name__
+ " instead."
)
if not isinstance(original_filename, str):
raise TypeError(
"original_filename must be unicode. Was of type "
+ type(original_filename).__name__
+ " instead."
)
if not isinstance(file_prefix, str):
raise TypeError(
"file_prefix must be unicode. Was of type "
+ type(file_prefix).__name__
+ " instead."
)
# Analyze the audio file we were told to analyze:
# First, we extract the ID3 tags and other metadata:
metadata = dict()
metadata["file_prefix"] = file_prefix
metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
metadata = FileMoverAnalyzer.move(
audio_file_path, import_directory, original_filename, metadata
)
metadata["import_status"] = 0 # Successfully imported
# Note that the queue we're putting the results into is our interprocess communication
# back to the main process.
# Pass all the file metadata back to the main analyzer process, which then passes
# it back to the Airtime web application.
queue.put(metadata)
except UnplayableFileError as e:
logging.exception(e)
metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
metadata["reason"] = "The file could not be played."
raise e
except Exception as e:
# Ensures the traceback for this child process gets written to our log files:
logging.exception(e)
raise e
@staticmethod
def python_logger_deadlock_workaround():
# Workaround for: http://bugs.python.org/issue6721#msg140215
logger_names = list(logging.Logger.manager.loggerDict.keys())
logger_names.append(None) # Root logger
for name in logger_names:
for handler in logging.getLogger(name).handlers:
handler.createLock()
logging._lock = threading.RLock()
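
run_analysis is normally driven by MessageListener.spawn_analyzer_process (below), but a minimal sketch of calling it directly looks like this; the paths and values are hypothetical and the audio file must exist:

import queue

from airtime_analyzer.analyzer_pipeline import AnalyzerPipeline

results = queue.Queue()
AnalyzerPipeline.run_analysis(
    results,
    "/tmp/plupload/tmpabc123.mp3",   # hypothetical temporary upload path
    "/srv/airtime/stor/imported/1",  # hypothetical import directory
    "My Song.mp3",                   # filename as originally uploaded
    "file",                          # storage backend: "file" or "amazon_s3"
    "",                              # file_prefix
)
metadata = results.get()
print(metadata["import_status"], metadata.get("full_path"))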


@@ -0,0 +1,51 @@
"""
Main CLI entrypoint for the libretime-analyzer app.
"""
import argparse
import os
import airtime_analyzer.airtime_analyzer as aa
VERSION = "1.0"
LIBRETIME_CONF_DIR = os.getenv("LIBRETIME_CONF_DIR", "/etc/airtime")
DEFAULT_RMQ_CONFIG_PATH = os.path.join(LIBRETIME_CONF_DIR, "airtime.conf")
DEFAULT_HTTP_RETRY_PATH = "/tmp/airtime_analyzer_http_retries"
def main():
"""Entry-point for this application"""
print("LibreTime Analyzer {}".format(VERSION))
parser = argparse.ArgumentParser()
parser.add_argument(
"--debug", help="log full debugging output", action="store_true"
)
parser.add_argument(
"--rmq-config-file",
help="specify a configuration file with RabbitMQ settings (default is %s)"
% DEFAULT_RMQ_CONFIG_PATH,
)
parser.add_argument(
"--http-retry-queue-file",
help="specify where incompleted HTTP requests will be serialized (default is %s)"
% DEFAULT_HTTP_RETRY_PATH,
)
args = parser.parse_args()
# Default config file path
rmq_config_path = DEFAULT_RMQ_CONFIG_PATH
http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH
if args.rmq_config_file:
rmq_config_path = args.rmq_config_file
if args.http_retry_queue_file:
http_retry_queue_path = args.http_retry_queue_file
aa.AirtimeAnalyzerServer(
rmq_config_path=rmq_config_path,
http_retry_queue_path=http_retry_queue_path,
debug=args.debug,
)
if __name__ == "__main__":
main()
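
A sketch of driving this entrypoint programmatically; the airtime_analyzer.cli module path and the installed command name are assumptions, not confirmed by this diff:

import sys

from airtime_analyzer import cli  # hypothetical module path for this file

sys.argv = [
    "libretime-analyzer",
    "--debug",
    "--rmq-config-file", "/etc/airtime/airtime.conf",
    "--http-retry-queue-file", "/tmp/airtime_analyzer_http_retries",
]
cli.main()  # blocks, listening for RabbitMQ upload messages until shutdown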


@@ -0,0 +1,16 @@
import configparser
def read_config_file(config_path):
"""Parse the application's config file located at config_path."""
config = configparser.SafeConfigParser()
try:
config.readfp(open(config_path))
except IOError as e:
print("Failed to open config file at {}: {}".format(config_path, e.strerror))
exit(-1)
except Exception as e:
print(e.strerror)
exit(-1)
return config
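
read_config_file returns a ConfigParser; the MessageListener below reads a [rabbitmq] section from it with host, port, user, password and vhost keys. A sketch, with hypothetical values:

from airtime_analyzer.config_file import read_config_file

# /etc/airtime/airtime.conf is expected to contain something along these lines
# (the values here are made up):
#
#   [rabbitmq]
#   host = 127.0.0.1
#   port = 5672
#   user = airtime
#   password = changeme
#   vhost = /airtime
#
config = read_config_file("/etc/airtime/airtime.conf")
print(config.get("rabbitmq", "host"), config.getint("rabbitmq", "port"))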


@@ -0,0 +1,99 @@
import datetime
import json
import logging
import subprocess
import traceback
from .analyzer import Analyzer
class CuePointAnalyzer(Analyzer):
"""This class extracts the cue-in time, cue-out time, and length of a track using silan."""
SILAN_EXECUTABLE = "silan"
@staticmethod
def analyze(filename, metadata):
"""Extracts the cue-in and cue-out times along and sets the file duration based on that.
The cue points are there to skip the silence at the start and end of a track, and are determined
using "silan", which analyzes the loudness in a track.
        :param filename: The full path to the file to analyze
:param metadata: A metadata dictionary where the results will be put
:return: The metadata dictionary
"""
""" The silan -F 0.99 parameter tweaks the highpass filter. The default is 0.98, but at that setting,
the unit test on the short m4a file fails. With the new setting, it gets the correct cue-in time and
all the unit tests pass.
"""
command = [
CuePointAnalyzer.SILAN_EXECUTABLE,
"-b",
"-F",
"0.99",
"-f",
"JSON",
"-t",
"1.0",
filename,
]
try:
results_json = subprocess.check_output(
command, stderr=subprocess.STDOUT, close_fds=True
)
try:
results_json = results_json.decode()
except (UnicodeDecodeError, AttributeError):
pass
silan_results = json.loads(results_json)
# Defensive coding against Silan wildly miscalculating the cue in and out times:
silan_length_seconds = float(silan_results["file duration"])
silan_cuein = format(silan_results["sound"][0][0], "f")
silan_cueout = format(silan_results["sound"][0][1], "f")
# Sanity check the results against any existing metadata passed to us (presumably extracted by Mutagen):
if "length_seconds" in metadata:
# Silan has a rare bug where it can massively overestimate the length or cue out time sometimes.
if (silan_length_seconds - metadata["length_seconds"] > 3) or (
float(silan_cueout) - metadata["length_seconds"] > 2
):
# Don't trust anything silan says then...
raise Exception(
"Silan cue out {0} or length {1} differs too much from the Mutagen length {2}. Ignoring Silan values.".format(
silan_cueout,
silan_length_seconds,
metadata["length_seconds"],
)
)
# Don't allow silan to trim more than the greater of 3 seconds or 5% off the start of a track
if float(silan_cuein) > max(silan_length_seconds * 0.05, 3):
raise Exception(
"Silan cue in time {0} too big, ignoring.".format(silan_cuein)
)
else:
# Only use the Silan track length in the worst case, where Mutagen didn't give us one for some reason.
# (This is mostly to make the unit tests still pass.)
# Convert the length into a formatted time string.
metadata["length_seconds"] = silan_length_seconds #
track_length = datetime.timedelta(seconds=metadata["length_seconds"])
metadata["length"] = str(track_length)
""" XXX: I've commented out the track_length stuff below because Mutagen seems more accurate than silan
as of Mutagen version 1.31. We are always going to use Mutagen's length now because Silan's
length can be off by a few seconds reasonably often.
"""
metadata["cuein"] = silan_cuein
metadata["cueout"] = silan_cueout
except OSError as e: # silan was not found
logging.warning(
"Failed to run: %s - %s. %s"
% (command[0], e.strerror, "Do you have silan installed?")
)
except subprocess.CalledProcessError as e: # silan returned an error code
logging.warning("%s %s %s", e.cmd, e.output, e.returncode)
except Exception as e:
logging.warning(e)
return metadata
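
silan's JSON output, as read by the code above, is keyed on "file duration" plus a "sound" list of [cue-in, cue-out] pairs. A sketch of the parsing on a hypothetical payload:

import json

# Hypothetical output of: silan -b -F 0.99 -f JSON -t 1.0 <file>
results_json = '{"file duration": 180.05, "sound": [[0.462, 179.651]]}'

silan_results = json.loads(results_json)
length_seconds = float(silan_results["file duration"])  # 180.05
cuein = format(silan_results["sound"][0][0], "f")        # "0.462000"
cueout = format(silan_results["sound"][0][1], "f")       # "179.651000"
print(length_seconds, cuein, cueout)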


@@ -0,0 +1,131 @@
import errno
import logging
import os
import shutil
import time
import uuid
from .analyzer import Analyzer
class FileMoverAnalyzer(Analyzer):
"""
This analyzer copies a file over from a temporary directory (stor/organize)
into the Airtime library (stor/imported).
If you import three copies of the same file, the behaviour is:
- The filename is of the first file preserved.
- The filename of the second file has the timestamp attached to it.
- The filename of the third file has a UUID placed after the timestamp, but ONLY IF it's imported within 1 second of the second file (ie. if the timestamp is the same).
"""
@staticmethod
def analyze(audio_file_path, metadata):
"""Dummy method because we need more info than analyze gets passed to it"""
raise Exception("Use FileMoverAnalyzer.move() instead.")
@staticmethod
def move(audio_file_path, import_directory, original_filename, metadata):
"""Move the file at audio_file_path over into the import_directory/import,
renaming it to original_filename.
Keyword arguments:
audio_file_path: Path to the file to be imported.
import_directory: Path to the "import" directory inside the Airtime stor directory.
(eg. /srv/airtime/stor/import)
original_filename: The filename of the file when it was uploaded to Airtime.
metadata: A dictionary where the "full_path" of where the file is moved to will be added.
"""
if not isinstance(audio_file_path, str):
raise TypeError(
"audio_file_path must be string. Was of type "
+ type(audio_file_path).__name__
)
if not isinstance(import_directory, str):
raise TypeError(
"import_directory must be string. Was of type "
+ type(import_directory).__name__
)
if not isinstance(original_filename, str):
raise TypeError(
"original_filename must be string. Was of type "
+ type(original_filename).__name__
)
if not isinstance(metadata, dict):
raise TypeError(
"metadata must be a dict. Was of type " + type(metadata).__name__
)
if not os.path.exists(audio_file_path):
raise FileNotFoundError("audio file not found: {}".format(audio_file_path))
        # Import the file over to its final location.
# TODO: Also, handle the case where the move fails and write some code
# to possibly move the file to problem_files.
max_dir_len = 48
max_file_len = 48
final_file_path = import_directory
orig_file_basename, orig_file_extension = os.path.splitext(original_filename)
if "artist_name" in metadata:
final_file_path += (
"/" + metadata["artist_name"][0:max_dir_len]
) # truncating with array slicing
if "album_title" in metadata:
final_file_path += "/" + metadata["album_title"][0:max_dir_len]
# Note that orig_file_extension includes the "." already
final_file_path += (
"/" + orig_file_basename[0:max_file_len] + orig_file_extension
)
# Ensure any redundant slashes are stripped
final_file_path = os.path.normpath(final_file_path)
# If a file with the same name already exists in the "import" directory, then
# we add a unique string to the end of this one. We never overwrite a file on import
# because if we did that, it would mean Airtime's database would have
# the wrong information for the file we just overwrote (eg. the song length would be wrong!)
# If the final file path is the same as the file we've been told to import (which
# you often do when you're debugging), then don't move the file at all.
if os.path.exists(final_file_path):
if os.path.samefile(audio_file_path, final_file_path):
metadata["full_path"] = final_file_path
return metadata
base_file_path, file_extension = os.path.splitext(final_file_path)
final_file_path = "%s_%s%s" % (
base_file_path,
time.strftime("%m-%d-%Y-%H-%M-%S", time.localtime()),
file_extension,
)
# If THAT path exists, append a UUID instead:
while os.path.exists(final_file_path):
base_file_path, file_extension = os.path.splitext(final_file_path)
final_file_path = "%s_%s%s" % (
base_file_path,
str(uuid.uuid4()),
file_extension,
)
# Ensure the full path to the file exists
mkdir_p(os.path.dirname(final_file_path))
# Move the file into its final destination directory
logging.debug("Moving %s to %s" % (audio_file_path, final_file_path))
shutil.move(audio_file_path, final_file_path)
metadata["full_path"] = final_file_path
return metadata
def mkdir_p(path):
"""Make all directories in a tree (like mkdir -p)"""
if path == "":
return
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
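
A minimal sketch of the collision handling described in the class docstring, using throwaway temp directories in place of stor/organize and stor/imported:

import os
import tempfile

from airtime_analyzer.filemover_analyzer import FileMoverAnalyzer

upload_dir = tempfile.mkdtemp()  # stands in for stor/organize
import_dir = tempfile.mkdtemp()  # stands in for stor/imported

for i in range(3):
    src = os.path.join(upload_dir, "tmp_upload_%d.mp3" % i)
    with open(src, "wb") as f:
        f.write(b"placeholder, not real audio")
    result = FileMoverAnalyzer.move(src, import_dir, "My Song.mp3", {})
    print(result["full_path"])
# The first copy keeps "My Song.mp3", the second gets a timestamp suffix, and a
# third imported within the same second gets a UUID suffix as well.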


@@ -0,0 +1,287 @@
import json
import logging
import multiprocessing
import queue
import select
import signal
import sys
import time
import pika
from .analyzer_pipeline import AnalyzerPipeline
from .status_reporter import StatusReporter
EXCHANGE = "airtime-uploads"
EXCHANGE_TYPE = "topic"
ROUTING_KEY = ""
QUEUE = "airtime-uploads"
""" A message listener class that waits for messages from Airtime through RabbitMQ
notifying us about new uploads.
This is probably the most important class in this application. It connects
to RabbitMQ (or an AMQP server) and listens for messages that notify us
when a user uploads a new file to Airtime, either through the web interface
or via FTP (on Airtime Pro). When we get a notification, we spawn a child
process that extracts the uploaded audio file's metadata and moves it into
Airtime's music library directory. Lastly, the extracted metadata is
reported back to the Airtime web application.
There are a couple of Very Important technical details and constraints that you
need to know if you're going to work on this code:
1) airtime_analyzer is designed so it doesn't have to run on the same
computer as the web server. It just needs access to your Airtime library
folder (stor).
2) airtime_analyzer is multi-tenant - One process can be used for many
Airtime instances. It's designed NOT to know about whether it's running
in a single tenant or multi-tenant environment. All the information it
needs to import a file into an Airtime instance is passed in via those
RabbitMQ messages.
3) We're using a "topic exchange" for the new upload notification RabbitMQ
messages. This means if we run several airtime_analyzer processes on
different computers, RabbitMQ will do round-robin dispatching of the
file notification. This is cheap, easy load balancing and
redundancy for us. You can even run multiple airtime_analyzer processes
on one machine if you want.
4) We run the actual work (metadata analysis and file moving) in a separate
child process so that if it crashes, we can stop RabbitMQ from resending
the file notification message to another airtime_analyzer process (NACK),
which would otherwise cause cascading failure. We also do this so that we
can report the problem file to the Airtime web interface ("import failed").
So that is a quick overview of the design constraints for this application, and
why airtime_analyzer is written this way.
"""
class MessageListener:
def __init__(self, rmq_config):
"""Start listening for file upload notification messages
from RabbitMQ
Keyword arguments:
rmq_config: A ConfigParser object containing the [rabbitmq] configuration.
"""
self._shutdown = False
# Read the RabbitMQ connection settings from the rmq_config file
        # The exceptions thrown here by default give good error messages.
RMQ_CONFIG_SECTION = "rabbitmq"
self._host = rmq_config.get(RMQ_CONFIG_SECTION, "host")
self._port = rmq_config.getint(RMQ_CONFIG_SECTION, "port")
self._username = rmq_config.get(RMQ_CONFIG_SECTION, "user")
self._password = rmq_config.get(RMQ_CONFIG_SECTION, "password")
self._vhost = rmq_config.get(RMQ_CONFIG_SECTION, "vhost")
# Set up a signal handler so we can shutdown gracefully
# For some reason, this signal handler must be set up here. I'd rather
# put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
# with pika's SIGTERM handler interfering with it, I think...)
signal.signal(signal.SIGTERM, self.graceful_shutdown)
while not self._shutdown:
try:
self.connect_to_messaging_server()
self.wait_for_messages()
except (KeyboardInterrupt, SystemExit):
break # Break out of the while loop and exit the application
except select.error:
pass
except pika.exceptions.AMQPError as e:
if self._shutdown:
break
logging.error("Connection to message queue failed. ")
logging.error(e)
logging.info("Retrying in 5 seconds...")
time.sleep(5)
self.disconnect_from_messaging_server()
logging.info("Exiting cleanly.")
def connect_to_messaging_server(self):
"""Connect to the RabbitMQ server and start listening for messages."""
self._connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self._host,
port=self._port,
virtual_host=self._vhost,
credentials=pika.credentials.PlainCredentials(
self._username, self._password
),
)
)
self._channel = self._connection.channel()
self._channel.exchange_declare(
exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE, durable=True
)
result = self._channel.queue_declare(queue=QUEUE, durable=True)
self._channel.queue_bind(
exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY
)
logging.info(" Listening for messages...")
self._channel.basic_consume(QUEUE, self.msg_received_callback, auto_ack=False)
def wait_for_messages(self):
"""Wait until we've received a RabbitMQ message."""
self._channel.start_consuming()
def disconnect_from_messaging_server(self):
"""Stop consuming RabbitMQ messages and disconnect"""
# If you try to close a connection that's already closed, you're going to have a bad time.
# We're breaking EAFP because this can be called multiple times depending on exception
# handling flow here.
if not self._channel.is_closed and not self._channel.is_closing:
self._channel.stop_consuming()
if not self._connection.is_closed and not self._connection.is_closing:
self._connection.close()
def graceful_shutdown(self, signum, frame):
"""Disconnect and break out of the message listening loop"""
self._shutdown = True
self.disconnect_from_messaging_server()
def msg_received_callback(self, channel, method_frame, header_frame, body):
"""A callback method that runs when a RabbitMQ message is received.
Here we parse the message, spin up an analyzer process, and report the
metadata back to the Airtime web application (or report an error).
"""
logging.info(
" - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key)
)
# Declare all variables here so they exist in the exception handlers below, no matter what.
audio_file_path = ""
# final_file_path = ""
import_directory = ""
original_filename = ""
callback_url = ""
api_key = ""
file_prefix = ""
""" Spin up a worker process. We use the multiprocessing module and multiprocessing.Queue
to pass objects between the processes so that if the analyzer process crashes, it does not
take down the rest of the daemon and we NACK that message so that it doesn't get
propagated to other airtime_analyzer daemons (eg. running on other servers).
We avoid cascading failure this way.
"""
try:
try:
body = body.decode()
except (UnicodeDecodeError, AttributeError):
pass
msg_dict = json.loads(body)
api_key = msg_dict["api_key"]
callback_url = msg_dict["callback_url"]
audio_file_path = msg_dict["tmp_file_path"]
import_directory = msg_dict["import_directory"]
original_filename = msg_dict["original_filename"]
file_prefix = msg_dict["file_prefix"]
storage_backend = msg_dict["storage_backend"]
audio_metadata = MessageListener.spawn_analyzer_process(
audio_file_path,
import_directory,
original_filename,
storage_backend,
file_prefix,
)
StatusReporter.report_success_to_callback_url(
callback_url, api_key, audio_metadata
)
except KeyError as e:
# A field in msg_dict that we needed was missing (eg. audio_file_path)
logging.exception(
"A mandatory airtime_analyzer message field was missing from the message."
)
# See the huge comment about NACK below.
channel.basic_nack(
delivery_tag=method_frame.delivery_tag, multiple=False, requeue=False
) # Important that it doesn't requeue the message
except Exception as e:
logging.exception(e)
""" If ANY exception happens while processing a file, we're going to NACK to the
messaging server and tell it to remove the message from the queue.
(NACK is a negative acknowledgement. We could use ACK instead, but this might come
in handy in the future.)
Exceptions in this context are unexpected, unhandled errors. We try to recover
from as many errors as possible in AnalyzerPipeline, but we're safeguarding ourselves
here from any catastrophic or genuinely unexpected errors:
"""
channel.basic_nack(
delivery_tag=method_frame.delivery_tag, multiple=False, requeue=False
) # Important that it doesn't requeue the message
#
# TODO: If the JSON was invalid or the web server is down,
# then don't report that failure to the REST API
# TODO: Catch exceptions from this HTTP request too:
if (
callback_url
): # If we got an invalid message, there might be no callback_url in the JSON
# Report this as a failed upload to the File Upload REST API.
StatusReporter.report_failure_to_callback_url(
callback_url,
api_key,
import_status=2,
reason="An error occurred while importing this file",
)
else:
# ACK at the very end, after the message has been successfully processed.
# If we don't ack, then RabbitMQ will redeliver the message in the future.
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
@staticmethod
def spawn_analyzer_process(
audio_file_path,
import_directory,
original_filename,
storage_backend,
file_prefix,
):
"""Spawn a child process to analyze and import a new audio file."""
"""
q = multiprocessing.Queue()
p = multiprocessing.Process(target=AnalyzerPipeline.run_analysis,
args=(q, audio_file_path, import_directory, original_filename, storage_backend, file_prefix))
p.start()
p.join()
if p.exitcode == 0:
results = q.get()
logging.info("Main process received results from child: ")
logging.info(results)
else:
raise Exception("Analyzer process terminated unexpectedly.")
"""
metadata = {}
q = queue.Queue()
try:
AnalyzerPipeline.run_analysis(
q,
audio_file_path,
import_directory,
original_filename,
storage_backend,
file_prefix,
)
metadata = q.get()
except Exception as e:
logging.error("Analyzer pipeline exception: %s" % str(e))
metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
# Ensure our queue doesn't fill up and block due to unexpected behaviour. Defensive code.
while not q.empty():
q.get()
return metadata
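
For reference, the shape of an upload notification message as consumed by msg_received_callback; the keys are the ones the callback requires, while the values here are hypothetical:

import json

notification = {
    "api_key": "XXXXXXXXXXXXXXXXXXXX",
    "callback_url": "https://airtime.example.com/rest/media/42",
    "tmp_file_path": "/tmp/plupload/tmpabc123.mp3",
    "import_directory": "/srv/airtime/stor/imported/1",
    "original_filename": "My Song.mp3",
    "file_prefix": "",
    "storage_backend": "file",
}
body = json.dumps(notification)  # published to the "airtime-uploads" topic exchange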


@@ -0,0 +1,195 @@
import datetime
import hashlib
import logging
import os
import time
import wave
import magic
import mutagen
from .analyzer import Analyzer
class MetadataAnalyzer(Analyzer):
@staticmethod
def analyze(filename, metadata):
"""Extract audio metadata from tags embedded in the file (eg. ID3 tags)
Keyword arguments:
filename: The path to the audio file to extract metadata from.
metadata: A dictionary that the extracted metadata will be added to.
"""
if not isinstance(filename, str):
raise TypeError(
"filename must be string. Was of type " + type(filename).__name__
)
if not isinstance(metadata, dict):
raise TypeError(
"metadata must be a dict. Was of type " + type(metadata).__name__
)
if not os.path.exists(filename):
raise FileNotFoundError("audio file not found: {}".format(filename))
# Airtime <= 2.5.x nonsense:
metadata["ftype"] = "audioclip"
# Other fields we'll want to set for Airtime:
metadata["hidden"] = False
# Get file size and md5 hash of the file
metadata["filesize"] = os.path.getsize(filename)
with open(filename, "rb") as fh:
m = hashlib.md5()
while True:
data = fh.read(8192)
if not data:
break
m.update(data)
metadata["md5"] = m.hexdigest()
# Mutagen doesn't handle WAVE files so we use a different package
ms = magic.open(magic.MIME_TYPE)
ms.load()
with open(filename, "rb") as fh:
mime_check = ms.buffer(fh.read(2014))
metadata["mime"] = mime_check
if mime_check == "audio/x-wav":
return MetadataAnalyzer._analyze_wave(filename, metadata)
# Extract metadata from an audio file using mutagen
audio_file = mutagen.File(filename, easy=True)
# Bail if the file couldn't be parsed. The title should stay as the filename
# inside Airtime.
if (
audio_file == None
): # Don't use "if not" here. It is wrong due to mutagen's design.
return metadata
# Note that audio_file can equal {} if the file is valid but there's no metadata tags.
# We can still try to grab the info variables below.
# Grab other file information that isn't encoded in a tag, but instead usually
# in the file header. Mutagen breaks that out into a separate "info" object:
info = audio_file.info
if hasattr(info, "sample_rate"): # Mutagen is annoying and inconsistent
metadata["sample_rate"] = info.sample_rate
if hasattr(info, "length"):
metadata["length_seconds"] = info.length
# Converting the length in seconds (float) to a formatted time string
track_length = datetime.timedelta(seconds=info.length)
metadata["length"] = str(
track_length
) # time.strftime("%H:%M:%S.%f", track_length)
# Other fields for Airtime
metadata["cueout"] = metadata["length"]
# Set a default cue in time in seconds
metadata["cuein"] = 0.0
if hasattr(info, "bitrate"):
metadata["bit_rate"] = info.bitrate
# Use the mutagen to get the MIME type, if it has one. This is more reliable and
# consistent for certain types of MP3s or MPEG files than the MIMEs returned by magic.
if audio_file.mime:
metadata["mime"] = audio_file.mime[0]
# Try to get the number of channels if mutagen can...
try:
# Special handling for getting the # of channels from MP3s. It's in the "mode" field
# which is 0=Stereo, 1=Joint Stereo, 2=Dual Channel, 3=Mono. Part of the ID3 spec...
if metadata["mime"] in ["audio/mpeg", "audio/mp3"]:
if info.mode == 3:
metadata["channels"] = 1
else:
metadata["channels"] = 2
else:
metadata["channels"] = info.channels
except (AttributeError, KeyError):
# If mutagen can't figure out the number of channels, we'll just leave it out...
pass
# Try to extract the number of tracks on the album if we can (the "track total")
try:
track_number = audio_file["tracknumber"]
if isinstance(track_number, list): # Sometimes tracknumber is a list, ugh
track_number = track_number[0]
track_number_tokens = track_number
if "/" in track_number:
track_number_tokens = track_number.split("/")
track_number = track_number_tokens[0]
elif "-" in track_number:
track_number_tokens = track_number.split("-")
track_number = track_number_tokens[0]
metadata["track_number"] = track_number
track_total = track_number_tokens[1]
metadata["track_total"] = track_total
except (AttributeError, KeyError, IndexError):
# If we couldn't figure out the track_number or track_total, just ignore it...
pass
        # We normalize the mutagen tag names slightly here, so if mutagen ever
        # changes its tag names, only this mapping needs to be updated.
mutagen_to_airtime_mapping = {
"title": "track_title",
"artist": "artist_name",
"album": "album_title",
"bpm": "bpm",
"composer": "composer",
"conductor": "conductor",
"copyright": "copyright",
"comment": "comment",
"encoded_by": "encoder",
"genre": "genre",
"isrc": "isrc",
"label": "label",
"organization": "label",
#'length': 'length',
"language": "language",
"last_modified": "last_modified",
"mood": "mood",
"bit_rate": "bit_rate",
"replay_gain": "replaygain",
#'tracknumber': 'track_number',
#'track_total': 'track_total',
"website": "website",
"date": "year",
#'mime_type': 'mime',
}
for mutagen_tag, airtime_tag in mutagen_to_airtime_mapping.items():
try:
metadata[airtime_tag] = audio_file[mutagen_tag]
# Some tags are returned as lists because there could be multiple values.
# This is unusual so we're going to always just take the first item in the list.
if isinstance(metadata[airtime_tag], list):
if metadata[airtime_tag]:
metadata[airtime_tag] = metadata[airtime_tag][0]
else: # Handle empty lists
metadata[airtime_tag] = ""
except KeyError:
continue
return metadata
@staticmethod
def _analyze_wave(filename, metadata):
try:
reader = wave.open(filename, "rb")
metadata["channels"] = reader.getnchannels()
metadata["sample_rate"] = reader.getframerate()
length_seconds = float(reader.getnframes()) / float(metadata["sample_rate"])
# Converting the length in seconds (float) to a formatted time string
track_length = datetime.timedelta(seconds=length_seconds)
metadata["length"] = str(
track_length
) # time.strftime("%H:%M:%S.%f", track_length)
metadata["length_seconds"] = length_seconds
metadata["cueout"] = metadata["length"]
except wave.Error as ex:
logging.error("Invalid WAVE file: {}".format(str(ex)))
raise
return metadata
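
A sketch of running the metadata analyzer on its own; the path is hypothetical and must point at an existing file:

from airtime_analyzer.metadata_analyzer import MetadataAnalyzer

metadata = MetadataAnalyzer.analyze("/srv/airtime/stor/imported/1/My Song.mp3", {})
# Always set: ftype, hidden, filesize, md5, mime.
# When the file can be parsed (depending on format), also: length,
# length_seconds, cuein, cueout, sample_rate, bit_rate, channels, and mapped
# tags such as track_title, artist_name or album_title.
print(metadata["mime"], metadata.get("length"), metadata.get("track_title"))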


@@ -0,0 +1,48 @@
__author__ = "asantoni"
import logging
import subprocess
from .analyzer import Analyzer
class UnplayableFileError(Exception):
pass
class PlayabilityAnalyzer(Analyzer):
"""This class checks if a file can actually be played with Liquidsoap."""
LIQUIDSOAP_EXECUTABLE = "liquidsoap"
@staticmethod
def analyze(filename, metadata):
"""Checks if a file can be played by Liquidsoap.
        :param filename: The full path to the file to analyze
:param metadata: A metadata dictionary where the results will be put
:return: The metadata dictionary
"""
command = [
PlayabilityAnalyzer.LIQUIDSOAP_EXECUTABLE,
"-v",
"-c",
"output.dummy(audio_to_stereo(single(argv(1))))",
"--",
filename,
]
try:
subprocess.check_output(command, stderr=subprocess.STDOUT, close_fds=True)
except OSError as e: # liquidsoap was not found
logging.warning(
"Failed to run: %s - %s. %s"
% (command[0], e.strerror, "Do you have liquidsoap installed?")
)
except (
subprocess.CalledProcessError,
Exception,
) as e: # liquidsoap returned an error code
logging.warning(e)
raise UnplayableFileError()
return metadata
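
Callers are expected to treat UnplayableFileError as a failed import, which is exactly what AnalyzerPipeline does. A minimal sketch, with a hypothetical path:

from airtime_analyzer.playability_analyzer import (
    PlayabilityAnalyzer,
    UnplayableFileError,
)

try:
    PlayabilityAnalyzer.analyze("/srv/airtime/stor/imported/1/My Song.mp3", {})
except UnplayableFileError:
    print("Liquidsoap could not play this file; the import should be marked failed.")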


@@ -0,0 +1,46 @@
import logging
import re
import subprocess
from .analyzer import Analyzer
class ReplayGainAnalyzer(Analyzer):
"""This class extracts the ReplayGain using a tool from the python-rgain package."""
REPLAYGAIN_EXECUTABLE = "replaygain" # From the rgain3 python package
@staticmethod
def analyze(filename, metadata):
"""Extracts the Replaygain loudness normalization factor of a track.
        :param filename: The full path to the file to analyze
:param metadata: A metadata dictionary where the results will be put
:return: The metadata dictionary
"""
""" The -d flag means do a dry-run, ie. don't modify the file directly.
"""
command = [ReplayGainAnalyzer.REPLAYGAIN_EXECUTABLE, "-d", filename]
try:
results = subprocess.check_output(
command,
stderr=subprocess.STDOUT,
close_fds=True,
universal_newlines=True,
)
gain_match = (
r"Calculating Replay Gain information \.\.\.(?:\n|.)*?:([\d.-]*) dB"
)
replaygain = re.search(gain_match, results).group(1)
metadata["replay_gain"] = float(replaygain)
except OSError as e: # replaygain was not found
logging.warning(
"Failed to run: %s - %s. %s"
% (command[0], e.strerror, "Do you have python-rgain installed?")
)
except subprocess.CalledProcessError as e: # replaygain returned an error code
logging.warning("%s %s %s", e.cmd, e.output, e.returncode)
except Exception as e:
logging.warning(e)
return metadata


@@ -0,0 +1,284 @@
import collections
import json
import logging
import pickle
import queue
import threading
import time
import traceback
from urllib.parse import urlparse
import requests
# Disable urllib3 warnings because these can cause a rare deadlock due to Python 2's crappy internal non-reentrant locking
# around POSIX stuff. See SAAS-714. The hasattr() is for compatibility with older versions of requests.
if hasattr(requests, "packages"):
requests.packages.urllib3.disable_warnings()
class PicklableHttpRequest:
def __init__(self, method, url, data, api_key):
self.method = method
self.url = url
self.data = data
self.api_key = api_key
def create_request(self):
return requests.Request(
method=self.method,
url=self.url,
data=self.data,
auth=requests.auth.HTTPBasicAuth(self.api_key, ""),
)
def process_http_requests(ipc_queue, http_retry_queue_path):
"""Runs in a separate thread and performs all the HTTP requests where we're
reporting extracted audio file metadata or errors back to the Airtime web application.
    This thread also checks every 5 seconds whether there are failed HTTP requests that we
need to retry. We retry failed HTTP requests so that we don't lose uploads if the
web server is temporarily down.
"""
# Store any failed requests (eg. due to web server errors or downtime) to be
# retried later:
retry_queue = collections.deque()
shutdown = False
# Unpickle retry_queue from disk so that we won't have lost any uploads
# if airtime_analyzer is shut down while the web server is down or unreachable,
# and there were failed HTTP requests pending, waiting to be retried.
try:
with open(http_retry_queue_path, "rb") as pickle_file:
retry_queue = pickle.load(pickle_file)
except IOError as e:
if e.errno == 2:
pass
else:
raise e
except Exception as e:
# If we fail to unpickle a saved queue of failed HTTP requests, then we'll just log an error
# and continue because those HTTP requests are lost anyways. The pickled file will be
# overwritten the next time the analyzer is shut down too.
logging.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)
pass
while True:
try:
while not shutdown:
try:
request = ipc_queue.get(block=True, timeout=5)
if (
isinstance(request, str) and request == "shutdown"
): # Bit of a cheat
shutdown = True
break
if not isinstance(request, PicklableHttpRequest):
raise TypeError(
"request must be a PicklableHttpRequest. Was of type "
+ type(request).__name__
)
except queue.Empty:
request = None
# If there's no new HTTP request we need to execute, let's check our "retry
# queue" and see if there's any failed HTTP requests we can retry:
if request:
send_http_request(request, retry_queue)
else:
# Using a for loop instead of while so we only iterate over all the requests once!
for i in range(len(retry_queue)):
request = retry_queue.popleft()
send_http_request(request, retry_queue)
logging.info("Shutting down status_reporter")
# Pickle retry_queue to disk so that we don't lose uploads if we're shut down while
            # the web server is down or unreachable.
with open(http_retry_queue_path, "wb") as pickle_file:
pickle.dump(retry_queue, pickle_file)
return
except Exception as e: # Terrible top-level exception handler to prevent the thread from dying, just in case.
if shutdown:
return
logging.exception("Unhandled exception in StatusReporter")
logging.exception(e)
logging.info("Restarting StatusReporter thread")
time.sleep(2) # Throttle it
def send_http_request(picklable_request, retry_queue):
if not isinstance(picklable_request, PicklableHttpRequest):
raise TypeError(
"picklable_request must be a PicklableHttpRequest. Was of type "
+ type(picklable_request).__name__
)
try:
bare_request = picklable_request.create_request()
s = requests.Session()
prepared_request = s.prepare_request(bare_request)
r = s.send(
prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT, verify=False
) # SNI is a pain in the ass
r.raise_for_status() # Raise an exception if there was an http error code returned
logging.info("HTTP request sent successfully.")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 422:
            # Do not retry the request if there was a metadata validation error
logging.error(
"HTTP request failed due to an HTTP exception. Exception was: %s"
% str(e)
)
else:
# The request failed with an error 500 probably, so let's check if Airtime and/or
# the web server are broken. If not, then our request was probably causing an
# error 500 in the media API (ie. a bug), so there's no point in retrying it.
logging.error("HTTP request failed. Exception was: %s" % str(e))
parsed_url = urlparse(e.response.request.url)
if is_web_server_broken(parsed_url.scheme + "://" + parsed_url.netloc):
# If the web server is having problems, retry the request later:
retry_queue.append(picklable_request)
# Otherwise, if the request was bad, the request is never retried.
# You will have to find these bad requests in logs or you'll be
# notified by sentry.
except requests.exceptions.ConnectionError as e:
logging.error(
"HTTP request failed due to a connection error. Retrying later. %s" % str(e)
)
retry_queue.append(picklable_request) # Retry it later
except Exception as e:
logging.error("HTTP request failed with unhandled exception. %s" % str(e))
logging.error(traceback.format_exc())
# Don't put the request into the retry queue, just give up on this one.
# I'm doing this to protect against us getting some pathological request
# that breaks our code. I don't want us pickling data that potentially
# breaks airtime_analyzer.
def is_web_server_broken(url):
"""Do a naive test to check if the web server we're trying to access is down.
We use this to try to differentiate between error 500s that are coming
from (for example) a bug in the Airtime Media REST API and error 500s
caused by Airtime or the webserver itself being broken temporarily.
"""
try:
test_req = requests.get(url, verify=False)
test_req.raise_for_status()
except Exception as e:
return True
else:
# The request worked fine, so the web server and Airtime are still up.
return False
return False
class StatusReporter:
"""Reports the extracted audio file metadata and job status back to the
Airtime web application.
"""
_HTTP_REQUEST_TIMEOUT = 30
""" We use multiprocessing.Process again here because we need a thread for this stuff
anyways, and Python gives us process isolation for free (crash safety).
"""
_ipc_queue = queue.Queue()
# _http_thread = multiprocessing.Process(target=process_http_requests,
# args=(_ipc_queue,))
_http_thread = None
@classmethod
    def start_thread(cls, http_retry_queue_path):
StatusReporter._http_thread = threading.Thread(
target=process_http_requests,
args=(StatusReporter._ipc_queue, http_retry_queue_path),
)
StatusReporter._http_thread.start()
@classmethod
    def stop_thread(cls):
logging.info("Terminating status_reporter process")
# StatusReporter._http_thread.terminate() # Triggers SIGTERM on the child process
StatusReporter._ipc_queue.put("shutdown") # Special trigger
StatusReporter._http_thread.join()
@classmethod
    def _send_http_request(cls, request):
StatusReporter._ipc_queue.put(request)
@classmethod
    def report_success_to_callback_url(cls, callback_url, api_key, audio_metadata):
"""Report the extracted metadata and status of the successfully imported file
to the callback URL (which should be the Airtime File Upload API)
"""
put_payload = json.dumps(audio_metadata)
# r = requests.Request(method='PUT', url=callback_url, data=put_payload,
# auth=requests.auth.HTTPBasicAuth(api_key, ''))
"""
r = requests.Request(method='PUT', url=callback_url, data=put_payload,
auth=requests.auth.HTTPBasicAuth(api_key, ''))
StatusReporter._send_http_request(r)
"""
StatusReporter._send_http_request(
PicklableHttpRequest(
method="PUT", url=callback_url, data=put_payload, api_key=api_key
)
)
"""
try:
r.raise_for_status() # Raise an exception if there was an http error code returned
except requests.exceptions.RequestException:
StatusReporter._ipc_queue.put(r.prepare())
"""
"""
# Encode the audio metadata as json and post it back to the callback_url
put_payload = json.dumps(audio_metadata)
logging.debug("sending http put with payload: " + put_payload)
r = requests.put(callback_url, data=put_payload,
auth=requests.auth.HTTPBasicAuth(api_key, ''),
timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
logging.debug("HTTP request returned status: " + str(r.status_code))
logging.debug(r.text) # log the response body
#TODO: queue up failed requests and try them again later.
r.raise_for_status() # Raise an exception if there was an http error code returned
"""
@classmethod
def report_failure_to_callback_url(
        cls, callback_url, api_key, import_status, reason
):
if not isinstance(import_status, int):
raise TypeError(
"import_status must be an integer. Was of type "
+ type(import_status).__name__
)
logging.debug("Reporting import failure to Airtime REST API...")
audio_metadata = dict()
audio_metadata["import_status"] = import_status
audio_metadata["comment"] = reason # hack attack
put_payload = json.dumps(audio_metadata)
# logging.debug("sending http put with payload: " + put_payload)
"""
r = requests.put(callback_url, data=put_payload,
auth=requests.auth.HTTPBasicAuth(api_key, ''),
timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
"""
StatusReporter._send_http_request(
PicklableHttpRequest(
method="PUT", url=callback_url, data=put_payload, api_key=api_key
)
)
"""
logging.debug("HTTP request returned status: " + str(r.status_code))
logging.debug(r.text) # log the response body
#TODO: queue up failed requests and try them again later.
r.raise_for_status() # raise an exception if there was an http error code returned
"""