mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-07-28 18:24:38 -05:00
Refactor permissions API endpoints, UI group permissions
This commit is contained in:
@@ -28,7 +28,7 @@ from .models import UiSettings
|
||||
from .models import PaperlessTask
|
||||
from .parsers import is_mime_type_supported
|
||||
|
||||
from guardian.models import UserObjectPermission
|
||||
from guardian.models import GroupObjectPermission
|
||||
from guardian.shortcuts import assign_perm
|
||||
from guardian.shortcuts import remove_perm
|
||||
from guardian.shortcuts import get_users_with_perms
|
||||
@@ -36,6 +36,8 @@ from guardian.shortcuts import get_users_with_perms
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.auth.models import Permission
|
||||
|
||||
|
||||
# https://www.django-rest-framework.org/api-guide/serializers/#example
|
||||
@@ -83,14 +85,46 @@ class MatchingModelSerializer(serializers.ModelSerializer):
|
||||
return match
|
||||
|
||||
|
||||
def get_groups_with_only_permission(obj, codename):
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
permission = Permission.objects.get(content_type=ctype, codename=codename)
|
||||
group_object_perm_group_ids = (
|
||||
GroupObjectPermission.objects.filter(
|
||||
object_pk=obj.pk,
|
||||
content_type=ctype,
|
||||
)
|
||||
.filter(permission=permission)
|
||||
.values_list("group_id")
|
||||
)
|
||||
return Group.objects.filter(id__in=group_object_perm_group_ids).distinct()
|
||||
|
||||
|
||||
class OwnedObjectSerializer(serializers.ModelSerializer):
|
||||
def get_permissions(self, obj):
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
user_object_perms = UserObjectPermission.objects.filter(
|
||||
object_pk=obj.pk,
|
||||
content_type=content_type,
|
||||
).values_list("user", "permission__codename")
|
||||
return list(user_object_perms)
|
||||
view_codename = f"view_{obj.__class__.__name__.lower()}"
|
||||
change_codename = f"change_{obj.__class__.__name__.lower()}"
|
||||
return {
|
||||
"view": {
|
||||
"users": get_users_with_perms(
|
||||
obj,
|
||||
only_with_perms_in=[view_codename],
|
||||
).values_list("id", flat=True),
|
||||
"groups": get_groups_with_only_permission(
|
||||
obj,
|
||||
codename=view_codename,
|
||||
).values_list("id", flat=True),
|
||||
},
|
||||
"change": {
|
||||
"users": get_users_with_perms(
|
||||
obj,
|
||||
only_with_perms_in=[change_codename],
|
||||
).values_list("id", flat=True),
|
||||
"groups": get_groups_with_only_permission(
|
||||
obj,
|
||||
codename=change_codename,
|
||||
).values_list("id", flat=True),
|
||||
},
|
||||
}
|
||||
|
||||
permissions = SerializerMethodField(read_only=True)
|
||||
|
||||
@@ -111,19 +145,34 @@ class OwnedObjectSerializer(serializers.ModelSerializer):
|
||||
)
|
||||
return users
|
||||
|
||||
def _validate_group_ids(self, group_ids):
|
||||
groups = Group.objects.none()
|
||||
if group_ids is not None:
|
||||
groups = Group.objects.filter(id__in=group_ids)
|
||||
if not groups.count() == len(group_ids):
|
||||
raise serializers.ValidationError(
|
||||
"Some groups in don't exist or were specified twice.",
|
||||
)
|
||||
return groups
|
||||
|
||||
def validate_set_permissions(self, set_permissions):
|
||||
user_dict = {
|
||||
"view": User.objects.none(),
|
||||
"change": User.objects.none(),
|
||||
permissions_dict = {
|
||||
"view": {
|
||||
"users": User.objects.none(),
|
||||
"groups": Group.objects.none(),
|
||||
},
|
||||
"change": {
|
||||
"users": User.objects.none(),
|
||||
"groups": Group.objects.none(),
|
||||
},
|
||||
}
|
||||
if set_permissions is not None:
|
||||
if "view" in set_permissions:
|
||||
view_list = set_permissions["view"]
|
||||
user_dict["view"] = self._validate_user_ids(view_list)
|
||||
if "change" in set_permissions:
|
||||
change_list = set_permissions["change"]
|
||||
user_dict["change"] = self._validate_user_ids(change_list)
|
||||
return user_dict
|
||||
for action in permissions_dict:
|
||||
users = set_permissions[action]["users"]
|
||||
permissions_dict[action]["users"] = self._validate_user_ids(users)
|
||||
groups = set_permissions[action]["groups"]
|
||||
permissions_dict[action]["groups"] = self._validate_group_ids(groups)
|
||||
return permissions_dict
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.user = kwargs.pop("user", None)
|
||||
@@ -132,7 +181,8 @@ class OwnedObjectSerializer(serializers.ModelSerializer):
|
||||
def _set_permissions(self, permissions, object):
|
||||
for action in permissions:
|
||||
permission = f"{action}_{object.__class__.__name__.lower()}"
|
||||
users_to_add = permissions[action]
|
||||
# users
|
||||
users_to_add = permissions[action]["users"]
|
||||
users_to_remove = get_users_with_perms(
|
||||
object,
|
||||
only_with_perms_in=[permission],
|
||||
@@ -148,6 +198,23 @@ class OwnedObjectSerializer(serializers.ModelSerializer):
|
||||
user,
|
||||
object,
|
||||
)
|
||||
# groups
|
||||
groups_to_add = permissions[action]["groups"]
|
||||
groups_to_remove = get_groups_with_only_permission(
|
||||
object,
|
||||
permission,
|
||||
).difference(groups_to_add)
|
||||
for group in groups_to_remove:
|
||||
remove_perm(permission, group, object)
|
||||
for group in groups_to_add:
|
||||
assign_perm(permission, group, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
group,
|
||||
object,
|
||||
)
|
||||
|
||||
def create(self, validated_data):
|
||||
if self.user and (
|
||||
|
@@ -158,7 +158,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
|
||||
response = self.client.get("/api/documents/?fields=", format="json")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
results = response.data["results"]
|
||||
self.assertEqual(results_full, results)
|
||||
self.assertEqual(len(results_full[0]), len(results[0]))
|
||||
|
||||
response = self.client.get("/api/documents/?fields=dgfhs", format="json")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
Reference in New Issue
Block a user