mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
Merge pull request #3222 from paperless-ngx/fix/advanced-queries-perms-fixes
Fix: Respect superuser for advanced queries, test coverage for object perms
This commit is contained in:
commit
fe85aff052
@ -225,15 +225,19 @@ class DelayedQuery:
|
||||
|
||||
user_criterias = [query.Term("has_owner", False)]
|
||||
if "user" in self.query_params:
|
||||
user_criterias.append(query.Term("owner_id", self.query_params["user"]))
|
||||
user_criterias.append(
|
||||
query.Term("viewer_id", str(self.query_params["user"])),
|
||||
)
|
||||
if self.query_params["is_superuser"]: # superusers see all docs
|
||||
user_criterias = []
|
||||
else:
|
||||
user_criterias.append(query.Term("owner_id", self.query_params["user"]))
|
||||
user_criterias.append(
|
||||
query.Term("viewer_id", str(self.query_params["user"])),
|
||||
)
|
||||
if len(criterias) > 0:
|
||||
criterias.append(query.Or(user_criterias))
|
||||
if len(user_criterias) > 0:
|
||||
criterias.append(query.Or(user_criterias))
|
||||
return query.And(criterias)
|
||||
else:
|
||||
return query.Or(user_criterias)
|
||||
return query.Or(user_criterias) if len(user_criterias) > 0 else None
|
||||
|
||||
def _get_query_sortedby(self):
|
||||
if "ordering" not in self.query_params:
|
||||
|
@ -27,6 +27,7 @@ from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import override_settings
|
||||
from django.utils import timezone
|
||||
from guardian.shortcuts import assign_perm
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APITestCase
|
||||
from whoosh.writing import AsyncWriter
|
||||
@ -253,8 +254,6 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
response = self.client.get(f"/api/documents/{doc.pk}/thumb/")
|
||||
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
|
||||
|
||||
from guardian.shortcuts import assign_perm
|
||||
|
||||
assign_perm("view_document", user2, doc)
|
||||
|
||||
response = self.client.get(f"/api/documents/{doc.pk}/download/")
|
||||
@ -1064,6 +1063,92 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
),
|
||||
)
|
||||
|
||||
def test_search_filtering_respect_owner(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- Documents with owners set & without
|
||||
WHEN:
|
||||
- API reuqest for advanced query (search) is made by non-superuser
|
||||
- API reuqest for advanced query (search) is made by superuser
|
||||
THEN:
|
||||
- Only owned docs are returned for regular users
|
||||
- All docs are returned for superuser
|
||||
"""
|
||||
superuser = User.objects.create_superuser("superuser")
|
||||
u1 = User.objects.create_user("user1")
|
||||
u2 = User.objects.create_user("user2")
|
||||
u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
|
||||
u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
|
||||
|
||||
Document.objects.create(checksum="1", content="test 1", owner=u1)
|
||||
Document.objects.create(checksum="2", content="test 2", owner=u2)
|
||||
Document.objects.create(checksum="3", content="test 3", owner=u2)
|
||||
Document.objects.create(checksum="4", content="test 4")
|
||||
|
||||
with AsyncWriter(index.open_index()) as writer:
|
||||
for doc in Document.objects.all():
|
||||
index.update_document(writer, doc)
|
||||
|
||||
self.client.force_authenticate(user=u1)
|
||||
r = self.client.get("/api/documents/?query=test")
|
||||
self.assertEqual(r.data["count"], 2)
|
||||
r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
|
||||
self.assertEqual(r.data["count"], 2)
|
||||
|
||||
self.client.force_authenticate(user=u2)
|
||||
r = self.client.get("/api/documents/?query=test")
|
||||
self.assertEqual(r.data["count"], 3)
|
||||
r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
|
||||
self.assertEqual(r.data["count"], 3)
|
||||
|
||||
self.client.force_authenticate(user=superuser)
|
||||
r = self.client.get("/api/documents/?query=test")
|
||||
self.assertEqual(r.data["count"], 4)
|
||||
r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
|
||||
self.assertEqual(r.data["count"], 4)
|
||||
|
||||
def test_search_filtering_with_object_perms(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- Documents with granted view permissions to others
|
||||
WHEN:
|
||||
- API reuqest for advanced query (search) is made by user
|
||||
THEN:
|
||||
- Only docs with granted view permissions are returned
|
||||
"""
|
||||
u1 = User.objects.create_user("user1")
|
||||
u2 = User.objects.create_user("user2")
|
||||
u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
|
||||
u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
|
||||
|
||||
Document.objects.create(checksum="1", content="test 1", owner=u1)
|
||||
d2 = Document.objects.create(checksum="2", content="test 2", owner=u2)
|
||||
d3 = Document.objects.create(checksum="3", content="test 3", owner=u2)
|
||||
Document.objects.create(checksum="4", content="test 4")
|
||||
|
||||
with AsyncWriter(index.open_index()) as writer:
|
||||
for doc in Document.objects.all():
|
||||
index.update_document(writer, doc)
|
||||
|
||||
self.client.force_authenticate(user=u1)
|
||||
r = self.client.get("/api/documents/?query=test")
|
||||
self.assertEqual(r.data["count"], 2)
|
||||
r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
|
||||
self.assertEqual(r.data["count"], 2)
|
||||
|
||||
assign_perm("view_document", u1, d2)
|
||||
assign_perm("view_document", u1, d3)
|
||||
|
||||
with AsyncWriter(index.open_index()) as writer:
|
||||
for doc in [d2, d3]:
|
||||
index.update_document(writer, doc)
|
||||
|
||||
self.client.force_authenticate(user=u1)
|
||||
r = self.client.get("/api/documents/?query=test")
|
||||
self.assertEqual(r.data["count"], 4)
|
||||
r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
|
||||
self.assertEqual(r.data["count"], 4)
|
||||
|
||||
def test_search_sorting(self):
|
||||
c1 = Correspondent.objects.create(name="corres Ax")
|
||||
c2 = Correspondent.objects.create(name="corres Cx")
|
||||
|
@ -604,6 +604,9 @@ class UnifiedSearchViewSet(DocumentViewSet):
|
||||
# pass user to query for perms
|
||||
self.request.query_params._mutable = True
|
||||
self.request.query_params["user"] = self.request.user.id
|
||||
self.request.query_params[
|
||||
"is_superuser"
|
||||
] = self.request.user.is_superuser
|
||||
self.request.query_params._mutable = False
|
||||
|
||||
if "query" in self.request.query_params:
|
||||
|
Loading…
x
Reference in New Issue
Block a user