feat(analyzer): analyze cuepoint using ffmpeg

- store cue(in|out) as strings
- reraise when executable was not found

BREAKING CHANGE: The analyzer requires 'ffmpeg'. The 'silan' system dependency can be removed.
This commit is contained in:
jo 2022-01-21 09:09:42 +01:00 committed by Kyle Robbertze
parent ceab19271d
commit d93fb44356
5 changed files with 200 additions and 121 deletions

View File

@ -1,6 +1,7 @@
import re import re
from math import inf
from pathlib import Path from pathlib import Path
from typing import Optional from typing import List, Optional, Tuple
from .utils import run_ from .utils import run_
@ -54,3 +55,56 @@ def compute_replaygain(filepath: Path) -> Optional[float]:
if track_gain_match: if track_gain_match:
return float(track_gain_match.group(1)) return float(track_gain_match.group(1))
_SILENCE_DETECT_RE = re.compile(
r"\[silencedetect.*\] silence_(start|end): (-?\d+(?:\.\d+)?)(?: \| silence_duration: (\d+(?:\.\d+)?))?"
)
def compute_silences(filepath: Path) -> List[Tuple[float, float]]:
"""
Compute silence will analyse the given audio file and return a list of silences.
"""
cmd = _ffmpeg(
*("-i", filepath),
"-vn",
*("-filter", "highpass=frequency=1000"),
*("-filter", "silencedetect=noise=0.15:duration=1"),
)
starts, ends = [], []
for line in cmd.stderr.splitlines():
match = _SILENCE_DETECT_RE.search(line)
if match is None:
continue
kind = match.group(1)
if kind == "start":
start = float(match.group(2))
start = max(start, 0.0)
starts.append(start)
elif kind == "end":
end = float(match.group(2))
ends.append(end)
# ffmpeg v3 (bionic) does not warn about silence end when the track ends.
# Set the last silence ending to infinity, and clamp it to the track duration before
# using this value.
if len(starts) - 1 == len(ends):
ends.append(inf)
return list(zip(starts, ends))
def probe_duration(filepath: Path) -> float:
"""
Probe duration will probe the given audio file and return the duration.
"""
cmd = _ffprobe(
*("-i", filepath),
*("-show_entries", "format=duration"),
*("-v", "quiet"),
*("-of", "csv=p=0"),
)
return float(cmd.stdout.strip("\n"))

View File

@ -1,94 +1,69 @@
import datetime from datetime import timedelta
import json from math import isclose
import subprocess from subprocess import CalledProcessError
from typing import Any, Dict from typing import Any, Dict
from loguru import logger from loguru import logger
SILAN_EXECUTABLE = "silan" from ..ffmpeg import compute_silences, probe_duration
def analyze_cuepoint(filename: str, metadata: Dict[str, Any]): def analyze_cuepoint(filepath: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""Extracts the cue-in and cue-out times along and sets the file duration based on that.
The cue points are there to skip the silence at the start and end of a track, and are determined
using "silan", which analyzes the loudness in a track.
:param filename: The full path to the file to analyzer
:param metadata: A metadata dictionary where the results will be put
:return: The metadata dictionary
""" """
""" The silan -F 0.99 parameter tweaks the highpass filter. The default is 0.98, but at that setting, Extracts the cuein and cueout times along and sets the file duration using ffmpeg.
the unit test on the short m4a file fails. With the new setting, it gets the correct cue-in time and
all the unit tests pass.
""" """
command = [
SILAN_EXECUTABLE,
"-b",
"-F",
"0.99",
"-f",
"JSON",
"-t",
"1.0",
filename,
]
try: try:
results_json = subprocess.check_output( duration = probe_duration(filepath)
command, stderr=subprocess.STDOUT, close_fds=True
)
try:
results_json = results_json.decode()
except (UnicodeDecodeError, AttributeError):
pass
silan_results = json.loads(results_json)
# Defensive coding against Silan wildly miscalculating the cue in and out times: if "length_seconds" in metadata and not isclose(
silan_length_seconds = float(silan_results["file duration"]) metadata["length_seconds"],
silan_cuein = format(silan_results["sound"][0][0], "f") duration,
silan_cueout = format(silan_results["sound"][0][1], "f") abs_tol=0.1,
):
logger.warning(
f"existing duration {metadata['length_seconds']} differs "
f"from the probed duration {duration}."
)
# Sanity check the results against any existing metadata passed to us (presumably extracted by Mutagen): metadata["length_seconds"] = duration
if "length_seconds" in metadata: metadata["length"] = str(timedelta(seconds=duration))
# Silan has a rare bug where it can massively overestimate the length or cue out time sometimes. metadata["cuein"] = 0.0
if (silan_length_seconds - metadata["length_seconds"] > 3) or ( metadata["cueout"] = duration
float(silan_cueout) - metadata["length_seconds"] > 2
silences = compute_silences(filepath)
if len(silences) > 2:
# Only keep first and last silence
silences = silences[:: len(silences) - 1]
for silence in silences:
# Sanity check
if silence[0] >= silence[1]:
raise ValueError(
f"silence starts ({silence[0]}) after ending ({silence[1]})"
)
# Is this really the first silence ?
if isclose(
0.0,
max(0.0, silence[0]), # Clamp negative value
abs_tol=0.1,
): ):
# Don't trust anything silan says then... metadata["cuein"] = max(0.0, silence[1])
raise Exception(
"Silan cue out {0} or length {1} differs too much from the Mutagen length {2}. Ignoring Silan values.".format(
silan_cueout,
silan_length_seconds,
metadata["length_seconds"],
)
)
# Don't allow silan to trim more than the greater of 3 seconds or 5% off the start of a track
if float(silan_cuein) > max(silan_length_seconds * 0.05, 3):
raise Exception(
"Silan cue in time {0} too big, ignoring.".format(silan_cuein)
)
else:
# Only use the Silan track length in the worst case, where Mutagen didn't give us one for some reason.
# (This is mostly to make the unit tests still pass.)
# Convert the length into a formatted time string.
metadata["length_seconds"] = silan_length_seconds #
track_length = datetime.timedelta(seconds=metadata["length_seconds"])
metadata["length"] = str(track_length)
""" XXX: I've commented out the track_length stuff below because Mutagen seems more accurate than silan # Is this really the last silence ?
as of Mutagen version 1.31. We are always going to use Mutagen's length now because Silan's elif isclose(
length can be off by a few seconds reasonably often. min(silence[1], duration), # Clamp infinity value
""" duration,
abs_tol=0.1,
):
metadata["cueout"] = min(silence[0], duration)
metadata["cuein"] = silan_cuein metadata["cuein"] = format(metadata["cuein"], "f")
metadata["cueout"] = silan_cueout metadata["cueout"] = format(metadata["cueout"], "f")
except OSError as e: # silan was not found except (CalledProcessError, OSError):
logger.warning( pass
"Failed to run: %s - %s. %s"
% (command[0], e.strerror, "Do you have silan installed?")
)
except subprocess.CalledProcessError as e: # silan returned an error code
logger.warning("%s %s %s", e.cmd, e.output, e.returncode)
except Exception as e:
logger.warning(e)
return metadata return metadata

View File

@ -27,12 +27,9 @@ liquidsoap = buster, bullseye, bionic, focal
python3-pika = buster, bullseye, bionic, focal python3-pika = buster, bullseye, bionic, focal
[ffmpeg] [ffmpeg]
# Detect replaygain # Detect duration, silences and replaygain
ffmpeg = buster, bullseye, bionic, focal ffmpeg = buster, bullseye, bionic, focal
[silan]
silan = buster, bullseye, bionic, focal
[=development] [=development]
# Generate fixtures # Generate fixtures
ffmpeg = buster, bullseye, bionic, focal ffmpeg = buster, bullseye, bionic, focal

View File

@ -1,7 +1,15 @@
from math import inf
import distro import distro
import pytest import pytest
from libretime_analyzer.ffmpeg import compute_replaygain, probe_replaygain from libretime_analyzer.ffmpeg import (
_SILENCE_DETECT_RE,
compute_replaygain,
compute_silences,
probe_duration,
probe_replaygain,
)
from .fixtures import FILES from .fixtures import FILES
@ -28,3 +36,77 @@ def test_compute_replaygain(filepath, replaygain):
tolerance = 5 tolerance = 5
assert compute_replaygain(filepath) == pytest.approx(replaygain, abs=tolerance) assert compute_replaygain(filepath) == pytest.approx(replaygain, abs=tolerance)
# Be sure to test a matrix of integer / float, positive / negative values
SILENCE_DETECT_RE_RAW = """
[silencedetect @ 0x563121aee500] silence_start: -0.00154195
[silencedetect @ 0x563121aee500] silence_end: 0.998458 | silence_duration: 1
[silencedetect @ 0x563121aee500] silence_start: 2.99383
[silencedetect @ 0x563121aee500] silence_end: 4.99229 | silence_duration: 1.99846
[silencedetect @ 0x563121aee500] silence_start: 6.98766
[silencedetect @ 0x563121aee500] silence_end: 8.98612 | silence_duration: 1.99846
[silencedetect @ 0x563121aee500] silence_start: 12
[silencedetect @ 0x563121aee500] silence_end: 13 | silence_duration: 1
"""
SILENCE_DETECT_RE_EXPECTED = [
("start", -0.00154195),
("end", 0.998458),
("start", 2.99383),
("end", 4.99229),
("start", 6.98766),
("end", 8.98612),
("start", 12.0),
("end", 13.0),
]
@pytest.mark.parametrize(
"line,expected",
zip(
SILENCE_DETECT_RE_RAW.strip().splitlines(),
SILENCE_DETECT_RE_EXPECTED,
),
)
def test_silence_detect_re(line, expected):
match = _SILENCE_DETECT_RE.search(line)
assert match is not None
assert match.group(1) == expected[0]
assert float(match.group(2)) == expected[1]
@pytest.mark.parametrize(
"filepath,length,cuein,cueout",
map(
lambda i: pytest.param(i.path, i.length, i.cuein, i.cueout, id=i.path.name),
FILES,
),
)
def test_compute_silences(filepath, length, cuein, cueout):
result = compute_silences(filepath)
if cuein != 0.0:
assert len(result) > 0
first = result.pop(0)
assert first[0] == pytest.approx(0.0, abs=0.1)
assert first[1] == pytest.approx(cuein, abs=1)
if cueout != length:
# ffmpeg v3 (bionic) does not warn about silence end when the track ends.
# Check for infinity on last silence ending
if distro.codename() == "bionic":
length = inf
assert len(result) > 0
last = result.pop()
assert last[0] == pytest.approx(cueout, abs=1)
assert last[1] == pytest.approx(length, abs=0.1)
@pytest.mark.parametrize(
"filepath,length",
map(lambda i: pytest.param(i.path, i.length, id=i.path.name), FILES),
)
def test_probe_duration(filepath, length):
assert probe_duration(filepath) == pytest.approx(length, abs=0.05)

View File

@ -1,51 +1,22 @@
from unittest.mock import patch
import distro
import pytest import pytest
from libretime_analyzer.steps.analyze_cuepoint import analyze_cuepoint from libretime_analyzer.steps.analyze_cuepoint import analyze_cuepoint
from ..fixtures import FILE_INVALID_DRM, FILES from ..fixtures import FILES
@pytest.mark.parametrize( @pytest.mark.parametrize(
"filepath,length,cuein,cueout", "filepath,length,cuein,cueout",
map(lambda i: (str(i.path), i.length, i.cuein, i.cueout), FILES), map(
lambda i: pytest.param(
str(i.path), i.length, i.cuein, i.cueout, id=i.path.name
),
FILES,
),
) )
def test_analyze_cuepoint(filepath, length, cuein, cueout): def test_analyze_cuepoint(filepath, length, cuein, cueout):
metadata = analyze_cuepoint(filepath, dict()) metadata = analyze_cuepoint(filepath, dict())
assert metadata["length_seconds"] == pytest.approx(length, abs=0.1) assert metadata["length_seconds"] == pytest.approx(length, abs=0.1)
assert float(metadata["cuein"]) == pytest.approx(float(cuein), abs=1)
# Silan does not work with m4a files yet assert float(metadata["cueout"]) == pytest.approx(float(cueout), abs=1)
if filepath.endswith("m4a"):
return
# Silan does not work with mp3 on buster, bullseye, focal
if filepath.endswith("mp3") and distro.codename() in (
"buster",
"bullseye",
"focal",
):
return
assert float(metadata["cuein"]) == pytest.approx(cuein, abs=0.5)
assert float(metadata["cueout"]) == pytest.approx(cueout, abs=0.5)
def test_analyze_cuepoint_missing_silan():
with patch(
"libretime_analyzer.steps.analyze_cuepoint.SILAN_EXECUTABLE",
"foobar",
):
analyze_cuepoint(str(FILES[0].path), dict())
def test_analyze_cuepoint_invalid_filepath():
with pytest.raises(KeyError):
test_analyze_cuepoint("non-existent-file", None, None, None)
def test_analyze_cuepoint_invalid_wma():
with pytest.raises(KeyError):
test_analyze_cuepoint(FILE_INVALID_DRM, None, None, None)