feat(analyzer): load config using shared helpers
- make rabbitmq config optional
This commit is contained in:
parent
b527c2704d
commit
35d38df9d3
|
@ -3,6 +3,7 @@ Description=LibreTime Media Analyzer Service
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
Environment=LIBRETIME_LOG_FILEPATH=/var/log/libretime/analyzer.log
|
Environment=LIBRETIME_LOG_FILEPATH=/var/log/libretime/analyzer.log
|
||||||
|
Environment=LIBRETIME_CONFIG_FILEPATH=/etc/airtime/airtime.conf
|
||||||
WorkingDirectory=/var/lib/libretime/analyzer
|
WorkingDirectory=/var/lib/libretime/analyzer
|
||||||
|
|
||||||
ExecStart=/usr/local/bin/libretime-analyzer
|
ExecStart=/usr/local/bin/libretime-analyzer
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
from libretime_shared.config import BaseConfig, RabbitMQConfig
|
||||||
|
|
||||||
|
|
||||||
|
class Config(BaseConfig):
|
||||||
|
rabbitmq = RabbitMQConfig()
|
|
@ -1,16 +0,0 @@
|
||||||
import configparser
|
|
||||||
|
|
||||||
|
|
||||||
def read_config_file(config_path):
|
|
||||||
"""Parse the application's config file located at config_path."""
|
|
||||||
config = configparser.SafeConfigParser()
|
|
||||||
try:
|
|
||||||
config.readfp(open(config_path))
|
|
||||||
except IOError as e:
|
|
||||||
print("Failed to open config file at {}: {}".format(config_path, e.strerror))
|
|
||||||
exit(-1)
|
|
||||||
except Exception as e:
|
|
||||||
print(e.strerror)
|
|
||||||
exit(-1)
|
|
||||||
|
|
||||||
return config
|
|
|
@ -6,13 +6,12 @@ from libretime_shared.app import AbstractApp
|
||||||
from libretime_shared.cli import cli_config_options, cli_logging_options
|
from libretime_shared.cli import cli_config_options, cli_logging_options
|
||||||
from libretime_shared.config import DEFAULT_ENV_PREFIX
|
from libretime_shared.config import DEFAULT_ENV_PREFIX
|
||||||
|
|
||||||
from .config_file import read_config_file
|
from .config import Config
|
||||||
from .message_listener import MessageListener
|
from .message_listener import MessageListener
|
||||||
from .status_reporter import StatusReporter
|
from .status_reporter import StatusReporter
|
||||||
|
|
||||||
VERSION = "1.0"
|
VERSION = "1.0"
|
||||||
|
|
||||||
DEFAULT_LIBRETIME_CONFIG_FILEPATH = Path("/etc/airtime/airtime.conf")
|
|
||||||
DEFAULT_RETRY_QUEUE_FILEPATH = Path("retry_queue")
|
DEFAULT_RETRY_QUEUE_FILEPATH = Path("retry_queue")
|
||||||
|
|
||||||
|
|
||||||
|
@ -38,8 +37,7 @@ def cli(
|
||||||
Analyzer(
|
Analyzer(
|
||||||
log_level=log_level,
|
log_level=log_level,
|
||||||
log_filepath=log_filepath,
|
log_filepath=log_filepath,
|
||||||
# Handle default until the config file can be optional
|
config_filepath=config_filepath,
|
||||||
config_filepath=config_filepath or DEFAULT_LIBRETIME_CONFIG_FILEPATH,
|
|
||||||
retry_queue_filepath=retry_queue_filepath,
|
retry_queue_filepath=retry_queue_filepath,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -56,14 +54,13 @@ class Analyzer(AbstractApp):
|
||||||
):
|
):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
# Read our rmq config file
|
config = Config(filepath=config_filepath)
|
||||||
rmq_config = read_config_file(config_filepath)
|
|
||||||
|
|
||||||
# Start up the StatusReporter process
|
# Start up the StatusReporter process
|
||||||
StatusReporter.start_thread(retry_queue_filepath)
|
StatusReporter.start_thread(retry_queue_filepath)
|
||||||
|
|
||||||
# Start listening for RabbitMQ messages telling us about newly
|
# Start listening for RabbitMQ messages telling us about newly
|
||||||
# uploaded files. This blocks until we receive a shutdown signal.
|
# uploaded files. This blocks until we receive a shutdown signal.
|
||||||
self._msg_listener = MessageListener(rmq_config)
|
self._msg_listener = MessageListener(config.rabbitmq)
|
||||||
|
|
||||||
StatusReporter.stop_thread()
|
StatusReporter.stop_thread()
|
||||||
|
|
|
@ -5,6 +5,7 @@ import signal
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import pika
|
import pika
|
||||||
|
from libretime_shared.config import RabbitMQConfig
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from .pipeline import Pipeline
|
from .pipeline import Pipeline
|
||||||
|
@ -56,25 +57,14 @@ QUEUE = "airtime-uploads"
|
||||||
|
|
||||||
|
|
||||||
class MessageListener:
|
class MessageListener:
|
||||||
def __init__(self, rmq_config):
|
def __init__(self, config: RabbitMQConfig):
|
||||||
"""Start listening for file upload notification messages
|
"""
|
||||||
from RabbitMQ
|
Start listening for file upload event messages from RabbitMQ.
|
||||||
|
|
||||||
Keyword arguments:
|
|
||||||
rmq_config: A ConfigParser object containing the [rabbitmq] configuration.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
self.config = config
|
||||||
self._shutdown = False
|
self._shutdown = False
|
||||||
|
|
||||||
# Read the RabbitMQ connection settings from the rmq_config file
|
|
||||||
# The exceptions throw here by default give good error messages.
|
|
||||||
RMQ_CONFIG_SECTION = "rabbitmq"
|
|
||||||
self._host = rmq_config.get(RMQ_CONFIG_SECTION, "host")
|
|
||||||
self._port = rmq_config.getint(RMQ_CONFIG_SECTION, "port")
|
|
||||||
self._username = rmq_config.get(RMQ_CONFIG_SECTION, "user")
|
|
||||||
self._password = rmq_config.get(RMQ_CONFIG_SECTION, "password")
|
|
||||||
self._vhost = rmq_config.get(RMQ_CONFIG_SECTION, "vhost")
|
|
||||||
|
|
||||||
# Set up a signal handler so we can shutdown gracefully
|
# Set up a signal handler so we can shutdown gracefully
|
||||||
# For some reason, this signal handler must be set up here. I'd rather
|
# For some reason, this signal handler must be set up here. I'd rather
|
||||||
# put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
|
# put it in AirtimeAnalyzerServer, but it doesn't work there (something to do
|
||||||
|
@ -104,11 +94,12 @@ class MessageListener:
|
||||||
"""Connect to the RabbitMQ server and start listening for messages."""
|
"""Connect to the RabbitMQ server and start listening for messages."""
|
||||||
self._connection = pika.BlockingConnection(
|
self._connection = pika.BlockingConnection(
|
||||||
pika.ConnectionParameters(
|
pika.ConnectionParameters(
|
||||||
host=self._host,
|
host=self.config.host,
|
||||||
port=self._port,
|
port=self.config.port,
|
||||||
virtual_host=self._vhost,
|
virtual_host=self.config.vhost,
|
||||||
credentials=pika.credentials.PlainCredentials(
|
credentials=pika.credentials.PlainCredentials(
|
||||||
self._username, self._password
|
self.config.user,
|
||||||
|
self.config.password,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue