Files
paperless-ngx/src/documents/permissions.py
2026-02-16 09:37:33 -08:00

326 lines
11 KiB
Python

from typing import Any
from django.contrib.auth.models import Group
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.db.models import Count
from django.db.models import IntegerField
from django.db.models import OuterRef
from django.db.models import Q
from django.db.models import QuerySet
from django.db.models import Subquery
from django.db.models.functions import Cast
from django.db.models.functions import Coalesce
from guardian.core import ObjectPermissionChecker
from guardian.models import GroupObjectPermission
from guardian.models import UserObjectPermission
from guardian.shortcuts import assign_perm
from guardian.shortcuts import get_objects_for_user
from guardian.shortcuts import get_users_with_perms
from guardian.shortcuts import remove_perm
from rest_framework.permissions import BasePermission
from rest_framework.permissions import DjangoObjectPermissions
from documents.models import Document
class PaperlessObjectPermissions(DjangoObjectPermissions):
"""
A permissions backend that checks for object-level permissions
or for ownership.
"""
perms_map = {
"GET": ["%(app_label)s.view_%(model_name)s"],
"OPTIONS": ["%(app_label)s.view_%(model_name)s"],
"HEAD": ["%(app_label)s.view_%(model_name)s"],
"POST": ["%(app_label)s.add_%(model_name)s"],
"PUT": ["%(app_label)s.change_%(model_name)s"],
"PATCH": ["%(app_label)s.change_%(model_name)s"],
"DELETE": ["%(app_label)s.delete_%(model_name)s"],
}
def has_object_permission(self, request, view, obj):
if hasattr(obj, "owner") and obj.owner is not None:
if request.user == obj.owner:
return True
else:
return super().has_object_permission(request, view, obj)
else:
return True # no owner
class PaperlessAdminPermissions(BasePermission):
def has_permission(self, request, view):
return request.user.is_staff
def get_groups_with_only_permission(obj, codename):
ctype = ContentType.objects.get_for_model(obj)
permission = Permission.objects.get(content_type=ctype, codename=codename)
group_object_perm_group_ids = (
GroupObjectPermission.objects.filter(
object_pk=obj.pk,
content_type=ctype,
)
.filter(permission=permission)
.values_list("group_id")
)
return Group.objects.filter(id__in=group_object_perm_group_ids).distinct()
def set_permissions_for_object(
permissions: dict,
object,
*,
merge: bool = False,
) -> None:
"""
Set permissions for an object. The permissions are given as a mapping of actions
to a dict of user / group id lists, e.g.
{"view": {"users": [1], "groups": [2]}, "change": {"users": [], "groups": []}}.
If merge is True, the permissions are merged with the existing permissions and
no users or groups are removed. If False, the permissions are set to exactly
the given list of users and groups.
"""
for action, entry in permissions.items():
permission = f"{action}_{object.__class__.__name__.lower()}"
if "users" in entry:
# users
users_to_add = User.objects.filter(id__in=entry["users"])
users_to_remove = (
get_users_with_perms(
object,
only_with_perms_in=[permission],
with_group_users=False,
)
if not merge
else User.objects.none()
)
if users_to_add.exists() and users_to_remove.exists():
users_to_remove = users_to_remove.exclude(id__in=users_to_add)
if users_to_remove.exists():
for user in users_to_remove:
remove_perm(permission, user, object)
if users_to_add.exists():
for user in users_to_add:
assign_perm(permission, user, object)
if action == "change":
# change gives view too
assign_perm(
f"view_{object.__class__.__name__.lower()}",
user,
object,
)
if "groups" in entry:
# groups
groups_to_add = Group.objects.filter(id__in=entry["groups"])
groups_to_remove = (
get_groups_with_only_permission(
object,
permission,
)
if not merge
else Group.objects.none()
)
if groups_to_add.exists() and groups_to_remove.exists():
groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add)
if groups_to_remove.exists():
for group in groups_to_remove:
remove_perm(permission, group, object)
if groups_to_add.exists():
for group in groups_to_add:
assign_perm(permission, group, object)
if action == "change":
# change gives view too
assign_perm(
f"view_{object.__class__.__name__.lower()}",
group,
object,
)
def _permitted_document_ids(user):
"""
Return a queryset of document IDs the user may view, limited to non-deleted
documents. This intentionally avoids ``get_objects_for_user`` to keep the
subquery small and index-friendly.
"""
base_docs = Document.objects.filter(deleted_at__isnull=True).only("id", "owner")
if user is None or not getattr(user, "is_authenticated", False):
# Just Anonymous user e.g. for drf-spectacular
return base_docs.filter(owner__isnull=True).values_list("id", flat=True)
if getattr(user, "is_superuser", False):
return base_docs.values_list("id", flat=True)
document_ct = ContentType.objects.get_for_model(Document)
perm_filter = {
"permission__codename": "view_document",
"permission__content_type": document_ct,
}
user_perm_docs = (
UserObjectPermission.objects.filter(user=user, **perm_filter)
.annotate(object_pk_int=Cast("object_pk", IntegerField()))
.values_list("object_pk_int", flat=True)
)
group_perm_docs = (
GroupObjectPermission.objects.filter(group__user=user, **perm_filter)
.annotate(object_pk_int=Cast("object_pk", IntegerField()))
.values_list("object_pk_int", flat=True)
)
permitted_documents = user_perm_docs.union(group_perm_docs)
return base_docs.filter(
Q(owner=user) | Q(owner__isnull=True) | Q(id__in=permitted_documents),
).values_list("id", flat=True)
def get_document_count_filter_for_user(user):
"""
Return the Q object used to filter document counts for the given user.
The filter is expressed as an ``id__in`` against a small subquery of permitted
document IDs to keep the generated SQL simple and avoid large OR clauses.
"""
if getattr(user, "is_superuser", False):
# Superuser: no permission filtering needed
return Q(documents__deleted_at__isnull=True)
permitted_ids = _permitted_document_ids(user)
return Q(documents__id__in=permitted_ids)
def annotate_document_count_for_related_queryset(
queryset: QuerySet[Any],
through_model: Any,
related_object_field: str,
target_field: str = "document_id",
user: User | None = None,
) -> QuerySet[Any]:
"""
Annotate a queryset with permissions-aware document counts using a subquery
against a relation table.
Args:
queryset: base queryset to annotate (must contain pk)
through_model: model representing the relation (e.g., Document.tags.through
or CustomFieldInstance)
source_field: field on the relation pointing back to queryset pk
target_field: field on the relation pointing to Document id
user: the user for whom to filter permitted document ids
"""
permitted_ids = _permitted_document_ids(user)
counts = (
through_model.objects.filter(
**{
related_object_field: OuterRef("pk"),
f"{target_field}__in": permitted_ids,
},
)
.values(related_object_field)
.annotate(c=Count(target_field))
.values("c")
)
return queryset.annotate(document_count=Coalesce(Subquery(counts[:1]), 0))
def get_objects_for_user_owner_aware(
user: User | None,
perms: str | list[str],
Model: Any,
*,
include_deleted: bool = False,
) -> QuerySet:
"""
Returns objects the user owns, are unowned, or has explicit perms.
When include_deleted is True, soft-deleted items are also included.
"""
manager = (
Model.global_objects
if include_deleted and hasattr(Model, "global_objects")
else Model.objects
)
objects_owned = manager.filter(owner=user)
objects_unowned = manager.filter(owner__isnull=True)
objects_with_perms = get_objects_for_user(
user=user,
perms=perms,
klass=manager.all(),
accept_global_perms=False,
)
return objects_owned | objects_unowned | objects_with_perms
def has_perms_owner_aware(user, perms, obj):
checker = ObjectPermissionChecker(user)
return obj.owner is None or obj.owner == user or checker.has_perm(perms, obj)
class ViewDocumentsPermissions(BasePermission):
"""
Permissions class that checks for model permissions for only viewing Documents.
"""
perms_map = {
"OPTIONS": ["documents.view_document"],
"GET": ["documents.view_document"],
"POST": ["documents.view_document"],
}
def has_permission(self, request, view):
if not request.user or (not request.user.is_authenticated): # pragma: no cover
return False
return request.user.has_perms(self.perms_map.get(request.method, []))
class PaperlessNotePermissions(BasePermission):
"""
Permissions class that checks for model permissions for Notes.
"""
perms_map = {
"OPTIONS": ["documents.view_note"],
"GET": ["documents.view_note"],
"POST": ["documents.add_note"],
"DELETE": ["documents.delete_note"],
}
def has_permission(self, request, view):
if not request.user or (not request.user.is_authenticated): # pragma: no cover
return False
perms = self.perms_map[request.method]
return request.user.has_perms(perms)
class AcknowledgeTasksPermissions(BasePermission):
"""
Permissions class that checks for model permissions for acknowledging tasks.
"""
perms_map = {
"POST": ["documents.change_paperlesstask"],
}
def has_permission(self, request: Any, view: Any) -> bool:
if not request.user or not request.user.is_authenticated: # pragma: no cover
return False
perms = self.perms_map.get(request.method, [])
return request.user.has_perms(perms)