feat(analyzer): enhance analyzer cli and logging (#1507)

Some initial work on modernizing the analyzer app. This replaces any custom or `logging`-based logger with the logging tools from `libretime_shared.logging` and `loguru`.

- rename cli to main
- use pathlib in setup.py
- add api-client and shared package as dev deps
- rework main entrypoint cli to use click and shared helpers
- remove unused imports
- replace logging with logger
- rework analyzer app using shared abstract app
- move analyzer log path to systemd service
- change analyzer working dir

BREAKING CHANGE: The analyzer cli has been reworked and uses new flags and environment variables for configuration.
The `--debug` flag becomes `--log-level <level>`.
The `--rmq-config-file` flag becomes `--config <filepath>`.
The `--http-retry-queue-file` flag becomes `--retry-queue-filepath`.
The `retry-queue-filepath` default value changed from `/tmp/airtime_analyzer_http_retries` to `retry_queue` in the working directory.
The `LIBRETIME_CONF_DIR` environment variable is replaced by `LIBRETIME_CONFIG_FILEPATH`.

BREAKING CHANGE: When running the analyzer as a systemd service, the working directory is now `/var/lib/libretime/analyzer`.
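For operators upgrading an existing install, a rough before/after sketch of the invocation (the `debug` level name passed to `--log-level` is an assumption; the accepted values come from the shared `cli_logging_options` helper):

```bash
# Before: argparse flags, config dir via LIBRETIME_CONF_DIR
libretime-analyzer \
  --debug \
  --rmq-config-file /etc/airtime/airtime.conf \
  --http-retry-queue-file /tmp/airtime_analyzer_http_retries

# After: click flags from the shared helpers (a relative retry queue
# path lands in the working directory)
libretime-analyzer \
  --log-level debug \
  --config /etc/airtime/airtime.conf \
  --retry-queue-filepath retry_queue
```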
Jonas L 2022-01-17 09:26:30 +01:00 committed by GitHub
parent bf59f20ffd
commit fe0b2c4a7a
14 changed files with 138 additions and 215 deletions

@@ -2,6 +2,9 @@
Description=LibreTime Media Analyzer Service
[Service]
Environment=LIBRETIME_LOG_FILEPATH=/var/log/libretime/analyzer.log
WorkingDirectory=/var/lib/libretime/analyzer
ExecStart=/usr/local/bin/libretime-analyzer
User=libretime-analyzer
Group=libretime-analyzer
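Because the log path now comes from the unit's environment rather than a CLI flag, a deployment can relocate the log with a plain systemd drop-in instead of patching the packaged unit. A minimal sketch; the drop-in path and workflow are standard systemd conventions, not something this change ships:

```bash
# Hypothetical drop-in that relocates the analyzer log
sudo mkdir -p /etc/systemd/system/libretime-analyzer.service.d
sudo tee /etc/systemd/system/libretime-analyzer.service.d/override.conf >/dev/null <<'EOF'
[Service]
Environment=LIBRETIME_LOG_FILEPATH=/srv/logs/libretime/analyzer.log
EOF
sudo systemctl daemon-reload
sudo systemctl restart libretime-analyzer
```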

@@ -1,89 +0,0 @@
"""Contains the main application class for airtime_analyzer.
"""
import logging
import logging.handlers
import signal
import sys
import traceback
from functools import partial
from . import config_file
from .message_listener import MessageListener
from .metadata_analyzer import MetadataAnalyzer
from .replaygain_analyzer import ReplayGainAnalyzer
from .status_reporter import StatusReporter
class AirtimeAnalyzerServer:
"""A server for importing uploads to Airtime as background jobs."""
# Constants
_LOG_PATH = "/var/log/airtime/airtime_analyzer.log"
# Variables
_log_level = logging.INFO
def __init__(self, rmq_config_path, http_retry_queue_path, debug=False):
# Dump a stacktrace with 'kill -SIGUSR2 <PID>'
signal.signal(
signal.SIGUSR2, lambda sig, frame: AirtimeAnalyzerServer.dump_stacktrace()
)
# Configure logging
self.setup_logging(debug)
# Read our rmq config file
rmq_config = config_file.read_config_file(rmq_config_path)
# Start up the StatusReporter process
StatusReporter.start_thread(http_retry_queue_path)
# Start listening for RabbitMQ messages telling us about newly
# uploaded files. This blocks until we receive a shutdown signal.
self._msg_listener = MessageListener(rmq_config)
StatusReporter.stop_thread()
def setup_logging(self, debug):
"""Set up nicely formatted logging and log rotation.
Keyword arguments:
debug -- a boolean indicating whether to enable super verbose logging
to the screen and disk.
"""
if debug:
self._log_level = logging.DEBUG
else:
# Disable most pika/rabbitmq logging:
pika_logger = logging.getLogger("pika")
pika_logger.setLevel(logging.CRITICAL)
# Set up logging
logFormatter = logging.Formatter(
"%(asctime)s [%(module)s] [%(levelname)-5.5s] %(message)s"
)
rootLogger = logging.getLogger()
rootLogger.setLevel(self._log_level)
fileHandler = logging.handlers.RotatingFileHandler(
filename=self._LOG_PATH, maxBytes=1024 * 1024 * 30, backupCount=8
)
fileHandler.setFormatter(logFormatter)
rootLogger.addHandler(fileHandler)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)
@classmethod
def dump_stacktrace(stack):
"""Dump a stacktrace for all threads"""
code = []
for threadId, stack in list(sys._current_frames().items()):
code.append("\n# ThreadID: %s" % threadId)
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
if line:
code.append(" %s" % (line.strip()))
logging.info("\n".join(code))

@@ -1,11 +1,9 @@
""" Analyzes and imports an audio file into the Airtime library.
"""
import configparser
import logging
import multiprocessing
import threading
from queue import Queue
from loguru import logger
from .cuepoint_analyzer import CuePointAnalyzer
from .filemover_analyzer import FileMoverAnalyzer
from .metadata_analyzer import MetadataAnalyzer
@@ -49,11 +47,6 @@ class AnalyzerPipeline:
storage_backend: String indicating the storage backend (amazon_s3 or file)
file_prefix:
"""
# It is super critical to initialize a separate log file here so that we
# don't inherit logging/locks from the parent process. Supposedly
# this can lead to Bad Things (deadlocks): http://bugs.python.org/issue6721
AnalyzerPipeline.python_logger_deadlock_workaround()
try:
if not isinstance(queue, Queue):
raise TypeError("queue must be a Queue.Queue()")
@@ -105,21 +98,11 @@ class AnalyzerPipeline:
# it back to the Airtime web application.
queue.put(metadata)
except UnplayableFileError as e:
logging.exception(e)
logger.exception(e)
metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
metadata["reason"] = "The file could not be played."
raise e
except Exception as e:
# Ensures the traceback for this child process gets written to our log files:
logging.exception(e)
logger.exception(e)
raise e
@staticmethod
def python_logger_deadlock_workaround():
# Workaround for: http://bugs.python.org/issue6721#msg140215
logger_names = list(logging.Logger.manager.loggerDict.keys())
logger_names.append(None) # Root logger
for name in logger_names:
for handler in logging.getLogger(name).handlers:
handler.createLock()
logging._lock = threading.RLock()

@@ -1,51 +0,0 @@
"""
Main CLI entrypoint for the libretime-analyzer app.
"""
import argparse
import os
import libretime_analyzer.airtime_analyzer as aa
VERSION = "1.0"
LIBRETIME_CONF_DIR = os.getenv("LIBRETIME_CONF_DIR", "/etc/airtime")
DEFAULT_RMQ_CONFIG_PATH = os.path.join(LIBRETIME_CONF_DIR, "airtime.conf")
DEFAULT_HTTP_RETRY_PATH = "/tmp/airtime_analyzer_http_retries"
def main():
"""Entry-point for this application"""
print("LibreTime Analyzer {}".format(VERSION))
parser = argparse.ArgumentParser()
parser.add_argument(
"--debug", help="log full debugging output", action="store_true"
)
parser.add_argument(
"--rmq-config-file",
help="specify a configuration file with RabbitMQ settings (default is %s)"
% DEFAULT_RMQ_CONFIG_PATH,
)
parser.add_argument(
"--http-retry-queue-file",
help="specify where incompleted HTTP requests will be serialized (default is %s)"
% DEFAULT_HTTP_RETRY_PATH,
)
args = parser.parse_args()
# Default config file path
rmq_config_path = DEFAULT_RMQ_CONFIG_PATH
http_retry_queue_path = DEFAULT_HTTP_RETRY_PATH
if args.rmq_config_file:
rmq_config_path = args.rmq_config_file
if args.http_retry_queue_file:
http_retry_queue_path = args.http_retry_queue_file
aa.AirtimeAnalyzerServer(
rmq_config_path=rmq_config_path,
http_retry_queue_path=http_retry_queue_path,
debug=args.debug,
)
if __name__ == "__main__":
main()

@@ -1,8 +1,8 @@
import datetime
import json
import logging
import subprocess
import traceback
from loguru import logger
from .analyzer import Analyzer
@@ -87,13 +87,13 @@ class CuePointAnalyzer(Analyzer):
metadata["cueout"] = silan_cueout
except OSError as e: # silan was not found
logging.warning(
logger.warning(
"Failed to run: %s - %s. %s"
% (command[0], e.strerror, "Do you have silan installed?")
)
except subprocess.CalledProcessError as e: # silan returned an error code
logging.warning("%s %s %s", e.cmd, e.output, e.returncode)
logger.warning("%s %s %s", e.cmd, e.output, e.returncode)
except Exception as e:
logging.warning(e)
logger.warning(e)
return metadata

@@ -1,10 +1,11 @@
import errno
import logging
import os
import shutil
import time
import uuid
from loguru import logger
from .analyzer import Analyzer
@@ -111,7 +112,7 @@ class FileMoverAnalyzer(Analyzer):
mkdir_p(os.path.dirname(final_file_path))
# Move the file into its final destination directory
logging.debug("Moving %s to %s" % (audio_file_path, final_file_path))
logger.debug("Moving %s to %s" % (audio_file_path, final_file_path))
shutil.move(audio_file_path, final_file_path)
metadata["full_path"] = final_file_path

@@ -0,0 +1,69 @@
from pathlib import Path
from typing import Optional
import click
from libretime_shared.app import AbstractApp
from libretime_shared.cli import cli_config_options, cli_logging_options
from libretime_shared.config import DEFAULT_ENV_PREFIX
from .config_file import read_config_file
from .message_listener import MessageListener
from .status_reporter import StatusReporter
VERSION = "1.0"
DEFAULT_LIBRETIME_CONFIG_FILEPATH = Path("/etc/airtime/airtime.conf")
DEFAULT_RETRY_QUEUE_FILEPATH = Path("retry_queue")
@click.command()
@cli_logging_options
@cli_config_options
@click.option(
"--retry-queue-filepath",
envvar=f"{DEFAULT_ENV_PREFIX}_RETRY_QUEUE_FILEPATH",
type=click.Path(path_type=Path),
help="Path to the retry queue file.",
default=DEFAULT_RETRY_QUEUE_FILEPATH,
)
def cli(
log_level: str,
log_filepath: Optional[Path],
config_filepath: Optional[Path],
retry_queue_filepath: Path,
):
"""
Run analyzer.
"""
Analyzer(
log_level=log_level,
log_filepath=log_filepath,
# Handle default until the config file can be optional
config_filepath=config_filepath or DEFAULT_LIBRETIME_CONFIG_FILEPATH,
retry_queue_filepath=retry_queue_filepath,
)
class Analyzer(AbstractApp):
name = "analyzer"
def __init__(
self,
*,
config_filepath: Optional[Path],
retry_queue_filepath: Path,
**kwargs,
):
super().__init__(**kwargs)
# Read our rmq config file
rmq_config = read_config_file(config_filepath)
# Start up the StatusReporter process
StatusReporter.start_thread(retry_queue_filepath)
# Start listening for RabbitMQ messages telling us about newly
# uploaded files. This blocks until we receive a shutdown signal.
self._msg_listener = MessageListener(rmq_config)
StatusReporter.stop_thread()
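The `--retry-queue-filepath` option declares an `envvar`, and the breaking-change note says `LIBRETIME_CONFIG_FILEPATH` replaces `LIBRETIME_CONF_DIR`, so the same settings can presumably be supplied through the environment, which is how the systemd unit drives the app. A sketch, assuming `DEFAULT_ENV_PREFIX` expands to `LIBRETIME`:

```bash
# Flag-based invocation
libretime-analyzer --config /etc/airtime/airtime.conf --retry-queue-filepath retry_queue

# Environment-based equivalent (variable names assumed as described above)
LIBRETIME_CONFIG_FILEPATH=/etc/airtime/airtime.conf \
LIBRETIME_RETRY_QUEUE_FILEPATH=retry_queue \
  libretime-analyzer
```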

@@ -1,13 +1,11 @@
import json
import logging
import multiprocessing
import queue
import select
import signal
import sys
import time
import pika
from loguru import logger
from .analyzer_pipeline import AnalyzerPipeline
from .status_reporter import StatusReporter
@@ -94,13 +92,13 @@ class MessageListener:
except pika.exceptions.AMQPError as e:
if self._shutdown:
break
logging.error("Connection to message queue failed. ")
logging.error(e)
logging.info("Retrying in 5 seconds...")
logger.error("Connection to message queue failed. ")
logger.error(e)
logger.info("Retrying in 5 seconds...")
time.sleep(5)
self.disconnect_from_messaging_server()
logging.info("Exiting cleanly.")
logger.info("Exiting cleanly.")
def connect_to_messaging_server(self):
"""Connect to the RabbitMQ server and start listening for messages."""
@@ -124,7 +122,7 @@
exchange=EXCHANGE, queue=QUEUE, routing_key=ROUTING_KEY
)
logging.info(" Listening for messages...")
logger.info(" Listening for messages...")
self._channel.basic_consume(QUEUE, self.msg_received_callback, auto_ack=False)
def wait_for_messages(self):
@@ -152,7 +150,7 @@
Here we parse the message, spin up an analyzer process, and report the
metadata back to the Airtime web application (or report an error).
"""
logging.info(
logger.info(
" - Received '%s' on routing_key '%s'" % (body, method_frame.routing_key)
)
@@ -199,7 +197,7 @@
except KeyError as e:
# A field in msg_dict that we needed was missing (eg. audio_file_path)
logging.exception(
logger.exception(
"A mandatory airtime_analyzer message field was missing from the message."
)
# See the huge comment about NACK below.
@@ -208,7 +206,7 @@
) # Important that it doesn't requeue the message
except Exception as e:
logging.exception(e)
logger.exception(e)
""" If ANY exception happens while processing a file, we're going to NACK to the
messaging server and tell it to remove the message from the queue.
(NACK is a negative acknowledgement. We could use ACK instead, but this might come
@@ -258,8 +256,8 @@ class MessageListener:
p.join()
if p.exitcode == 0:
results = q.get()
logging.info("Main process received results from child: ")
logging.info(results)
logger.info("Main process received results from child: ")
logger.info(results)
else:
raise Exception("Analyzer process terminated unexpectedly.")
"""
@@ -277,7 +275,7 @@
)
metadata = q.get()
except Exception as e:
logging.error("Analyzer pipeline exception: %s" % str(e))
logger.error("Analyzer pipeline exception: %s" % str(e))
metadata["import_status"] = AnalyzerPipeline.IMPORT_STATUS_FAILED
# Ensure our queue doesn't fill up and block due to unexpected behaviour. Defensive code.

@@ -1,12 +1,11 @@
import datetime
import hashlib
import logging
import os
import time
import wave
import magic
import mutagen
from loguru import logger
from .analyzer import Analyzer
@@ -190,6 +189,6 @@ class MetadataAnalyzer(Analyzer):
metadata["length_seconds"] = length_seconds
metadata["cueout"] = metadata["length"]
except wave.Error as ex:
logging.error("Invalid WAVE file: {}".format(str(ex)))
logger.error("Invalid WAVE file: {}".format(str(ex)))
raise
return metadata

@@ -1,8 +1,9 @@
__author__ = "asantoni"
import logging
import subprocess
from loguru import logger
from .analyzer import Analyzer
@@ -34,7 +35,7 @@ class PlayabilityAnalyzer(Analyzer):
subprocess.check_output(command, stderr=subprocess.STDOUT, close_fds=True)
except OSError as e: # liquidsoap was not found
logging.warning(
logger.warning(
"Failed to run: %s - %s. %s"
% (command[0], e.strerror, "Do you have liquidsoap installed?")
)
@@ -42,7 +43,7 @@
subprocess.CalledProcessError,
Exception,
) as e: # liquidsoap returned an error code
logging.warning(e)
logger.warning(e)
raise UnplayableFileError()
return metadata

@@ -1,7 +1,8 @@
import logging
import re
import subprocess
from loguru import logger
from .analyzer import Analyzer
@@ -34,13 +35,13 @@ class ReplayGainAnalyzer(Analyzer):
metadata["replay_gain"] = float(replaygain)
except OSError as e: # replaygain was not found
logging.warning(
logger.warning(
"Failed to run: %s - %s. %s"
% (command[0], e.strerror, "Do you have python-rgain installed?")
)
except subprocess.CalledProcessError as e: # replaygain returned an error code
logging.warning("%s %s %s", e.cmd, e.output, e.returncode)
logger.warning("%s %s %s", e.cmd, e.output, e.returncode)
except Exception as e:
logging.warning(e)
logger.warning(e)
return metadata

@@ -1,6 +1,5 @@
import collections
import json
import logging
import pickle
import queue
import threading
@@ -9,6 +8,7 @@ import traceback
from urllib.parse import urlparse
import requests
from loguru import logger
# Disable urllib3 warnings because these can cause a rare deadlock due to Python 2's crappy internal non-reentrant locking
# around POSIX stuff. See SAAS-714. The hasattr() is for compatibility with older versions of requests.
@@ -62,7 +62,7 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
# If we fail to unpickle a saved queue of failed HTTP requests, then we'll just log an error
# and continue because those HTTP requests are lost anyways. The pickled file will be
# overwritten the next time the analyzer is shut down too.
logging.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)
logger.error("Failed to unpickle %s. Continuing..." % http_retry_queue_path)
pass
while True:
@@ -93,7 +93,7 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
request = retry_queue.popleft()
send_http_request(request, retry_queue)
logging.info("Shutting down status_reporter")
logger.info("Shutting down status_reporter")
# Pickle retry_queue to disk so that we don't lose uploads if we're shut down while
# while the web server is down or unreachable.
with open(http_retry_queue_path, "wb") as pickle_file:
@@ -102,9 +102,9 @@ def process_http_requests(ipc_queue, http_retry_queue_path):
except Exception as e: # Terrible top-level exception handler to prevent the thread from dying, just in case.
if shutdown:
return
logging.exception("Unhandled exception in StatusReporter")
logging.exception(e)
logging.info("Restarting StatusReporter thread")
logger.exception("Unhandled exception in StatusReporter")
logger.exception(e)
logger.info("Restarting StatusReporter thread")
time.sleep(2) # Throttle it
@@ -122,11 +122,11 @@ def send_http_request(picklable_request, retry_queue):
prepared_request, timeout=StatusReporter._HTTP_REQUEST_TIMEOUT, verify=False
) # SNI is a pain in the ass
r.raise_for_status() # Raise an exception if there was an http error code returned
logging.info("HTTP request sent successfully.")
logger.info("HTTP request sent successfully.")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 422:
# Do no retry the request if there was a metadata validation error
logging.error(
logger.error(
"HTTP request failed due to an HTTP exception. Exception was: %s"
% str(e)
)
@@ -134,7 +134,7 @@ def send_http_request(picklable_request, retry_queue):
# The request failed with an error 500 probably, so let's check if Airtime and/or
# the web server are broken. If not, then our request was probably causing an
# error 500 in the media API (ie. a bug), so there's no point in retrying it.
logging.error("HTTP request failed. Exception was: %s" % str(e))
logger.error("HTTP request failed. Exception was: %s" % str(e))
parsed_url = urlparse(e.response.request.url)
if is_web_server_broken(parsed_url.scheme + "://" + parsed_url.netloc):
# If the web server is having problems, retry the request later:
@@ -143,13 +143,13 @@ def send_http_request(picklable_request, retry_queue):
# You will have to find these bad requests in logs or you'll be
# notified by sentry.
except requests.exceptions.ConnectionError as e:
logging.error(
logger.error(
"HTTP request failed due to a connection error. Retrying later. %s" % str(e)
)
retry_queue.append(picklable_request) # Retry it later
except Exception as e:
logging.error("HTTP request failed with unhandled exception. %s" % str(e))
logging.error(traceback.format_exc())
logger.error("HTTP request failed with unhandled exception. %s" % str(e))
logger.error(traceback.format_exc())
# Don't put the request into the retry queue, just give up on this one.
# I'm doing this to protect against us getting some pathological request
# that breaks our code. I don't want us pickling data that potentially
@@ -198,7 +198,7 @@ class StatusReporter:
@classmethod
def stop_thread(self):
logging.info("Terminating status_reporter process")
logger.info("Terminating status_reporter process")
# StatusReporter._http_thread.terminate() # Triggers SIGTERM on the child process
StatusReporter._ipc_queue.put("shutdown") # Special trigger
StatusReporter._http_thread.join()
@@ -238,12 +238,12 @@ class StatusReporter:
"""
# Encode the audio metadata as json and post it back to the callback_url
put_payload = json.dumps(audio_metadata)
logging.debug("sending http put with payload: " + put_payload)
logger.debug("sending http put with payload: " + put_payload)
r = requests.put(callback_url, data=put_payload,
auth=requests.auth.HTTPBasicAuth(api_key, ''),
timeout=StatusReporter._HTTP_REQUEST_TIMEOUT)
logging.debug("HTTP request returned status: " + str(r.status_code))
logging.debug(r.text) # log the response body
logger.debug("HTTP request returned status: " + str(r.status_code))
logger.debug(r.text) # log the response body
#TODO: queue up failed requests and try them again later.
r.raise_for_status() # Raise an exception if there was an http error code returned
@@ -259,12 +259,12 @@
+ type(import_status).__name__
)
logging.debug("Reporting import failure to Airtime REST API...")
logger.debug("Reporting import failure to Airtime REST API...")
audio_metadata = dict()
audio_metadata["import_status"] = import_status
audio_metadata["comment"] = reason # hack attack
put_payload = json.dumps(audio_metadata)
# logging.debug("sending http put with payload: " + put_payload)
# logger.debug("sending http put with payload: " + put_payload)
"""
r = requests.put(callback_url, data=put_payload,
auth=requests.auth.HTTPBasicAuth(api_key, ''),
@@ -276,8 +276,8 @@
)
)
"""
logging.debug("HTTP request returned status: " + str(r.status_code))
logging.debug(r.text) # log the response body
logger.debug("HTTP request returned status: " + str(r.status_code))
logger.debug(r.text) # log the response body
#TODO: queue up failed requests and try them again later.
r.raise_for_status() # raise an exception if there was an http error code returned

@@ -1,9 +1,11 @@
import os
from os import chdir
from pathlib import Path
from setuptools import setup
# Change directory since setuptools uses relative paths
os.chdir(os.path.dirname(os.path.realpath(__file__)))
here = Path(__file__).parent
chdir(here)
setup(
name="libretime-analyzer",
@@ -20,7 +22,7 @@ setup(
packages=["libretime_analyzer"],
entry_points={
"console_scripts": [
"libretime-analyzer=libretime_analyzer.cli:main",
"libretime-analyzer=libretime_analyzer.main:cli",
]
},
python_requires=">=3.6",
@@ -37,6 +39,8 @@ setup(
extras_require={
"dev": [
"distro",
f"libretime-api-client @ file://localhost/{here.parent / 'api_client'}#egg=libretime_api_client",
f"libretime-shared @ file://localhost/{here.parent / 'shared'}#egg=libretime_shared",
],
},
zip_safe=False,
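With the `file://` extras above, a development install pulls the in-tree `libretime-api-client` and `libretime-shared` packages straight from the checkout. A sketch, assuming the repository layout where `api_client` and `shared` sit beside `analyzer`:

```bash
# From the repository root: editable install with the dev extras
cd analyzer
python3 -m pip install --editable '.[dev]'
```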

@@ -1015,6 +1015,8 @@ loud "\n-----------------------------------------------------"
loud " * Installing Airtime Services * "
loud "-----------------------------------------------------"
LIBRETIME_WORKING_DIR="/var/lib/libretime"
python_version=$($python_bin --version 2>&1 | awk '{ print $2 }')
verbose "Detected Python version: $python_version"
pip_cmd="$python_bin -m pip"
@@ -1074,6 +1076,8 @@ verbose "...Done"
verbose "\n * Installing libretime-analyzer..."
loudCmd "$pip_cmd install ${AIRTIMEROOT}/analyzer"
loudCmd "mkdir -p ${LIBRETIME_WORKING_DIR}/analyzer"
loudCmd "chown -R ${web_user}:${web_user} ${LIBRETIME_WORKING_DIR}/analyzer"
systemInitInstall libretime-analyzer "$web_user"
verbose "...Done"
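After the installer runs, the relocated working directory and the service itself can be sanity-checked with standard systemd tooling; a quick sketch:

```bash
# Confirm the unit picked up the new working directory, then check health and logs
systemctl show libretime-analyzer --property=WorkingDirectory
systemctl status libretime-analyzer
journalctl -u libretime-analyzer --since "10 minutes ago"
```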