sintonia/worker/libretime_worker/tasks.py

176 lines
5.2 KiB
Python
Raw Normal View History

import json
2023-02-26 23:24:29 +01:00
import os
from email.message import EmailMessage
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, Optional
from urllib.parse import urlsplit
import mutagen
import requests
2023-02-26 23:24:29 +01:00
from celery import Celery, signals
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from libretime_api_client.v1 import ApiClient as LegacyClient
from mutagen import MutagenError
from requests import RequestException, Response
2023-02-26 23:24:29 +01:00
from . import PACKAGE, VERSION
from .config import config
worker = Celery()
logger = get_task_logger(__name__)
legacy_client = LegacyClient(
base_url=config.general.public_url,
api_key=config.general.api_key,
)
2023-02-26 23:24:29 +01:00
@signals.worker_init.connect
def init_sentry(**_kwargs):
if "SENTRY_DSN" in os.environ:
logger.info("installing sentry")
# pylint: disable=import-outside-toplevel
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
sentry_sdk.init(
traces_sample_rate=1.0,
release=f"{PACKAGE}@{VERSION}",
integrations=[
CeleryIntegration(),
],
)
worker.conf.beat_schedule = {
"legacy-trigger-task-manager": {
"task": "libretime_worker.tasks.legacy_trigger_task_manager",
"schedule": crontab(minute="*/5"),
},
}
@worker.task()
def legacy_trigger_task_manager():
"""
Trigger the legacy task manager to perform background tasks.
"""
legacy_client.trigger_task_manager()
@worker.task(name="podcast-download", acks_late=True)
def podcast_download(
episode_id: int,
episode_url: str,
episode_title: Optional[str],
podcast_name: str,
override_album: bool,
):
"""
Download a podcast episode.
Args:
episode_id: Episode ID.
episode_url: Episode download url.
episode_title: Episode title to override the title metadata.
podcast_name: Podcast name to save to the metadata.
override_album: Whether to override the album metadata.
Returns:
Status of the podcast download as JSON string.
"""
result: Dict[str, Any] = {"episodeid": episode_id}
tmp_file = None
try:
# Download podcast episode file
try:
with requests.get(episode_url, stream=True, timeout=30) as resp:
resp.raise_for_status()
filename = extract_filename(resp)
# The filename extension helps to determine the file type using mutagen
with NamedTemporaryFile(suffix=filename, delete=False) as tmp_file:
for chunk in resp.iter_content(chunk_size=2048):
tmp_file.write(chunk)
except RequestException as exception:
logger.exception("could not download podcast episode %s", episode_id)
raise exception
# Save metadata to podcast episode file
try:
metadata = mutagen.File(tmp_file.name, easy=True)
if metadata is None:
raise MutagenError(
f"could not determine episode {episode_id} file type"
)
if override_album:
logger.debug("overriding album name with podcast name %s", podcast_name)
metadata["artist"] = podcast_name
metadata["album"] = podcast_name
metadata["title"] = episode_title
elif "album" not in metadata:
logger.debug("setting album name to podcast name %s", podcast_name)
metadata["album"] = podcast_name
metadata.save()
logger.debug("saved metadata %s", metadata)
except MutagenError as exception:
logger.exception("could not save episode %s metadata", episode_id)
raise exception
# Upload podcast episode file
try:
with requests.post(
f"{config.general.public_url}/rest/media",
files={"file": (filename, open(tmp_file.name, "rb"))},
auth=(config.general.api_key, ""),
timeout=30,
) as upload_resp:
upload_resp.raise_for_status()
upload_payload = upload_resp.json()
result["fileid"] = upload_payload["id"]
result["status"] = 1
except RequestException as exception:
logger.exception("could not upload episode %s", episode_id)
raise exception
except (RequestException, MutagenError) as exception:
result["status"] = 0
result["error"] = str(exception)
if tmp_file is not None:
Path(tmp_file.name).unlink()
return json.dumps(result)
def extract_filename(response: Response) -> str:
"""
Extract the filename from a download request.
Args:
response: Download request response.
Returns:
Extracted filename.
"""
content_disposition = "Content-Disposition"
value = response.headers.get(content_disposition)
if value and "filename" in value:
parser = EmailMessage()
parser[content_disposition] = value
params = parser[content_disposition].params
return params["filename"]
return Path(urlsplit(response.url).path).name