API object permissions retrieval, grant and revoke

This commit is contained in:
Michael Shamoon 2022-12-06 01:45:33 -08:00
parent 2554ced198
commit 18e0012a59
3 changed files with 182 additions and 11 deletions

View File

@ -1,4 +1,5 @@
from django.contrib import admin
from guardian.admin import GuardedModelAdmin
from .models import Correspondent
from .models import Document
@ -9,28 +10,28 @@ from .models import StoragePath
from .models import Tag
class CorrespondentAdmin(admin.ModelAdmin):
class CorrespondentAdmin(GuardedModelAdmin):
list_display = ("name", "match", "matching_algorithm")
list_filter = ("matching_algorithm",)
list_editable = ("match", "matching_algorithm")
class TagAdmin(admin.ModelAdmin):
class TagAdmin(GuardedModelAdmin):
list_display = ("name", "color", "match", "matching_algorithm")
list_filter = ("color", "matching_algorithm")
list_editable = ("color", "match", "matching_algorithm")
class DocumentTypeAdmin(admin.ModelAdmin):
class DocumentTypeAdmin(GuardedModelAdmin):
list_display = ("name", "match", "matching_algorithm")
list_filter = ("matching_algorithm",)
list_editable = ("match", "matching_algorithm")
class DocumentAdmin(admin.ModelAdmin):
class DocumentAdmin(GuardedModelAdmin):
search_fields = ("correspondent__name", "title", "content", "tags__name")
readonly_fields = (
@ -95,7 +96,7 @@ class RuleInline(admin.TabularInline):
model = SavedViewFilterRule
class SavedViewAdmin(admin.ModelAdmin):
class SavedViewAdmin(GuardedModelAdmin):
list_display = ("name", "owner")
@ -106,7 +107,7 @@ class StoragePathInline(admin.TabularInline):
model = StoragePath
class StoragePathAdmin(admin.ModelAdmin):
class StoragePathAdmin(GuardedModelAdmin):
list_display = ("name", "path", "match", "matching_algorithm")
list_filter = ("path", "matching_algorithm")
list_editable = ("path", "match", "matching_algorithm")

View File

@ -28,6 +28,14 @@ from .models import UiSettings
from .models import PaperlessTask
from .parsers import is_mime_type_supported
from guardian.models import UserObjectPermission
from guardian.shortcuts import assign_perm
from guardian.shortcuts import remove_perm
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.models import User
# https://www.django-rest-framework.org/api-guide/serializers/#example
class DynamicFieldsModelSerializer(serializers.ModelSerializer):
@ -75,14 +83,140 @@ class MatchingModelSerializer(serializers.ModelSerializer):
class OwnedObjectSerializer(serializers.ModelSerializer):
def get_permissions(self, obj):
content_type = ContentType.objects.get_for_model(obj)
user_object_perms = UserObjectPermission.objects.filter(
object_pk=obj.pk,
content_type=content_type,
).values("user", "permission__codename")
return user_object_perms
permissions = SerializerMethodField()
grant_permissions = serializers.DictField(
label="Grant permissions",
allow_empty=True,
required=False,
write_only=True,
)
def _validate_user_ids(self, user_ids):
users = User.objects.filter(id__in=user_ids)
if not users.count() == len(users):
raise serializers.ValidationError(
"Some users in don't exist or were specified twice.",
)
return users
def validate_grant_permissions(self, grant_permissions):
user_dict = {
"view": User.objects.none(),
"change": User.objects.none(),
}
if grant_permissions is not None:
if "view" in grant_permissions:
view_list = grant_permissions["view"]
user_dict["view"] = self._validate_user_ids(view_list)
if "change" in grant_permissions:
change_list = grant_permissions["change"]
user_dict["change"] = self._validate_user_ids(change_list)
return user_dict
revoke_permissions = serializers.DictField(
label="Revoke permissions",
allow_empty=True,
required=False,
write_only=True,
)
def validate_revoke_permissions(self, revoke_permissions):
user_dict = {
"view": User.objects.none(),
"change": User.objects.none(),
}
if revoke_permissions is not None:
if "view" in revoke_permissions:
view_list = revoke_permissions["view"]
user_dict["view"] = self._validate_user_ids(view_list)
if "change" in revoke_permissions:
change_list = revoke_permissions["change"]
user_dict["change"] = self._validate_user_ids(change_list)
return user_dict
def __init__(self, *args, **kwargs):
self.user = kwargs.pop("user", None)
return super().__init__(*args, **kwargs)
def _adjust_permissions(self, users, object, type="view", grant=True):
if grant:
for user in users:
assign_perm(
f"{type}_{object.__class__.__name__.lower()}",
user,
object,
)
else:
for user in users:
remove_perm(
f"{type}_{object.__class__.__name__.lower()}",
user,
object,
)
def create(self, validated_data):
if self.user and validated_data["owner"] is None:
validated_data["owner"] = self.user
return super().create(validated_data)
instance = super().create(validated_data)
if "grant_permissions" in validated_data:
self._adjust_permissions(
validated_data["grant_permissions"]["view"],
instance,
)
self._adjust_permissions(
validated_data["grant_permissions"]["change"],
instance,
"change",
)
if "revoke_permissions" in validated_data:
self._adjust_permissions(
validated_data["revoke_permissions"]["view"],
instance,
"view",
False,
)
self._adjust_permissions(
validated_data["revoke_permissions"]["change"],
instance,
"change",
False,
)
return instance
def update(self, instance, validated_data):
if "grant_permissions" in validated_data:
self._adjust_permissions(
validated_data["grant_permissions"]["view"],
instance,
)
self._adjust_permissions(
validated_data["grant_permissions"]["change"],
instance,
"change",
)
if "revoke_permissions" in validated_data:
self._adjust_permissions(
validated_data["revoke_permissions"]["view"],
instance,
"view",
False,
)
self._adjust_permissions(
validated_data["revoke_permissions"]["change"],
instance,
"change",
False,
)
return super().update(instance, validated_data)
class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
@ -101,6 +235,9 @@ class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
"document_count",
"last_correspondence",
"owner",
"permissions",
"grant_permissions",
"revoke_permissions",
)
@ -116,6 +253,9 @@ class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer):
"is_insensitive",
"document_count",
"owner",
"permissions",
"grant_permissions",
"revoke_permissions",
)
@ -167,6 +307,9 @@ class TagSerializerVersion1(MatchingModelSerializer):
"is_inbox_tag",
"document_count",
"owner",
"permissions",
"grant_permissions",
"revoke_permissions",
)
@ -280,6 +423,9 @@ class DocumentSerializer(DynamicFieldsModelSerializer, OwnedObjectSerializer):
"original_file_name",
"archived_file_name",
"owner",
"permissions",
"grant_permissions",
"revoke_permissions",
)
@ -305,6 +451,9 @@ class SavedViewSerializer(OwnedObjectSerializer):
"sort_reverse",
"filter_rules",
"owner",
"permissions",
"grant_permissions",
"revoke_permissions",
]
def update(self, instance, validated_data):
@ -591,6 +740,9 @@ class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer):
"is_insensitive",
"document_count",
"owner",
"permissions",
"grant_permissions",
"revoke_permissions",
)
def validate_path(self, path):

View File

@ -2549,8 +2549,6 @@ class TestApiAuth(DirectoriesMixin, APITestCase):
self.assertEqual(self.client.get("/api/documents/").status_code, 403)
self.assertEqual(self.client.get(f"/api/documents/{d.id}/").status_code, 403)
self.assertEqual(self.client.get("/api/tags/").status_code, 403)
self.assertEqual(self.client.get("/api/correspondents/").status_code, 403)
self.assertEqual(self.client.get("/api/document_types/").status_code, 403)
@ -2567,8 +2565,6 @@ class TestApiAuth(DirectoriesMixin, APITestCase):
self.assertEqual(self.client.get("/api/documents/").status_code, 200)
self.assertEqual(self.client.get(f"/api/documents/{d.id}/").status_code, 200)
self.assertEqual(self.client.get("/api/tags/").status_code, 200)
self.assertEqual(self.client.get("/api/correspondents/").status_code, 200)
self.assertEqual(self.client.get("/api/document_types/").status_code, 200)
@ -2576,6 +2572,28 @@ class TestApiAuth(DirectoriesMixin, APITestCase):
self.assertEqual(self.client.get("/api/logs/").status_code, 200)
self.assertEqual(self.client.get("/api/saved_views/").status_code, 200)
def test_object_permissions(self):
user1 = User.objects.create_user(username="test1")
user2 = User.objects.create_user(username="test2")
user1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
self.client.force_authenticate(user1)
self.assertEqual(self.client.get("/api/documents/").status_code, 200)
d = Document.objects.create(title="Test", content="the content 1", checksum="1")
# no owner
self.assertEqual(self.client.get(f"/api/documents/{d.id}/").status_code, 200)
d2 = Document.objects.create(
title="Test 2",
content="the content 2",
checksum="2",
owner=user2,
)
self.assertEqual(self.client.get(f"/api/documents/{d2.id}/").status_code, 404)
class TestApiRemoteVersion(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/remote_version/"