Fix: zip exports not respecting the --delete option (#5245)

This commit is contained in:
Trenton H 2024-01-04 11:58:58 -08:00 committed by GitHub
parent 5963dfe41b
commit 8da2535a65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 15 deletions

View File

@ -5,6 +5,7 @@ import shutil
import tempfile
import time
from pathlib import Path
from typing import Optional
import tqdm
from django.conf import settings
@ -172,14 +173,14 @@ class Command(BaseCommand):
self.delete: bool = options["delete"]
self.no_archive: bool = options["no_archive"]
self.no_thumbnail: bool = options["no_thumbnail"]
zip_export: bool = options["zip"]
self.zip_export: bool = options["zip"]
# If zipping, save the original target for later and
# get a temporary directory for the target instead
temp_dir = None
original_target = None
if zip_export:
original_target = self.target
self.original_target: Optional[Path] = None
if self.zip_export:
self.original_target = self.target
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
temp_dir = tempfile.TemporaryDirectory(
@ -203,10 +204,10 @@ class Command(BaseCommand):
# We've written everything to the temporary directory in this case,
# now make an archive in the original target, with all files stored
if zip_export:
if self.zip_export:
shutil.make_archive(
os.path.join(
original_target,
self.original_target,
options["zip_name"],
),
format="zip",
@ -215,7 +216,7 @@ class Command(BaseCommand):
finally:
# Always cleanup the temporary directory, if one was created
if zip_export and temp_dir is not None:
if self.zip_export and temp_dir is not None:
temp_dir.cleanup()
def dump(self, progress_bar_disable=False):
@ -466,14 +467,21 @@ class Command(BaseCommand):
if self.delete:
# 5. Remove files which we did not explicitly export in this run
if not self.zip_export:
for f in self.files_in_export_dir:
f.unlink()
for f in self.files_in_export_dir:
f.unlink()
delete_empty_directories(
f.parent,
self.target,
)
delete_empty_directories(
f.parent,
self.target,
)
else:
# 5. Remove anything in the original location (before moving the zip)
for item in self.original_target.glob("*"):
if item.is_dir():
shutil.rmtree(item)
else:
item.unlink()
def check_and_copy(self, source, source_checksum, target: Path):
if target in self.files_in_export_dir:

View File

@ -42,7 +42,7 @@ from documents.tests.utils import paperless_environment
class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def setUp(self) -> None:
self.target = tempfile.mkdtemp()
self.target = Path(tempfile.mkdtemp())
self.addCleanup(shutil.rmtree, self.target)
self.user = User.objects.create(username="temp_admin")
@ -496,6 +496,54 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
self.assertIn("manifest.json", zip.namelist())
self.assertIn("version.json", zip.namelist())
@override_settings(PASSPHRASE="test")
def test_export_zipped_with_delete(self):
"""
GIVEN:
- Request to export documents to zipfile
- There is one existing file in the target
- There is one existing directory in the target
WHEN:
- Documents are exported
- deletion of existing files is requested
THEN:
- Zipfile is created
- Zipfile contains exported files
- The existing file and directory in target are removed
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
)
# Create stuff in target directory
existing_file = self.target / "test.txt"
existing_file.touch()
existing_dir = self.target / "somedir"
existing_dir.mkdir(parents=True)
self.assertIsFile(existing_file)
self.assertIsDir(existing_dir)
args = ["document_exporter", self.target, "--zip", "--delete"]
call_command(*args)
expected_file = os.path.join(
self.target,
f"export-{timezone.localdate().isoformat()}.zip",
)
self.assertIsFile(expected_file)
self.assertIsNotFile(existing_file)
self.assertIsNotDir(existing_dir)
with ZipFile(expected_file) as zip:
self.assertEqual(len(zip.namelist()), 11)
self.assertIn("manifest.json", zip.namelist())
self.assertIn("version.json", zip.namelist())
def test_export_target_not_exists(self):
"""
GIVEN: