paperless-ngx/src/documents/migrations/1037_webp_encrypted_thumbnail_conversion.py
2023-06-29 13:21:15 -07:00

163 lines
5.2 KiB
Python

# Generated by Django 4.1.9 on 2023-06-29 19:29
import logging
import multiprocessing.pool
import shutil
import tempfile
import time
from pathlib import Path
import gnupg
from django.conf import settings
from django.db import migrations
from documents.parsers import run_convert
logger = logging.getLogger("paperless.migrations")
def _do_convert(work_package):
(
existing_encrypted_thumbnail,
converted_encrypted_thumbnail,
passphrase,
) = work_package
try:
gpg = gnupg.GPG(gnupghome=settings.GNUPG_HOME)
logger.info(f"Decrypting thumbnail: {existing_encrypted_thumbnail}")
# Decrypt png
decrypted_thumbnail = existing_encrypted_thumbnail.with_suffix("").resolve()
with open(existing_encrypted_thumbnail, "rb") as existing_encrypted_file:
raw_thumb = gpg.decrypt_file(
existing_encrypted_file,
passphrase=passphrase,
always_trust=True,
).data
with open(decrypted_thumbnail, "wb") as decrypted_file:
decrypted_file.write(raw_thumb)
converted_decrypted_thumbnail = Path(
str(converted_encrypted_thumbnail).replace("webp.gpg", "webp"),
).resolve()
logger.info(f"Converting decrypted thumbnail: {decrypted_thumbnail}")
# Convert to webp
run_convert(
density=300,
scale="500x5000>",
alpha="remove",
strip=True,
trim=False,
auto_orient=True,
input_file=f"{decrypted_thumbnail}[0]",
output_file=str(converted_decrypted_thumbnail),
)
logger.info(
f"Encrypting converted thumbnail: {converted_decrypted_thumbnail}",
)
# Encrypt webp
with open(converted_decrypted_thumbnail, "rb") as converted_decrypted_file:
encrypted = gpg.encrypt_file(
fileobj_or_path=converted_decrypted_file,
recipients=None,
passphrase=passphrase,
symmetric=True,
always_trust=True,
).data
with open(converted_encrypted_thumbnail, "wb") as converted_encrypted_file:
converted_encrypted_file.write(encrypted)
# Copy newly created thumbnail to thumbnail directory
shutil.copy(converted_encrypted_thumbnail, existing_encrypted_thumbnail.parent)
# Remove the existing encrypted PNG version
existing_encrypted_thumbnail.unlink()
# Remove the decrypted PNG version
decrypted_thumbnail.unlink()
# Remove the decrypted WebP version
converted_decrypted_thumbnail.unlink()
logger.info(
"Conversion to WebP completed, "
f"replaced {existing_encrypted_thumbnail.name} with {converted_encrypted_thumbnail.name}",
)
except Exception as e:
logger.error(f"Error converting thumbnail (existing file unchanged): {e}")
def _convert_encrypted_thumbnails_to_webp(apps, schema_editor):
start = time.time()
with tempfile.TemporaryDirectory() as tempdir:
work_packages = []
if len(list(Path(settings.THUMBNAIL_DIR).glob("*.png.gpg"))) > 0:
passphrase = settings.PASSPHRASE
if not passphrase:
raise Exception(
"Passphrase not defined, encrypted thumbnails cannot be migrated"
"without this",
)
for file in Path(settings.THUMBNAIL_DIR).glob("*.png.gpg"):
existing_thumbnail = file.resolve()
# Change the existing filename suffix from png to webp
converted_thumbnail_name = Path(
str(existing_thumbnail).replace(".png.gpg", ".webp.gpg"),
).name
# Create the expected output filename in the tempdir
converted_thumbnail = (
Path(tempdir) / Path(converted_thumbnail_name)
).resolve()
# Package up the necessary info
work_packages.append(
(existing_thumbnail, converted_thumbnail, passphrase),
)
if len(work_packages):
logger.info(
"\n\n"
" This is a one-time only migration to convert thumbnails for all of your\n"
" *encrypted* documents into WebP format. If you have a lot of encrypted documents, \n"
" this may take a while, so a coffee break may be in order."
"\n",
)
with multiprocessing.pool.Pool(
processes=min(multiprocessing.cpu_count(), 4),
maxtasksperchild=4,
) as pool:
pool.map(_do_convert, work_packages)
end = time.time()
duration = end - start
logger.info(f"Conversion completed in {duration:.3f}s")
class Migration(migrations.Migration):
dependencies = [
("documents", "1036_alter_savedviewfilterrule_rule_type"),
]
operations = [
migrations.RunPython(
code=_convert_encrypted_thumbnails_to_webp,
reverse_code=migrations.RunPython.noop,
),
]