mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	Compare commits
	
		
			7 Commits
		
	
	
		
			550e74e559
			...
			fix-strip-
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | d6cfd87cc0 | ||
|   | 7a287e7479 | ||
|   | 43b4f36026 | ||
|   | 76a81adcb5 | ||
|   | 6b868a5ecb | ||
|   | 3e4aa87cc5 | ||
|   | fc95d42b35 | 
| @@ -2,9 +2,11 @@ | ||||
|  | ||||
| If you feel like contributing to the project, please do! Bug fixes and improvements are always welcome. | ||||
|  | ||||
| ⚠️ Please note: Pull requests that implement a new feature or enhancement _should almost always target an existing feature request_ with evidence of community interest and discussion. This is in order to balance the work of implementing and maintaining new features / enhancements. Pull requests that are opened without meeting this requirement may not be merged. | ||||
|  | ||||
| If you want to implement something big: | ||||
|  | ||||
| - Please start a discussion about that in the issues! Maybe something similar is already in development and we can make it happen together. | ||||
| - As above, please start with a discussion! Maybe something similar is already in development and we can make it happen together. | ||||
| - When making additions to the project, consider if the majority of users will benefit from your change. If not, you're probably better of forking the project. | ||||
| - Also consider if your change will get in the way of other users. A good change is a change that enhances the experience of some users who want that change and does not affect users who do not care about the change. | ||||
| - Please see the [paperless-ngx merge process](#merging-prs) below. | ||||
|   | ||||
| @@ -1,4 +1,6 @@ | ||||
| import json | ||||
| from fractions import Fraction | ||||
| from io import BytesIO | ||||
| from pathlib import Path | ||||
|  | ||||
| from django.contrib.auth.models import User | ||||
| @@ -6,6 +8,11 @@ from django.core.files.uploadedfile import SimpleUploadedFile | ||||
| from rest_framework import status | ||||
| from rest_framework.test import APITestCase | ||||
|  | ||||
| try: | ||||
|     from PIL import Image | ||||
| except ModuleNotFoundError:  # pragma: no cover - Pillow is required in production | ||||
|     Image = None  # type: ignore[assignment] | ||||
|  | ||||
| from documents.tests.utils import DirectoriesMixin | ||||
| from paperless.models import ApplicationConfiguration | ||||
| from paperless.models import ColorConvertChoices | ||||
| @@ -190,6 +197,74 @@ class TestApiAppConfig(DirectoriesMixin, APITestCase): | ||||
|         ) | ||||
|         self.assertFalse(Path(old_logo.path).exists()) | ||||
|  | ||||
|     def test_api_strips_metadata_from_logo_upload(self): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - An image file containing EXIF metadata including GPS coordinates | ||||
|         WHEN: | ||||
|             - Uploaded via PATCH to app config | ||||
|         THEN: | ||||
|             - Stored logo no longer contains EXIF metadata | ||||
|         """ | ||||
|         if Image is None: | ||||
|             self.skipTest("Pillow is not installed") | ||||
|  | ||||
|         if not hasattr(Image, "Exif"): | ||||
|             self.skipTest("Current Pillow version cannot create EXIF metadata") | ||||
|  | ||||
|         assert Image is not None | ||||
|  | ||||
|         exif = Image.Exif() | ||||
|         exif[0x010E] = "Test description"  # ImageDescription | ||||
|         exif[0x8825] = { | ||||
|             1: "N",  # GPSLatitudeRef | ||||
|             2: (Fraction(51, 1), Fraction(30, 1), Fraction(0, 1)), | ||||
|             3: "E",  # GPSLongitudeRef | ||||
|             4: (Fraction(0, 1), Fraction(7, 1), Fraction(0, 1)), | ||||
|         } | ||||
|  | ||||
|         buffer = BytesIO() | ||||
|         Image.new("RGB", (8, 8), "white").save(buffer, format="JPEG", exif=exif) | ||||
|         buffer.seek(0) | ||||
|  | ||||
|         with Image.open(BytesIO(buffer.getvalue())) as uploaded_image: | ||||
|             self.assertGreater(len(uploaded_image.getexif()), 0) | ||||
|  | ||||
|         response = self.client.patch( | ||||
|             f"{self.ENDPOINT}1/", | ||||
|             { | ||||
|                 "app_logo": SimpleUploadedFile( | ||||
|                     name="with_exif.jpg", | ||||
|                     content=buffer.getvalue(), | ||||
|                     content_type="image/jpeg", | ||||
|                 ), | ||||
|             }, | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, status.HTTP_200_OK) | ||||
|  | ||||
|         config = ApplicationConfiguration.objects.first() | ||||
|         stored_logo = Path(config.app_logo.path) | ||||
|         self.assertTrue(stored_logo.exists()) | ||||
|  | ||||
|         with Image.open(stored_logo) as sanitized: | ||||
|             sanitized_exif = sanitized.getexif() | ||||
|             self.assertNotEqual(sanitized_exif.get(0x010E), "Test description") | ||||
|  | ||||
|             gps_ifd = None | ||||
|             if hasattr(sanitized_exif, "get_ifd"): | ||||
|                 try: | ||||
|                     gps_ifd = sanitized_exif.get_ifd(0x8825) | ||||
|                 except KeyError: | ||||
|                     gps_ifd = None | ||||
|             else: | ||||
|                 gps_ifd = sanitized_exif.get(0x8825) | ||||
|  | ||||
|             if gps_ifd is not None: | ||||
|                 self.assertEqual(len(gps_ifd), 0, "GPS metadata should be cleared") | ||||
|  | ||||
|             self.assertNotIn("exif", sanitized.info) | ||||
|  | ||||
|     def test_api_rejects_malicious_svg_logo(self): | ||||
|         """ | ||||
|         GIVEN: | ||||
|   | ||||
| @@ -1,4 +1,5 @@ | ||||
| import logging | ||||
| from io import BytesIO | ||||
|  | ||||
| import magic | ||||
| from allauth.mfa.adapter import get_adapter as get_mfa_adapter | ||||
| @@ -9,6 +10,10 @@ from allauth.socialaccount.models import SocialApp | ||||
| from django.contrib.auth.models import Group | ||||
| from django.contrib.auth.models import Permission | ||||
| from django.contrib.auth.models import User | ||||
| from django.core.files.uploadedfile import SimpleUploadedFile | ||||
| from PIL import Image | ||||
| from PIL import ImageOps | ||||
| from PIL import UnidentifiedImageError | ||||
| from rest_framework import serializers | ||||
| from rest_framework.authtoken.serializers import AuthTokenSerializer | ||||
|  | ||||
| @@ -19,6 +24,102 @@ from paperless_mail.serialisers import ObfuscatedPasswordField | ||||
| logger = logging.getLogger("paperless.settings") | ||||
|  | ||||
|  | ||||
| def strip_image_metadata(uploaded_file, mime_type: str | None): | ||||
|     """Return a copy of ``uploaded_file`` with EXIF/ICC metadata removed.""" | ||||
|  | ||||
|     if uploaded_file is None: | ||||
|         return uploaded_file | ||||
|  | ||||
|     original_position = uploaded_file.tell() if hasattr(uploaded_file, "tell") else None | ||||
|     image = None | ||||
|  | ||||
|     sanitized = None | ||||
|  | ||||
|     try: | ||||
|         if hasattr(uploaded_file, "seek"): | ||||
|             uploaded_file.seek(0) | ||||
|         image = Image.open(uploaded_file) | ||||
|         image.load() | ||||
|     except (UnidentifiedImageError, OSError): | ||||
|         if hasattr(uploaded_file, "seek") and original_position is not None: | ||||
|             uploaded_file.seek(original_position) | ||||
|         return uploaded_file | ||||
|  | ||||
|     try: | ||||
|         image_format = (image.format or "").upper() | ||||
|         image = ImageOps.exif_transpose(image) | ||||
|  | ||||
|         if image_format not in {"JPEG", "JPG", "PNG"}: | ||||
|             if hasattr(uploaded_file, "seek") and original_position is not None: | ||||
|                 uploaded_file.seek(original_position) | ||||
|             return uploaded_file | ||||
|  | ||||
|         if hasattr(image, "info"): | ||||
|             image.info.pop("exif", None) | ||||
|             image.info.pop("icc_profile", None) | ||||
|             image.info.pop("comment", None) | ||||
|  | ||||
|         if image_format in {"JPEG", "JPG"}: | ||||
|             sanitized = image.convert("RGB") | ||||
|             save_kwargs = { | ||||
|                 "format": "JPEG", | ||||
|                 "quality": 95, | ||||
|                 "subsampling": 0, | ||||
|                 "optimize": True, | ||||
|                 "exif": b"", | ||||
|             } | ||||
|         else:  # PNG | ||||
|             target_mode = ( | ||||
|                 "RGBA" | ||||
|                 if ("A" in image.mode or image.info.get("transparency")) | ||||
|                 else "RGB" | ||||
|             ) | ||||
|             sanitized = image.convert(target_mode) | ||||
|             save_kwargs = { | ||||
|                 "format": "PNG", | ||||
|                 "optimize": True, | ||||
|             } | ||||
|  | ||||
|         buffer = BytesIO() | ||||
|         try: | ||||
|             sanitized.save(buffer, **save_kwargs) | ||||
|         except (OSError, ValueError): | ||||
|             buffer = BytesIO() | ||||
|             if image_format in {"JPEG", "JPG"}: | ||||
|                 sanitized.save( | ||||
|                     buffer, | ||||
|                     format="JPEG", | ||||
|                     quality=90, | ||||
|                     subsampling=0, | ||||
|                     exif=b"", | ||||
|                 ) | ||||
|             else: | ||||
|                 sanitized.save( | ||||
|                     buffer, | ||||
|                     format="PNG", | ||||
|                 ) | ||||
|  | ||||
|         buffer.seek(0) | ||||
|  | ||||
|         if hasattr(uploaded_file, "close"): | ||||
|             try: | ||||
|                 uploaded_file.close() | ||||
|             except Exception: | ||||
|                 pass | ||||
|  | ||||
|         content_type = getattr(uploaded_file, "content_type", None) or mime_type | ||||
|         return SimpleUploadedFile( | ||||
|             name=getattr(uploaded_file, "name", "logo"), | ||||
|             content=buffer.getvalue(), | ||||
|             content_type=content_type, | ||||
|         ) | ||||
|     finally: | ||||
|         if sanitized is not None: | ||||
|             sanitized.close() | ||||
|         if image is not None: | ||||
|             image.close() | ||||
|  | ||||
|  | ||||
| class PaperlessAuthTokenSerializer(AuthTokenSerializer): | ||||
|     code = serializers.CharField( | ||||
|         label="MFA Code", | ||||
| @@ -209,10 +310,23 @@ class ApplicationConfigurationSerializer(serializers.ModelSerializer): | ||||
|         return super().update(instance, validated_data) | ||||
|  | ||||
|     def validate_app_logo(self, file): | ||||
|         if file and magic.from_buffer(file.read(2048), mime=True) == "image/svg+xml": | ||||
|             reject_dangerous_svg(file) | ||||
|         if not file: | ||||
|             return file | ||||
|  | ||||
|         if hasattr(file, "seek"): | ||||
|             file.seek(0) | ||||
|         mime_type = magic.from_buffer(file.read(2048), mime=True) | ||||
|         if hasattr(file, "seek"): | ||||
|             file.seek(0) | ||||
|  | ||||
|         if mime_type == "image/svg+xml": | ||||
|             reject_dangerous_svg(file) | ||||
|             if hasattr(file, "seek"): | ||||
|                 file.seek(0) | ||||
|             return file | ||||
|  | ||||
|         return strip_image_metadata(file, mime_type) | ||||
|  | ||||
|     class Meta: | ||||
|         model = ApplicationConfiguration | ||||
|         fields = "__all__" | ||||
|   | ||||
| @@ -922,7 +922,7 @@ CELERY_ACCEPT_CONTENT = ["application/json", "application/x-python-serialize"] | ||||
| CELERY_BEAT_SCHEDULE = _parse_beat_schedule() | ||||
|  | ||||
| # https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-schedule-filename | ||||
| CELERY_BEAT_SCHEDULE_FILENAME = DATA_DIR / "celerybeat-schedule.db" | ||||
| CELERY_BEAT_SCHEDULE_FILENAME = str(DATA_DIR / "celerybeat-schedule.db") | ||||
|  | ||||
|  | ||||
| # Cachalot: Database read cache. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user