refactor(analyzer): improve analyzer pipeline module (#1542)

* rename steps to pipeline module

* move pipeline entrypoint to pipeline module

* rename steps test module to pipeline

* fix paths after renames

* move step protocol to pipeline

* create pipeline status enum

* use Protocol from typing extensions

* Fix linting
This commit is contained in:
Jonas L 2022-01-28 06:09:19 +01:00 committed by GitHub
parent 74c8d20284
commit cba905e367
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 34 additions and 26 deletions

View file

@ -0,0 +1 @@
from .pipeline import Pipeline, PipelineStatus

View file

@ -0,0 +1,69 @@
from datetime import timedelta
from math import isclose
from subprocess import CalledProcessError
from typing import Any, Dict
from loguru import logger
from ..ffmpeg import compute_silences, probe_duration
def analyze_cuepoint(filepath: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""
Extracts the cuein and cueout times along and sets the file duration using ffmpeg.
"""
try:
duration = probe_duration(filepath)
if "length_seconds" in metadata and not isclose(
metadata["length_seconds"],
duration,
abs_tol=0.1,
):
logger.warning(
f"existing duration {metadata['length_seconds']} differs "
f"from the probed duration {duration}."
)
metadata["length_seconds"] = duration
metadata["length"] = str(timedelta(seconds=duration))
metadata["cuein"] = 0.0
metadata["cueout"] = duration
silences = compute_silences(filepath)
if len(silences) > 2:
# Only keep first and last silence
silences = silences[:: len(silences) - 1]
for silence in silences:
# Sanity check
if silence[0] >= silence[1]:
raise ValueError(
f"silence starts ({silence[0]}) after ending ({silence[1]})"
)
# Is this really the first silence ?
if isclose(
0.0,
max(0.0, silence[0]), # Clamp negative value
abs_tol=0.1,
):
metadata["cuein"] = max(0.0, silence[1])
# Is this really the last silence ?
elif isclose(
min(silence[1], duration), # Clamp infinity value
duration,
abs_tol=0.1,
):
metadata["cueout"] = min(silence[0], duration)
metadata["cuein"] = format(metadata["cuein"], "f")
metadata["cueout"] = format(metadata["cueout"], "f")
except (CalledProcessError, OSError):
pass
return metadata

View file

@ -0,0 +1,191 @@
import datetime
import hashlib
import os
import wave
from typing import Any, Dict
import magic
import mutagen
from loguru import logger
def analyze_metadata(filename: str, metadata: Dict[str, Any]):
"""Extract audio metadata from tags embedded in the file (eg. ID3 tags)
Keyword arguments:
filename: The path to the audio file to extract metadata from.
metadata: A dictionary that the extracted metadata will be added to.
"""
if not isinstance(filename, str):
raise TypeError(
"filename must be string. Was of type " + type(filename).__name__
)
if not isinstance(metadata, dict):
raise TypeError(
"metadata must be a dict. Was of type " + type(metadata).__name__
)
if not os.path.exists(filename):
raise FileNotFoundError(f"audio file not found: {filename}")
# Airtime <= 2.5.x nonsense:
metadata["ftype"] = "audioclip"
# Other fields we'll want to set for Airtime:
metadata["hidden"] = False
# Get file size and md5 hash of the file
metadata["filesize"] = os.path.getsize(filename)
with open(filename, "rb") as fh:
m = hashlib.md5()
while True:
data = fh.read(8192)
if not data:
break
m.update(data)
metadata["md5"] = m.hexdigest()
# Mutagen doesn't handle WAVE files so we use a different package
ms = magic.open(magic.MIME_TYPE)
ms.load()
with open(filename, "rb") as fh:
mime_check = ms.buffer(fh.read(2014))
metadata["mime"] = mime_check
if mime_check == "audio/x-wav":
return _analyze_wave(filename, metadata)
# Extract metadata from an audio file using mutagen
audio_file = mutagen.File(filename, easy=True)
# Bail if the file couldn't be parsed. The title should stay as the filename
# inside Airtime.
if (
audio_file == None
): # Don't use "if not" here. It is wrong due to mutagen's design.
return metadata
# Note that audio_file can equal {} if the file is valid but there's no metadata tags.
# We can still try to grab the info variables below.
# Grab other file information that isn't encoded in a tag, but instead usually
# in the file header. Mutagen breaks that out into a separate "info" object:
info = audio_file.info
if hasattr(info, "sample_rate"): # Mutagen is annoying and inconsistent
metadata["sample_rate"] = info.sample_rate
if hasattr(info, "length"):
metadata["length_seconds"] = info.length
# Converting the length in seconds (float) to a formatted time string
track_length = datetime.timedelta(seconds=info.length)
metadata["length"] = str(
track_length
) # time.strftime("%H:%M:%S.%f", track_length)
# Other fields for Airtime
metadata["cueout"] = metadata["length"]
# Set a default cue in time in seconds
metadata["cuein"] = 0.0
if hasattr(info, "bitrate"):
metadata["bit_rate"] = info.bitrate
# Use the mutagen to get the MIME type, if it has one. This is more reliable and
# consistent for certain types of MP3s or MPEG files than the MIMEs returned by magic.
if audio_file.mime:
metadata["mime"] = audio_file.mime[0]
# Try to get the number of channels if mutagen can...
try:
# Special handling for getting the # of channels from MP3s. It's in the "mode" field
# which is 0=Stereo, 1=Joint Stereo, 2=Dual Channel, 3=Mono. Part of the ID3 spec...
if metadata["mime"] in ["audio/mpeg", "audio/mp3"]:
if info.mode == 3:
metadata["channels"] = 1
else:
metadata["channels"] = 2
else:
metadata["channels"] = info.channels
except (AttributeError, KeyError):
# If mutagen can't figure out the number of channels, we'll just leave it out...
pass
# Try to extract the number of tracks on the album if we can (the "track total")
try:
track_number = audio_file["tracknumber"]
if isinstance(track_number, list): # Sometimes tracknumber is a list, ugh
track_number = track_number[0]
track_number_tokens = track_number
if "/" in track_number:
track_number_tokens = track_number.split("/")
track_number = track_number_tokens[0]
elif "-" in track_number:
track_number_tokens = track_number.split("-")
track_number = track_number_tokens[0]
metadata["track_number"] = track_number
track_total = track_number_tokens[1]
metadata["track_total"] = track_total
except (AttributeError, KeyError, IndexError):
# If we couldn't figure out the track_number or track_total, just ignore it...
pass
# We normalize the mutagen tags slightly here, so in case mutagen changes,
# we find the
mutagen_to_airtime_mapping = {
"title": "track_title",
"artist": "artist_name",
"album": "album_title",
"bpm": "bpm",
"composer": "composer",
"conductor": "conductor",
"copyright": "copyright",
"comment": "comment",
"encoded_by": "encoder",
"genre": "genre",
"isrc": "isrc",
"label": "label",
"organization": "label",
#'length': 'length',
"language": "language",
"last_modified": "last_modified",
"mood": "mood",
"bit_rate": "bit_rate",
"replay_gain": "replaygain",
#'tracknumber': 'track_number',
#'track_total': 'track_total',
"website": "website",
"date": "year",
#'mime_type': 'mime',
}
for mutagen_tag, airtime_tag in mutagen_to_airtime_mapping.items():
try:
metadata[airtime_tag] = audio_file[mutagen_tag]
# Some tags are returned as lists because there could be multiple values.
# This is unusual so we're going to always just take the first item in the list.
if isinstance(metadata[airtime_tag], list):
if metadata[airtime_tag]:
metadata[airtime_tag] = metadata[airtime_tag][0]
else: # Handle empty lists
metadata[airtime_tag] = ""
except KeyError:
continue
return metadata
def _analyze_wave(filename, metadata):
try:
reader = wave.open(filename, "rb")
metadata["channels"] = reader.getnchannels()
metadata["sample_rate"] = reader.getframerate()
length_seconds = float(reader.getnframes()) / float(metadata["sample_rate"])
# Converting the length in seconds (float) to a formatted time string
track_length = datetime.timedelta(seconds=length_seconds)
metadata["length"] = str(
track_length
) # time.strftime("%H:%M:%S.%f", track_length)
metadata["length_seconds"] = length_seconds
metadata["cueout"] = metadata["length"]
except wave.Error as ex:
logger.error(f"Invalid WAVE file: {str(ex)}")
raise
return metadata

View file

@ -0,0 +1,45 @@
__author__ = "asantoni"
import subprocess
from typing import Any, Dict
from loguru import logger
class UnplayableFileError(Exception):
pass
LIQUIDSOAP_EXECUTABLE = "liquidsoap"
def analyze_playability(filename: str, metadata: Dict[str, Any]):
"""Checks if a file can be played by Liquidsoap.
:param filename: The full path to the file to analyzer
:param metadata: A metadata dictionary where the results will be put
:return: The metadata dictionary
"""
command = [
LIQUIDSOAP_EXECUTABLE,
"-v",
"-c",
"output.dummy(audio_to_stereo(single(argv(1))))",
"--",
filename,
]
try:
subprocess.check_output(command, stderr=subprocess.STDOUT, close_fds=True)
except OSError as e: # liquidsoap was not found
logger.warning(
"Failed to run: %s - %s. %s"
% (command[0], e.strerror, "Do you have liquidsoap installed?")
)
except (
subprocess.CalledProcessError,
Exception,
) as e: # liquidsoap returned an error code
logger.warning(e)
raise UnplayableFileError()
return metadata

View file

@ -0,0 +1,27 @@
from subprocess import CalledProcessError
from typing import Any, Dict
from ..ffmpeg import compute_replaygain, probe_replaygain
def analyze_replaygain(filepath: str, metadata: Dict[str, Any]):
"""
Extracts the Replaygain loudness normalization factor of a track using ffmpeg.
"""
try:
# First probe for existing replaygain metadata.
track_gain = probe_replaygain(filepath)
if track_gain is not None:
metadata["replay_gain"] = track_gain
return metadata
except (CalledProcessError, OSError):
pass
try:
track_gain = compute_replaygain(filepath)
if track_gain is not None:
metadata["replay_gain"] = track_gain
except (CalledProcessError, OSError):
pass
return metadata

View file

@ -0,0 +1,119 @@
import errno
import os
import shutil
import time
import uuid
from loguru import logger
def organise_file(audio_file_path, import_directory, original_filename, metadata):
"""Move the file at audio_file_path over into the import_directory/import,
renaming it to original_filename.
This analyzer copies a file over from a temporary directory (stor/organize)
into the Airtime library (stor/imported).
If you import three copies of the same file, the behaviour is:
- The filename is of the first file preserved.
- The filename of the second file has the timestamp attached to it.
- The filename of the third file has a UUID placed after the timestamp, but ONLY IF it's imported within 1 second of the second file (ie. if the timestamp is the same).
Keyword arguments:
audio_file_path: Path to the file to be imported.
import_directory: Path to the "import" directory inside the Airtime stor directory.
(eg. /srv/airtime/stor/import)
original_filename: The filename of the file when it was uploaded to Airtime.
metadata: A dictionary where the "full_path" of where the file is moved to will be added.
"""
if not isinstance(audio_file_path, str):
raise TypeError(
"audio_file_path must be string. Was of type "
+ type(audio_file_path).__name__
)
if not isinstance(import_directory, str):
raise TypeError(
"import_directory must be string. Was of type "
+ type(import_directory).__name__
)
if not isinstance(original_filename, str):
raise TypeError(
"original_filename must be string. Was of type "
+ type(original_filename).__name__
)
if not isinstance(metadata, dict):
raise TypeError(
"metadata must be a dict. Was of type " + type(metadata).__name__
)
if not os.path.exists(audio_file_path):
raise FileNotFoundError(f"audio file not found: {audio_file_path}")
# Import the file over to it's final location.
# TODO: Also, handle the case where the move fails and write some code
# to possibly move the file to problem_files.
max_dir_len = 48
max_file_len = 48
final_file_path = import_directory
orig_file_basename, orig_file_extension = os.path.splitext(original_filename)
if "artist_name" in metadata:
final_file_path += (
"/" + metadata["artist_name"][0:max_dir_len]
) # truncating with array slicing
if "album_title" in metadata:
final_file_path += "/" + metadata["album_title"][0:max_dir_len]
# Note that orig_file_extension includes the "." already
final_file_path += "/" + orig_file_basename[0:max_file_len] + orig_file_extension
# Ensure any redundant slashes are stripped
final_file_path = os.path.normpath(final_file_path)
# If a file with the same name already exists in the "import" directory, then
# we add a unique string to the end of this one. We never overwrite a file on import
# because if we did that, it would mean Airtime's database would have
# the wrong information for the file we just overwrote (eg. the song length would be wrong!)
# If the final file path is the same as the file we've been told to import (which
# you often do when you're debugging), then don't move the file at all.
if os.path.exists(final_file_path):
if os.path.samefile(audio_file_path, final_file_path):
metadata["full_path"] = final_file_path
return metadata
base_file_path, file_extension = os.path.splitext(final_file_path)
final_file_path = "{}_{}{}".format(
base_file_path,
time.strftime("%m-%d-%Y-%H-%M-%S", time.localtime()),
file_extension,
)
# If THAT path exists, append a UUID instead:
while os.path.exists(final_file_path):
base_file_path, file_extension = os.path.splitext(final_file_path)
final_file_path = "{}_{}{}".format(
base_file_path,
str(uuid.uuid4()),
file_extension,
)
# Ensure the full path to the file exists
mkdir_p(os.path.dirname(final_file_path))
# Move the file into its final destination directory
logger.debug(f"Moving {audio_file_path} to {final_file_path}")
shutil.move(audio_file_path, final_file_path)
metadata["full_path"] = final_file_path
return metadata
def mkdir_p(path):
"""Make all directories in a tree (like mkdir -p)"""
if path == "":
return
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise

View file

@ -0,0 +1,121 @@
""" Analyzes and imports an audio file into the Airtime library.
"""
from enum import Enum
from queue import Queue
from typing import Any, Dict
from loguru import logger
from typing_extensions import Protocol
from .analyze_cuepoint import analyze_cuepoint
from .analyze_metadata import analyze_metadata
from .analyze_playability import UnplayableFileError, analyze_playability
from .analyze_replaygain import analyze_replaygain
from .organise_file import organise_file
class Step(Protocol):
@staticmethod
def __call__(filename: str, metadata: Dict[str, Any]):
...
class PipelineStatus(int, Enum):
succeed = 0
pending = 1
failed = 2
class Pipeline:
"""Analyzes and imports an audio file into the Airtime library.
This currently performs metadata extraction (eg. gets the ID3 tags from an MP3),
then moves the file to the Airtime music library (stor/imported), and returns
the results back to the parent process. This class is used in an isolated process
so that if it crashes, it does not kill the entire airtime_analyzer daemon and
the failure to import can be reported back to the web application.
"""
@staticmethod
def run_analysis(
queue,
audio_file_path,
import_directory,
original_filename,
storage_backend,
file_prefix,
):
"""Analyze and import an audio file, and put all extracted metadata into queue.
Keyword arguments:
queue: A multiprocessing.queues.Queue which will be used to pass the
extracted metadata back to the parent process.
audio_file_path: Path on disk to the audio file to analyze.
import_directory: Path to the final Airtime "import" directory where
we will move the file.
original_filename: The original filename of the file, which we'll try to
preserve. The file at audio_file_path typically has a
temporary randomly generated name, which is why we want
to know what the original name was.
storage_backend: String indicating the storage backend (amazon_s3 or file)
file_prefix:
"""
try:
if not isinstance(queue, Queue):
raise TypeError("queue must be a Queue.Queue()")
if not isinstance(audio_file_path, str):
raise TypeError(
"audio_file_path must be unicode. Was of type "
+ type(audio_file_path).__name__
+ " instead."
)
if not isinstance(import_directory, str):
raise TypeError(
"import_directory must be unicode. Was of type "
+ type(import_directory).__name__
+ " instead."
)
if not isinstance(original_filename, str):
raise TypeError(
"original_filename must be unicode. Was of type "
+ type(original_filename).__name__
+ " instead."
)
if not isinstance(file_prefix, str):
raise TypeError(
"file_prefix must be unicode. Was of type "
+ type(file_prefix).__name__
+ " instead."
)
# Analyze the audio file we were told to analyze:
# First, we extract the ID3 tags and other metadata:
metadata = dict()
metadata["file_prefix"] = file_prefix
metadata = analyze_metadata(audio_file_path, metadata)
metadata = analyze_cuepoint(audio_file_path, metadata)
metadata = analyze_replaygain(audio_file_path, metadata)
metadata = analyze_playability(audio_file_path, metadata)
metadata = organise_file(
audio_file_path, import_directory, original_filename, metadata
)
metadata["import_status"] = 0 # Successfully imported
# Note that the queue we're putting the results into is our interprocess communication
# back to the main process.
# Pass all the file metadata back to the main analyzer process, which then passes
# it back to the Airtime web application.
queue.put(metadata)
except UnplayableFileError as e:
logger.exception(e)
metadata["import_status"] = PipelineStatus.failed
metadata["reason"] = "The file could not be played."
raise e
except Exception as e:
# Ensures the traceback for this child process gets written to our log files:
logger.exception(e)
raise e