mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 03:16:10 -06:00 
			
		
		
		
	Compare commits
	
		
			7 Commits
		
	
	
		
			550e74e559
			...
			fix-strip-
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					d6cfd87cc0 | ||
| 
						 | 
					7a287e7479 | ||
| 
						 | 
					43b4f36026 | ||
| 
						 | 
					76a81adcb5 | ||
| 
						 | 
					6b868a5ecb | ||
| 
						 | 
					3e4aa87cc5 | ||
| 
						 | 
					fc95d42b35 | 
@@ -2,9 +2,11 @@
 | 
			
		||||
 | 
			
		||||
If you feel like contributing to the project, please do! Bug fixes and improvements are always welcome.
 | 
			
		||||
 | 
			
		||||
⚠️ Please note: Pull requests that implement a new feature or enhancement _should almost always target an existing feature request_ with evidence of community interest and discussion. This is in order to balance the work of implementing and maintaining new features / enhancements. Pull requests that are opened without meeting this requirement may not be merged.
 | 
			
		||||
 | 
			
		||||
If you want to implement something big:
 | 
			
		||||
 | 
			
		||||
- Please start a discussion about that in the issues! Maybe something similar is already in development and we can make it happen together.
 | 
			
		||||
- As above, please start with a discussion! Maybe something similar is already in development and we can make it happen together.
 | 
			
		||||
- When making additions to the project, consider if the majority of users will benefit from your change. If not, you're probably better off forking the project.
 | 
			
		||||
- Also consider if your change will get in the way of other users. A good change is a change that enhances the experience of some users who want that change and does not affect users who do not care about the change.
 | 
			
		||||
- Please see the [paperless-ngx merge process](#merging-prs) below.
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,6 @@
 | 
			
		||||
import json
 | 
			
		||||
from fractions import Fraction
 | 
			
		||||
from io import BytesIO
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
from django.contrib.auth.models import User
 | 
			
		||||
@@ -6,6 +8,11 @@ from django.core.files.uploadedfile import SimpleUploadedFile
 | 
			
		||||
from rest_framework import status
 | 
			
		||||
from rest_framework.test import APITestCase
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    from PIL import Image
 | 
			
		||||
except ModuleNotFoundError:  # pragma: no cover - Pillow is required in production
 | 
			
		||||
    Image = None  # type: ignore[assignment]
 | 
			
		||||
 | 
			
		||||
from documents.tests.utils import DirectoriesMixin
 | 
			
		||||
from paperless.models import ApplicationConfiguration
 | 
			
		||||
from paperless.models import ColorConvertChoices
 | 
			
		||||
@@ -190,6 +197,74 @@ class TestApiAppConfig(DirectoriesMixin, APITestCase):
 | 
			
		||||
        )
 | 
			
		||||
        self.assertFalse(Path(old_logo.path).exists())
 | 
			
		||||
 | 
			
		||||
    def test_api_strips_metadata_from_logo_upload(self):
        """
        GIVEN:
            - A JPEG carrying EXIF metadata (image description and GPS position)
        WHEN:
            - The image is sent as the app logo via PATCH to app config
        THEN:
            - The logo stored on disk has no EXIF metadata left
        """
        if Image is None:
            self.skipTest("Pillow is not installed")
        if not hasattr(Image, "Exif"):
            self.skipTest("Current Pillow version cannot create EXIF metadata")

        assert Image is not None

        description_tag = 0x010E  # ImageDescription
        gps_tag = 0x8825  # GPS IFD pointer

        # Build the EXIF payload embedded in the uploaded picture
        source_exif = Image.Exif()
        source_exif[description_tag] = "Test description"
        source_exif[gps_tag] = {
            1: "N",  # GPSLatitudeRef
            2: (Fraction(51, 1), Fraction(30, 1), Fraction(0, 1)),
            3: "E",  # GPSLongitudeRef
            4: (Fraction(0, 1), Fraction(7, 1), Fraction(0, 1)),
        }

        encoded = BytesIO()
        white_square = Image.new("RGB", (8, 8), "white")
        white_square.save(encoded, format="JPEG", exif=source_exif)
        jpeg_bytes = encoded.getvalue()

        # Sanity check: the fixture really carries EXIF before it is uploaded
        with Image.open(BytesIO(jpeg_bytes)) as uploaded_image:
            self.assertGreater(len(uploaded_image.getexif()), 0)

        logo_upload = SimpleUploadedFile(
            name="with_exif.jpg",
            content=jpeg_bytes,
            content_type="image/jpeg",
        )
        response = self.client.patch(f"{self.ENDPOINT}1/", {"app_logo": logo_upload})
        self.assertEqual(response.status_code, status.HTTP_200_OK)

        config = ApplicationConfiguration.objects.first()
        stored_logo = Path(config.app_logo.path)
        self.assertTrue(stored_logo.exists())

        with Image.open(stored_logo) as sanitized:
            stored_exif = sanitized.getexif()
            self.assertNotEqual(stored_exif.get(description_tag), "Test description")

            # Prefer get_ifd() where the Exif object offers it, else plain lookup
            if hasattr(stored_exif, "get_ifd"):
                try:
                    gps_data = stored_exif.get_ifd(gps_tag)
                except KeyError:
                    gps_data = None
            else:
                gps_data = stored_exif.get(gps_tag)

            if gps_data is not None:
                self.assertEqual(len(gps_data), 0, "GPS metadata should be cleared")

            self.assertNotIn("exif", sanitized.info)
 | 
			
		||||
 | 
			
		||||
    def test_api_rejects_malicious_svg_logo(self):
 | 
			
		||||
        """
 | 
			
		||||
        GIVEN:
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,5 @@
 | 
			
		||||
import logging
 | 
			
		||||
from io import BytesIO
 | 
			
		||||
 | 
			
		||||
import magic
 | 
			
		||||
from allauth.mfa.adapter import get_adapter as get_mfa_adapter
 | 
			
		||||
@@ -9,6 +10,10 @@ from allauth.socialaccount.models import SocialApp
 | 
			
		||||
from django.contrib.auth.models import Group
 | 
			
		||||
from django.contrib.auth.models import Permission
 | 
			
		||||
from django.contrib.auth.models import User
 | 
			
		||||
from django.core.files.uploadedfile import SimpleUploadedFile
 | 
			
		||||
from PIL import Image
 | 
			
		||||
from PIL import ImageOps
 | 
			
		||||
from PIL import UnidentifiedImageError
 | 
			
		||||
from rest_framework import serializers
 | 
			
		||||
from rest_framework.authtoken.serializers import AuthTokenSerializer
 | 
			
		||||
 | 
			
		||||
@@ -19,6 +24,102 @@ from paperless_mail.serialisers import ObfuscatedPasswordField
 | 
			
		||||
# Module-level logger; records are emitted under the "paperless.settings" name.
logger = logging.getLogger("paperless.settings")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def strip_image_metadata(uploaded_file, mime_type: str | None):
 | 
			
		||||
    """Return a copy of ``uploaded_file`` with EXIF/ICC metadata removed."""
 | 
			
		||||
 | 
			
		||||
    if uploaded_file is None:
 | 
			
		||||
        return uploaded_file
 | 
			
		||||
 | 
			
		||||
    original_position = uploaded_file.tell() if hasattr(uploaded_file, "tell") else None
 | 
			
		||||
    image = None
 | 
			
		||||
 | 
			
		||||
    sanitized = None
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        if hasattr(uploaded_file, "seek"):
 | 
			
		||||
            uploaded_file.seek(0)
 | 
			
		||||
        image = Image.open(uploaded_file)
 | 
			
		||||
        image.load()
 | 
			
		||||
    except (UnidentifiedImageError, OSError):
 | 
			
		||||
        if hasattr(uploaded_file, "seek") and original_position is not None:
 | 
			
		||||
            uploaded_file.seek(original_position)
 | 
			
		||||
        return uploaded_file
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        image_format = (image.format or "").upper()
 | 
			
		||||
        image = ImageOps.exif_transpose(image)
 | 
			
		||||
 | 
			
		||||
        if image_format not in {"JPEG", "JPG", "PNG"}:
 | 
			
		||||
            if hasattr(uploaded_file, "seek") and original_position is not None:
 | 
			
		||||
                uploaded_file.seek(original_position)
 | 
			
		||||
            return uploaded_file
 | 
			
		||||
 | 
			
		||||
        if hasattr(image, "info"):
 | 
			
		||||
            image.info.pop("exif", None)
 | 
			
		||||
            image.info.pop("icc_profile", None)
 | 
			
		||||
            image.info.pop("comment", None)
 | 
			
		||||
 | 
			
		||||
        if image_format in {"JPEG", "JPG"}:
 | 
			
		||||
            sanitized = image.convert("RGB")
 | 
			
		||||
            save_kwargs = {
 | 
			
		||||
                "format": "JPEG",
 | 
			
		||||
                "quality": 95,
 | 
			
		||||
                "subsampling": 0,
 | 
			
		||||
                "optimize": True,
 | 
			
		||||
                "exif": b"",
 | 
			
		||||
            }
 | 
			
		||||
        else:  # PNG
 | 
			
		||||
            target_mode = (
 | 
			
		||||
                "RGBA"
 | 
			
		||||
                if ("A" in image.mode or image.info.get("transparency"))
 | 
			
		||||
                else "RGB"
 | 
			
		||||
            )
 | 
			
		||||
            sanitized = image.convert(target_mode)
 | 
			
		||||
            save_kwargs = {
 | 
			
		||||
                "format": "PNG",
 | 
			
		||||
                "optimize": True,
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
        buffer = BytesIO()
 | 
			
		||||
        try:
 | 
			
		||||
            sanitized.save(buffer, **save_kwargs)
 | 
			
		||||
        except (OSError, ValueError):
 | 
			
		||||
            buffer = BytesIO()
 | 
			
		||||
            if image_format in {"JPEG", "JPG"}:
 | 
			
		||||
                sanitized.save(
 | 
			
		||||
                    buffer,
 | 
			
		||||
                    format="JPEG",
 | 
			
		||||
                    quality=90,
 | 
			
		||||
                    subsampling=0,
 | 
			
		||||
                    exif=b"",
 | 
			
		||||
                )
 | 
			
		||||
            else:
 | 
			
		||||
                sanitized.save(
 | 
			
		||||
                    buffer,
 | 
			
		||||
                    format="PNG",
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        buffer.seek(0)
 | 
			
		||||
 | 
			
		||||
        if hasattr(uploaded_file, "close"):
 | 
			
		||||
            try:
 | 
			
		||||
                uploaded_file.close()
 | 
			
		||||
            except Exception:
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
        content_type = getattr(uploaded_file, "content_type", None) or mime_type
 | 
			
		||||
        return SimpleUploadedFile(
 | 
			
		||||
            name=getattr(uploaded_file, "name", "logo"),
 | 
			
		||||
            content=buffer.getvalue(),
 | 
			
		||||
            content_type=content_type,
 | 
			
		||||
        )
 | 
			
		||||
    finally:
 | 
			
		||||
        if sanitized is not None:
 | 
			
		||||
            sanitized.close()
 | 
			
		||||
        if image is not None:
 | 
			
		||||
            image.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PaperlessAuthTokenSerializer(AuthTokenSerializer):
 | 
			
		||||
    code = serializers.CharField(
 | 
			
		||||
        label="MFA Code",
 | 
			
		||||
@@ -209,9 +310,22 @@ class ApplicationConfigurationSerializer(serializers.ModelSerializer):
 | 
			
		||||
        return super().update(instance, validated_data)
 | 
			
		||||
 | 
			
		||||
    def validate_app_logo(self, file):
        """Validate an uploaded application logo and strip its metadata.

        The MIME type is sniffed from the first 2048 bytes of the upload.
        SVG uploads are passed to ``reject_dangerous_svg`` (defined elsewhere;
        presumably raises a validation error for unsafe content - confirm)
        and returned as-is. Every other upload goes through
        ``strip_image_metadata``.

        Args:
            file: The uploaded logo, or a falsy value when no logo was sent.

        Returns:
            The falsy input unchanged, the rewound SVG upload, or the result
            of ``strip_image_metadata`` for any other content.
        """
        if not file:
            return file

        rewindable = hasattr(file, "seek")

        # Sniff from the start of the upload, then rewind for later readers.
        if rewindable:
            file.seek(0)
        mime_type = magic.from_buffer(file.read(2048), mime=True)
        if rewindable:
            file.seek(0)

        if mime_type == "image/svg+xml":
            reject_dangerous_svg(file)
            # Rewind again after the SVG check so the file can still be read.
            if rewindable:
                file.seek(0)
            return file

        return strip_image_metadata(file, mime_type)
 | 
			
		||||
 | 
			
		||||
    class Meta:
 | 
			
		||||
        model = ApplicationConfiguration
 | 
			
		||||
 
 | 
			
		||||
@@ -922,7 +922,7 @@ CELERY_ACCEPT_CONTENT = ["application/json", "application/x-python-serialize"]
 | 
			
		||||
CELERY_BEAT_SCHEDULE = _parse_beat_schedule()
 | 
			
		||||
 | 
			
		||||
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#beat-schedule-filename
 | 
			
		||||
# Stored as a plain string rather than a ``Path`` object.
# NOTE(review): presumably Celery beat handles this value as ``str`` - confirm.
CELERY_BEAT_SCHEDULE_FILENAME = str(DATA_DIR / "celerybeat-schedule.db")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Cachalot: Database read cache.
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user