mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 03:16:10 -06:00 
			
		
		
		
	Merge pull request #3719 from paperless-ngx/fix/issue-3712
Enhancement / Fix: Migrate encrypted png thumbnails to webp
This commit is contained in:
		@@ -0,0 +1,162 @@
 | 
			
		||||
# Generated by Django 4.1.9 on 2023-06-29 19:29
 | 
			
		||||
import logging
 | 
			
		||||
import multiprocessing.pool
 | 
			
		||||
import shutil
 | 
			
		||||
import tempfile
 | 
			
		||||
import time
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
import gnupg
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from django.db import migrations
 | 
			
		||||
 | 
			
		||||
from documents.parsers import run_convert
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger("paperless.migrations")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _do_convert(work_package):
    """
    Convert a single encrypted PNG thumbnail into an encrypted WebP thumbnail.

    ``work_package`` is a 3-tuple of:
      - the path of the existing ``*.png.gpg`` thumbnail
      - the path the new ``*.webp.gpg`` file is first written to
      - the passphrase used for both decryption and re-encryption

    The PNG is decrypted next to the existing thumbnail, converted with
    ``run_convert``, symmetrically re-encrypted, and the result is copied into
    the directory of the existing thumbnail.  Only then is the old encrypted
    PNG removed.

    Any exception is logged and swallowed, so one bad thumbnail does not
    abort the rest of the batch.  The plaintext intermediate files are removed
    whether or not the conversion succeeded.
    """
    (
        existing_encrypted_thumbnail,
        converted_encrypted_thumbnail,
        passphrase,
    ) = work_package

    # Plaintext intermediates created along the way; always removed in the
    # ``finally`` below so a failure never leaves decrypted data on disk
    plaintext_files = []

    try:
        gpg = gnupg.GPG(gnupghome=settings.GNUPG_HOME)

        logger.info(f"Decrypting thumbnail: {existing_encrypted_thumbnail}")

        # Decrypt png (dropping the trailing .gpg gives the plaintext name)
        decrypted_thumbnail = existing_encrypted_thumbnail.with_suffix("").resolve()
        plaintext_files.append(decrypted_thumbnail)

        with open(existing_encrypted_thumbnail, "rb") as existing_encrypted_file:
            raw_thumb = gpg.decrypt_file(
                existing_encrypted_file,
                passphrase=passphrase,
                always_trust=True,
            ).data

        with open(decrypted_thumbnail, "wb") as decrypted_file:
            decrypted_file.write(raw_thumb)

        # Plaintext WebP lives beside the encrypted output, minus the .gpg
        converted_decrypted_thumbnail = converted_encrypted_thumbnail.with_suffix(
            "",
        ).resolve()
        plaintext_files.append(converted_decrypted_thumbnail)

        logger.info(f"Converting decrypted thumbnail: {decrypted_thumbnail}")

        # Convert to webp
        run_convert(
            density=300,
            scale="500x5000>",
            alpha="remove",
            strip=True,
            trim=False,
            auto_orient=True,
            input_file=f"{decrypted_thumbnail}[0]",
            output_file=str(converted_decrypted_thumbnail),
        )

        logger.info(
            f"Encrypting converted thumbnail: {converted_decrypted_thumbnail}",
        )

        # Encrypt webp
        with open(converted_decrypted_thumbnail, "rb") as converted_decrypted_file:
            encryption_result = gpg.encrypt_file(
                fileobj_or_path=converted_decrypted_file,
                recipients=None,
                passphrase=passphrase,
                symmetric=True,
                always_trust=True,
            )

        # gpg failures are reported through the result object, not raised.
        # Bail out before touching the existing thumbnail so it really does
        # stay unchanged on failure.
        if not encryption_result.ok:
            raise RuntimeError(
                f"Encryption failed with status: {encryption_result.status}",
            )

        with open(converted_encrypted_thumbnail, "wb") as converted_encrypted_file:
            converted_encrypted_file.write(encryption_result.data)

        # Copy newly created thumbnail to thumbnail directory
        shutil.copy(converted_encrypted_thumbnail, existing_encrypted_thumbnail.parent)

        # Remove the existing encrypted PNG version
        existing_encrypted_thumbnail.unlink()

        logger.info(
            "Conversion to WebP completed, "
            f"replaced {existing_encrypted_thumbnail.name} with {converted_encrypted_thumbnail.name}",
        )

    except Exception as e:
        logger.error(f"Error converting thumbnail (existing file unchanged): {e}")

    finally:
        # Remove the decrypted PNG and decrypted WebP versions, if created
        for plaintext_file in plaintext_files:
            try:
                plaintext_file.unlink()
            except FileNotFoundError:
                # Never got as far as creating this one
                pass
            except OSError as e:
                logger.warning(
                    f"Unable to remove temporary file {plaintext_file}: {e}",
                )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _convert_encrypted_thumbnails_to_webp(apps, schema_editor):
    """
    Migrate every ``*.png.gpg`` thumbnail in ``settings.THUMBNAIL_DIR`` to an
    encrypted WebP thumbnail.

    Does nothing when no encrypted PNG thumbnails exist.  Otherwise one work
    package per thumbnail is handed to ``_do_convert`` through a process pool,
    with the intermediate output going to a temporary directory.

    ``apps`` and ``schema_editor`` are supplied by Django's ``RunPython`` and
    are not used here.

    Raises:
        Exception: encrypted PNG thumbnails exist but ``settings.PASSPHRASE``
            is not set.
    """
    start = time.time()

    # Glob once and reuse the result for both the check and the work list
    encrypted_pngs = list(Path(settings.THUMBNAIL_DIR).glob("*.png.gpg"))
    if not encrypted_pngs:
        return

    passphrase = settings.PASSPHRASE
    if not passphrase:
        raise Exception(
            "Passphrase not defined, encrypted thumbnails cannot be migrated "
            "without this",
        )

    with tempfile.TemporaryDirectory() as tempdir:
        work_packages = []

        for file in encrypted_pngs:
            existing_thumbnail = file.resolve()

            # Change the existing filename suffix from png to webp
            converted_thumbnail_name = existing_thumbnail.name.replace(
                ".png.gpg",
                ".webp.gpg",
            )

            # Create the expected output filename in the tempdir
            converted_thumbnail = (Path(tempdir) / converted_thumbnail_name).resolve()

            # Package up the necessary info
            work_packages.append(
                (existing_thumbnail, converted_thumbnail, passphrase),
            )

        logger.info(
            "\n\n"
            "  This is a one-time only migration to convert thumbnails for all of your\n"
            "  *encrypted* documents into WebP format. If you have a lot of encrypted documents, \n"
            "  this may take a while, so a coffee break may be in order."
            "\n",
        )

        # Cap the worker count at 4 and recycle each worker after 4 tasks
        with multiprocessing.pool.Pool(
            processes=min(multiprocessing.cpu_count(), 4),
            maxtasksperchild=4,
        ) as pool:
            pool.map(_do_convert, work_packages)

            duration = time.time() - start

        logger.info(f"Conversion completed in {duration:.3f}s")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Migration(migrations.Migration):
    """
    Data migration which re-encodes encrypted PNG thumbnails as encrypted
    WebP.  Reversing the migration is a no-op.
    """

    dependencies = [("documents", "1036_alter_savedviewfilterrule_rule_type")]

    operations = [
        migrations.RunPython(
            _convert_encrypted_thumbnails_to_webp,
            migrations.RunPython.noop,
        ),
    ]
 | 
			
		||||
							
								
								
									
										276
									
								
								src/documents/tests/test_migration_encrypted_webp_conversion.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										276
									
								
								src/documents/tests/test_migration_encrypted_webp_conversion.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,276 @@
 | 
			
		||||
import shutil
 | 
			
		||||
import tempfile
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from typing import Callable
 | 
			
		||||
from typing import Iterable
 | 
			
		||||
from typing import Union
 | 
			
		||||
from unittest import mock
 | 
			
		||||
 | 
			
		||||
from django.test import override_settings
 | 
			
		||||
 | 
			
		||||
from documents.tests.utils import TestMigrations
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@override_settings(PASSPHRASE="test")
@mock.patch(
    "documents.migrations.1037_webp_encrypted_thumbnail_conversion.multiprocessing.pool.Pool.map",
)
@mock.patch("documents.migrations.1037_webp_encrypted_thumbnail_conversion.run_convert")
class TestMigrateToEncrytpedWebPThumbnails(TestMigrations):
    """
    Tests the migration of encrypted PNG thumbnails to encrypted WebP.

    The class-level patches replace ``run_convert`` and the process pool's
    ``map`` in the migration module.  Each test receives the resulting mocks
    as ``run_convert_mock`` and ``map_mock``, in that order.
    """

    migrate_from = "1036_alter_savedviewfilterrule_rule_type"
    migrate_to = "1037_webp_encrypted_thumbnail_conversion"
    # Each test prepares its files first, then calls performMigration() itself
    auto_migrate = False

    def pretend_convert_output(self, *args, **kwargs):
        """
        Pretends to do the conversion, by copying the input file
        to the output file
        """
        input_file = kwargs["input_file"]
        # Drop the "[0]" frame selector the migration appends.  str.rstrip()
        # would strip a *set* of characters and could eat into the file name.
        frame_selector = "[0]"
        if input_file.endswith(frame_selector):
            input_file = input_file[: -len(frame_selector)]

        shutil.copy2(
            Path(input_file),
            Path(kwargs["output_file"]),
        )

    def pretend_map(self, func: Callable, iterable: Iterable):
        """
        Pretends to be the map of a multiprocessing.Pool, but secretly does
        everything in series
        """
        for item in iterable:
            func(item)

    def create_dummy_thumbnails(
        self,
        thumb_dir: Path,
        ext: str,
        count: int,
        start_count: int = 0,
    ):
        """
        Helper to create a certain count of empty files of given extension in
        a given directory.  Files are named with a zero padded, 7 digit
        number, beginning at start_count
        """
        for idx in range(count):
            (Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
        # Triple check expected files exist
        self.assert_file_count_by_extension(ext, thumb_dir, count)

    def create_webp_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates count dummy (empty) WebP thumbnail files in the given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)

    def create_encrypted_webp_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates count dummy (empty) encrypted WebP thumbnail files in the
        given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "webp.gpg", count, start_count)

    def create_png_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates count dummy (empty) PNG thumbnail files in the given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)

    def create_encrypted_png_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates count dummy (empty) encrypted PNG thumbnail files in the
        given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "png.gpg", count, start_count)

    def assert_file_count_by_extension(
        self,
        ext: str,
        dir: Union[str, Path],
        expected_count: int,
    ):
        """
        Helper to assert a certain count of given extension files in given directory
        """
        if not isinstance(dir, Path):
            dir = Path(dir)
        matching_files = list(dir.glob(f"*.{ext}"))
        self.assertEqual(len(matching_files), expected_count)

    def assert_encrypted_png_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of encrypted PNG extension files in given directory
        """
        self.assert_file_count_by_extension("png.gpg", dir, expected_count)

    def assert_encrypted_webp_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of encrypted WebP extension files in given directory
        """
        self.assert_file_count_by_extension("webp.gpg", dir, expected_count)

    def assert_webp_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of WebP extension files in given directory
        """
        self.assert_file_count_by_extension("webp", dir, expected_count)

    def assert_png_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of PNG extension files in given directory
        """
        self.assert_file_count_by_extension("png", dir, expected_count)

    def setUp(self):
        # Fresh scratch directory per test, used as THUMBNAIL_DIR
        self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()

        return super().setUp()

    def tearDown(self) -> None:
        shutil.rmtree(self.thumbnail_dir)

        return super().tearDown()

    def test_do_nothing_if_converted(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted documents exist with existing encrypted WebP thumbnail paths
        WHEN:
            - Migration is attempted
        THEN:
            - Nothing is converted
        """
        map_mock.side_effect = self.pretend_map

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_webp_thumbnail_files(self.thumbnail_dir, 3)

            self.performMigration()
            run_convert_mock.assert_not_called()

            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_thumbnails(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted documents exist with PNG thumbnail
        WHEN:
            - Migration is attempted
        THEN:
            - Thumbnails are converted to webp & re-encrypted
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_errors_out(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted documents exist with PNG thumbnail
        WHEN:
            - Migration is attempted, but every conversion raises an exception
        THEN:
            - Conversion is attempted for each thumbnail
            - The existing encrypted PNG thumbnails are left in place
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = OSError

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_encrypted_png_file_count(self.thumbnail_dir, 3)

    def test_convert_mixed(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Documents exist with PNG, encrypted PNG, WebP and encrypted
              WebP thumbnails
        WHEN:
            - Migration is attempted
        THEN:
            - Only encrypted PNG thumbnails are converted
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_png_thumbnail_files(self.thumbnail_dir, 3)
            self.create_encrypted_png_thumbnail_files(
                self.thumbnail_dir,
                3,
                start_count=3,
            )
            self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=6)
            self.create_encrypted_webp_thumbnail_files(
                self.thumbnail_dir,
                3,
                start_count=8,
            )

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_png_file_count(self.thumbnail_dir, 3)
            # 3 pre-existing encrypted WebP files plus the 3 converted ones
            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 6)
            self.assert_webp_file_count(self.thumbnail_dir, 2)
            self.assert_encrypted_png_file_count(self.thumbnail_dir, 0)
 | 
			
		||||
		Reference in New Issue
	
	Block a user