sintonia/analyzer/libretime_analyzer/analyzer_pipeline.py
Jonas L fe0b2c4a7a
feat(analyzer): enhance analyzer cli and logging (#1507)
Some initial work on modernizing the analyzer app. This replaces any custom logger or `logging`-based logger with the logging tools from `libretime_shared.logging` and `loguru`.

- rename cli to main
- use pathlib in setup.py
- add api-client and shared package as dev deps
- rework main entrypoint cli to use click and shared helpers
- remove unused imports
- replace logging with logger
- rework analyzer app using shared abstract app
- move analyzer log path to systemd service
- change analyzer working dir

BREAKING CHANGE: The analyzer cli has been reworked and uses new flags / environment variables for configuration.
`--debug` flag becomes `--log-level <level>`
`--rmq-config-file` flag becomes `--config <filepath>`
`--http-retry-queue-file` flag becomes `--retry-queue-filepath`.
`retry-queue-filepath` default value changed from `/tmp/airtime_analyzer_http_retries` to `retry_queue` in the working dir.
`LIBRETIME_CONF_DIR` environment variable replaced by `LIBRETIME_CONFIG_FILEPATH`.
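
The renamed flags listed above could be parsed roughly as follows. This is only an illustrative sketch: the commit says the real entrypoint uses click and shared helpers, while this version deliberately swaps in the standard library's `argparse` to stay dependency-free. The program name, the default log level, the set of accepted levels, and the idea of using `LIBRETIME_CONFIG_FILEPATH` as the default for `--config` are assumptions, not taken from the analyzer source.

```python
import argparse
import os
from pathlib import Path


def build_parser() -> argparse.ArgumentParser:
    """Build a parser exposing the three flags named in the commit message."""
    # "analyzer-sketch" is a hypothetical program name.
    parser = argparse.ArgumentParser(prog="analyzer-sketch")
    parser.add_argument(
        "--log-level",
        default="info",  # assumed default, not confirmed by the source
        choices=["error", "warning", "info", "debug"],  # assumed level names
        help="Replaces the old --debug flag.",
    )
    parser.add_argument(
        "--config",
        type=Path,
        # Assumption: the environment variable supplies the default value.
        default=os.environ.get("LIBRETIME_CONFIG_FILEPATH"),
        help="Replaces the old --rmq-config-file flag.",
    )
    parser.add_argument(
        "--retry-queue-filepath",
        type=Path,
        # The commit states the default is `retry_queue` in the working dir.
        default=Path("retry_queue"),
        help="Replaces the old --http-retry-queue-file flag.",
    )
    return parser


if __name__ == "__main__":
    # "example.yml" is a placeholder path used only for this demonstration.
    print(build_parser().parse_args(["--log-level", "debug", "--config", "example.yml"]))
```

Because the retry queue default is a relative path, it resolves against the process working directory, which is why the change of working directory is also called out as breaking.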

BREAKING CHANGE: When running analyzer as a systemd service, the working directory is now /var/lib/libretime/analyzer.
2022-01-17 10:26:30 +02:00


""" Analyzes and imports an audio file into the Airtime library.
"""
from queue import Queue

from loguru import logger

from .cuepoint_analyzer import CuePointAnalyzer
from .filemover_analyzer import FileMoverAnalyzer
from .metadata_analyzer import MetadataAnalyzer
from .playability_analyzer import PlayabilityAnalyzer, UnplayableFileError
from .replaygain_analyzer import ReplayGainAnalyzer


class AnalyzerPipeline:
    """Analyzes and imports an audio file into the Airtime library.

    This currently performs metadata extraction (e.g. gets the ID3 tags from an MP3),
    then moves the file to the Airtime music library (stor/imported), and returns
    the results back to the parent process. This class is used in an isolated process
    so that if it crashes, it does not kill the entire airtime_analyzer daemon and
    the failure to import can be reported back to the web application.
    """

    IMPORT_STATUS_FAILED = 2

    @staticmethod
    def run_analysis(
        queue,
        audio_file_path,
        import_directory,
        original_filename,
        storage_backend,
        file_prefix,
    ):
        """Analyze and import an audio file, and put all extracted metadata into queue.

        Keyword arguments:
            queue: A queue.Queue which will be used to pass the
                extracted metadata back to the parent process.
            audio_file_path: Path on disk to the audio file to analyze.
            import_directory: Path to the final Airtime "import" directory where
                we will move the file.
            original_filename: The original filename of the file, which we'll try to
                preserve. The file at audio_file_path typically has a
                temporary randomly generated name, which is why we want
                to know what the original name was.
            storage_backend: String indicating the storage backend (amazon_s3 or file)
            file_prefix: String stored in the metadata under the "file_prefix" key.
        """
        try:
            # Validate the argument types before doing any work.
            if not isinstance(queue, Queue):
                raise TypeError("queue must be a queue.Queue")
            if not isinstance(audio_file_path, str):
                raise TypeError(
                    "audio_file_path must be unicode. Was of type "
                    + type(audio_file_path).__name__
                    + " instead."
                )
            if not isinstance(import_directory, str):
                raise TypeError(
                    "import_directory must be unicode. Was of type "
                    + type(import_directory).__name__
                    + " instead."
                )
            if not isinstance(original_filename, str):
                raise TypeError(
                    "original_filename must be unicode. Was of type "
                    + type(original_filename).__name__
                    + " instead."
                )
            if not isinstance(file_prefix, str):
                raise TypeError(
                    "file_prefix must be unicode. Was of type "
                    + type(file_prefix).__name__
                    + " instead."
                )

            # Analyze the audio file we were told to analyze:
            # First, we extract the ID3 tags and other metadata:
            metadata = dict()
            metadata["file_prefix"] = file_prefix
            metadata = MetadataAnalyzer.analyze(audio_file_path, metadata)
            metadata = CuePointAnalyzer.analyze(audio_file_path, metadata)
            metadata = ReplayGainAnalyzer.analyze(audio_file_path, metadata)
            metadata = PlayabilityAnalyzer.analyze(audio_file_path, metadata)

            # Then we move the file into the import directory:
            metadata = FileMoverAnalyzer.move(
                audio_file_path, import_directory, original_filename, metadata
            )

            metadata["import_status"] = 0  # Successfully imported

            # Note that the queue we're putting the results into is our interprocess communication
            # back to the main process.

            # Pass all the file metadata back to the main analyzer process, which then passes
            # it back to the Airtime web application.
            queue.put(metadata)
        except UnplayableFileError as e:
            logger.exception(e)
            metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
            metadata["reason"] = "The file could not be played."
            raise e
        except Exception as e:
            # Ensures the traceback for this child process gets written to our log files:
            logger.exception(e)
            raise e
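
The pattern used by `run_analysis` (thread a metadata dict through a chain of analyzer steps, then hand the result back through a queue) can be sketched in isolation as below. This is a simplified stand-in, not the project's code: the two step functions, `run_steps`, and `analyze` are hypothetical names, the `.broken` suffix check merely simulates an unplayable file, and building the failure dict on the caller side is an assumption about how a caller might react to the re-raised exception.

```python
from queue import Queue

IMPORT_STATUS_OK = 0
IMPORT_STATUS_FAILED = 2  # same value as AnalyzerPipeline.IMPORT_STATUS_FAILED


class UnplayableFileError(Exception):
    """Stand-in for the exception imported from playability_analyzer."""


def fake_metadata_step(path, metadata):
    # Hypothetical step: record the file name as the track title.
    metadata["track_title"] = path.rsplit("/", 1)[-1]
    return metadata


def fake_playability_step(path, metadata):
    # Hypothetical step: treat a ".broken" suffix as an unplayable file.
    if path.endswith(".broken"):
        raise UnplayableFileError(path)
    return metadata


STEPS = [fake_metadata_step, fake_playability_step]


def run_steps(queue, path, file_prefix):
    """Run every step in order and put the final metadata on the queue."""
    metadata = {"file_prefix": file_prefix}
    for step in STEPS:
        metadata = step(path, metadata)
    metadata["import_status"] = IMPORT_STATUS_OK
    queue.put(metadata)


def analyze(path, file_prefix):
    """Caller side: read the result, or map a playback failure to a status."""
    results = Queue()
    try:
        run_steps(results, path, file_prefix)
        return results.get_nowait()
    except UnplayableFileError:
        return {
            "import_status": IMPORT_STATUS_FAILED,
            "reason": "The file could not be played.",
        }


if __name__ == "__main__":
    print(analyze("/tmp/song.mp3", "1"))
    print(analyze("/tmp/song.broken", "1"))
```

Each step takes and returns the same dict, so adding or reordering analyzers only means editing the list of steps, and the queue stays the single channel through which a successful result leaves the pipeline.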