From 5881f05dbc7d478a5cd3b1c7203eda3c33888e9d Mon Sep 17 00:00:00 2001 From: shamoon <4887959+shamoon@users.noreply.github.com> Date: Mon, 22 Jan 2024 16:34:16 -0800 Subject: [PATCH] Change workflow permissions assignment to merge (#5496) --- src/documents/permissions.py | 33 +++++++++--- src/documents/signals/handlers.py | 26 ++++++++-- src/documents/tests/test_workflows.py | 73 ++++++++++++++++++++++++++- 3 files changed, 118 insertions(+), 14 deletions(-) diff --git a/src/documents/permissions.py b/src/documents/permissions.py index d5712f670..bdd6fd555 100644 --- a/src/documents/permissions.py +++ b/src/documents/permissions.py @@ -57,15 +57,28 @@ def get_groups_with_only_permission(obj, codename): return Group.objects.filter(id__in=group_object_perm_group_ids).distinct() -def set_permissions_for_object(permissions, object): +def set_permissions_for_object(permissions: list[str], object, merge: bool = False): + """ + Set permissions for an object. The permissions are given as a list of strings + in the format "action_modelname", e.g. "view_document". + + If merge is True, the permissions are merged with the existing permissions and + no users or groups are removed. If False, the permissions are set to exactly + the given list of users and groups. + """ + for action in permissions: permission = f"{action}_{object.__class__.__name__.lower()}" # users users_to_add = User.objects.filter(id__in=permissions[action]["users"]) - users_to_remove = get_users_with_perms( - object, - only_with_perms_in=[permission], - with_group_users=False, + users_to_remove = ( + get_users_with_perms( + object, + only_with_perms_in=[permission], + with_group_users=False, + ) + if not merge + else User.objects.none() ) if len(users_to_add) > 0 and len(users_to_remove) > 0: users_to_remove = users_to_remove.exclude(id__in=users_to_add) @@ -84,9 +97,13 @@ def set_permissions_for_object(permissions, object): ) # groups groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"]) - groups_to_remove = get_groups_with_only_permission( - object, - permission, + groups_to_remove = ( + get_groups_with_only_permission( + object, + permission, + ) + if not merge + else Group.objects.none() ) if len(groups_to_add) > 0 and len(groups_to_remove) > 0: groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add) diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index 0315fa9b2..2b717e042 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -593,10 +593,22 @@ def run_workflow( ) if ( - action.assign_view_users is not None - or action.assign_view_groups is not None - or action.assign_change_users is not None - or action.assign_change_groups is not None + ( + action.assign_view_users is not None + and action.assign_view_users.count() > 0 + ) + or ( + action.assign_view_groups is not None + and action.assign_view_groups.count() > 0 + ) + or ( + action.assign_change_users is not None + and action.assign_change_users.count() > 0 + ) + or ( + action.assign_change_groups is not None + and action.assign_change_groups.count() > 0 + ) ): permissions = { "view": { @@ -614,7 +626,11 @@ def run_workflow( or [], }, } - set_permissions_for_object(permissions=permissions, object=document) + set_permissions_for_object( + permissions=permissions, + object=document, + merge=True, + ) if action.assign_custom_fields is not None: for field in action.assign_custom_fields.all(): diff --git a/src/documents/tests/test_workflows.py b/src/documents/tests/test_workflows.py index e92a00682..ba5c53a78 100644 --- a/src/documents/tests/test_workflows.py +++ b/src/documents/tests/test_workflows.py @@ -4,7 +4,11 @@ from unittest import mock from django.contrib.auth.models import Group from django.contrib.auth.models import User +from django.db.models import QuerySet from django.utils import timezone +from guardian.shortcuts import assign_perm +from guardian.shortcuts import get_groups_with_perms +from guardian.shortcuts import get_users_with_perms from rest_framework.test import APITestCase from documents import tasks @@ -50,6 +54,7 @@ class TestWorkflows(DirectoriesMixin, FileSystemAssertsMixin, APITestCase): self.user2 = User.objects.create(username="user2") self.user3 = User.objects.create(username="user3") self.group1 = Group.objects.create(name="group1") + self.group2 = Group.objects.create(name="group2") account1 = MailAccount.objects.create( name="Email1", @@ -1082,7 +1087,73 @@ class TestWorkflows(DirectoriesMixin, FileSystemAssertsMixin, APITestCase): format="json", ) - self.assertEqual(doc.custom_fields.all().count(), 1) + def test_document_updated_workflow_merge_permissions(self): + """ + GIVEN: + - Existing workflow with UPDATED trigger and action that sets permissions + WHEN: + - Document is updated that already has permissions + THEN: + - Permissions are merged + """ + trigger = WorkflowTrigger.objects.create( + type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, + filter_has_document_type=self.dt, + ) + action = WorkflowAction.objects.create() + action.assign_view_users.add(self.user3) + action.assign_change_users.add(self.user3) + action.assign_view_groups.add(self.group2) + action.save() + + w = Workflow.objects.create( + name="Workflow 1", + order=0, + ) + w.triggers.add(trigger) + w.actions.add(action) + w.save() + + doc = Document.objects.create( + title="sample test", + correspondent=self.c, + original_filename="sample.pdf", + ) + + assign_perm("documents.view_document", self.user2, doc) + assign_perm("documents.change_document", self.user2, doc) + assign_perm("documents.view_document", self.group1, doc) + assign_perm("documents.change_document", self.group1, doc) + + superuser = User.objects.create_superuser("superuser") + self.client.force_authenticate(user=superuser) + + self.client.patch( + f"/api/documents/{doc.id}/", + {"document_type": self.dt.id}, + format="json", + ) + + view_users_perms: QuerySet = get_users_with_perms( + doc, + only_with_perms_in=["view_document"], + ) + change_users_perms: QuerySet = get_users_with_perms( + doc, + only_with_perms_in=["change_document"], + ) + # user2 should still have permissions + self.assertIn(self.user2, view_users_perms) + self.assertIn(self.user2, change_users_perms) + # user3 should have been added + self.assertIn(self.user3, view_users_perms) + self.assertIn(self.user3, change_users_perms) + + group_perms: QuerySet = get_groups_with_perms(doc) + # group1 should still have permissions + self.assertIn(self.group1, group_perms) + # group2 should have been added + self.assertIn(self.group2, group_perms) def test_workflow_enabled_disabled(self): trigger = WorkflowTrigger.objects.create(