feat(worker): load config using shared helpers
BREAKING CHANGE: The worker `RMQ_CONFIG_FILE` environement variable has been renamed to `LIBRETIME_CONFIG_FILEPATH`. In addition the systemd working directory for the worker has changed from `/srv/airtime` to `/var/lib/libretime/worker`.
This commit is contained in:
parent
d42615eb6a
commit
9b6d657fd6
1
install
1
install
|
@ -1055,6 +1055,7 @@ verbose "...Done"
|
|||
|
||||
verbose "\n * Installing celery..."
|
||||
loudCmd "$pip_cmd install ${AIRTIMEROOT}/worker"
|
||||
mkdir_and_chown "${web_user}:${web_user}" "${LIBRETIME_WORKING_DIR}/worker"
|
||||
# Create the Celery user
|
||||
if $is_centos_dist; then
|
||||
loudCmd "id celery 2>/dev/null || adduser --no-create-home -c 'LibreTime Celery' -r celery || true"
|
||||
|
|
|
@ -2,7 +2,7 @@ all: lint
|
|||
|
||||
include ../tools/python.mk
|
||||
|
||||
PIP_INSTALL := --editable .
|
||||
PIP_INSTALL := --editable .[dev]
|
||||
PYLINT_ARG := libretime_worker || true
|
||||
MYPY_ARG := libretime_worker || true
|
||||
BANDIT_ARG := libretime_worker || true
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
[Unit]
|
||||
Description=LibreTime Celery Service
|
||||
After=network.target
|
||||
Description=LibreTime Worker Service
|
||||
|
||||
[Service]
|
||||
User=celery
|
||||
Group=celery
|
||||
Environment=RMQ_CONFIG_FILE=/etc/airtime/airtime.conf
|
||||
WorkingDirectory=/srv/airtime
|
||||
ExecStart=/usr/local/bin/celery worker -A libretime_worker.tasks:celery --time-limit=1800 --concurrency=1 --config=celeryconfig -l INFO
|
||||
Environment=LIBRETIME_CONFIG_FILEPATH=/etc/airtime/airtime.conf
|
||||
WorkingDirectory=/var/lib/libretime/worker
|
||||
|
||||
ExecStart=/usr/local/bin/celery worker \
|
||||
--app=libretime_worker.tasks:worker \
|
||||
--config=libretime_worker.config \
|
||||
--time-limit=1800 \
|
||||
--concurrency=1 \
|
||||
--loglevel=INFO
|
||||
User=libretime-worker
|
||||
Group=libretime-worker
|
||||
Restart=always
|
||||
|
||||
[Install]
|
||||
|
|
|
@ -1,4 +0,0 @@
|
|||
import os
|
||||
|
||||
# Make the celeryconfig module visible to celery
|
||||
os.environ["CELERY_CONFIG_MODULE"] = "libretime_worker.celeryconfig"
|
|
@ -1,30 +1,19 @@
|
|||
import os
|
||||
from os import getenv
|
||||
|
||||
from configobj import ConfigObj
|
||||
from kombu import Exchange, Queue
|
||||
|
||||
# Get the broker string from airtime.conf
|
||||
RMQ_CONFIG_SECTION = "rabbitmq"
|
||||
from libretime_shared.config import BaseConfig, RabbitMQConfig
|
||||
|
||||
|
||||
def get_rmq_broker():
|
||||
rmq_config = ConfigObj(os.environ["RMQ_CONFIG_FILE"])
|
||||
rmq_settings = parse_rmq_config(rmq_config)
|
||||
return "amqp://{username}:{password}@{host}:{port}/{vhost}".format(**rmq_settings)
|
||||
class Config(BaseConfig):
|
||||
rabbitmq: RabbitMQConfig = RabbitMQConfig()
|
||||
|
||||
|
||||
def parse_rmq_config(rmq_config):
|
||||
return {
|
||||
"host": rmq_config[RMQ_CONFIG_SECTION]["host"],
|
||||
"port": rmq_config[RMQ_CONFIG_SECTION]["port"],
|
||||
"username": rmq_config[RMQ_CONFIG_SECTION]["user"],
|
||||
"password": rmq_config[RMQ_CONFIG_SECTION]["password"],
|
||||
"vhost": rmq_config[RMQ_CONFIG_SECTION]["vhost"],
|
||||
}
|
||||
LIBRETIME_CONFIG_FILEPATH = getenv("LIBRETIME_CONFIG_FILEPATH")
|
||||
|
||||
config = Config(filepath=LIBRETIME_CONFIG_FILEPATH)
|
||||
|
||||
# Celery amqp settings
|
||||
BROKER_URL = get_rmq_broker()
|
||||
BROKER_URL = config.rabbitmq.url
|
||||
CELERY_RESULT_BACKEND = "amqp" # Use RabbitMQ as the celery backend
|
||||
CELERY_RESULT_PERSISTENT = True # Persist through a broker restart
|
||||
CELERY_TASK_RESULT_EXPIRES = 900 # Expire task results after 15 minutes
|
|
@ -1,12 +1,10 @@
|
|||
import cgi
|
||||
import json
|
||||
import os
|
||||
import posixpath
|
||||
import shutil
|
||||
import tempfile
|
||||
import traceback
|
||||
from contextlib import closing
|
||||
from io import StringIO
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
import mutagen
|
||||
|
@ -14,11 +12,11 @@ import requests
|
|||
from celery import Celery
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
celery = Celery()
|
||||
worker = Celery()
|
||||
logger = get_task_logger(__name__)
|
||||
|
||||
|
||||
@celery.task(name="podcast-download", acks_late=True)
|
||||
@worker.task(name="podcast-download", acks_late=True)
|
||||
def podcast_download(
|
||||
id, url, callback_url, api_key, podcast_name, album_override, track_title
|
||||
):
|
||||
|
@ -138,7 +136,7 @@ def get_filename(r):
|
|||
filename = params["filename"]
|
||||
except Exception as e:
|
||||
# We end up here if we get a Content-Disposition header with no filename
|
||||
logger.warn(
|
||||
logger.warning(
|
||||
"Couldn't find file name in Content-Disposition header, using url"
|
||||
)
|
||||
if not filename:
|
||||
|
|
|
@ -24,9 +24,13 @@ setup(
|
|||
install_requires=[
|
||||
"celery==4.4.7",
|
||||
"kombu==4.6.10",
|
||||
"configobj",
|
||||
"mutagen>=1.31.0",
|
||||
"requests>=2.7.0",
|
||||
],
|
||||
extras_require={
|
||||
"dev": [
|
||||
f"libretime-shared @ file://localhost{here.parent / 'shared'}",
|
||||
],
|
||||
},
|
||||
zip_safe=False,
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue