from __future__ import annotations import datetime import logging import math import re import zoneinfo from decimal import Decimal from typing import TYPE_CHECKING import magic from celery import states from django.conf import settings from django.contrib.auth.models import Group from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType from django.core.validators import DecimalValidator from django.core.validators import MaxLengthValidator from django.core.validators import RegexValidator from django.core.validators import integer_validator from django.utils.crypto import get_random_string from django.utils.text import slugify from django.utils.translation import gettext as _ from drf_spectacular.utils import extend_schema_field from drf_writable_nested.serializers import NestedUpdateMixin from guardian.core import ObjectPermissionChecker from guardian.shortcuts import get_users_with_perms from guardian.utils import get_group_obj_perms_model from guardian.utils import get_user_obj_perms_model from rest_framework import fields from rest_framework import serializers from rest_framework.fields import SerializerMethodField if settings.AUDIT_LOG_ENABLED: from auditlog.context import set_actor from documents import bulk_edit from documents.data_models import DocumentSource from documents.models import Correspondent from documents.models import CustomField from documents.models import CustomFieldInstance from documents.models import Document from documents.models import DocumentType from documents.models import MatchingModel from documents.models import PaperlessTask from documents.models import SavedView from documents.models import SavedViewFilterRule from documents.models import ShareLink from documents.models import StoragePath from documents.models import Tag from documents.models import UiSettings from documents.models import Workflow from documents.models import WorkflowAction from documents.models import WorkflowActionEmail from documents.models import WorkflowActionWebhook from documents.models import WorkflowTrigger from documents.parsers import is_mime_type_supported from documents.permissions import get_groups_with_only_permission from documents.permissions import set_permissions_for_object from documents.templating.filepath import validate_filepath_template_and_render from documents.templating.utils import convert_format_str_to_template_format from documents.validators import uri_validator from documents.validators import url_validator if TYPE_CHECKING: from collections.abc import Iterable logger = logging.getLogger("paperless.serializers") # https://www.django-rest-framework.org/api-guide/serializers/#example class DynamicFieldsModelSerializer(serializers.ModelSerializer): """ A ModelSerializer that takes an additional `fields` argument that controls which fields should be displayed. """ def __init__(self, *args, **kwargs): # Don't pass the 'fields' arg up to the superclass fields = kwargs.pop("fields", None) # Instantiate the superclass normally super().__init__(*args, **kwargs) if fields is not None: # Drop any fields that are not specified in the `fields` argument. allowed = set(fields) existing = set(self.fields) for field_name in existing - allowed: self.fields.pop(field_name) class MatchingModelSerializer(serializers.ModelSerializer): document_count = serializers.IntegerField(read_only=True) def get_slug(self, obj) -> str: return slugify(obj.name) slug = SerializerMethodField() def validate(self, data): # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173 name = data.get( "name", self.instance.name if hasattr(self.instance, "name") else None, ) owner = ( data["owner"] if "owner" in data else self.user if hasattr(self, "user") else None ) pk = self.instance.pk if hasattr(self.instance, "pk") else None if ("name" in data or "owner" in data) and self.Meta.model.objects.filter( name=name, owner=owner, ).exclude(pk=pk).exists(): raise serializers.ValidationError( {"error": "Object violates owner / name unique constraint"}, ) return data def validate_match(self, match): if ( "matching_algorithm" in self.initial_data and self.initial_data["matching_algorithm"] == MatchingModel.MATCH_REGEX ): try: re.compile(match) except re.error as e: raise serializers.ValidationError( _("Invalid regular expression: %(error)s") % {"error": str(e.msg)}, ) return match class SetPermissionsMixin: def _validate_user_ids(self, user_ids): users = User.objects.none() if user_ids is not None: users = User.objects.filter(id__in=user_ids) if not users.count() == len(user_ids): raise serializers.ValidationError( "Some users in don't exist or were specified twice.", ) return users def _validate_group_ids(self, group_ids): groups = Group.objects.none() if group_ids is not None: groups = Group.objects.filter(id__in=group_ids) if not groups.count() == len(group_ids): raise serializers.ValidationError( "Some groups in don't exist or were specified twice.", ) return groups def validate_set_permissions(self, set_permissions=None): permissions_dict = { "view": { "users": User.objects.none(), "groups": Group.objects.none(), }, "change": { "users": User.objects.none(), "groups": Group.objects.none(), }, } if set_permissions is not None: for action, _ in permissions_dict.items(): if action in set_permissions: users = set_permissions[action]["users"] permissions_dict[action]["users"] = self._validate_user_ids(users) groups = set_permissions[action]["groups"] permissions_dict[action]["groups"] = self._validate_group_ids( groups, ) return permissions_dict def _set_permissions(self, permissions, object): set_permissions_for_object(permissions, object) class SerializerWithPerms(serializers.Serializer): def __init__(self, *args, **kwargs): self.user = kwargs.pop("user", None) self.full_perms = kwargs.pop("full_perms", False) self.all_fields = kwargs.pop("all_fields", False) super().__init__(*args, **kwargs) @extend_schema_field( field={ "type": "object", "properties": { "view": { "type": "object", "properties": { "users": { "type": "array", "items": {"type": "integer"}, }, "groups": { "type": "array", "items": {"type": "integer"}, }, }, }, "change": { "type": "object", "properties": { "users": { "type": "array", "items": {"type": "integer"}, }, "groups": { "type": "array", "items": {"type": "integer"}, }, }, }, }, }, ) class SetPermissionsSerializer(serializers.DictField): pass class OwnedObjectSerializer( SerializerWithPerms, serializers.ModelSerializer, SetPermissionsMixin, ): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if not self.all_fields: try: if self.full_perms: self.fields.pop("user_can_change") self.fields.pop("is_shared_by_requester") else: self.fields.pop("permissions") except KeyError: pass @extend_schema_field( field={ "type": "object", "properties": { "view": { "type": "object", "properties": { "users": { "type": "array", "items": {"type": "integer"}, }, "groups": { "type": "array", "items": {"type": "integer"}, }, }, }, "change": { "type": "object", "properties": { "users": { "type": "array", "items": {"type": "integer"}, }, "groups": { "type": "array", "items": {"type": "integer"}, }, }, }, }, }, ) def get_permissions(self, obj) -> dict: view_codename = f"view_{obj.__class__.__name__.lower()}" change_codename = f"change_{obj.__class__.__name__.lower()}" return { "view": { "users": get_users_with_perms( obj, only_with_perms_in=[view_codename], with_group_users=False, ).values_list("id", flat=True), "groups": get_groups_with_only_permission( obj, codename=view_codename, ).values_list("id", flat=True), }, "change": { "users": get_users_with_perms( obj, only_with_perms_in=[change_codename], with_group_users=False, ).values_list("id", flat=True), "groups": get_groups_with_only_permission( obj, codename=change_codename, ).values_list("id", flat=True), }, } def get_user_can_change(self, obj) -> bool: checker = ObjectPermissionChecker(self.user) if self.user is not None else None return ( obj.owner is None or obj.owner == self.user or ( self.user is not None and checker.has_perm(f"change_{obj.__class__.__name__.lower()}", obj) ) ) @staticmethod def get_shared_object_pks(objects: Iterable): """ Return the primary keys of the subset of objects that are shared. """ try: first_obj = next(iter(objects)) except StopIteration: return set() ctype = ContentType.objects.get_for_model(first_obj) object_pks = list(obj.pk for obj in objects) pk_type = type(first_obj.pk) def get_pks_for_permission_type(model): return map( pk_type, # coerce the pk to be the same type of the provided objects model.objects.filter( content_type=ctype, object_pk__in=object_pks, ) .values_list("object_pk", flat=True) .distinct(), ) UserObjectPermission = get_user_obj_perms_model() GroupObjectPermission = get_group_obj_perms_model() user_permission_pks = get_pks_for_permission_type(UserObjectPermission) group_permission_pks = get_pks_for_permission_type(GroupObjectPermission) return set(user_permission_pks) | set(group_permission_pks) def get_is_shared_by_requester(self, obj: Document) -> bool: # First check the context to see if `shared_object_pks` is set by the parent. shared_object_pks = self.context.get("shared_object_pks") # If not just check if the current object is shared. if shared_object_pks is None: shared_object_pks = self.get_shared_object_pks([obj]) return obj.owner == self.user and obj.id in shared_object_pks permissions = SerializerMethodField(read_only=True) user_can_change = SerializerMethodField(read_only=True) is_shared_by_requester = SerializerMethodField(read_only=True) set_permissions = SetPermissionsSerializer( label="Set permissions", allow_empty=True, required=False, write_only=True, ) # other methods in mixin def validate_unique_together(self, validated_data, instance=None): # workaround for https://github.com/encode/django-rest-framework/issues/9358 if "owner" in validated_data and "name" in self.Meta.fields: name = validated_data.get("name", instance.name if instance else None) objects = ( self.Meta.model.objects.exclude(pk=instance.pk) if instance else self.Meta.model.objects.all() ) not_unique = objects.filter( owner=validated_data["owner"], name=name, ).exists() if not_unique: raise serializers.ValidationError( {"error": "Object violates owner / name unique constraint"}, ) def create(self, validated_data): # default to current user if not set request = self.context.get("request") if ( "owner" not in validated_data or (request is not None and "owner" not in request.data) ) and self.user: validated_data["owner"] = self.user permissions = None if "set_permissions" in validated_data: permissions = validated_data.pop("set_permissions") self.validate_unique_together(validated_data) instance = super().create(validated_data) if permissions is not None: self._set_permissions(permissions, instance) return instance def update(self, instance, validated_data): if "set_permissions" in validated_data: self._set_permissions(validated_data["set_permissions"], instance) self.validate_unique_together(validated_data, instance) return super().update(instance, validated_data) class OwnedObjectListSerializer(serializers.ListSerializer): def to_representation(self, documents): self.child.context["shared_object_pks"] = self.child.get_shared_object_pks( documents, ) return super().to_representation(documents) class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer): last_correspondence = serializers.DateTimeField(read_only=True, required=False) class Meta: model = Correspondent fields = ( "id", "slug", "name", "match", "matching_algorithm", "is_insensitive", "document_count", "last_correspondence", "owner", "permissions", "user_can_change", "set_permissions", ) class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer): class Meta: model = DocumentType fields = ( "id", "slug", "name", "match", "matching_algorithm", "is_insensitive", "document_count", "owner", "permissions", "user_can_change", "set_permissions", ) class DeprecatedColors: COLOURS = ( (1, "#a6cee3"), (2, "#1f78b4"), (3, "#b2df8a"), (4, "#33a02c"), (5, "#fb9a99"), (6, "#e31a1c"), (7, "#fdbf6f"), (8, "#ff7f00"), (9, "#cab2d6"), (10, "#6a3d9a"), (11, "#b15928"), (12, "#000000"), (13, "#cccccc"), ) @extend_schema_field( serializers.ChoiceField( choices=DeprecatedColors.COLOURS, ), ) class ColorField(serializers.Field): def to_internal_value(self, data): for id, color in DeprecatedColors.COLOURS: if id == data: return color raise serializers.ValidationError def to_representation(self, value): for id, color in DeprecatedColors.COLOURS: if color == value: return id return 1 class TagSerializerVersion1(MatchingModelSerializer, OwnedObjectSerializer): colour = ColorField(source="color", default="#a6cee3") class Meta: model = Tag fields = ( "id", "slug", "name", "colour", "match", "matching_algorithm", "is_insensitive", "is_inbox_tag", "document_count", "owner", "permissions", "user_can_change", "set_permissions", ) class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer): def get_text_color(self, obj) -> str: try: h = obj.color.lstrip("#") rgb = tuple(int(h[i : i + 2], 16) / 256 for i in (0, 2, 4)) luminance = math.sqrt( 0.299 * math.pow(rgb[0], 2) + 0.587 * math.pow(rgb[1], 2) + 0.114 * math.pow(rgb[2], 2), ) return "#ffffff" if luminance < 0.53 else "#000000" except ValueError: return "#000000" text_color = serializers.SerializerMethodField() class Meta: model = Tag fields = ( "id", "slug", "name", "color", "text_color", "match", "matching_algorithm", "is_insensitive", "is_inbox_tag", "document_count", "owner", "permissions", "user_can_change", "set_permissions", ) def validate_color(self, color): regex = r"#[0-9a-fA-F]{6}" if not re.match(regex, color): raise serializers.ValidationError(_("Invalid color.")) return color class CorrespondentField(serializers.PrimaryKeyRelatedField): def get_queryset(self): return Correspondent.objects.all() class TagsField(serializers.PrimaryKeyRelatedField): def get_queryset(self): return Tag.objects.all() class DocumentTypeField(serializers.PrimaryKeyRelatedField): def get_queryset(self): return DocumentType.objects.all() class StoragePathField(serializers.PrimaryKeyRelatedField): def get_queryset(self): return StoragePath.objects.all() class CustomFieldSerializer(serializers.ModelSerializer): def __init__(self, *args, **kwargs): context = kwargs.get("context") self.api_version = int( context.get("request").version if context and context.get("request") else settings.REST_FRAMEWORK["DEFAULT_VERSION"], ) super().__init__(*args, **kwargs) data_type = serializers.ChoiceField( choices=CustomField.FieldDataType, read_only=False, ) document_count = serializers.IntegerField(read_only=True) class Meta: model = CustomField fields = [ "id", "name", "data_type", "extra_data", "document_count", ] def validate(self, attrs): # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173 name = attrs.get( "name", self.instance.name if hasattr(self.instance, "name") else None, ) objects = ( self.Meta.model.objects.exclude( pk=self.instance.pk, ) if self.instance is not None else self.Meta.model.objects.all() ) if ("name" in attrs) and objects.filter( name=name, ).exists(): raise serializers.ValidationError( {"error": "Object violates name unique constraint"}, ) if ( "data_type" in attrs and attrs["data_type"] == CustomField.FieldDataType.SELECT ) or ( self.instance and self.instance.data_type == CustomField.FieldDataType.SELECT ): if ( "extra_data" not in attrs or "select_options" not in attrs["extra_data"] or not isinstance(attrs["extra_data"]["select_options"], list) or len(attrs["extra_data"]["select_options"]) == 0 or not all( len(option.get("label", "")) > 0 for option in attrs["extra_data"]["select_options"] ) ): raise serializers.ValidationError( {"error": "extra_data.select_options must be a valid list"}, ) # labels are valid, generate ids if not present for option in attrs["extra_data"]["select_options"]: if option.get("id") is None: option["id"] = get_random_string(length=16) elif ( "data_type" in attrs and attrs["data_type"] == CustomField.FieldDataType.MONETARY and "extra_data" in attrs and "default_currency" in attrs["extra_data"] and attrs["extra_data"]["default_currency"] is not None and ( not isinstance(attrs["extra_data"]["default_currency"], str) or ( len(attrs["extra_data"]["default_currency"]) > 0 and len(attrs["extra_data"]["default_currency"]) != 3 ) ) ): raise serializers.ValidationError( {"error": "extra_data.default_currency must be a 3-character string"}, ) return super().validate(attrs) def to_internal_value(self, data): ret = super().to_internal_value(data) if ( self.api_version < 7 and ret.get("data_type", "") == CustomField.FieldDataType.SELECT and isinstance(ret.get("extra_data", {}).get("select_options"), list) ): ret["extra_data"]["select_options"] = [ { "label": option, "id": get_random_string(length=16), } for option in ret["extra_data"]["select_options"] ] return ret def to_representation(self, instance): ret = super().to_representation(instance) if ( self.api_version < 7 and instance.data_type == CustomField.FieldDataType.SELECT ): # Convert the select options with ids to a list of strings ret["extra_data"]["select_options"] = [ option["label"] for option in ret["extra_data"]["select_options"] ] return ret class ReadWriteSerializerMethodField(serializers.SerializerMethodField): """ Based on https://stackoverflow.com/a/62579804 """ def __init__(self, method_name=None, *args, **kwargs): self.method_name = method_name kwargs["source"] = "*" super(serializers.SerializerMethodField, self).__init__(*args, **kwargs) def to_internal_value(self, data): return {self.field_name: data} class CustomFieldInstanceSerializer(serializers.ModelSerializer): field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all()) value = ReadWriteSerializerMethodField(allow_null=True) def create(self, validated_data): # An instance is attached to a document document: Document = validated_data["document"] # And to a CustomField custom_field: CustomField = validated_data["field"] # This key must exist, as it is validated data_store_name = CustomFieldInstance.get_value_field_name( custom_field.data_type, ) if custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK: # prior to update so we can look for any docs that are going to be removed bulk_edit.reflect_doclinks(document, custom_field, validated_data["value"]) # Actually update or create the instance, providing the value # to fill in the correct attribute based on the type instance, _ = CustomFieldInstance.objects.update_or_create( document=document, field=custom_field, defaults={data_store_name: validated_data["value"]}, ) return instance def get_value(self, obj: CustomFieldInstance) -> str | int | float | dict | None: return obj.value def validate(self, data): """ Probably because we're kind of doing it odd, validation from the model doesn't run against the field "value", so we have to re-create it here. Don't like it, but it is better than returning an HTTP 500 when the database hates the value """ data = super().validate(data) field: CustomField = data["field"] if "value" in data and data["value"] is not None: if ( field.data_type == CustomField.FieldDataType.URL and len(data["value"]) > 0 ): uri_validator(data["value"]) elif field.data_type == CustomField.FieldDataType.INT: integer_validator(data["value"]) elif ( field.data_type == CustomField.FieldDataType.MONETARY and data["value"] != "" ): try: # First try to validate as a number from legacy format DecimalValidator(max_digits=12, decimal_places=2)( Decimal(str(data["value"])), ) except Exception: # If that fails, try to validate as a monetary string RegexValidator( regex=r"^[A-Z]{3}-?\d+(\.\d{1,2})$", message="Must be a two-decimal number with optional currency code e.g. GBP123.45", )(data["value"]) elif field.data_type == CustomField.FieldDataType.STRING: MaxLengthValidator(limit_value=128)(data["value"]) elif field.data_type == CustomField.FieldDataType.SELECT: select_options = field.extra_data["select_options"] try: next( option for option in select_options if option["id"] == data["value"] ) except Exception: raise serializers.ValidationError( f"Value must be an id of an element in {select_options}", ) elif field.data_type == CustomField.FieldDataType.DOCUMENTLINK: if not (isinstance(data["value"], list) or data["value"] is None): raise serializers.ValidationError( "Value must be a list", ) doc_ids = data["value"] if Document.objects.filter(id__in=doc_ids).count() != len( data["value"], ): raise serializers.ValidationError( "Some documents in value don't exist or were specified twice.", ) return data def get_api_version(self): return int( self.context.get("request").version if self.context.get("request") else settings.REST_FRAMEWORK["DEFAULT_VERSION"], ) def to_internal_value(self, data): ret = super().to_internal_value(data) if ( self.get_api_version() < 7 and ret.get("field").data_type == CustomField.FieldDataType.SELECT and ret.get("value") is not None ): # Convert the index of the option in the field.extra_data["select_options"] # list to the options unique id ret["value"] = ret.get("field").extra_data["select_options"][ret["value"]][ "id" ] return ret def to_representation(self, instance): ret = super().to_representation(instance) if ( self.get_api_version() < 7 and instance.field.data_type == CustomField.FieldDataType.SELECT ): # return the index of the option in the field.extra_data["select_options"] list ret["value"] = next( ( idx for idx, option in enumerate( instance.field.extra_data["select_options"], ) if option["id"] == instance.value ), None, ) return ret class Meta: model = CustomFieldInstance fields = [ "value", "field", ] class DocumentSerializer( OwnedObjectSerializer, NestedUpdateMixin, DynamicFieldsModelSerializer, ): correspondent = CorrespondentField(allow_null=True) tags = TagsField(many=True) document_type = DocumentTypeField(allow_null=True) storage_path = StoragePathField(allow_null=True) original_file_name = SerializerMethodField() archived_file_name = SerializerMethodField() created_date = serializers.DateField(required=False) page_count = SerializerMethodField() custom_fields = CustomFieldInstanceSerializer( many=True, allow_null=False, required=False, ) owner = serializers.PrimaryKeyRelatedField( queryset=User.objects.all(), required=False, allow_null=True, ) remove_inbox_tags = serializers.BooleanField( default=False, write_only=True, allow_null=True, required=False, ) def get_page_count(self, obj) -> int | None: return obj.page_count def get_original_file_name(self, obj) -> str | None: return obj.original_filename def get_archived_file_name(self, obj) -> str | None: if obj.has_archive_version: return obj.get_public_filename(archive=True) else: return None def to_representation(self, instance): doc = super().to_representation(instance) if self.truncate_content and "content" in self.fields: doc["content"] = doc.get("content")[0:550] return doc def validate(self, attrs): if ( "archive_serial_number" in attrs and attrs["archive_serial_number"] is not None and len(str(attrs["archive_serial_number"])) > 0 and Document.deleted_objects.filter( archive_serial_number=attrs["archive_serial_number"], ).exists() ): raise serializers.ValidationError( { "archive_serial_number": [ "Document with this Archive Serial Number already exists in the trash.", ], }, ) return super().validate(attrs) def update(self, instance: Document, validated_data): if "created_date" in validated_data and "created" not in validated_data: new_datetime = datetime.datetime.combine( validated_data.get("created_date"), datetime.time(0, 0, 0, 0, zoneinfo.ZoneInfo(settings.TIME_ZONE)), ) instance.created = new_datetime instance.save() if "created_date" in validated_data: validated_data.pop("created_date") if instance.custom_fields.count() > 0 and "custom_fields" in validated_data: incoming_custom_fields = [ field["field"] for field in validated_data["custom_fields"] ] for custom_field_instance in instance.custom_fields.filter( field__data_type=CustomField.FieldDataType.DOCUMENTLINK, ): if ( custom_field_instance.field not in incoming_custom_fields and custom_field_instance.value is not None ): # Doc link field is being removed entirely for doc_id in custom_field_instance.value: bulk_edit.remove_doclink( instance, custom_field_instance.field, doc_id, ) if validated_data.get("remove_inbox_tags"): tag_ids_being_added = ( [ tag.id for tag in validated_data["tags"] if tag not in instance.tags.all() ] if "tags" in validated_data else [] ) inbox_tags_not_being_added = Tag.objects.filter(is_inbox_tag=True).exclude( id__in=tag_ids_being_added, ) if "tags" in validated_data: validated_data["tags"] = [ tag for tag in validated_data["tags"] if tag not in inbox_tags_not_being_added ] else: validated_data["tags"] = [ tag for tag in instance.tags.all() if tag not in inbox_tags_not_being_added ] if settings.AUDIT_LOG_ENABLED: with set_actor(self.user): super().update(instance, validated_data) else: super().update(instance, validated_data) # hard delete custom field instances that were soft deleted CustomFieldInstance.deleted_objects.filter(document=instance).delete() return instance def __init__(self, *args, **kwargs): self.truncate_content = kwargs.pop("truncate_content", False) # return full permissions if we're doing a PATCH or PUT context = kwargs.get("context") if context is not None and ( context.get("request").method == "PATCH" or context.get("request").method == "PUT" ): kwargs["full_perms"] = True super().__init__(*args, **kwargs) class Meta: model = Document fields = ( "id", "correspondent", "document_type", "storage_path", "title", "content", "tags", "created", "created_date", "modified", "added", "deleted_at", "archive_serial_number", "original_file_name", "archived_file_name", "owner", "permissions", "user_can_change", "is_shared_by_requester", "set_permissions", "notes", "custom_fields", "remove_inbox_tags", "page_count", "mime_type", ) list_serializer_class = OwnedObjectListSerializer class SearchResultListSerializer(serializers.ListSerializer): def to_representation(self, hits): document_ids = [hit["id"] for hit in hits] # Fetch all Document objects in the list in one SQL query. documents = self.child.fetch_documents(document_ids) self.child.context["documents"] = documents # Also check if they are shared with other users / groups. self.child.context["shared_object_pks"] = self.child.get_shared_object_pks( documents.values(), ) return super().to_representation(hits) class SearchResultSerializer(DocumentSerializer): @staticmethod def fetch_documents(ids): """ Return a dict that maps given document IDs to Document objects. """ return { document.id: document for document in Document.objects.select_related( "correspondent", "storage_path", "document_type", "owner", ) .prefetch_related("tags", "custom_fields", "notes") .filter(id__in=ids) } def to_representation(self, hit): # Again we first check if the parent has already fetched the documents. documents = self.context.get("documents") # Otherwise we fetch this document. if documents is None: # pragma: no cover # In practice we only serialize **lists** of whoosh.searching.Hit. # I'm keeping this check for completeness but marking it no cover for now. documents = self.fetch_documents([hit["id"]]) document = documents[hit["id"]] notes = ",".join( [str(c.note) for c in document.notes.all()], ) r = super().to_representation(document) r["__search_hit__"] = { "score": hit.score, "highlights": hit.highlights("content", text=document.content), "note_highlights": ( hit.highlights("notes", text=notes) if document else None ), "rank": hit.rank, } return r class Meta(DocumentSerializer.Meta): list_serializer_class = SearchResultListSerializer class SavedViewFilterRuleSerializer(serializers.ModelSerializer): class Meta: model = SavedViewFilterRule fields = ["rule_type", "value"] class SavedViewSerializer(OwnedObjectSerializer): filter_rules = SavedViewFilterRuleSerializer(many=True) class Meta: model = SavedView fields = [ "id", "name", "show_on_dashboard", "show_in_sidebar", "sort_field", "sort_reverse", "filter_rules", "page_size", "display_mode", "display_fields", "owner", "permissions", "user_can_change", "set_permissions", ] def validate(self, attrs): attrs = super().validate(attrs) if "display_fields" in attrs and attrs["display_fields"] is not None: for field in attrs["display_fields"]: if ( SavedView.DisplayFields.CUSTOM_FIELD[:-2] in field ): # i.e. check for 'custom_field_' prefix field_id = int(re.search(r"\d+", field)[0]) if not CustomField.objects.filter(id=field_id).exists(): # In case the field was deleted, just remove from the list attrs["display_fields"].remove(field) elif field not in SavedView.DisplayFields.values: raise serializers.ValidationError( f"Invalid field: {field}", ) return attrs def update(self, instance, validated_data): if "filter_rules" in validated_data: rules_data = validated_data.pop("filter_rules") else: rules_data = None if "user" in validated_data: # backwards compatibility validated_data["owner"] = validated_data.pop("user") if ( "display_fields" in validated_data and isinstance( validated_data["display_fields"], list, ) and len(validated_data["display_fields"]) == 0 ): validated_data["display_fields"] = None super().update(instance, validated_data) if rules_data is not None: SavedViewFilterRule.objects.filter(saved_view=instance).delete() for rule_data in rules_data: SavedViewFilterRule.objects.create(saved_view=instance, **rule_data) return instance def create(self, validated_data): rules_data = validated_data.pop("filter_rules") if "user" in validated_data: # backwards compatibility validated_data["owner"] = validated_data.pop("user") saved_view = SavedView.objects.create(**validated_data) for rule_data in rules_data: SavedViewFilterRule.objects.create(saved_view=saved_view, **rule_data) return saved_view class DocumentListSerializer(serializers.Serializer): documents = serializers.ListField( required=True, label="Documents", write_only=True, child=serializers.IntegerField(), ) def _validate_document_id_list(self, documents, name="documents"): if not isinstance(documents, list): raise serializers.ValidationError(f"{name} must be a list") if not all(isinstance(i, int) for i in documents): raise serializers.ValidationError(f"{name} must be a list of integers") count = Document.objects.filter(id__in=documents).count() if not count == len(documents): raise serializers.ValidationError( f"Some documents in {name} don't exist or were specified twice.", ) def validate_documents(self, documents): self._validate_document_id_list(documents) return documents class BulkEditSerializer( SerializerWithPerms, DocumentListSerializer, SetPermissionsMixin, ): method = serializers.ChoiceField( choices=[ "set_correspondent", "set_document_type", "set_storage_path", "add_tag", "remove_tag", "modify_tags", "modify_custom_fields", "delete", "reprocess", "set_permissions", "rotate", "merge", "split", "delete_pages", ], label="Method", write_only=True, ) parameters = serializers.DictField(allow_empty=True, default={}, write_only=True) def _validate_tag_id_list(self, tags, name="tags"): if not isinstance(tags, list): raise serializers.ValidationError(f"{name} must be a list") if not all(isinstance(i, int) for i in tags): raise serializers.ValidationError(f"{name} must be a list of integers") count = Tag.objects.filter(id__in=tags).count() if not count == len(tags): raise serializers.ValidationError( f"Some tags in {name} don't exist or were specified twice.", ) def _validate_custom_field_id_list_or_dict( self, custom_fields, name="custom_fields", ): ids = custom_fields if isinstance(custom_fields, dict): try: ids = [int(i[0]) for i in custom_fields.items()] except Exception as e: logger.exception(f"Error validating custom fields: {e}") raise serializers.ValidationError( f"{name} must be a list of integers or a dict of id:value pairs, see the log for details", ) elif not isinstance(custom_fields, list) or not all( isinstance(i, int) for i in ids ): raise serializers.ValidationError( f"{name} must be a list of integers or a dict of id:value pairs", ) count = CustomField.objects.filter(id__in=ids).count() if not count == len(ids): raise serializers.ValidationError( f"Some custom fields in {name} don't exist or were specified twice.", ) def validate_method(self, method): if method == "set_correspondent": return bulk_edit.set_correspondent elif method == "set_document_type": return bulk_edit.set_document_type elif method == "set_storage_path": return bulk_edit.set_storage_path elif method == "add_tag": return bulk_edit.add_tag elif method == "remove_tag": return bulk_edit.remove_tag elif method == "modify_tags": return bulk_edit.modify_tags elif method == "modify_custom_fields": return bulk_edit.modify_custom_fields elif method == "delete": return bulk_edit.delete elif method == "redo_ocr" or method == "reprocess": return bulk_edit.reprocess elif method == "set_permissions": return bulk_edit.set_permissions elif method == "rotate": return bulk_edit.rotate elif method == "merge": return bulk_edit.merge elif method == "split": return bulk_edit.split elif method == "delete_pages": return bulk_edit.delete_pages else: raise serializers.ValidationError("Unsupported method.") def _validate_parameters_tags(self, parameters): if "tag" in parameters: tag_id = parameters["tag"] try: Tag.objects.get(id=tag_id) except Tag.DoesNotExist: raise serializers.ValidationError("Tag does not exist") else: raise serializers.ValidationError("tag not specified") def _validate_parameters_document_type(self, parameters): if "document_type" in parameters: document_type_id = parameters["document_type"] if document_type_id is None: # None is ok return try: DocumentType.objects.get(id=document_type_id) except DocumentType.DoesNotExist: raise serializers.ValidationError("Document type does not exist") else: raise serializers.ValidationError("document_type not specified") def _validate_parameters_correspondent(self, parameters): if "correspondent" in parameters: correspondent_id = parameters["correspondent"] if correspondent_id is None: return try: Correspondent.objects.get(id=correspondent_id) except Correspondent.DoesNotExist: raise serializers.ValidationError("Correspondent does not exist") else: raise serializers.ValidationError("correspondent not specified") def _validate_storage_path(self, parameters): if "storage_path" in parameters: storage_path_id = parameters["storage_path"] if storage_path_id is None: return try: StoragePath.objects.get(id=storage_path_id) except StoragePath.DoesNotExist: raise serializers.ValidationError( "Storage path does not exist", ) else: raise serializers.ValidationError("storage path not specified") def _validate_parameters_modify_tags(self, parameters): if "add_tags" in parameters: self._validate_tag_id_list(parameters["add_tags"], "add_tags") else: raise serializers.ValidationError("add_tags not specified") if "remove_tags" in parameters: self._validate_tag_id_list(parameters["remove_tags"], "remove_tags") else: raise serializers.ValidationError("remove_tags not specified") def _validate_parameters_modify_custom_fields(self, parameters): if "add_custom_fields" in parameters: self._validate_custom_field_id_list_or_dict( parameters["add_custom_fields"], "add_custom_fields", ) else: raise serializers.ValidationError("add_custom_fields not specified") if "remove_custom_fields" in parameters: self._validate_custom_field_id_list_or_dict( parameters["remove_custom_fields"], "remove_custom_fields", ) else: raise serializers.ValidationError("remove_custom_fields not specified") def _validate_owner(self, owner): ownerUser = User.objects.get(pk=owner) if ownerUser is None: raise serializers.ValidationError("Specified owner cannot be found") return ownerUser def _validate_parameters_set_permissions(self, parameters): parameters["set_permissions"] = self.validate_set_permissions( parameters["set_permissions"], ) if "owner" in parameters and parameters["owner"] is not None: self._validate_owner(parameters["owner"]) if "merge" not in parameters: parameters["merge"] = False def _validate_parameters_rotate(self, parameters): try: if ( "degrees" not in parameters or not float(parameters["degrees"]).is_integer() ): raise serializers.ValidationError("invalid rotation degrees") except ValueError: raise serializers.ValidationError("invalid rotation degrees") def _validate_parameters_split(self, parameters): if "pages" not in parameters: raise serializers.ValidationError("pages not specified") try: pages = [] docs = parameters["pages"].split(",") for doc in docs: if "-" in doc: pages.append( [ x for x in range( int(doc.split("-")[0]), int(doc.split("-")[1]) + 1, ) ], ) else: pages.append([int(doc)]) parameters["pages"] = pages except ValueError: raise serializers.ValidationError("invalid pages specified") if "delete_originals" in parameters: if not isinstance(parameters["delete_originals"], bool): raise serializers.ValidationError("delete_originals must be a boolean") else: parameters["delete_originals"] = False def _validate_parameters_delete_pages(self, parameters): if "pages" not in parameters: raise serializers.ValidationError("pages not specified") if not isinstance(parameters["pages"], list): raise serializers.ValidationError("pages must be a list") if not all(isinstance(i, int) for i in parameters["pages"]): raise serializers.ValidationError("pages must be a list of integers") def _validate_parameters_merge(self, parameters): if "delete_originals" in parameters: if not isinstance(parameters["delete_originals"], bool): raise serializers.ValidationError("delete_originals must be a boolean") else: parameters["delete_originals"] = False def validate(self, attrs): method = attrs["method"] parameters = attrs["parameters"] if method == bulk_edit.set_correspondent: self._validate_parameters_correspondent(parameters) elif method == bulk_edit.set_document_type: self._validate_parameters_document_type(parameters) elif method == bulk_edit.add_tag or method == bulk_edit.remove_tag: self._validate_parameters_tags(parameters) elif method == bulk_edit.modify_tags: self._validate_parameters_modify_tags(parameters) elif method == bulk_edit.set_storage_path: self._validate_storage_path(parameters) elif method == bulk_edit.modify_custom_fields: self._validate_parameters_modify_custom_fields(parameters) elif method == bulk_edit.set_permissions: self._validate_parameters_set_permissions(parameters) elif method == bulk_edit.rotate: self._validate_parameters_rotate(parameters) elif method == bulk_edit.split: if len(attrs["documents"]) > 1: raise serializers.ValidationError( "Split method only supports one document", ) self._validate_parameters_split(parameters) elif method == bulk_edit.delete_pages: if len(attrs["documents"]) > 1: raise serializers.ValidationError( "Delete pages method only supports one document", ) self._validate_parameters_delete_pages(parameters) elif method == bulk_edit.merge: self._validate_parameters_merge(parameters) return attrs class PostDocumentSerializer(serializers.Serializer): created = serializers.DateTimeField( label="Created", allow_null=True, write_only=True, required=False, ) document = serializers.FileField( label="Document", write_only=True, ) title = serializers.CharField( label="Title", write_only=True, required=False, ) correspondent = serializers.PrimaryKeyRelatedField( queryset=Correspondent.objects.all(), label="Correspondent", allow_null=True, write_only=True, required=False, ) document_type = serializers.PrimaryKeyRelatedField( queryset=DocumentType.objects.all(), label="Document type", allow_null=True, write_only=True, required=False, ) storage_path = serializers.PrimaryKeyRelatedField( queryset=StoragePath.objects.all(), label="Storage path", allow_null=True, write_only=True, required=False, ) tags = serializers.PrimaryKeyRelatedField( many=True, queryset=Tag.objects.all(), label="Tags", write_only=True, required=False, ) archive_serial_number = serializers.IntegerField( label="ASN", write_only=True, required=False, min_value=Document.ARCHIVE_SERIAL_NUMBER_MIN, max_value=Document.ARCHIVE_SERIAL_NUMBER_MAX, ) custom_fields = serializers.PrimaryKeyRelatedField( many=True, queryset=CustomField.objects.all(), label="Custom fields", write_only=True, required=False, ) from_webui = serializers.BooleanField( label="Documents are from Paperless-ngx WebUI", write_only=True, required=False, ) def validate_document(self, document): document_data = document.file.read() mime_type = magic.from_buffer(document_data, mime=True) if not is_mime_type_supported(mime_type): if ( mime_type in settings.CONSUMER_PDF_RECOVERABLE_MIME_TYPES and document.name.endswith( ".pdf", ) ): # If the file is an invalid PDF, we can try to recover it later in the consumer mime_type = "application/pdf" else: raise serializers.ValidationError( _("File type %(type)s not supported") % {"type": mime_type}, ) return document.name, document_data def validate_correspondent(self, correspondent): if correspondent: return correspondent.id else: return None def validate_document_type(self, document_type): if document_type: return document_type.id else: return None def validate_storage_path(self, storage_path): if storage_path: return storage_path.id else: return None def validate_tags(self, tags): if tags: return [tag.id for tag in tags] else: return None def validate_custom_fields(self, custom_fields): if custom_fields: return [custom_field.id for custom_field in custom_fields] else: return None class BulkDownloadSerializer(DocumentListSerializer): content = serializers.ChoiceField( choices=["archive", "originals", "both"], default="archive", ) compression = serializers.ChoiceField( choices=["none", "deflated", "bzip2", "lzma"], default="none", ) follow_formatting = serializers.BooleanField( default=False, ) def validate_compression(self, compression): import zipfile return { "none": zipfile.ZIP_STORED, "deflated": zipfile.ZIP_DEFLATED, "bzip2": zipfile.ZIP_BZIP2, "lzma": zipfile.ZIP_LZMA, }[compression] class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer): class Meta: model = StoragePath fields = ( "id", "slug", "name", "path", "match", "matching_algorithm", "is_insensitive", "document_count", "owner", "permissions", "user_can_change", "set_permissions", ) def validate_path(self, path: str): converted_path = convert_format_str_to_template_format(path) if converted_path != path: logger.warning( f"Storage path {path} is not using the new style format, consider updating", ) result = validate_filepath_template_and_render(converted_path) if result is None: raise serializers.ValidationError(_("Invalid variable detected.")) return converted_path def update(self, instance, validated_data): """ When a storage path is updated, see if documents using it require a rename/move """ doc_ids = [doc.id for doc in instance.documents.all()] if len(doc_ids): bulk_edit.bulk_update_documents.delay(doc_ids) return super().update(instance, validated_data) class UiSettingsViewSerializer(serializers.ModelSerializer): class Meta: model = UiSettings depth = 1 fields = [ "id", "settings", ] def validate_settings(self, settings): # we never save update checking backend setting if "update_checking" in settings: try: settings["update_checking"].pop("backend_setting") except KeyError: pass return settings def create(self, validated_data): ui_settings = UiSettings.objects.update_or_create( user=validated_data.get("user"), defaults={"settings": validated_data.get("settings", None)}, ) return ui_settings class TasksViewSerializer(OwnedObjectSerializer): class Meta: model = PaperlessTask fields = ( "id", "task_id", "task_file_name", "date_created", "date_done", "type", "status", "result", "acknowledged", "related_document", "owner", ) type = serializers.SerializerMethodField() def get_type(self, obj) -> str: # just file tasks, for now return "file" related_document = serializers.SerializerMethodField() created_doc_re = re.compile(r"New document id (\d+) created") duplicate_doc_re = re.compile(r"It is a duplicate of .* \(#(\d+)\)") def get_related_document(self, obj) -> str | None: result = None re = None match obj.status: case states.SUCCESS: re = self.created_doc_re case states.FAILURE: re = ( self.duplicate_doc_re if "existing document is in the trash" not in obj.result else None ) if re is not None: try: result = re.search(obj.result).group(1) except Exception: pass return result class AcknowledgeTasksViewSerializer(serializers.Serializer): tasks = serializers.ListField( required=True, label="Tasks", write_only=True, child=serializers.IntegerField(), ) def _validate_task_id_list(self, tasks, name="tasks"): if not isinstance(tasks, list): raise serializers.ValidationError(f"{name} must be a list") if not all(isinstance(i, int) for i in tasks): raise serializers.ValidationError(f"{name} must be a list of integers") count = PaperlessTask.objects.filter(id__in=tasks).count() if not count == len(tasks): raise serializers.ValidationError( f"Some tasks in {name} don't exist or were specified twice.", ) def validate_tasks(self, tasks): self._validate_task_id_list(tasks) return tasks class ShareLinkSerializer(OwnedObjectSerializer): class Meta: model = ShareLink fields = ( "id", "created", "expiration", "slug", "document", "file_version", ) def create(self, validated_data): validated_data["slug"] = get_random_string(50) return super().create(validated_data) class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin): objects = serializers.ListField( required=True, allow_empty=False, label="Objects", write_only=True, child=serializers.IntegerField(), ) object_type = serializers.ChoiceField( choices=[ "tags", "correspondents", "document_types", "storage_paths", ], label="Object Type", write_only=True, ) operation = serializers.ChoiceField( choices=[ "set_permissions", "delete", ], label="Operation", required=True, write_only=True, ) owner = serializers.PrimaryKeyRelatedField( queryset=User.objects.all(), required=False, allow_null=True, ) permissions = serializers.DictField( label="Set permissions", allow_empty=False, required=False, write_only=True, ) merge = serializers.BooleanField( default=False, write_only=True, required=False, ) def get_object_class(self, object_type): object_class = None if object_type == "tags": object_class = Tag elif object_type == "correspondents": object_class = Correspondent elif object_type == "document_types": object_class = DocumentType elif object_type == "storage_paths": object_class = StoragePath return object_class def _validate_objects(self, objects, object_type): if not isinstance(objects, list): raise serializers.ValidationError("objects must be a list") if not all(isinstance(i, int) for i in objects): raise serializers.ValidationError("objects must be a list of integers") object_class = self.get_object_class(object_type) count = object_class.objects.filter(id__in=objects).count() if not count == len(objects): raise serializers.ValidationError( "Some ids in objects don't exist or were specified twice.", ) return objects def _validate_permissions(self, permissions): self.validate_set_permissions( permissions, ) def validate(self, attrs): object_type = attrs["object_type"] objects = attrs["objects"] operation = attrs.get("operation") self._validate_objects(objects, object_type) if operation == "set_permissions": permissions = attrs.get("permissions") if permissions is not None: self._validate_permissions(permissions) return attrs class WorkflowTriggerSerializer(serializers.ModelSerializer): id = serializers.IntegerField(required=False, allow_null=True) sources = fields.MultipleChoiceField( choices=WorkflowTrigger.DocumentSourceChoices.choices, allow_empty=True, default={ DocumentSource.ConsumeFolder, DocumentSource.ApiUpload, DocumentSource.MailFetch, }, ) type = serializers.ChoiceField( choices=WorkflowTrigger.WorkflowTriggerType.choices, label="Trigger Type", ) class Meta: model = WorkflowTrigger fields = [ "id", "sources", "type", "filter_path", "filter_filename", "filter_mailrule", "matching_algorithm", "match", "is_insensitive", "filter_has_tags", "filter_has_correspondent", "filter_has_document_type", "schedule_offset_days", "schedule_is_recurring", "schedule_recurring_interval_days", "schedule_date_field", "schedule_date_custom_field", ] def validate(self, attrs): # Empty strings treated as None to avoid unexpected behavior if ( "filter_filename" in attrs and attrs["filter_filename"] is not None and len(attrs["filter_filename"]) == 0 ): attrs["filter_filename"] = None if ( "filter_path" in attrs and attrs["filter_path"] is not None and len(attrs["filter_path"]) == 0 ): attrs["filter_path"] = None if ( attrs["type"] == WorkflowTrigger.WorkflowTriggerType.CONSUMPTION and "filter_mailrule" not in attrs and ("filter_filename" not in attrs or attrs["filter_filename"] is None) and ("filter_path" not in attrs or attrs["filter_path"] is None) ): raise serializers.ValidationError( "File name, path or mail rule filter are required", ) return attrs class WorkflowActionEmailSerializer(serializers.ModelSerializer): id = serializers.IntegerField(allow_null=True, required=False) class Meta: model = WorkflowActionEmail fields = [ "id", "subject", "body", "to", "include_document", ] class WorkflowActionWebhookSerializer(serializers.ModelSerializer): id = serializers.IntegerField(allow_null=True, required=False) def validate_url(self, url): url_validator(url) return url class Meta: model = WorkflowActionWebhook fields = [ "id", "url", "use_params", "as_json", "params", "body", "headers", "include_document", ] class WorkflowActionSerializer(serializers.ModelSerializer): id = serializers.IntegerField(required=False, allow_null=True) assign_correspondent = CorrespondentField(allow_null=True, required=False) assign_tags = TagsField(many=True, allow_null=True, required=False) assign_document_type = DocumentTypeField(allow_null=True, required=False) assign_storage_path = StoragePathField(allow_null=True, required=False) email = WorkflowActionEmailSerializer(allow_null=True, required=False) webhook = WorkflowActionWebhookSerializer(allow_null=True, required=False) class Meta: model = WorkflowAction fields = [ "id", "type", "assign_title", "assign_tags", "assign_correspondent", "assign_document_type", "assign_storage_path", "assign_owner", "assign_view_users", "assign_view_groups", "assign_change_users", "assign_change_groups", "assign_custom_fields", "remove_all_tags", "remove_tags", "remove_all_correspondents", "remove_correspondents", "remove_all_document_types", "remove_document_types", "remove_all_storage_paths", "remove_storage_paths", "remove_custom_fields", "remove_all_custom_fields", "remove_all_owners", "remove_owners", "remove_all_permissions", "remove_view_users", "remove_view_groups", "remove_change_users", "remove_change_groups", "email", "webhook", ] def validate(self, attrs): if "assign_title" in attrs and attrs["assign_title"] is not None: if len(attrs["assign_title"]) == 0: # Empty strings treated as None to avoid unexpected behavior attrs["assign_title"] = None else: try: # test against all placeholders, see consumer.py `parse_doc_title_w_placeholders` attrs["assign_title"].format( correspondent="", document_type="", added="", added_year="", added_year_short="", added_month="", added_month_name="", added_month_name_short="", added_day="", added_time="", owner_username="", original_filename="", filename="", created="", created_year="", created_year_short="", created_month="", created_month_name="", created_month_name_short="", created_day="", created_time="", ) except (ValueError, KeyError) as e: raise serializers.ValidationError( {"assign_title": f'Invalid f-string detected: "{e.args[0]}"'}, ) if ( "type" in attrs and attrs["type"] == WorkflowAction.WorkflowActionType.EMAIL and "email" not in attrs ): raise serializers.ValidationError( "Email data is required for email actions", ) if ( "type" in attrs and attrs["type"] == WorkflowAction.WorkflowActionType.WEBHOOK and "webhook" not in attrs ): raise serializers.ValidationError( "Webhook data is required for webhook actions", ) return attrs class WorkflowSerializer(serializers.ModelSerializer): order = serializers.IntegerField(required=False) triggers = WorkflowTriggerSerializer(many=True) actions = WorkflowActionSerializer(many=True) class Meta: model = Workflow fields = [ "id", "name", "order", "enabled", "triggers", "actions", ] def update_triggers_and_actions(self, instance: Workflow, triggers, actions): set_triggers = [] set_actions = [] if triggers is not None: for trigger in triggers: filter_has_tags = trigger.pop("filter_has_tags", None) trigger_instance, _ = WorkflowTrigger.objects.update_or_create( id=trigger.get("id"), defaults=trigger, ) if filter_has_tags is not None: trigger_instance.filter_has_tags.set(filter_has_tags) set_triggers.append(trigger_instance) if actions is not None: for action in actions: assign_tags = action.pop("assign_tags", None) assign_view_users = action.pop("assign_view_users", None) assign_view_groups = action.pop("assign_view_groups", None) assign_change_users = action.pop("assign_change_users", None) assign_change_groups = action.pop("assign_change_groups", None) assign_custom_fields = action.pop("assign_custom_fields", None) remove_tags = action.pop("remove_tags", None) remove_correspondents = action.pop("remove_correspondents", None) remove_document_types = action.pop("remove_document_types", None) remove_storage_paths = action.pop("remove_storage_paths", None) remove_custom_fields = action.pop("remove_custom_fields", None) remove_owners = action.pop("remove_owners", None) remove_view_users = action.pop("remove_view_users", None) remove_view_groups = action.pop("remove_view_groups", None) remove_change_users = action.pop("remove_change_users", None) remove_change_groups = action.pop("remove_change_groups", None) email_data = action.pop("email", None) webhook_data = action.pop("webhook", None) action_instance, _ = WorkflowAction.objects.update_or_create( id=action.get("id"), defaults=action, ) if email_data is not None: serializer = WorkflowActionEmailSerializer(data=email_data) serializer.is_valid(raise_exception=True) email, _ = WorkflowActionEmail.objects.update_or_create( id=email_data.get("id"), defaults=serializer.validated_data, ) action_instance.email = email action_instance.save() if webhook_data is not None: serializer = WorkflowActionWebhookSerializer(data=webhook_data) serializer.is_valid(raise_exception=True) webhook, _ = WorkflowActionWebhook.objects.update_or_create( id=webhook_data.get("id"), defaults=serializer.validated_data, ) action_instance.webhook = webhook action_instance.save() if assign_tags is not None: action_instance.assign_tags.set(assign_tags) if assign_view_users is not None: action_instance.assign_view_users.set(assign_view_users) if assign_view_groups is not None: action_instance.assign_view_groups.set(assign_view_groups) if assign_change_users is not None: action_instance.assign_change_users.set(assign_change_users) if assign_change_groups is not None: action_instance.assign_change_groups.set(assign_change_groups) if assign_custom_fields is not None: action_instance.assign_custom_fields.set(assign_custom_fields) if remove_tags is not None: action_instance.remove_tags.set(remove_tags) if remove_correspondents is not None: action_instance.remove_correspondents.set(remove_correspondents) if remove_document_types is not None: action_instance.remove_document_types.set(remove_document_types) if remove_storage_paths is not None: action_instance.remove_storage_paths.set(remove_storage_paths) if remove_custom_fields is not None: action_instance.remove_custom_fields.set(remove_custom_fields) if remove_owners is not None: action_instance.remove_owners.set(remove_owners) if remove_view_users is not None: action_instance.remove_view_users.set(remove_view_users) if remove_view_groups is not None: action_instance.remove_view_groups.set(remove_view_groups) if remove_change_users is not None: action_instance.remove_change_users.set(remove_change_users) if remove_change_groups is not None: action_instance.remove_change_groups.set(remove_change_groups) set_actions.append(action_instance) instance.triggers.set(set_triggers) instance.actions.set(set_actions) instance.save() def prune_triggers_and_actions(self): """ ManyToMany fields dont support e.g. on_delete so we need to discard unattached triggers and actionas manually """ for trigger in WorkflowTrigger.objects.all(): if trigger.workflows.all().count() == 0: trigger.delete() for action in WorkflowAction.objects.all(): if action.workflows.all().count() == 0: action.delete() WorkflowActionEmail.objects.filter(action=None).delete() WorkflowActionWebhook.objects.filter(action=None).delete() def create(self, validated_data) -> Workflow: if "triggers" in validated_data: triggers = validated_data.pop("triggers") if "actions" in validated_data: actions = validated_data.pop("actions") instance = super().create(validated_data) self.update_triggers_and_actions(instance, triggers, actions) return instance def update(self, instance: Workflow, validated_data) -> Workflow: if "triggers" in validated_data: triggers = validated_data.pop("triggers") if "actions" in validated_data: actions = validated_data.pop("actions") instance = super().update(instance, validated_data) self.update_triggers_and_actions(instance, triggers, actions) self.prune_triggers_and_actions() return instance class TrashSerializer(SerializerWithPerms): documents = serializers.ListField( required=False, label="Documents", write_only=True, child=serializers.IntegerField(), ) action = serializers.ChoiceField( choices=["restore", "empty"], label="Action", write_only=True, ) def validate_documents(self, documents): count = Document.deleted_objects.filter(id__in=documents).count() if not count == len(documents): raise serializers.ValidationError( "Some documents in the list have not yet been deleted.", ) return documents class StoragePathTestSerializer(SerializerWithPerms): path = serializers.CharField( required=True, label="Path", write_only=True, ) document = serializers.PrimaryKeyRelatedField( queryset=Document.objects.all(), required=True, label="Document", write_only=True, )