From 8da2535a657fffa484c05c23b79d98f4616660cf Mon Sep 17 00:00:00 2001 From: Trenton H <797416+stumpylog@users.noreply.github.com> Date: Thu, 4 Jan 2024 11:58:58 -0800 Subject: [PATCH] Fix: zip exports not respecting the --delete option (#5245) --- .../management/commands/document_exporter.py | 36 +++++++------ .../tests/test_management_exporter.py | 50 ++++++++++++++++++- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/src/documents/management/commands/document_exporter.py b/src/documents/management/commands/document_exporter.py index b08b0b208..9bdbd1c51 100644 --- a/src/documents/management/commands/document_exporter.py +++ b/src/documents/management/commands/document_exporter.py @@ -5,6 +5,7 @@ import shutil import tempfile import time from pathlib import Path +from typing import Optional import tqdm from django.conf import settings @@ -172,14 +173,14 @@ class Command(BaseCommand): self.delete: bool = options["delete"] self.no_archive: bool = options["no_archive"] self.no_thumbnail: bool = options["no_thumbnail"] - zip_export: bool = options["zip"] + self.zip_export: bool = options["zip"] # If zipping, save the original target for later and # get a temporary directory for the target instead temp_dir = None - original_target = None - if zip_export: - original_target = self.target + self.original_target: Optional[Path] = None + if self.zip_export: + self.original_target = self.target os.makedirs(settings.SCRATCH_DIR, exist_ok=True) temp_dir = tempfile.TemporaryDirectory( @@ -203,10 +204,10 @@ class Command(BaseCommand): # We've written everything to the temporary directory in this case, # now make an archive in the original target, with all files stored - if zip_export: + if self.zip_export: shutil.make_archive( os.path.join( - original_target, + self.original_target, options["zip_name"], ), format="zip", @@ -215,7 +216,7 @@ class Command(BaseCommand): finally: # Always cleanup the temporary directory, if one was created - if zip_export and temp_dir is not None: + if self.zip_export and temp_dir is not None: temp_dir.cleanup() def dump(self, progress_bar_disable=False): @@ -466,14 +467,21 @@ class Command(BaseCommand): if self.delete: # 5. Remove files which we did not explicitly export in this run + if not self.zip_export: + for f in self.files_in_export_dir: + f.unlink() - for f in self.files_in_export_dir: - f.unlink() - - delete_empty_directories( - f.parent, - self.target, - ) + delete_empty_directories( + f.parent, + self.target, + ) + else: + # 5. Remove anything in the original location (before moving the zip) + for item in self.original_target.glob("*"): + if item.is_dir(): + shutil.rmtree(item) + else: + item.unlink() def check_and_copy(self, source, source_checksum, target: Path): if target in self.files_in_export_dir: diff --git a/src/documents/tests/test_management_exporter.py b/src/documents/tests/test_management_exporter.py index a51bd4662..888572b58 100644 --- a/src/documents/tests/test_management_exporter.py +++ b/src/documents/tests/test_management_exporter.py @@ -42,7 +42,7 @@ from documents.tests.utils import paperless_environment class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): def setUp(self) -> None: - self.target = tempfile.mkdtemp() + self.target = Path(tempfile.mkdtemp()) self.addCleanup(shutil.rmtree, self.target) self.user = User.objects.create(username="temp_admin") @@ -496,6 +496,54 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): self.assertIn("manifest.json", zip.namelist()) self.assertIn("version.json", zip.namelist()) + @override_settings(PASSPHRASE="test") + def test_export_zipped_with_delete(self): + """ + GIVEN: + - Request to export documents to zipfile + - There is one existing file in the target + - There is one existing directory in the target + WHEN: + - Documents are exported + - deletion of existing files is requested + THEN: + - Zipfile is created + - Zipfile contains exported files + - The existing file and directory in target are removed + """ + shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) + shutil.copytree( + os.path.join(os.path.dirname(__file__), "samples", "documents"), + os.path.join(self.dirs.media_dir, "documents"), + ) + + # Create stuff in target directory + existing_file = self.target / "test.txt" + existing_file.touch() + existing_dir = self.target / "somedir" + existing_dir.mkdir(parents=True) + + self.assertIsFile(existing_file) + self.assertIsDir(existing_dir) + + args = ["document_exporter", self.target, "--zip", "--delete"] + + call_command(*args) + + expected_file = os.path.join( + self.target, + f"export-{timezone.localdate().isoformat()}.zip", + ) + + self.assertIsFile(expected_file) + self.assertIsNotFile(existing_file) + self.assertIsNotDir(existing_dir) + + with ZipFile(expected_file) as zip: + self.assertEqual(len(zip.namelist()), 11) + self.assertIn("manifest.json", zip.namelist()) + self.assertIn("version.json", zip.namelist()) + def test_export_target_not_exists(self): """ GIVEN: