refactor(analyzer): redefine *_analyzer into steps
- update imports and names
- define step as a protocol
- extract analyzer function from classes
This commit is contained in:
parent
f6a52c8324
commit
2cae31a97a
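The refactor turns each analyzer into a "step": a plain function with the shape `(filename, metadata) -> metadata`, while the `Step` protocol only describes that call signature for typing. Below is a minimal sketch of the idea, assuming the protocol as defined in this commit; the `normalize_title` step is a made-up placeholder for illustration, the real steps are the `analyze_*` and `organise_file` functions under `libretime_analyzer.steps`.

```python
from typing import Any, Dict, Protocol


class Step(Protocol):
    @staticmethod
    def __call__(filename: str, metadata: Dict[str, Any]):
        ...


# Hypothetical step, for illustration only: any callable that accepts
# (filename, metadata) and returns the updated metadata dict fits this shape.
def normalize_title(filename: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
    metadata.setdefault("track_title", filename)
    return metadata


if __name__ == "__main__":
    metadata: Dict[str, Any] = {}
    # The pipeline chains steps by passing the metadata dict from one to the next.
    for step in (normalize_title,):
        metadata = step("example.mp3", metadata)
    print(metadata)
```

Because the executables become module-level constants, the tests can swap them out with `unittest.mock.patch` instead of mutating class attributes.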
@@ -7,7 +7,7 @@ import time
import pika
from loguru import logger

-from .analyzer_pipeline import AnalyzerPipeline
+from .pipeline import Pipeline
from .status_reporter import StatusReporter

EXCHANGE = "airtime-uploads"

@@ -265,7 +265,7 @@ class MessageListener:
        q = queue.Queue()
        try:
-            AnalyzerPipeline.run_analysis(
+            Pipeline.run_analysis(
                q,
                audio_file_path,
                import_directory,

@@ -276,7 +276,7 @@ class MessageListener:
            metadata = q.get()
        except Exception as e:
            logger.error("Analyzer pipeline exception: %s" % str(e))
-            metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
+            metadata["import_status"] = Pipeline.IMPORT_STATUS_FAILED

            # Ensure our queue doesn't fill up and block due to unexpected behaviour. Defensive code.
            while not q.empty():
@@ -4,14 +4,14 @@ from queue import Queue
from loguru import logger

-from .cuepoint_analyzer import CuePointAnalyzer
-from .filemover_analyzer import FileMoverAnalyzer
-from .metadata_analyzer import MetadataAnalyzer
-from .playability_analyzer import PlayabilityAnalyzer, UnplayableFileError
-from .replaygain_analyzer import ReplayGainAnalyzer
+from .steps.analyze_cuepoint import analyze_cuepoint
+from .steps.analyze_metadata import analyze_metadata
+from .steps.analyze_playability import UnplayableFileError, analyze_playability
+from .steps.analyze_replaygain import analyze_replaygain
+from .steps.organise_file import organise_file


-class AnalyzerPipeline:
+class Pipeline:
    """Analyzes and imports an audio file into the Airtime library.

    This currently performs metadata extraction (eg. gets the ID3 tags from an MP3),

@@ -80,12 +80,12 @@ class AnalyzerPipeline:
        metadata = dict()
        metadata["file_prefix"] = file_prefix

-        metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
-        metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
-        metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
-        metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)
+        metadata = analyze_metadata(audio_file_path, metadata)
+        metadata = analyze_cuepoint(audio_file_path, metadata)
+        metadata = analyze_replaygain(audio_file_path, metadata)
+        metadata = analyze_playability(audio_file_path, metadata)

-        metadata = FileMoverAnalyzer.move(
+        metadata = organise_file(
            audio_file_path, import_directory, original_filename, metadata
        )

@@ -99,7 +99,7 @@ class AnalyzerPipeline:
            queue.put(metadata)
        except UnplayableFileError as e:
            logger.exception(e)
-            metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
+            metadata["import_status"] = Pipeline.IMPORT_STATUS_FAILED
            metadata["reason"] = "The file could not be played."
            raise e
        except Exception as e:
@@ -1,99 +1,94 @@
import datetime
import json
import subprocess
+from typing import Any, Dict

from loguru import logger

-from .analyzer import Analyzer
+SILAN_EXECUTABLE = "silan"


-class CuePointAnalyzer(Analyzer):
-    """This class extracts the cue-in time, cue-out time, and length of a track using silan."""
-
-    SILAN_EXECUTABLE = "silan"
-
-    @staticmethod
-    def analyze(filename, metadata):
+def analyze_cuepoint(filename: str, metadata: Dict[str, Any]):
    """Extracts the cue-in and cue-out times along and sets the file duration based on that.
    The cue points are there to skip the silence at the start and end of a track, and are determined
    using "silan", which analyzes the loudness in a track.
    :param filename: The full path to the file to analyzer
    :param metadata: A metadata dictionary where the results will be put
    :return: The metadata dictionary
    """
    """ The silan -F 0.99 parameter tweaks the highpass filter. The default is 0.98, but at that setting,
    the unit test on the short m4a file fails. With the new setting, it gets the correct cue-in time and
    all the unit tests pass.
    """
    command = [
-        CuePointAnalyzer.SILAN_EXECUTABLE,
+        SILAN_EXECUTABLE,
        "-b",
        "-F",
        "0.99",
        "-f",
        "JSON",
        "-t",
        "1.0",
        filename,
    ]
    try:
        results_json = subprocess.check_output(
            command, stderr=subprocess.STDOUT, close_fds=True
        )
        try:
            results_json = results_json.decode()
        except (UnicodeDecodeError, AttributeError):
            pass
        silan_results = json.loads(results_json)

        # Defensive coding against Silan wildly miscalculating the cue in and out times:
        silan_length_seconds = float(silan_results["file duration"])
        silan_cuein = format(silan_results["sound"][0][0], "f")
        silan_cueout = format(silan_results["sound"][0][1], "f")

        # Sanity check the results against any existing metadata passed to us (presumably extracted by Mutagen):
        if "length_seconds" in metadata:
            # Silan has a rare bug where it can massively overestimate the length or cue out time sometimes.
            if (silan_length_seconds - metadata["length_seconds"] > 3) or (
                float(silan_cueout) - metadata["length_seconds"] > 2
            ):
                # Don't trust anything silan says then...
                raise Exception(
                    "Silan cue out {0} or length {1} differs too much from the Mutagen length {2}. Ignoring Silan values.".format(
                        silan_cueout,
                        silan_length_seconds,
                        metadata["length_seconds"],
                    )
                )
            # Don't allow silan to trim more than the greater of 3 seconds or 5% off the start of a track
            if float(silan_cuein) > max(silan_length_seconds * 0.05, 3):
                raise Exception(
                    "Silan cue in time {0} too big, ignoring.".format(silan_cuein)
                )
        else:
            # Only use the Silan track length in the worst case, where Mutagen didn't give us one for some reason.
            # (This is mostly to make the unit tests still pass.)
            # Convert the length into a formatted time string.
            metadata["length_seconds"] = silan_length_seconds  #
            track_length = datetime.timedelta(seconds=metadata["length_seconds"])
            metadata["length"] = str(track_length)

        """ XXX: I've commented out the track_length stuff below because Mutagen seems more accurate than silan
        as of Mutagen version 1.31. We are always going to use Mutagen's length now because Silan's
        length can be off by a few seconds reasonably often.
        """

        metadata["cuein"] = silan_cuein
        metadata["cueout"] = silan_cueout

    except OSError as e:  # silan was not found
        logger.warning(
            "Failed to run: %s - %s. %s"
            % (command[0], e.strerror, "Do you have silan installed?")
        )
    except subprocess.CalledProcessError as e:  # silan returned an error code
        logger.warning("%s %s %s", e.cmd, e.output, e.returncode)
    except Exception as e:
        logger.warning(e)

    return metadata
@@ -2,193 +2,190 @@ import datetime
import hashlib
import os
import wave
+from typing import Any, Dict

import magic
import mutagen
from loguru import logger

-from .analyzer import Analyzer

-class MetadataAnalyzer(Analyzer):
-    @staticmethod
-    def analyze(filename, metadata):
+def analyze_metadata(filename: str, metadata: Dict[str, Any]):
    """Extract audio metadata from tags embedded in the file (eg. ID3 tags)

    Keyword arguments:
        filename: The path to the audio file to extract metadata from.
        metadata: A dictionary that the extracted metadata will be added to.
    """
    if not isinstance(filename, str):
        raise TypeError(
            "filename must be string. Was of type " + type(filename).__name__
        )
    if not isinstance(metadata, dict):
        raise TypeError(
            "metadata must be a dict. Was of type " + type(metadata).__name__
        )
    if not os.path.exists(filename):
        raise FileNotFoundError("audio file not found: {}".format(filename))

    # Airtime <= 2.5.x nonsense:
    metadata["ftype"] = "audioclip"
    # Other fields we'll want to set for Airtime:
    metadata["hidden"] = False

    # Get file size and md5 hash of the file
    metadata["filesize"] = os.path.getsize(filename)

    with open(filename, "rb") as fh:
        m = hashlib.md5()
        while True:
            data = fh.read(8192)
            if not data:
                break
            m.update(data)
        metadata["md5"] = m.hexdigest()

    # Mutagen doesn't handle WAVE files so we use a different package
    ms = magic.open(magic.MIME_TYPE)
    ms.load()
    with open(filename, "rb") as fh:
        mime_check = ms.buffer(fh.read(2014))
    metadata["mime"] = mime_check
    if mime_check == "audio/x-wav":
-        return MetadataAnalyzer._analyze_wave(filename, metadata)
+        return _analyze_wave(filename, metadata)

    # Extract metadata from an audio file using mutagen
    audio_file = mutagen.File(filename, easy=True)

    # Bail if the file couldn't be parsed. The title should stay as the filename
    # inside Airtime.
    if (
        audio_file == None
    ):  # Don't use "if not" here. It is wrong due to mutagen's design.
        return metadata
    # Note that audio_file can equal {} if the file is valid but there's no metadata tags.
    # We can still try to grab the info variables below.

    # Grab other file information that isn't encoded in a tag, but instead usually
    # in the file header. Mutagen breaks that out into a separate "info" object:
    info = audio_file.info
    if hasattr(info, "sample_rate"):  # Mutagen is annoying and inconsistent
        metadata["sample_rate"] = info.sample_rate
    if hasattr(info, "length"):
        metadata["length_seconds"] = info.length
        # Converting the length in seconds (float) to a formatted time string
        track_length = datetime.timedelta(seconds=info.length)
        metadata["length"] = str(
            track_length
        )  # time.strftime("%H:%M:%S.%f", track_length)
        # Other fields for Airtime
        metadata["cueout"] = metadata["length"]

    # Set a default cue in time in seconds
    metadata["cuein"] = 0.0

    if hasattr(info, "bitrate"):
        metadata["bit_rate"] = info.bitrate

    # Use the mutagen to get the MIME type, if it has one. This is more reliable and
    # consistent for certain types of MP3s or MPEG files than the MIMEs returned by magic.
    if audio_file.mime:
        metadata["mime"] = audio_file.mime[0]

    # Try to get the number of channels if mutagen can...
    try:
        # Special handling for getting the # of channels from MP3s. It's in the "mode" field
        # which is 0=Stereo, 1=Joint Stereo, 2=Dual Channel, 3=Mono. Part of the ID3 spec...
        if metadata["mime"] in ["audio/mpeg", "audio/mp3"]:
            if info.mode == 3:
                metadata["channels"] = 1
            else:
                metadata["channels"] = 2
        else:
            metadata["channels"] = info.channels
    except (AttributeError, KeyError):
        # If mutagen can't figure out the number of channels, we'll just leave it out...
        pass

    # Try to extract the number of tracks on the album if we can (the "track total")
    try:
        track_number = audio_file["tracknumber"]
        if isinstance(track_number, list):  # Sometimes tracknumber is a list, ugh
            track_number = track_number[0]
        track_number_tokens = track_number
        if "/" in track_number:
            track_number_tokens = track_number.split("/")
            track_number = track_number_tokens[0]
        elif "-" in track_number:
            track_number_tokens = track_number.split("-")
            track_number = track_number_tokens[0]
        metadata["track_number"] = track_number
        track_total = track_number_tokens[1]
        metadata["track_total"] = track_total
    except (AttributeError, KeyError, IndexError):
        # If we couldn't figure out the track_number or track_total, just ignore it...
        pass

    # We normalize the mutagen tags slightly here, so in case mutagen changes,
    # we find the
    mutagen_to_airtime_mapping = {
        "title": "track_title",
        "artist": "artist_name",
        "album": "album_title",
        "bpm": "bpm",
        "composer": "composer",
        "conductor": "conductor",
        "copyright": "copyright",
        "comment": "comment",
        "encoded_by": "encoder",
        "genre": "genre",
        "isrc": "isrc",
        "label": "label",
        "organization": "label",
        #'length': 'length',
        "language": "language",
        "last_modified": "last_modified",
        "mood": "mood",
        "bit_rate": "bit_rate",
        "replay_gain": "replaygain",
        #'tracknumber': 'track_number',
        #'track_total': 'track_total',
        "website": "website",
        "date": "year",
        #'mime_type': 'mime',
    }

    for mutagen_tag, airtime_tag in mutagen_to_airtime_mapping.items():
        try:
            metadata[airtime_tag] = audio_file[mutagen_tag]

            # Some tags are returned as lists because there could be multiple values.
            # This is unusual so we're going to always just take the first item in the list.
            if isinstance(metadata[airtime_tag], list):
                if metadata[airtime_tag]:
                    metadata[airtime_tag] = metadata[airtime_tag][0]
                else:  # Handle empty lists
                    metadata[airtime_tag] = ""

        except KeyError:
            continue

    return metadata


-    @staticmethod
-    def _analyze_wave(filename, metadata):
+def _analyze_wave(filename, metadata):
    try:
        reader = wave.open(filename, "rb")
        metadata["channels"] = reader.getnchannels()
        metadata["sample_rate"] = reader.getframerate()
        length_seconds = float(reader.getnframes()) / float(metadata["sample_rate"])
        # Converting the length in seconds (float) to a formatted time string
        track_length = datetime.timedelta(seconds=length_seconds)
        metadata["length"] = str(
            track_length
        )  # time.strftime("%H:%M:%S.%f", track_length)
        metadata["length_seconds"] = length_seconds
        metadata["cueout"] = metadata["length"]
    except wave.Error as ex:
        logger.error("Invalid WAVE file: {}".format(str(ex)))
        raise
    return metadata
@@ -1,49 +1,45 @@
__author__ = "asantoni"

import subprocess
+from typing import Any, Dict

from loguru import logger

-from .analyzer import Analyzer


class UnplayableFileError(Exception):
    pass


+LIQUIDSOAP_EXECUTABLE = "liquidsoap"

-class PlayabilityAnalyzer(Analyzer):
-    """This class checks if a file can actually be played with Liquidsoap."""
-
-    LIQUIDSOAP_EXECUTABLE = "liquidsoap"
-
-    @staticmethod
-    def analyze(filename, metadata):
+def analyze_playability(filename: str, metadata: Dict[str, Any]):
    """Checks if a file can be played by Liquidsoap.
    :param filename: The full path to the file to analyzer
    :param metadata: A metadata dictionary where the results will be put
    :return: The metadata dictionary
    """
    command = [
-        PlayabilityAnalyzer.LIQUIDSOAP_EXECUTABLE,
+        LIQUIDSOAP_EXECUTABLE,
        "-v",
        "-c",
        "output.dummy(audio_to_stereo(single(argv(1))))",
        "--",
        filename,
    ]
    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT, close_fds=True)

    except OSError as e:  # liquidsoap was not found
        logger.warning(
            "Failed to run: %s - %s. %s"
            % (command[0], e.strerror, "Do you have liquidsoap installed?")
        )
    except (
        subprocess.CalledProcessError,
        Exception,
    ) as e:  # liquidsoap returned an error code
        logger.warning(e)
        raise UnplayableFileError()

    return metadata
@@ -1,47 +1,42 @@
import re
import subprocess
+from typing import Any, Dict

from loguru import logger

-from .analyzer import Analyzer
+REPLAYGAIN_EXECUTABLE = "replaygain"  # From the rgain3 python package


-class ReplayGainAnalyzer(Analyzer):
-    """This class extracts the ReplayGain using a tool from the python-rgain package."""
-
-    REPLAYGAIN_EXECUTABLE = "replaygain"  # From the rgain3 python package
-
-    @staticmethod
-    def analyze(filename, metadata):
+def analyze_replaygain(filename: str, metadata: Dict[str, Any]):
    """Extracts the Replaygain loudness normalization factor of a track.
    :param filename: The full path to the file to analyzer
    :param metadata: A metadata dictionary where the results will be put
    :return: The metadata dictionary
    """
    """ The -d flag means do a dry-run, ie. don't modify the file directly.
    """
-    command = [ReplayGainAnalyzer.REPLAYGAIN_EXECUTABLE, "-d", filename]
+    command = [REPLAYGAIN_EXECUTABLE, "-d", filename]
    try:
        results = subprocess.check_output(
            command,
            stderr=subprocess.STDOUT,
            close_fds=True,
            universal_newlines=True,
        )
        gain_match = (
            r"Calculating Replay Gain information \.\.\.(?:\n|.)*?:([\d.-]*) dB"
        )
        replaygain = re.search(gain_match, results).group(1)
        metadata["replay_gain"] = float(replaygain)

    except OSError as e:  # replaygain was not found
        logger.warning(
            "Failed to run: %s - %s. %s"
            % (command[0], e.strerror, "Do you have python-rgain installed?")
        )
    except subprocess.CalledProcessError as e:  # replaygain returned an error code
        logger.warning("%s %s %s", e.cmd, e.output, e.returncode)
    except Exception as e:
        logger.warning(e)

    return metadata
@@ -6,11 +6,11 @@ import uuid
from loguru import logger

-from .analyzer import Analyzer
+def organise_file(audio_file_path, import_directory, original_filename, metadata):
+    """Move the file at audio_file_path over into the import_directory/import,
+    renaming it to original_filename.

-class FileMoverAnalyzer(Analyzer):
-    """
-    This analyzer copies a file over from a temporary directory (stor/organize)
-    into the Airtime library (stor/imported).

@@ -18,105 +18,92 @@ class FileMoverAnalyzer(Analyzer):
    - The filename is of the first file preserved.
    - The filename of the second file has the timestamp attached to it.
    - The filename of the third file has a UUID placed after the timestamp, but ONLY IF it's imported within 1 second of the second file (ie. if the timestamp is the same).

    Keyword arguments:
        audio_file_path: Path to the file to be imported.
        import_directory: Path to the "import" directory inside the Airtime stor directory.
                          (eg. /srv/airtime/stor/import)
        original_filename: The filename of the file when it was uploaded to Airtime.
        metadata: A dictionary where the "full_path" of where the file is moved to will be added.
    """
    if not isinstance(audio_file_path, str):
        raise TypeError(
            "audio_file_path must be string. Was of type "
            + type(audio_file_path).__name__
        )
    if not isinstance(import_directory, str):
        raise TypeError(
            "import_directory must be string. Was of type "
            + type(import_directory).__name__
        )
    if not isinstance(original_filename, str):
        raise TypeError(
            "original_filename must be string. Was of type "
            + type(original_filename).__name__
        )
    if not isinstance(metadata, dict):
        raise TypeError(
            "metadata must be a dict. Was of type " + type(metadata).__name__
        )
    if not os.path.exists(audio_file_path):
        raise FileNotFoundError("audio file not found: {}".format(audio_file_path))

-    @staticmethod
-    def analyze(audio_file_path, metadata):
-        """Dummy method because we need more info than analyze gets passed to it"""
-        raise Exception("Use FileMoverAnalyzer.move() instead.")
-
-    @staticmethod
-    def move(audio_file_path, import_directory, original_filename, metadata):

    # Import the file over to it's final location.
    # TODO: Also, handle the case where the move fails and write some code
    # to possibly move the file to problem_files.

    max_dir_len = 48
    max_file_len = 48
    final_file_path = import_directory
    orig_file_basename, orig_file_extension = os.path.splitext(original_filename)
    if "artist_name" in metadata:
        final_file_path += (
            "/" + metadata["artist_name"][0:max_dir_len]
        )  # truncating with array slicing
    if "album_title" in metadata:
        final_file_path += "/" + metadata["album_title"][0:max_dir_len]
    # Note that orig_file_extension includes the "." already
    final_file_path += "/" + orig_file_basename[0:max_file_len] + orig_file_extension

    # Ensure any redundant slashes are stripped
    final_file_path = os.path.normpath(final_file_path)

    # If a file with the same name already exists in the "import" directory, then
    # we add a unique string to the end of this one. We never overwrite a file on import
    # because if we did that, it would mean Airtime's database would have
    # the wrong information for the file we just overwrote (eg. the song length would be wrong!)
    # If the final file path is the same as the file we've been told to import (which
    # you often do when you're debugging), then don't move the file at all.

    if os.path.exists(final_file_path):
        if os.path.samefile(audio_file_path, final_file_path):
            metadata["full_path"] = final_file_path
            return metadata
        base_file_path, file_extension = os.path.splitext(final_file_path)
        final_file_path = "%s_%s%s" % (
            base_file_path,
            time.strftime("%m-%d-%Y-%H-%M-%S", time.localtime()),
            file_extension,
        )

    # If THAT path exists, append a UUID instead:
    while os.path.exists(final_file_path):
        base_file_path, file_extension = os.path.splitext(final_file_path)
        final_file_path = "%s_%s%s" % (
            base_file_path,
            str(uuid.uuid4()),
            file_extension,
        )

    # Ensure the full path to the file exists
    mkdir_p(os.path.dirname(final_file_path))

    # Move the file into its final destination directory
    logger.debug("Moving %s to %s" % (audio_file_path, final_file_path))
    shutil.move(audio_file_path, final_file_path)

    metadata["full_path"] = final_file_path
    return metadata


def mkdir_p(path):
@@ -1,9 +1,7 @@
-# TODO: use an abstract base class (ie. import from abc ...) once we have python >=3.3 that supports @staticmethod with @abstractmethod
+from typing import Any, Dict, Protocol


-class Analyzer:
-    """Abstract base class for all "analyzers"."""
+class Step(Protocol):
    @staticmethod
-    def analyze(filename, metadata):
-        raise NotImplementedError
+    def __call__(filename: str, metadata: Dict[str, Any]):
+        ...
@@ -5,14 +5,14 @@ from queue import Queue
import pytest

-from libretime_analyzer.analyzer_pipeline import AnalyzerPipeline
+from libretime_analyzer.pipeline import Pipeline

from .conftest import AUDIO_FILENAME, AUDIO_IMPORT_DEST


def test_run_analysis(src_dir, dest_dir):
    queue = Queue()
-    AnalyzerPipeline.run_analysis(
+    Pipeline.run_analysis(
        queue,
        os.path.join(src_dir, AUDIO_FILENAME),
        dest_dir,

@@ -46,4 +46,4 @@ def test_run_analysis(src_dir, dest_dir):
)
def test_run_analysis_wrong_params(params, exception):
    with pytest.raises(exception):
-        AnalyzerPipeline.run_analysis(*params)
+        Pipeline.run_analysis(*params)
@@ -1,9 +1,11 @@
+from unittest.mock import patch
+
import distro
import pytest

-from libretime_analyzer.cuepoint_analyzer import CuePointAnalyzer
+from libretime_analyzer.steps.analyze_cuepoint import analyze_cuepoint

-from .fixtures import FILE_INVALID_DRM, FILES, Fixture
+from ..fixtures import FILE_INVALID_DRM, FILES, Fixture


@pytest.mark.parametrize(

@@ -11,7 +13,7 @@ from .fixtures import FILE_INVALID_DRM, FILES, Fixture
    map(lambda i: (str(i.path), i.length, i.cuein, i.cueout), FILES),
)
def test_analyze(filepath, length, cuein, cueout):
-    metadata = CuePointAnalyzer.analyze(filepath, dict())
+    metadata = analyze_cuepoint(filepath, dict())

    assert metadata["length_seconds"] == pytest.approx(length, abs=0.1)

@@ -32,10 +34,11 @@ def test_analyze(filepath, length, cuein, cueout):

def test_analyze_missing_silan():
-    old = CuePointAnalyzer.SILAN_EXECUTABLE
-    CuePointAnalyzer.SILAN_EXECUTABLE = "foobar"
-    CuePointAnalyzer.analyze(str(FILES[0].path), dict())
-    CuePointAnalyzer.SILAN_EXECUTABLE = old
+    with patch(
+        "libretime_analyzer.steps.analyze_cuepoint.SILAN_EXECUTABLE",
+        "foobar",
+    ):
+        analyze_cuepoint(str(FILES[0].path), dict())


def test_analyze_invalid_filepath():
@@ -4,9 +4,9 @@ from unittest import mock
import mutagen
import pytest

-from libretime_analyzer.metadata_analyzer import MetadataAnalyzer
+from libretime_analyzer.steps.analyze_metadata import analyze_metadata

-from .fixtures import FILE_INVALID_DRM, FILE_INVALID_TXT, FILES_TAGGED, FixtureMeta
+from ..fixtures import FILE_INVALID_DRM, FILE_INVALID_TXT, FILES_TAGGED, FixtureMeta


@pytest.mark.parametrize(

@@ -18,7 +18,7 @@ from .fixtures import FILE_INVALID_DRM, FILE_INVALID_TXT, FILES_TAGGED, FixtureMeta
)
def test_analyze_wrong_params(params, exception):
    with pytest.raises(exception):
-        MetadataAnalyzer.analyze(*params)
+        analyze_metadata(*params)


@pytest.mark.parametrize(

@@ -26,7 +26,7 @@ def test_analyze_wrong_params(params, exception):
    map(lambda i: (str(i.path), i.metadata), FILES_TAGGED),
)
def test_analyze(filepath: str, metadata: dict):
-    found = MetadataAnalyzer.analyze(filepath, dict())
+    found = analyze_metadata(filepath, dict())

    # Mutagen does not support wav files yet
    if filepath.endswith("wav"):

@@ -50,12 +50,12 @@ def test_analyze(filepath: str, metadata: dict):

def test_invalid_wma():
-    metadata = MetadataAnalyzer.analyze(str(FILE_INVALID_DRM), dict())
+    metadata = analyze_metadata(str(FILE_INVALID_DRM), dict())
    assert metadata["mime"] == "audio/x-ms-wma"


def test_unparsable_file():
-    metadata = MetadataAnalyzer.analyze(str(FILE_INVALID_TXT), dict())
+    metadata = analyze_metadata(str(FILE_INVALID_TXT), dict())
    assert metadata == {
        "filesize": 10,
        "ftype": "audioclip",
@@ -1,12 +1,14 @@
+from unittest.mock import patch
+
import distro
import pytest

-from libretime_analyzer.playability_analyzer import (
-    PlayabilityAnalyzer,
+from libretime_analyzer.steps.analyze_playability import (
    UnplayableFileError,
+    analyze_playability,
)

-from .fixtures import FILE_INVALID_DRM, FILES, Fixture
+from ..fixtures import FILE_INVALID_DRM, FILES, Fixture


@pytest.mark.parametrize(

@@ -14,14 +16,15 @@ from .fixtures import FILE_INVALID_DRM, FILES, Fixture
    map(lambda i: str(i.path), FILES),
)
def test_analyze(filepath):
-    PlayabilityAnalyzer.analyze(filepath, dict())
+    analyze_playability(filepath, dict())


def test_analyze_missing_liquidsoap():
-    old = PlayabilityAnalyzer.LIQUIDSOAP_EXECUTABLE
-    PlayabilityAnalyzer.LIQUIDSOAP_EXECUTABLE = "foobar"
-    PlayabilityAnalyzer.analyze(str(FILES[0].path), dict())
-    PlayabilityAnalyzer.LIQUIDSOAP_EXECUTABLE = old
+    with patch(
+        "libretime_analyzer.steps.analyze_playability.LIQUIDSOAP_EXECUTABLE",
+        "foobar",
+    ):
+        analyze_playability(str(FILES[0].path), dict())


def test_analyze_invalid_filepath():
@@ -1,8 +1,10 @@
+from unittest.mock import patch
+
import pytest

-from libretime_analyzer.replaygain_analyzer import ReplayGainAnalyzer
+from libretime_analyzer.steps.analyze_replaygain import analyze_replaygain

-from .fixtures import FILE_INVALID_DRM, FILES, Fixture
+from ..fixtures import FILE_INVALID_DRM, FILES, Fixture


@pytest.mark.parametrize(

@@ -10,15 +12,16 @@ from .fixtures import FILE_INVALID_DRM, FILES, Fixture
    map(lambda i: (str(i.path), i.replaygain), FILES),
)
def test_analyze(filepath, replaygain):
-    metadata = ReplayGainAnalyzer.analyze(filepath, dict())
+    metadata = analyze_replaygain(filepath, dict())
    assert metadata["replay_gain"] == pytest.approx(replaygain, abs=0.6)


def test_analyze_missing_replaygain():
-    old = ReplayGainAnalyzer.REPLAYGAIN_EXECUTABLE
-    ReplayGainAnalyzer.REPLAYGAIN_EXECUTABLE = "foobar"
-    ReplayGainAnalyzer.analyze(str(FILES[0].path), dict())
-    ReplayGainAnalyzer.REPLAYGAIN_EXECUTABLE = old
+    with patch(
+        "libretime_analyzer.steps.analyze_replaygain.REPLAYGAIN_EXECUTABLE",
+        "foobar",
+    ):
+        analyze_replaygain(str(FILES[0].path), dict())


def test_analyze_invalid_filepath():
@@ -6,14 +6,9 @@ from unittest import mock
import pytest

-from libretime_analyzer.filemover_analyzer import FileMoverAnalyzer
+from libretime_analyzer.steps.organise_file import organise_file

-from .conftest import AUDIO_FILENAME
-
-
-def test_analyze():
-    with pytest.raises(Exception):
-        FileMoverAnalyzer.analyze("foo", dict())
+from ..conftest import AUDIO_FILENAME


@pytest.mark.parametrize(

@@ -27,11 +22,11 @@ def test_analyze():
)
def test_move_wrong_params(params, exception):
    with pytest.raises(exception):
-        FileMoverAnalyzer.move(*params)
+        organise_file(*params)


-def test_move(src_dir, dest_dir):
-    FileMoverAnalyzer.move(
+def test_organise_file(src_dir, dest_dir):
+    organise_file(
        os.path.join(src_dir, AUDIO_FILENAME),
        dest_dir,
        AUDIO_FILENAME,

@@ -40,8 +35,8 @@ def test_move(src_dir, dest_dir):
    assert os.path.exists(os.path.join(dest_dir, AUDIO_FILENAME))


-def test_move_samefile(src_dir):
-    FileMoverAnalyzer.move(
+def test_organise_file_samefile(src_dir):
+    organise_file(
        os.path.join(src_dir, AUDIO_FILENAME),
        src_dir,
        AUDIO_FILENAME,

@@ -52,11 +47,11 @@ def test_move_samefile(src_dir):

def import_and_restore(src_dir, dest_dir) -> dict:
    """
-    Small helper to test the FileMoverAnalyzer.move function.
+    Small helper to test the organise_file function.
    Move the file and restore it back to it's origine.
    """
    # Import the file
-    metadata = FileMoverAnalyzer.move(
+    metadata = organise_file(
        os.path.join(src_dir, AUDIO_FILENAME),
        dest_dir,
        AUDIO_FILENAME,

@@ -88,7 +83,7 @@ def test_move_triplicate_file(src_dir, dest_dir):
    # Here we use mock to patch out the time.localtime() function so that it
    # always returns the same value. This allows us to consistently simulate this test cases
    # where the last two of the three files are imported at the same time as the timestamp.
-    with mock.patch("libretime_analyzer.filemover_analyzer.time") as mock_time:
+    with mock.patch("libretime_analyzer.steps.organise_file.time") as mock_time:
        mock_time.localtime.return_value = time.localtime()  # date(2010, 10, 8)
        mock_time.side_effect = time.time

@@ -113,7 +108,7 @@ def test_move_triplicate_file(src_dir, dest_dir):
def test_move_bad_permissions_dest_dir(src_dir):
    with pytest.raises(OSError):
        # /sys is using sysfs on Linux, which is unwritable
-        FileMoverAnalyzer.move(
+        organise_file(
            os.path.join(src_dir, AUDIO_FILENAME),
            "/sys/foobar",
            AUDIO_FILENAME,