feat(analyzer): rework analyze_metadata step

- Upgrade mutagen to 1.45.1
- Remove custom wave data extraction
- Add .wav and ogg without extension fixtures
- Move md5 sum and mime type into their own functions
- Clean up comments
- Let analyze_cuepoints handle cuein and cueout metadata
- Remove python magic mime guessing
This commit is contained in:
jo 2022-02-14 21:35:15 +01:00 committed by Kyle Robbertze
parent 0106b4c6cb
commit 88dcd13fc8
5 changed files with 164 additions and 155 deletions

View file

@ -1,116 +1,61 @@
import datetime
import hashlib
import os
import wave
from datetime import timedelta
from pathlib import Path
from typing import Any, Dict
import magic
import mutagen
from loguru import logger
def analyze_metadata(filename: str, metadata: Dict[str, Any]):
"""Extract audio metadata from tags embedded in the file (eg. ID3 tags)
Keyword arguments:
filename: The path to the audio file to extract metadata from.
metadata: A dictionary that the extracted metadata will be added to.
def analyze_metadata(filepath_: str, metadata: Dict[str, Any]):
"""
if not isinstance(filename, str):
raise TypeError(
"filename must be string. Was of type " + type(filename).__name__
)
if not isinstance(metadata, dict):
raise TypeError(
"metadata must be a dict. Was of type " + type(metadata).__name__
)
if not os.path.exists(filename):
raise FileNotFoundError(f"audio file not found: {filename}")
Extract audio metadata from tags embedded in the file using mutagen.
"""
filepath = Path(filepath_)
# Airtime <= 2.5.x nonsense:
# Airtime <= 2.5.x required fields
metadata["ftype"] = "audioclip"
# Other fields we'll want to set for Airtime:
metadata["hidden"] = False
# Get file size and md5 hash of the file
metadata["filesize"] = os.path.getsize(filename)
# Get file properties
metadata["filesize"] = filepath.stat().st_size
metadata["md5"] = compute_md5(filepath)
with open(filename, "rb") as fh:
m = hashlib.md5()
while True:
data = fh.read(8192)
if not data:
break
m.update(data)
metadata["md5"] = m.hexdigest()
# Mutagen doesn't handle WAVE files so we use a different package
ms = magic.open(magic.MIME_TYPE)
ms.load()
with open(filename, "rb") as fh:
mime_check = ms.buffer(fh.read(2014))
metadata["mime"] = mime_check
if mime_check == "audio/x-wav":
return _analyze_wave(filename, metadata)
# Extract metadata from an audio file using mutagen
audio_file = mutagen.File(filename, easy=True)
# Bail if the file couldn't be parsed. The title should stay as the filename
# inside Airtime.
if (
audio_file == None
): # Don't use "if not" here. It is wrong due to mutagen's design.
# Get audio file metadata
extracted = mutagen.File(filepath, easy=True)
if extracted is None:
logger.warning(f"no metadata were extracted for {filepath}")
return metadata
# Note that audio_file can equal {} if the file is valid but there's no metadata tags.
# We can still try to grab the info variables below.
# Grab other file information that isn't encoded in a tag, but instead usually
# in the file header. Mutagen breaks that out into a separate "info" object:
info = audio_file.info
if hasattr(info, "sample_rate"): # Mutagen is annoying and inconsistent
metadata["mime"] = extracted.mime[0]
info = extracted.info
if hasattr(info, "sample_rate"):
metadata["sample_rate"] = info.sample_rate
if hasattr(info, "length"):
metadata["length_seconds"] = info.length
# Converting the length in seconds (float) to a formatted time string
track_length = datetime.timedelta(seconds=info.length)
metadata["length"] = str(
track_length
) # time.strftime("%H:%M:%S.%f", track_length)
# Other fields for Airtime
metadata["cueout"] = metadata["length"]
# Set a default cue in time in seconds
metadata["cuein"] = 0.0
if hasattr(info, "bitrate"):
metadata["bit_rate"] = info.bitrate
# Use the mutagen to get the MIME type, if it has one. This is more reliable and
# consistent for certain types of MP3s or MPEG files than the MIMEs returned by magic.
if audio_file.mime:
metadata["mime"] = audio_file.mime[0]
if hasattr(info, "length"):
metadata["length_seconds"] = info.length
metadata["length"] = str(timedelta(seconds=info.length))
# Try to get the number of channels if mutagen can...
try:
# Special handling for getting the # of channels from MP3s. It's in the "mode" field
# which is 0=Stereo, 1=Joint Stereo, 2=Dual Channel, 3=Mono. Part of the ID3 spec...
if metadata["mime"] in ["audio/mpeg", "audio/mp3"]:
if info.mode == 3:
metadata["channels"] = 1
else:
metadata["channels"] = 2
# Special handling for the number of channels in mp3 files.
# 0=stereo, 1=joint stereo, 2=dual channel, 3=mono
if metadata["mime"] in ("audio/mpeg", "audio/mp3"):
metadata["channels"] = 1 if info.mode == 3 else 2
else:
metadata["channels"] = info.channels
except (AttributeError, KeyError):
# If mutagen can't figure out the number of channels, we'll just leave it out...
pass
# Try to extract the number of tracks on the album if we can (the "track total")
try:
track_number = audio_file["tracknumber"]
if isinstance(track_number, list): # Sometimes tracknumber is a list, ugh
track_number = extracted["tracknumber"]
if isinstance(track_number, list):
track_number = track_number[0]
track_number_tokens = track_number
if "/" in track_number:
track_number_tokens = track_number.split("/")
@ -122,12 +67,9 @@ def analyze_metadata(filename: str, metadata: Dict[str, Any]):
track_total = track_number_tokens[1]
metadata["track_total"] = track_total
except (AttributeError, KeyError, IndexError):
# If we couldn't figure out the track_number or track_total, just ignore it...
pass
# We normalize the mutagen tags slightly here, so in case mutagen changes,
# we find the
mutagen_to_airtime_mapping = {
extracted_tags_mapping = {
"title": "track_title",
"artist": "artist_name",
"album": "album_title",
@ -141,51 +83,43 @@ def analyze_metadata(filename: str, metadata: Dict[str, Any]):
"isrc": "isrc",
"label": "label",
"organization": "label",
#'length': 'length',
# "length": "length",
"language": "language",
"last_modified": "last_modified",
"mood": "mood",
"bit_rate": "bit_rate",
"replay_gain": "replaygain",
#'tracknumber': 'track_number',
#'track_total': 'track_total',
# "tracknumber": "track_number",
# "track_total": "track_total",
"website": "website",
"date": "year",
#'mime_type': 'mime',
# "mime_type": "mime",
}
for mutagen_tag, airtime_tag in mutagen_to_airtime_mapping.items():
for extracted_key, metadata_key in extracted_tags_mapping.items():
try:
metadata[airtime_tag] = audio_file[mutagen_tag]
# Some tags are returned as lists because there could be multiple values.
# This is unusual so we're going to always just take the first item in the list.
if isinstance(metadata[airtime_tag], list):
if metadata[airtime_tag]:
metadata[airtime_tag] = metadata[airtime_tag][0]
else: # Handle empty lists
metadata[airtime_tag] = ""
metadata[metadata_key] = extracted[extracted_key]
if isinstance(metadata[metadata_key], list):
if len(metadata[metadata_key]):
metadata[metadata_key] = metadata[metadata_key][0]
else:
metadata[metadata_key] = ""
except KeyError:
continue
return metadata
def _analyze_wave(filename, metadata):
try:
reader = wave.open(filename, "rb")
metadata["channels"] = reader.getnchannels()
metadata["sample_rate"] = reader.getframerate()
length_seconds = float(reader.getnframes()) / float(metadata["sample_rate"])
# Converting the length in seconds (float) to a formatted time string
track_length = datetime.timedelta(seconds=length_seconds)
metadata["length"] = str(
track_length
) # time.strftime("%H:%M:%S.%f", track_length)
metadata["length_seconds"] = length_seconds
metadata["cueout"] = metadata["length"]
except wave.Error as ex:
logger.error(f"Invalid WAVE file: {str(ex)}")
raise
return metadata
def compute_md5(filepath: Path) -> str:
    """
    Return the hexadecimal md5 digest of the content of a file.

    The file is opened in binary mode and consumed in 8192-byte chunks.
    """
    with filepath.open("rb") as stream:
        digest = hashlib.md5()  # nosec
        # iter() with a sentinel keeps calling read() until it returns b"" (EOF).
        for chunk in iter(lambda: stream.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()