From d4ffaf9a89a30870fc32b9090a7e196cff941046 Mon Sep 17 00:00:00 2001 From: jo Date: Tue, 15 Feb 2022 12:02:17 +0100 Subject: [PATCH] feat(analyzer): rework organise_file using pathlib - use uuids instead of datetime - massively using pathlib to manipulate paths --- .../pipeline/organise_file.py | 135 +++++------------- analyzer/tests/pipeline/organise_file_test.py | 121 ++++------------ 2 files changed, 66 insertions(+), 190 deletions(-) diff --git a/analyzer/libretime_analyzer/pipeline/organise_file.py b/analyzer/libretime_analyzer/pipeline/organise_file.py index f45ac71f0..d3fca0cbe 100644 --- a/analyzer/libretime_analyzer/pipeline/organise_file.py +++ b/analyzer/libretime_analyzer/pipeline/organise_file.py @@ -1,119 +1,54 @@ -import errno -import os import shutil -import time -import uuid +from pathlib import Path +from uuid import uuid4 from loguru import logger +MAX_DIR_LEN = 48 +MAX_FILE_LEN = 48 -def organise_file(audio_file_path, import_directory, original_filename, metadata): - """Move the file at audio_file_path over into the import_directory/import, - renaming it to original_filename. - This analyzer copies a file over from a temporary directory (stor/organize) - into the Airtime library (stor/imported). - - If you import three copies of the same file, the behaviour is: - - The filename is of the first file preserved. - - The filename of the second file has the timestamp attached to it. - - The filename of the third file has a UUID placed after the timestamp, but ONLY IF it's imported within 1 second of the second file (ie. if the timestamp is the same). - - Keyword arguments: - audio_file_path: Path to the file to be imported. - import_directory: Path to the "import" directory inside the Airtime stor directory. - (eg. /srv/airtime/stor/import) - original_filename: The filename of the file when it was uploaded to Airtime. - metadata: A dictionary where the "full_path" of where the file is moved to will be added. 
+def organise_file( + filepath_: str, + storage_url: str, + original_filename: str, + metadata: dict, +) -> dict: """ - if not isinstance(audio_file_path, str): - raise TypeError( - "audio_file_path must be string. Was of type " - + type(audio_file_path).__name__ - ) - if not isinstance(import_directory, str): - raise TypeError( - "import_directory must be string. Was of type " - + type(import_directory).__name__ - ) - if not isinstance(original_filename, str): - raise TypeError( - "original_filename must be string. Was of type " - + type(original_filename).__name__ - ) - if not isinstance(metadata, dict): - raise TypeError( - "metadata must be a dict. Was of type " + type(metadata).__name__ - ) - if not os.path.exists(audio_file_path): - raise FileNotFoundError(f"audio file not found: {audio_file_path}") + Move the incoming file into the storage, while preserving the original filename. - # Import the file over to it's final location. - # TODO: Also, handle the case where the move fails and write some code - # to possibly move the file to problem_files. + If you import multiple copies of the same file, the behavior is: + - The first filename is preserved. + - The next filenames receive a random UUID appended to the name. + """ + filepath = Path(filepath_) - max_dir_len = 48 - max_file_len = 48 - final_file_path = import_directory - orig_file_basename, orig_file_extension = os.path.splitext(original_filename) + orig_filename = Path(original_filename) + dest_path = Path(storage_url) + + # Building import path if "artist_name" in metadata: - final_file_path += ( - "/" + metadata["artist_name"][0:max_dir_len] - ) # truncating with array slicing + dest_path /= metadata["artist_name"][0:MAX_DIR_LEN] + if "album_title" in metadata: - final_file_path += "/" + metadata["album_title"][0:max_dir_len] - # Note that orig_file_extension includes the "." 
already - final_file_path += "/" + orig_file_basename[0:max_file_len] + orig_file_extension + dest_path /= metadata["album_title"][0:MAX_DIR_LEN] - # Ensure any redundant slashes are stripped - final_file_path = os.path.normpath(final_file_path) + dest_path /= orig_filename.stem[0:MAX_FILE_LEN] + orig_filename.suffix - # If a file with the same name already exists in the "import" directory, then - # we add a unique string to the end of this one. We never overwrite a file on import - # because if we did that, it would mean Airtime's database would have - # the wrong information for the file we just overwrote (eg. the song length would be wrong!) - # If the final file path is the same as the file we've been told to import (which - # you often do when you're debugging), then don't move the file at all. - - if os.path.exists(final_file_path): - if os.path.samefile(audio_file_path, final_file_path): - metadata["full_path"] = final_file_path + # Handle when a file already exists + if dest_path.is_file(): + if filepath.samefile(dest_path): + metadata["full_path"] = str(dest_path) return metadata - base_file_path, file_extension = os.path.splitext(final_file_path) - final_file_path = "{}_{}{}".format( - base_file_path, - time.strftime("%m-%d-%Y-%H-%M-%S", time.localtime()), - file_extension, - ) - # If THAT path exists, append a UUID instead: - while os.path.exists(final_file_path): - base_file_path, file_extension = os.path.splitext(final_file_path) - final_file_path = "{}_{}{}".format( - base_file_path, - str(uuid.uuid4()), - file_extension, - ) + dest_path = dest_path.with_name(f"{dest_path.stem}_{uuid4()}{dest_path.suffix}") + logger.warning(f"found existing file, using new filepath {dest_path}") - # Ensure the full path to the file exists - mkdir_p(os.path.dirname(final_file_path)) + # Import + dest_path.parent.mkdir(parents=True, exist_ok=True) - # Move the file into its final destination directory - logger.debug(f"Moving {audio_file_path} to {final_file_path}") - 
shutil.move(audio_file_path, final_file_path) + logger.debug(f"moving {filepath} to {dest_path}") + shutil.move(filepath, dest_path) - metadata["full_path"] = final_file_path + metadata["full_path"] = str(dest_path) return metadata - - -def mkdir_p(path): - """Make all directories in a tree (like mkdir -p)""" - if path == "": - return - try: - os.makedirs(path) - except OSError as exc: # Python >2.5 - if exc.errno == errno.EEXIST and os.path.isdir(path): - pass - else: - raise diff --git a/analyzer/tests/pipeline/organise_file_test.py b/analyzer/tests/pipeline/organise_file_test.py index 2b2551fed..51cc8b826 100644 --- a/analyzer/tests/pipeline/organise_file_test.py +++ b/analyzer/tests/pipeline/organise_file_test.py @@ -1,7 +1,5 @@ -import os import shutil -import time -from unittest import mock +from pathlib import Path import pytest @@ -10,106 +8,49 @@ from libretime_analyzer.pipeline.organise_file import organise_file from ..conftest import AUDIO_FILENAME -@pytest.mark.parametrize( - "params,exception", - [ - ((42, "", "", dict()), TypeError), - (("", 23, "", dict()), TypeError), - (("", "", 5, dict()), TypeError), - (("", "", "", 12345), TypeError), - ], -) -def test_organise_file_wrong_params(params, exception): - with pytest.raises(exception): - organise_file(*params) - - -def test_organise_file(src_dir, dest_dir): - organise_file( - os.path.join(src_dir, AUDIO_FILENAME), - dest_dir, +def organise_file_args_factory(filepath: Path, dest_dir: Path): + return ( + str(filepath), + str(dest_dir), AUDIO_FILENAME, - dict(), - ) - assert os.path.exists(os.path.join(dest_dir, AUDIO_FILENAME)) - - -def test_organise_file_samefile(src_dir): - organise_file( - os.path.join(src_dir, AUDIO_FILENAME), - src_dir, - AUDIO_FILENAME, - dict(), - ) - assert os.path.exists(os.path.join(src_dir, AUDIO_FILENAME)) - - -def import_and_restore(src_dir, dest_dir) -> dict: - """ - Small helper to test the organise_file function. - Move the file and restore it back to it's origine. 
- """ - # Import the file - metadata = organise_file( - os.path.join(src_dir, AUDIO_FILENAME), - dest_dir, - AUDIO_FILENAME, - dict(), + {}, ) - # Copy it back to the original location - shutil.copy( - os.path.join(dest_dir, AUDIO_FILENAME), - os.path.join(src_dir, AUDIO_FILENAME), - ) - return metadata +def test_organise_file(src_dir: Path, dest_dir: Path): + organise_file(*organise_file_args_factory(src_dir / AUDIO_FILENAME, dest_dir)) + assert (dest_dir / AUDIO_FILENAME).exists() -def test_organise_file_duplicate_file(src_dir, dest_dir): - # Import the file once - import_and_restore(src_dir, dest_dir) - - # Import it again. It shouldn't overwrite the old file and instead create a new - metadata = import_and_restore(src_dir, dest_dir) - - assert metadata["full_path"] != os.path.join(dest_dir, AUDIO_FILENAME) - assert os.path.exists(metadata["full_path"]) - assert os.path.exists(os.path.join(dest_dir, AUDIO_FILENAME)) +def test_organise_file_samefile(src_dir: Path): + organise_file(*organise_file_args_factory(src_dir / AUDIO_FILENAME, src_dir)) + assert (src_dir / AUDIO_FILENAME).exists() -def test_organise_file_triplicate_file(src_dir, dest_dir): - # Here we use mock to patch out the time.localtime() function so that it - # always returns the same value. This allows us to consistently simulate this test cases - # where the last two of the three files are imported at the same time as the timestamp. - with mock.patch("libretime_analyzer.pipeline.organise_file.time") as mock_time: - mock_time.localtime.return_value = time.localtime() # date(2010, 10, 8) - mock_time.side_effect = time.time +def test_organise_file_duplicate_file(src_dir: Path, dest_dir: Path): + for i in range(1, 4): + # Make a copy so we can reuse the file + filename = f"{i}_{AUDIO_FILENAME}" + shutil.copy(src_dir / AUDIO_FILENAME, src_dir / filename) - # Import the file once - import_and_restore(src_dir, dest_dir) - # Import it again. 
It shouldn't overwrite the old file and instead create a new - metadata1 = import_and_restore(src_dir, dest_dir) + metadata = organise_file( + *organise_file_args_factory(src_dir / filename, dest_dir) + ) - # Reimport for the third time, which should have the same timestamp as the second one - # thanks to us mocking out time.localtime() - metadata2 = import_and_restore(src_dir, dest_dir) - - # Check if file exists and if filename is _. - assert os.path.exists(metadata1["full_path"]) - assert len(os.path.basename(metadata1["full_path"]).split("_")) == 2 - - # Check if file exists and if filename is __. - assert os.path.exists(metadata2["full_path"]) - assert len(os.path.basename(metadata2["full_path"]).split("_")) == 3 + full_path = Path(metadata["full_path"]) + assert full_path.exists() + if i == 1: + assert full_path.name == AUDIO_FILENAME + else: + assert len(full_path.name) == len(AUDIO_FILENAME) + 1 + 36 # _ + UUID size -def test_organise_file_bad_permissions_dest_dir(src_dir): +def test_organise_file_bad_permissions_dest_dir(src_dir: Path): with pytest.raises(OSError): # /sys is using sysfs on Linux, which is unwritable organise_file( - os.path.join(src_dir, AUDIO_FILENAME), - "/sys/foobar", - AUDIO_FILENAME, - dict(), + *organise_file_args_factory( + src_dir / AUDIO_FILENAME, + Path("/sys/foobar"), + ) )