feat(analyzer): rework organise_file using pathlib

- use uuids instead of datetime
- massively using pathlib to manipulate paths
This commit is contained in:
jo 2022-02-15 12:02:17 +01:00 committed by Kyle Robbertze
parent dac09869f3
commit d4ffaf9a89
2 changed files with 66 additions and 190 deletions

View File

@ -1,119 +1,54 @@
import errno
import os
import shutil import shutil
import time from pathlib import Path
import uuid from uuid import uuid4
from loguru import logger from loguru import logger
MAX_DIR_LEN = 48
MAX_FILE_LEN = 48
def organise_file(audio_file_path, import_directory, original_filename, metadata):
"""Move the file at audio_file_path over into the import_directory/import,
renaming it to original_filename.
This analyzer copies a file over from a temporary directory (stor/organize) def organise_file(
into the Airtime library (stor/imported). filepath_: str,
storage_url: str,
If you import three copies of the same file, the behaviour is: original_filename: str,
- The filename is of the first file preserved. metadata: dict,
- The filename of the second file has the timestamp attached to it. ) -> dict:
- The filename of the third file has a UUID placed after the timestamp, but ONLY IF it's imported within 1 second of the second file (ie. if the timestamp is the same).
Keyword arguments:
audio_file_path: Path to the file to be imported.
import_directory: Path to the "import" directory inside the Airtime stor directory.
(eg. /srv/airtime/stor/import)
original_filename: The filename of the file when it was uploaded to Airtime.
metadata: A dictionary where the "full_path" of where the file is moved to will be added.
""" """
if not isinstance(audio_file_path, str): Move the incoming file into the storage, while preserving the original filename.
raise TypeError(
"audio_file_path must be string. Was of type "
+ type(audio_file_path).__name__
)
if not isinstance(import_directory, str):
raise TypeError(
"import_directory must be string. Was of type "
+ type(import_directory).__name__
)
if not isinstance(original_filename, str):
raise TypeError(
"original_filename must be string. Was of type "
+ type(original_filename).__name__
)
if not isinstance(metadata, dict):
raise TypeError(
"metadata must be a dict. Was of type " + type(metadata).__name__
)
if not os.path.exists(audio_file_path):
raise FileNotFoundError(f"audio file not found: {audio_file_path}")
# Import the file over to it's final location. If you import multiple copies of the same file, the behavior is:
# TODO: Also, handle the case where the move fails and write some code - The first filename is preserved.
# to possibly move the file to problem_files. - The next filenames receive the current date append to the name.
"""
filepath = Path(filepath_)
max_dir_len = 48 orig_filename = Path(original_filename)
max_file_len = 48 dest_path = Path(storage_url)
final_file_path = import_directory
orig_file_basename, orig_file_extension = os.path.splitext(original_filename) # Building import path
if "artist_name" in metadata: if "artist_name" in metadata:
final_file_path += ( dest_path /= metadata["artist_name"][0:MAX_DIR_LEN]
"/" + metadata["artist_name"][0:max_dir_len]
) # truncating with array slicing
if "album_title" in metadata: if "album_title" in metadata:
final_file_path += "/" + metadata["album_title"][0:max_dir_len] dest_path /= metadata["album_title"][0:MAX_DIR_LEN]
# Note that orig_file_extension includes the "." already
final_file_path += "/" + orig_file_basename[0:max_file_len] + orig_file_extension
# Ensure any redundant slashes are stripped dest_path /= orig_filename.stem[0:MAX_FILE_LEN] + orig_filename.suffix
final_file_path = os.path.normpath(final_file_path)
# If a file with the same name already exists in the "import" directory, then # Handle when a file already exists
# we add a unique string to the end of this one. We never overwrite a file on import if dest_path.is_file():
# because if we did that, it would mean Airtime's database would have if filepath.samefile(dest_path):
# the wrong information for the file we just overwrote (eg. the song length would be wrong!) metadata["full_path"] = str(dest_path)
# If the final file path is the same as the file we've been told to import (which
# you often do when you're debugging), then don't move the file at all.
if os.path.exists(final_file_path):
if os.path.samefile(audio_file_path, final_file_path):
metadata["full_path"] = final_file_path
return metadata return metadata
base_file_path, file_extension = os.path.splitext(final_file_path)
final_file_path = "{}_{}{}".format(
base_file_path,
time.strftime("%m-%d-%Y-%H-%M-%S", time.localtime()),
file_extension,
)
# If THAT path exists, append a UUID instead: dest_path = dest_path.with_name(f"{dest_path.stem}_{uuid4()}{dest_path.suffix}")
while os.path.exists(final_file_path): logger.warning(f"found existing file, using new filepath {dest_path}")
base_file_path, file_extension = os.path.splitext(final_file_path)
final_file_path = "{}_{}{}".format(
base_file_path,
str(uuid.uuid4()),
file_extension,
)
# Ensure the full path to the file exists # Import
mkdir_p(os.path.dirname(final_file_path)) dest_path.parent.mkdir(parents=True, exist_ok=True)
# Move the file into its final destination directory logger.debug(f"moving {filepath} to {dest_path}")
logger.debug(f"Moving {audio_file_path} to {final_file_path}") shutil.move(filepath, dest_path)
shutil.move(audio_file_path, final_file_path)
metadata["full_path"] = final_file_path metadata["full_path"] = str(dest_path)
return metadata return metadata
def mkdir_p(path):
"""Make all directories in a tree (like mkdir -p)"""
if path == "":
return
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise

View File

@ -1,7 +1,5 @@
import os
import shutil import shutil
import time from pathlib import Path
from unittest import mock
import pytest import pytest
@ -10,106 +8,49 @@ from libretime_analyzer.pipeline.organise_file import organise_file
from ..conftest import AUDIO_FILENAME from ..conftest import AUDIO_FILENAME
@pytest.mark.parametrize( def organise_file_args_factory(filepath: Path, dest_dir: Path):
"params,exception", return (
[ str(filepath),
((42, "", "", dict()), TypeError), str(dest_dir),
(("", 23, "", dict()), TypeError),
(("", "", 5, dict()), TypeError),
(("", "", "", 12345), TypeError),
],
)
def test_organise_file_wrong_params(params, exception):
with pytest.raises(exception):
organise_file(*params)
def test_organise_file(src_dir, dest_dir):
organise_file(
os.path.join(src_dir, AUDIO_FILENAME),
dest_dir,
AUDIO_FILENAME, AUDIO_FILENAME,
dict(), {},
)
assert os.path.exists(os.path.join(dest_dir, AUDIO_FILENAME))
def test_organise_file_samefile(src_dir):
organise_file(
os.path.join(src_dir, AUDIO_FILENAME),
src_dir,
AUDIO_FILENAME,
dict(),
)
assert os.path.exists(os.path.join(src_dir, AUDIO_FILENAME))
def import_and_restore(src_dir, dest_dir) -> dict:
"""
Small helper to test the organise_file function.
Move the file and restore it back to it's origine.
"""
# Import the file
metadata = organise_file(
os.path.join(src_dir, AUDIO_FILENAME),
dest_dir,
AUDIO_FILENAME,
dict(),
) )
# Copy it back to the original location
shutil.copy(
os.path.join(dest_dir, AUDIO_FILENAME),
os.path.join(src_dir, AUDIO_FILENAME),
)
return metadata def test_organise_file(src_dir: Path, dest_dir: Path):
organise_file(*organise_file_args_factory(src_dir / AUDIO_FILENAME, dest_dir))
assert (dest_dir / AUDIO_FILENAME).exists()
def test_organise_file_duplicate_file(src_dir, dest_dir): def test_organise_file_samefile(src_dir: Path):
# Import the file once organise_file(*organise_file_args_factory(src_dir / AUDIO_FILENAME, src_dir))
import_and_restore(src_dir, dest_dir) assert (src_dir / AUDIO_FILENAME).exists()
# Import it again. It shouldn't overwrite the old file and instead create a new
metadata = import_and_restore(src_dir, dest_dir)
assert metadata["full_path"] != os.path.join(dest_dir, AUDIO_FILENAME)
assert os.path.exists(metadata["full_path"])
assert os.path.exists(os.path.join(dest_dir, AUDIO_FILENAME))
def test_organise_file_triplicate_file(src_dir, dest_dir): def test_organise_file_duplicate_file(src_dir: Path, dest_dir: Path):
# Here we use mock to patch out the time.localtime() function so that it for i in range(1, 4):
# always returns the same value. This allows us to consistently simulate this test cases # Make a copy so we can reuse the file
# where the last two of the three files are imported at the same time as the timestamp. filename = f"{i}_{AUDIO_FILENAME}"
with mock.patch("libretime_analyzer.pipeline.organise_file.time") as mock_time: shutil.copy(src_dir / AUDIO_FILENAME, src_dir / filename)
mock_time.localtime.return_value = time.localtime() # date(2010, 10, 8)
mock_time.side_effect = time.time
# Import the file once metadata = organise_file(
import_and_restore(src_dir, dest_dir) *organise_file_args_factory(src_dir / filename, dest_dir)
# Import it again. It shouldn't overwrite the old file and instead create a new )
metadata1 = import_and_restore(src_dir, dest_dir)
# Reimport for the third time, which should have the same timestamp as the second one full_path = Path(metadata["full_path"])
# thanks to us mocking out time.localtime() assert full_path.exists()
metadata2 = import_and_restore(src_dir, dest_dir) if i == 1:
assert full_path.name == AUDIO_FILENAME
# Check if file exists and if filename is <original>_<date>.<ext> else:
assert os.path.exists(metadata1["full_path"]) assert len(full_path.name) == len(AUDIO_FILENAME) + 1 + 36 # _ + UUID size
assert len(os.path.basename(metadata1["full_path"]).split("_")) == 2
# Check if file exists and if filename is <original>_<date>_<uuid>.<ext>
assert os.path.exists(metadata2["full_path"])
assert len(os.path.basename(metadata2["full_path"]).split("_")) == 3
def test_organise_file_bad_permissions_dest_dir(src_dir): def test_organise_file_bad_permissions_dest_dir(src_dir: Path):
with pytest.raises(OSError): with pytest.raises(OSError):
# /sys is using sysfs on Linux, which is unwritable # /sys is using sysfs on Linux, which is unwritable
organise_file( organise_file(
os.path.join(src_dir, AUDIO_FILENAME), *organise_file_args_factory(
"/sys/foobar", src_dir / AUDIO_FILENAME,
AUDIO_FILENAME, Path("/sys/foobar"),
dict(), )
) )