mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-01-26 22:49:01 -06:00
Breaking: Remove support for document and thumbnail encryption (#11850)
This commit is contained in:
@@ -1,93 +0,0 @@
|
||||
from pathlib import Path
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.core.management.base import CommandError
|
||||
|
||||
from documents.models import Document
|
||||
from paperless.db import GnuPG
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = (
|
||||
"This is how you migrate your stored documents from an encrypted "
|
||||
"state to an unencrypted one (or vice-versa)"
|
||||
)
|
||||
|
||||
def add_arguments(self, parser) -> None:
|
||||
parser.add_argument(
|
||||
"--passphrase",
|
||||
help=(
|
||||
"If PAPERLESS_PASSPHRASE isn't set already, you need to specify it here"
|
||||
),
|
||||
)
|
||||
|
||||
def handle(self, *args, **options) -> None:
|
||||
try:
|
||||
self.stdout.write(
|
||||
self.style.WARNING(
|
||||
"\n\n"
|
||||
"WARNING: This script is going to work directly on your "
|
||||
"document originals, so\n"
|
||||
"WARNING: you probably shouldn't run "
|
||||
"this unless you've got a recent backup\n"
|
||||
"WARNING: handy. It "
|
||||
"*should* work without a hitch, but be safe and backup your\n"
|
||||
"WARNING: stuff first.\n\n"
|
||||
"Hit Ctrl+C to exit now, or Enter to "
|
||||
"continue.\n\n",
|
||||
),
|
||||
)
|
||||
_ = input()
|
||||
except KeyboardInterrupt:
|
||||
return
|
||||
|
||||
passphrase = options["passphrase"] or settings.PASSPHRASE
|
||||
if not passphrase:
|
||||
raise CommandError(
|
||||
"Passphrase not defined. Please set it with --passphrase or "
|
||||
"by declaring it in your environment or your config.",
|
||||
)
|
||||
|
||||
self.__gpg_to_unencrypted(passphrase)
|
||||
|
||||
def __gpg_to_unencrypted(self, passphrase: str) -> None:
|
||||
encrypted_files = Document.objects.filter(
|
||||
storage_type=Document.STORAGE_TYPE_GPG,
|
||||
)
|
||||
|
||||
for document in encrypted_files:
|
||||
self.stdout.write(f"Decrypting {document}")
|
||||
|
||||
old_paths = [document.source_path, document.thumbnail_path]
|
||||
|
||||
with document.source_file as file_handle:
|
||||
raw_document = GnuPG.decrypted(file_handle, passphrase)
|
||||
with document.thumbnail_file as file_handle:
|
||||
raw_thumb = GnuPG.decrypted(file_handle, passphrase)
|
||||
|
||||
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
|
||||
|
||||
ext: str = Path(document.filename).suffix
|
||||
|
||||
if not ext == ".gpg":
|
||||
raise CommandError(
|
||||
f"Abort: encrypted file {document.source_path} does not "
|
||||
f"end with .gpg",
|
||||
)
|
||||
|
||||
document.filename = Path(document.filename).stem
|
||||
|
||||
with document.source_path.open("wb") as f:
|
||||
f.write(raw_document)
|
||||
|
||||
with document.thumbnail_path.open("wb") as f:
|
||||
f.write(raw_thumb)
|
||||
|
||||
Document.objects.filter(id=document.id).update(
|
||||
storage_type=document.storage_type,
|
||||
filename=document.filename,
|
||||
)
|
||||
|
||||
for path in old_paths:
|
||||
path.unlink()
|
||||
@@ -3,7 +3,6 @@ import json
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
@@ -56,7 +55,6 @@ from documents.settings import EXPORTER_FILE_NAME
|
||||
from documents.settings import EXPORTER_THUMBNAIL_NAME
|
||||
from documents.utils import copy_file_with_basic_stats
|
||||
from paperless import version
|
||||
from paperless.db import GnuPG
|
||||
from paperless.models import ApplicationConfiguration
|
||||
from paperless_mail.models import MailAccount
|
||||
from paperless_mail.models import MailRule
|
||||
@@ -316,20 +314,17 @@ class Command(CryptMixin, BaseCommand):
|
||||
total=len(document_manifest),
|
||||
disable=self.no_progress_bar,
|
||||
):
|
||||
# 3.1. store files unencrypted
|
||||
document_dict["fields"]["storage_type"] = Document.STORAGE_TYPE_UNENCRYPTED
|
||||
|
||||
document = document_map[document_dict["pk"]]
|
||||
|
||||
# 3.2. generate a unique filename
|
||||
# 3.1. generate a unique filename
|
||||
base_name = self.generate_base_name(document)
|
||||
|
||||
# 3.3. write filenames into manifest
|
||||
# 3.2. write filenames into manifest
|
||||
original_target, thumbnail_target, archive_target = (
|
||||
self.generate_document_targets(document, base_name, document_dict)
|
||||
)
|
||||
|
||||
# 3.4. write files to target folder
|
||||
# 3.3. write files to target folder
|
||||
if not self.data_only:
|
||||
self.copy_document_files(
|
||||
document,
|
||||
@@ -423,7 +418,6 @@ class Command(CryptMixin, BaseCommand):
|
||||
base_name = generate_filename(
|
||||
document,
|
||||
counter=filename_counter,
|
||||
append_gpg=False,
|
||||
)
|
||||
else:
|
||||
base_name = document.get_public_filename(counter=filename_counter)
|
||||
@@ -482,46 +476,24 @@ class Command(CryptMixin, BaseCommand):
|
||||
|
||||
If the document is encrypted, the files are decrypted before copying them to the target location.
|
||||
"""
|
||||
if document.storage_type == Document.STORAGE_TYPE_GPG:
|
||||
t = int(time.mktime(document.created.timetuple()))
|
||||
self.check_and_copy(
|
||||
document.source_path,
|
||||
document.checksum,
|
||||
original_target,
|
||||
)
|
||||
|
||||
original_target.parent.mkdir(parents=True, exist_ok=True)
|
||||
with document.source_file as out_file:
|
||||
original_target.write_bytes(GnuPG.decrypted(out_file))
|
||||
os.utime(original_target, times=(t, t))
|
||||
if thumbnail_target:
|
||||
self.check_and_copy(document.thumbnail_path, None, thumbnail_target)
|
||||
|
||||
if thumbnail_target:
|
||||
thumbnail_target.parent.mkdir(parents=True, exist_ok=True)
|
||||
with document.thumbnail_file as out_file:
|
||||
thumbnail_target.write_bytes(GnuPG.decrypted(out_file))
|
||||
os.utime(thumbnail_target, times=(t, t))
|
||||
|
||||
if archive_target:
|
||||
archive_target.parent.mkdir(parents=True, exist_ok=True)
|
||||
if TYPE_CHECKING:
|
||||
assert isinstance(document.archive_path, Path)
|
||||
with document.archive_path as out_file:
|
||||
archive_target.write_bytes(GnuPG.decrypted(out_file))
|
||||
os.utime(archive_target, times=(t, t))
|
||||
else:
|
||||
if archive_target:
|
||||
if TYPE_CHECKING:
|
||||
assert isinstance(document.archive_path, Path)
|
||||
self.check_and_copy(
|
||||
document.source_path,
|
||||
document.checksum,
|
||||
original_target,
|
||||
document.archive_path,
|
||||
document.archive_checksum,
|
||||
archive_target,
|
||||
)
|
||||
|
||||
if thumbnail_target:
|
||||
self.check_and_copy(document.thumbnail_path, None, thumbnail_target)
|
||||
|
||||
if archive_target:
|
||||
if TYPE_CHECKING:
|
||||
assert isinstance(document.archive_path, Path)
|
||||
self.check_and_copy(
|
||||
document.archive_path,
|
||||
document.archive_checksum,
|
||||
archive_target,
|
||||
)
|
||||
|
||||
def check_and_write_json(
|
||||
self,
|
||||
content: list[dict] | dict,
|
||||
|
||||
@@ -383,8 +383,6 @@ class Command(CryptMixin, BaseCommand):
|
||||
else:
|
||||
archive_path = None
|
||||
|
||||
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
|
||||
|
||||
with FileLock(settings.MEDIA_LOCK):
|
||||
if Path(document.source_path).is_file():
|
||||
raise FileExistsError(document.source_path)
|
||||
|
||||
Reference in New Issue
Block a user