mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
Merge pull request #4007 from paperless-ngx/fix/issue-4003
Fix: enforce permissions on bulk_edit operations
This commit is contained in:
commit
ce36c2d0ea
@ -3453,6 +3453,110 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
|
||||
self.assertCountEqual(args[0], [self.doc2.id, self.doc3.id])
|
||||
self.assertEqual(len(kwargs["set_permissions"]["view"]["users"]), 2)
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.set_permissions")
|
||||
def test_insufficient_permissions_ownership(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Documents owned by user other than logged in user
|
||||
WHEN:
|
||||
- set_permissions bulk edit API endpoint is called
|
||||
THEN:
|
||||
- User is not able to change permissions
|
||||
"""
|
||||
m.return_value = "OK"
|
||||
self.doc1.owner = User.objects.get(username="temp_admin")
|
||||
self.doc1.save()
|
||||
user1 = User.objects.create(username="user1")
|
||||
self.client.force_authenticate(user=user1)
|
||||
|
||||
permissions = {
|
||||
"owner": user1.id,
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc1.id, self.doc2.id, self.doc3.id],
|
||||
"method": "set_permissions",
|
||||
"parameters": {"set_permissions": permissions},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
m.assert_not_called()
|
||||
self.assertEqual(response.content, b"Insufficient permissions")
|
||||
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc2.id, self.doc3.id],
|
||||
"method": "set_permissions",
|
||||
"parameters": {"set_permissions": permissions},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
m.assert_called_once()
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
|
||||
def test_insufficient_permissions_edit(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Documents for which current user only has view permissions
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- set_storage_path is only called if user can edit all docs
|
||||
"""
|
||||
m.return_value = "OK"
|
||||
self.doc1.owner = User.objects.get(username="temp_admin")
|
||||
self.doc1.save()
|
||||
user1 = User.objects.create(username="user1")
|
||||
assign_perm("view_document", user1, self.doc1)
|
||||
self.client.force_authenticate(user=user1)
|
||||
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc1.id, self.doc2.id, self.doc3.id],
|
||||
"method": "set_storage_path",
|
||||
"parameters": {"storage_path": self.sp1.id},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
m.assert_not_called()
|
||||
self.assertEqual(response.content, b"Insufficient permissions")
|
||||
|
||||
assign_perm("change_document", user1, self.doc1)
|
||||
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc1.id, self.doc2.id, self.doc3.id],
|
||||
"method": "set_storage_path",
|
||||
"parameters": {"storage_path": self.sp1.id},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
m.assert_called_once()
|
||||
|
||||
|
||||
class TestBulkDownload(DirectoriesMixin, APITestCase):
|
||||
ENDPOINT = "/api/documents/bulk_download/"
|
||||
|
@ -54,6 +54,7 @@ from rest_framework.viewsets import ModelViewSet
|
||||
from rest_framework.viewsets import ReadOnlyModelViewSet
|
||||
from rest_framework.viewsets import ViewSet
|
||||
|
||||
from documents import bulk_edit
|
||||
from documents.filters import ObjectOwnedOrGrantedPermissionsFilter
|
||||
from documents.permissions import PaperlessAdminPermissions
|
||||
from documents.permissions import PaperlessObjectPermissions
|
||||
@ -694,7 +695,7 @@ class SavedViewViewSet(ModelViewSet, PassUserMixin):
|
||||
serializer.save(owner=self.request.user)
|
||||
|
||||
|
||||
class BulkEditView(GenericAPIView):
|
||||
class BulkEditView(GenericAPIView, PassUserMixin):
|
||||
permission_classes = (IsAuthenticated,)
|
||||
serializer_class = BulkEditSerializer
|
||||
parser_classes = (parsers.JSONParser,)
|
||||
@ -703,10 +704,25 @@ class BulkEditView(GenericAPIView):
|
||||
serializer = self.get_serializer(data=request.data)
|
||||
serializer.is_valid(raise_exception=True)
|
||||
|
||||
user = self.request.user
|
||||
method = serializer.validated_data.get("method")
|
||||
parameters = serializer.validated_data.get("parameters")
|
||||
documents = serializer.validated_data.get("documents")
|
||||
|
||||
if not user.is_superuser:
|
||||
document_objs = Document.objects.filter(pk__in=documents)
|
||||
has_perms = (
|
||||
all((doc.owner == user or doc.owner is None) for doc in document_objs)
|
||||
if method == bulk_edit.set_permissions
|
||||
else all(
|
||||
has_perms_owner_aware(user, "change_document", doc)
|
||||
for doc in document_objs
|
||||
)
|
||||
)
|
||||
|
||||
if not has_perms:
|
||||
return HttpResponseForbidden("Insufficient permissions")
|
||||
|
||||
try:
|
||||
# TODO: parameter validation
|
||||
result = method(documents, **parameters)
|
||||
|
Loading…
x
Reference in New Issue
Block a user