diff --git a/src/documents/management/commands/document_exporter.py b/src/documents/management/commands/document_exporter.py index 0e99e7b7d..07b4643f2 100644 --- a/src/documents/management/commands/document_exporter.py +++ b/src/documents/management/commands/document_exporter.py @@ -2,6 +2,7 @@ import hashlib import json import os import shutil +import tempfile import time import tqdm @@ -12,6 +13,7 @@ from django.core import serializers from django.core.management.base import BaseCommand from django.core.management.base import CommandError from django.db import transaction +from django.utils import timezone from documents.models import Comment from documents.models import Correspondent from documents.models import Document @@ -76,6 +78,7 @@ class Command(BaseCommand): "do not belong to the current export, such as files from " "deleted documents.", ) + parser.add_argument( "--no-progress-bar", default=False, @@ -83,6 +86,14 @@ class Command(BaseCommand): help="If set, the progress bar will not be shown", ) + parser.add_argument( + "-z", + "--zip", + default=False, + action="store_true", + help="Export the documents to a zip file in the given directory", + ) + def __init__(self, *args, **kwargs): BaseCommand.__init__(self, *args, **kwargs) self.target = None @@ -98,6 +109,19 @@ class Command(BaseCommand): self.compare_checksums = options["compare_checksums"] self.use_filename_format = options["use_filename_format"] self.delete = options["delete"] + zip_export: bool = options["zip"] + + # If zipping, save the original target for later and + # get a temporary directory for the target + temp_dir = None + original_target = None + if zip_export: + original_target = self.target + temp_dir = tempfile.TemporaryDirectory( + dir=settings.SCRATCH_DIR, + prefix="paperless-export", + ) + self.target = temp_dir.name if not os.path.exists(self.target): raise CommandError("That path doesn't exist") @@ -105,8 +129,26 @@ class Command(BaseCommand): if not os.access(self.target, os.W_OK): raise CommandError("That path doesn't appear to be writable") - with FileLock(settings.MEDIA_LOCK): - self.dump(options["no_progress_bar"]) + try: + with FileLock(settings.MEDIA_LOCK): + self.dump(options["no_progress_bar"]) + + # We've written everything to the temporary directory in this case, + # now make an archive in the original target, with all files stored + if zip_export: + shutil.make_archive( + os.path.join( + original_target, + f"export-{timezone.localdate().isoformat()}", + ), + format="zip", + root_dir=temp_dir.name, + ) + + finally: + # Always cleanup the temporary directory, if one was created + if zip_export and temp_dir is not None: + temp_dir.cleanup() def dump(self, progress_bar_disable=False): # 1. Take a snapshot of what files exist in the current export folder diff --git a/src/documents/tests/test_management_exporter.py b/src/documents/tests/test_management_exporter.py index fa7567f05..fd9c9366d 100644 --- a/src/documents/tests/test_management_exporter.py +++ b/src/documents/tests/test_management_exporter.py @@ -5,10 +5,12 @@ import shutil import tempfile from pathlib import Path from unittest import mock +from zipfile import ZipFile from django.core.management import call_command from django.test import override_settings from django.test import TestCase +from django.utils import timezone from documents.management.commands import document_exporter from documents.models import Comment from documents.models import Correspondent @@ -365,3 +367,56 @@ class TestExportImport(DirectoriesMixin, TestCase): mime_type="application/pdf", ) self.assertRaises(FileNotFoundError, call_command, "document_exporter", target) + + @override_settings(PASSPHRASE="test") + def test_export_zipped(self): + shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) + shutil.copytree( + os.path.join(os.path.dirname(__file__), "samples", "documents"), + os.path.join(self.dirs.media_dir, "documents"), + ) + + args = ["document_exporter", self.target, "--zip"] + + call_command(*args) + + expected_file = os.path.join( + self.target, + f"export-{timezone.localdate().isoformat()}.zip", + ) + + self.assertTrue(os.path.isfile(expected_file)) + + with ZipFile(expected_file) as zip: + self.assertEqual(len(zip.namelist()), 11) + self.assertIn("manifest.json", zip.namelist()) + self.assertIn("version.json", zip.namelist()) + + @override_settings(PASSPHRASE="test") + def test_export_zipped_format(self): + shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) + shutil.copytree( + os.path.join(os.path.dirname(__file__), "samples", "documents"), + os.path.join(self.dirs.media_dir, "documents"), + ) + + args = ["document_exporter", self.target, "--zip", "--use-filename-format"] + + with override_settings( + FILENAME_FORMAT="{created_year}/{correspondent}/{title}", + ): + call_command(*args) + + expected_file = os.path.join( + self.target, + f"export-{timezone.localdate().isoformat()}.zip", + ) + + self.assertTrue(os.path.isfile(expected_file)) + + with ZipFile(expected_file) as zip: + print(zip.namelist()) + # Extras are from the directories, which also appear in the listing + self.assertEqual(len(zip.namelist()), 14) + self.assertIn("manifest.json", zip.namelist()) + self.assertIn("version.json", zip.namelist())