mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	Migrate encrypted png thumbnails to webp
This commit is contained in:
		| @@ -0,0 +1,162 @@ | ||||
| # Generated by Django 4.1.9 on 2023-06-29 19:29 | ||||
| import logging | ||||
| import multiprocessing.pool | ||||
| import shutil | ||||
| import tempfile | ||||
| import time | ||||
| from pathlib import Path | ||||
|  | ||||
| import gnupg | ||||
| from django.conf import settings | ||||
| from django.db import migrations | ||||
|  | ||||
| from documents.parsers import run_convert | ||||
|  | ||||
| logger = logging.getLogger("paperless.migrations") | ||||
|  | ||||
|  | ||||
def _do_convert(work_package):
    """
    Convert one encrypted PNG thumbnail into an encrypted WebP thumbnail.

    ``work_package`` is a 3-tuple of:

    - the path of the existing encrypted PNG thumbnail (``*.png.gpg``)
    - the path at which the new encrypted WebP thumbnail (``*.webp.gpg``)
      is first written, before being copied next to the existing thumbnail
    - the passphrase used for both decryption and re-encryption

    Both paths are expected to be ``pathlib.Path`` objects.

    The steps are: decrypt the PNG, convert it to WebP via ``run_convert``,
    symmetrically encrypt the WebP, copy the result into the directory of
    the existing thumbnail and finally remove the old encrypted PNG.

    Any exception is logged and swallowed, so a single failing thumbnail
    does not propagate out of this function. Plaintext intermediate files
    created here are removed whether or not the conversion succeeded.
    """
    (
        existing_encrypted_thumbnail,
        converted_encrypted_thumbnail,
        passphrase,
    ) = work_package

    # Plaintext files this call has (possibly) created. They are recorded
    # just before creation so that cleanup never touches a file which was
    # not written by this function.
    scratch_files = []

    try:
        gpg = gnupg.GPG(gnupghome=settings.GNUPG_HOME)

        logger.info(f"Decrypting thumbnail: {existing_encrypted_thumbnail}")

        # Decrypt png: "<name>.png.gpg" -> "<name>.png" in the same directory
        decrypted_thumbnail = existing_encrypted_thumbnail.with_suffix("").resolve()

        # NOTE(review): the result's ``ok`` flag is not checked here or for
        # the encryption below; a failed GnuPG call presumably yields empty
        # data - confirm whether that should abort the conversion.
        with open(existing_encrypted_thumbnail, "rb") as existing_encrypted_file:
            raw_thumb = gpg.decrypt_file(
                existing_encrypted_file,
                passphrase=passphrase,
                always_trust=True,
            ).data

        scratch_files.append(decrypted_thumbnail)
        with open(decrypted_thumbnail, "wb") as decrypted_file:
            decrypted_file.write(raw_thumb)

        # "<name>.webp.gpg" -> "<name>.webp", next to the encrypted output
        converted_decrypted_thumbnail = converted_encrypted_thumbnail.with_suffix(
            "",
        ).resolve()

        logger.info(f"Converting decrypted thumbnail: {decrypted_thumbnail}")

        # Convert to webp
        scratch_files.append(converted_decrypted_thumbnail)
        run_convert(
            density=300,
            scale="500x5000>",
            alpha="remove",
            strip=True,
            trim=False,
            auto_orient=True,
            input_file=f"{decrypted_thumbnail}[0]",
            output_file=str(converted_decrypted_thumbnail),
        )

        logger.info(
            f"Encrypting converted thumbnail: {converted_decrypted_thumbnail}",
        )

        # Encrypt webp
        with open(converted_decrypted_thumbnail, "rb") as converted_decrypted_file:
            encrypted = gpg.encrypt_file(
                fileobj_or_path=converted_decrypted_file,
                recipients=None,
                passphrase=passphrase,
                symmetric=True,
                always_trust=True,
            ).data

            with open(converted_encrypted_thumbnail, "wb") as converted_encrypted_file:
                converted_encrypted_file.write(encrypted)

        # Copy newly created thumbnail to thumbnail directory
        shutil.copy(converted_encrypted_thumbnail, existing_encrypted_thumbnail.parent)

        # Remove the existing encrypted PNG version, only after the
        # replacement is in place
        existing_encrypted_thumbnail.unlink()

        logger.info(
            "Conversion to WebP completed, "
            f"replaced {existing_encrypted_thumbnail.name} with {converted_encrypted_thumbnail.name}",
        )

    except Exception as e:
        logger.error(f"Error converting thumbnail (existing file unchanged): {e}")

    finally:
        # Never leave decrypted thumbnails behind, even when a step failed
        for scratch_file in scratch_files:
            try:
                if scratch_file.exists():
                    scratch_file.unlink()
            except OSError as e:
                logger.warning(
                    f"Unable to remove temporary file {scratch_file}: {e}",
                )
|  | ||||
|  | ||||
def _convert_encrypted_thumbnails_to_webp(apps, schema_editor):
    """
    Convert every ``*.png.gpg`` file in ``settings.THUMBNAIL_DIR`` to an
    encrypted WebP thumbnail.

    ``apps`` and ``schema_editor`` are the arguments Django passes to a
    ``RunPython`` callable; neither is used, as this only works on files.

    Nothing happens if no encrypted PNG thumbnails exist. Otherwise, one
    work package per thumbnail is handed to ``_do_convert`` through a
    process pool of at most 4 workers, with the new files first being
    written into a temporary directory.

    Raises:
        Exception: encrypted PNG thumbnails exist but ``settings.PASSPHRASE``
            is empty.
    """
    start = time.time()

    # Glob once and keep the result; resolve to absolute paths up front
    existing_thumbnails = [
        file.resolve() for file in Path(settings.THUMBNAIL_DIR).glob("*.png.gpg")
    ]

    if not existing_thumbnails:
        return

    passphrase = settings.PASSPHRASE

    if not passphrase:
        raise Exception(
            "Passphrase not defined, encrypted thumbnails cannot be migrated "
            "without this",
        )

    with tempfile.TemporaryDirectory() as tempdir:
        work_packages = []

        for existing_thumbnail in existing_thumbnails:
            # Change the existing filename suffix from png to webp
            converted_thumbnail_name = existing_thumbnail.name.replace(
                ".png.gpg",
                ".webp.gpg",
            )

            # Create the expected output filename in the tempdir
            converted_thumbnail = (Path(tempdir) / converted_thumbnail_name).resolve()

            # Package up the necessary info
            work_packages.append(
                (existing_thumbnail, converted_thumbnail, passphrase),
            )

        logger.info(
            "\n\n"
            "  This is a one-time only migration to convert thumbnails for all of your\n"
            "  *encrypted* documents into WebP format. If you have a lot of encrypted documents, \n"
            "  this may take a while, so a coffee break may be in order."
            "\n",
        )

        with multiprocessing.pool.Pool(
            processes=min(multiprocessing.cpu_count(), 4),
            maxtasksperchild=4,
        ) as pool:
            # map() blocks until every work package has been processed
            pool.map(_do_convert, work_packages)

            duration = time.time() - start

        logger.info(f"Conversion completed in {duration:.3f}s")
|  | ||||
|  | ||||
class Migration(migrations.Migration):
    """
    Data migration which runs the encrypted PNG to encrypted WebP thumbnail
    conversion. The reverse direction is a no-op.
    """

    dependencies = [("documents", "1036_alter_savedviewfilterrule_rule_type")]

    operations = [
        migrations.RunPython(
            _convert_encrypted_thumbnails_to_webp,
            migrations.RunPython.noop,
        ),
    ]
							
								
								
									
										276
									
								
								src/documents/tests/test_migration_encrypted_webp_conversion.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										276
									
								
								src/documents/tests/test_migration_encrypted_webp_conversion.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,276 @@ | ||||
| import shutil | ||||
| import tempfile | ||||
| from pathlib import Path | ||||
| from typing import Callable | ||||
| from typing import Iterable | ||||
| from typing import Union | ||||
| from unittest import mock | ||||
|  | ||||
| from django.test import override_settings | ||||
|  | ||||
| from documents.tests.utils import TestMigrations | ||||
|  | ||||
|  | ||||
@override_settings(PASSPHRASE="test")
@mock.patch(
    "documents.migrations.1037_webp_encrypted_thumbnail_conversion.multiprocessing.pool.Pool.map",
)
@mock.patch("documents.migrations.1037_webp_encrypted_thumbnail_conversion.run_convert")
class TestMigrateToEncrytpedWebPThumbnails(TestMigrations):
    """
    Tests the migration of encrypted PNG thumbnails to encrypted WebP.

    The image conversion and the process pool's ``map`` are both patched for
    every test; the resulting mocks are passed to each test method as
    ``run_convert_mock`` and ``map_mock``. ``auto_migrate`` is disabled, so
    each test prepares its thumbnail files and then triggers the migration
    itself via ``performMigration()``.
    """

    migrate_from = "1036_alter_savedviewfilterrule_rule_type"
    migrate_to = "1037_webp_encrypted_thumbnail_conversion"
    auto_migrate = False

    def pretend_convert_output(self, *args, **kwargs):
        """
        Pretends to do the conversion, by copying the input file
        to the output file
        """
        input_file = kwargs["input_file"]
        # The migration appends the "[0]" page selector to the input path.
        # Remove exactly that suffix; str.rstrip() would instead strip any
        # trailing "[", "0" or "]" characters, eating into the filename.
        page_selector = "[0]"
        if input_file.endswith(page_selector):
            input_file = input_file[: -len(page_selector)]
        shutil.copy2(
            Path(input_file),
            Path(kwargs["output_file"]),
        )

    def pretend_map(self, func: Callable, iterable: Iterable):
        """
        Pretends to be the map of a multiprocessing.Pool, but secretly does
        everything in series
        """
        for item in iterable:
            func(item)

    def create_dummy_thumbnails(
        self,
        thumb_dir: Path,
        ext: str,
        count: int,
        start_count: int = 0,
    ):
        """
        Helper to create a certain count of empty files of given extension in
        a given directory. Files are named by a zero padded, 7 digit index,
        starting at start_count
        """
        for idx in range(count):
            (Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
        # Triple check expected files exist
        self.assert_file_count_by_extension(ext, thumb_dir, count)

    def create_webp_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates the given count of dummy WebP thumbnail files in the given
        directory
        """
        self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)

    def create_encrypted_webp_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates the given count of dummy encrypted WebP thumbnail files in the
        given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "webp.gpg", count, start_count)

    def create_png_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates the given count of dummy PNG thumbnail files in the given
        directory
        """

        self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)

    def create_encrypted_png_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates the given count of dummy encrypted PNG thumbnail files in the
        given directory
        """

        self.create_dummy_thumbnails(thumb_dir, "png.gpg", count, start_count)

    def assert_file_count_by_extension(
        self,
        ext: str,
        dir: Union[str, Path],
        expected_count: int,
    ):
        """
        Helper to assert a certain count of given extension files in given directory
        """
        if not isinstance(dir, Path):
            dir = Path(dir)
        matching_files = list(dir.glob(f"*.{ext}"))
        self.assertEqual(len(matching_files), expected_count)

    def assert_encrypted_png_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of encrypted PNG extension files in given directory
        """
        self.assert_file_count_by_extension("png.gpg", dir, expected_count)

    def assert_encrypted_webp_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of encrypted WebP extension files in given directory
        """
        self.assert_file_count_by_extension("webp.gpg", dir, expected_count)

    def assert_webp_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of WebP extension files in given directory
        """
        self.assert_file_count_by_extension("webp", dir, expected_count)

    def assert_png_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of PNG extension files in given directory
        """
        self.assert_file_count_by_extension("png", dir, expected_count)

    def setUp(self):
        # Each test gets its own, initially empty, thumbnail directory
        self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()

        return super().setUp()

    def tearDown(self) -> None:
        shutil.rmtree(self.thumbnail_dir)

        return super().tearDown()

    def test_do_nothing_if_converted(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted document exists with existing encrypted WebP thumbnail path
        WHEN:
            - Migration is attempted
        THEN:
            - Nothing is converted
        """
        map_mock.side_effect = self.pretend_map

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_webp_thumbnail_files(self.thumbnail_dir, 3)

            self.performMigration()
            run_convert_mock.assert_not_called()

            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_thumbnails(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted documents exist with PNG thumbnail
        WHEN:
            - Migration is attempted
        THEN:
            - Thumbnails are converted to webp & re-encrypted
            - The encrypted PNG thumbnails are removed
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)
            self.assert_encrypted_png_file_count(self.thumbnail_dir, 0)

    def test_convert_errors_out(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted documents exist with PNG thumbnail
        WHEN:
            - Migration is attempted, but the conversion raises an exception
        THEN:
            - Conversion is attempted for each thumbnail
            - No thumbnail is converted, the encrypted PNG thumbnails remain
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = OSError

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_encrypted_png_file_count(self.thumbnail_dir, 3)
            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 0)

    def test_convert_mixed(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Documents exist with PNG, encrypted PNG, WebP and encrypted
              WebP thumbnails
        WHEN:
            - Migration is attempted
        THEN:
            - Only encrypted PNG thumbnails are converted
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_png_thumbnail_files(self.thumbnail_dir, 3)
            self.create_encrypted_png_thumbnail_files(
                self.thumbnail_dir,
                3,
                start_count=3,
            )
            self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=6)
            self.create_encrypted_webp_thumbnail_files(
                self.thumbnail_dir,
                3,
                start_count=8,
            )

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_png_file_count(self.thumbnail_dir, 3)
            # 3 pre-existing encrypted WebP files plus 3 newly converted ones
            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 6)
            self.assert_webp_file_count(self.thumbnail_dir, 2)
            self.assert_encrypted_png_file_count(self.thumbnail_dir, 0)
		Reference in New Issue
	
	Block a user
	 shamoon
					shamoon