mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	Fix: enforce permissions on bulk_edit operations
This commit is contained in:
		| @@ -3453,6 +3453,110 @@ class TestBulkEdit(DirectoriesMixin, APITestCase): | ||||
|         self.assertCountEqual(args[0], [self.doc2.id, self.doc3.id]) | ||||
|         self.assertEqual(len(kwargs["set_permissions"]["view"]["users"]), 2) | ||||
|  | ||||
|     @mock.patch("documents.serialisers.bulk_edit.set_permissions") | ||||
|     def test_insufficient_permissions_ownership(self, m): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - Documents owned by user other than logged in user | ||||
|         WHEN: | ||||
|             - set_permissions bulk edit API endpoint is called | ||||
|         THEN: | ||||
|             - User is not able to change permissions | ||||
|         """ | ||||
|         m.return_value = "OK" | ||||
|         self.doc1.owner = User.objects.get(username="temp_admin") | ||||
|         self.doc1.save() | ||||
|         user1 = User.objects.create(username="user1") | ||||
|         self.client.force_authenticate(user=user1) | ||||
|  | ||||
|         permissions = { | ||||
|             "owner": user1.id, | ||||
|         } | ||||
|  | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc1.id, self.doc2.id, self.doc3.id], | ||||
|                     "method": "set_permissions", | ||||
|                     "parameters": {"set_permissions": permissions}, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) | ||||
|  | ||||
|         m.assert_not_called() | ||||
|         self.assertEqual(response.content, b"Insufficient permissions") | ||||
|  | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc2.id, self.doc3.id], | ||||
|                     "method": "set_permissions", | ||||
|                     "parameters": {"set_permissions": permissions}, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, status.HTTP_200_OK) | ||||
|         m.assert_called_once() | ||||
|  | ||||
|     @mock.patch("documents.serialisers.bulk_edit.set_storage_path") | ||||
|     def test_insufficient_permissions_edit(self, m): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - Documents for which current user only has view permissions | ||||
|         WHEN: | ||||
|             - API is called | ||||
|         THEN: | ||||
|             - set_storage_path is only called if user can edit all docs | ||||
|         """ | ||||
|         m.return_value = "OK" | ||||
|         self.doc1.owner = User.objects.get(username="temp_admin") | ||||
|         self.doc1.save() | ||||
|         user1 = User.objects.create(username="user1") | ||||
|         assign_perm("view_document", user1, self.doc1) | ||||
|         self.client.force_authenticate(user=user1) | ||||
|  | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc1.id, self.doc2.id, self.doc3.id], | ||||
|                     "method": "set_storage_path", | ||||
|                     "parameters": {"storage_path": self.sp1.id}, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) | ||||
|  | ||||
|         m.assert_not_called() | ||||
|         self.assertEqual(response.content, b"Insufficient permissions") | ||||
|  | ||||
|         assign_perm("change_document", user1, self.doc1) | ||||
|  | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc1.id, self.doc2.id, self.doc3.id], | ||||
|                     "method": "set_storage_path", | ||||
|                     "parameters": {"storage_path": self.sp1.id}, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, status.HTTP_200_OK) | ||||
|  | ||||
|         m.assert_called_once() | ||||
|  | ||||
|  | ||||
| class TestBulkDownload(DirectoriesMixin, APITestCase): | ||||
|     ENDPOINT = "/api/documents/bulk_download/" | ||||
|   | ||||
| @@ -54,6 +54,7 @@ from rest_framework.viewsets import ModelViewSet | ||||
| from rest_framework.viewsets import ReadOnlyModelViewSet | ||||
| from rest_framework.viewsets import ViewSet | ||||
|  | ||||
| from documents import bulk_edit | ||||
| from documents.filters import ObjectOwnedOrGrantedPermissionsFilter | ||||
| from documents.permissions import PaperlessAdminPermissions | ||||
| from documents.permissions import PaperlessObjectPermissions | ||||
| @@ -694,7 +695,7 @@ class SavedViewViewSet(ModelViewSet, PassUserMixin): | ||||
|         serializer.save(owner=self.request.user) | ||||
|  | ||||
|  | ||||
| class BulkEditView(GenericAPIView): | ||||
| class BulkEditView(GenericAPIView, PassUserMixin): | ||||
|     permission_classes = (IsAuthenticated,) | ||||
|     serializer_class = BulkEditSerializer | ||||
|     parser_classes = (parsers.JSONParser,) | ||||
| @@ -703,10 +704,25 @@ class BulkEditView(GenericAPIView): | ||||
|         serializer = self.get_serializer(data=request.data) | ||||
|         serializer.is_valid(raise_exception=True) | ||||
|  | ||||
|         user = self.request.user | ||||
|         method = serializer.validated_data.get("method") | ||||
|         parameters = serializer.validated_data.get("parameters") | ||||
|         documents = serializer.validated_data.get("documents") | ||||
|  | ||||
|         if not user.is_superuser: | ||||
|             document_objs = Document.objects.filter(pk__in=documents) | ||||
|             has_perms = ( | ||||
|                 all((doc.owner == user or doc.owner is None) for doc in document_objs) | ||||
|                 if method == bulk_edit.set_permissions | ||||
|                 else all( | ||||
|                     has_perms_owner_aware(user, "change_document", doc) | ||||
|                     for doc in document_objs | ||||
|                 ) | ||||
|             ) | ||||
|  | ||||
|             if not has_perms: | ||||
|                 return HttpResponseForbidden("Insufficient permissions") | ||||
|  | ||||
|         try: | ||||
|             # TODO: parameter validation | ||||
|             result = method(documents, **parameters) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 shamoon
					shamoon