Change workflow permissions assignment to merge (#5496)

This commit is contained in:
shamoon 2024-01-22 16:34:16 -08:00 committed by GitHub
parent 00eba3b223
commit 5881f05dbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 118 additions and 14 deletions

View File

@ -57,16 +57,29 @@ def get_groups_with_only_permission(obj, codename):
return Group.objects.filter(id__in=group_object_perm_group_ids).distinct() return Group.objects.filter(id__in=group_object_perm_group_ids).distinct()
def set_permissions_for_object(permissions, object): def set_permissions_for_object(permissions: list[str], object, merge: bool = False):
"""
Set permissions for an object. The permissions are given as a list of strings
in the format "action_modelname", e.g. "view_document".
If merge is True, the permissions are merged with the existing permissions and
no users or groups are removed. If False, the permissions are set to exactly
the given list of users and groups.
"""
for action in permissions: for action in permissions:
permission = f"{action}_{object.__class__.__name__.lower()}" permission = f"{action}_{object.__class__.__name__.lower()}"
# users # users
users_to_add = User.objects.filter(id__in=permissions[action]["users"]) users_to_add = User.objects.filter(id__in=permissions[action]["users"])
users_to_remove = get_users_with_perms( users_to_remove = (
get_users_with_perms(
object, object,
only_with_perms_in=[permission], only_with_perms_in=[permission],
with_group_users=False, with_group_users=False,
) )
if not merge
else User.objects.none()
)
if len(users_to_add) > 0 and len(users_to_remove) > 0: if len(users_to_add) > 0 and len(users_to_remove) > 0:
users_to_remove = users_to_remove.exclude(id__in=users_to_add) users_to_remove = users_to_remove.exclude(id__in=users_to_add)
if len(users_to_remove) > 0: if len(users_to_remove) > 0:
@ -84,10 +97,14 @@ def set_permissions_for_object(permissions, object):
) )
# groups # groups
groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"]) groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"])
groups_to_remove = get_groups_with_only_permission( groups_to_remove = (
get_groups_with_only_permission(
object, object,
permission, permission,
) )
if not merge
else Group.objects.none()
)
if len(groups_to_add) > 0 and len(groups_to_remove) > 0: if len(groups_to_add) > 0 and len(groups_to_remove) > 0:
groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add) groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add)
if len(groups_to_remove) > 0: if len(groups_to_remove) > 0:

View File

@ -593,10 +593,22 @@ def run_workflow(
) )
if ( if (
(
action.assign_view_users is not None action.assign_view_users is not None
or action.assign_view_groups is not None and action.assign_view_users.count() > 0
or action.assign_change_users is not None )
or action.assign_change_groups is not None or (
action.assign_view_groups is not None
and action.assign_view_groups.count() > 0
)
or (
action.assign_change_users is not None
and action.assign_change_users.count() > 0
)
or (
action.assign_change_groups is not None
and action.assign_change_groups.count() > 0
)
): ):
permissions = { permissions = {
"view": { "view": {
@ -614,7 +626,11 @@ def run_workflow(
or [], or [],
}, },
} }
set_permissions_for_object(permissions=permissions, object=document) set_permissions_for_object(
permissions=permissions,
object=document,
merge=True,
)
if action.assign_custom_fields is not None: if action.assign_custom_fields is not None:
for field in action.assign_custom_fields.all(): for field in action.assign_custom_fields.all():

View File

@ -4,7 +4,11 @@ from unittest import mock
from django.contrib.auth.models import Group from django.contrib.auth.models import Group
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.db.models import QuerySet
from django.utils import timezone from django.utils import timezone
from guardian.shortcuts import assign_perm
from guardian.shortcuts import get_groups_with_perms
from guardian.shortcuts import get_users_with_perms
from rest_framework.test import APITestCase from rest_framework.test import APITestCase
from documents import tasks from documents import tasks
@ -50,6 +54,7 @@ class TestWorkflows(DirectoriesMixin, FileSystemAssertsMixin, APITestCase):
self.user2 = User.objects.create(username="user2") self.user2 = User.objects.create(username="user2")
self.user3 = User.objects.create(username="user3") self.user3 = User.objects.create(username="user3")
self.group1 = Group.objects.create(name="group1") self.group1 = Group.objects.create(name="group1")
self.group2 = Group.objects.create(name="group2")
account1 = MailAccount.objects.create( account1 = MailAccount.objects.create(
name="Email1", name="Email1",
@ -1082,7 +1087,73 @@ class TestWorkflows(DirectoriesMixin, FileSystemAssertsMixin, APITestCase):
format="json", format="json",
) )
self.assertEqual(doc.custom_fields.all().count(), 1) def test_document_updated_workflow_merge_permissions(self):
"""
GIVEN:
- Existing workflow with UPDATED trigger and action that sets permissions
WHEN:
- Document is updated that already has permissions
THEN:
- Permissions are merged
"""
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
filter_has_document_type=self.dt,
)
action = WorkflowAction.objects.create()
action.assign_view_users.add(self.user3)
action.assign_change_users.add(self.user3)
action.assign_view_groups.add(self.group2)
action.save()
w = Workflow.objects.create(
name="Workflow 1",
order=0,
)
w.triggers.add(trigger)
w.actions.add(action)
w.save()
doc = Document.objects.create(
title="sample test",
correspondent=self.c,
original_filename="sample.pdf",
)
assign_perm("documents.view_document", self.user2, doc)
assign_perm("documents.change_document", self.user2, doc)
assign_perm("documents.view_document", self.group1, doc)
assign_perm("documents.change_document", self.group1, doc)
superuser = User.objects.create_superuser("superuser")
self.client.force_authenticate(user=superuser)
self.client.patch(
f"/api/documents/{doc.id}/",
{"document_type": self.dt.id},
format="json",
)
view_users_perms: QuerySet = get_users_with_perms(
doc,
only_with_perms_in=["view_document"],
)
change_users_perms: QuerySet = get_users_with_perms(
doc,
only_with_perms_in=["change_document"],
)
# user2 should still have permissions
self.assertIn(self.user2, view_users_perms)
self.assertIn(self.user2, change_users_perms)
# user3 should have been added
self.assertIn(self.user3, view_users_perms)
self.assertIn(self.user3, change_users_perms)
group_perms: QuerySet = get_groups_with_perms(doc)
# group1 should still have permissions
self.assertIn(self.group1, group_perms)
# group2 should have been added
self.assertIn(self.group2, group_perms)
def test_workflow_enabled_disabled(self): def test_workflow_enabled_disabled(self):
trigger = WorkflowTrigger.objects.create( trigger = WorkflowTrigger.objects.create(