mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
2304 lines
78 KiB
Python
2304 lines
78 KiB
Python
from __future__ import annotations
|
|
|
|
import datetime
|
|
import logging
|
|
import math
|
|
import re
|
|
import zoneinfo
|
|
from decimal import Decimal
|
|
from typing import TYPE_CHECKING
|
|
|
|
import magic
|
|
from celery import states
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import Group
|
|
from django.contrib.auth.models import User
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.validators import DecimalValidator
|
|
from django.core.validators import MaxLengthValidator
|
|
from django.core.validators import RegexValidator
|
|
from django.core.validators import integer_validator
|
|
from django.utils.crypto import get_random_string
|
|
from django.utils.text import slugify
|
|
from django.utils.translation import gettext as _
|
|
from drf_spectacular.utils import extend_schema_field
|
|
from drf_writable_nested.serializers import NestedUpdateMixin
|
|
from guardian.core import ObjectPermissionChecker
|
|
from guardian.shortcuts import get_users_with_perms
|
|
from guardian.utils import get_group_obj_perms_model
|
|
from guardian.utils import get_user_obj_perms_model
|
|
from rest_framework import fields
|
|
from rest_framework import serializers
|
|
from rest_framework.fields import SerializerMethodField
|
|
|
|
if settings.AUDIT_LOG_ENABLED:
|
|
from auditlog.context import set_actor
|
|
|
|
|
|
from documents import bulk_edit
|
|
from documents.data_models import DocumentSource
|
|
from documents.models import Correspondent
|
|
from documents.models import CustomField
|
|
from documents.models import CustomFieldInstance
|
|
from documents.models import Document
|
|
from documents.models import DocumentType
|
|
from documents.models import MatchingModel
|
|
from documents.models import PaperlessTask
|
|
from documents.models import SavedView
|
|
from documents.models import SavedViewFilterRule
|
|
from documents.models import ShareLink
|
|
from documents.models import StoragePath
|
|
from documents.models import Tag
|
|
from documents.models import UiSettings
|
|
from documents.models import Workflow
|
|
from documents.models import WorkflowAction
|
|
from documents.models import WorkflowActionEmail
|
|
from documents.models import WorkflowActionWebhook
|
|
from documents.models import WorkflowTrigger
|
|
from documents.parsers import is_mime_type_supported
|
|
from documents.permissions import get_groups_with_only_permission
|
|
from documents.permissions import set_permissions_for_object
|
|
from documents.templating.filepath import validate_filepath_template_and_render
|
|
from documents.templating.utils import convert_format_str_to_template_format
|
|
from documents.validators import uri_validator
|
|
from documents.validators import url_validator
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Iterable
|
|
|
|
logger = logging.getLogger("paperless.serializers")
|
|
|
|
|
|
# https://www.django-rest-framework.org/api-guide/serializers/#example
|
|
class DynamicFieldsModelSerializer(serializers.ModelSerializer):
|
|
"""
|
|
A ModelSerializer that takes an additional `fields` argument that
|
|
controls which fields should be displayed.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
# Don't pass the 'fields' arg up to the superclass
|
|
fields = kwargs.pop("fields", None)
|
|
|
|
# Instantiate the superclass normally
|
|
super().__init__(*args, **kwargs)
|
|
|
|
if fields is not None:
|
|
# Drop any fields that are not specified in the `fields` argument.
|
|
allowed = set(fields)
|
|
existing = set(self.fields)
|
|
for field_name in existing - allowed:
|
|
self.fields.pop(field_name)
|
|
|
|
|
|
class MatchingModelSerializer(serializers.ModelSerializer):
|
|
document_count = serializers.IntegerField(read_only=True)
|
|
|
|
def get_slug(self, obj) -> str:
|
|
return slugify(obj.name)
|
|
|
|
slug = SerializerMethodField()
|
|
|
|
def validate(self, data):
|
|
# TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
|
|
name = data.get(
|
|
"name",
|
|
self.instance.name if hasattr(self.instance, "name") else None,
|
|
)
|
|
owner = (
|
|
data["owner"]
|
|
if "owner" in data
|
|
else self.user
|
|
if hasattr(self, "user")
|
|
else None
|
|
)
|
|
pk = self.instance.pk if hasattr(self.instance, "pk") else None
|
|
if ("name" in data or "owner" in data) and self.Meta.model.objects.filter(
|
|
name=name,
|
|
owner=owner,
|
|
).exclude(pk=pk).exists():
|
|
raise serializers.ValidationError(
|
|
{"error": "Object violates owner / name unique constraint"},
|
|
)
|
|
return data
|
|
|
|
def validate_match(self, match):
|
|
if (
|
|
"matching_algorithm" in self.initial_data
|
|
and self.initial_data["matching_algorithm"] == MatchingModel.MATCH_REGEX
|
|
):
|
|
try:
|
|
re.compile(match)
|
|
except re.error as e:
|
|
raise serializers.ValidationError(
|
|
_("Invalid regular expression: %(error)s") % {"error": str(e.msg)},
|
|
)
|
|
return match
|
|
|
|
|
|
class SetPermissionsMixin:
|
|
def _validate_user_ids(self, user_ids):
|
|
users = User.objects.none()
|
|
if user_ids is not None:
|
|
users = User.objects.filter(id__in=user_ids)
|
|
if not users.count() == len(user_ids):
|
|
raise serializers.ValidationError(
|
|
"Some users in don't exist or were specified twice.",
|
|
)
|
|
return users
|
|
|
|
def _validate_group_ids(self, group_ids):
|
|
groups = Group.objects.none()
|
|
if group_ids is not None:
|
|
groups = Group.objects.filter(id__in=group_ids)
|
|
if not groups.count() == len(group_ids):
|
|
raise serializers.ValidationError(
|
|
"Some groups in don't exist or were specified twice.",
|
|
)
|
|
return groups
|
|
|
|
def validate_set_permissions(self, set_permissions=None):
|
|
permissions_dict = {
|
|
"view": {
|
|
"users": User.objects.none(),
|
|
"groups": Group.objects.none(),
|
|
},
|
|
"change": {
|
|
"users": User.objects.none(),
|
|
"groups": Group.objects.none(),
|
|
},
|
|
}
|
|
if set_permissions is not None:
|
|
for action, _ in permissions_dict.items():
|
|
if action in set_permissions:
|
|
users = set_permissions[action]["users"]
|
|
permissions_dict[action]["users"] = self._validate_user_ids(users)
|
|
groups = set_permissions[action]["groups"]
|
|
permissions_dict[action]["groups"] = self._validate_group_ids(
|
|
groups,
|
|
)
|
|
return permissions_dict
|
|
|
|
def _set_permissions(self, permissions, object):
|
|
set_permissions_for_object(permissions, object)
|
|
|
|
|
|
class SerializerWithPerms(serializers.Serializer):
|
|
def __init__(self, *args, **kwargs):
|
|
self.user = kwargs.pop("user", None)
|
|
self.full_perms = kwargs.pop("full_perms", False)
|
|
self.all_fields = kwargs.pop("all_fields", False)
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
@extend_schema_field(
|
|
field={
|
|
"type": "object",
|
|
"properties": {
|
|
"view": {
|
|
"type": "object",
|
|
"properties": {
|
|
"users": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
"groups": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
},
|
|
},
|
|
"change": {
|
|
"type": "object",
|
|
"properties": {
|
|
"users": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
"groups": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
)
|
|
class SetPermissionsSerializer(serializers.DictField):
|
|
pass
|
|
|
|
|
|
class OwnedObjectSerializer(
|
|
SerializerWithPerms,
|
|
serializers.ModelSerializer,
|
|
SetPermissionsMixin,
|
|
):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
|
|
if not self.all_fields:
|
|
try:
|
|
if self.full_perms:
|
|
self.fields.pop("user_can_change")
|
|
self.fields.pop("is_shared_by_requester")
|
|
else:
|
|
self.fields.pop("permissions")
|
|
except KeyError:
|
|
pass
|
|
|
|
@extend_schema_field(
|
|
field={
|
|
"type": "object",
|
|
"properties": {
|
|
"view": {
|
|
"type": "object",
|
|
"properties": {
|
|
"users": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
"groups": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
},
|
|
},
|
|
"change": {
|
|
"type": "object",
|
|
"properties": {
|
|
"users": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
"groups": {
|
|
"type": "array",
|
|
"items": {"type": "integer"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
)
|
|
def get_permissions(self, obj) -> dict:
|
|
view_codename = f"view_{obj.__class__.__name__.lower()}"
|
|
change_codename = f"change_{obj.__class__.__name__.lower()}"
|
|
|
|
return {
|
|
"view": {
|
|
"users": get_users_with_perms(
|
|
obj,
|
|
only_with_perms_in=[view_codename],
|
|
with_group_users=False,
|
|
).values_list("id", flat=True),
|
|
"groups": get_groups_with_only_permission(
|
|
obj,
|
|
codename=view_codename,
|
|
).values_list("id", flat=True),
|
|
},
|
|
"change": {
|
|
"users": get_users_with_perms(
|
|
obj,
|
|
only_with_perms_in=[change_codename],
|
|
with_group_users=False,
|
|
).values_list("id", flat=True),
|
|
"groups": get_groups_with_only_permission(
|
|
obj,
|
|
codename=change_codename,
|
|
).values_list("id", flat=True),
|
|
},
|
|
}
|
|
|
|
def get_user_can_change(self, obj) -> bool:
|
|
checker = ObjectPermissionChecker(self.user) if self.user is not None else None
|
|
return (
|
|
obj.owner is None
|
|
or obj.owner == self.user
|
|
or (
|
|
self.user is not None
|
|
and checker.has_perm(f"change_{obj.__class__.__name__.lower()}", obj)
|
|
)
|
|
)
|
|
|
|
@staticmethod
|
|
def get_shared_object_pks(objects: Iterable):
|
|
"""
|
|
Return the primary keys of the subset of objects that are shared.
|
|
"""
|
|
try:
|
|
first_obj = next(iter(objects))
|
|
except StopIteration:
|
|
return set()
|
|
|
|
ctype = ContentType.objects.get_for_model(first_obj)
|
|
object_pks = list(obj.pk for obj in objects)
|
|
pk_type = type(first_obj.pk)
|
|
|
|
def get_pks_for_permission_type(model):
|
|
return map(
|
|
pk_type, # coerce the pk to be the same type of the provided objects
|
|
model.objects.filter(
|
|
content_type=ctype,
|
|
object_pk__in=object_pks,
|
|
)
|
|
.values_list("object_pk", flat=True)
|
|
.distinct(),
|
|
)
|
|
|
|
UserObjectPermission = get_user_obj_perms_model()
|
|
GroupObjectPermission = get_group_obj_perms_model()
|
|
user_permission_pks = get_pks_for_permission_type(UserObjectPermission)
|
|
group_permission_pks = get_pks_for_permission_type(GroupObjectPermission)
|
|
|
|
return set(user_permission_pks) | set(group_permission_pks)
|
|
|
|
def get_is_shared_by_requester(self, obj: Document) -> bool:
|
|
# First check the context to see if `shared_object_pks` is set by the parent.
|
|
shared_object_pks = self.context.get("shared_object_pks")
|
|
# If not just check if the current object is shared.
|
|
if shared_object_pks is None:
|
|
shared_object_pks = self.get_shared_object_pks([obj])
|
|
return obj.owner == self.user and obj.id in shared_object_pks
|
|
|
|
permissions = SerializerMethodField(read_only=True)
|
|
user_can_change = SerializerMethodField(read_only=True)
|
|
is_shared_by_requester = SerializerMethodField(read_only=True)
|
|
|
|
set_permissions = SetPermissionsSerializer(
|
|
label="Set permissions",
|
|
allow_empty=True,
|
|
required=False,
|
|
write_only=True,
|
|
)
|
|
# other methods in mixin
|
|
|
|
def validate_unique_together(self, validated_data, instance=None):
|
|
# workaround for https://github.com/encode/django-rest-framework/issues/9358
|
|
if "owner" in validated_data and "name" in self.Meta.fields:
|
|
name = validated_data.get("name", instance.name if instance else None)
|
|
objects = (
|
|
self.Meta.model.objects.exclude(pk=instance.pk)
|
|
if instance
|
|
else self.Meta.model.objects.all()
|
|
)
|
|
not_unique = objects.filter(
|
|
owner=validated_data["owner"],
|
|
name=name,
|
|
).exists()
|
|
if not_unique:
|
|
raise serializers.ValidationError(
|
|
{"error": "Object violates owner / name unique constraint"},
|
|
)
|
|
|
|
def create(self, validated_data):
|
|
# default to current user if not set
|
|
request = self.context.get("request")
|
|
if (
|
|
"owner" not in validated_data
|
|
or (request is not None and "owner" not in request.data)
|
|
) and self.user:
|
|
validated_data["owner"] = self.user
|
|
permissions = None
|
|
if "set_permissions" in validated_data:
|
|
permissions = validated_data.pop("set_permissions")
|
|
self.validate_unique_together(validated_data)
|
|
instance = super().create(validated_data)
|
|
if permissions is not None:
|
|
self._set_permissions(permissions, instance)
|
|
return instance
|
|
|
|
def update(self, instance, validated_data):
|
|
if "set_permissions" in validated_data:
|
|
self._set_permissions(validated_data["set_permissions"], instance)
|
|
self.validate_unique_together(validated_data, instance)
|
|
return super().update(instance, validated_data)
|
|
|
|
|
|
class OwnedObjectListSerializer(serializers.ListSerializer):
|
|
def to_representation(self, documents):
|
|
self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
|
|
documents,
|
|
)
|
|
return super().to_representation(documents)
|
|
|
|
|
|
class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
|
|
last_correspondence = serializers.DateTimeField(read_only=True, required=False)
|
|
|
|
class Meta:
|
|
model = Correspondent
|
|
fields = (
|
|
"id",
|
|
"slug",
|
|
"name",
|
|
"match",
|
|
"matching_algorithm",
|
|
"is_insensitive",
|
|
"document_count",
|
|
"last_correspondence",
|
|
"owner",
|
|
"permissions",
|
|
"user_can_change",
|
|
"set_permissions",
|
|
)
|
|
|
|
|
|
class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer):
|
|
class Meta:
|
|
model = DocumentType
|
|
fields = (
|
|
"id",
|
|
"slug",
|
|
"name",
|
|
"match",
|
|
"matching_algorithm",
|
|
"is_insensitive",
|
|
"document_count",
|
|
"owner",
|
|
"permissions",
|
|
"user_can_change",
|
|
"set_permissions",
|
|
)
|
|
|
|
|
|
class DeprecatedColors:
|
|
COLOURS = (
|
|
(1, "#a6cee3"),
|
|
(2, "#1f78b4"),
|
|
(3, "#b2df8a"),
|
|
(4, "#33a02c"),
|
|
(5, "#fb9a99"),
|
|
(6, "#e31a1c"),
|
|
(7, "#fdbf6f"),
|
|
(8, "#ff7f00"),
|
|
(9, "#cab2d6"),
|
|
(10, "#6a3d9a"),
|
|
(11, "#b15928"),
|
|
(12, "#000000"),
|
|
(13, "#cccccc"),
|
|
)
|
|
|
|
|
|
@extend_schema_field(
|
|
serializers.ChoiceField(
|
|
choices=DeprecatedColors.COLOURS,
|
|
),
|
|
)
|
|
class ColorField(serializers.Field):
|
|
def to_internal_value(self, data):
|
|
for id, color in DeprecatedColors.COLOURS:
|
|
if id == data:
|
|
return color
|
|
raise serializers.ValidationError
|
|
|
|
def to_representation(self, value):
|
|
for id, color in DeprecatedColors.COLOURS:
|
|
if color == value:
|
|
return id
|
|
return 1
|
|
|
|
|
|
class TagSerializerVersion1(MatchingModelSerializer, OwnedObjectSerializer):
|
|
colour = ColorField(source="color", default="#a6cee3")
|
|
|
|
class Meta:
|
|
model = Tag
|
|
fields = (
|
|
"id",
|
|
"slug",
|
|
"name",
|
|
"colour",
|
|
"match",
|
|
"matching_algorithm",
|
|
"is_insensitive",
|
|
"is_inbox_tag",
|
|
"document_count",
|
|
"owner",
|
|
"permissions",
|
|
"user_can_change",
|
|
"set_permissions",
|
|
)
|
|
|
|
|
|
class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer):
|
|
def get_text_color(self, obj) -> str:
|
|
try:
|
|
h = obj.color.lstrip("#")
|
|
rgb = tuple(int(h[i : i + 2], 16) / 256 for i in (0, 2, 4))
|
|
luminance = math.sqrt(
|
|
0.299 * math.pow(rgb[0], 2)
|
|
+ 0.587 * math.pow(rgb[1], 2)
|
|
+ 0.114 * math.pow(rgb[2], 2),
|
|
)
|
|
return "#ffffff" if luminance < 0.53 else "#000000"
|
|
except ValueError:
|
|
return "#000000"
|
|
|
|
text_color = serializers.SerializerMethodField()
|
|
|
|
class Meta:
|
|
model = Tag
|
|
fields = (
|
|
"id",
|
|
"slug",
|
|
"name",
|
|
"color",
|
|
"text_color",
|
|
"match",
|
|
"matching_algorithm",
|
|
"is_insensitive",
|
|
"is_inbox_tag",
|
|
"document_count",
|
|
"owner",
|
|
"permissions",
|
|
"user_can_change",
|
|
"set_permissions",
|
|
)
|
|
|
|
def validate_color(self, color):
|
|
regex = r"#[0-9a-fA-F]{6}"
|
|
if not re.match(regex, color):
|
|
raise serializers.ValidationError(_("Invalid color."))
|
|
return color
|
|
|
|
|
|
class CorrespondentField(serializers.PrimaryKeyRelatedField):
|
|
def get_queryset(self):
|
|
return Correspondent.objects.all()
|
|
|
|
|
|
class TagsField(serializers.PrimaryKeyRelatedField):
|
|
def get_queryset(self):
|
|
return Tag.objects.all()
|
|
|
|
|
|
class DocumentTypeField(serializers.PrimaryKeyRelatedField):
|
|
def get_queryset(self):
|
|
return DocumentType.objects.all()
|
|
|
|
|
|
class StoragePathField(serializers.PrimaryKeyRelatedField):
|
|
def get_queryset(self):
|
|
return StoragePath.objects.all()
|
|
|
|
|
|
class CustomFieldSerializer(serializers.ModelSerializer):
|
|
def __init__(self, *args, **kwargs):
|
|
context = kwargs.get("context")
|
|
self.api_version = int(
|
|
context.get("request").version
|
|
if context and context.get("request")
|
|
else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
|
|
)
|
|
super().__init__(*args, **kwargs)
|
|
|
|
data_type = serializers.ChoiceField(
|
|
choices=CustomField.FieldDataType,
|
|
read_only=False,
|
|
)
|
|
|
|
document_count = serializers.IntegerField(read_only=True)
|
|
|
|
class Meta:
|
|
model = CustomField
|
|
fields = [
|
|
"id",
|
|
"name",
|
|
"data_type",
|
|
"extra_data",
|
|
"document_count",
|
|
]
|
|
|
|
def validate(self, attrs):
|
|
# TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
|
|
name = attrs.get(
|
|
"name",
|
|
self.instance.name if hasattr(self.instance, "name") else None,
|
|
)
|
|
objects = (
|
|
self.Meta.model.objects.exclude(
|
|
pk=self.instance.pk,
|
|
)
|
|
if self.instance is not None
|
|
else self.Meta.model.objects.all()
|
|
)
|
|
if ("name" in attrs) and objects.filter(
|
|
name=name,
|
|
).exists():
|
|
raise serializers.ValidationError(
|
|
{"error": "Object violates name unique constraint"},
|
|
)
|
|
if (
|
|
"data_type" in attrs
|
|
and attrs["data_type"] == CustomField.FieldDataType.SELECT
|
|
) or (
|
|
self.instance
|
|
and self.instance.data_type == CustomField.FieldDataType.SELECT
|
|
):
|
|
if (
|
|
"extra_data" not in attrs
|
|
or "select_options" not in attrs["extra_data"]
|
|
or not isinstance(attrs["extra_data"]["select_options"], list)
|
|
or len(attrs["extra_data"]["select_options"]) == 0
|
|
or not all(
|
|
len(option.get("label", "")) > 0
|
|
for option in attrs["extra_data"]["select_options"]
|
|
)
|
|
):
|
|
raise serializers.ValidationError(
|
|
{"error": "extra_data.select_options must be a valid list"},
|
|
)
|
|
# labels are valid, generate ids if not present
|
|
for option in attrs["extra_data"]["select_options"]:
|
|
if option.get("id") is None:
|
|
option["id"] = get_random_string(length=16)
|
|
elif (
|
|
"data_type" in attrs
|
|
and attrs["data_type"] == CustomField.FieldDataType.MONETARY
|
|
and "extra_data" in attrs
|
|
and "default_currency" in attrs["extra_data"]
|
|
and attrs["extra_data"]["default_currency"] is not None
|
|
and (
|
|
not isinstance(attrs["extra_data"]["default_currency"], str)
|
|
or (
|
|
len(attrs["extra_data"]["default_currency"]) > 0
|
|
and len(attrs["extra_data"]["default_currency"]) != 3
|
|
)
|
|
)
|
|
):
|
|
raise serializers.ValidationError(
|
|
{"error": "extra_data.default_currency must be a 3-character string"},
|
|
)
|
|
return super().validate(attrs)
|
|
|
|
def to_internal_value(self, data):
|
|
ret = super().to_internal_value(data)
|
|
|
|
if (
|
|
self.api_version < 7
|
|
and ret.get("data_type", "") == CustomField.FieldDataType.SELECT
|
|
and isinstance(ret.get("extra_data", {}).get("select_options"), list)
|
|
):
|
|
ret["extra_data"]["select_options"] = [
|
|
{
|
|
"label": option,
|
|
"id": get_random_string(length=16),
|
|
}
|
|
for option in ret["extra_data"]["select_options"]
|
|
]
|
|
|
|
return ret
|
|
|
|
def to_representation(self, instance):
|
|
ret = super().to_representation(instance)
|
|
|
|
if (
|
|
self.api_version < 7
|
|
and instance.data_type == CustomField.FieldDataType.SELECT
|
|
):
|
|
# Convert the select options with ids to a list of strings
|
|
ret["extra_data"]["select_options"] = [
|
|
option["label"] for option in ret["extra_data"]["select_options"]
|
|
]
|
|
|
|
return ret
|
|
|
|
|
|
class ReadWriteSerializerMethodField(serializers.SerializerMethodField):
|
|
"""
|
|
Based on https://stackoverflow.com/a/62579804
|
|
"""
|
|
|
|
def __init__(self, method_name=None, *args, **kwargs):
|
|
self.method_name = method_name
|
|
kwargs["source"] = "*"
|
|
super(serializers.SerializerMethodField, self).__init__(*args, **kwargs)
|
|
|
|
def to_internal_value(self, data):
|
|
return {self.field_name: data}
|
|
|
|
|
|
class CustomFieldInstanceSerializer(serializers.ModelSerializer):
|
|
field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all())
|
|
value = ReadWriteSerializerMethodField(allow_null=True)
|
|
|
|
def create(self, validated_data):
|
|
# An instance is attached to a document
|
|
document: Document = validated_data["document"]
|
|
# And to a CustomField
|
|
custom_field: CustomField = validated_data["field"]
|
|
# This key must exist, as it is validated
|
|
data_store_name = CustomFieldInstance.get_value_field_name(
|
|
custom_field.data_type,
|
|
)
|
|
|
|
if custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
|
|
# prior to update so we can look for any docs that are going to be removed
|
|
bulk_edit.reflect_doclinks(document, custom_field, validated_data["value"])
|
|
|
|
# Actually update or create the instance, providing the value
|
|
# to fill in the correct attribute based on the type
|
|
instance, _ = CustomFieldInstance.objects.update_or_create(
|
|
document=document,
|
|
field=custom_field,
|
|
defaults={data_store_name: validated_data["value"]},
|
|
)
|
|
return instance
|
|
|
|
def get_value(self, obj: CustomFieldInstance) -> str | int | float | dict | None:
|
|
return obj.value
|
|
|
|
def validate(self, data):
|
|
"""
|
|
Probably because we're kind of doing it odd, validation from the model
|
|
doesn't run against the field "value", so we have to re-create it here.
|
|
|
|
Don't like it, but it is better than returning an HTTP 500 when the database
|
|
hates the value
|
|
"""
|
|
data = super().validate(data)
|
|
field: CustomField = data["field"]
|
|
if "value" in data and data["value"] is not None:
|
|
if (
|
|
field.data_type == CustomField.FieldDataType.URL
|
|
and len(data["value"]) > 0
|
|
):
|
|
uri_validator(data["value"])
|
|
elif field.data_type == CustomField.FieldDataType.INT:
|
|
integer_validator(data["value"])
|
|
elif (
|
|
field.data_type == CustomField.FieldDataType.MONETARY
|
|
and data["value"] != ""
|
|
):
|
|
try:
|
|
# First try to validate as a number from legacy format
|
|
DecimalValidator(max_digits=12, decimal_places=2)(
|
|
Decimal(str(data["value"])),
|
|
)
|
|
except Exception:
|
|
# If that fails, try to validate as a monetary string
|
|
RegexValidator(
|
|
regex=r"^[A-Z]{3}-?\d+(\.\d{1,2})$",
|
|
message="Must be a two-decimal number with optional currency code e.g. GBP123.45",
|
|
)(data["value"])
|
|
elif field.data_type == CustomField.FieldDataType.STRING:
|
|
MaxLengthValidator(limit_value=128)(data["value"])
|
|
elif field.data_type == CustomField.FieldDataType.SELECT:
|
|
select_options = field.extra_data["select_options"]
|
|
try:
|
|
next(
|
|
option
|
|
for option in select_options
|
|
if option["id"] == data["value"]
|
|
)
|
|
except Exception:
|
|
raise serializers.ValidationError(
|
|
f"Value must be an id of an element in {select_options}",
|
|
)
|
|
elif field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
|
|
if not (isinstance(data["value"], list) or data["value"] is None):
|
|
raise serializers.ValidationError(
|
|
"Value must be a list",
|
|
)
|
|
doc_ids = data["value"]
|
|
if Document.objects.filter(id__in=doc_ids).count() != len(
|
|
data["value"],
|
|
):
|
|
raise serializers.ValidationError(
|
|
"Some documents in value don't exist or were specified twice.",
|
|
)
|
|
|
|
return data
|
|
|
|
def get_api_version(self):
|
|
return int(
|
|
self.context.get("request").version
|
|
if self.context.get("request")
|
|
else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
|
|
)
|
|
|
|
def to_internal_value(self, data):
|
|
ret = super().to_internal_value(data)
|
|
|
|
if (
|
|
self.get_api_version() < 7
|
|
and ret.get("field").data_type == CustomField.FieldDataType.SELECT
|
|
and ret.get("value") is not None
|
|
):
|
|
# Convert the index of the option in the field.extra_data["select_options"]
|
|
# list to the options unique id
|
|
ret["value"] = ret.get("field").extra_data["select_options"][ret["value"]][
|
|
"id"
|
|
]
|
|
|
|
return ret
|
|
|
|
def to_representation(self, instance):
|
|
ret = super().to_representation(instance)
|
|
|
|
if (
|
|
self.get_api_version() < 7
|
|
and instance.field.data_type == CustomField.FieldDataType.SELECT
|
|
):
|
|
# return the index of the option in the field.extra_data["select_options"] list
|
|
ret["value"] = next(
|
|
(
|
|
idx
|
|
for idx, option in enumerate(
|
|
instance.field.extra_data["select_options"],
|
|
)
|
|
if option["id"] == instance.value
|
|
),
|
|
None,
|
|
)
|
|
|
|
return ret
|
|
|
|
class Meta:
|
|
model = CustomFieldInstance
|
|
fields = [
|
|
"value",
|
|
"field",
|
|
]
|
|
|
|
|
|
class DocumentSerializer(
|
|
OwnedObjectSerializer,
|
|
NestedUpdateMixin,
|
|
DynamicFieldsModelSerializer,
|
|
):
|
|
correspondent = CorrespondentField(allow_null=True)
|
|
tags = TagsField(many=True)
|
|
document_type = DocumentTypeField(allow_null=True)
|
|
storage_path = StoragePathField(allow_null=True)
|
|
|
|
original_file_name = SerializerMethodField()
|
|
archived_file_name = SerializerMethodField()
|
|
created_date = serializers.DateField(required=False)
|
|
page_count = SerializerMethodField()
|
|
|
|
custom_fields = CustomFieldInstanceSerializer(
|
|
many=True,
|
|
allow_null=False,
|
|
required=False,
|
|
)
|
|
|
|
owner = serializers.PrimaryKeyRelatedField(
|
|
queryset=User.objects.all(),
|
|
required=False,
|
|
allow_null=True,
|
|
)
|
|
|
|
remove_inbox_tags = serializers.BooleanField(
|
|
default=False,
|
|
write_only=True,
|
|
allow_null=True,
|
|
required=False,
|
|
)
|
|
|
|
def get_page_count(self, obj) -> int | None:
|
|
return obj.page_count
|
|
|
|
def get_original_file_name(self, obj) -> str | None:
|
|
return obj.original_filename
|
|
|
|
def get_archived_file_name(self, obj) -> str | None:
|
|
if obj.has_archive_version:
|
|
return obj.get_public_filename(archive=True)
|
|
else:
|
|
return None
|
|
|
|
def to_representation(self, instance):
|
|
doc = super().to_representation(instance)
|
|
if self.truncate_content and "content" in self.fields:
|
|
doc["content"] = doc.get("content")[0:550]
|
|
return doc
|
|
|
|
def validate(self, attrs):
|
|
if (
|
|
"archive_serial_number" in attrs
|
|
and attrs["archive_serial_number"] is not None
|
|
and len(str(attrs["archive_serial_number"])) > 0
|
|
and Document.deleted_objects.filter(
|
|
archive_serial_number=attrs["archive_serial_number"],
|
|
).exists()
|
|
):
|
|
raise serializers.ValidationError(
|
|
{
|
|
"archive_serial_number": [
|
|
"Document with this Archive Serial Number already exists in the trash.",
|
|
],
|
|
},
|
|
)
|
|
return super().validate(attrs)
|
|
|
|
def update(self, instance: Document, validated_data):
|
|
if "created_date" in validated_data and "created" not in validated_data:
|
|
new_datetime = datetime.datetime.combine(
|
|
validated_data.get("created_date"),
|
|
datetime.time(0, 0, 0, 0, zoneinfo.ZoneInfo(settings.TIME_ZONE)),
|
|
)
|
|
instance.created = new_datetime
|
|
instance.save()
|
|
if "created_date" in validated_data:
|
|
validated_data.pop("created_date")
|
|
if instance.custom_fields.count() > 0 and "custom_fields" in validated_data:
|
|
incoming_custom_fields = [
|
|
field["field"] for field in validated_data["custom_fields"]
|
|
]
|
|
for custom_field_instance in instance.custom_fields.filter(
|
|
field__data_type=CustomField.FieldDataType.DOCUMENTLINK,
|
|
):
|
|
if (
|
|
custom_field_instance.field not in incoming_custom_fields
|
|
and custom_field_instance.value is not None
|
|
):
|
|
# Doc link field is being removed entirely
|
|
for doc_id in custom_field_instance.value:
|
|
bulk_edit.remove_doclink(
|
|
instance,
|
|
custom_field_instance.field,
|
|
doc_id,
|
|
)
|
|
if validated_data.get("remove_inbox_tags"):
|
|
tag_ids_being_added = (
|
|
[
|
|
tag.id
|
|
for tag in validated_data["tags"]
|
|
if tag not in instance.tags.all()
|
|
]
|
|
if "tags" in validated_data
|
|
else []
|
|
)
|
|
inbox_tags_not_being_added = Tag.objects.filter(is_inbox_tag=True).exclude(
|
|
id__in=tag_ids_being_added,
|
|
)
|
|
if "tags" in validated_data:
|
|
validated_data["tags"] = [
|
|
tag
|
|
for tag in validated_data["tags"]
|
|
if tag not in inbox_tags_not_being_added
|
|
]
|
|
else:
|
|
validated_data["tags"] = [
|
|
tag
|
|
for tag in instance.tags.all()
|
|
if tag not in inbox_tags_not_being_added
|
|
]
|
|
if settings.AUDIT_LOG_ENABLED:
|
|
with set_actor(self.user):
|
|
super().update(instance, validated_data)
|
|
else:
|
|
super().update(instance, validated_data)
|
|
# hard delete custom field instances that were soft deleted
|
|
CustomFieldInstance.deleted_objects.filter(document=instance).delete()
|
|
return instance
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self.truncate_content = kwargs.pop("truncate_content", False)
|
|
|
|
# return full permissions if we're doing a PATCH or PUT
|
|
context = kwargs.get("context")
|
|
if context is not None and (
|
|
context.get("request").method == "PATCH"
|
|
or context.get("request").method == "PUT"
|
|
):
|
|
kwargs["full_perms"] = True
|
|
|
|
super().__init__(*args, **kwargs)
|
|
|
|
class Meta:
|
|
model = Document
|
|
fields = (
|
|
"id",
|
|
"correspondent",
|
|
"document_type",
|
|
"storage_path",
|
|
"title",
|
|
"content",
|
|
"tags",
|
|
"created",
|
|
"created_date",
|
|
"modified",
|
|
"added",
|
|
"deleted_at",
|
|
"archive_serial_number",
|
|
"original_file_name",
|
|
"archived_file_name",
|
|
"owner",
|
|
"permissions",
|
|
"user_can_change",
|
|
"is_shared_by_requester",
|
|
"set_permissions",
|
|
"notes",
|
|
"custom_fields",
|
|
"remove_inbox_tags",
|
|
"page_count",
|
|
"mime_type",
|
|
)
|
|
list_serializer_class = OwnedObjectListSerializer
|
|
|
|
|
|
class SearchResultListSerializer(serializers.ListSerializer):
|
|
def to_representation(self, hits):
|
|
document_ids = [hit["id"] for hit in hits]
|
|
# Fetch all Document objects in the list in one SQL query.
|
|
documents = self.child.fetch_documents(document_ids)
|
|
self.child.context["documents"] = documents
|
|
# Also check if they are shared with other users / groups.
|
|
self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
|
|
documents.values(),
|
|
)
|
|
|
|
return super().to_representation(hits)
|
|
|
|
|
|
class SearchResultSerializer(DocumentSerializer):
|
|
@staticmethod
|
|
def fetch_documents(ids):
|
|
"""
|
|
Return a dict that maps given document IDs to Document objects.
|
|
"""
|
|
return {
|
|
document.id: document
|
|
for document in Document.objects.select_related(
|
|
"correspondent",
|
|
"storage_path",
|
|
"document_type",
|
|
"owner",
|
|
)
|
|
.prefetch_related("tags", "custom_fields", "notes")
|
|
.filter(id__in=ids)
|
|
}
|
|
|
|
def to_representation(self, hit):
|
|
# Again we first check if the parent has already fetched the documents.
|
|
documents = self.context.get("documents")
|
|
# Otherwise we fetch this document.
|
|
if documents is None: # pragma: no cover
|
|
# In practice we only serialize **lists** of whoosh.searching.Hit.
|
|
# I'm keeping this check for completeness but marking it no cover for now.
|
|
documents = self.fetch_documents([hit["id"]])
|
|
document = documents[hit["id"]]
|
|
|
|
notes = ",".join(
|
|
[str(c.note) for c in document.notes.all()],
|
|
)
|
|
r = super().to_representation(document)
|
|
r["__search_hit__"] = {
|
|
"score": hit.score,
|
|
"highlights": hit.highlights("content", text=document.content),
|
|
"note_highlights": (
|
|
hit.highlights("notes", text=notes) if document else None
|
|
),
|
|
"rank": hit.rank,
|
|
}
|
|
|
|
return r
|
|
|
|
class Meta(DocumentSerializer.Meta):
|
|
list_serializer_class = SearchResultListSerializer
|
|
|
|
|
|
class SavedViewFilterRuleSerializer(serializers.ModelSerializer):
|
|
class Meta:
|
|
model = SavedViewFilterRule
|
|
fields = ["rule_type", "value"]
|
|
|
|
|
|
class SavedViewSerializer(OwnedObjectSerializer):
|
|
filter_rules = SavedViewFilterRuleSerializer(many=True)
|
|
|
|
class Meta:
|
|
model = SavedView
|
|
fields = [
|
|
"id",
|
|
"name",
|
|
"show_on_dashboard",
|
|
"show_in_sidebar",
|
|
"sort_field",
|
|
"sort_reverse",
|
|
"filter_rules",
|
|
"page_size",
|
|
"display_mode",
|
|
"display_fields",
|
|
"owner",
|
|
"permissions",
|
|
"user_can_change",
|
|
"set_permissions",
|
|
]
|
|
|
|
def validate(self, attrs):
|
|
attrs = super().validate(attrs)
|
|
if "display_fields" in attrs and attrs["display_fields"] is not None:
|
|
for field in attrs["display_fields"]:
|
|
if (
|
|
SavedView.DisplayFields.CUSTOM_FIELD[:-2] in field
|
|
): # i.e. check for 'custom_field_' prefix
|
|
field_id = int(re.search(r"\d+", field)[0])
|
|
if not CustomField.objects.filter(id=field_id).exists():
|
|
raise serializers.ValidationError(
|
|
f"Invalid field: {field}",
|
|
)
|
|
elif field not in SavedView.DisplayFields.values:
|
|
raise serializers.ValidationError(
|
|
f"Invalid field: {field}",
|
|
)
|
|
return attrs
|
|
|
|
def update(self, instance, validated_data):
|
|
if "filter_rules" in validated_data:
|
|
rules_data = validated_data.pop("filter_rules")
|
|
else:
|
|
rules_data = None
|
|
if "user" in validated_data:
|
|
# backwards compatibility
|
|
validated_data["owner"] = validated_data.pop("user")
|
|
if (
|
|
"display_fields" in validated_data
|
|
and isinstance(
|
|
validated_data["display_fields"],
|
|
list,
|
|
)
|
|
and len(validated_data["display_fields"]) == 0
|
|
):
|
|
validated_data["display_fields"] = None
|
|
super().update(instance, validated_data)
|
|
if rules_data is not None:
|
|
SavedViewFilterRule.objects.filter(saved_view=instance).delete()
|
|
for rule_data in rules_data:
|
|
SavedViewFilterRule.objects.create(saved_view=instance, **rule_data)
|
|
return instance
|
|
|
|
def create(self, validated_data):
|
|
rules_data = validated_data.pop("filter_rules")
|
|
if "user" in validated_data:
|
|
# backwards compatibility
|
|
validated_data["owner"] = validated_data.pop("user")
|
|
saved_view = SavedView.objects.create(**validated_data)
|
|
for rule_data in rules_data:
|
|
SavedViewFilterRule.objects.create(saved_view=saved_view, **rule_data)
|
|
return saved_view
|
|
|
|
|
|
class DocumentListSerializer(serializers.Serializer):
|
|
documents = serializers.ListField(
|
|
required=True,
|
|
label="Documents",
|
|
write_only=True,
|
|
child=serializers.IntegerField(),
|
|
)
|
|
|
|
def _validate_document_id_list(self, documents, name="documents"):
|
|
if not isinstance(documents, list):
|
|
raise serializers.ValidationError(f"{name} must be a list")
|
|
if not all(isinstance(i, int) for i in documents):
|
|
raise serializers.ValidationError(f"{name} must be a list of integers")
|
|
count = Document.objects.filter(id__in=documents).count()
|
|
if not count == len(documents):
|
|
raise serializers.ValidationError(
|
|
f"Some documents in {name} don't exist or were specified twice.",
|
|
)
|
|
|
|
def validate_documents(self, documents):
|
|
self._validate_document_id_list(documents)
|
|
return documents
|
|
|
|
|
|
class BulkEditSerializer(
|
|
SerializerWithPerms,
|
|
DocumentListSerializer,
|
|
SetPermissionsMixin,
|
|
):
|
|
method = serializers.ChoiceField(
|
|
choices=[
|
|
"set_correspondent",
|
|
"set_document_type",
|
|
"set_storage_path",
|
|
"add_tag",
|
|
"remove_tag",
|
|
"modify_tags",
|
|
"modify_custom_fields",
|
|
"delete",
|
|
"reprocess",
|
|
"set_permissions",
|
|
"rotate",
|
|
"merge",
|
|
"split",
|
|
"delete_pages",
|
|
],
|
|
label="Method",
|
|
write_only=True,
|
|
)
|
|
|
|
parameters = serializers.DictField(allow_empty=True, default={}, write_only=True)
|
|
|
|
def _validate_tag_id_list(self, tags, name="tags"):
|
|
if not isinstance(tags, list):
|
|
raise serializers.ValidationError(f"{name} must be a list")
|
|
if not all(isinstance(i, int) for i in tags):
|
|
raise serializers.ValidationError(f"{name} must be a list of integers")
|
|
count = Tag.objects.filter(id__in=tags).count()
|
|
if not count == len(tags):
|
|
raise serializers.ValidationError(
|
|
f"Some tags in {name} don't exist or were specified twice.",
|
|
)
|
|
|
|
def _validate_custom_field_id_list_or_dict(
|
|
self,
|
|
custom_fields,
|
|
name="custom_fields",
|
|
):
|
|
ids = custom_fields
|
|
if isinstance(custom_fields, dict):
|
|
try:
|
|
ids = [int(i[0]) for i in custom_fields.items()]
|
|
except Exception as e:
|
|
logger.exception(f"Error validating custom fields: {e}")
|
|
raise serializers.ValidationError(
|
|
f"{name} must be a list of integers or a dict of id:value pairs, see the log for details",
|
|
)
|
|
elif not isinstance(custom_fields, list) or not all(
|
|
isinstance(i, int) for i in ids
|
|
):
|
|
raise serializers.ValidationError(
|
|
f"{name} must be a list of integers or a dict of id:value pairs",
|
|
)
|
|
count = CustomField.objects.filter(id__in=ids).count()
|
|
if not count == len(ids):
|
|
raise serializers.ValidationError(
|
|
f"Some custom fields in {name} don't exist or were specified twice.",
|
|
)
|
|
|
|
def validate_method(self, method):
|
|
if method == "set_correspondent":
|
|
return bulk_edit.set_correspondent
|
|
elif method == "set_document_type":
|
|
return bulk_edit.set_document_type
|
|
elif method == "set_storage_path":
|
|
return bulk_edit.set_storage_path
|
|
elif method == "add_tag":
|
|
return bulk_edit.add_tag
|
|
elif method == "remove_tag":
|
|
return bulk_edit.remove_tag
|
|
elif method == "modify_tags":
|
|
return bulk_edit.modify_tags
|
|
elif method == "modify_custom_fields":
|
|
return bulk_edit.modify_custom_fields
|
|
elif method == "delete":
|
|
return bulk_edit.delete
|
|
elif method == "redo_ocr" or method == "reprocess":
|
|
return bulk_edit.reprocess
|
|
elif method == "set_permissions":
|
|
return bulk_edit.set_permissions
|
|
elif method == "rotate":
|
|
return bulk_edit.rotate
|
|
elif method == "merge":
|
|
return bulk_edit.merge
|
|
elif method == "split":
|
|
return bulk_edit.split
|
|
elif method == "delete_pages":
|
|
return bulk_edit.delete_pages
|
|
else:
|
|
raise serializers.ValidationError("Unsupported method.")
|
|
|
|
def _validate_parameters_tags(self, parameters):
|
|
if "tag" in parameters:
|
|
tag_id = parameters["tag"]
|
|
try:
|
|
Tag.objects.get(id=tag_id)
|
|
except Tag.DoesNotExist:
|
|
raise serializers.ValidationError("Tag does not exist")
|
|
else:
|
|
raise serializers.ValidationError("tag not specified")
|
|
|
|
def _validate_parameters_document_type(self, parameters):
|
|
if "document_type" in parameters:
|
|
document_type_id = parameters["document_type"]
|
|
if document_type_id is None:
|
|
# None is ok
|
|
return
|
|
try:
|
|
DocumentType.objects.get(id=document_type_id)
|
|
except DocumentType.DoesNotExist:
|
|
raise serializers.ValidationError("Document type does not exist")
|
|
else:
|
|
raise serializers.ValidationError("document_type not specified")
|
|
|
|
def _validate_parameters_correspondent(self, parameters):
|
|
if "correspondent" in parameters:
|
|
correspondent_id = parameters["correspondent"]
|
|
if correspondent_id is None:
|
|
return
|
|
try:
|
|
Correspondent.objects.get(id=correspondent_id)
|
|
except Correspondent.DoesNotExist:
|
|
raise serializers.ValidationError("Correspondent does not exist")
|
|
else:
|
|
raise serializers.ValidationError("correspondent not specified")
|
|
|
|
def _validate_storage_path(self, parameters):
|
|
if "storage_path" in parameters:
|
|
storage_path_id = parameters["storage_path"]
|
|
if storage_path_id is None:
|
|
return
|
|
try:
|
|
StoragePath.objects.get(id=storage_path_id)
|
|
except StoragePath.DoesNotExist:
|
|
raise serializers.ValidationError(
|
|
"Storage path does not exist",
|
|
)
|
|
else:
|
|
raise serializers.ValidationError("storage path not specified")
|
|
|
|
def _validate_parameters_modify_tags(self, parameters):
|
|
if "add_tags" in parameters:
|
|
self._validate_tag_id_list(parameters["add_tags"], "add_tags")
|
|
else:
|
|
raise serializers.ValidationError("add_tags not specified")
|
|
|
|
if "remove_tags" in parameters:
|
|
self._validate_tag_id_list(parameters["remove_tags"], "remove_tags")
|
|
else:
|
|
raise serializers.ValidationError("remove_tags not specified")
|
|
|
|
def _validate_parameters_modify_custom_fields(self, parameters):
|
|
if "add_custom_fields" in parameters:
|
|
self._validate_custom_field_id_list_or_dict(
|
|
parameters["add_custom_fields"],
|
|
"add_custom_fields",
|
|
)
|
|
else:
|
|
raise serializers.ValidationError("add_custom_fields not specified")
|
|
|
|
if "remove_custom_fields" in parameters:
|
|
self._validate_custom_field_id_list_or_dict(
|
|
parameters["remove_custom_fields"],
|
|
"remove_custom_fields",
|
|
)
|
|
else:
|
|
raise serializers.ValidationError("remove_custom_fields not specified")
|
|
|
|
def _validate_owner(self, owner):
|
|
ownerUser = User.objects.get(pk=owner)
|
|
if ownerUser is None:
|
|
raise serializers.ValidationError("Specified owner cannot be found")
|
|
return ownerUser
|
|
|
|
def _validate_parameters_set_permissions(self, parameters):
|
|
parameters["set_permissions"] = self.validate_set_permissions(
|
|
parameters["set_permissions"],
|
|
)
|
|
if "owner" in parameters and parameters["owner"] is not None:
|
|
self._validate_owner(parameters["owner"])
|
|
if "merge" not in parameters:
|
|
parameters["merge"] = False
|
|
|
|
def _validate_parameters_rotate(self, parameters):
|
|
try:
|
|
if (
|
|
"degrees" not in parameters
|
|
or not float(parameters["degrees"]).is_integer()
|
|
):
|
|
raise serializers.ValidationError("invalid rotation degrees")
|
|
except ValueError:
|
|
raise serializers.ValidationError("invalid rotation degrees")
|
|
|
|
def _validate_parameters_split(self, parameters):
|
|
if "pages" not in parameters:
|
|
raise serializers.ValidationError("pages not specified")
|
|
try:
|
|
pages = []
|
|
docs = parameters["pages"].split(",")
|
|
for doc in docs:
|
|
if "-" in doc:
|
|
pages.append(
|
|
[
|
|
x
|
|
for x in range(
|
|
int(doc.split("-")[0]),
|
|
int(doc.split("-")[1]) + 1,
|
|
)
|
|
],
|
|
)
|
|
else:
|
|
pages.append([int(doc)])
|
|
parameters["pages"] = pages
|
|
except ValueError:
|
|
raise serializers.ValidationError("invalid pages specified")
|
|
|
|
if "delete_originals" in parameters:
|
|
if not isinstance(parameters["delete_originals"], bool):
|
|
raise serializers.ValidationError("delete_originals must be a boolean")
|
|
else:
|
|
parameters["delete_originals"] = False
|
|
|
|
def _validate_parameters_delete_pages(self, parameters):
|
|
if "pages" not in parameters:
|
|
raise serializers.ValidationError("pages not specified")
|
|
if not isinstance(parameters["pages"], list):
|
|
raise serializers.ValidationError("pages must be a list")
|
|
if not all(isinstance(i, int) for i in parameters["pages"]):
|
|
raise serializers.ValidationError("pages must be a list of integers")
|
|
|
|
def _validate_parameters_merge(self, parameters):
|
|
if "delete_originals" in parameters:
|
|
if not isinstance(parameters["delete_originals"], bool):
|
|
raise serializers.ValidationError("delete_originals must be a boolean")
|
|
else:
|
|
parameters["delete_originals"] = False
|
|
if "archive_fallback" in parameters:
|
|
if not isinstance(parameters["archive_fallback"], bool):
|
|
raise serializers.ValidationError("archive_fallback must be a boolean")
|
|
else:
|
|
parameters["archive_fallback"] = False
|
|
|
|
def validate(self, attrs):
|
|
method = attrs["method"]
|
|
parameters = attrs["parameters"]
|
|
|
|
if method == bulk_edit.set_correspondent:
|
|
self._validate_parameters_correspondent(parameters)
|
|
elif method == bulk_edit.set_document_type:
|
|
self._validate_parameters_document_type(parameters)
|
|
elif method == bulk_edit.add_tag or method == bulk_edit.remove_tag:
|
|
self._validate_parameters_tags(parameters)
|
|
elif method == bulk_edit.modify_tags:
|
|
self._validate_parameters_modify_tags(parameters)
|
|
elif method == bulk_edit.set_storage_path:
|
|
self._validate_storage_path(parameters)
|
|
elif method == bulk_edit.modify_custom_fields:
|
|
self._validate_parameters_modify_custom_fields(parameters)
|
|
elif method == bulk_edit.set_permissions:
|
|
self._validate_parameters_set_permissions(parameters)
|
|
elif method == bulk_edit.rotate:
|
|
self._validate_parameters_rotate(parameters)
|
|
elif method == bulk_edit.split:
|
|
if len(attrs["documents"]) > 1:
|
|
raise serializers.ValidationError(
|
|
"Split method only supports one document",
|
|
)
|
|
self._validate_parameters_split(parameters)
|
|
elif method == bulk_edit.delete_pages:
|
|
if len(attrs["documents"]) > 1:
|
|
raise serializers.ValidationError(
|
|
"Delete pages method only supports one document",
|
|
)
|
|
self._validate_parameters_delete_pages(parameters)
|
|
elif method == bulk_edit.merge:
|
|
self._validate_parameters_merge(parameters)
|
|
|
|
return attrs
|
|
|
|
|
|
class PostDocumentSerializer(serializers.Serializer):
|
|
created = serializers.DateTimeField(
|
|
label="Created",
|
|
allow_null=True,
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
document = serializers.FileField(
|
|
label="Document",
|
|
write_only=True,
|
|
)
|
|
|
|
title = serializers.CharField(
|
|
label="Title",
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
correspondent = serializers.PrimaryKeyRelatedField(
|
|
queryset=Correspondent.objects.all(),
|
|
label="Correspondent",
|
|
allow_null=True,
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
document_type = serializers.PrimaryKeyRelatedField(
|
|
queryset=DocumentType.objects.all(),
|
|
label="Document type",
|
|
allow_null=True,
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
storage_path = serializers.PrimaryKeyRelatedField(
|
|
queryset=StoragePath.objects.all(),
|
|
label="Storage path",
|
|
allow_null=True,
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
tags = serializers.PrimaryKeyRelatedField(
|
|
many=True,
|
|
queryset=Tag.objects.all(),
|
|
label="Tags",
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
archive_serial_number = serializers.IntegerField(
|
|
label="ASN",
|
|
write_only=True,
|
|
required=False,
|
|
min_value=Document.ARCHIVE_SERIAL_NUMBER_MIN,
|
|
max_value=Document.ARCHIVE_SERIAL_NUMBER_MAX,
|
|
)
|
|
|
|
custom_fields = serializers.PrimaryKeyRelatedField(
|
|
many=True,
|
|
queryset=CustomField.objects.all(),
|
|
label="Custom fields",
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
from_webui = serializers.BooleanField(
|
|
label="Documents are from Paperless-ngx WebUI",
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
def validate_document(self, document):
|
|
document_data = document.file.read()
|
|
mime_type = magic.from_buffer(document_data, mime=True)
|
|
|
|
if not is_mime_type_supported(mime_type):
|
|
if (
|
|
mime_type in settings.CONSUMER_PDF_RECOVERABLE_MIME_TYPES
|
|
and document.name.endswith(
|
|
".pdf",
|
|
)
|
|
):
|
|
# If the file is an invalid PDF, we can try to recover it later in the consumer
|
|
mime_type = "application/pdf"
|
|
else:
|
|
raise serializers.ValidationError(
|
|
_("File type %(type)s not supported") % {"type": mime_type},
|
|
)
|
|
|
|
return document.name, document_data
|
|
|
|
def validate_correspondent(self, correspondent):
|
|
if correspondent:
|
|
return correspondent.id
|
|
else:
|
|
return None
|
|
|
|
def validate_document_type(self, document_type):
|
|
if document_type:
|
|
return document_type.id
|
|
else:
|
|
return None
|
|
|
|
def validate_storage_path(self, storage_path):
|
|
if storage_path:
|
|
return storage_path.id
|
|
else:
|
|
return None
|
|
|
|
def validate_tags(self, tags):
|
|
if tags:
|
|
return [tag.id for tag in tags]
|
|
else:
|
|
return None
|
|
|
|
def validate_custom_fields(self, custom_fields):
|
|
if custom_fields:
|
|
return [custom_field.id for custom_field in custom_fields]
|
|
else:
|
|
return None
|
|
|
|
|
|
class BulkDownloadSerializer(DocumentListSerializer):
|
|
content = serializers.ChoiceField(
|
|
choices=["archive", "originals", "both"],
|
|
default="archive",
|
|
)
|
|
|
|
compression = serializers.ChoiceField(
|
|
choices=["none", "deflated", "bzip2", "lzma"],
|
|
default="none",
|
|
)
|
|
|
|
follow_formatting = serializers.BooleanField(
|
|
default=False,
|
|
)
|
|
|
|
def validate_compression(self, compression):
|
|
import zipfile
|
|
|
|
return {
|
|
"none": zipfile.ZIP_STORED,
|
|
"deflated": zipfile.ZIP_DEFLATED,
|
|
"bzip2": zipfile.ZIP_BZIP2,
|
|
"lzma": zipfile.ZIP_LZMA,
|
|
}[compression]
|
|
|
|
|
|
class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer):
|
|
class Meta:
|
|
model = StoragePath
|
|
fields = (
|
|
"id",
|
|
"slug",
|
|
"name",
|
|
"path",
|
|
"match",
|
|
"matching_algorithm",
|
|
"is_insensitive",
|
|
"document_count",
|
|
"owner",
|
|
"permissions",
|
|
"user_can_change",
|
|
"set_permissions",
|
|
)
|
|
|
|
def validate_path(self, path: str):
|
|
converted_path = convert_format_str_to_template_format(path)
|
|
if converted_path != path:
|
|
logger.warning(
|
|
f"Storage path {path} is not using the new style format, consider updating",
|
|
)
|
|
result = validate_filepath_template_and_render(converted_path)
|
|
|
|
if result is None:
|
|
raise serializers.ValidationError(_("Invalid variable detected."))
|
|
|
|
return converted_path
|
|
|
|
def update(self, instance, validated_data):
|
|
"""
|
|
When a storage path is updated, see if documents
|
|
using it require a rename/move
|
|
"""
|
|
doc_ids = [doc.id for doc in instance.documents.all()]
|
|
if len(doc_ids):
|
|
bulk_edit.bulk_update_documents.delay(doc_ids)
|
|
|
|
return super().update(instance, validated_data)
|
|
|
|
|
|
class UiSettingsViewSerializer(serializers.ModelSerializer):
|
|
class Meta:
|
|
model = UiSettings
|
|
depth = 1
|
|
fields = [
|
|
"id",
|
|
"settings",
|
|
]
|
|
|
|
def validate_settings(self, settings):
|
|
# we never save update checking backend setting
|
|
if "update_checking" in settings:
|
|
try:
|
|
settings["update_checking"].pop("backend_setting")
|
|
except KeyError:
|
|
pass
|
|
return settings
|
|
|
|
def create(self, validated_data):
|
|
ui_settings = UiSettings.objects.update_or_create(
|
|
user=validated_data.get("user"),
|
|
defaults={"settings": validated_data.get("settings", None)},
|
|
)
|
|
return ui_settings
|
|
|
|
|
|
class TasksViewSerializer(OwnedObjectSerializer):
|
|
class Meta:
|
|
model = PaperlessTask
|
|
fields = (
|
|
"id",
|
|
"task_id",
|
|
"task_name",
|
|
"task_file_name",
|
|
"date_created",
|
|
"date_done",
|
|
"type",
|
|
"status",
|
|
"result",
|
|
"acknowledged",
|
|
"related_document",
|
|
"owner",
|
|
)
|
|
|
|
related_document = serializers.SerializerMethodField()
|
|
created_doc_re = re.compile(r"New document id (\d+) created")
|
|
duplicate_doc_re = re.compile(r"It is a duplicate of .* \(#(\d+)\)")
|
|
|
|
def get_related_document(self, obj) -> str | None:
|
|
result = None
|
|
re = None
|
|
if obj.result:
|
|
match obj.status:
|
|
case states.SUCCESS:
|
|
re = self.created_doc_re
|
|
case states.FAILURE:
|
|
re = (
|
|
self.duplicate_doc_re
|
|
if "existing document is in the trash" not in obj.result
|
|
else None
|
|
)
|
|
if re is not None:
|
|
try:
|
|
result = re.search(obj.result).group(1)
|
|
except Exception:
|
|
pass
|
|
|
|
return result
|
|
|
|
|
|
class RunTaskViewSerializer(serializers.Serializer):
|
|
task_name = serializers.ChoiceField(
|
|
choices=PaperlessTask.TaskName.choices,
|
|
label="Task Name",
|
|
write_only=True,
|
|
)
|
|
|
|
|
|
class AcknowledgeTasksViewSerializer(serializers.Serializer):
|
|
tasks = serializers.ListField(
|
|
required=True,
|
|
label="Tasks",
|
|
write_only=True,
|
|
child=serializers.IntegerField(),
|
|
)
|
|
|
|
def _validate_task_id_list(self, tasks, name="tasks"):
|
|
if not isinstance(tasks, list):
|
|
raise serializers.ValidationError(f"{name} must be a list")
|
|
if not all(isinstance(i, int) for i in tasks):
|
|
raise serializers.ValidationError(f"{name} must be a list of integers")
|
|
count = PaperlessTask.objects.filter(id__in=tasks).count()
|
|
if not count == len(tasks):
|
|
raise serializers.ValidationError(
|
|
f"Some tasks in {name} don't exist or were specified twice.",
|
|
)
|
|
|
|
def validate_tasks(self, tasks):
|
|
self._validate_task_id_list(tasks)
|
|
return tasks
|
|
|
|
|
|
class ShareLinkSerializer(OwnedObjectSerializer):
|
|
class Meta:
|
|
model = ShareLink
|
|
fields = (
|
|
"id",
|
|
"created",
|
|
"expiration",
|
|
"slug",
|
|
"document",
|
|
"file_version",
|
|
)
|
|
|
|
def create(self, validated_data):
|
|
validated_data["slug"] = get_random_string(50)
|
|
return super().create(validated_data)
|
|
|
|
|
|
class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
|
|
objects = serializers.ListField(
|
|
required=True,
|
|
allow_empty=False,
|
|
label="Objects",
|
|
write_only=True,
|
|
child=serializers.IntegerField(),
|
|
)
|
|
|
|
object_type = serializers.ChoiceField(
|
|
choices=[
|
|
"tags",
|
|
"correspondents",
|
|
"document_types",
|
|
"storage_paths",
|
|
],
|
|
label="Object Type",
|
|
write_only=True,
|
|
)
|
|
|
|
operation = serializers.ChoiceField(
|
|
choices=[
|
|
"set_permissions",
|
|
"delete",
|
|
],
|
|
label="Operation",
|
|
required=True,
|
|
write_only=True,
|
|
)
|
|
|
|
owner = serializers.PrimaryKeyRelatedField(
|
|
queryset=User.objects.all(),
|
|
required=False,
|
|
allow_null=True,
|
|
)
|
|
|
|
permissions = serializers.DictField(
|
|
label="Set permissions",
|
|
allow_empty=False,
|
|
required=False,
|
|
write_only=True,
|
|
)
|
|
|
|
merge = serializers.BooleanField(
|
|
default=False,
|
|
write_only=True,
|
|
required=False,
|
|
)
|
|
|
|
def get_object_class(self, object_type):
|
|
object_class = None
|
|
if object_type == "tags":
|
|
object_class = Tag
|
|
elif object_type == "correspondents":
|
|
object_class = Correspondent
|
|
elif object_type == "document_types":
|
|
object_class = DocumentType
|
|
elif object_type == "storage_paths":
|
|
object_class = StoragePath
|
|
return object_class
|
|
|
|
def _validate_objects(self, objects, object_type):
|
|
if not isinstance(objects, list):
|
|
raise serializers.ValidationError("objects must be a list")
|
|
if not all(isinstance(i, int) for i in objects):
|
|
raise serializers.ValidationError("objects must be a list of integers")
|
|
object_class = self.get_object_class(object_type)
|
|
count = object_class.objects.filter(id__in=objects).count()
|
|
if not count == len(objects):
|
|
raise serializers.ValidationError(
|
|
"Some ids in objects don't exist or were specified twice.",
|
|
)
|
|
return objects
|
|
|
|
def _validate_permissions(self, permissions):
|
|
self.validate_set_permissions(
|
|
permissions,
|
|
)
|
|
|
|
def validate(self, attrs):
|
|
object_type = attrs["object_type"]
|
|
objects = attrs["objects"]
|
|
operation = attrs.get("operation")
|
|
|
|
self._validate_objects(objects, object_type)
|
|
|
|
if operation == "set_permissions":
|
|
permissions = attrs.get("permissions")
|
|
if permissions is not None:
|
|
self._validate_permissions(permissions)
|
|
|
|
return attrs
|
|
|
|
|
|
class WorkflowTriggerSerializer(serializers.ModelSerializer):
|
|
id = serializers.IntegerField(required=False, allow_null=True)
|
|
sources = fields.MultipleChoiceField(
|
|
choices=WorkflowTrigger.DocumentSourceChoices.choices,
|
|
allow_empty=True,
|
|
default={
|
|
DocumentSource.ConsumeFolder,
|
|
DocumentSource.ApiUpload,
|
|
DocumentSource.MailFetch,
|
|
},
|
|
)
|
|
|
|
type = serializers.ChoiceField(
|
|
choices=WorkflowTrigger.WorkflowTriggerType.choices,
|
|
label="Trigger Type",
|
|
)
|
|
|
|
class Meta:
|
|
model = WorkflowTrigger
|
|
fields = [
|
|
"id",
|
|
"sources",
|
|
"type",
|
|
"filter_path",
|
|
"filter_filename",
|
|
"filter_mailrule",
|
|
"matching_algorithm",
|
|
"match",
|
|
"is_insensitive",
|
|
"filter_has_tags",
|
|
"filter_has_correspondent",
|
|
"filter_has_document_type",
|
|
"schedule_offset_days",
|
|
"schedule_is_recurring",
|
|
"schedule_recurring_interval_days",
|
|
"schedule_date_field",
|
|
"schedule_date_custom_field",
|
|
]
|
|
|
|
def validate(self, attrs):
|
|
# Empty strings treated as None to avoid unexpected behavior
|
|
if (
|
|
"filter_filename" in attrs
|
|
and attrs["filter_filename"] is not None
|
|
and len(attrs["filter_filename"]) == 0
|
|
):
|
|
attrs["filter_filename"] = None
|
|
if (
|
|
"filter_path" in attrs
|
|
and attrs["filter_path"] is not None
|
|
and len(attrs["filter_path"]) == 0
|
|
):
|
|
attrs["filter_path"] = None
|
|
|
|
if (
|
|
attrs["type"] == WorkflowTrigger.WorkflowTriggerType.CONSUMPTION
|
|
and "filter_mailrule" not in attrs
|
|
and ("filter_filename" not in attrs or attrs["filter_filename"] is None)
|
|
and ("filter_path" not in attrs or attrs["filter_path"] is None)
|
|
):
|
|
raise serializers.ValidationError(
|
|
"File name, path or mail rule filter are required",
|
|
)
|
|
|
|
return attrs
|
|
|
|
|
|
class WorkflowActionEmailSerializer(serializers.ModelSerializer):
|
|
id = serializers.IntegerField(allow_null=True, required=False)
|
|
|
|
class Meta:
|
|
model = WorkflowActionEmail
|
|
fields = [
|
|
"id",
|
|
"subject",
|
|
"body",
|
|
"to",
|
|
"include_document",
|
|
]
|
|
|
|
|
|
class WorkflowActionWebhookSerializer(serializers.ModelSerializer):
|
|
id = serializers.IntegerField(allow_null=True, required=False)
|
|
|
|
def validate_url(self, url):
|
|
url_validator(url)
|
|
return url
|
|
|
|
class Meta:
|
|
model = WorkflowActionWebhook
|
|
fields = [
|
|
"id",
|
|
"url",
|
|
"use_params",
|
|
"as_json",
|
|
"params",
|
|
"body",
|
|
"headers",
|
|
"include_document",
|
|
]
|
|
|
|
|
|
class WorkflowActionSerializer(serializers.ModelSerializer):
|
|
id = serializers.IntegerField(required=False, allow_null=True)
|
|
assign_correspondent = CorrespondentField(allow_null=True, required=False)
|
|
assign_tags = TagsField(many=True, allow_null=True, required=False)
|
|
assign_document_type = DocumentTypeField(allow_null=True, required=False)
|
|
assign_storage_path = StoragePathField(allow_null=True, required=False)
|
|
email = WorkflowActionEmailSerializer(allow_null=True, required=False)
|
|
webhook = WorkflowActionWebhookSerializer(allow_null=True, required=False)
|
|
|
|
class Meta:
|
|
model = WorkflowAction
|
|
fields = [
|
|
"id",
|
|
"type",
|
|
"assign_title",
|
|
"assign_tags",
|
|
"assign_correspondent",
|
|
"assign_document_type",
|
|
"assign_storage_path",
|
|
"assign_owner",
|
|
"assign_view_users",
|
|
"assign_view_groups",
|
|
"assign_change_users",
|
|
"assign_change_groups",
|
|
"assign_custom_fields",
|
|
"assign_custom_fields_values",
|
|
"remove_all_tags",
|
|
"remove_tags",
|
|
"remove_all_correspondents",
|
|
"remove_correspondents",
|
|
"remove_all_document_types",
|
|
"remove_document_types",
|
|
"remove_all_storage_paths",
|
|
"remove_storage_paths",
|
|
"remove_custom_fields",
|
|
"remove_all_custom_fields",
|
|
"remove_all_owners",
|
|
"remove_owners",
|
|
"remove_all_permissions",
|
|
"remove_view_users",
|
|
"remove_view_groups",
|
|
"remove_change_users",
|
|
"remove_change_groups",
|
|
"email",
|
|
"webhook",
|
|
]
|
|
|
|
def validate(self, attrs):
|
|
if "assign_title" in attrs and attrs["assign_title"] is not None:
|
|
if len(attrs["assign_title"]) == 0:
|
|
# Empty strings treated as None to avoid unexpected behavior
|
|
attrs["assign_title"] = None
|
|
else:
|
|
try:
|
|
# test against all placeholders, see consumer.py `parse_doc_title_w_placeholders`
|
|
attrs["assign_title"].format(
|
|
correspondent="",
|
|
document_type="",
|
|
added="",
|
|
added_year="",
|
|
added_year_short="",
|
|
added_month="",
|
|
added_month_name="",
|
|
added_month_name_short="",
|
|
added_day="",
|
|
added_time="",
|
|
owner_username="",
|
|
original_filename="",
|
|
filename="",
|
|
created="",
|
|
created_year="",
|
|
created_year_short="",
|
|
created_month="",
|
|
created_month_name="",
|
|
created_month_name_short="",
|
|
created_day="",
|
|
created_time="",
|
|
)
|
|
except (ValueError, KeyError) as e:
|
|
raise serializers.ValidationError(
|
|
{"assign_title": f'Invalid f-string detected: "{e.args[0]}"'},
|
|
)
|
|
|
|
if (
|
|
"type" in attrs
|
|
and attrs["type"] == WorkflowAction.WorkflowActionType.EMAIL
|
|
and "email" not in attrs
|
|
):
|
|
raise serializers.ValidationError(
|
|
"Email data is required for email actions",
|
|
)
|
|
|
|
if (
|
|
"type" in attrs
|
|
and attrs["type"] == WorkflowAction.WorkflowActionType.WEBHOOK
|
|
and "webhook" not in attrs
|
|
):
|
|
raise serializers.ValidationError(
|
|
"Webhook data is required for webhook actions",
|
|
)
|
|
|
|
return attrs
|
|
|
|
|
|
class WorkflowSerializer(serializers.ModelSerializer):
|
|
order = serializers.IntegerField(required=False)
|
|
|
|
triggers = WorkflowTriggerSerializer(many=True)
|
|
actions = WorkflowActionSerializer(many=True)
|
|
|
|
class Meta:
|
|
model = Workflow
|
|
fields = [
|
|
"id",
|
|
"name",
|
|
"order",
|
|
"enabled",
|
|
"triggers",
|
|
"actions",
|
|
]
|
|
|
|
def update_triggers_and_actions(self, instance: Workflow, triggers, actions):
|
|
set_triggers = []
|
|
set_actions = []
|
|
|
|
if triggers is not None:
|
|
for trigger in triggers:
|
|
filter_has_tags = trigger.pop("filter_has_tags", None)
|
|
trigger_instance, _ = WorkflowTrigger.objects.update_or_create(
|
|
id=trigger.get("id"),
|
|
defaults=trigger,
|
|
)
|
|
if filter_has_tags is not None:
|
|
trigger_instance.filter_has_tags.set(filter_has_tags)
|
|
set_triggers.append(trigger_instance)
|
|
|
|
if actions is not None:
|
|
for action in actions:
|
|
assign_tags = action.pop("assign_tags", None)
|
|
assign_view_users = action.pop("assign_view_users", None)
|
|
assign_view_groups = action.pop("assign_view_groups", None)
|
|
assign_change_users = action.pop("assign_change_users", None)
|
|
assign_change_groups = action.pop("assign_change_groups", None)
|
|
assign_custom_fields = action.pop("assign_custom_fields", None)
|
|
remove_tags = action.pop("remove_tags", None)
|
|
remove_correspondents = action.pop("remove_correspondents", None)
|
|
remove_document_types = action.pop("remove_document_types", None)
|
|
remove_storage_paths = action.pop("remove_storage_paths", None)
|
|
remove_custom_fields = action.pop("remove_custom_fields", None)
|
|
remove_owners = action.pop("remove_owners", None)
|
|
remove_view_users = action.pop("remove_view_users", None)
|
|
remove_view_groups = action.pop("remove_view_groups", None)
|
|
remove_change_users = action.pop("remove_change_users", None)
|
|
remove_change_groups = action.pop("remove_change_groups", None)
|
|
|
|
email_data = action.pop("email", None)
|
|
webhook_data = action.pop("webhook", None)
|
|
|
|
action_instance, _ = WorkflowAction.objects.update_or_create(
|
|
id=action.get("id"),
|
|
defaults=action,
|
|
)
|
|
|
|
if email_data is not None:
|
|
serializer = WorkflowActionEmailSerializer(data=email_data)
|
|
serializer.is_valid(raise_exception=True)
|
|
email, _ = WorkflowActionEmail.objects.update_or_create(
|
|
id=email_data.get("id"),
|
|
defaults=serializer.validated_data,
|
|
)
|
|
action_instance.email = email
|
|
action_instance.save()
|
|
|
|
if webhook_data is not None:
|
|
serializer = WorkflowActionWebhookSerializer(data=webhook_data)
|
|
serializer.is_valid(raise_exception=True)
|
|
webhook, _ = WorkflowActionWebhook.objects.update_or_create(
|
|
id=webhook_data.get("id"),
|
|
defaults=serializer.validated_data,
|
|
)
|
|
action_instance.webhook = webhook
|
|
action_instance.save()
|
|
|
|
if assign_tags is not None:
|
|
action_instance.assign_tags.set(assign_tags)
|
|
if assign_view_users is not None:
|
|
action_instance.assign_view_users.set(assign_view_users)
|
|
if assign_view_groups is not None:
|
|
action_instance.assign_view_groups.set(assign_view_groups)
|
|
if assign_change_users is not None:
|
|
action_instance.assign_change_users.set(assign_change_users)
|
|
if assign_change_groups is not None:
|
|
action_instance.assign_change_groups.set(assign_change_groups)
|
|
if assign_custom_fields is not None:
|
|
action_instance.assign_custom_fields.set(assign_custom_fields)
|
|
if remove_tags is not None:
|
|
action_instance.remove_tags.set(remove_tags)
|
|
if remove_correspondents is not None:
|
|
action_instance.remove_correspondents.set(remove_correspondents)
|
|
if remove_document_types is not None:
|
|
action_instance.remove_document_types.set(remove_document_types)
|
|
if remove_storage_paths is not None:
|
|
action_instance.remove_storage_paths.set(remove_storage_paths)
|
|
if remove_custom_fields is not None:
|
|
action_instance.remove_custom_fields.set(remove_custom_fields)
|
|
if remove_owners is not None:
|
|
action_instance.remove_owners.set(remove_owners)
|
|
if remove_view_users is not None:
|
|
action_instance.remove_view_users.set(remove_view_users)
|
|
if remove_view_groups is not None:
|
|
action_instance.remove_view_groups.set(remove_view_groups)
|
|
if remove_change_users is not None:
|
|
action_instance.remove_change_users.set(remove_change_users)
|
|
if remove_change_groups is not None:
|
|
action_instance.remove_change_groups.set(remove_change_groups)
|
|
|
|
set_actions.append(action_instance)
|
|
|
|
instance.triggers.set(set_triggers)
|
|
instance.actions.set(set_actions)
|
|
instance.save()
|
|
|
|
def prune_triggers_and_actions(self):
|
|
"""
|
|
ManyToMany fields dont support e.g. on_delete so we need to discard unattached
|
|
triggers and actionas manually
|
|
"""
|
|
for trigger in WorkflowTrigger.objects.all():
|
|
if trigger.workflows.all().count() == 0:
|
|
trigger.delete()
|
|
|
|
for action in WorkflowAction.objects.all():
|
|
if action.workflows.all().count() == 0:
|
|
action.delete()
|
|
|
|
WorkflowActionEmail.objects.filter(action=None).delete()
|
|
WorkflowActionWebhook.objects.filter(action=None).delete()
|
|
|
|
def create(self, validated_data) -> Workflow:
|
|
if "triggers" in validated_data:
|
|
triggers = validated_data.pop("triggers")
|
|
|
|
if "actions" in validated_data:
|
|
actions = validated_data.pop("actions")
|
|
|
|
instance = super().create(validated_data)
|
|
|
|
self.update_triggers_and_actions(instance, triggers, actions)
|
|
|
|
return instance
|
|
|
|
def update(self, instance: Workflow, validated_data) -> Workflow:
|
|
if "triggers" in validated_data:
|
|
triggers = validated_data.pop("triggers")
|
|
|
|
if "actions" in validated_data:
|
|
actions = validated_data.pop("actions")
|
|
|
|
instance = super().update(instance, validated_data)
|
|
|
|
self.update_triggers_and_actions(instance, triggers, actions)
|
|
|
|
self.prune_triggers_and_actions()
|
|
|
|
return instance
|
|
|
|
|
|
class TrashSerializer(SerializerWithPerms):
|
|
documents = serializers.ListField(
|
|
required=False,
|
|
label="Documents",
|
|
write_only=True,
|
|
child=serializers.IntegerField(),
|
|
)
|
|
|
|
action = serializers.ChoiceField(
|
|
choices=["restore", "empty"],
|
|
label="Action",
|
|
write_only=True,
|
|
)
|
|
|
|
def validate_documents(self, documents):
|
|
count = Document.deleted_objects.filter(id__in=documents).count()
|
|
if not count == len(documents):
|
|
raise serializers.ValidationError(
|
|
"Some documents in the list have not yet been deleted.",
|
|
)
|
|
return documents
|
|
|
|
|
|
class StoragePathTestSerializer(SerializerWithPerms):
|
|
path = serializers.CharField(
|
|
required=True,
|
|
label="Path",
|
|
write_only=True,
|
|
)
|
|
|
|
document = serializers.PrimaryKeyRelatedField(
|
|
queryset=Document.objects.all(),
|
|
required=True,
|
|
label="Document",
|
|
write_only=True,
|
|
)
|