Feature: Allow encrypting sensitive fields in export (#6927)

Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com>
This commit is contained in:
Trenton H 2024-06-09 07:41:18 -07:00 committed by GitHub
parent 6ddb62bf3f
commit d9002005b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 583 additions and 120 deletions

View File

@ -248,6 +248,7 @@ optional arguments:
-z, --zip -z, --zip
-zn, --zip-name -zn, --zip-name
--data-only --data-only
--passphrase
``` ```
`target` is a folder to which the data gets written. This includes `target` is a folder to which the data gets written. This includes
@ -309,6 +310,9 @@ value set in `-zn` or `--zip-name`.
If `--data-only` is provided, only the database will be exported. This option is intended If `--data-only` is provided, only the database will be exported. This option is intended
to facilitate database upgrades without needing to clean documents and thumbnails from the media directory. to facilitate database upgrades without needing to clean documents and thumbnails from the media directory.
If `--passphrase` is provided, it will be used to encrypt certain fields in the export. This value
must be provided to import. If this value is lost, the export cannot be imported.
!!! warning !!! warning
If exporting with the file name format, there may be errors due to If exporting with the file name format, there may be errors due to
@ -327,16 +331,18 @@ and the script does the rest of the work:
document_importer source document_importer source
``` ```
| Option | Required | Default | Description | | Option | Required | Default | Description |
| ----------- | -------- | ------- | ------------------------------------------------------------------------- | | -------------- | -------- | ------- | ------------------------------------------------------------------------- |
| source | Yes | N/A | The directory containing an export | | source | Yes | N/A | The directory containing an export |
| --data-only | No | False | If provided, only import data, do not import document files or thumbnails | | `--data-only` | No | False | If provided, only import data, do not import document files or thumbnails |
| `--passphrase` | No | N/A | If your export was encrypted with a passphrase, must be provided |
When you use the provided docker compose script, put the export inside When you use the provided docker compose script, put the export inside
the `export` folder in your paperless source directory. Specify the `export` folder in your paperless source directory. Specify
`../export` as the `source`. `../export` as the `source`.
Note that .zip files (as can be generated from the exporter) are not supported. Note that .zip files (as can be generated from the exporter) are not supported. You must unzip them into
the target directory first.
!!! note !!! note
@ -346,6 +352,7 @@ Note that .zip files (as can be generated from the exporter) are not supported.
!!! warning !!! warning
The importer should be run against a completely empty installation (database and directories) of Paperless-ngx. The importer should be run against a completely empty installation (database and directories) of Paperless-ngx.
If using a data only import, only the database must be empty.
### Document retagger {#retagger} ### Document retagger {#retagger}

View File

@ -31,6 +31,7 @@ if settings.AUDIT_LOG_ENABLED:
from documents.file_handling import delete_empty_directories from documents.file_handling import delete_empty_directories
from documents.file_handling import generate_filename from documents.file_handling import generate_filename
from documents.management.commands.mixins import CryptMixin
from documents.models import Correspondent from documents.models import Correspondent
from documents.models import CustomField from documents.models import CustomField
from documents.models import CustomFieldInstance from documents.models import CustomFieldInstance
@ -56,7 +57,7 @@ from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule from paperless_mail.models import MailRule
class Command(BaseCommand): class Command(CryptMixin, BaseCommand):
help = ( help = (
"Decrypt and rename all files in our collection into a given target " "Decrypt and rename all files in our collection into a given target "
"directory. And include a manifest file containing document data for " "directory. And include a manifest file containing document data for "
@ -165,6 +166,11 @@ class Command(BaseCommand):
help="If set, the progress bar will not be shown", help="If set, the progress bar will not be shown",
) )
parser.add_argument(
"--passphrase",
help="If provided, is used to encrypt sensitive data in the export",
)
def handle(self, *args, **options): def handle(self, *args, **options):
self.target = Path(options["target"]).resolve() self.target = Path(options["target"]).resolve()
self.split_manifest: bool = options["split_manifest"] self.split_manifest: bool = options["split_manifest"]
@ -177,6 +183,7 @@ class Command(BaseCommand):
self.zip_export: bool = options["zip"] self.zip_export: bool = options["zip"]
self.data_only: bool = options["data_only"] self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"] self.no_progress_bar: bool = options["no_progress_bar"]
self.passphrase: Optional[str] = options.get("passphrase")
self.files_in_export_dir: set[Path] = set() self.files_in_export_dir: set[Path] = set()
self.exported_files: set[str] = set() self.exported_files: set[str] = set()
@ -272,6 +279,8 @@ class Command(BaseCommand):
serializers.serialize("json", manifest_key_to_object_query[key]), serializers.serialize("json", manifest_key_to_object_query[key]),
) )
self.encrypt_secret_fields(manifest_dict)
# These are treated specially and included in the per-document manifest # These are treated specially and included in the per-document manifest
# if that setting is enabled. Otherwise, they are just exported to the bulk # if that setting is enabled. Otherwise, they are just exported to the bulk
# manifest # manifest
@ -353,17 +362,25 @@ class Command(BaseCommand):
self.files_in_export_dir.remove(manifest_path) self.files_in_export_dir.remove(manifest_path)
# 4.2 write version information to target folder # 4.2 write version information to target folder
version_path = (self.target / "version.json").resolve() extra_metadata_path = (self.target / "metadata.json").resolve()
version_path.write_text( metadata: dict[str, str | int | dict[str, str | int]] = {
"version": version.__full_version_str__,
}
# 4.2.1 If needed, write the crypto values into the metadata
# Django stores most of these in the field itself, we store them once here
if self.passphrase:
metadata.update(self.get_crypt_params())
extra_metadata_path.write_text(
json.dumps( json.dumps(
{"version": version.__full_version_str__}, metadata,
indent=2, indent=2,
ensure_ascii=False, ensure_ascii=False,
), ),
encoding="utf-8", encoding="utf-8",
) )
if version_path in self.files_in_export_dir: if extra_metadata_path in self.files_in_export_dir:
self.files_in_export_dir.remove(version_path) self.files_in_export_dir.remove(extra_metadata_path)
if self.delete: if self.delete:
# 5. Remove files which we did not explicitly export in this run # 5. Remove files which we did not explicitly export in this run
@ -527,3 +544,29 @@ class Command(BaseCommand):
if perform_copy: if perform_copy:
target.parent.mkdir(parents=True, exist_ok=True) target.parent.mkdir(parents=True, exist_ok=True)
copy_file_with_basic_stats(source, target) copy_file_with_basic_stats(source, target)
def encrypt_secret_fields(self, manifest: dict) -> None:
    """
    Encrypts certain fields in the export, in place.

    The fields to encrypt are described by self.CRYPT_FIELDS. Currently
    limited to the mail account password.

    If no passphrase was given, nothing is encrypted. A notice is written
    instead when mail accounts exist, as their passwords will be exported
    in plaintext.

    :param manifest: mapping of exporter key to the list of serialized
        records for that key. Matching records are modified in place.
    """
    if not self.passphrase:
        # Only presence matters here, so ask the database for existence
        # instead of counting every row
        if MailAccount.objects.exists():
            self.stdout.write(
                self.style.NOTICE(
                    "You have configured mail accounts, "
                    "but no passphrase was given. "
                    "Passwords will be in plaintext",
                ),
            )
        return

    self.setup_crypto(passphrase=self.passphrase)

    for crypt_config in self.CRYPT_FIELDS:
        exporter_key = crypt_config["exporter_key"]
        crypt_fields = crypt_config["fields"]
        for manifest_record in manifest[exporter_key]:
            record_fields = manifest_record["fields"]
            for field in crypt_fields:
                record_fields[field] = self.encrypt_string(
                    value=record_fields[field],
                )

View File

@ -3,6 +3,7 @@ import logging
import os import os
from contextlib import contextmanager from contextlib import contextmanager
from pathlib import Path from pathlib import Path
from typing import Optional
import tqdm import tqdm
from django.conf import settings from django.conf import settings
@ -21,6 +22,7 @@ from django.db.models.signals import post_save
from filelock import FileLock from filelock import FileLock
from documents.file_handling import create_source_path_directory from documents.file_handling import create_source_path_directory
from documents.management.commands.mixins import CryptMixin
from documents.models import Correspondent from documents.models import Correspondent
from documents.models import CustomField from documents.models import CustomField
from documents.models import CustomFieldInstance from documents.models import CustomFieldInstance
@ -30,6 +32,7 @@ from documents.models import Note
from documents.models import Tag from documents.models import Tag
from documents.parsers import run_convert from documents.parsers import run_convert
from documents.settings import EXPORTER_ARCHIVE_NAME from documents.settings import EXPORTER_ARCHIVE_NAME
from documents.settings import EXPORTER_CRYPTO_SETTINGS_NAME
from documents.settings import EXPORTER_FILE_NAME from documents.settings import EXPORTER_FILE_NAME
from documents.settings import EXPORTER_THUMBNAIL_NAME from documents.settings import EXPORTER_THUMBNAIL_NAME
from documents.signals.handlers import update_filename_and_move_files from documents.signals.handlers import update_filename_and_move_files
@ -49,7 +52,7 @@ def disable_signal(sig, receiver, sender):
sig.connect(receiver=receiver, sender=sender) sig.connect(receiver=receiver, sender=sender)
class Command(BaseCommand): class Command(CryptMixin, BaseCommand):
help = ( help = (
"Using a manifest.json file, load the data from there, and import the " "Using a manifest.json file, load the data from there, and import the "
"documents it refers to." "documents it refers to."
@ -72,92 +75,173 @@ class Command(BaseCommand):
help="If set, only the database will be exported, not files", help="If set, only the database will be exported, not files",
) )
parser.add_argument(
"--passphrase",
help="If provided, is used to decrypt sensitive fields in the export",
)
def pre_check(self) -> None: def pre_check(self) -> None:
""" """
Runs some initial checks against the source directory, including looking for Runs some initial checks against the state of the install and source, including:
common mistakes like having files still and users other than expected - Does the target exist?
- Can we access the target?
- Does the target have a manifest file?
- Are there existing files in the document folders?
- Are there existing users or documents in the database?
""" """
def pre_check_maybe_not_empty():
# Skip this check if operating only on the database
# We can expect data to exist in that case
if not self.data_only:
for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
if document_dir.exists() and document_dir.is_dir():
for entry in document_dir.glob("**/*"):
if entry.is_dir():
continue
self.stdout.write(
self.style.WARNING(
f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
),
)
break
# But existing users or other data still matters in a data only
if (
User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count()
!= 0
):
self.stdout.write(
self.style.WARNING(
"Found existing user(s), this might indicate a non-empty installation",
),
)
if Document.objects.count() != 0:
self.stdout.write(
self.style.WARNING(
"Found existing documents(s), this might indicate a non-empty installation",
),
)
def pre_check_manifest_exists():
if not (self.source / "manifest.json").exists():
raise CommandError(
"That directory doesn't appear to contain a manifest.json file.",
)
if not self.source.exists(): if not self.source.exists():
raise CommandError("That path doesn't exist") raise CommandError("That path doesn't exist")
if not os.access(self.source, os.R_OK): if not os.access(self.source, os.R_OK):
raise CommandError("That path doesn't appear to be readable") raise CommandError("That path doesn't appear to be readable")
# Skip this check if operating only on the database pre_check_maybe_not_empty()
# We can expect data to exist in that case pre_check_manifest_exists()
if not self.data_only:
for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]: def load_manifest_files(self) -> None:
if document_dir.exists() and document_dir.is_dir(): """
for entry in document_dir.glob("**/*"): Loads manifest data from the various JSON files for parsing and loading the database
if entry.is_dir(): """
continue main_manifest_path = self.source / "manifest.json"
self.stdout.write(
self.style.WARNING( with main_manifest_path.open() as infile:
f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation", self.manifest = json.load(infile)
), self.manifest_paths.append(main_manifest_path)
)
break for file in Path(self.source).glob("**/*-manifest.json"):
if ( with file.open() as infile:
User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count() self.manifest += json.load(infile)
!= 0 self.manifest_paths.append(file)
):
def load_metadata(self) -> None:
"""
Loads either just the version information or the version information and extra data
Must account for the old style of export as well, with just version.json
"""
version_path = self.source / "version.json"
metadata_path = self.source / "metadata.json"
if not version_path.exists() and not metadata_path.exists():
self.stdout.write(
self.style.NOTICE("No version.json or metadata.json file located"),
)
return
if version_path.exists():
with version_path.open() as infile:
self.version = json.load(infile)["version"]
elif metadata_path.exists():
with metadata_path.open() as infile:
data = json.load(infile)
self.version = data["version"]
if not self.passphrase and EXPORTER_CRYPTO_SETTINGS_NAME in data:
raise CommandError(
"No passphrase was given, but this export contains encrypted fields",
)
elif EXPORTER_CRYPTO_SETTINGS_NAME in data:
self.load_crypt_params(data)
if self.version and self.version != version.__full_version_str__:
self.stdout.write( self.stdout.write(
self.style.WARNING( self.style.WARNING(
"Found existing user(s), this might indicate a non-empty installation", "Version mismatch: "
), f"Currently {version.__full_version_str__},"
) f" importing {self.version}."
if Document.objects.count() != 0: " Continuing, but import may fail.",
self.stdout.write(
self.style.WARNING(
"Found existing documents(s), this might indicate a non-empty installation",
), ),
) )
def load_data_to_database(self) -> None:
    """
    As the name implies, loads data from the JSON file(s) into the database

    Every path in self.manifest_paths is loaded with the loaddata command
    inside a single transaction.

    On a FieldDoesNotExist, DeserializationError or IntegrityError, an error
    is written, plus a hint about the version of the export where that is
    useful, and the exception is re-raised.
    """
    try:
        with transaction.atomic():
            # delete these since pk can change, re-created from import
            ContentType.objects.all().delete()
            Permission.objects.all().delete()
            for manifest_path in self.manifest_paths:
                call_command("loaddata", manifest_path)
    except (FieldDoesNotExist, DeserializationError, IntegrityError):
        self.stdout.write(self.style.ERROR("Database import failed"))
        if self.version is None:
            # Only claim the version is missing when it really is; a known,
            # matching version gets no additional hint
            self.stdout.write(
                self.style.ERROR("No version information present"),
            )
        elif self.version != version.__full_version_str__:  # pragma: no cover
            self.stdout.write(
                self.style.ERROR(
                    "Version mismatch: "
                    f"Currently {version.__full_version_str__},"
                    f" importing {self.version}",
                ),
            )
        # Bare raise keeps the original traceback untouched
        raise
def handle(self, *args, **options): def handle(self, *args, **options):
logging.getLogger().handlers[0].level = logging.ERROR logging.getLogger().handlers[0].level = logging.ERROR
self.source = Path(options["source"]).resolve() self.source = Path(options["source"]).resolve()
self.data_only: bool = options["data_only"] self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"] self.no_progress_bar: bool = options["no_progress_bar"]
self.passphrase: str | None = options.get("passphrase")
self.version: Optional[str] = None
self.salt: Optional[str] = None
self.manifest_paths = []
self.manifest = []
self.pre_check() self.pre_check()
manifest_paths = [] self.load_metadata()
main_manifest_path = self.source / "manifest.json" self.load_manifest_files()
self._check_manifest_exists(main_manifest_path) self.check_manifest_validity()
with main_manifest_path.open() as infile: self.decrypt_secret_fields()
self.manifest = json.load(infile)
manifest_paths.append(main_manifest_path)
for file in Path(self.source).glob("**/*-manifest.json"):
with file.open() as infile:
self.manifest += json.load(infile)
manifest_paths.append(file)
version_path = self.source / "version.json"
if version_path.exists():
with version_path.open() as infile:
self.version = json.load(infile)["version"]
# Provide an initial warning if needed to the user
if self.version != version.__full_version_str__:
self.stdout.write(
self.style.WARNING(
"Version mismatch: "
f"Currently {version.__full_version_str__},"
f" importing {self.version}."
" Continuing, but import may fail.",
),
)
else:
self.stdout.write(self.style.NOTICE("No version.json file located"))
if not self.data_only:
self._check_manifest_files_valid()
with ( with (
disable_signal( disable_signal(
@ -181,32 +265,7 @@ class Command(BaseCommand):
auditlog.unregister(CustomFieldInstance) auditlog.unregister(CustomFieldInstance)
# Fill up the database with whatever is in the manifest # Fill up the database with whatever is in the manifest
try: self.load_data_to_database()
with transaction.atomic():
# delete these since pk can change, re-created from import
ContentType.objects.all().delete()
Permission.objects.all().delete()
for manifest_path in manifest_paths:
call_command("loaddata", manifest_path)
except (FieldDoesNotExist, DeserializationError, IntegrityError) as e:
self.stdout.write(self.style.ERROR("Database import failed"))
if (
self.version is not None
and self.version != version.__full_version_str__
):
self.stdout.write(
self.style.ERROR(
"Version mismatch: "
f"Currently {version.__full_version_str__},"
f" importing {self.version}",
),
)
raise e
else:
self.stdout.write(
self.style.ERROR("No version information present"),
)
raise e
if not self.data_only: if not self.data_only:
self._import_files_from_manifest() self._import_files_from_manifest()
@ -220,30 +279,20 @@ class Command(BaseCommand):
no_progress_bar=self.no_progress_bar, no_progress_bar=self.no_progress_bar,
) )
@staticmethod def check_manifest_validity(self):
def _check_manifest_exists(path: Path):
if not path.exists():
raise CommandError(
"That directory doesn't appear to contain a manifest.json file.",
)
def _check_manifest_files_valid(self):
""" """
Attempts to verify the manifest is valid. Namely checking the files Attempts to verify the manifest is valid. Namely checking the files
referred to exist and the files can be read from referred to exist and the files can be read from
""" """
self.stdout.write("Checking the manifest")
for record in self.manifest:
if record["model"] != "documents.document":
continue
if EXPORTER_FILE_NAME not in record: def check_document_validity(document_record: dict):
if EXPORTER_FILE_NAME not in document_record:
raise CommandError( raise CommandError(
"The manifest file contains a record which does not " "The manifest file contains a record which does not "
"refer to an actual document file.", "refer to an actual document file.",
) )
doc_file = record[EXPORTER_FILE_NAME] doc_file = document_record[EXPORTER_FILE_NAME]
doc_path: Path = self.source / doc_file doc_path: Path = self.source / doc_file
if not doc_path.exists(): if not doc_path.exists():
raise CommandError( raise CommandError(
@ -258,8 +307,8 @@ class Command(BaseCommand):
f"Failed to read from original file {doc_path}", f"Failed to read from original file {doc_path}",
) from e ) from e
if EXPORTER_ARCHIVE_NAME in record: if EXPORTER_ARCHIVE_NAME in document_record:
archive_file = record[EXPORTER_ARCHIVE_NAME] archive_file = document_record[EXPORTER_ARCHIVE_NAME]
doc_archive_path: Path = self.source / archive_file doc_archive_path: Path = self.source / archive_file
if not doc_archive_path.exists(): if not doc_archive_path.exists():
raise CommandError( raise CommandError(
@ -274,6 +323,13 @@ class Command(BaseCommand):
f"Failed to read from archive file {doc_archive_path}", f"Failed to read from archive file {doc_archive_path}",
) from e ) from e
self.stdout.write("Checking the manifest")
for record in self.manifest:
# Only check if the document files exist if this is not data only
# We don't care about documents for a data only import
if not self.data_only and record["model"] == "documents.document":
check_document_validity(record)
def _import_files_from_manifest(self): def _import_files_from_manifest(self):
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True) settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True) settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
@ -339,3 +395,33 @@ class Command(BaseCommand):
copy_file_with_basic_stats(archive_path, document.archive_path) copy_file_with_basic_stats(archive_path, document.archive_path)
document.save() document.save()
def decrypt_secret_fields(self) -> None:
    """
    The converse decryption of some fields out of the export before importing to database

    Does nothing if no passphrase was given or if the manifest contains no
    records of the models listed in self.CRYPT_FIELDS. Otherwise the fields
    are decrypted in self.manifest and manifest.json in the source directory
    is rewritten with the result.

    :raises CommandError: if there are records to decrypt, but no salt was
        loaded for the export
    """
    if not self.passphrase:
        return

    # Gather the work first, so no key is derived when there is nothing to decrypt
    to_decrypt = [
        (record, crypt_config["fields"])
        for crypt_config in self.CRYPT_FIELDS
        for record in self.manifest
        if record["model"] == crypt_config["model_name"]
    ]
    if not to_decrypt:
        return

    # Without the salt used for the export the key cannot be re-derived;
    # decrypting with a fresh random salt could only ever fail
    if self.salt is None:
        raise CommandError(
            "A passphrase was given, but the export does not contain "
            "the settings required for decryption",
        )

    self.setup_crypto(passphrase=self.passphrase, salt=self.salt)

    for record, crypt_fields in to_decrypt:
        for field in crypt_fields:
            record["fields"][field] = self.decrypt_string(
                value=record["fields"][field],
            )

    # It's annoying, but the DB is loaded from the JSON directly
    # Maybe could change that in the future?
    # NOTE(review): this writes the decrypted values, and every record
    # currently held in self.manifest, back into the export on disk
    (self.source / "manifest.json").write_text(
        json.dumps(self.manifest, indent=2, ensure_ascii=False),
        # Non-ASCII is written as-is, so do not depend on the locale's encoding
        encoding="utf-8",
    )

View File

@ -1,8 +1,27 @@
import base64
import os import os
from argparse import ArgumentParser from argparse import ArgumentParser
from typing import Optional
from typing import TypedDict
from typing import Union
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from django.core.management import CommandError from django.core.management import CommandError
from documents.settings import EXPORTER_CRYPTO_ALGO_NAME
from documents.settings import EXPORTER_CRYPTO_KEY_ITERATIONS_NAME
from documents.settings import EXPORTER_CRYPTO_KEY_SIZE_NAME
from documents.settings import EXPORTER_CRYPTO_SALT_NAME
from documents.settings import EXPORTER_CRYPTO_SETTINGS_NAME
class CryptFields(TypedDict):
    """
    Describes one group of record fields which are encrypted for export and decrypted for import
    """

    # Key of the exporter's manifest dictionary under which the records are found
    exporter_key: str
    # Value of a manifest record's "model" entry, used by the importer to find the records
    model_name: str
    # Names of the entries in a record's "fields" which are encrypted/decrypted
    fields: list[str]
class MultiProcessMixin: class MultiProcessMixin:
""" """
@ -41,3 +60,109 @@ class ProgressBarMixin:
def handle_progress_bar_mixin(self, *args, **options): def handle_progress_bar_mixin(self, *args, **options):
self.no_progress_bar = options["no_progress_bar"] self.no_progress_bar = options["no_progress_bar"]
self.use_progress_bar = not self.no_progress_bar self.use_progress_bar = not self.no_progress_bar
class CryptMixin:
    """
    Fully based on:
    https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet

    To encrypt:
      1. Call setup_crypto providing the user provided passphrase
      2. Call encrypt_string with a value
      3. Store the returned hexadecimal representation of the value
      4. Store the result of get_crypt_params, it is required to decrypt

    To decrypt:
      1. Load the required parameters (load_crypt_params):
        a. key iterations
        b. key size
        c. key algorithm
        d. salt
      2. Call setup_crypto providing the user provided passphrase and stored salt
      3. Call decrypt_string with a value
      4. Use the returned value
    """

    # This matches to Django's default for now
    # https://github.com/django/django/blob/adae61942/django/contrib/auth/hashers.py#L315

    # Set the defaults to be used during export
    # During import, these are overridden from the loaded values to ensure decryption is possible
    key_iterations = 1_000_000
    salt_size = 16
    key_size = 32
    kdf_algorithm = "pbkdf2_sha256"

    # Which fields of which records are encrypted on export and decrypted on import
    CRYPT_FIELDS: list[CryptFields] = [
        {
            "exporter_key": "mail_accounts",
            "model_name": "paperless_mail.mailaccount",
            "fields": [
                "password",
            ],
        },
    ]

    def get_crypt_params(self) -> dict[str, dict[str, Union[str, int]]]:
        """
        Returns the settings needed to derive the same key again, nested under
        the crypto settings name. Reads self.salt, so setup_crypto must have
        been called first
        """
        return {
            EXPORTER_CRYPTO_SETTINGS_NAME: {
                EXPORTER_CRYPTO_ALGO_NAME: self.kdf_algorithm,
                EXPORTER_CRYPTO_KEY_ITERATIONS_NAME: self.key_iterations,
                EXPORTER_CRYPTO_KEY_SIZE_NAME: self.key_size,
                EXPORTER_CRYPTO_SALT_NAME: self.salt,
            },
        }

    def load_crypt_params(self, metadata: dict) -> None:
        """
        Loads the values for setting up decryption from the given metadata,
        which must be in the layout produced by get_crypt_params
        """
        crypto_settings = metadata[EXPORTER_CRYPTO_SETTINGS_NAME]

        self.kdf_algorithm: str = crypto_settings[EXPORTER_CRYPTO_ALGO_NAME]
        self.key_iterations: int = crypto_settings[
            EXPORTER_CRYPTO_KEY_ITERATIONS_NAME
        ]
        self.key_size: int = crypto_settings[EXPORTER_CRYPTO_KEY_SIZE_NAME]
        self.salt: str = crypto_settings[EXPORTER_CRYPTO_SALT_NAME]

    def setup_crypto(self, *, passphrase: str, salt: Optional[str] = None):
        """
        Constructs a class for encryption or decryption using the specified passphrase and salt

        Salt is assumed to be a hexadecimal representation of a cryptographically secure random byte string.
        If not provided, it will be derived from the system secure random

        Raises CommandError if the configured key derivation function is unknown
        """
        self.salt = salt or os.urandom(self.salt_size).hex()

        # Derive the KDF based on loaded settings
        if self.kdf_algorithm == "pbkdf2_sha256":
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=self.key_size,
                salt=bytes.fromhex(self.salt),
                iterations=self.key_iterations,
            )
        else:  # pragma: no cover
            raise CommandError(
                f"{self.kdf_algorithm} is an unknown key derivation function",
            )

        key = base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))

        self.fernet = Fernet(key)

    def encrypt_string(self, *, value: str) -> str:
        """
        Given a string value, encrypts it and returns the hexadecimal representation of the encrypted token

        setup_crypto must have been called first
        """
        return self.fernet.encrypt(value.encode("utf-8")).hex()

    def decrypt_string(self, *, value: str) -> str:
        """
        Given a string value, decrypts it and returns the original value of the field

        setup_crypto must have been called first

        Raises CommandError if the value is not hexadecimal or cannot be
        decrypted with the derived key, for example due to a wrong passphrase
        """
        # Local import so this method does not depend on the module's import block
        from cryptography.fernet import InvalidToken

        try:
            decrypted = self.fernet.decrypt(bytes.fromhex(value))
        except (ValueError, InvalidToken) as e:
            # ValueError: not hexadecimal, so the value was never encrypted by us
            # InvalidToken: wrong key (passphrase/salt) or a damaged token
            raise CommandError(
                "Failed to decrypt a value, the passphrase may be incorrect "
                "or the value was not encrypted",
            ) from e
        return decrypted.decode("utf-8")

View File

@ -3,3 +3,9 @@
EXPORTER_FILE_NAME = "__exported_file_name__" EXPORTER_FILE_NAME = "__exported_file_name__"
EXPORTER_THUMBNAIL_NAME = "__exported_thumbnail_name__" EXPORTER_THUMBNAIL_NAME = "__exported_thumbnail_name__"
EXPORTER_ARCHIVE_NAME = "__exported_archive_name__" EXPORTER_ARCHIVE_NAME = "__exported_archive_name__"
EXPORTER_CRYPTO_SETTINGS_NAME = "__crypto__"
EXPORTER_CRYPTO_SALT_NAME = "__salt_hex__"
EXPORTER_CRYPTO_KEY_ITERATIONS_NAME = "__key_iters__"
EXPORTER_CRYPTO_KEY_SIZE_NAME = "__key_size__"
EXPORTER_CRYPTO_ALGO_NAME = "__key_algo__"

View File

@ -3,6 +3,7 @@ import json
import os import os
import shutil import shutil
import tempfile import tempfile
from io import StringIO
from pathlib import Path from pathlib import Path
from unittest import mock from unittest import mock
from zipfile import ZipFile from zipfile import ZipFile
@ -39,6 +40,7 @@ from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin from documents.tests.utils import FileSystemAssertsMixin
from documents.tests.utils import SampleDirMixin from documents.tests.utils import SampleDirMixin
from documents.tests.utils import paperless_environment from documents.tests.utils import paperless_environment
from paperless_mail.models import MailAccount
class TestExportImport( class TestExportImport(
@ -466,7 +468,7 @@ class TestExportImport(
with ZipFile(expected_file) as zip: with ZipFile(expected_file) as zip:
self.assertEqual(len(zip.namelist()), 11) self.assertEqual(len(zip.namelist()), 11)
self.assertIn("manifest.json", zip.namelist()) self.assertIn("manifest.json", zip.namelist())
self.assertIn("version.json", zip.namelist()) self.assertIn("metadata.json", zip.namelist())
@override_settings(PASSPHRASE="test") @override_settings(PASSPHRASE="test")
def test_export_zipped_format(self): def test_export_zipped_format(self):
@ -504,7 +506,7 @@ class TestExportImport(
# Extras are from the directories, which also appear in the listing # Extras are from the directories, which also appear in the listing
self.assertEqual(len(zip.namelist()), 14) self.assertEqual(len(zip.namelist()), 14)
self.assertIn("manifest.json", zip.namelist()) self.assertIn("manifest.json", zip.namelist())
self.assertIn("version.json", zip.namelist()) self.assertIn("metadata.json", zip.namelist())
@override_settings(PASSPHRASE="test") @override_settings(PASSPHRASE="test")
def test_export_zipped_with_delete(self): def test_export_zipped_with_delete(self):
@ -552,7 +554,7 @@ class TestExportImport(
with ZipFile(expected_file) as zip: with ZipFile(expected_file) as zip:
self.assertEqual(len(zip.namelist()), 11) self.assertEqual(len(zip.namelist()), 11)
self.assertIn("manifest.json", zip.namelist()) self.assertIn("manifest.json", zip.namelist())
self.assertIn("version.json", zip.namelist()) self.assertIn("metadata.json", zip.namelist())
def test_export_target_not_exists(self): def test_export_target_not_exists(self):
""" """
@ -827,7 +829,7 @@ class TestExportImport(
# Manifest and version files only should be present in the exported directory # Manifest and version files only should be present in the exported directory
self.assertFileCountInDir(self.target, 2) self.assertFileCountInDir(self.target, 2)
self.assertIsFile(self.target / "manifest.json") self.assertIsFile(self.target / "manifest.json")
self.assertIsFile(self.target / "version.json") self.assertIsFile(self.target / "metadata.json")
shutil.rmtree(self.dirs.media_dir / "documents") shutil.rmtree(self.dirs.media_dir / "documents")
Document.objects.all().delete() Document.objects.all().delete()
@ -840,3 +842,139 @@ class TestExportImport(
) )
self.assertEqual(Document.objects.all().count(), 4) self.assertEqual(Document.objects.all().count(), 4)
class TestCryptExportImport(
    DirectoriesMixin,
    FileSystemAssertsMixin,
    TestCase,
):
    """
    Tests exporting and importing with a passphrase protecting sensitive fields
    """

    def setUp(self) -> None:
        # Each test exports into its own, fresh directory
        self.target = Path(tempfile.mkdtemp())
        return super().setUp()

    def tearDown(self) -> None:
        shutil.rmtree(self.target, ignore_errors=True)
        return super().tearDown()

    def test_export_passphrase(self):
        """
        GIVEN:
            - A mail account exists
        WHEN:
            - Export command is called
            - Passphrase is provided
            - Import command is called
            - The same passphrase is provided
        THEN:
            - Output password is not plaintext
            - Imported password matches the original
        """
        MailAccount.objects.create(
            name="Test Account",
            imap_server="test.imap.com",
            username="myusername",
            password="mypassword",
        )

        call_command(
            "document_exporter",
            "--no-progress-bar",
            "--passphrase",
            "securepassword",
            self.target,
        )

        self.assertIsFile(self.target / "metadata.json")
        self.assertIsFile(self.target / "manifest.json")

        data = json.loads((self.target / "manifest.json").read_text())

        mail_accounts = [
            record for record in data if record["model"] == "paperless_mail.mailaccount"
        ]

        self.assertEqual(len(mail_accounts), 1)

        mail_account_data = mail_accounts[0]

        self.assertNotEqual(mail_account_data["fields"]["password"], "mypassword")

        # Remove the account, so it can only come back through the import
        MailAccount.objects.all().delete()

        call_command(
            "document_importer",
            "--no-progress-bar",
            "--passphrase",
            "securepassword",
            self.target,
        )

        account = MailAccount.objects.first()

        self.assertIsNotNone(account)
        self.assertEqual(account.password, "mypassword")

    def test_import_crypt_no_passphrase(self):
        """
        GIVEN:
            - A mail account exists
        WHEN:
            - Export command is called
            - Passphrase is provided
            - Import command is called
            - No passphrase is given
        THEN:
            - An error is raised for the issue
        """
        MailAccount.objects.create(
            name="Test Account",
            imap_server="test.imap.com",
            username="myusername",
            password="mypassword",
        )

        call_command(
            "document_exporter",
            "--no-progress-bar",
            "--passphrase",
            "securepassword",
            self.target,
        )

        with self.assertRaises(CommandError) as err:
            call_command(
                "document_importer",
                "--no-progress-bar",
                self.target,
            )

        # Must be checked after the with block (anything following the raising
        # call inside it never runs) and against the raised exception, as
        # err.msg is not the exception's message
        self.assertEqual(
            str(err.exception),
            "No passphrase was given, but this export contains encrypted fields",
        )

    def test_export_warn_plaintext(self):
        """
        GIVEN:
            - A mail account exists
        WHEN:
            - Export command is called
            - No passphrase is provided
        THEN:
            - Output password is plaintext
            - Warning is output
        """
        MailAccount.objects.create(
            name="Test Account",
            imap_server="test.imap.com",
            username="myusername",
            password="mypassword",
        )

        stdout = StringIO()

        call_command(
            "document_exporter",
            "--no-progress-bar",
            str(self.target),
            stdout=stdout,
        )

        self.assertIn(
            (
                "You have configured mail accounts, "
                "but no passphrase was given. "
                "Passwords will be in plaintext"
            ),
            stdout.getvalue(),
        )

View File

@ -125,15 +125,16 @@ class TestCommandImport(
EXPORTER_ARCHIVE_NAME: "archive.pdf", EXPORTER_ARCHIVE_NAME: "archive.pdf",
}, },
] ]
cmd.data_only = False
with self.assertRaises(CommandError) as cm: with self.assertRaises(CommandError) as cm:
cmd._check_manifest_files_valid() cmd.check_manifest_validity()
self.assertInt("Failed to read from original file", str(cm.exception)) self.assertInt("Failed to read from original file", str(cm.exception))
original_path.chmod(0o444) original_path.chmod(0o444)
archive_path.chmod(0o222) archive_path.chmod(0o222)
with self.assertRaises(CommandError) as cm: with self.assertRaises(CommandError) as cm:
cmd._check_manifest_files_valid() cmd.check_manifest_validity()
self.assertInt("Failed to read from archive file", str(cm.exception)) self.assertInt("Failed to read from archive file", str(cm.exception))
def test_import_source_not_existing(self): def test_import_source_not_existing(self):
@ -240,7 +241,7 @@ class TestCommandImport(
stdout.seek(0) stdout.seek(0)
self.assertIn( self.assertIn(
"Found existing user(s), this might indicate a non-empty installation", "Found existing user(s), this might indicate a non-empty installation",
str(stdout.read()), stdout.read(),
) )
def test_import_with_documents_exists(self): def test_import_with_documents_exists(self):
@ -278,3 +279,59 @@ class TestCommandImport(
"Found existing documents(s), this might indicate a non-empty installation", "Found existing documents(s), this might indicate a non-empty installation",
str(stdout.read()), str(stdout.read()),
) )
def test_import_no_metadata_or_version_file(self):
    """
    GIVEN:
        - A source directory holding nothing but a manifest file
    WHEN:
        - The importer is run against that directory
    THEN:
        - A notice about the missing version/metadata files is printed
    """
    captured = StringIO()
    source_dir = self.dirs.scratch_dir

    empty_manifest = source_dir / "manifest.json"
    empty_manifest.touch()

    # The empty manifest is not valid JSON, so the import itself blows up;
    # only the output produced before that point matters here
    with self.assertRaises(json.JSONDecodeError):
        call_command(
            "document_importer",
            "--no-progress-bar",
            str(source_dir),
            stdout=captured,
        )

    output = captured.getvalue()

    self.assertIn("No version.json or metadata.json file located", output)
def test_import_version_file(self):
    """
    GIVEN:
        - A source directory holding a manifest file and a version file
    WHEN:
        - The importer is run against that directory
    THEN:
        - A warning about the version mismatch is printed
    """
    captured = StringIO()
    source_dir = self.dirs.scratch_dir

    (source_dir / "manifest.json").touch()

    version_payload = json.dumps({"version": "2.8.1"})
    (source_dir / "version.json").write_text(version_payload)

    # The empty manifest is not valid JSON, so the import itself blows up;
    # only the output produced before that point matters here
    with self.assertRaises(json.JSONDecodeError):
        call_command(
            "document_importer",
            "--no-progress-bar",
            str(source_dir),
            stdout=captured,
        )

    output = captured.getvalue()

    for expected in ("Version mismatch:", "importing 2.8.1"):
        self.assertIn(expected, output)

View File

@ -4,6 +4,7 @@ addopts = --pythonwarnings=all --cov --cov-report=html --cov-report=xml --numpro
env = env =
PAPERLESS_DISABLE_DBHANDLER=true PAPERLESS_DISABLE_DBHANDLER=true
PAPERLESS_CACHE_BACKEND=django.core.cache.backends.locmem.LocMemCache PAPERLESS_CACHE_BACKEND=django.core.cache.backends.locmem.LocMemCache
norecursedirs = locale/*
[coverage:run] [coverage:run]
source = source =