diff --git a/src/documents/permissions.py b/src/documents/permissions.py index d4114e488..0bfff031c 100644 --- a/src/documents/permissions.py +++ b/src/documents/permissions.py @@ -9,6 +9,7 @@ from guardian.shortcuts import get_users_with_perms from guardian.shortcuts import remove_perm from rest_framework.permissions import BasePermission from rest_framework.permissions import DjangoObjectPermissions +from guardian.core import ObjectPermissionChecker class PaperlessObjectPermissions(DjangoObjectPermissions): @@ -114,3 +115,8 @@ def get_objects_for_user_owner_aware(user, perms, Model): accept_global_perms=False, ) return objects_owned | objects_unowned | objects_with_perms + + +def has_perms_owner_aware(user, perms, obj): + checker = ObjectPermissionChecker(user) + return obj.owner is None or obj.owner == user or checker.has_perm(perms, obj) diff --git a/src/documents/tests/test_api.py b/src/documents/tests/test_api.py index 6cd6b610a..ba5c58f3f 100644 --- a/src/documents/tests/test_api.py +++ b/src/documents/tests/test_api.py @@ -206,6 +206,67 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase): self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.content, content_thumbnail) + def test_document_actions_with_perms(self): + """ + GIVEN: + - Document with owner and without granted permissions + - User is then granted permissions + WHEN: + - User tries to load preview, thumbnail + THEN: + - Initially, HTTP 403 Forbidden + - With permissions, HTTP 200 OK + """ + _, filename = tempfile.mkstemp(dir=self.dirs.originals_dir) + + content = b"This is a test" + content_thumbnail = b"thumbnail content" + + with open(filename, "wb") as f: + f.write(content) + + user1 = User.objects.create_user(username="test1") + user2 = User.objects.create_user(username="test2") + user1.user_permissions.add(*Permission.objects.filter(codename="view_document")) + user2.user_permissions.add(*Permission.objects.filter(codename="view_document")) + + self.client.force_authenticate(user2) + + doc = Document.objects.create( + title="none", + filename=os.path.basename(filename), + mime_type="application/pdf", + owner=user1, + ) + + with open( + os.path.join(self.dirs.thumbnail_dir, f"{doc.pk:07d}.webp"), + "wb", + ) as f: + f.write(content_thumbnail) + + response = self.client.get(f"/api/documents/{doc.pk}/download/") + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + response = self.client.get(f"/api/documents/{doc.pk}/preview/") + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + response = self.client.get(f"/api/documents/{doc.pk}/thumb/") + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + from guardian.shortcuts import assign_perm + + assign_perm("view_document", user2, doc) + + response = self.client.get(f"/api/documents/{doc.pk}/download/") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + response = self.client.get(f"/api/documents/{doc.pk}/preview/") + self.assertEqual(response.status_code, status.HTTP_200_OK) + + response = self.client.get(f"/api/documents/{doc.pk}/thumb/") + self.assertEqual(response.status_code, status.HTTP_200_OK) + @override_settings(FILENAME_FORMAT="") def test_download_with_archive(self): diff --git a/src/documents/views.py b/src/documents/views.py index d44016834..8fe3f2a55 100644 --- a/src/documents/views.py +++ b/src/documents/views.py @@ -23,7 +23,7 @@ from django.db.models import Sum from django.db.models import When from django.db.models.functions import Length from django.db.models.functions import Lower -from django.http import Http404 +from django.http import Http404, HttpResponseForbidden from django.http import HttpResponse from django.http import HttpResponseBadRequest from django.shortcuts import get_object_or_404 @@ -33,7 +33,7 @@ from django.views.decorators.cache import cache_control from django.views.generic import TemplateView from django_filters.rest_framework import DjangoFilterBackend from documents.filters import ObjectOwnedOrGrantedPermissionsFilter -from documents.permissions import PaperlessAdminPermissions +from documents.permissions import PaperlessAdminPermissions, has_perms_owner_aware from documents.permissions import PaperlessObjectPermissions from documents.tasks import consume_file from langdetect import detect @@ -59,7 +59,6 @@ from rest_framework.viewsets import GenericViewSet from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ReadOnlyModelViewSet from rest_framework.viewsets import ViewSet - from .bulk_download import ArchiveOnlyStrategy from .bulk_download import OriginalAndArchiveStrategy from .bulk_download import OriginalsOnlyStrategy @@ -295,6 +294,12 @@ class DocumentViewSet( def file_response(self, pk, request, disposition): doc = Document.objects.get(id=pk) + if request.user is not None and not has_perms_owner_aware( + request.user, + "view_document", + doc, + ): + return HttpResponseForbidden("Insufficient permissions") if not self.original_requested(request) and doc.has_archive_version: file_handle = doc.archive_file filename = doc.get_public_filename(archive=True) @@ -354,6 +359,12 @@ class DocumentViewSet( def metadata(self, request, pk=None): try: doc = Document.objects.get(pk=pk) + if request.user is not None and not has_perms_owner_aware( + request.user, + "view_document", + doc, + ): + return HttpResponseForbidden("Insufficient permissions") except Document.DoesNotExist: raise Http404 @@ -391,6 +402,12 @@ class DocumentViewSet( @action(methods=["get"], detail=True) def suggestions(self, request, pk=None): doc = get_object_or_404(Document, pk=pk) + if request.user is not None and not has_perms_owner_aware( + request.user, + "view_document", + doc, + ): + return HttpResponseForbidden("Insufficient permissions") classifier = load_classifier() @@ -430,6 +447,12 @@ class DocumentViewSet( def thumb(self, request, pk=None): try: doc = Document.objects.get(id=pk) + if request.user is not None and not has_perms_owner_aware( + request.user, + "view_document", + doc, + ): + return HttpResponseForbidden("Insufficient permissions") if doc.storage_type == Document.STORAGE_TYPE_GPG: handle = GnuPG.decrypted(doc.thumbnail_file) else: @@ -468,6 +491,12 @@ class DocumentViewSet( def notes(self, request, pk=None): try: doc = Document.objects.get(pk=pk) + if request.user is not None and not has_perms_owner_aware( + request.user, + "view_document", + doc, + ): + return HttpResponseForbidden("Insufficient permissions") except Document.DoesNotExist: raise Http404