diff --git a/docs/administration.md b/docs/administration.md index f34156898..9fd60b6a0 100644 --- a/docs/administration.md +++ b/docs/administration.md @@ -185,6 +185,13 @@ For PostgreSQL, refer to [Upgrading a PostgreSQL Cluster](https://www.postgresql For MariaDB, refer to [Upgrading MariaDB](https://mariadb.com/kb/en/upgrading/) +You may also use the exporter and importer with the `--data-only` flag, after creating a new database with the updated version of PostgreSQL or MariaDB. + +!!! warning + + You should not change any settings, especially paths, when doing this or there is a + risk of data loss + ## Downgrading Paperless {#downgrade-paperless} Downgrades are possible. However, some updates also contain database @@ -269,6 +276,7 @@ optional arguments: -sm, --split-manifest -z, --zip -zn, --zip-name +--data-only ``` `target` is a folder to which the data gets written. This includes @@ -327,6 +335,9 @@ If `-z` or `--zip` is provided, the export will be a zip file in the target directory, named according to the current local date or the value set in `-zn` or `--zip-name`. +If `--data-only` is provided, only the database will be exported. This option is intended +to facilitate database upgrades without needing to clean documents and thumbnails from the media directory. + !!! warning If exporting with the file name format, there may be errors due to @@ -341,10 +352,15 @@ exporter](#exporter) and imports it into paperless. The importer works just like the exporter. 
You point it at a directory, and the script does the rest of the work: -``` +```shell document_importer source ``` +| Option | Required | Default | Description | +| ----------- | -------- | ------- | ------------------------------------------------------------------------- | +| source | Yes | N/A | The directory containing an export | +| --data-only | No | False | If provided, only import data, do not import document files or thumbnails | + When you use the provided docker compose script, put the export inside the `export` folder in your paperless source directory. Specify `../export` as the `source`. diff --git a/src/documents/management/commands/document_exporter.py b/src/documents/management/commands/document_exporter.py index 081dfb360..3f9143f44 100644 --- a/src/documents/management/commands/document_exporter.py +++ b/src/documents/management/commands/document_exporter.py @@ -5,6 +5,7 @@ import shutil import tempfile import time from pathlib import Path +from typing import TYPE_CHECKING from typing import Optional import tqdm @@ -22,6 +23,9 @@ from filelock import FileLock from guardian.models import GroupObjectPermission from guardian.models import UserObjectPermission +if TYPE_CHECKING: + from django.db.models import QuerySet + if settings.AUDIT_LOG_ENABLED: from auditlog.models import LogEntry @@ -147,6 +151,13 @@ class Command(BaseCommand): help="Sets the export zip file name", ) + parser.add_argument( + "--data-only", + default=False, + action="store_true", + help="If set, only the database will be exported, not files", + ) + parser.add_argument( "--no-progress-bar", default=False, @@ -154,19 +165,6 @@ class Command(BaseCommand): help="If set, the progress bar will not be shown", ) - def __init__(self, *args, **kwargs): - BaseCommand.__init__(self, *args, **kwargs) - self.target: Path = None - self.split_manifest = False - self.files_in_export_dir: set[Path] = set() - self.exported_files: list[Path] = [] - self.compare_checksums = False - 
self.use_filename_format = False - self.use_folder_prefix = False - self.delete = False - self.no_archive = False - self.no_thumbnail = False - def handle(self, *args, **options): self.target = Path(options["target"]).resolve() self.split_manifest: bool = options["split_manifest"] @@ -177,14 +175,17 @@ class Command(BaseCommand): self.no_archive: bool = options["no_archive"] self.no_thumbnail: bool = options["no_thumbnail"] self.zip_export: bool = options["zip"] + self.data_only: bool = options["data_only"] + self.no_progress_bar: bool = options["no_progress_bar"] + + self.files_in_export_dir: set[Path] = set() + self.exported_files: set[str] = set() # If zipping, save the original target for later and # get a temporary directory for the target instead temp_dir = None - self.original_target: Optional[Path] = None + self.original_target = self.target if self.zip_export: - self.original_target = self.target - settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True) temp_dir = tempfile.TemporaryDirectory( dir=settings.SCRATCH_DIR, @@ -202,12 +203,13 @@ class Command(BaseCommand): raise CommandError("That path doesn't appear to be writable") try: + # Prevent any ongoing changes in the documents with FileLock(settings.MEDIA_LOCK): - self.dump(options["no_progress_bar"]) + self.dump() # We've written everything to the temporary directory in this case, # now make an archive in the original target, with all files stored - if self.zip_export: + if self.zip_export and temp_dir is not None: shutil.make_archive( os.path.join( self.original_target, @@ -222,7 +224,7 @@ class Command(BaseCommand): if self.zip_export and temp_dir is not None: temp_dir.cleanup() - def dump(self, progress_bar_disable=False): + def dump(self): # 1. Take a snapshot of what files exist in the current export folder for x in self.target.glob("**/*"): if x.is_file(): @@ -230,115 +232,59 @@ class Command(BaseCommand): # 2. 
Create manifest, containing all correspondents, types, tags, storage paths # note, documents and ui_settings + manifest_key_to_object_query: dict[str, QuerySet] = { + "correspondents": Correspondent.objects.all(), + "tags": Tag.objects.all(), + "document_types": DocumentType.objects.all(), + "storage_paths": StoragePath.objects.all(), + "mail_accounts": MailAccount.objects.all(), + "mail_rules": MailRule.objects.all(), + "saved_views": SavedView.objects.all(), + "saved_view_filter_rules": SavedViewFilterRule.objects.all(), + "groups": Group.objects.all(), + "users": User.objects.exclude( + username__in=["consumer", "AnonymousUser"], + ).all(), + "ui_settings": UiSettings.objects.all(), + "content_types": ContentType.objects.all(), + "permissions": Permission.objects.all(), + "user_object_permissions": UserObjectPermission.objects.all(), + "group_object_permissions": GroupObjectPermission.objects.all(), + "workflow_triggers": WorkflowTrigger.objects.all(), + "workflow_actions": WorkflowAction.objects.all(), + "workflows": Workflow.objects.all(), + "custom_fields": CustomField.objects.all(), + "custom_field_instances": CustomFieldInstance.objects.all(), + "app_configs": ApplicationConfiguration.objects.all(), + "notes": Note.objects.all(), + "documents": Document.objects.order_by("id").all(), + } + + if settings.AUDIT_LOG_ENABLED: + manifest_key_to_object_query["log_entries"] = LogEntry.objects.all() + with transaction.atomic(): - manifest = json.loads( - serializers.serialize("json", Correspondent.objects.all()), - ) + manifest_dict = {} - manifest += json.loads(serializers.serialize("json", Tag.objects.all())) - - manifest += json.loads( - serializers.serialize("json", DocumentType.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", StoragePath.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", MailAccount.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", MailRule.objects.all()), 
- ) - - manifest += json.loads( - serializers.serialize("json", SavedView.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", SavedViewFilterRule.objects.all()), - ) - - manifest += json.loads(serializers.serialize("json", Group.objects.all())) - - manifest += json.loads( - serializers.serialize( - "json", - User.objects.exclude(username__in=["consumer", "AnonymousUser"]), - ), - ) - - manifest += json.loads( - serializers.serialize("json", UiSettings.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", ContentType.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", Permission.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", UserObjectPermission.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", GroupObjectPermission.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", WorkflowTrigger.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", WorkflowAction.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", Workflow.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", CustomField.objects.all()), - ) - - manifest += json.loads( - serializers.serialize("json", ApplicationConfiguration.objects.all()), - ) - - if settings.AUDIT_LOG_ENABLED: - manifest += json.loads( - serializers.serialize("json", LogEntry.objects.all()), + # Build an overall manifest + for key in manifest_key_to_object_query: + manifest_dict[key] = json.loads( + serializers.serialize("json", manifest_key_to_object_query[key]), ) # These are treated specially and included in the per-document manifest # if that setting is enabled. 
Otherwise, they are just exported to the bulk # manifest - documents = Document.objects.order_by("id") - document_map: dict[int, Document] = {d.pk: d for d in documents} - document_manifest = json.loads(serializers.serialize("json", documents)) - - notes = json.loads( - serializers.serialize("json", Note.objects.all()), - ) - - custom_field_instances = json.loads( - serializers.serialize("json", CustomFieldInstance.objects.all()), - ) - if not self.split_manifest: - manifest += document_manifest - manifest += notes - manifest += custom_field_instances + document_map: dict[int, Document] = { + d.pk: d for d in manifest_key_to_object_query["documents"] + } + document_manifest = manifest_dict["documents"] # 3. Export files from each document for index, document_dict in tqdm.tqdm( enumerate(document_manifest), total=len(document_manifest), - disable=progress_bar_disable, + disable=self.no_progress_bar, ): # 3.1. store files unencrypted document_dict["fields"]["storage_type"] = Document.STORAGE_TYPE_UNENCRYPTED @@ -346,102 +292,39 @@ class Command(BaseCommand): document = document_map[document_dict["pk"]] # 3.2. generate a unique filename - filename_counter = 0 - while True: - if self.use_filename_format: - base_name = generate_filename( - document, - counter=filename_counter, - append_gpg=False, - ) - else: - base_name = document.get_public_filename(counter=filename_counter) - - if base_name not in self.exported_files: - self.exported_files.append(base_name) - break - else: - filename_counter += 1 + base_name = self.generate_base_name(document) # 3.3. 
write filenames into manifest - original_name = base_name - if self.use_folder_prefix: - original_name = os.path.join("originals", original_name) - original_target = (self.target / Path(original_name)).resolve() - document_dict[EXPORTER_FILE_NAME] = original_name - - if not self.no_thumbnail: - thumbnail_name = base_name + "-thumbnail.webp" - if self.use_folder_prefix: - thumbnail_name = os.path.join("thumbnails", thumbnail_name) - thumbnail_target = (self.target / Path(thumbnail_name)).resolve() - document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name - else: - thumbnail_target = None - - if not self.no_archive and document.has_archive_version: - archive_name = base_name + "-archive.pdf" - if self.use_folder_prefix: - archive_name = os.path.join("archive", archive_name) - archive_target = (self.target / Path(archive_name)).resolve() - document_dict[EXPORTER_ARCHIVE_NAME] = archive_name - else: - archive_target = None + original_target, thumbnail_target, archive_target = ( + self.generate_document_targets(document, base_name, document_dict) + ) # 3.4. 
write files to target folder - if document.storage_type == Document.STORAGE_TYPE_GPG: - t = int(time.mktime(document.created.timetuple())) - - original_target.parent.mkdir(parents=True, exist_ok=True) - with document.source_file as out_file: - original_target.write_bytes(GnuPG.decrypted(out_file)) - os.utime(original_target, times=(t, t)) - - if thumbnail_target: - thumbnail_target.parent.mkdir(parents=True, exist_ok=True) - with document.thumbnail_file as out_file: - thumbnail_target.write_bytes(GnuPG.decrypted(out_file)) - os.utime(thumbnail_target, times=(t, t)) - - if archive_target: - archive_target.parent.mkdir(parents=True, exist_ok=True) - with document.archive_path as out_file: - archive_target.write_bytes(GnuPG.decrypted(out_file)) - os.utime(archive_target, times=(t, t)) - else: - self.check_and_copy( - document.source_path, - document.checksum, + if not self.data_only: + self.copy_document_files( + document, original_target, + thumbnail_target, + archive_target, ) - if thumbnail_target: - self.check_and_copy(document.thumbnail_path, None, thumbnail_target) - - if archive_target: - self.check_and_copy( - document.archive_path, - document.archive_checksum, - archive_target, - ) - if self.split_manifest: - manifest_name = base_name + "-manifest.json" + manifest_name = Path(base_name + "-manifest.json") if self.use_folder_prefix: - manifest_name = os.path.join("json", manifest_name) - manifest_name = (self.target / Path(manifest_name)).resolve() + manifest_name = Path("json") / manifest_name + manifest_name = (self.target / manifest_name).resolve() manifest_name.parent.mkdir(parents=True, exist_ok=True) content = [document_manifest[index]] content += list( filter( lambda d: d["fields"]["document"] == document_dict["pk"], - notes, + manifest_dict["notes"], ), ) content += list( filter( lambda d: d["fields"]["document"] == document_dict["pk"], - custom_field_instances, + manifest_dict["custom_field_instances"], ), ) manifest_name.write_text( @@ -451,8 +334,17 
@@ class Command(BaseCommand): if manifest_name in self.files_in_export_dir: self.files_in_export_dir.remove(manifest_name) - # 4.1 write manifest to target folder - manifest_path = (self.target / Path("manifest.json")).resolve() + # These were exported already + if self.split_manifest: + del manifest_dict["documents"] + del manifest_dict["notes"] + del manifest_dict["custom_field_instances"] + + # 4.1 write primary manifest to target folder + manifest = [] + for key in manifest_dict: + manifest.extend(manifest_dict[key]) + manifest_path = (self.target / "manifest.json").resolve() manifest_path.write_text( json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8", @@ -461,7 +353,7 @@ class Command(BaseCommand): self.files_in_export_dir.remove(manifest_path) # 4.2 write version information to target folder - version_path = (self.target / Path("version.json")).resolve() + version_path = (self.target / "version.json").resolve() version_path.write_text( json.dumps( {"version": version.__full_version_str__}, @@ -491,7 +383,127 @@ class Command(BaseCommand): else: item.unlink() - def check_and_copy(self, source, source_checksum, target: Path): + def generate_base_name(self, document: Document) -> str: + """ + Generates a unique name for the document, one which hasn't already been exported (or will be) + """ + filename_counter = 0 + while True: + if self.use_filename_format: + base_name = generate_filename( + document, + counter=filename_counter, + append_gpg=False, + ) + else: + base_name = document.get_public_filename(counter=filename_counter) + + if base_name not in self.exported_files: + self.exported_files.add(base_name) + break + else: + filename_counter += 1 + return base_name + + def generate_document_targets( + self, + document: Document, + base_name: str, + document_dict: dict, + ) -> tuple[Path, Optional[Path], Optional[Path]]: + """ + Generates the targets for a given document, including the original file, archive file and thumbnail (depending on 
settings). + """ + original_name = base_name + if self.use_folder_prefix: + original_name = os.path.join("originals", original_name) + original_target = (self.target / Path(original_name)).resolve() + document_dict[EXPORTER_FILE_NAME] = original_name + + if not self.no_thumbnail: + thumbnail_name = base_name + "-thumbnail.webp" + if self.use_folder_prefix: + thumbnail_name = os.path.join("thumbnails", thumbnail_name) + thumbnail_target = (self.target / Path(thumbnail_name)).resolve() + document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name + else: + thumbnail_target = None + + if not self.no_archive and document.has_archive_version: + archive_name = base_name + "-archive.pdf" + if self.use_folder_prefix: + archive_name = os.path.join("archive", archive_name) + archive_target = (self.target / Path(archive_name)).resolve() + document_dict[EXPORTER_ARCHIVE_NAME] = archive_name + else: + archive_target = None + + return original_target, thumbnail_target, archive_target + + def copy_document_files( + self, + document: Document, + original_target: Path, + thumbnail_target: Optional[Path], + archive_target: Optional[Path], + ) -> None: + """ + Copies files from the document storage location to the specified target location. + + If the document is encrypted, the files are decrypted before copying them to the target location. 
+ """ + if document.storage_type == Document.STORAGE_TYPE_GPG: + t = int(time.mktime(document.created.timetuple())) + + original_target.parent.mkdir(parents=True, exist_ok=True) + with document.source_file as out_file: + original_target.write_bytes(GnuPG.decrypted(out_file)) + os.utime(original_target, times=(t, t)) + + if thumbnail_target: + thumbnail_target.parent.mkdir(parents=True, exist_ok=True) + with document.thumbnail_file as out_file: + thumbnail_target.write_bytes(GnuPG.decrypted(out_file)) + os.utime(thumbnail_target, times=(t, t)) + + if archive_target: + archive_target.parent.mkdir(parents=True, exist_ok=True) + if TYPE_CHECKING: + assert isinstance(document.archive_path, Path) + with document.archive_path as out_file: + archive_target.write_bytes(GnuPG.decrypted(out_file)) + os.utime(archive_target, times=(t, t)) + else: + self.check_and_copy( + document.source_path, + document.checksum, + original_target, + ) + + if thumbnail_target: + self.check_and_copy(document.thumbnail_path, None, thumbnail_target) + + if archive_target: + if TYPE_CHECKING: + assert isinstance(document.archive_path, Path) + self.check_and_copy( + document.archive_path, + document.archive_checksum, + archive_target, + ) + + def check_and_copy( + self, + source: Path, + source_checksum: Optional[str], + target: Path, + ): + """ + Copies the source to the target, if target doesn't exist or the target doesn't seem to match + the source attributes + """ + + target = target.resolve() if target in self.files_in_export_dir: self.files_in_export_dir.remove(target) diff --git a/src/documents/management/commands/document_importer.py b/src/documents/management/commands/document_importer.py index 5cf036b0f..c6483011b 100644 --- a/src/documents/management/commands/document_importer.py +++ b/src/documents/management/commands/document_importer.py @@ -57,6 +57,7 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument("source") + parser.add_argument( 
"--no-progress-bar", default=False, @@ -64,11 +65,12 @@ class Command(BaseCommand): help="If set, the progress bar will not be shown", ) - def __init__(self, *args, **kwargs): - BaseCommand.__init__(self, *args, **kwargs) - self.source = None - self.manifest = None - self.version = None + parser.add_argument( + "--data-only", + default=False, + action="store_true", + help="If set, only the database will be imported, not files", + ) def pre_check(self) -> None: """ @@ -82,17 +84,20 @@ class Command(BaseCommand): if not os.access(self.source, os.R_OK): raise CommandError("That path doesn't appear to be readable") - for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]: - if document_dir.exists() and document_dir.is_dir(): - for entry in document_dir.glob("**/*"): - if entry.is_dir(): - continue - self.stdout.write( - self.style.WARNING( - f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation", - ), - ) - break + # Skip this check if operating only on the database + # We can expect data to exist in that case + if not self.data_only: + for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]: + if document_dir.exists() and document_dir.is_dir(): + for entry in document_dir.glob("**/*"): + if entry.is_dir(): + continue + self.stdout.write( + self.style.WARNING( + f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation", + ), + ) + break if ( User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count() != 0 @@ -113,6 +118,8 @@ class Command(BaseCommand): logging.getLogger().handlers[0].level = logging.ERROR self.source = Path(options["source"]).resolve() + self.data_only: bool = options["data_only"] + self.no_progress_bar: bool = options["no_progress_bar"] self.pre_check() @@ -149,7 +156,8 @@ class Command(BaseCommand): else: self.stdout.write(self.style.NOTICE("No version.json file located")) - self._check_manifest_valid() + if not self.data_only: + 
self._check_manifest_files_valid() with ( disable_signal( @@ -200,13 +208,16 @@ class Command(BaseCommand): ) raise e - self._import_files_from_manifest(options["no_progress_bar"]) + if not self.data_only: + self._import_files_from_manifest() + else: + self.stdout.write(self.style.NOTICE("Data only import completed")) self.stdout.write("Updating search index...") call_command( "document_index", "reindex", - no_progress_bar=options["no_progress_bar"], + no_progress_bar=self.no_progress_bar, ) @staticmethod @@ -216,7 +227,7 @@ class Command(BaseCommand): "That directory doesn't appear to contain a manifest.json file.", ) - def _check_manifest_valid(self): + def _check_manifest_files_valid(self): """ Attempts to verify the manifest is valid. Namely checking the files referred to exist and the files can be read from @@ -233,15 +244,15 @@ class Command(BaseCommand): ) doc_file = record[EXPORTER_FILE_NAME] - doc_path = self.source / doc_file + doc_path: Path = self.source / doc_file if not doc_path.exists(): raise CommandError( f'The manifest file refers to "{doc_file}" which does not ' "appear to be in the source directory.", ) try: - with doc_path.open(mode="rb") as infile: - infile.read(1) + with doc_path.open(mode="rb"): + pass except Exception as e: raise CommandError( f"Failed to read from original file {doc_path}", @@ -249,21 +260,21 @@ class Command(BaseCommand): if EXPORTER_ARCHIVE_NAME in record: archive_file = record[EXPORTER_ARCHIVE_NAME] - doc_archive_path = self.source / archive_file + doc_archive_path: Path = self.source / archive_file if not doc_archive_path.exists(): raise CommandError( f"The manifest file refers to {archive_file} which " f"does not appear to be in the source directory.", ) try: - with doc_archive_path.open(mode="rb") as infile: - infile.read(1) + with doc_archive_path.open(mode="rb"): + pass except Exception as e: raise CommandError( f"Failed to read from archive file {doc_archive_path}", ) from e - def _import_files_from_manifest(self, 
progress_bar_disable): + def _import_files_from_manifest(self): settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True) settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True) settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True) @@ -274,7 +285,7 @@ class Command(BaseCommand): filter(lambda r: r["model"] == "documents.document", self.manifest), ) - for record in tqdm.tqdm(manifest_documents, disable=progress_bar_disable): + for record in tqdm.tqdm(manifest_documents, disable=self.no_progress_bar): document = Document.objects.get(pk=record["pk"]) doc_file = record[EXPORTER_FILE_NAME] diff --git a/src/documents/tests/test_management_exporter.py b/src/documents/tests/test_management_exporter.py index b95d07dec..6d7eff980 100644 --- a/src/documents/tests/test_management_exporter.py +++ b/src/documents/tests/test_management_exporter.py @@ -37,10 +37,16 @@ from documents.sanity_checker import check_sanity from documents.settings import EXPORTER_FILE_NAME from documents.tests.utils import DirectoriesMixin from documents.tests.utils import FileSystemAssertsMixin +from documents.tests.utils import SampleDirMixin from documents.tests.utils import paperless_environment -class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): +class TestExportImport( + DirectoriesMixin, + FileSystemAssertsMixin, + SampleDirMixin, + TestCase, +): def setUp(self) -> None: self.target = Path(tempfile.mkdtemp()) self.addCleanup(shutil.rmtree, self.target) @@ -139,6 +145,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): @override_settings(PASSPHRASE="test") def _do_export( self, + *, use_filename_format=False, compare_checksums=False, delete=False, @@ -146,6 +153,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): no_thumbnail=False, split_manifest=False, use_folder_prefix=False, + data_only=False, ): args = ["document_exporter", self.target] if use_filename_format: @@ -162,6 +170,8 @@ class 
TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): args += ["--split-manifest"] if use_folder_prefix: args += ["--use-folder-prefix"] + if data_only: + args += ["--data-only"] call_command(*args) @@ -794,3 +804,39 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): manifest = self._do_export(use_filename_format=True) for obj in manifest: self.assertNotEqual(obj["model"], "auditlog.logentry") + + def test_export_data_only(self): + """ + GIVEN: + - Request to export documents with data only + WHEN: + - Export command is called + THEN: + - No document files are exported + - Manifest and version are exported + """ + + shutil.rmtree(self.dirs.media_dir / "documents") + shutil.copytree( + self.SAMPLE_DIR / "documents", + self.dirs.media_dir / "documents", + ) + + _ = self._do_export(data_only=True) + + # Manifest and version files only should be present in the exported directory + self.assertFileCountInDir(self.target, 2) + self.assertIsFile(self.target / "manifest.json") + self.assertIsFile(self.target / "version.json") + + shutil.rmtree(self.dirs.media_dir / "documents") + Document.objects.all().delete() + + call_command( + "document_importer", + "--no-progress-bar", + "--data-only", + self.target, + ) + + self.assertEqual(Document.objects.all().count(), 4) diff --git a/src/documents/tests/test_management_importer.py b/src/documents/tests/test_management_importer.py index c0d155d02..9d07e8a60 100644 --- a/src/documents/tests/test_management_importer.py +++ b/src/documents/tests/test_management_importer.py @@ -14,9 +14,15 @@ from documents.settings import EXPORTER_ARCHIVE_NAME from documents.settings import EXPORTER_FILE_NAME from documents.tests.utils import DirectoriesMixin from documents.tests.utils import FileSystemAssertsMixin +from documents.tests.utils import SampleDirMixin -class TestCommandImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): +class TestCommandImport( + DirectoriesMixin, + 
FileSystemAssertsMixin, + SampleDirMixin, + TestCase, +): def test_check_manifest_exists(self): """ GIVEN: @@ -120,14 +126,14 @@ class TestCommandImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase): }, ] with self.assertRaises(CommandError) as cm: - cmd._check_manifest_valid() + cmd._check_manifest_files_valid() self.assertInt("Failed to read from original file", str(cm.exception)) original_path.chmod(0o444) archive_path.chmod(0o222) with self.assertRaises(CommandError) as cm: - cmd._check_manifest_valid() + cmd._check_manifest_files_valid() self.assertInt("Failed to read from archive file", str(cm.exception)) def test_import_source_not_existing(self): diff --git a/src/documents/tests/utils.py b/src/documents/tests/utils.py index fb4fa9f07..4ec0851df 100644 --- a/src/documents/tests/utils.py +++ b/src/documents/tests/utils.py @@ -156,10 +156,6 @@ class DirectoriesMixin: they are cleaned up on exit """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.dirs = None - def setUp(self) -> None: self.dirs = setup_directories() super().setUp() @@ -200,6 +196,16 @@ class FileSystemAssertsMixin: self.assertEqual(hash1, hash2, "File SHA256 mismatch") + def assertFileCountInDir(self, path: Union[PathLike, str], count: int): + path = Path(path).resolve() + self.assertTrue(path.is_dir(), f"Path {path} is not a directory") + files = [x for x in path.iterdir() if x.is_file()] + self.assertEqual( + len(files), + count, + f"Path {path} contains {len(files)} files instead of {count} files", + ) + class ConsumerProgressMixin: """