mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-10-12 02:26:09 -05:00

Fixes N+1 queries in tag, correspondent, storage path, custom field, and document type list views. Reduces SQL queries from 160 to 9.
2592 lines
90 KiB
Python
2592 lines
90 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
import re
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from typing import TYPE_CHECKING
|
|
from typing import Literal
|
|
|
|
import magic
|
|
from celery import states
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import Group
|
|
from django.contrib.auth.models import User
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.validators import DecimalValidator
|
|
from django.core.validators import MaxLengthValidator
|
|
from django.core.validators import RegexValidator
|
|
from django.core.validators import integer_validator
|
|
from django.utils.crypto import get_random_string
|
|
from django.utils.dateparse import parse_datetime
|
|
from django.utils.text import slugify
|
|
from django.utils.timezone import get_current_timezone
|
|
from django.utils.timezone import is_naive
|
|
from django.utils.timezone import make_aware
|
|
from django.utils.translation import gettext as _
|
|
from drf_spectacular.utils import extend_schema_field
|
|
from drf_spectacular.utils import extend_schema_serializer
|
|
from drf_writable_nested.serializers import NestedUpdateMixin
|
|
from guardian.core import ObjectPermissionChecker
|
|
from guardian.shortcuts import get_users_with_perms
|
|
from guardian.utils import get_group_obj_perms_model
|
|
from guardian.utils import get_user_obj_perms_model
|
|
from rest_framework import fields
|
|
from rest_framework import serializers
|
|
from rest_framework.fields import SerializerMethodField
|
|
|
|
if settings.AUDIT_LOG_ENABLED:
|
|
from auditlog.context import set_actor
|
|
|
|
|
|
from documents import bulk_edit
|
|
from documents.data_models import DocumentSource
|
|
from documents.models import Correspondent
|
|
from documents.models import CustomField
|
|
from documents.models import CustomFieldInstance
|
|
from documents.models import Document
|
|
from documents.models import DocumentType
|
|
from documents.models import MatchingModel
|
|
from documents.models import Note
|
|
from documents.models import PaperlessTask
|
|
from documents.models import SavedView
|
|
from documents.models import SavedViewFilterRule
|
|
from documents.models import ShareLink
|
|
from documents.models import StoragePath
|
|
from documents.models import Tag
|
|
from documents.models import UiSettings
|
|
from documents.models import Workflow
|
|
from documents.models import WorkflowAction
|
|
from documents.models import WorkflowActionEmail
|
|
from documents.models import WorkflowActionWebhook
|
|
from documents.models import WorkflowTrigger
|
|
from documents.parsers import is_mime_type_supported
|
|
from documents.permissions import get_groups_with_only_permission
|
|
from documents.permissions import set_permissions_for_object
|
|
from documents.templating.filepath import validate_filepath_template_and_render
|
|
from documents.templating.utils import convert_format_str_to_template_format
|
|
from documents.validators import uri_validator
|
|
from documents.validators import url_validator
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Iterable
|
|
|
|
# Module-level logger shared by the serializers defined in this file.
logger = logging.getLogger("paperless.serializers")
|
|
|
|
|
|
# https://www.django-rest-framework.org/api-guide/serializers/#example
|
|
class DynamicFieldsModelSerializer(serializers.ModelSerializer):
    """
    A ModelSerializer that takes an additional `fields` argument that
    controls which fields should be displayed.
    """

    def __init__(self, *args, **kwargs):
        # `fields` is an option of this class only; remove it before the
        # parent constructor sees the keyword arguments.
        requested = kwargs.pop("fields", None)

        super().__init__(*args, **kwargs)

        if requested is None:
            # No restriction requested: leave the field set untouched.
            return

        # Discard every field that was not explicitly requested.
        keep = set(requested)
        for name in set(self.fields) - keep:
            self.fields.pop(name)
|
|
|
|
|
|
class MatchingModelSerializer(serializers.ModelSerializer):
    # Shared serializer logic for models that carry a name, an owner and
    # matching settings (match / matching_algorithm).

    # Read-only count taken from the `document_count` attribute of the object.
    document_count = serializers.IntegerField(read_only=True)

    def get_slug(self, obj) -> str:
        # The slug is derived from the name on every serialization.
        return slugify(obj.name)

    slug = SerializerMethodField()

    def validate(self, data):
        # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
        # Values missing from the payload fall back to the existing instance
        # (name, pk) or to the serializer's user (owner), when available.
        fallback_name = self.instance.name if hasattr(self.instance, "name") else None
        name = data.get("name", fallback_name)

        if "owner" in data:
            owner = data["owner"]
        elif hasattr(self, "user"):
            owner = self.user
        else:
            owner = None

        pk = self.instance.pk if hasattr(self.instance, "pk") else None

        # The uniqueness query only runs when name or owner is being written.
        if "name" not in data and "owner" not in data:
            return data

        clash = (
            self.Meta.model.objects.filter(
                name=name,
                owner=owner,
            )
            .exclude(pk=pk)
            .exists()
        )
        if clash:
            raise serializers.ValidationError(
                {"error": "Object violates owner / name unique constraint"},
            )
        return data

    def validate_match(self, match):
        # The pattern is only compiled when the submitted matching algorithm
        # is the regular-expression one.
        wants_regex = (
            "matching_algorithm" in self.initial_data
            and self.initial_data["matching_algorithm"] == MatchingModel.MATCH_REGEX
        )
        if not wants_regex:
            return match

        try:
            re.compile(match)
        except re.error as exc:
            detail = _("Invalid regular expression: %(error)s") % {
                "error": str(exc.msg),
            }
            raise serializers.ValidationError(detail)
        return match
|
|
|
|
|
|
class SetPermissionsMixin:
    # Helpers that validate a `set_permissions` payload and apply it to an
    # object.

    def _validate_user_ids(self, user_ids):
        # None means "no users given": return an empty queryset.
        if user_ids is None:
            return User.objects.none()

        matched = User.objects.filter(id__in=user_ids)
        # A count mismatch means at least one id is unknown or repeated.
        if matched.count() != len(user_ids):
            raise serializers.ValidationError(
                "Some users in don't exist or were specified twice.",
            )
        return matched

    def _validate_group_ids(self, group_ids):
        # None means "no groups given": return an empty queryset.
        if group_ids is None:
            return Group.objects.none()

        matched = Group.objects.filter(id__in=group_ids)
        # A count mismatch means at least one id is unknown or repeated.
        if matched.count() != len(group_ids):
            raise serializers.ValidationError(
                "Some groups in don't exist or were specified twice.",
            )
        return matched

    def validate_set_permissions(self, set_permissions=None):
        # Start with an empty entry per action; with no payload at all both
        # empty entries are returned unchanged.
        validated = {
            "view": {},
            "change": {},
        }
        if set_permissions is None:
            return validated

        for action in ("view", "change"):
            if action not in set_permissions:
                # An action absent from the payload is dropped from the result.
                del validated[action]
                continue

            if "users" in set_permissions[action]:
                validated[action]["users"] = self._validate_user_ids(
                    set_permissions[action]["users"],
                )
            if "groups" in set_permissions[action]:
                validated[action]["groups"] = self._validate_group_ids(
                    set_permissions[action]["groups"],
                )
        return validated

    def _set_permissions(self, permissions, object):
        # Thin wrapper around the project helper that applies the permissions.
        set_permissions_for_object(permissions, object)
|
|
|
|
|
|
class SerializerWithPerms(serializers.Serializer):
    # Serializer base that accepts three extra keyword options and stores
    # them as attributes before delegating to the regular constructor.

    def __init__(self, *args, **kwargs):
        # (option name, default used when the option is not passed)
        options = (
            ("user", None),
            ("full_perms", False),
            ("all_fields", False),
        )
        for option, default in options:
            # Popped so the parent constructor never receives these keywords.
            setattr(self, option, kwargs.pop(option, default))
        super().__init__(*args, **kwargs)
|
|
|
|
|
|
# Schema annotation: an object with optional "view" and "change" entries,
# each holding integer-id arrays under "users" and "groups".
@extend_schema_field(
    field={
        "type": "object",
        "properties": {
            "view": {
                "type": "object",
                "properties": {
                    "users": {"type": "array", "items": {"type": "integer"}},
                    "groups": {"type": "array", "items": {"type": "integer"}},
                },
            },
            "change": {
                "type": "object",
                "properties": {
                    "users": {"type": "array", "items": {"type": "integer"}},
                    "groups": {"type": "array", "items": {"type": "integer"}},
                },
            },
        },
    },
)
class SetPermissionsSerializer(serializers.DictField):
    # Plain DictField; it exists only to carry the schema annotation above.
    pass
|
|
|
|
|
|
class OwnedObjectSerializer(
    SerializerWithPerms,
    serializers.ModelSerializer,
    SetPermissionsMixin,
):
    # Base serializer for objects with an `owner` and object-level
    # permissions. It exposes `permissions`, `user_can_change` and
    # `is_shared_by_requester` for reading and `set_permissions` for writing.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.all_fields:
            return

        # With full_perms the detailed `permissions` field is kept and the
        # two summary flags are removed; otherwise it is the other way round.
        unwanted = (
            ("user_can_change", "is_shared_by_requester")
            if self.full_perms
            else ("permissions",)
        )
        for field_name in unwanted:
            # Each field is removed independently: a field that is absent from
            # this serializer must not stop the remaining ones being removed.
            self.fields.pop(field_name, None)

    def _get_perms(self, obj, codename: str, target: Literal["users", "groups"]):
        """
        Get the given permissions from context or from django-guardian.

        :param codename: The permission codename, e.g. 'view' or 'change'
        :param target: 'users' or 'groups'
        """
        key = f"{target}_{codename}_perms"
        cached = self.context.get(key, {}).get(obj.pk)
        if cached is not None:
            return list(cached)

        # Permission not found in the context, get it from guardian
        perm = f"{codename}_{obj.__class__.__name__.lower()}"
        if target == "users":
            return list(
                get_users_with_perms(
                    obj,
                    only_with_perms_in=[perm],
                    with_group_users=False,
                ).values_list("id", flat=True),
            )
        # groups
        return list(
            get_groups_with_only_permission(
                obj,
                codename=perm,
            ).values_list("id", flat=True),
        )

    @extend_schema_field(
        field={
            "type": "object",
            "properties": {
                "view": {
                    "type": "object",
                    "properties": {
                        "users": {"type": "array", "items": {"type": "integer"}},
                        "groups": {"type": "array", "items": {"type": "integer"}},
                    },
                },
                "change": {
                    "type": "object",
                    "properties": {
                        "users": {"type": "array", "items": {"type": "integer"}},
                        "groups": {"type": "array", "items": {"type": "integer"}},
                    },
                },
            },
        },
    )
    def get_permissions(self, obj) -> dict:
        # User and group ids per action, in the same shape as `set_permissions`.
        return {
            "view": {
                "users": self._get_perms(obj, "view", "users"),
                "groups": self._get_perms(obj, "view", "groups"),
            },
            "change": {
                "users": self._get_perms(obj, "change", "users"),
                "groups": self._get_perms(obj, "change", "groups"),
            },
        }

    def get_user_can_change(self, obj) -> bool:
        # Unowned objects and objects owned by the serializer's user are
        # always changeable.
        if obj.owner is None or obj.owner == self.user:
            return True
        # Without a user there is nobody to check a permission for.
        if self.user is None:
            return False
        checker = ObjectPermissionChecker(self.user)
        return checker.has_perm(f"change_{obj.__class__.__name__.lower()}", obj)

    @staticmethod
    def get_shared_object_pks(objects: Iterable):
        """
        Return the primary keys of the subset of objects that are shared.
        """
        # Materialise once: the collection is read twice below and a one-shot
        # iterable would otherwise lose its first element.
        items = list(objects)
        if not items:
            return set()

        first_obj = items[0]
        ctype = ContentType.objects.get_for_model(first_obj)
        object_pks = [obj.pk for obj in items]
        pk_type = type(first_obj.pk)

        def get_pks_for_permission_type(model):
            return map(
                pk_type,  # coerce the pk to be the same type of the provided objects
                model.objects.filter(
                    content_type=ctype,
                    object_pk__in=object_pks,
                )
                .values_list("object_pk", flat=True)
                .distinct(),
            )

        UserObjectPermission = get_user_obj_perms_model()
        GroupObjectPermission = get_group_obj_perms_model()
        user_permission_pks = get_pks_for_permission_type(UserObjectPermission)
        group_permission_pks = get_pks_for_permission_type(GroupObjectPermission)

        return set(user_permission_pks) | set(group_permission_pks)

    def get_is_shared_by_requester(self, obj: Document) -> bool:
        # First check the context to see if `shared_object_pks` is set by the parent.
        shared_object_pks = self.context.get("shared_object_pks")
        # If not just check if the current object is shared.
        if shared_object_pks is None:
            shared_object_pks = self.get_shared_object_pks([obj])
        return obj.owner == self.user and obj.id in shared_object_pks

    permissions = SerializerMethodField(read_only=True, required=False)
    user_can_change = SerializerMethodField(read_only=True, required=False)
    is_shared_by_requester = SerializerMethodField(read_only=True, required=False)

    set_permissions = SetPermissionsSerializer(
        label="Set permissions",
        allow_empty=True,
        required=False,
        write_only=True,
    )
    # other methods in mixin

    def validate_unique_together(self, validated_data, instance=None):
        """
        Raise a validation error when another object already has this
        owner / name pair.

        :param validated_data: the data about to be written
        :param instance: the object being updated, or None on create
        """
        # workaround for https://github.com/encode/django-rest-framework/issues/9358
        if "owner" in validated_data and "name" in self.Meta.fields:
            name = validated_data.get("name", instance.name if instance else None)
            objects = (
                self.Meta.model.objects.exclude(pk=instance.pk)
                if instance
                else self.Meta.model.objects.all()
            )
            not_unique = objects.filter(
                owner=validated_data["owner"],
                name=name,
            ).exists()
            if not_unique:
                raise serializers.ValidationError(
                    {"error": "Object violates owner / name unique constraint"},
                )

    def create(self, validated_data):
        """
        Create the object, defaulting the owner to the serializer's user and
        applying `set_permissions` once the object exists.
        """
        # default to current user if not set
        request = self.context.get("request")
        if (
            "owner" not in validated_data
            or (request is not None and "owner" not in request.data)
        ) and self.user:
            validated_data["owner"] = self.user
        # `set_permissions` is removed from the data handed to the parent's
        # create(); it is applied separately below.
        permissions = None
        if "set_permissions" in validated_data:
            permissions = validated_data.pop("set_permissions")
        self.validate_unique_together(validated_data)
        instance = super().create(validated_data)
        if permissions is not None:
            self._set_permissions(permissions, instance)
        return instance

    def update(self, instance, validated_data):
        """
        Update the object and apply `set_permissions` when present.
        """
        # Validate before touching permissions so that a rejected update does
        # not leave the object's permissions already modified.
        self.validate_unique_together(validated_data, instance)
        if "set_permissions" in validated_data:
            self._set_permissions(validated_data["set_permissions"], instance)
        return super().update(instance, validated_data)
|
|
|
|
|
|
class OwnedObjectListSerializer(serializers.ListSerializer):
    # List serializer that computes the shared-object pks once for the whole
    # list and hands them to the child serializer through its context.

    def to_representation(self, documents):
        shared_pks = self.child.get_shared_object_pks(
            documents,
        )
        # The child reads this key in `get_is_shared_by_requester`.
        self.child.context["shared_object_pks"] = shared_pks
        return super().to_representation(documents)
|
|
|
|
|
|
class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
    # Serializer for Correspondent objects.

    # Read-only date taken from the `last_correspondence` attribute of the
    # serialized object.
    last_correspondence = serializers.DateField(read_only=True, required=False)

    class Meta:
        model = Correspondent
        fields = (
            # identity
            "id",
            "slug",
            "name",
            # matching configuration
            "match",
            "matching_algorithm",
            "is_insensitive",
            # read-only statistics
            "document_count",
            "last_correspondence",
            # ownership and permissions
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )
|
|
|
|
|
|
class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer):
    # Serializer for DocumentType objects; every field comes from the base
    # classes or the model.

    class Meta:
        model = DocumentType
        fields = (
            # identity
            "id",
            "slug",
            "name",
            # matching configuration
            "match",
            "matching_algorithm",
            "is_insensitive",
            # read-only statistics
            "document_count",
            # ownership and permissions
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )
|
|
|
|
|
|
class DeprecatedColors:
    # Pairs of (numeric id, hex colour). Ids are consecutive and start at 1,
    # so they are generated from the position of each colour in the list.
    COLOURS = tuple(
        enumerate(
            (
                "#a6cee3",
                "#1f78b4",
                "#b2df8a",
                "#33a02c",
                "#fb9a99",
                "#e31a1c",
                "#fdbf6f",
                "#ff7f00",
                "#cab2d6",
                "#6a3d9a",
                "#b15928",
                "#000000",
                "#cccccc",
            ),
            start=1,
        ),
    )
|
|
|
|
|
|
@extend_schema_field(
    serializers.ChoiceField(
        choices=DeprecatedColors.COLOURS,
    ),
)
class ColorField(serializers.Field):
    # Translates between the numeric ids of DeprecatedColors.COLOURS (the
    # external representation) and hex colour strings (the internal value).

    def to_internal_value(self, data):
        # First hex colour whose id equals the input; unknown ids are rejected.
        hex_code = next(
            (
                colour
                for legacy_id, colour in DeprecatedColors.COLOURS
                if legacy_id == data
            ),
            None,
        )
        if hex_code is None:
            raise serializers.ValidationError
        return hex_code

    def to_representation(self, value):
        # First id whose hex colour equals the stored value; colours that are
        # not in the table are reported as id 1.
        return next(
            (
                legacy_id
                for legacy_id, colour in DeprecatedColors.COLOURS
                if colour == value
            ),
            1,
        )
|
|
|
|
|
|
class TagSerializerVersion1(MatchingModelSerializer, OwnedObjectSerializer):
    # Tag serializer variant that exposes the colour as a numeric `colour`
    # id (see ColorField) instead of a hex string.

    # Reads and writes the model's `color` attribute.
    colour = ColorField(source="color", default="#a6cee3")

    class Meta:
        model = Tag
        fields = (
            # identity
            "id",
            "slug",
            "name",
            "colour",
            # matching configuration
            "match",
            "matching_algorithm",
            "is_insensitive",
            "is_inbox_tag",
            # read-only statistics
            "document_count",
            # ownership and permissions
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )
|
|
|
|
|
|
class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer):
    # Tag serializer with hex colours, a computed text colour and the
    # parent / children relations of the tag hierarchy.

    def get_text_color(self, obj) -> str:
        # Choose white or black text from the weighted luminance of the tag
        # colour; a colour that cannot be parsed as hex falls back to black.
        try:
            h = obj.color.lstrip("#")
            rgb = tuple(int(h[i : i + 2], 16) / 256 for i in (0, 2, 4))
            luminance = math.sqrt(
                0.299 * math.pow(rgb[0], 2)
                + 0.587 * math.pow(rgb[1], 2)
                + 0.114 * math.pow(rgb[2], 2),
            )
            return "#ffffff" if luminance < 0.53 else "#000000"
        except ValueError:
            return "#000000"

    text_color = serializers.SerializerMethodField()

    # map to treenode's tn_parent
    parent = serializers.PrimaryKeyRelatedField(
        queryset=Tag.objects.all(),
        allow_null=True,
        required=False,
        source="tn_parent",
    )

    @extend_schema_field(
        field=serializers.ListSerializer(
            child=serializers.PrimaryKeyRelatedField(
                queryset=Tag.objects.all(),
            ),
        ),
    )
    def get_children(self, obj):
        # Children are serialized with the same user and permission options
        # as this serializer; without them the nested serializer has no user
        # and selects its permission fields from the defaults.
        serializer = TagSerializer(
            obj.get_children(),
            many=True,
            user=self.user,
            full_perms=self.full_perms,
            all_fields=self.all_fields,
            context=self.context,
        )
        return serializer.data

    # children as nested Tag objects
    children = serializers.SerializerMethodField()

    class Meta:
        model = Tag
        fields = (
            "id",
            "slug",
            "name",
            "color",
            "text_color",
            "match",
            "matching_algorithm",
            "is_insensitive",
            "is_inbox_tag",
            "document_count",
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
            "parent",
            "children",
        )

    def validate_color(self, color):
        """
        Accept only a '#' followed by exactly six hexadecimal digits.

        :raises serializers.ValidationError: for any other value
        """
        regex = r"#[0-9a-fA-F]{6}"
        # fullmatch: the whole value must be the colour, not just its start.
        if not re.fullmatch(regex, color):
            raise serializers.ValidationError(_("Invalid color."))
        return color

    def validate(self, attrs):
        """
        Validate the (possibly changed) parent with the model's clean(), then
        run the inherited validation.
        """
        if self.instance:
            # Look the current parent up once; it is both the fallback when no
            # new parent is submitted and the value restored afterwards.
            original_parent = self.instance.get_parent()
            parent = attrs.get("tn_parent", original_parent)
            try:
                # Temporarily set tn_parent in-memory to validate clean()
                self.instance.tn_parent = parent
                self.instance.clean()
            except ValidationError as e:
                logger.debug("Tag parent validation failed: %s", e)
                raise
            finally:
                self.instance.tn_parent = original_parent
        else:
            # For new instances, create a transient Tag and validate
            temp = Tag(tn_parent=attrs.get("tn_parent"))
            try:
                temp.clean()
            except ValidationError as e:
                logger.debug("Tag parent validation failed: %s", e)
                raise serializers.ValidationError(
                    {"parent": _("Invalid parent tag.")},
                ) from e

        return super().validate(attrs)
|
|
|
|
|
|
class CorrespondentField(serializers.PrimaryKeyRelatedField):
    # Primary-key relation whose candidates are all Correspondent objects.

    def get_queryset(self):
        candidates = Correspondent.objects.all()
        return candidates
|
|
|
|
|
|
class TagsField(serializers.PrimaryKeyRelatedField):
    # Primary-key relation whose candidates are all Tag objects.

    def get_queryset(self):
        candidates = Tag.objects.all()
        return candidates
|
|
|
|
|
|
class DocumentTypeField(serializers.PrimaryKeyRelatedField):
    # Primary-key relation whose candidates are all DocumentType objects.

    def get_queryset(self):
        candidates = DocumentType.objects.all()
        return candidates
|
|
|
|
|
|
class StoragePathField(serializers.PrimaryKeyRelatedField):
    # Primary-key relation whose candidates are all StoragePath objects.

    def get_queryset(self):
        candidates = StoragePath.objects.all()
        return candidates
|
|
|
|
|
|
class CustomFieldSerializer(serializers.ModelSerializer):
    # Serializer for CustomField definitions. For API versions below 7 the
    # select options are exchanged as plain label strings; from version 7 on
    # they are dicts with "label" and "id".

    def __init__(self, *args, **kwargs):
        context = kwargs.get("context")
        # API version of the request when one is available in the context,
        # the configured default version otherwise.
        self.api_version = int(
            context.get("request").version
            if context and context.get("request")
            else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
        )
        super().__init__(*args, **kwargs)

    data_type = serializers.ChoiceField(
        choices=CustomField.FieldDataType,
        read_only=False,
    )

    document_count = serializers.IntegerField(read_only=True)

    class Meta:
        model = CustomField
        fields = [
            "id",
            "name",
            "data_type",
            "extra_data",
            "document_count",
        ]

    @staticmethod
    def _select_options_are_valid(extra_data) -> bool:
        """Return True when extra_data holds a non-empty list of option dicts with non-empty string labels."""
        if not isinstance(extra_data, dict):
            return False
        options = extra_data.get("select_options")
        if not isinstance(options, list) or len(options) == 0:
            return False
        # Type checks come first so malformed client input is reported as a
        # validation error instead of raising AttributeError / TypeError.
        return all(
            isinstance(option, dict)
            and isinstance(option.get("label"), str)
            and len(option["label"]) > 0
            for option in options
        )

    def validate(self, attrs):
        """
        Check name uniqueness and the type-specific shape of `extra_data`.

        :raises serializers.ValidationError: on a duplicate name, invalid
            select options or an invalid default currency
        """
        # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
        name = attrs.get(
            "name",
            self.instance.name if hasattr(self.instance, "name") else None,
        )
        objects = (
            self.Meta.model.objects.exclude(
                pk=self.instance.pk,
            )
            if self.instance is not None
            else self.Meta.model.objects.all()
        )
        if ("name" in attrs) and objects.filter(
            name=name,
        ).exists():
            raise serializers.ValidationError(
                {"error": "Object violates name unique constraint"},
            )

        extra_data = attrs.get("extra_data")
        if (
            "data_type" in attrs
            and attrs["data_type"] == CustomField.FieldDataType.SELECT
        ) or (
            self.instance
            and self.instance.data_type == CustomField.FieldDataType.SELECT
        ):
            if not self._select_options_are_valid(extra_data):
                raise serializers.ValidationError(
                    {"error": "extra_data.select_options must be a valid list"},
                )
            # labels are valid, generate ids if not present
            for option in extra_data["select_options"]:
                if option.get("id") is None:
                    option["id"] = get_random_string(length=16)
        elif (
            "data_type" in attrs
            and attrs["data_type"] == CustomField.FieldDataType.MONETARY
            # a null or non-dict extra_data carries no default currency
            and isinstance(extra_data, dict)
            and extra_data.get("default_currency") is not None
        ):
            currency = extra_data["default_currency"]
            # An empty string is allowed; anything else must be 3 characters.
            if not isinstance(currency, str) or (
                len(currency) > 0 and len(currency) != 3
            ):
                raise serializers.ValidationError(
                    {"error": "extra_data.default_currency must be a 3-character string"},
                )
        return super().validate(attrs)

    def to_internal_value(self, data):
        ret = super().to_internal_value(data)

        extra_data = ret.get("extra_data")
        if (
            self.api_version < 7
            and ret.get("data_type", "") == CustomField.FieldDataType.SELECT
            # extra_data may be present but null or not a dict
            and isinstance(extra_data, dict)
            and isinstance(extra_data.get("select_options"), list)
        ):
            # Older API versions send bare labels: wrap each one in an option
            # dict with a freshly generated id.
            extra_data["select_options"] = [
                {
                    "label": option,
                    "id": get_random_string(length=16),
                }
                for option in extra_data["select_options"]
            ]

        return ret

    def to_representation(self, instance):
        ret = super().to_representation(instance)

        if (
            self.api_version < 7
            and instance.data_type == CustomField.FieldDataType.SELECT
        ):
            # Convert the select options with ids to a list of strings
            ret["extra_data"]["select_options"] = [
                option["label"] for option in ret["extra_data"]["select_options"]
            ]

        return ret
|
|
|
|
|
|
class ReadWriteSerializerMethodField(serializers.SerializerMethodField):
    """
    Based on https://stackoverflow.com/a/62579804
    """

    def __init__(self, method_name=None, *args, **kwargs):
        self.method_name = method_name
        # Always operate on the whole object, whatever the caller passed.
        kwargs.update(source="*")
        # Deliberately start the constructor chain *after*
        # SerializerMethodField, so its own __init__ is not run.
        super(serializers.SerializerMethodField, self).__init__(*args, **kwargs)

    def to_internal_value(self, data):
        # Hand the raw input back under this field's own name.
        internal = {self.field_name: data}
        return internal
|
|
|
|
|
|
class CustomFieldInstanceSerializer(serializers.ModelSerializer):
    # Serializer for the value of one custom field on one document. For API
    # versions below 7 a select value is exchanged as the index of the option
    # in the field's option list; from version 7 on it is the option id.

    field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all())
    value = ReadWriteSerializerMethodField(allow_null=True)

    def create(self, validated_data):
        # An instance is attached to a document
        document: Document = validated_data["document"]
        # And to a CustomField
        custom_field: CustomField = validated_data["field"]
        # This key must exist, as it is validated
        data_store_name = CustomFieldInstance.get_value_field_name(
            custom_field.data_type,
        )

        if custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
            # prior to update so we can look for any docs that are going to be removed
            bulk_edit.reflect_doclinks(document, custom_field, validated_data["value"])

        # Actually update or create the instance, providing the value
        # to fill in the correct attribute based on the type
        # (the "created" flag is not needed; it is not bound to `_` so the
        # module's gettext alias stays unshadowed)
        instance, _created = CustomFieldInstance.objects.update_or_create(
            document=document,
            field=custom_field,
            defaults={data_store_name: validated_data["value"]},
        )
        return instance

    def get_value(self, obj: CustomFieldInstance) -> str | int | float | dict | None:
        return obj.value

    def validate(self, data):
        """
        Probably because we're kind of doing it odd, validation from the model
        doesn't run against the field "value", so we have to re-create it here.

        Don't like it, but it is better than returning an HTTP 500 when the database
        hates the value
        """
        data = super().validate(data)
        field: CustomField = data["field"]
        if "value" not in data or data["value"] is None:
            # Nothing to check for an absent or null value.
            return data

        value = data["value"]
        if field.data_type == CustomField.FieldDataType.URL and len(value) > 0:
            uri_validator(value)
        elif field.data_type == CustomField.FieldDataType.INT:
            integer_validator(value)
        elif field.data_type == CustomField.FieldDataType.MONETARY and value != "":
            try:
                # First try to validate as a number from legacy format
                DecimalValidator(max_digits=12, decimal_places=2)(
                    Decimal(str(value)),
                )
            except Exception:
                # If that fails, try to validate as a monetary string
                RegexValidator(
                    regex=r"^[A-Z]{3}-?\d+(\.\d{1,2})$",
                    message="Must be a two-decimal number with optional currency code e.g. GBP123.45",
                )(value)
        elif field.data_type == CustomField.FieldDataType.STRING:
            MaxLengthValidator(limit_value=128)(value)
        elif field.data_type == CustomField.FieldDataType.SELECT:
            select_options = field.extra_data["select_options"]
            try:
                # Raises StopIteration when no option carries this id.
                next(option for option in select_options if option["id"] == value)
            except Exception as e:
                raise serializers.ValidationError(
                    f"Value must be an id of an element in {select_options}",
                ) from e
        elif field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
            if not isinstance(value, list):
                raise serializers.ValidationError(
                    "Value must be a list",
                )
            # A count mismatch means an id is unknown or listed twice.
            if Document.objects.filter(id__in=value).count() != len(value):
                raise serializers.ValidationError(
                    "Some documents in value don't exist or were specified twice.",
                )

        return data

    def get_api_version(self):
        # API version of the request when one is available in the context,
        # the configured default version otherwise.
        return int(
            self.context.get("request").version
            if self.context.get("request")
            else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
        )

    def to_internal_value(self, data):
        ret = super().to_internal_value(data)

        if (
            self.get_api_version() < 7
            and ret.get("field").data_type == CustomField.FieldDataType.SELECT
            and ret.get("value") is not None
        ):
            # Convert the index of the option in the field.extra_data["select_options"]
            # list to the options unique id
            try:
                ret["value"] = ret.get("field").extra_data["select_options"][
                    ret["value"]
                ]["id"]
            except (IndexError, KeyError, TypeError) as e:
                # The index comes straight from the client: an out-of-range or
                # non-integer value is a validation error, not a server error.
                raise serializers.ValidationError(
                    {"value": ["Value must be a valid index of a select option."]},
                ) from e

        return ret

    def to_representation(self, instance):
        ret = super().to_representation(instance)

        if (
            self.get_api_version() < 7
            and instance.field.data_type == CustomField.FieldDataType.SELECT
        ):
            # return the index of the option in the field.extra_data["select_options"] list
            # (None when no option has the stored id)
            ret["value"] = next(
                (
                    idx
                    for idx, option in enumerate(
                        instance.field.extra_data["select_options"],
                    )
                    if option["id"] == instance.value
                ),
                None,
            )

        return ret

    class Meta:
        model = CustomFieldInstance
        fields = [
            "value",
            "field",
        ]
|
|
|
|
|
|
class BasicUserSerializer(serializers.ModelSerializer):
    # Different than paperless.serializers.UserSerializer
    # Exposes only the id and the name fields listed below.
    class Meta:
        model = User
        fields = [
            "id",
            "username",
            "first_name",
            "last_name",
        ]
|
|
|
|
|
|
class NotesSerializer(serializers.ModelSerializer):
    # Serializer for notes. From API version 8 on the author is a nested
    # object; older versions receive only the author's id.

    user = BasicUserSerializer(read_only=True)

    class Meta:
        model = Note
        fields = ["id", "note", "created", "user"]
        ordering = ["-created"]

    def to_representation(self, instance):
        ret = super().to_representation(instance)

        request = self.context.get("request")
        # Request version when a request is in the context, the configured
        # default version otherwise.
        api_version = int(
            request.version if request else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
        )

        # A note without a user serializes the nested field as None; keep that
        # None for old clients instead of failing on the subscript.
        if api_version < 8 and ret.get("user") is not None:
            ret["user"] = ret["user"]["id"]

        return ret
|
|
|
|
|
|
@extend_schema_serializer(
|
|
deprecate_fields=["created_date"],
|
|
)
|
|
class DocumentSerializer(
|
|
OwnedObjectSerializer,
|
|
NestedUpdateMixin,
|
|
DynamicFieldsModelSerializer,
|
|
):
|
|
correspondent = CorrespondentField(allow_null=True)
|
|
tags = TagsField(many=True)
|
|
document_type = DocumentTypeField(allow_null=True)
|
|
storage_path = StoragePathField(allow_null=True)
|
|
|
|
original_file_name = SerializerMethodField()
|
|
archived_file_name = SerializerMethodField()
|
|
created_date = serializers.DateField(required=False)
|
|
page_count = SerializerMethodField()
|
|
|
|
notes = NotesSerializer(many=True, required=False, read_only=True)
|
|
|
|
custom_fields = CustomFieldInstanceSerializer(
|
|
many=True,
|
|
allow_null=False,
|
|
required=False,
|
|
)
|
|
|
|
owner = serializers.PrimaryKeyRelatedField(
|
|
queryset=User.objects.all(),
|
|
required=False,
|
|
allow_null=True,
|
|
)
|
|
|
|
remove_inbox_tags = serializers.BooleanField(
|
|
default=False,
|
|
write_only=True,
|
|
allow_null=True,
|
|
required=False,
|
|
)
|
|
|
|
def get_page_count(self, obj) -> int | None:
|
|
return obj.page_count
|
|
|
|
def get_original_file_name(self, obj) -> str | None:
|
|
return obj.original_filename
|
|
|
|
def get_archived_file_name(self, obj) -> str | None:
|
|
if obj.has_archive_version:
|
|
return obj.get_public_filename(archive=True)
|
|
else:
|
|
return None
|
|
|
|
def to_representation(self, instance):
|
|
doc = super().to_representation(instance)
|
|
if self.truncate_content and "content" in self.fields:
|
|
doc["content"] = doc.get("content")[0:550]
|
|
|
|
request = self.context.get("request")
|
|
api_version = int(
|
|
request.version if request else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
|
|
)
|
|
|
|
if api_version < 9:
|
|
# provide created as a datetime for backwards compatibility
|
|
from django.utils import timezone
|
|
|
|
doc["created"] = timezone.make_aware(
|
|
datetime.combine(
|
|
instance.created,
|
|
datetime.min.time(),
|
|
),
|
|
).isoformat()
|
|
return doc
|
|
|
|
def to_internal_value(self, data):
|
|
if (
|
|
"created" in data
|
|
and isinstance(data["created"], str)
|
|
and ":" in data["created"]
|
|
):
|
|
# Handle old format of isoformat datetime string
|
|
parsed = parse_datetime(data["created"])
|
|
if parsed:
|
|
if is_naive(parsed):
|
|
parsed = make_aware(parsed, get_current_timezone())
|
|
data["created"] = parsed.astimezone().date()
|
|
return super().to_internal_value(data)
|
|
|
|
def validate(self, attrs):
|
|
if (
|
|
"archive_serial_number" in attrs
|
|
and attrs["archive_serial_number"] is not None
|
|
and len(str(attrs["archive_serial_number"])) > 0
|
|
and Document.deleted_objects.filter(
|
|
archive_serial_number=attrs["archive_serial_number"],
|
|
).exists()
|
|
):
|
|
raise serializers.ValidationError(
|
|
{
|
|
"archive_serial_number": [
|
|
"Document with this Archive Serial Number already exists in the trash.",
|
|
],
|
|
},
|
|
)
|
|
return super().validate(attrs)
|
|
|
|
def update(self, instance: Document, validated_data):
    """
    Update a document, applying the extra rules this serializer supports.

    Steps, in order:
      * honour the deprecated ``created_date`` key (only when ``created`` is
        not also given) and drop it from ``validated_data``;
      * when custom fields are submitted, remove the document links of any
        existing document-link field that is no longer part of the payload;
      * expand submitted tags with their ancestors and drop tags that are
        being removed together with their descendants;
      * when ``remove_inbox_tags`` is set, strip inbox tags that are not
        newly added by this request;
      * run the regular update (inside ``set_actor`` when the audit log is
        enabled) and hard delete soft-deleted custom field instances.

    Returns:
        The updated ``instance``.
    """
    if "created_date" in validated_data:
        if "created" not in validated_data:
            instance.created = validated_data.get("created_date")
            instance.save()
        logger.warning(
            "created_date is deprecated, use created instead",
        )
        validated_data.pop("created_date")

    # Check the payload first so no query is issued when custom fields are
    # not part of this update.
    if "custom_fields" in validated_data and instance.custom_fields.exists():
        incoming_custom_fields = [
            field["field"] for field in validated_data["custom_fields"]
        ]
        for custom_field_instance in instance.custom_fields.filter(
            field__data_type=CustomField.FieldDataType.DOCUMENTLINK,
        ):
            if (
                custom_field_instance.field not in incoming_custom_fields
                and custom_field_instance.value is not None
            ):
                # Doc link field is being removed entirely
                for doc_id in custom_field_instance.value:
                    bulk_edit.remove_doclink(
                        instance,
                        custom_field_instance.field,
                        doc_id,
                    )

    if "tags" in validated_data:
        # Respect tag hierarchy on updates:
        # - Adding a child adds its ancestors
        # - Removing a parent removes all its descendants
        prev_tags = set(instance.tags.all())
        requested_tags = set(validated_data["tags"])

        # Tags being removed in this update and all descendants
        removed_tags = prev_tags - requested_tags
        blocked_tags = set(removed_tags)
        for t in removed_tags:
            blocked_tags.update(t.get_descendants())

        # Add all parent tags
        final_tags = set(requested_tags)
        for t in requested_tags:
            final_tags.update(t.get_ancestors())

        # Drop removed parents and their descendants
        final_tags.difference_update(blocked_tags)

        validated_data["tags"] = list(final_tags)

    if validated_data.get("remove_inbox_tags"):
        # Read the document's current tags once; evaluating
        # ``instance.tags.all()`` inside a comprehension would issue one
        # query per candidate tag.
        current_tags = list(instance.tags.all())
        if "tags" in validated_data:
            current_tag_set = set(current_tags)
            candidate_tags = validated_data["tags"]
            tag_ids_being_added = [
                tag.id for tag in candidate_tags if tag not in current_tag_set
            ]
        else:
            candidate_tags = current_tags
            tag_ids_being_added = []

        inbox_tags_not_being_added = set(
            Tag.objects.filter(is_inbox_tag=True).exclude(
                id__in=tag_ids_being_added,
            ),
        )
        validated_data["tags"] = [
            tag for tag in candidate_tags if tag not in inbox_tags_not_being_added
        ]

    if settings.AUDIT_LOG_ENABLED:
        with set_actor(self.user):
            super().update(instance, validated_data)
    else:
        super().update(instance, validated_data)

    # hard delete custom field instances that were soft deleted
    CustomFieldInstance.deleted_objects.filter(document=instance).delete()
    return instance
|
|
|
|
def __init__(self, *args, **kwargs):
    """
    Initialise the serializer.

    Keyword Args:
        truncate_content: consumed here (default False) and stored on the
            instance; read by ``to_representation``.
        context: when it carries a request whose method is PATCH or PUT,
            ``full_perms=True`` is passed on to the parent initialiser.
    """
    self.truncate_content = kwargs.pop("truncate_content", False)

    # return full permissions if we're doing a PATCH or PUT
    context = kwargs.get("context")
    # A context without a "request" entry must not crash the constructor.
    request = context.get("request") if context is not None else None
    if request is not None and request.method in ("PATCH", "PUT"):
        kwargs["full_perms"] = True

    super().__init__(*args, **kwargs)
|
|
|
|
class Meta:
    # Serializer configuration: the model being serialized, the fields that
    # are exposed, and the list serializer used for many=True.
    model = Document
    fields = (
        "id",
        "correspondent",
        "document_type",
        "storage_path",
        "title",
        "content",
        "tags",
        "created",
        # Deprecated alias for "created"; update() logs a warning when it is used.
        "created_date",
        "modified",
        "added",
        "deleted_at",
        "archive_serial_number",
        "original_file_name",
        "archived_file_name",
        "owner",
        "permissions",
        "user_can_change",
        "is_shared_by_requester",
        "set_permissions",
        "notes",
        "custom_fields",
        # Flag read by update() to strip inbox tags that are not newly added.
        "remove_inbox_tags",
        "page_count",
        "mime_type",
    )
    list_serializer_class = OwnedObjectListSerializer
|
|
|
|
|
|
class SearchResultListSerializer(serializers.ListSerializer):
    # List serializer for search hits: loads the documents behind all hits up
    # front and shares them with the child serializer through its context.

    def to_representation(self, hits):
        """
        Serialize a list of search hits.

        The documents for all hits and the shared-object primary keys are
        stored in the child's context under "documents" and
        "shared_object_pks" before the hits are serialized.
        """
        child = self.child

        # Fetch all Document objects in the list in one SQL query.
        documents_by_id = child.fetch_documents([hit["id"] for hit in hits])
        child.context["documents"] = documents_by_id

        # Also check if they are shared with other users / groups.
        shared_pks = child.get_shared_object_pks(documents_by_id.values())
        child.context["shared_object_pks"] = shared_pks

        return super().to_representation(hits)
|
|
|
|
|
|
class SearchResultSerializer(DocumentSerializer):
    # Serializes a search hit as its document plus a "__search_hit__" entry
    # holding score, rank and highlights.

    @staticmethod
    def fetch_documents(ids):
        """
        Return a dict that maps given document IDs to Document objects.
        """
        queryset = (
            Document.objects.select_related(
                "correspondent",
                "storage_path",
                "document_type",
                "owner",
            )
            .prefetch_related("tags", "custom_fields", "notes")
            .filter(id__in=ids)
        )
        return {document.id: document for document in queryset}

    def to_representation(self, hit):
        """
        Serialize one search hit.

        The document is looked up by the hit's "id" in the "documents"
        context entry when present, otherwise it is fetched here.
        """
        # Again we first check if the parent has already fetched the documents.
        documents = self.context.get("documents")
        # Otherwise we fetch this document.
        if documents is None:  # pragma: no cover
            # In practice we only serialize **lists** of whoosh.searching.Hit.
            # I'm keeping this check for completeness but marking it no cover for now.
            documents = self.fetch_documents([hit["id"]])
        document = documents[hit["id"]]

        # All notes of the document, comma separated, as highlight source text.
        notes = ",".join(str(c.note) for c in document.notes.all())

        result = super().to_representation(document)

        content_highlights = hit.highlights("content", text=document.content)
        if document:
            note_highlights = hit.highlights("notes", text=notes)
        else:
            note_highlights = None

        result["__search_hit__"] = {
            "score": hit.score,
            "highlights": content_highlights,
            "note_highlights": note_highlights,
            "rank": hit.rank,
        }
        return result

    class Meta(DocumentSerializer.Meta):
        list_serializer_class = SearchResultListSerializer
|
|
|
|
|
|
class SavedViewFilterRuleSerializer(serializers.ModelSerializer):
    # Model serializer for SavedViewFilterRule exposing only the rule type and
    # its value; used as the nested ``filter_rules`` serializer of
    # SavedViewSerializer.
    class Meta:
        model = SavedViewFilterRule
        fields = ["rule_type", "value"]
|
|
|
|
|
|
class SavedViewSerializer(OwnedObjectSerializer):
    # Serializer for saved views with their nested filter rules. Filter rules
    # are written by replacing all existing rules of the view.
    filter_rules = SavedViewFilterRuleSerializer(many=True)

    class Meta:
        model = SavedView
        fields = [
            "id",
            "name",
            "show_on_dashboard",
            "show_in_sidebar",
            "sort_field",
            "sort_reverse",
            "filter_rules",
            "page_size",
            "display_mode",
            "display_fields",
            "owner",
            "permissions",
            "user_can_change",
        ]

    def validate(self, attrs):
        """
        Validate the entries of ``display_fields``.

        Each entry must either be one of ``SavedView.DisplayFields.values``
        or contain the custom field prefix followed by the id of an existing
        custom field.

        Raises:
            serializers.ValidationError: for any entry that is not a string,
                names no custom field id, names an unknown custom field, or
                is not a known display field.
        """
        attrs = super().validate(attrs)
        display_fields = attrs.get("display_fields")
        if display_fields is None:
            return attrs

        # i.e. the 'custom_field_' prefix
        custom_field_prefix = SavedView.DisplayFields.CUSTOM_FIELD[:-2]

        for field in display_fields:
            if not isinstance(field, str):
                # A non-string entry can never be valid and would otherwise
                # crash the substring test below.
                raise serializers.ValidationError(
                    f"Invalid field: {field}",
                )
            if custom_field_prefix in field:
                id_match = re.search(r"\d+", field)
                # An entry without any digits has no id to look up.
                if (
                    id_match is None
                    or not CustomField.objects.filter(id=int(id_match[0])).exists()
                ):
                    raise serializers.ValidationError(
                        f"Invalid field: {field}",
                    )
            elif field not in SavedView.DisplayFields.values:
                raise serializers.ValidationError(
                    f"Invalid field: {field}",
                )
        return attrs

    def update(self, instance, validated_data):
        """
        Update a saved view; when filter rules are supplied, all existing
        rules are deleted and recreated from the payload.

        An empty ``display_fields`` list is stored as None, and a legacy
        "user" key is mapped to "owner".
        """
        rules_data = validated_data.pop("filter_rules", None)
        if "user" in validated_data:
            # backwards compatibility
            validated_data["owner"] = validated_data.pop("user")
        if (
            "display_fields" in validated_data
            and isinstance(
                validated_data["display_fields"],
                list,
            )
            and len(validated_data["display_fields"]) == 0
        ):
            validated_data["display_fields"] = None
        super().update(instance, validated_data)
        if rules_data is not None:
            SavedViewFilterRule.objects.filter(saved_view=instance).delete()
            for rule_data in rules_data:
                SavedViewFilterRule.objects.create(saved_view=instance, **rule_data)
        return instance

    def create(self, validated_data):
        """
        Create a saved view and one filter rule per entry of "filter_rules".

        A legacy "user" key is mapped to "owner".
        """
        rules_data = validated_data.pop("filter_rules")
        if "user" in validated_data:
            # backwards compatibility
            validated_data["owner"] = validated_data.pop("user")
        saved_view = SavedView.objects.create(**validated_data)
        for rule_data in rules_data:
            SavedViewFilterRule.objects.create(saved_view=saved_view, **rule_data)
        return saved_view
|
|
|
|
|
|
class DocumentListSerializer(serializers.Serializer):
    # Base serializer for requests that carry a list of document ids.
    documents = serializers.ListField(
        required=True,
        label="Documents",
        write_only=True,
        child=serializers.IntegerField(),
    )

    def _validate_document_id_list(self, documents, name="documents"):
        """
        Check that ``documents`` is a list of integer ids that each match
        exactly one existing document.

        Raises:
            serializers.ValidationError: when the value is not a list, holds
                non-integers, or the number of matching documents differs
                from the number of ids given.
        """
        if not isinstance(documents, list):
            raise serializers.ValidationError(f"{name} must be a list")

        for candidate in documents:
            if not isinstance(candidate, int):
                raise serializers.ValidationError(
                    f"{name} must be a list of integers",
                )

        existing = Document.objects.filter(id__in=documents).count()
        if existing != len(documents):
            raise serializers.ValidationError(
                f"Some documents in {name} don't exist or were specified twice.",
            )

    def validate_documents(self, documents):
        """Validate the "documents" field and return it unchanged."""
        self._validate_document_id_list(documents)
        return documents
|
|
|
|
|
|
class BulkEditSerializer(
    SerializerWithPerms,
    DocumentListSerializer,
    SetPermissionsMixin,
):
    # Validates a bulk edit request: a list of documents, the name of the
    # bulk edit method and that method's parameters. ``validate_method``
    # replaces the method name by the matching ``bulk_edit`` callable and
    # ``validate`` checks (and normalises) the parameters for that method.
    method = serializers.ChoiceField(
        choices=[
            "set_correspondent",
            "set_document_type",
            "set_storage_path",
            "add_tag",
            "remove_tag",
            "modify_tags",
            "modify_custom_fields",
            "delete",
            "reprocess",
            "set_permissions",
            "rotate",
            "merge",
            "split",
            "delete_pages",
            "edit_pdf",
        ],
        label="Method",
        write_only=True,
    )

    parameters = serializers.DictField(allow_empty=True, default={}, write_only=True)

    def _validate_tag_id_list(self, tags, name="tags"):
        """Check that ``tags`` is a list of integer ids of existing tags."""
        if not isinstance(tags, list):
            raise serializers.ValidationError(f"{name} must be a list")
        if not all(isinstance(i, int) for i in tags):
            raise serializers.ValidationError(f"{name} must be a list of integers")
        count = Tag.objects.filter(id__in=tags).count()
        if count != len(tags):
            raise serializers.ValidationError(
                f"Some tags in {name} don't exist or were specified twice.",
            )

    def _validate_custom_field_id_list_or_dict(
        self,
        custom_fields,
        name="custom_fields",
    ):
        """
        Check that ``custom_fields`` is either a list of integer ids or a
        dict whose keys are convertible to integer ids, and that every id
        refers to an existing custom field.
        """
        ids = custom_fields
        if isinstance(custom_fields, dict):
            try:
                ids = [int(field_id) for field_id in custom_fields]
            except (TypeError, ValueError) as e:
                logger.exception(f"Error validating custom fields: {e}")
                raise serializers.ValidationError(
                    f"{name} must be a list of integers or a dict of id:value pairs, see the log for details",
                ) from e
        elif not isinstance(custom_fields, list) or not all(
            isinstance(i, int) for i in ids
        ):
            raise serializers.ValidationError(
                f"{name} must be a list of integers or a dict of id:value pairs",
            )
        count = CustomField.objects.filter(id__in=ids).count()
        if count != len(ids):
            raise serializers.ValidationError(
                f"Some custom fields in {name} don't exist or were specified twice.",
            )

    def validate_method(self, method):
        """Translate the method name into the matching ``bulk_edit`` callable."""
        handlers = {
            "set_correspondent": bulk_edit.set_correspondent,
            "set_document_type": bulk_edit.set_document_type,
            "set_storage_path": bulk_edit.set_storage_path,
            "add_tag": bulk_edit.add_tag,
            "remove_tag": bulk_edit.remove_tag,
            "modify_tags": bulk_edit.modify_tags,
            "modify_custom_fields": bulk_edit.modify_custom_fields,
            "delete": bulk_edit.delete,
            # "redo_ocr" is accepted as an alias of "reprocess"
            "redo_ocr": bulk_edit.reprocess,
            "reprocess": bulk_edit.reprocess,
            "set_permissions": bulk_edit.set_permissions,
            "rotate": bulk_edit.rotate,
            "merge": bulk_edit.merge,
            "split": bulk_edit.split,
            "delete_pages": bulk_edit.delete_pages,
            "edit_pdf": bulk_edit.edit_pdf,
        }
        try:
            return handlers[method]
        except KeyError:  # pragma: no cover
            # This will never happen as it is handled by the ChoiceField
            raise serializers.ValidationError("Unsupported method.")

    def _ensure_object_exists(self, model, object_id, message):
        """
        Raise a validation error with ``message`` unless ``model`` has an
        object with the given id. Ids of the wrong type count as missing
        instead of surfacing the ORM's ValueError/TypeError.
        """
        try:
            found = model.objects.filter(id=object_id).exists()
        except (TypeError, ValueError):
            found = False
        if not found:
            raise serializers.ValidationError(message)

    def _validate_parameters_tags(self, parameters):
        """Require a "tag" parameter naming an existing tag."""
        if "tag" not in parameters:
            raise serializers.ValidationError("tag not specified")
        self._ensure_object_exists(Tag, parameters["tag"], "Tag does not exist")

    def _validate_parameters_document_type(self, parameters):
        """Require a "document_type" parameter: None or an existing id."""
        if "document_type" not in parameters:
            raise serializers.ValidationError("document_type not specified")
        document_type_id = parameters["document_type"]
        if document_type_id is None:
            # None is ok
            return
        self._ensure_object_exists(
            DocumentType,
            document_type_id,
            "Document type does not exist",
        )

    def _validate_parameters_correspondent(self, parameters):
        """Require a "correspondent" parameter: None or an existing id."""
        if "correspondent" not in parameters:
            raise serializers.ValidationError("correspondent not specified")
        correspondent_id = parameters["correspondent"]
        if correspondent_id is None:
            return
        self._ensure_object_exists(
            Correspondent,
            correspondent_id,
            "Correspondent does not exist",
        )

    def _validate_storage_path(self, parameters):
        """Require a "storage_path" parameter: None or an existing id."""
        if "storage_path" not in parameters:
            raise serializers.ValidationError("storage path not specified")
        storage_path_id = parameters["storage_path"]
        if storage_path_id is None:
            return
        self._ensure_object_exists(
            StoragePath,
            storage_path_id,
            "Storage path does not exist",
        )

    def _validate_parameters_modify_tags(self, parameters):
        """Require valid "add_tags" and "remove_tags" id lists."""
        if "add_tags" in parameters:
            self._validate_tag_id_list(parameters["add_tags"], "add_tags")
        else:
            raise serializers.ValidationError("add_tags not specified")

        if "remove_tags" in parameters:
            self._validate_tag_id_list(parameters["remove_tags"], "remove_tags")
        else:
            raise serializers.ValidationError("remove_tags not specified")

    def _validate_parameters_modify_custom_fields(self, parameters):
        """Require valid "add_custom_fields" and "remove_custom_fields"."""
        if "add_custom_fields" in parameters:
            self._validate_custom_field_id_list_or_dict(
                parameters["add_custom_fields"],
                "add_custom_fields",
            )
        else:
            raise serializers.ValidationError("add_custom_fields not specified")

        if "remove_custom_fields" in parameters:
            self._validate_custom_field_id_list_or_dict(
                parameters["remove_custom_fields"],
                "remove_custom_fields",
            )
        else:
            raise serializers.ValidationError("remove_custom_fields not specified")

    def _validate_owner(self, owner):
        """
        Return the user with primary key ``owner``.

        Raises:
            serializers.ValidationError: when no such user exists or the
                value cannot be used as a primary key.
        """
        try:
            # ``get`` raises for a missing user rather than returning None.
            return User.objects.get(pk=owner)
        except (User.DoesNotExist, TypeError, ValueError):
            raise serializers.ValidationError("Specified owner cannot be found")

    def _validate_parameters_set_permissions(self, parameters):
        """
        Validate the "set_permissions" parameter (replacing it by the
        validated value), check an optional "owner" and default "merge" to
        False.
        """
        if "set_permissions" not in parameters:
            raise serializers.ValidationError("set_permissions not specified")
        parameters["set_permissions"] = self.validate_set_permissions(
            parameters["set_permissions"],
        )
        if "owner" in parameters and parameters["owner"] is not None:
            self._validate_owner(parameters["owner"])
        if "merge" not in parameters:
            parameters["merge"] = False

    def _validate_parameters_rotate(self, parameters):
        """Require a "degrees" parameter holding a whole number."""
        try:
            degrees_valid = (
                "degrees" in parameters
                and float(parameters["degrees"]).is_integer()
            )
        except (TypeError, ValueError, OverflowError):
            # e.g. None, a list, a non-numeric string or an oversized int
            degrees_valid = False
        if not degrees_valid:
            raise serializers.ValidationError("invalid rotation degrees")

    def _validate_parameters_split(self, parameters):
        """
        Parse the "pages" parameter and default "delete_originals" to False.

        "pages" is a comma separated string whose parts are either a single
        page number or a "first-last" range; it is replaced in
        ``parameters`` by a list with one list of page numbers per part.
        """
        if "pages" not in parameters:
            raise serializers.ValidationError("pages not specified")
        raw_pages = parameters["pages"]
        if not isinstance(raw_pages, str):
            # Only the string form can be parsed below.
            raise serializers.ValidationError("invalid pages specified")
        try:
            pages = []
            for doc in raw_pages.split(","):
                if "-" in doc:
                    first, last = doc.split("-")[:2]
                    pages.append(list(range(int(first), int(last) + 1)))
                else:
                    pages.append([int(doc)])
        except ValueError:
            raise serializers.ValidationError("invalid pages specified")
        parameters["pages"] = pages

        if "delete_originals" in parameters:
            if not isinstance(parameters["delete_originals"], bool):
                raise serializers.ValidationError("delete_originals must be a boolean")
        else:
            parameters["delete_originals"] = False

    def _validate_parameters_delete_pages(self, parameters):
        """Require a "pages" parameter that is a list of integers."""
        if "pages" not in parameters:
            raise serializers.ValidationError("pages not specified")
        if not isinstance(parameters["pages"], list):
            raise serializers.ValidationError("pages must be a list")
        if not all(isinstance(i, int) for i in parameters["pages"]):
            raise serializers.ValidationError("pages must be a list of integers")

    def _validate_parameters_merge(self, parameters):
        """
        Check the optional boolean flags "delete_originals" and
        "archive_fallback", defaulting each to False when absent.
        """
        if "delete_originals" in parameters:
            if not isinstance(parameters["delete_originals"], bool):
                raise serializers.ValidationError("delete_originals must be a boolean")
        else:
            parameters["delete_originals"] = False
        if "archive_fallback" in parameters:
            if not isinstance(parameters["archive_fallback"], bool):
                raise serializers.ValidationError("archive_fallback must be a boolean")
        else:
            parameters["archive_fallback"] = False

    def _validate_parameters_edit_pdf(self, parameters, document_id):
        """
        Validate the parameters of an edit_pdf request for one document.

        "operations" must be a list of dicts with an integer "page" and
        optional integer "rotate" / "doc" entries. "update_document"
        defaults to False and "include_metadata" to True. With
        "update_document" set, no operation may target an output document
        other than 0. When the document has a page count, every page must
        lie within it.
        """
        if "operations" not in parameters:
            raise serializers.ValidationError("operations not specified")
        if not isinstance(parameters["operations"], list):
            raise serializers.ValidationError("operations must be a list")
        for op in parameters["operations"]:
            if not isinstance(op, dict):
                raise serializers.ValidationError("invalid operation entry")
            if "page" not in op or not isinstance(op["page"], int):
                raise serializers.ValidationError("page must be an integer")
            if "rotate" in op and not isinstance(op["rotate"], int):
                raise serializers.ValidationError("rotate must be an integer")
            if "doc" in op and not isinstance(op["doc"], int):
                raise serializers.ValidationError("doc must be an integer")
        if "update_document" in parameters:
            if not isinstance(parameters["update_document"], bool):
                raise serializers.ValidationError("update_document must be a boolean")
        else:
            parameters["update_document"] = False
        if "include_metadata" in parameters:
            if not isinstance(parameters["include_metadata"], bool):
                raise serializers.ValidationError("include_metadata must be a boolean")
        else:
            parameters["include_metadata"] = True

        if parameters["update_document"]:
            # ``default`` keeps an empty operations list from raising.
            max_idx = max(
                (op.get("doc", 0) for op in parameters["operations"]),
                default=0,
            )
            if max_idx > 0:
                raise serializers.ValidationError(
                    "update_document only allowed with a single output document",
                )

        doc = Document.objects.get(id=document_id)
        # doc existence is already validated
        if doc.page_count:
            for op in parameters["operations"]:
                if op["page"] < 1 or op["page"] > doc.page_count:
                    raise serializers.ValidationError(
                        f"Page {op['page']} is out of bounds for document with {doc.page_count} pages.",
                    )

    def _require_single_document(self, attrs, message):
        """
        Raise a validation error with ``message`` unless exactly one
        document was given. An empty list is rejected too, because the
        single-document methods need a document to operate on.
        """
        if len(attrs["documents"]) != 1:
            raise serializers.ValidationError(message)

    def validate(self, attrs):
        """
        Validate the parameters against the selected method.

        ``attrs["method"]`` is the ``bulk_edit`` callable produced by
        ``validate_method``. Methods without a branch below take no
        parameter validation.
        """
        method = attrs["method"]
        parameters = attrs["parameters"]

        if method == bulk_edit.set_correspondent:
            self._validate_parameters_correspondent(parameters)
        elif method == bulk_edit.set_document_type:
            self._validate_parameters_document_type(parameters)
        elif method == bulk_edit.add_tag or method == bulk_edit.remove_tag:
            self._validate_parameters_tags(parameters)
        elif method == bulk_edit.modify_tags:
            self._validate_parameters_modify_tags(parameters)
        elif method == bulk_edit.set_storage_path:
            self._validate_storage_path(parameters)
        elif method == bulk_edit.modify_custom_fields:
            self._validate_parameters_modify_custom_fields(parameters)
        elif method == bulk_edit.set_permissions:
            self._validate_parameters_set_permissions(parameters)
        elif method == bulk_edit.rotate:
            self._validate_parameters_rotate(parameters)
        elif method == bulk_edit.split:
            self._require_single_document(
                attrs,
                "Split method only supports one document",
            )
            self._validate_parameters_split(parameters)
        elif method == bulk_edit.delete_pages:
            self._require_single_document(
                attrs,
                "Delete pages method only supports one document",
            )
            self._validate_parameters_delete_pages(parameters)
        elif method == bulk_edit.merge:
            self._validate_parameters_merge(parameters)
        elif method == bulk_edit.edit_pdf:
            self._require_single_document(
                attrs,
                "Edit PDF method only supports one document",
            )
            self._validate_parameters_edit_pdf(parameters, attrs["documents"][0])

        return attrs
|
|
|
|
|
|
class PostDocumentSerializer(serializers.Serializer):
    # Validates a document upload. The per-field validators below turn model
    # instances into their ids, so the validated data carries ids only.
    created = serializers.DateTimeField(
        label="Created",
        allow_null=True,
        write_only=True,
        required=False,
    )

    document = serializers.FileField(
        label="Document",
        write_only=True,
    )

    title = serializers.CharField(
        label="Title",
        write_only=True,
        required=False,
    )

    correspondent = serializers.PrimaryKeyRelatedField(
        queryset=Correspondent.objects.all(),
        label="Correspondent",
        allow_null=True,
        write_only=True,
        required=False,
    )

    document_type = serializers.PrimaryKeyRelatedField(
        queryset=DocumentType.objects.all(),
        label="Document type",
        allow_null=True,
        write_only=True,
        required=False,
    )

    storage_path = serializers.PrimaryKeyRelatedField(
        queryset=StoragePath.objects.all(),
        label="Storage path",
        allow_null=True,
        write_only=True,
        required=False,
    )

    tags = serializers.PrimaryKeyRelatedField(
        many=True,
        queryset=Tag.objects.all(),
        label="Tags",
        write_only=True,
        required=False,
    )

    archive_serial_number = serializers.IntegerField(
        label="ASN",
        write_only=True,
        required=False,
        min_value=Document.ARCHIVE_SERIAL_NUMBER_MIN,
        max_value=Document.ARCHIVE_SERIAL_NUMBER_MAX,
    )

    # Accept either a list of custom field ids or a dict mapping id -> value
    custom_fields = serializers.JSONField(
        label="Custom fields",
        write_only=True,
        required=False,
    )

    from_webui = serializers.BooleanField(
        label="Documents are from Paperless-ngx WebUI",
        write_only=True,
        required=False,
    )

    def validate_document(self, document):
        """
        Read the uploaded file and check its detected mime type.

        Returns:
            A ``(name, data)`` tuple of the upload's name and its raw bytes.

        Raises:
            serializers.ValidationError: when the mime type is unsupported,
                unless it is one of the recoverable PDF mime types and the
                file name ends with ".pdf".
        """
        payload = document.file.read()
        detected_type = magic.from_buffer(payload, mime=True)

        if not is_mime_type_supported(detected_type):
            # If the file is an invalid PDF, we can try to recover it later in the consumer
            recoverable_pdf = (
                detected_type in settings.CONSUMER_PDF_RECOVERABLE_MIME_TYPES
                and document.name.endswith(
                    ".pdf",
                )
            )
            if not recoverable_pdf:
                raise serializers.ValidationError(
                    _("File type %(type)s not supported") % {"type": detected_type},
                )

        return document.name, payload

    def validate_correspondent(self, correspondent):
        """Return the correspondent's id, or None when none was given."""
        return correspondent.id if correspondent else None

    def validate_document_type(self, document_type):
        """Return the document type's id, or None when none was given."""
        return document_type.id if document_type else None

    def validate_storage_path(self, storage_path):
        """Return the storage path's id, or None when none was given."""
        return storage_path.id if storage_path else None

    def validate_tags(self, tags):
        """Return the ids of the given tags, or None when there are none."""
        return [tag.id for tag in tags] if tags else None

    def validate_custom_fields(self, custom_fields):
        """
        Normalise the custom fields payload.

        Accepts a single integer id, a list of ids, or a dict mapping ids to
        values. A dict is returned as ``{int id: value}`` after each pair was
        validated with ``CustomFieldInstanceSerializer``; a list (or single
        id) is returned as a list of integer ids. Empty input yields None.

        Raises:
            serializers.ValidationError: for ids that are not integers, ids
                of unknown custom fields, or any other payload shape.
        """
        if not custom_fields:
            return None

        # Normalize single values to a list
        if isinstance(custom_fields, int):
            custom_fields = [custom_fields]

        if isinstance(custom_fields, dict):
            instance_serializer = CustomFieldInstanceSerializer()
            values_by_id = {}
            for raw_id, value in custom_fields.items():
                try:
                    field_pk = int(raw_id)
                except (TypeError, ValueError):
                    raise serializers.ValidationError(
                        _("Custom field id must be an integer: %(id)s")
                        % {"id": raw_id},
                    )
                try:
                    custom_field = CustomField.objects.get(id=field_pk)
                except CustomField.DoesNotExist:
                    raise serializers.ValidationError(
                        _("Custom field with id %(id)s does not exist")
                        % {"id": field_pk},
                    )
                instance_serializer.validate(
                    {
                        "field": custom_field,
                        "value": value,
                    },
                )
                values_by_id[field_pk] = value
            return values_by_id

        if not isinstance(custom_fields, list):
            raise serializers.ValidationError(
                _(
                    "Custom fields must be a list of integers or an object mapping ids to values.",
                ),
            )

        try:
            ids = [int(i) for i in custom_fields]
        except (TypeError, ValueError):
            raise serializers.ValidationError(
                _(
                    "Custom fields must be a list of integers or an object mapping ids to values.",
                ),
            )
        known_count = CustomField.objects.filter(id__in=ids).count()
        if known_count != len(set(ids)):
            raise serializers.ValidationError(
                _("Some custom fields don't exist or were specified twice."),
            )
        return ids

    # custom_fields_w_values handled via validate_custom_fields

    def validate_created(self, created):
        """Return the date part of a datetime value; anything else yields None."""
        # support datetime format for created for backwards compatibility
        return created.date() if isinstance(created, datetime) else None
|
|
|
|
|
|
class BulkDownloadSerializer(DocumentListSerializer):
    # Options for downloading several documents at once: which file versions
    # to include, the zip compression and whether to follow formatting.
    content = serializers.ChoiceField(
        choices=["archive", "originals", "both"],
        default="archive",
    )

    compression = serializers.ChoiceField(
        choices=["none", "deflated", "bzip2", "lzma"],
        default="none",
    )

    follow_formatting = serializers.BooleanField(
        default=False,
    )

    def validate_compression(self, compression):
        """Translate the compression name into the matching ``zipfile`` constant."""
        import zipfile

        constant_names = {
            "none": "ZIP_STORED",
            "deflated": "ZIP_DEFLATED",
            "bzip2": "ZIP_BZIP2",
            "lzma": "ZIP_LZMA",
        }
        return getattr(zipfile, constant_names[compression])
|
|
|
|
|
|
class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer):
    # Serializer for storage paths. Path templates are converted to the
    # template format and validated; updating a storage path schedules an
    # update of the documents that use it.
    class Meta:
        model = StoragePath
        fields = (
            "id",
            "slug",
            "name",
            "path",
            "match",
            "matching_algorithm",
            "is_insensitive",
            "document_count",
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )

    def validate_path(self, path: str):
        """
        Convert ``path`` to the template format and validate it.

        A warning is logged when the conversion changed the path.

        Returns:
            The converted path.

        Raises:
            serializers.ValidationError: when validating/rendering the
                converted template yields None.
        """
        converted_path = convert_format_str_to_template_format(path)
        if converted_path != path:
            logger.warning(
                f"Storage path {path} is not using the new style format, consider updating",
            )
        result = validate_filepath_template_and_render(converted_path)

        if result is None:
            raise serializers.ValidationError(_("Invalid variable detected."))

        return converted_path

    def update(self, instance, validated_data):
        """
        When a storage path is updated, see if documents
        using it require a rename/move
        """
        # Only the ids are needed, so do not load full document objects.
        doc_ids = list(instance.documents.values_list("id", flat=True))

        # Save first: the scheduled task must see the new path, and nothing
        # should be scheduled if the update itself fails.
        updated_instance = super().update(instance, validated_data)

        if doc_ids:
            bulk_edit.bulk_update_documents.delay(doc_ids)

        return updated_instance
|
|
|
|
|
|
class UiSettingsViewSerializer(serializers.ModelSerializer):
    # Serializer for a user's UI settings, stored as a free-form dict.
    settings = serializers.DictField(required=False, allow_null=True)

    class Meta:
        model = UiSettings
        depth = 1
        fields = [
            "id",
            "settings",
        ]

    def validate_settings(self, settings):
        """
        Strip the backend update-checking setting before saving.

        ``settings`` may be None (the field allows null), in which case it is
        returned unchanged.
        """
        if settings is None:
            return settings
        # we never save update checking backend setting
        update_checking = settings.get("update_checking")
        if isinstance(update_checking, dict):
            update_checking.pop("backend_setting", None)
        return settings

    def create(self, validated_data):
        """
        Create or update the UI settings of the user in ``validated_data``.

        Returns:
            The ``UiSettings`` instance.
        """
        # update_or_create returns (object, created); only the object is the
        # serializer's instance.
        ui_settings, _was_created = UiSettings.objects.update_or_create(
            user=validated_data.get("user"),
            defaults={"settings": validated_data.get("settings", None)},
        )
        return ui_settings
|
|
|
|
|
|
class TasksViewSerializer(OwnedObjectSerializer):
    # Serializer for tasks, including the id of a related document extracted
    # from the task's result text.
    class Meta:
        model = PaperlessTask
        fields = (
            "id",
            "task_id",
            "task_name",
            "task_file_name",
            "date_created",
            "date_done",
            "type",
            "status",
            "result",
            "acknowledged",
            "related_document",
            "owner",
        )

    related_document = serializers.SerializerMethodField()
    created_doc_re = re.compile(r"New document id (\d+) created")
    duplicate_doc_re = re.compile(r"It is a duplicate of .* \(#(\d+)\)")

    def get_related_document(self, obj) -> str | None:
        # Getter for "related_document": returns the document id found in
        # the task result, or None.
        #  - successful tasks: the id from "New document id N created"
        #  - failed tasks: the id of the duplicated document, unless the
        #    result says the existing document is in the trash
        result_text = obj.result
        if not result_text or not isinstance(result_text, str):
            return None

        # Named ``pattern`` so the ``re`` module is not shadowed.
        pattern = None
        if obj.status == states.SUCCESS:
            pattern = self.created_doc_re
        elif (
            obj.status == states.FAILURE
            and "existing document is in the trash" not in result_text
        ):
            pattern = self.duplicate_doc_re

        if pattern is None:
            return None

        found = pattern.search(result_text)
        return found.group(1) if found is not None else None
|
|
|
|
|
|
class RunTaskViewSerializer(serializers.Serializer):
    # Request serializer with a single write-only field: the name of the
    # task, restricted to the choices of PaperlessTask.TaskName.
    task_name = serializers.ChoiceField(
        choices=PaperlessTask.TaskName.choices,
        label="Task Name",
        write_only=True,
    )
|
|
|
|
|
|
class AcknowledgeTasksViewSerializer(serializers.Serializer):
    # Request serializer carrying the ids of the tasks to acknowledge.
    tasks = serializers.ListField(
        required=True,
        label="Tasks",
        write_only=True,
        child=serializers.IntegerField(),
    )

    def _validate_task_id_list(self, tasks, name="tasks"):
        """
        Check that ``tasks`` is a list of integer ids that each match exactly
        one existing task.

        Raises:
            serializers.ValidationError: when the value is not a list, holds
                non-integers, or the number of matching tasks differs from
                the number of ids given.
        """
        if not isinstance(tasks, list):
            raise serializers.ValidationError(f"{name} must be a list")

        for candidate in tasks:
            if not isinstance(candidate, int):
                raise serializers.ValidationError(
                    f"{name} must be a list of integers",
                )

        existing = PaperlessTask.objects.filter(id__in=tasks).count()
        if existing != len(tasks):
            raise serializers.ValidationError(
                f"Some tasks in {name} don't exist or were specified twice.",
            )

    def validate_tasks(self, tasks):
        """Validate the "tasks" field and return it unchanged."""
        self._validate_task_id_list(tasks)
        return tasks
|
|
|
|
|
|
class ShareLinkSerializer(OwnedObjectSerializer):
    # Serializer for share links; the slug is generated on creation.
    class Meta:
        model = ShareLink
        fields = (
            "id",
            "created",
            "expiration",
            "slug",
            "document",
            "file_version",
        )

    def create(self, validated_data):
        """Create a share link with a random 50 character slug."""
        validated_data.update(slug=get_random_string(50))
        return super().create(validated_data)
|
|
|
|
|
|
class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
    # Validates a bulk operation (set_permissions or delete) on a list of
    # tags, correspondents, document types or storage paths.
    objects = serializers.ListField(
        required=True,
        allow_empty=False,
        label="Objects",
        write_only=True,
        child=serializers.IntegerField(),
    )

    object_type = serializers.ChoiceField(
        choices=[
            "tags",
            "correspondents",
            "document_types",
            "storage_paths",
        ],
        label="Object Type",
        write_only=True,
    )

    operation = serializers.ChoiceField(
        choices=[
            "set_permissions",
            "delete",
        ],
        label="Operation",
        required=True,
        write_only=True,
    )

    owner = serializers.PrimaryKeyRelatedField(
        queryset=User.objects.all(),
        required=False,
        allow_null=True,
    )

    permissions = serializers.DictField(
        label="Set permissions",
        allow_empty=False,
        required=False,
        write_only=True,
    )

    merge = serializers.BooleanField(
        default=False,
        write_only=True,
        required=False,
    )

    def get_object_class(self, object_type):
        """Return the model class for ``object_type``, or None if unknown."""
        classes_by_type = {
            "tags": Tag,
            "correspondents": Correspondent,
            "document_types": DocumentType,
            "storage_paths": StoragePath,
        }
        return classes_by_type.get(object_type)

    def _validate_objects(self, objects, object_type):
        """
        Check that ``objects`` is a list of integer ids that each match
        exactly one existing object of the given type.

        Returns:
            ``objects`` unchanged.
        """
        if not isinstance(objects, list):
            raise serializers.ValidationError("objects must be a list")

        for candidate in objects:
            if not isinstance(candidate, int):
                raise serializers.ValidationError(
                    "objects must be a list of integers",
                )

        model = self.get_object_class(object_type)
        existing = model.objects.filter(id__in=objects).count()
        if existing != len(objects):
            raise serializers.ValidationError(
                "Some ids in objects don't exist or were specified twice.",
            )
        return objects

    def _validate_permissions(self, permissions):
        """Run the shared ``validate_set_permissions`` check on ``permissions``."""
        self.validate_set_permissions(
            permissions,
        )

    def validate(self, attrs):
        """
        Validate the object ids and, for the set_permissions operation, the
        permissions when they are supplied.
        """
        self._validate_objects(attrs["objects"], attrs["object_type"])

        if attrs.get("operation") == "set_permissions":
            requested_permissions = attrs.get("permissions")
            if requested_permissions is not None:
                self._validate_permissions(requested_permissions)

        return attrs
|
|
|
|
|
|
class WorkflowTriggerSerializer(serializers.ModelSerializer):
    """
    Serializer for a single WorkflowTrigger.

    ``id`` is optional and nullable; WorkflowSerializer uses it to look up an
    existing trigger when saving nested triggers.
    """

    id = serializers.IntegerField(required=False, allow_null=True)
    sources = fields.MultipleChoiceField(
        choices=WorkflowTrigger.DocumentSourceChoices.choices,
        allow_empty=True,
        default={
            DocumentSource.ConsumeFolder,
            DocumentSource.ApiUpload,
            DocumentSource.MailFetch,
        },
    )

    type = serializers.ChoiceField(
        choices=WorkflowTrigger.WorkflowTriggerType.choices,
        label="Trigger Type",
    )

    class Meta:
        model = WorkflowTrigger
        fields = [
            "id",
            "sources",
            "type",
            "filter_path",
            "filter_filename",
            "filter_mailrule",
            "matching_algorithm",
            "match",
            "is_insensitive",
            "filter_has_tags",
            "filter_has_correspondent",
            "filter_has_document_type",
            "filter_has_storage_path",
            "schedule_offset_days",
            "schedule_is_recurring",
            "schedule_recurring_interval_days",
            "schedule_date_field",
            "schedule_date_custom_field",
        ]

    def validate(self, attrs):
        """
        Normalise empty filters and require a filter on consumption triggers.

        A consumption trigger is rejected when ``filter_mailrule`` is absent
        from the payload and neither a file name nor a path filter is set.
        """
        # Empty strings treated as None to avoid unexpected behavior
        for key in ("filter_filename", "filter_path"):
            value = attrs.get(key)
            if value is not None and len(value) == 0:
                attrs[key] = None

        # On partial updates the type may only be available on the instance.
        trigger_type = attrs.get("type", getattr(self.instance, "type", None))
        is_consumption = (
            trigger_type == WorkflowTrigger.WorkflowTriggerType.CONSUMPTION
        )
        lacks_filter = (
            "filter_mailrule" not in attrs
            and attrs.get("filter_filename") is None
            and attrs.get("filter_path") is None
        )
        if is_consumption and lacks_filter:
            raise serializers.ValidationError(
                "File name, path or mail rule filter are required",
            )

        return attrs

    @staticmethod
    def normalize_workflow_trigger_sources(trigger):
        """
        Convert sources to strings to handle django-multiselectfield v1.0 changes

        Mutates ``trigger`` in place; does nothing when it is empty or has no
        ``sources`` key.
        """
        if not trigger or "sources" not in trigger:
            return
        # Enum-like members expose ``.value``; anything else is used as is.
        trigger["sources"] = [
            str(getattr(source, "value", source)) for source in trigger["sources"]
        ]

    def create(self, validated_data):
        """Normalise ``sources`` before delegating to the default create."""
        WorkflowTriggerSerializer.normalize_workflow_trigger_sources(validated_data)
        return super().create(validated_data)

    def update(self, instance, validated_data):
        """Normalise ``sources`` before delegating to the default update."""
        WorkflowTriggerSerializer.normalize_workflow_trigger_sources(validated_data)
        return super().update(instance, validated_data)
|
|
|
|
|
|
class WorkflowActionEmailSerializer(serializers.ModelSerializer):
    """
    Serializer for the email settings attached to a workflow action.

    ``id`` is optional and nullable; WorkflowSerializer uses it to look up an
    existing WorkflowActionEmail row when saving nested email data.
    """

    id = serializers.IntegerField(allow_null=True, required=False)

    class Meta:
        model = WorkflowActionEmail
        fields = [
            "id",
            "subject",
            "body",
            "to",
            "include_document",
        ]
|
|
|
|
|
|
class WorkflowActionWebhookSerializer(serializers.ModelSerializer):
    """
    Serializer for the webhook settings attached to a workflow action.

    ``id`` is optional and nullable; WorkflowSerializer uses it to look up an
    existing WorkflowActionWebhook row when saving nested webhook data.
    """

    id = serializers.IntegerField(allow_null=True, required=False)

    class Meta:
        model = WorkflowActionWebhook
        fields = [
            "id",
            "url",
            "use_params",
            "as_json",
            "params",
            "body",
            "headers",
            "include_document",
        ]

    def validate_url(self, url):
        """Run ``url_validator`` on the value and return it unchanged."""
        # NOTE(review): url_validator is defined elsewhere; presumably it
        # raises a ValidationError for unacceptable URLs - confirm.
        url_validator(url)
        return url
|
|
|
|
|
|
class WorkflowActionSerializer(serializers.ModelSerializer):
    """
    Serializer for a single WorkflowAction, including nested email and
    webhook settings.

    ``id`` is optional and nullable; WorkflowSerializer uses it to look up an
    existing action when saving nested actions.
    """

    id = serializers.IntegerField(required=False, allow_null=True)
    assign_correspondent = CorrespondentField(allow_null=True, required=False)
    assign_tags = TagsField(many=True, allow_null=True, required=False)
    assign_document_type = DocumentTypeField(allow_null=True, required=False)
    assign_storage_path = StoragePathField(allow_null=True, required=False)
    email = WorkflowActionEmailSerializer(allow_null=True, required=False)
    webhook = WorkflowActionWebhookSerializer(allow_null=True, required=False)

    class Meta:
        model = WorkflowAction
        fields = [
            "id",
            "type",
            "assign_title",
            "assign_tags",
            "assign_correspondent",
            "assign_document_type",
            "assign_storage_path",
            "assign_owner",
            "assign_view_users",
            "assign_view_groups",
            "assign_change_users",
            "assign_change_groups",
            "assign_custom_fields",
            "assign_custom_fields_values",
            "remove_all_tags",
            "remove_tags",
            "remove_all_correspondents",
            "remove_correspondents",
            "remove_all_document_types",
            "remove_document_types",
            "remove_all_storage_paths",
            "remove_storage_paths",
            "remove_custom_fields",
            "remove_all_custom_fields",
            "remove_all_owners",
            "remove_owners",
            "remove_all_permissions",
            "remove_view_users",
            "remove_view_groups",
            "remove_change_users",
            "remove_change_groups",
            "email",
            "webhook",
        ]

    def validate(self, attrs):
        """
        Validate the title template and the per-type required payloads.

        - An empty ``assign_title`` is stored as None.
        - A non-empty ``assign_title`` must be formattable with the known
          placeholders; otherwise a ValidationError keyed on ``assign_title``
          is raised.
        - Email actions need an ``email`` key and webhook actions a
          ``webhook`` key in the payload.

        Returns the (possibly modified) ``attrs``.
        """
        if "assign_title" in attrs and attrs["assign_title"] is not None:
            if len(attrs["assign_title"]) == 0:
                # Empty strings treated as None to avoid unexpected behavior
                attrs["assign_title"] = None
            else:
                try:
                    # test against all placeholders, see consumer.py `parse_doc_title_w_placeholders`
                    attrs["assign_title"].format(
                        correspondent="",
                        document_type="",
                        added="",
                        added_year="",
                        added_year_short="",
                        added_month="",
                        added_month_name="",
                        added_month_name_short="",
                        added_day="",
                        added_time="",
                        owner_username="",
                        original_filename="",
                        filename="",
                        created="",
                        created_year="",
                        created_year_short="",
                        created_month="",
                        created_month_name="",
                        created_month_name_short="",
                        created_day="",
                        created_time="",
                    )
                except (
                    ValueError,
                    KeyError,
                    # str.format also fails with these for user templates:
                    # positional fields such as "{}" / "{0}" (IndexError),
                    # attribute access such as "{created.x}" (AttributeError)
                    # and non-integer indexing "{created[a]}" (TypeError).
                    IndexError,
                    AttributeError,
                    TypeError,
                ) as e:
                    raise serializers.ValidationError(
                        {"assign_title": f'Invalid f-string detected: "{e.args[0]}"'},
                    ) from e

        if (
            "type" in attrs
            and attrs["type"] == WorkflowAction.WorkflowActionType.EMAIL
            and "email" not in attrs
        ):
            raise serializers.ValidationError(
                "Email data is required for email actions",
            )

        if (
            "type" in attrs
            and attrs["type"] == WorkflowAction.WorkflowActionType.WEBHOOK
            and "webhook" not in attrs
        ):
            raise serializers.ValidationError(
                "Webhook data is required for webhook actions",
            )

        return attrs
|
|
|
|
|
|
class WorkflowSerializer(serializers.ModelSerializer):
    """
    Serializer for a Workflow together with its nested triggers and actions.

    Nested triggers/actions are written by ``update_triggers_and_actions``;
    after an update, triggers and actions that no longer belong to any
    workflow are removed by ``prune_triggers_and_actions``.
    """

    # Relations on WorkflowAction that are assigned with ``.set()`` after the
    # action row has been saved, so they are taken out of the ``defaults``
    # passed to update_or_create. Order matches the order they are applied in.
    _ACTION_RELATION_FIELDS = (
        "assign_tags",
        "assign_view_users",
        "assign_view_groups",
        "assign_change_users",
        "assign_change_groups",
        "assign_custom_fields",
        "remove_tags",
        "remove_correspondents",
        "remove_document_types",
        "remove_storage_paths",
        "remove_custom_fields",
        "remove_owners",
        "remove_view_users",
        "remove_view_groups",
        "remove_change_users",
        "remove_change_groups",
    )

    order = serializers.IntegerField(required=False)

    triggers = WorkflowTriggerSerializer(many=True)
    actions = WorkflowActionSerializer(many=True)

    class Meta:
        model = Workflow
        fields = [
            "id",
            "name",
            "order",
            "enabled",
            "triggers",
            "actions",
        ]

    def update_triggers_and_actions(self, instance: Workflow, triggers, actions):
        """
        Create or update the nested triggers and actions and attach them.

        ``triggers`` / ``actions`` are lists of validated dicts. Passing
        ``serializers.empty`` leaves that relation on ``instance`` untouched;
        passing None (or an empty list) clears it. The dicts are modified in
        place (relation keys are popped from them).
        """
        set_triggers = []
        set_actions = []

        if triggers is not None and triggers is not serializers.empty:
            for trigger in triggers:
                filter_has_tags = trigger.pop("filter_has_tags", None)
                # Convert sources to strings to handle django-multiselectfield v1.0 changes
                WorkflowTriggerSerializer.normalize_workflow_trigger_sources(trigger)
                trigger_instance, _created = WorkflowTrigger.objects.update_or_create(
                    id=trigger.get("id"),
                    defaults=trigger,
                )
                if filter_has_tags is not None:
                    trigger_instance.filter_has_tags.set(filter_has_tags)
                set_triggers.append(trigger_instance)

        if actions is not None and actions is not serializers.empty:
            for action in actions:
                # Pull the relation values out first so that only plain
                # columns remain in ``action`` for update_or_create.
                relation_values = {
                    name: action.pop(name, None)
                    for name in self._ACTION_RELATION_FIELDS
                }

                email_data = action.pop("email", None)
                webhook_data = action.pop("webhook", None)

                action_instance, _created = WorkflowAction.objects.update_or_create(
                    id=action.get("id"),
                    defaults=action,
                )

                if email_data is not None:
                    serializer = WorkflowActionEmailSerializer(data=email_data)
                    serializer.is_valid(raise_exception=True)
                    email, _created = WorkflowActionEmail.objects.update_or_create(
                        id=email_data.get("id"),
                        defaults=serializer.validated_data,
                    )
                    action_instance.email = email
                    action_instance.save()

                if webhook_data is not None:
                    serializer = WorkflowActionWebhookSerializer(data=webhook_data)
                    serializer.is_valid(raise_exception=True)
                    webhook, _created = WorkflowActionWebhook.objects.update_or_create(
                        id=webhook_data.get("id"),
                        defaults=serializer.validated_data,
                    )
                    action_instance.webhook = webhook
                    action_instance.save()

                # None means "not supplied": leave that relation as it is.
                for name, value in relation_values.items():
                    if value is not None:
                        getattr(action_instance, name).set(value)

                set_actions.append(action_instance)

        if triggers is not serializers.empty:
            instance.triggers.set(set_triggers)
        if actions is not serializers.empty:
            instance.actions.set(set_actions)
        instance.save()

    def prune_triggers_and_actions(self):
        """
        ManyToMany fields don't support e.g. on_delete so we need to discard
        unattached triggers and actions manually
        """
        # Select the unattached rows in the database instead of issuing one
        # COUNT query per trigger/action; each row is still deleted through
        # its own ``delete()``.
        for trigger in WorkflowTrigger.objects.filter(workflows__isnull=True):
            trigger.delete()

        for action in WorkflowAction.objects.filter(workflows__isnull=True):
            action.delete()

        WorkflowActionEmail.objects.filter(action=None).delete()
        WorkflowActionWebhook.objects.filter(action=None).delete()

    def create(self, validated_data) -> Workflow:
        """Create the workflow, then save and attach its triggers/actions."""
        # Use the same sentinel as update() so a missing key cannot leave the
        # local names unbound.
        triggers = validated_data.pop("triggers", serializers.empty)
        actions = validated_data.pop("actions", serializers.empty)

        instance = super().create(validated_data)

        self.update_triggers_and_actions(instance, triggers, actions)

        return instance

    def update(self, instance: Workflow, validated_data) -> Workflow:
        """
        Update the workflow and any supplied triggers/actions, then prune
        triggers and actions that are no longer attached to a workflow.
        """
        triggers = validated_data.pop("triggers", serializers.empty)
        actions = validated_data.pop("actions", serializers.empty)

        instance = super().update(instance, validated_data)

        self.update_triggers_and_actions(instance, triggers, actions)
        self.prune_triggers_and_actions()

        return instance
|
|
|
|
|
|
class TrashSerializer(SerializerWithPerms):
    """Request payload for trash actions (``restore`` or ``empty``)."""

    # Optional list of document ids the action applies to.
    documents = serializers.ListField(
        child=serializers.IntegerField(),
        label="Documents",
        required=False,
        write_only=True,
    )

    action = serializers.ChoiceField(
        choices=["restore", "empty"],
        label="Action",
        write_only=True,
    )

    def validate_documents(self, documents):
        """
        Ensure every given id matches a row in ``Document.deleted_objects``.

        Raises a ValidationError when the number of matching rows differs
        from the number of ids supplied; otherwise returns ``documents``.
        """
        deleted_count = Document.deleted_objects.filter(id__in=documents).count()
        if deleted_count != len(documents):
            raise serializers.ValidationError(
                "Some documents in the list have not yet been deleted.",
            )
        return documents
|
|
|
|
|
|
class StoragePathTestSerializer(SerializerWithPerms):
|
|
path = serializers.CharField(
|
|
required=True,
|
|
label="Path",
|
|
write_only=True,
|
|
)
|
|
|
|
document = serializers.PrimaryKeyRelatedField(
|
|
queryset=Document.objects.all(),
|
|
required=True,
|
|
label="Document",
|
|
write_only=True,
|
|
)
|