diff --git a/docs/administration.md b/docs/administration.md index a65647836..1286b6933 100644 --- a/docs/administration.md +++ b/docs/administration.md @@ -248,6 +248,7 @@ optional arguments: -z, --zip -zn, --zip-name --data-only +--passphrase ``` `target` is a folder to which the data gets written. This includes @@ -309,6 +310,9 @@ value set in `-zn` or `--zip-name`. If `--data-only` is provided, only the database will be exported. This option is intended to facilitate database upgrades without needing to clean documents and thumbnails from the media directory. +If `--passphrase` is provided, it will be used to encrypt certain fields in the export. This value +must be provided to import. If this value is lost, the export cannot be imported. + !!! warning If exporting with the file name format, there may be errors due to @@ -327,16 +331,18 @@ and the script does the rest of the work: document_importer source ``` -| Option | Required | Default | Description | -| ----------- | -------- | ------- | ------------------------------------------------------------------------- | -| source | Yes | N/A | The directory containing an export | -| --data-only | No | False | If provided, only import data, do not import document files or thumbnails | +| Option | Required | Default | Description | +| -------------- | -------- | ------- | ------------------------------------------------------------------------- | +| source | Yes | N/A | The directory containing an export | +| `--data-only` | No | False | If provided, only import data, do not import document files or thumbnails | +| `--passphrase` | No | N/A | If your export was encrypted with a passphrase, must be provided | When you use the provided docker compose script, put the export inside the `export` folder in your paperless source directory. Specify `../export` as the `source`. -Note that .zip files (as can be generated from the exporter) are not supported. 
+Note that .zip files (as can be generated from the exporter) are not supported. You must unzip them into +the target directory first. !!! note @@ -346,6 +352,7 @@ Note that .zip files (as can be generated from the exporter) are not supported. !!! warning The importer should be run against a completely empty installation (database and directories) of Paperless-ngx. + If using a data only import, only the database must be empty. ### Document retagger {#retagger} diff --git a/src/documents/management/commands/document_exporter.py b/src/documents/management/commands/document_exporter.py index 3f9143f44..618c1a4e5 100644 --- a/src/documents/management/commands/document_exporter.py +++ b/src/documents/management/commands/document_exporter.py @@ -31,6 +31,7 @@ if settings.AUDIT_LOG_ENABLED: from documents.file_handling import delete_empty_directories from documents.file_handling import generate_filename +from documents.management.commands.mixins import CryptMixin from documents.models import Correspondent from documents.models import CustomField from documents.models import CustomFieldInstance @@ -56,7 +57,7 @@ from paperless_mail.models import MailAccount from paperless_mail.models import MailRule -class Command(BaseCommand): +class Command(CryptMixin, BaseCommand): help = ( "Decrypt and rename all files in our collection into a given target " "directory. 
And include a manifest file containing document data for " @@ -165,6 +166,11 @@ class Command(BaseCommand): help="If set, the progress bar will not be shown", ) + parser.add_argument( + "--passphrase", + help="If provided, is used to encrypt sensitive data in the export", + ) + def handle(self, *args, **options): self.target = Path(options["target"]).resolve() self.split_manifest: bool = options["split_manifest"] @@ -177,6 +183,7 @@ class Command(BaseCommand): self.zip_export: bool = options["zip"] self.data_only: bool = options["data_only"] self.no_progress_bar: bool = options["no_progress_bar"] + self.passphrase: Optional[str] = options.get("passphrase") self.files_in_export_dir: set[Path] = set() self.exported_files: set[str] = set() @@ -272,6 +279,8 @@ class Command(BaseCommand): serializers.serialize("json", manifest_key_to_object_query[key]), ) + self.encrypt_secret_fields(manifest_dict) + # These are treated specially and included in the per-document manifest # if that setting is enabled. 
Otherwise, they are just exported to the bulk # manifest @@ -353,17 +362,25 @@ class Command(BaseCommand): self.files_in_export_dir.remove(manifest_path) # 4.2 write version information to target folder - version_path = (self.target / "version.json").resolve() - version_path.write_text( + extra_metadata_path = (self.target / "metadata.json").resolve() + metadata: dict[str, str | int | dict[str, str | int]] = { + "version": version.__full_version_str__, + } + + # 4.2.1 If needed, write the crypto values into the metadata + # Django stores most of these in the field itself, we store them once here + if self.passphrase: + metadata.update(self.get_crypt_params()) + extra_metadata_path.write_text( json.dumps( - {"version": version.__full_version_str__}, + metadata, indent=2, ensure_ascii=False, ), encoding="utf-8", ) - if version_path in self.files_in_export_dir: - self.files_in_export_dir.remove(version_path) + if extra_metadata_path in self.files_in_export_dir: + self.files_in_export_dir.remove(extra_metadata_path) if self.delete: # 5. Remove files which we did not explicitly export in this run @@ -527,3 +544,29 @@ class Command(BaseCommand): if perform_copy: target.parent.mkdir(parents=True, exist_ok=True) copy_file_with_basic_stats(source, target) + + def encrypt_secret_fields(self, manifest: dict) -> None: + """ + Encrypts certain fields in the export. Currently limited to the mail account password + """ + + if self.passphrase: + self.setup_crypto(passphrase=self.passphrase) + + for crypt_config in self.CRYPT_FIELDS: + exporter_key = crypt_config["exporter_key"] + crypt_fields = crypt_config["fields"] + for manifest_record in manifest[exporter_key]: + for field in crypt_fields: + manifest_record["fields"][field] = self.encrypt_string( + value=manifest_record["fields"][field], + ) + + elif MailAccount.objects.count() > 0: + self.stdout.write( + self.style.NOTICE( + "You have configured mail accounts, " + "but no passphrase was given. 
" + "Passwords will be in plaintext", + ), + ) diff --git a/src/documents/management/commands/document_importer.py b/src/documents/management/commands/document_importer.py index c6483011b..97b73b743 100644 --- a/src/documents/management/commands/document_importer.py +++ b/src/documents/management/commands/document_importer.py @@ -3,6 +3,7 @@ import logging import os from contextlib import contextmanager from pathlib import Path +from typing import Optional import tqdm from django.conf import settings @@ -21,6 +22,7 @@ from django.db.models.signals import post_save from filelock import FileLock from documents.file_handling import create_source_path_directory +from documents.management.commands.mixins import CryptMixin from documents.models import Correspondent from documents.models import CustomField from documents.models import CustomFieldInstance @@ -30,6 +32,7 @@ from documents.models import Note from documents.models import Tag from documents.parsers import run_convert from documents.settings import EXPORTER_ARCHIVE_NAME +from documents.settings import EXPORTER_CRYPTO_SETTINGS_NAME from documents.settings import EXPORTER_FILE_NAME from documents.settings import EXPORTER_THUMBNAIL_NAME from documents.signals.handlers import update_filename_and_move_files @@ -49,7 +52,7 @@ def disable_signal(sig, receiver, sender): sig.connect(receiver=receiver, sender=sender) -class Command(BaseCommand): +class Command(CryptMixin, BaseCommand): help = ( "Using a manifest.json file, load the data from there, and import the " "documents it refers to." 
@@ -72,92 +75,173 @@ class Command(BaseCommand): help="If set, only the database will be exported, not files", ) + parser.add_argument( + "--passphrase", + help="If provided, is used to decrypt sensitive fields in the export", + ) + def pre_check(self) -> None: """ - Runs some initial checks against the source directory, including looking for - common mistakes like having files still and users other than expected + Runs some initial checks against the state of the install and source, including: + - Does the target exist? + - Can we access the target? + - Does the target have a manifest file? + - Are there existing files in the document folders? + - Are there existing users or documents in the database? """ + def pre_check_maybe_not_empty(): + # Skip this check if operating only on the database + # We can expect data to exist in that case + if not self.data_only: + for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]: + if document_dir.exists() and document_dir.is_dir(): + for entry in document_dir.glob("**/*"): + if entry.is_dir(): + continue + self.stdout.write( + self.style.WARNING( + f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation", + ), + ) + break + # But existing users or other data still matters in a data only + if ( + User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count() + != 0 + ): + self.stdout.write( + self.style.WARNING( + "Found existing user(s), this might indicate a non-empty installation", + ), + ) + if Document.objects.count() != 0: + self.stdout.write( + self.style.WARNING( + "Found existing documents(s), this might indicate a non-empty installation", + ), + ) + + def pre_check_manifest_exists(): + if not (self.source / "manifest.json").exists(): + raise CommandError( + "That directory doesn't appear to contain a manifest.json file.", + ) + if not self.source.exists(): raise CommandError("That path doesn't exist") if not os.access(self.source, os.R_OK): raise CommandError("That 
path doesn't appear to be readable") - # Skip this check if operating only on the database - # We can expect data to exist in that case - if not self.data_only: - for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]: - if document_dir.exists() and document_dir.is_dir(): - for entry in document_dir.glob("**/*"): - if entry.is_dir(): - continue - self.stdout.write( - self.style.WARNING( - f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation", - ), - ) - break - if ( - User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count() - != 0 - ): + pre_check_maybe_not_empty() + pre_check_manifest_exists() + + def load_manifest_files(self) -> None: + """ + Loads manifest data from the various JSON files for parsing and loading the database + """ + main_manifest_path = self.source / "manifest.json" + + with main_manifest_path.open() as infile: + self.manifest = json.load(infile) + self.manifest_paths.append(main_manifest_path) + + for file in Path(self.source).glob("**/*-manifest.json"): + with file.open() as infile: + self.manifest += json.load(infile) + self.manifest_paths.append(file) + + def load_metadata(self) -> None: + """ + Loads either just the version information or the version information and extra data + + Must account for the old style of export as well, with just version.json + """ + version_path = self.source / "version.json" + metadata_path = self.source / "metadata.json" + if not version_path.exists() and not metadata_path.exists(): + self.stdout.write( + self.style.NOTICE("No version.json or metadata.json file located"), + ) + return + + if version_path.exists(): + with version_path.open() as infile: + self.version = json.load(infile)["version"] + elif metadata_path.exists(): + with metadata_path.open() as infile: + data = json.load(infile) + self.version = data["version"] + if not self.passphrase and EXPORTER_CRYPTO_SETTINGS_NAME in data: + raise CommandError( + "No passphrase was given, but 
this export contains encrypted fields", + ) + elif EXPORTER_CRYPTO_SETTINGS_NAME in data: + self.load_crypt_params(data) + + if self.version and self.version != version.__full_version_str__: self.stdout.write( self.style.WARNING( - "Found existing user(s), this might indicate a non-empty installation", - ), - ) - if Document.objects.count() != 0: - self.stdout.write( - self.style.WARNING( - "Found existing documents(s), this might indicate a non-empty installation", + "Version mismatch: " + f"Currently {version.__full_version_str__}," + f" importing {self.version}." + " Continuing, but import may fail.", ), ) + def load_data_to_database(self) -> None: + """ + As the name implies, loads data from the JSON file(s) into the database + """ + try: + with transaction.atomic(): + # delete these since pk can change, re-created from import + ContentType.objects.all().delete() + Permission.objects.all().delete() + for manifest_path in self.manifest_paths: + call_command("loaddata", manifest_path) + except (FieldDoesNotExist, DeserializationError, IntegrityError) as e: + self.stdout.write(self.style.ERROR("Database import failed")) + if ( + self.version is not None + and self.version != version.__full_version_str__ + ): # pragma: no cover + self.stdout.write( + self.style.ERROR( + "Version mismatch: " + f"Currently {version.__full_version_str__}," + f" importing {self.version}", + ), + ) + raise e + else: + self.stdout.write( + self.style.ERROR("No version information present"), + ) + raise e + def handle(self, *args, **options): logging.getLogger().handlers[0].level = logging.ERROR self.source = Path(options["source"]).resolve() self.data_only: bool = options["data_only"] self.no_progress_bar: bool = options["no_progress_bar"] + self.passphrase: str | None = options.get("passphrase") + self.version: Optional[str] = None + self.salt: Optional[str] = None + self.manifest_paths = [] + self.manifest = [] self.pre_check() - manifest_paths = [] + self.load_metadata() - 
main_manifest_path = self.source / "manifest.json" + self.load_manifest_files() - self._check_manifest_exists(main_manifest_path) + self.check_manifest_validity() - with main_manifest_path.open() as infile: - self.manifest = json.load(infile) - manifest_paths.append(main_manifest_path) - - for file in Path(self.source).glob("**/*-manifest.json"): - with file.open() as infile: - self.manifest += json.load(infile) - manifest_paths.append(file) - - version_path = self.source / "version.json" - if version_path.exists(): - with version_path.open() as infile: - self.version = json.load(infile)["version"] - # Provide an initial warning if needed to the user - if self.version != version.__full_version_str__: - self.stdout.write( - self.style.WARNING( - "Version mismatch: " - f"Currently {version.__full_version_str__}," - f" importing {self.version}." - " Continuing, but import may fail.", - ), - ) - - else: - self.stdout.write(self.style.NOTICE("No version.json file located")) - - if not self.data_only: - self._check_manifest_files_valid() + self.decrypt_secret_fields() with ( disable_signal( @@ -181,32 +265,7 @@ class Command(BaseCommand): auditlog.unregister(CustomFieldInstance) # Fill up the database with whatever is in the manifest - try: - with transaction.atomic(): - # delete these since pk can change, re-created from import - ContentType.objects.all().delete() - Permission.objects.all().delete() - for manifest_path in manifest_paths: - call_command("loaddata", manifest_path) - except (FieldDoesNotExist, DeserializationError, IntegrityError) as e: - self.stdout.write(self.style.ERROR("Database import failed")) - if ( - self.version is not None - and self.version != version.__full_version_str__ - ): - self.stdout.write( - self.style.ERROR( - "Version mismatch: " - f"Currently {version.__full_version_str__}," - f" importing {self.version}", - ), - ) - raise e - else: - self.stdout.write( - self.style.ERROR("No version information present"), - ) - raise e + 
self.load_data_to_database() if not self.data_only: self._import_files_from_manifest() @@ -220,30 +279,20 @@ class Command(BaseCommand): no_progress_bar=self.no_progress_bar, ) - @staticmethod - def _check_manifest_exists(path: Path): - if not path.exists(): - raise CommandError( - "That directory doesn't appear to contain a manifest.json file.", - ) - - def _check_manifest_files_valid(self): + def check_manifest_validity(self): """ Attempts to verify the manifest is valid. Namely checking the files referred to exist and the files can be read from """ - self.stdout.write("Checking the manifest") - for record in self.manifest: - if record["model"] != "documents.document": - continue - if EXPORTER_FILE_NAME not in record: + def check_document_validity(document_record: dict): + if EXPORTER_FILE_NAME not in document_record: raise CommandError( "The manifest file contains a record which does not " "refer to an actual document file.", ) - doc_file = record[EXPORTER_FILE_NAME] + doc_file = document_record[EXPORTER_FILE_NAME] doc_path: Path = self.source / doc_file if not doc_path.exists(): raise CommandError( @@ -258,8 +307,8 @@ class Command(BaseCommand): f"Failed to read from original file {doc_path}", ) from e - if EXPORTER_ARCHIVE_NAME in record: - archive_file = record[EXPORTER_ARCHIVE_NAME] + if EXPORTER_ARCHIVE_NAME in document_record: + archive_file = document_record[EXPORTER_ARCHIVE_NAME] doc_archive_path: Path = self.source / archive_file if not doc_archive_path.exists(): raise CommandError( @@ -274,6 +323,13 @@ class Command(BaseCommand): f"Failed to read from archive file {doc_archive_path}", ) from e + self.stdout.write("Checking the manifest") + for record in self.manifest: + # Only check if the document files exist if this is not data only + # We don't care about documents for a data only import + if not self.data_only and record["model"] == "documents.document": + check_document_validity(record) + def _import_files_from_manifest(self): 
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True) settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True) @@ -339,3 +395,33 @@ class Command(BaseCommand): copy_file_with_basic_stats(archive_path, document.archive_path) document.save() + + def decrypt_secret_fields(self) -> None: + """ + Reverses the exporter's field encryption, decrypting the affected manifest fields before importing to database + """ + if self.passphrase: + # Salt is expected to have been loaded from metadata.json by now; it is still None if the export had no crypto settings, and setup_crypto then generates a random salt so the stored values cannot be decrypted + self.setup_crypto(passphrase=self.passphrase, salt=self.salt) + + had_at_least_one_record = False + + for crypt_config in self.CRYPT_FIELDS: + importer_model = crypt_config["model_name"] + crypt_fields = crypt_config["fields"] + for record in filter( + lambda x: x["model"] == importer_model, + self.manifest, + ): + had_at_least_one_record = True + for field in crypt_fields: + record["fields"][field] = self.decrypt_string( + value=record["fields"][field], + ) + + if had_at_least_one_record: + # It's annoying, but the DB is loaded from the JSON directly + # Maybe could change that in the future? 
+ (self.source / "manifest.json").write_text( + json.dumps(self.manifest, indent=2, ensure_ascii=False), + ) diff --git a/src/documents/management/commands/mixins.py b/src/documents/management/commands/mixins.py index 6fed739b8..823631586 100644 --- a/src/documents/management/commands/mixins.py +++ b/src/documents/management/commands/mixins.py @@ -1,8 +1,27 @@ +import base64 import os from argparse import ArgumentParser +from typing import Optional +from typing import TypedDict +from typing import Union +from cryptography.fernet import Fernet +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from django.core.management import CommandError +from documents.settings import EXPORTER_CRYPTO_ALGO_NAME +from documents.settings import EXPORTER_CRYPTO_KEY_ITERATIONS_NAME +from documents.settings import EXPORTER_CRYPTO_KEY_SIZE_NAME +from documents.settings import EXPORTER_CRYPTO_SALT_NAME +from documents.settings import EXPORTER_CRYPTO_SETTINGS_NAME + + +class CryptFields(TypedDict): + exporter_key: str + model_name: str + fields: list[str] + class MultiProcessMixin: """ @@ -41,3 +60,109 @@ class ProgressBarMixin: def handle_progress_bar_mixin(self, *args, **options): self.no_progress_bar = options["no_progress_bar"] self.use_progress_bar = not self.no_progress_bar + + +class CryptMixin: + """ + Fully based on: + https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet + + To encrypt: + 1. Call setup_crypto providing the user provided passphrase + 2. Call encrypt_string with a value + 3. Store the returned hexadecimal representation of the value + + To decrypt: + 1. Load the required parameters: + a. key iterations + b. key size + c. key algorithm + 2. Call setup_crypto providing the user provided passphrase and stored salt + 3. Call decrypt_string with a value + 4. 
Use the returned value + + """ + + # This matches to Django's default for now + # https://github.com/django/django/blob/adae61942/django/contrib/auth/hashers.py#L315 + + # Set the defaults to be used during export + # During import, these are overridden from the loaded values to ensure decryption is possible + key_iterations = 1_000_000 + salt_size = 16 + key_size = 32 + kdf_algorithm = "pbkdf2_sha256" + + CRYPT_FIELDS: CryptFields = [ + { + "exporter_key": "mail_accounts", + "model_name": "paperless_mail.mailaccount", + "fields": [ + "password", + ], + }, + ] + + def get_crypt_params(self) -> dict[str, dict[str, Union[str, int]]]: + return { + EXPORTER_CRYPTO_SETTINGS_NAME: { + EXPORTER_CRYPTO_ALGO_NAME: self.kdf_algorithm, + EXPORTER_CRYPTO_KEY_ITERATIONS_NAME: self.key_iterations, + EXPORTER_CRYPTO_KEY_SIZE_NAME: self.key_size, + EXPORTER_CRYPTO_SALT_NAME: self.salt, + }, + } + + def load_crypt_params(self, metadata: dict): + # Load up the values for setting up decryption + self.kdf_algorithm: str = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][ + EXPORTER_CRYPTO_ALGO_NAME + ] + self.key_iterations: int = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][ + EXPORTER_CRYPTO_KEY_ITERATIONS_NAME + ] + self.key_size: int = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][ + EXPORTER_CRYPTO_KEY_SIZE_NAME + ] + self.salt: str = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][ + EXPORTER_CRYPTO_SALT_NAME + ] + + def setup_crypto(self, *, passphrase: str, salt: Optional[str] = None): + """ + Constructs a class for encryption or decryption using the specified passphrase and salt + + Salt is assumed to be a hexadecimal representation of a cryptographically secure random byte string. 
+ If not provided, it will be derived from the system secure random + """ + self.salt = salt or os.urandom(self.salt_size).hex() + + # Derive the KDF based on loaded settings + if self.kdf_algorithm == "pbkdf2_sha256": + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=self.key_size, + salt=bytes.fromhex(self.salt), + iterations=self.key_iterations, + ) + else: # pragma: no cover + raise CommandError( + f"{self.kdf_algorithm} is an unknown key derivation function", + ) + + key = base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8"))) + + self.fernet = Fernet(key) + + def encrypt_string(self, *, value: str) -> str: + """ + Given a string value, encrypts it and returns the hexadecimal representation of the encrypted token + + """ + return self.fernet.encrypt(value.encode("utf-8")).hex() + + def decrypt_string(self, *, value: str) -> str: + """ + Given a string value, decrypts it and returns the original value of the field + """ + return self.fernet.decrypt(bytes.fromhex(value)).decode("utf-8") diff --git a/src/documents/settings.py b/src/documents/settings.py index c591d397d..9dff44c95 100644 --- a/src/documents/settings.py +++ b/src/documents/settings.py @@ -3,3 +3,9 @@ EXPORTER_FILE_NAME = "__exported_file_name__" EXPORTER_THUMBNAIL_NAME = "__exported_thumbnail_name__" EXPORTER_ARCHIVE_NAME = "__exported_archive_name__" + +EXPORTER_CRYPTO_SETTINGS_NAME = "__crypto__" +EXPORTER_CRYPTO_SALT_NAME = "__salt_hex__" +EXPORTER_CRYPTO_KEY_ITERATIONS_NAME = "__key_iters__" +EXPORTER_CRYPTO_KEY_SIZE_NAME = "__key_size__" +EXPORTER_CRYPTO_ALGO_NAME = "__key_algo__" diff --git a/src/documents/tests/test_management_exporter.py b/src/documents/tests/test_management_exporter.py index 6d7eff980..74431bdae 100644 --- a/src/documents/tests/test_management_exporter.py +++ b/src/documents/tests/test_management_exporter.py @@ -3,6 +3,7 @@ import json import os import shutil import tempfile +from io import StringIO from pathlib import Path from unittest import mock from 
zipfile import ZipFile @@ -39,6 +40,7 @@ from documents.tests.utils import DirectoriesMixin from documents.tests.utils import FileSystemAssertsMixin from documents.tests.utils import SampleDirMixin from documents.tests.utils import paperless_environment +from paperless_mail.models import MailAccount class TestExportImport( @@ -466,7 +468,7 @@ class TestExportImport( with ZipFile(expected_file) as zip: self.assertEqual(len(zip.namelist()), 11) self.assertIn("manifest.json", zip.namelist()) - self.assertIn("version.json", zip.namelist()) + self.assertIn("metadata.json", zip.namelist()) @override_settings(PASSPHRASE="test") def test_export_zipped_format(self): @@ -504,7 +506,7 @@ class TestExportImport( # Extras are from the directories, which also appear in the listing self.assertEqual(len(zip.namelist()), 14) self.assertIn("manifest.json", zip.namelist()) - self.assertIn("version.json", zip.namelist()) + self.assertIn("metadata.json", zip.namelist()) @override_settings(PASSPHRASE="test") def test_export_zipped_with_delete(self): @@ -552,7 +554,7 @@ class TestExportImport( with ZipFile(expected_file) as zip: self.assertEqual(len(zip.namelist()), 11) self.assertIn("manifest.json", zip.namelist()) - self.assertIn("version.json", zip.namelist()) + self.assertIn("metadata.json", zip.namelist()) def test_export_target_not_exists(self): """ @@ -827,7 +829,7 @@ class TestExportImport( # Manifest and version files only should be present in the exported directory self.assertFileCountInDir(self.target, 2) self.assertIsFile(self.target / "manifest.json") - self.assertIsFile(self.target / "version.json") + self.assertIsFile(self.target / "metadata.json") shutil.rmtree(self.dirs.media_dir / "documents") Document.objects.all().delete() @@ -840,3 +842,139 @@ class TestExportImport( ) self.assertEqual(Document.objects.all().count(), 4) + + +class TestCryptExportImport( + DirectoriesMixin, + FileSystemAssertsMixin, + TestCase, +): + def setUp(self) -> None: + self.target = 
Path(tempfile.mkdtemp()) + return super().setUp() + + def tearDown(self) -> None: + shutil.rmtree(self.target, ignore_errors=True) + return super().tearDown() + + def test_export_passphrase(self): + """ + GIVEN: + - A mail account exists + WHEN: + - Export command is called + - Passphrase is provided + THEN: + - Output password is not plaintext + """ + MailAccount.objects.create( + name="Test Account", + imap_server="test.imap.com", + username="myusername", + password="mypassword", + ) + + call_command( + "document_exporter", + "--no-progress-bar", + "--passphrase", + "securepassword", + self.target, + ) + + self.assertIsFile(self.target / "metadata.json") + self.assertIsFile(self.target / "manifest.json") + + data = json.loads((self.target / "manifest.json").read_text()) + + mail_accounts = list( + filter(lambda r: r["model"] == "paperless_mail.mailaccount", data), + ) + + self.assertEqual(len(mail_accounts), 1) + + mail_account_data = mail_accounts[0] + + self.assertNotEqual(mail_account_data["fields"]["password"], "mypassword") + + MailAccount.objects.all().delete() + + call_command( + "document_importer", + "--no-progress-bar", + "--passphrase", + "securepassword", + self.target, + ) + + account = MailAccount.objects.first() + + self.assertIsNotNone(account) + self.assertEqual(account.password, "mypassword") + + def test_import_crypt_no_passphrase(self): + """ + GIVEN: + - A mail account exists + WHEN: + - Export command is called + - Passphrase is provided + - Import command is called + - No passphrase is given + THEN: + - An error is raised for the issue + """ + call_command( + "document_exporter", + "--no-progress-bar", + "--passphrase", + "securepassword", + self.target, + ) + + with self.assertRaises(CommandError) as err: + call_command( + "document_importer", + "--no-progress-bar", + self.target, + ) + self.assertEqual( + err.msg, + "No passphrase was given, but this export contains encrypted fields", + ) + + def test_export_warn_plaintext(self): + """ + 
GIVEN: + - A mail account exists + WHEN: + - Export command is called + - No passphrase is provided + THEN: + - Output password is plaintext + - Warning is output + """ + MailAccount.objects.create( + name="Test Account", + imap_server="test.imap.com", + username="myusername", + password="mypassword", + ) + + stdout = StringIO() + + call_command( + "document_exporter", + "--no-progress-bar", + str(self.target), + stdout=stdout, + ) + stdout.seek(0) + self.assertIn( + ( + "You have configured mail accounts, " + "but no passphrase was given. " + "Passwords will be in plaintext" + ), + stdout.read(), + ) diff --git a/src/documents/tests/test_management_importer.py b/src/documents/tests/test_management_importer.py index 9d07e8a60..5cee9ae47 100644 --- a/src/documents/tests/test_management_importer.py +++ b/src/documents/tests/test_management_importer.py @@ -125,15 +125,16 @@ class TestCommandImport( EXPORTER_ARCHIVE_NAME: "archive.pdf", }, ] + cmd.data_only = False with self.assertRaises(CommandError) as cm: - cmd._check_manifest_files_valid() + cmd.check_manifest_validity() self.assertInt("Failed to read from original file", str(cm.exception)) original_path.chmod(0o444) archive_path.chmod(0o222) with self.assertRaises(CommandError) as cm: - cmd._check_manifest_files_valid() + cmd.check_manifest_validity() self.assertInt("Failed to read from archive file", str(cm.exception)) def test_import_source_not_existing(self): @@ -240,7 +241,7 @@ class TestCommandImport( stdout.seek(0) self.assertIn( "Found existing user(s), this might indicate a non-empty installation", - str(stdout.read()), + stdout.read(), ) def test_import_with_documents_exists(self): @@ -278,3 +279,59 @@ class TestCommandImport( "Found existing documents(s), this might indicate a non-empty installation", str(stdout.read()), ) + + def test_import_no_metadata_or_version_file(self): + """ + GIVEN: + - A source directory with a manifest file only + WHEN: + - An import is attempted + THEN: + - Warning about the 
missing files is output + """ + stdout = StringIO() + + (self.dirs.scratch_dir / "manifest.json").touch() + + # We're not building a manifest, so it fails, but this test doesn't care + with self.assertRaises(json.decoder.JSONDecodeError): + call_command( + "document_importer", + "--no-progress-bar", + str(self.dirs.scratch_dir), + stdout=stdout, + ) + stdout.seek(0) + stdout_str = str(stdout.read()) + + self.assertIn("No version.json or metadata.json file located", stdout_str) + + def test_import_version_file(self): + """ + GIVEN: + - A source directory with a manifest file and version file + WHEN: + - An import is attempted + THEN: + - Warning about the version mismatch is output + """ + stdout = StringIO() + + (self.dirs.scratch_dir / "manifest.json").touch() + (self.dirs.scratch_dir / "version.json").write_text( + json.dumps({"version": "2.8.1"}), + ) + + # We're not building a manifest, so it fails, but this test doesn't care + with self.assertRaises(json.decoder.JSONDecodeError): + call_command( + "document_importer", + "--no-progress-bar", + str(self.dirs.scratch_dir), + stdout=stdout, + ) + stdout.seek(0) + stdout_str = str(stdout.read()) + + self.assertIn("Version mismatch:", stdout_str) + self.assertIn("importing 2.8.1", stdout_str) diff --git a/src/setup.cfg b/src/setup.cfg index 1877cb16e..4350c0451 100644 --- a/src/setup.cfg +++ b/src/setup.cfg @@ -4,6 +4,7 @@ addopts = --pythonwarnings=all --cov --cov-report=html --cov-report=xml --numpro env = PAPERLESS_DISABLE_DBHANDLER=true PAPERLESS_CACHE_BACKEND=django.core.cache.backends.locmem.LocMemCache +norecursedirs = locale/* [coverage:run] source =