feat(api): create bulk_import command

- filter by allowed_extensions
This commit is contained in:
jo 2022-06-15 15:59:18 +02:00 committed by Kyle Robbertze
parent be9f36dbdc
commit 70a31338f7
9 changed files with 299 additions and 0 deletions

View File

@ -0,0 +1,162 @@
import logging
from pathlib import Path
from typing import List, Optional
import requests
from django.conf import settings
from django.core.management.base import BaseCommand, CommandParser
from libretime_shared.files import compute_md5
logger = logging.getLogger(__name__)
DEFAULT_ALLOWED_EXTENSIONS = [
".flac",
".m4a",
".mp3",
".ogg",
".opus",
".wav",
]
class Command(BaseCommand):
help = "Bulk file upload."
def add_arguments(self, parser: CommandParser):
parser.add_argument(
"--path",
help="Path to the directory to scan.",
required=True,
)
parser.add_argument(
"--track-type",
help="Track type for the new files.",
)
parser.add_argument(
"--allowed-extensions",
help="Allowed file extensions.",
action="append",
default=DEFAULT_ALLOWED_EXTENSIONS,
)
parser.add_argument(
"--delete-after-upload",
help="Delete file if upload succeeded.",
action="store_true",
)
parser.add_argument(
"--delete-if-exists",
help="Delete file if it already exists.",
action="store_true",
)
def handle(self, *args, **options):
url = settings.CONFIG.general.public_url
auth_key = settings.CONFIG.general.api_key
delete_after_upload = options.get("delete_after_upload", False)
delete_if_exists = options.get("delete_if_exists", False)
path = options.get("path")
track_type = options.get("track_type", None)
allowed_extensions = options.get("allowed_extensions")
importer = Importer(url, auth_key, delete_after_upload, delete_if_exists)
importer.import_dir(Path(path).resolve(), track_type, allowed_extensions)
class Importer:
def __init__(
self,
url: str,
auth_key: str,
delete_after_upload: bool = False,
delete_if_exists: bool = False,
) -> None:
self.url = url
self.auth_key = auth_key
self.delete_after_upload = delete_after_upload
self.delete_if_exists = delete_if_exists
def _check_file_md5(self, filepath: Path) -> bool:
from ...models import File
file_md5 = compute_md5(filepath)
return File.objects.filter(md5=file_md5).exists()
def _upload_file(self, filepath: Path, track_type: Optional[str]) -> None:
try:
resp = requests.post(
f"{self.url}/rest/media",
auth=(self.auth_key, ""),
files=[
("file", (filepath.name, filepath.open("rb"))),
],
timeout=30,
cookies={"tt_upload": track_type} if track_type is not None else {},
)
resp.raise_for_status()
except requests.exceptions.HTTPError as exception:
raise RuntimeError(f"could not upload {filepath}") from exception
def _delete_file(self, filepath: Path) -> None:
logger.info(f"deleting {filepath}")
filepath.unlink()
def _handle_file(self, filepath: Path, track_type: Optional[str]) -> None:
logger.debug(f"handling file {filepath}")
if not filepath.is_file():
raise ValueError(f"provided path {filepath} is not a file")
if self._check_file_md5(filepath):
logger.info(f"found similar md5sum, ignoring {filepath}")
if self.delete_if_exists:
self._delete_file(filepath)
return
self._upload_file(filepath, track_type)
if self.delete_after_upload:
self._delete_file(filepath)
def _walk_dir(
self,
path: Path,
track_type: Optional[str],
allowed_extensions: List[str],
) -> None:
if not path.is_dir():
raise ValueError(f"provided path {path} is not a directory")
for sub_path in path.iterdir():
if sub_path.is_dir():
self._walk_dir(sub_path, track_type, allowed_extensions)
continue
if sub_path.suffix.lower() not in allowed_extensions:
continue
self._handle_file(sub_path.resolve(), track_type)
def _check_track_type(self, track_type: str) -> bool:
from ...models import TrackType
return TrackType.objects.filter(code=track_type).exists()
def import_dir(
self,
path: Path,
track_type: Optional[str],
allowed_extensions: List[str],
) -> None:
if track_type is not None and not self._check_track_type(track_type):
raise ValueError(f"provided track type {track_type} does not exist")
allowed_extensions = [
(x if x.startswith(".") else "." + x) for x in allowed_extensions
]
self._walk_dir(path, track_type, allowed_extensions)

View File

@ -0,0 +1,118 @@
# pylint: disable=protected-access
from pathlib import Path
from shutil import copy
from typing import Tuple
from unittest.mock import MagicMock
import pytest
from model_bakery import baker
from requests_mock import Mocker
from ....._fixtures import AUDIO_FILENAME, fixture_path
from ....management.commands.bulk_import import Importer
FAKE_URL = "https://somehost.com"
@pytest.fixture(name="import_paths")
def _import_paths(tmp_path: Path):
sub_dir = tmp_path / "dir1/dir2"
sub_dir.mkdir(parents=True)
test_file = sub_dir / AUDIO_FILENAME
copy(fixture_path / AUDIO_FILENAME, test_file)
return (tmp_path, test_file)
@pytest.fixture(name="track_type")
def _track_type():
return baker.make(
"storage.TrackType",
code="MUS",
type_name="Music",
description="Description",
)
@pytest.fixture(name="importer")
def _importer(requests_mock: Mocker):
requests_mock.post(f"{FAKE_URL}/rest/media", status_code=200)
obj = Importer(FAKE_URL, "auth")
obj._handle_file = MagicMock(wraps=obj._handle_file)
obj._upload_file = MagicMock(wraps=obj._upload_file)
obj._delete_file = MagicMock(wraps=obj._delete_file)
yield obj
def test_importer(
db,
import_paths: Tuple[Path, Path],
importer: Importer,
track_type,
):
importer.import_dir(import_paths[0], track_type.code, [".mp3"])
importer._handle_file.assert_called_with(import_paths[1], track_type.code)
importer._upload_file.assert_called_with(import_paths[1], track_type.code)
importer._delete_file.assert_not_called()
def test_importer_and_delete(
db,
import_paths: Tuple[Path, Path],
importer: Importer,
track_type,
):
importer.delete_after_upload = True
importer.import_dir(import_paths[0], track_type.code, [".mp3"])
importer._handle_file.assert_called_with(import_paths[1], track_type.code)
importer._upload_file.assert_called_with(import_paths[1], track_type.code)
importer._delete_file.assert_called_with(import_paths[1])
def test_importer_existing_file(
db,
import_paths: Tuple[Path, Path],
importer: Importer,
track_type,
):
baker.make("storage.File", md5="46305a7cf42ee53976c88d337e47e940")
importer.import_dir(import_paths[0], track_type.code, [".mp3"])
importer._handle_file.assert_called_with(import_paths[1], track_type.code)
importer._upload_file.assert_not_called()
importer._delete_file.assert_not_called()
def test_importer_existing_file_and_delete(
db,
import_paths: Tuple[Path, Path],
importer: Importer,
track_type,
):
baker.make("storage.File", md5="46305a7cf42ee53976c88d337e47e940")
importer.delete_if_exists = True
importer.import_dir(import_paths[0], track_type.code, [".mp3"])
importer._handle_file.assert_called_with(import_paths[1], track_type.code)
importer._upload_file.assert_not_called()
importer._delete_file.assert_called_with(import_paths[1])
def test_importer_missing_track_type(
db,
import_paths: Tuple[Path, Path],
importer: Importer,
):
with pytest.raises(
ValueError,
match="provided track type MISSING does not exist",
):
importer.import_dir(import_paths[0], "MISSING", [".mp3"])

View File

@ -7,3 +7,4 @@ djangorestframework~=3.13.1
drf-spectacular~=0.22.1
markdown
psycopg2
requests

View File

@ -38,6 +38,7 @@ setup(
"django-filter~=21.1.0",
"drf-spectacular~=0.22.1",
"markdown",
"requests",
],
extras_require={
"prod": [
@ -50,6 +51,7 @@ setup(
"psycopg2-binary",
"pylint-django",
"pytest-django",
"requests-mock",
f"libretime-shared @ file://localhost/{here.parent / 'shared'}#egg=libretime_shared",
],
},

View File

@ -0,0 +1,16 @@
---
title: Library management
sidebar_position: 30
---
This page describe the available options to manage the LibreTime library.
## Files bulk import
To scan a directory and import the files into the library, you can use the following command:
```bash
sudo -u www-data libretime-api bulk_import --path PATH_THE_DIRECTORY_TO_SCAN
```
See the command usage to get available options.