mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-07-28 18:24:38 -05:00
Bulk edit permissions
This commit is contained in:
@@ -5,6 +5,7 @@ from documents.models import Correspondent
|
||||
from documents.models import Document
|
||||
from documents.models import DocumentType
|
||||
from documents.models import StoragePath
|
||||
from documents.permissions import set_permissions_for_object
|
||||
from documents.tasks import bulk_update_documents
|
||||
from documents.tasks import update_document_archive_file
|
||||
|
||||
@@ -128,3 +129,15 @@ def redo_ocr(doc_ids):
|
||||
)
|
||||
|
||||
return "OK"
|
||||
|
||||
|
||||
def set_permissions(doc_ids, permissions):
|
||||
|
||||
qs = Document.objects.filter(id__in=doc_ids)
|
||||
for doc in qs:
|
||||
set_permissions_for_object(permissions, doc)
|
||||
affected_docs = [doc.id for doc in qs]
|
||||
|
||||
bulk_update_documents.delay(document_ids=affected_docs)
|
||||
|
||||
return "OK"
|
||||
|
@@ -1,3 +1,11 @@
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from guardian.models import GroupObjectPermission
|
||||
from guardian.shortcuts import assign_perm
|
||||
from guardian.shortcuts import get_users_with_perms
|
||||
from guardian.shortcuts import remove_perm
|
||||
from rest_framework.permissions import BasePermission
|
||||
from rest_framework.permissions import DjangoObjectPermissions
|
||||
|
||||
@@ -31,3 +39,65 @@ class PaperlessObjectPermissions(DjangoObjectPermissions):
|
||||
class PaperlessAdminPermissions(BasePermission):
|
||||
def has_permission(self, request, view):
|
||||
return request.user.has_perm("admin.view_logentry")
|
||||
|
||||
|
||||
def get_groups_with_only_permission(obj, codename):
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
permission = Permission.objects.get(content_type=ctype, codename=codename)
|
||||
group_object_perm_group_ids = (
|
||||
GroupObjectPermission.objects.filter(
|
||||
object_pk=obj.pk,
|
||||
content_type=ctype,
|
||||
)
|
||||
.filter(permission=permission)
|
||||
.values_list("group_id")
|
||||
)
|
||||
return Group.objects.filter(id__in=group_object_perm_group_ids).distinct()
|
||||
|
||||
|
||||
def set_permissions_for_object(permissions, object):
|
||||
print(permissions, object)
|
||||
for action in permissions:
|
||||
permission = f"{action}_{object.__class__.__name__.lower()}"
|
||||
# users
|
||||
users_to_add = User.objects.filter(id__in=permissions[action]["users"])
|
||||
users_to_remove = get_users_with_perms(
|
||||
object,
|
||||
only_with_perms_in=[permission],
|
||||
)
|
||||
if len(users_to_add) > 0 and len(users_to_remove) > 0:
|
||||
users_to_remove = users_to_remove.difference(users_to_add)
|
||||
if len(users_to_remove) > 0:
|
||||
for user in users_to_remove:
|
||||
remove_perm(permission, user, object)
|
||||
if len(users_to_add) > 0:
|
||||
for user in users_to_add:
|
||||
assign_perm(permission, user, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
user,
|
||||
object,
|
||||
)
|
||||
# groups
|
||||
groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"])
|
||||
groups_to_remove = get_groups_with_only_permission(
|
||||
object,
|
||||
permission,
|
||||
)
|
||||
if len(groups_to_add) > 0 and len(groups_to_remove) > 0:
|
||||
groups_to_remove = groups_to_remove.difference(groups_to_add)
|
||||
if len(groups_to_remove) > 0:
|
||||
for group in groups_to_remove:
|
||||
remove_perm(permission, group, object)
|
||||
if len(groups_to_add) > 0:
|
||||
for group in groups_to_add:
|
||||
assign_perm(permission, group, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
group,
|
||||
object,
|
||||
)
|
||||
|
@@ -28,16 +28,13 @@ from .models import UiSettings
|
||||
from .models import PaperlessTask
|
||||
from .parsers import is_mime_type_supported
|
||||
|
||||
from guardian.models import GroupObjectPermission
|
||||
from guardian.shortcuts import assign_perm
|
||||
from guardian.shortcuts import remove_perm
|
||||
from guardian.shortcuts import get_users_with_perms
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.auth.models import Permission
|
||||
|
||||
from documents.permissions import get_groups_with_only_permission
|
||||
from documents.permissions import set_permissions_for_object
|
||||
|
||||
|
||||
# https://www.django-rest-framework.org/api-guide/serializers/#example
|
||||
@@ -85,56 +82,7 @@ class MatchingModelSerializer(serializers.ModelSerializer):
|
||||
return match
|
||||
|
||||
|
||||
def get_groups_with_only_permission(obj, codename):
|
||||
ctype = ContentType.objects.get_for_model(obj)
|
||||
permission = Permission.objects.get(content_type=ctype, codename=codename)
|
||||
group_object_perm_group_ids = (
|
||||
GroupObjectPermission.objects.filter(
|
||||
object_pk=obj.pk,
|
||||
content_type=ctype,
|
||||
)
|
||||
.filter(permission=permission)
|
||||
.values_list("group_id")
|
||||
)
|
||||
return Group.objects.filter(id__in=group_object_perm_group_ids).distinct()
|
||||
|
||||
|
||||
class OwnedObjectSerializer(serializers.ModelSerializer):
|
||||
def get_permissions(self, obj):
|
||||
view_codename = f"view_{obj.__class__.__name__.lower()}"
|
||||
change_codename = f"change_{obj.__class__.__name__.lower()}"
|
||||
return {
|
||||
"view": {
|
||||
"users": get_users_with_perms(
|
||||
obj,
|
||||
only_with_perms_in=[view_codename],
|
||||
).values_list("id", flat=True),
|
||||
"groups": get_groups_with_only_permission(
|
||||
obj,
|
||||
codename=view_codename,
|
||||
).values_list("id", flat=True),
|
||||
},
|
||||
"change": {
|
||||
"users": get_users_with_perms(
|
||||
obj,
|
||||
only_with_perms_in=[change_codename],
|
||||
).values_list("id", flat=True),
|
||||
"groups": get_groups_with_only_permission(
|
||||
obj,
|
||||
codename=change_codename,
|
||||
).values_list("id", flat=True),
|
||||
},
|
||||
}
|
||||
|
||||
permissions = SerializerMethodField(read_only=True)
|
||||
|
||||
set_permissions = serializers.DictField(
|
||||
label="Set permissions",
|
||||
allow_empty=True,
|
||||
required=False,
|
||||
write_only=True,
|
||||
)
|
||||
|
||||
class SetPermissionsMixin:
|
||||
def _validate_user_ids(self, user_ids):
|
||||
users = User.objects.none()
|
||||
if user_ids is not None:
|
||||
@@ -169,52 +117,55 @@ class OwnedObjectSerializer(serializers.ModelSerializer):
|
||||
if set_permissions is not None:
|
||||
for action in permissions_dict:
|
||||
users = set_permissions[action]["users"]
|
||||
permissions_dict[action]["users"] = self._validate_user_ids(users)
|
||||
self._validate_user_ids(users)
|
||||
groups = set_permissions[action]["groups"]
|
||||
permissions_dict[action]["groups"] = self._validate_group_ids(groups)
|
||||
self._validate_group_ids(groups)
|
||||
return permissions_dict
|
||||
|
||||
def _set_permissions(self, permissions, object):
|
||||
set_permissions_for_object(permissions, object)
|
||||
|
||||
|
||||
class OwnedObjectSerializer(serializers.ModelSerializer, SetPermissionsMixin):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.user = kwargs.pop("user", None)
|
||||
return super().__init__(*args, **kwargs)
|
||||
|
||||
def _set_permissions(self, permissions, object):
|
||||
for action in permissions:
|
||||
permission = f"{action}_{object.__class__.__name__.lower()}"
|
||||
# users
|
||||
users_to_add = permissions[action]["users"]
|
||||
users_to_remove = get_users_with_perms(
|
||||
object,
|
||||
only_with_perms_in=[permission],
|
||||
).difference(users_to_add)
|
||||
for user in users_to_remove:
|
||||
remove_perm(permission, user, object)
|
||||
for user in users_to_add:
|
||||
assign_perm(permission, user, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
user,
|
||||
object,
|
||||
)
|
||||
# groups
|
||||
groups_to_add = permissions[action]["groups"]
|
||||
groups_to_remove = get_groups_with_only_permission(
|
||||
object,
|
||||
permission,
|
||||
).difference(groups_to_add)
|
||||
for group in groups_to_remove:
|
||||
remove_perm(permission, group, object)
|
||||
for group in groups_to_add:
|
||||
assign_perm(permission, group, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
group,
|
||||
object,
|
||||
)
|
||||
def get_permissions(self, obj):
|
||||
view_codename = f"view_{obj.__class__.__name__.lower()}"
|
||||
change_codename = f"change_{obj.__class__.__name__.lower()}"
|
||||
return {
|
||||
"view": {
|
||||
"users": get_users_with_perms(
|
||||
obj,
|
||||
only_with_perms_in=[view_codename],
|
||||
).values_list("id", flat=True),
|
||||
"groups": get_groups_with_only_permission(
|
||||
obj,
|
||||
codename=view_codename,
|
||||
).values_list("id", flat=True),
|
||||
},
|
||||
"change": {
|
||||
"users": get_users_with_perms(
|
||||
obj,
|
||||
only_with_perms_in=[change_codename],
|
||||
).values_list("id", flat=True),
|
||||
"groups": get_groups_with_only_permission(
|
||||
obj,
|
||||
codename=change_codename,
|
||||
).values_list("id", flat=True),
|
||||
},
|
||||
}
|
||||
|
||||
permissions = SerializerMethodField(read_only=True)
|
||||
|
||||
set_permissions = serializers.DictField(
|
||||
label="Set permissions",
|
||||
allow_empty=True,
|
||||
required=False,
|
||||
write_only=True,
|
||||
)
|
||||
# other methods in mixin
|
||||
|
||||
def create(self, validated_data):
|
||||
if self.user and (
|
||||
@@ -515,7 +466,7 @@ class DocumentListSerializer(serializers.Serializer):
|
||||
return documents
|
||||
|
||||
|
||||
class BulkEditSerializer(DocumentListSerializer):
|
||||
class BulkEditSerializer(DocumentListSerializer, SetPermissionsMixin):
|
||||
|
||||
method = serializers.ChoiceField(
|
||||
choices=[
|
||||
@@ -527,6 +478,7 @@ class BulkEditSerializer(DocumentListSerializer):
|
||||
"modify_tags",
|
||||
"delete",
|
||||
"redo_ocr",
|
||||
"set_permissions",
|
||||
],
|
||||
label="Method",
|
||||
write_only=True,
|
||||
@@ -562,6 +514,8 @@ class BulkEditSerializer(DocumentListSerializer):
|
||||
return bulk_edit.delete
|
||||
elif method == "redo_ocr":
|
||||
return bulk_edit.redo_ocr
|
||||
elif method == "set_permissions":
|
||||
return bulk_edit.set_permissions
|
||||
else:
|
||||
raise serializers.ValidationError("Unsupported method.")
|
||||
|
||||
@@ -625,6 +579,12 @@ class BulkEditSerializer(DocumentListSerializer):
|
||||
else:
|
||||
raise serializers.ValidationError("remove_tags not specified")
|
||||
|
||||
def _validate_parameters_set_permissions(self, parameters):
|
||||
if "permissions" in parameters:
|
||||
self.validate_set_permissions(parameters["permissions"])
|
||||
else:
|
||||
raise serializers.ValidationError("permissions not specified")
|
||||
|
||||
def validate(self, attrs):
|
||||
|
||||
method = attrs["method"]
|
||||
@@ -640,6 +600,8 @@ class BulkEditSerializer(DocumentListSerializer):
|
||||
self._validate_parameters_modify_tags(parameters)
|
||||
elif method == bulk_edit.set_storage_path:
|
||||
self._validate_storage_path(parameters)
|
||||
elif method == bulk_edit.set_permissions:
|
||||
self._validate_parameters_set_permissions(parameters)
|
||||
|
||||
return attrs
|
||||
|
||||
|
@@ -41,6 +41,8 @@ from paperless import version
|
||||
from rest_framework.test import APITestCase
|
||||
from whoosh.writing import AsyncWriter
|
||||
|
||||
from guardian.shortcuts import get_users_with_perms
|
||||
|
||||
|
||||
class TestDocumentApi(DirectoriesMixin, APITestCase):
|
||||
def setUp(self):
|
||||
@@ -2329,6 +2331,31 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
|
||||
],
|
||||
)
|
||||
|
||||
def test_set_permissions(self):
|
||||
user1 = User.objects.create(username="user1")
|
||||
user2 = User.objects.create(username="user2")
|
||||
permissions = {
|
||||
"view": {
|
||||
"users": User.objects.filter(id__in=[user1.id, user2.id]),
|
||||
"groups": Group.objects.none(),
|
||||
},
|
||||
"change": {
|
||||
"users": User.objects.filter(id__in=[user1.id]),
|
||||
"groups": Group.objects.none(),
|
||||
},
|
||||
}
|
||||
|
||||
bulk_edit.set_permissions(
|
||||
[self.doc2.id, self.doc3.id],
|
||||
permissions=permissions,
|
||||
)
|
||||
|
||||
self.assertEqual(get_users_with_perms(self.doc2).count(), 2)
|
||||
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
|
||||
|
||||
|
||||
class TestBulkDownload(DirectoriesMixin, APITestCase):
|
||||
def setUp(self):
|
||||
|
Reference in New Issue
Block a user