from __future__ import annotations

import datetime
import logging
import math
import re
import zoneinfo
from decimal import Decimal
from typing import TYPE_CHECKING

import magic
from celery import states
from django.conf import settings
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.core.validators import DecimalValidator
from django.core.validators import MaxLengthValidator
from django.core.validators import RegexValidator
from django.core.validators import integer_validator
from django.utils.crypto import get_random_string
from django.utils.text import slugify
from django.utils.translation import gettext as _
from drf_spectacular.utils import extend_schema_field
from drf_writable_nested.serializers import NestedUpdateMixin
from guardian.core import ObjectPermissionChecker
from guardian.shortcuts import get_users_with_perms
from guardian.utils import get_group_obj_perms_model
from guardian.utils import get_user_obj_perms_model
from rest_framework import fields
from rest_framework import serializers
from rest_framework.fields import SerializerMethodField

if settings.AUDIT_LOG_ENABLED:
    from auditlog.context import set_actor


from documents import bulk_edit
from documents.data_models import DocumentSource
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import Note
from documents.models import PaperlessTask
from documents.models import SavedView
from documents.models import SavedViewFilterRule
from documents.models import ShareLink
from documents.models import StoragePath
from documents.models import Tag
from documents.models import UiSettings
from documents.models import Workflow
from documents.models import WorkflowAction
from documents.models import WorkflowActionEmail
from documents.models import WorkflowActionWebhook
from documents.models import WorkflowTrigger
from documents.parsers import is_mime_type_supported
from documents.permissions import get_groups_with_only_permission
from documents.permissions import set_permissions_for_object
from documents.templating.filepath import validate_filepath_template_and_render
from documents.templating.utils import convert_format_str_to_template_format
from documents.validators import uri_validator
from documents.validators import url_validator

if TYPE_CHECKING:
    from collections.abc import Iterable

logger = logging.getLogger("paperless.serializers")


# https://www.django-rest-framework.org/api-guide/serializers/#example
class DynamicFieldsModelSerializer(serializers.ModelSerializer):
    """
    A ModelSerializer that takes an additional `fields` argument that
    controls which fields should be displayed.
    """

    def __init__(self, *args, **kwargs):
        # Don't pass the 'fields' arg up to the superclass
        fields = kwargs.pop("fields", None)

        # Instantiate the superclass normally
        super().__init__(*args, **kwargs)

        if fields is not None:
            # Drop any fields that are not specified in the `fields` argument.
            allowed = set(fields)
            existing = set(self.fields)
            for field_name in existing - allowed:
                self.fields.pop(field_name)


class MatchingModelSerializer(serializers.ModelSerializer):
    document_count = serializers.IntegerField(read_only=True)

    def get_slug(self, obj) -> str:
        return slugify(obj.name)

    slug = SerializerMethodField()

    def validate(self, data):
        # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
        name = data.get(
            "name",
            self.instance.name if hasattr(self.instance, "name") else None,
        )
        owner = (
            data["owner"]
            if "owner" in data
            else self.user
            if hasattr(self, "user")
            else None
        )
        pk = self.instance.pk if hasattr(self.instance, "pk") else None
        if ("name" in data or "owner" in data) and self.Meta.model.objects.filter(
            name=name,
            owner=owner,
        ).exclude(pk=pk).exists():
            raise serializers.ValidationError(
                {"error": "Object violates owner / name unique constraint"},
            )
        return data

    def validate_match(self, match):
        if (
            "matching_algorithm" in self.initial_data
            and self.initial_data["matching_algorithm"] == MatchingModel.MATCH_REGEX
        ):
            try:
                re.compile(match)
            except re.error as e:
                raise serializers.ValidationError(
                    _("Invalid regular expression: %(error)s") % {"error": str(e.msg)},
                )
        return match


class SetPermissionsMixin:
    def _validate_user_ids(self, user_ids):
        users = User.objects.none()
        if user_ids is not None:
            users = User.objects.filter(id__in=user_ids)
            if not users.count() == len(user_ids):
                raise serializers.ValidationError(
                    "Some users in don't exist or were specified twice.",
                )
        return users

    def _validate_group_ids(self, group_ids):
        groups = Group.objects.none()
        if group_ids is not None:
            groups = Group.objects.filter(id__in=group_ids)
            if not groups.count() == len(group_ids):
                raise serializers.ValidationError(
                    "Some groups in don't exist or were specified twice.",
                )
        return groups

    def validate_set_permissions(self, set_permissions=None):
        permissions_dict = {
            "view": {},
            "change": {},
        }
        if set_permissions is not None:
            for action in ["view", "change"]:
                if action in set_permissions:
                    if "users" in set_permissions[action]:
                        users = set_permissions[action]["users"]
                        permissions_dict[action]["users"] = self._validate_user_ids(
                            users,
                        )
                    if "groups" in set_permissions[action]:
                        groups = set_permissions[action]["groups"]
                        permissions_dict[action]["groups"] = self._validate_group_ids(
                            groups,
                        )
                else:
                    del permissions_dict[action]
        return permissions_dict

    def _set_permissions(self, permissions, object):
        set_permissions_for_object(permissions, object)


class SerializerWithPerms(serializers.Serializer):
    def __init__(self, *args, **kwargs):
        self.user = kwargs.pop("user", None)
        self.full_perms = kwargs.pop("full_perms", False)
        self.all_fields = kwargs.pop("all_fields", False)
        super().__init__(*args, **kwargs)


@extend_schema_field(
    field={
        "type": "object",
        "properties": {
            "view": {
                "type": "object",
                "properties": {
                    "users": {
                        "type": "array",
                        "items": {"type": "integer"},
                    },
                    "groups": {
                        "type": "array",
                        "items": {"type": "integer"},
                    },
                },
            },
            "change": {
                "type": "object",
                "properties": {
                    "users": {
                        "type": "array",
                        "items": {"type": "integer"},
                    },
                    "groups": {
                        "type": "array",
                        "items": {"type": "integer"},
                    },
                },
            },
        },
    },
)
class SetPermissionsSerializer(serializers.DictField):
    pass


class OwnedObjectSerializer(
    SerializerWithPerms,
    serializers.ModelSerializer,
    SetPermissionsMixin,
):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if not self.all_fields:
            try:
                if self.full_perms:
                    self.fields.pop("user_can_change")
                    self.fields.pop("is_shared_by_requester")
                else:
                    self.fields.pop("permissions")
            except KeyError:
                pass

    @extend_schema_field(
        field={
            "type": "object",
            "properties": {
                "view": {
                    "type": "object",
                    "properties": {
                        "users": {
                            "type": "array",
                            "items": {"type": "integer"},
                        },
                        "groups": {
                            "type": "array",
                            "items": {"type": "integer"},
                        },
                    },
                },
                "change": {
                    "type": "object",
                    "properties": {
                        "users": {
                            "type": "array",
                            "items": {"type": "integer"},
                        },
                        "groups": {
                            "type": "array",
                            "items": {"type": "integer"},
                        },
                    },
                },
            },
        },
    )
    def get_permissions(self, obj) -> dict:
        view_codename = f"view_{obj.__class__.__name__.lower()}"
        change_codename = f"change_{obj.__class__.__name__.lower()}"

        return {
            "view": {
                "users": get_users_with_perms(
                    obj,
                    only_with_perms_in=[view_codename],
                    with_group_users=False,
                ).values_list("id", flat=True),
                "groups": get_groups_with_only_permission(
                    obj,
                    codename=view_codename,
                ).values_list("id", flat=True),
            },
            "change": {
                "users": get_users_with_perms(
                    obj,
                    only_with_perms_in=[change_codename],
                    with_group_users=False,
                ).values_list("id", flat=True),
                "groups": get_groups_with_only_permission(
                    obj,
                    codename=change_codename,
                ).values_list("id", flat=True),
            },
        }

    def get_user_can_change(self, obj) -> bool:
        checker = ObjectPermissionChecker(self.user) if self.user is not None else None
        return (
            obj.owner is None
            or obj.owner == self.user
            or (
                self.user is not None
                and checker.has_perm(f"change_{obj.__class__.__name__.lower()}", obj)
            )
        )

    @staticmethod
    def get_shared_object_pks(objects: Iterable):
        """
        Return the primary keys of the subset of objects that are shared.
        """
        try:
            first_obj = next(iter(objects))
        except StopIteration:
            return set()

        ctype = ContentType.objects.get_for_model(first_obj)
        object_pks = list(obj.pk for obj in objects)
        pk_type = type(first_obj.pk)

        def get_pks_for_permission_type(model):
            return map(
                pk_type,  # coerce the pk to be the same type of the provided objects
                model.objects.filter(
                    content_type=ctype,
                    object_pk__in=object_pks,
                )
                .values_list("object_pk", flat=True)
                .distinct(),
            )

        UserObjectPermission = get_user_obj_perms_model()
        GroupObjectPermission = get_group_obj_perms_model()
        user_permission_pks = get_pks_for_permission_type(UserObjectPermission)
        group_permission_pks = get_pks_for_permission_type(GroupObjectPermission)

        return set(user_permission_pks) | set(group_permission_pks)

    def get_is_shared_by_requester(self, obj: Document) -> bool:
        # First check the context to see if `shared_object_pks` is set by the parent.
        shared_object_pks = self.context.get("shared_object_pks")
        # If not just check if the current object is shared.
        if shared_object_pks is None:
            shared_object_pks = self.get_shared_object_pks([obj])
        return obj.owner == self.user and obj.id in shared_object_pks

    permissions = SerializerMethodField(read_only=True)
    user_can_change = SerializerMethodField(read_only=True)
    is_shared_by_requester = SerializerMethodField(read_only=True)

    set_permissions = SetPermissionsSerializer(
        label="Set permissions",
        allow_empty=True,
        required=False,
        write_only=True,
    )
    # other methods in mixin

    def validate_unique_together(self, validated_data, instance=None):
        # workaround for https://github.com/encode/django-rest-framework/issues/9358
        if "owner" in validated_data and "name" in self.Meta.fields:
            name = validated_data.get("name", instance.name if instance else None)
            objects = (
                self.Meta.model.objects.exclude(pk=instance.pk)
                if instance
                else self.Meta.model.objects.all()
            )
            not_unique = objects.filter(
                owner=validated_data["owner"],
                name=name,
            ).exists()
            if not_unique:
                raise serializers.ValidationError(
                    {"error": "Object violates owner / name unique constraint"},
                )

    def create(self, validated_data):
        # default to current user if not set
        request = self.context.get("request")
        if (
            "owner" not in validated_data
            or (request is not None and "owner" not in request.data)
        ) and self.user:
            validated_data["owner"] = self.user
        permissions = None
        if "set_permissions" in validated_data:
            permissions = validated_data.pop("set_permissions")
        self.validate_unique_together(validated_data)
        instance = super().create(validated_data)
        if permissions is not None:
            self._set_permissions(permissions, instance)
        return instance

    def update(self, instance, validated_data):
        if "set_permissions" in validated_data:
            self._set_permissions(validated_data["set_permissions"], instance)
        self.validate_unique_together(validated_data, instance)
        return super().update(instance, validated_data)


class OwnedObjectListSerializer(serializers.ListSerializer):
    def to_representation(self, documents):
        self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
            documents,
        )
        return super().to_representation(documents)


class CorrespondentSerializer(MatchingModelSerializer, OwnedObjectSerializer):
    last_correspondence = serializers.DateTimeField(read_only=True, required=False)

    class Meta:
        model = Correspondent
        fields = (
            "id",
            "slug",
            "name",
            "match",
            "matching_algorithm",
            "is_insensitive",
            "document_count",
            "last_correspondence",
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )


class DocumentTypeSerializer(MatchingModelSerializer, OwnedObjectSerializer):
    class Meta:
        model = DocumentType
        fields = (
            "id",
            "slug",
            "name",
            "match",
            "matching_algorithm",
            "is_insensitive",
            "document_count",
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )


class DeprecatedColors:
    COLOURS = (
        (1, "#a6cee3"),
        (2, "#1f78b4"),
        (3, "#b2df8a"),
        (4, "#33a02c"),
        (5, "#fb9a99"),
        (6, "#e31a1c"),
        (7, "#fdbf6f"),
        (8, "#ff7f00"),
        (9, "#cab2d6"),
        (10, "#6a3d9a"),
        (11, "#b15928"),
        (12, "#000000"),
        (13, "#cccccc"),
    )


@extend_schema_field(
    serializers.ChoiceField(
        choices=DeprecatedColors.COLOURS,
    ),
)
class ColorField(serializers.Field):
    def to_internal_value(self, data):
        for id, color in DeprecatedColors.COLOURS:
            if id == data:
                return color
        raise serializers.ValidationError

    def to_representation(self, value):
        for id, color in DeprecatedColors.COLOURS:
            if color == value:
                return id
        return 1


class TagSerializerVersion1(MatchingModelSerializer, OwnedObjectSerializer):
    colour = ColorField(source="color", default="#a6cee3")

    class Meta:
        model = Tag
        fields = (
            "id",
            "slug",
            "name",
            "colour",
            "match",
            "matching_algorithm",
            "is_insensitive",
            "is_inbox_tag",
            "document_count",
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )


class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer):
    def get_text_color(self, obj) -> str:
        try:
            h = obj.color.lstrip("#")
            rgb = tuple(int(h[i : i + 2], 16) / 256 for i in (0, 2, 4))
            luminance = math.sqrt(
                0.299 * math.pow(rgb[0], 2)
                + 0.587 * math.pow(rgb[1], 2)
                + 0.114 * math.pow(rgb[2], 2),
            )
            return "#ffffff" if luminance < 0.53 else "#000000"
        except ValueError:
            return "#000000"

    text_color = serializers.SerializerMethodField()

    class Meta:
        model = Tag
        fields = (
            "id",
            "slug",
            "name",
            "color",
            "text_color",
            "match",
            "matching_algorithm",
            "is_insensitive",
            "is_inbox_tag",
            "document_count",
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )

    def validate_color(self, color):
        regex = r"#[0-9a-fA-F]{6}"
        if not re.match(regex, color):
            raise serializers.ValidationError(_("Invalid color."))
        return color


class CorrespondentField(serializers.PrimaryKeyRelatedField):
    def get_queryset(self):
        return Correspondent.objects.all()


class TagsField(serializers.PrimaryKeyRelatedField):
    def get_queryset(self):
        return Tag.objects.all()


class DocumentTypeField(serializers.PrimaryKeyRelatedField):
    def get_queryset(self):
        return DocumentType.objects.all()


class StoragePathField(serializers.PrimaryKeyRelatedField):
    def get_queryset(self):
        return StoragePath.objects.all()


class CustomFieldSerializer(MatchingModelSerializer, serializers.ModelSerializer):
    def __init__(self, *args, **kwargs):
        context = kwargs.get("context")
        self.api_version = int(
            context.get("request").version
            if context and context.get("request")
            else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
        )
        super().__init__(*args, **kwargs)

    data_type = serializers.ChoiceField(
        choices=CustomField.FieldDataType,
        read_only=False,
    )

    class Meta:
        model = CustomField
        fields = [
            "id",
            "name",
            "data_type",
            "extra_data",
            "document_count",
            "match",
            "matching_algorithm",
            "is_insensitive",
        ]

    def validate(self, attrs):
        # TODO: remove pending https://github.com/encode/django-rest-framework/issues/7173
        name = attrs.get(
            "name",
            self.instance.name if hasattr(self.instance, "name") else None,
        )
        objects = (
            self.Meta.model.objects.exclude(
                pk=self.instance.pk,
            )
            if self.instance is not None
            else self.Meta.model.objects.all()
        )
        if ("name" in attrs) and objects.filter(
            name=name,
        ).exists():
            raise serializers.ValidationError(
                {"error": "Object violates name unique constraint"},
            )
        if (
            "data_type" in attrs
            and attrs["data_type"] == CustomField.FieldDataType.SELECT
        ) or (
            self.instance
            and self.instance.data_type == CustomField.FieldDataType.SELECT
        ):
            if (
                "extra_data" not in attrs
                or "select_options" not in attrs["extra_data"]
                or not isinstance(attrs["extra_data"]["select_options"], list)
                or len(attrs["extra_data"]["select_options"]) == 0
                or not all(
                    len(option.get("label", "")) > 0
                    for option in attrs["extra_data"]["select_options"]
                )
            ):
                raise serializers.ValidationError(
                    {"error": "extra_data.select_options must be a valid list"},
                )
            # labels are valid, generate ids if not present
            for option in attrs["extra_data"]["select_options"]:
                if option.get("id") is None:
                    option["id"] = get_random_string(length=16)
        elif (
            "data_type" in attrs
            and attrs["data_type"] == CustomField.FieldDataType.MONETARY
            and "extra_data" in attrs
            and "default_currency" in attrs["extra_data"]
            and attrs["extra_data"]["default_currency"] is not None
            and (
                not isinstance(attrs["extra_data"]["default_currency"], str)
                or (
                    len(attrs["extra_data"]["default_currency"]) > 0
                    and len(attrs["extra_data"]["default_currency"]) != 3
                )
            )
        ):
            raise serializers.ValidationError(
                {"error": "extra_data.default_currency must be a 3-character string"},
            )
        return super().validate(attrs)

    def to_internal_value(self, data):
        ret = super().to_internal_value(data)

        if (
            self.api_version < 7
            and ret.get("data_type", "") == CustomField.FieldDataType.SELECT
            and isinstance(ret.get("extra_data", {}).get("select_options"), list)
        ):
            ret["extra_data"]["select_options"] = [
                {
                    "label": option,
                    "id": get_random_string(length=16),
                }
                for option in ret["extra_data"]["select_options"]
            ]

        return ret

    def to_representation(self, instance):
        ret = super().to_representation(instance)

        if (
            self.api_version < 7
            and instance.data_type == CustomField.FieldDataType.SELECT
        ):
            # Convert the select options with ids to a list of strings
            ret["extra_data"]["select_options"] = [
                option["label"] for option in ret["extra_data"]["select_options"]
            ]

        return ret


class ReadWriteSerializerMethodField(serializers.SerializerMethodField):
    """
    Based on https://stackoverflow.com/a/62579804
    """

    def __init__(self, method_name=None, *args, **kwargs):
        self.method_name = method_name
        kwargs["source"] = "*"
        super(serializers.SerializerMethodField, self).__init__(*args, **kwargs)

    def to_internal_value(self, data):
        return {self.field_name: data}


class CustomFieldInstanceSerializer(serializers.ModelSerializer):
    field = serializers.PrimaryKeyRelatedField(queryset=CustomField.objects.all())
    value = ReadWriteSerializerMethodField(allow_null=True)

    def create(self, validated_data):
        # An instance is attached to a document
        document: Document = validated_data["document"]
        # And to a CustomField
        custom_field: CustomField = validated_data["field"]
        # This key must exist, as it is validated
        data_store_name = CustomFieldInstance.get_value_field_name(
            custom_field.data_type,
        )

        if custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
            # prior to update so we can look for any docs that are going to be removed
            bulk_edit.reflect_doclinks(document, custom_field, validated_data["value"])

        # Actually update or create the instance, providing the value
        # to fill in the correct attribute based on the type
        instance, _ = CustomFieldInstance.objects.update_or_create(
            document=document,
            field=custom_field,
            defaults={data_store_name: validated_data["value"]},
        )
        return instance

    def get_value(self, obj: CustomFieldInstance) -> str | int | float | dict | None:
        return obj.value

    def validate(self, data):
        """
        Probably because we're kind of doing it odd, validation from the model
        doesn't run against the field "value", so we have to re-create it here.

        Don't like it, but it is better than returning an HTTP 500 when the database
        hates the value
        """
        data = super().validate(data)
        field: CustomField = data["field"]
        if "value" in data and data["value"] is not None:
            if (
                field.data_type == CustomField.FieldDataType.URL
                and len(data["value"]) > 0
            ):
                uri_validator(data["value"])
            elif field.data_type == CustomField.FieldDataType.INT:
                integer_validator(data["value"])
            elif (
                field.data_type == CustomField.FieldDataType.MONETARY
                and data["value"] != ""
            ):
                try:
                    # First try to validate as a number from legacy format
                    DecimalValidator(max_digits=12, decimal_places=2)(
                        Decimal(str(data["value"])),
                    )
                except Exception:
                    # If that fails, try to validate as a monetary string
                    RegexValidator(
                        regex=r"^[A-Z]{3}-?\d+(\.\d{1,2})$",
                        message="Must be a two-decimal number with optional currency code e.g. GBP123.45",
                    )(data["value"])
            elif field.data_type == CustomField.FieldDataType.STRING:
                MaxLengthValidator(limit_value=128)(data["value"])
            elif field.data_type == CustomField.FieldDataType.SELECT:
                select_options = field.extra_data["select_options"]
                try:
                    next(
                        option
                        for option in select_options
                        if option["id"] == data["value"]
                    )
                except Exception:
                    raise serializers.ValidationError(
                        f"Value must be an id of an element in {select_options}",
                    )
            elif field.data_type == CustomField.FieldDataType.DOCUMENTLINK:
                if not (isinstance(data["value"], list) or data["value"] is None):
                    raise serializers.ValidationError(
                        "Value must be a list",
                    )
                doc_ids = data["value"]
                if Document.objects.filter(id__in=doc_ids).count() != len(
                    data["value"],
                ):
                    raise serializers.ValidationError(
                        "Some documents in value don't exist or were specified twice.",
                    )

        return data

    def get_api_version(self):
        return int(
            self.context.get("request").version
            if self.context.get("request")
            else settings.REST_FRAMEWORK["DEFAULT_VERSION"],
        )

    def to_internal_value(self, data):
        ret = super().to_internal_value(data)

        if (
            self.get_api_version() < 7
            and ret.get("field").data_type == CustomField.FieldDataType.SELECT
            and ret.get("value") is not None
        ):
            # Convert the index of the option in the field.extra_data["select_options"]
            # list to the options unique id
            ret["value"] = ret.get("field").extra_data["select_options"][ret["value"]][
                "id"
            ]

        return ret

    def to_representation(self, instance):
        ret = super().to_representation(instance)

        if (
            self.get_api_version() < 7
            and instance.field.data_type == CustomField.FieldDataType.SELECT
        ):
            # return the index of the option in the field.extra_data["select_options"] list
            ret["value"] = next(
                (
                    idx
                    for idx, option in enumerate(
                        instance.field.extra_data["select_options"],
                    )
                    if option["id"] == instance.value
                ),
                None,
            )

        return ret

    class Meta:
        model = CustomFieldInstance
        fields = [
            "value",
            "field",
        ]


class BasicUserSerializer(serializers.ModelSerializer):
    # Different than paperless.serializers.UserSerializer
    class Meta:
        model = User
        fields = ["id", "username", "first_name", "last_name"]


class NotesSerializer(serializers.ModelSerializer):
    user = BasicUserSerializer()

    class Meta:
        model = Note
        fields = ["id", "note", "created", "user"]
        ordering = ["-created"]


class DocumentSerializer(
    OwnedObjectSerializer,
    NestedUpdateMixin,
    DynamicFieldsModelSerializer,
):
    correspondent = CorrespondentField(allow_null=True)
    tags = TagsField(many=True)
    document_type = DocumentTypeField(allow_null=True)
    storage_path = StoragePathField(allow_null=True)

    original_file_name = SerializerMethodField()
    archived_file_name = SerializerMethodField()
    created_date = serializers.DateField(required=False)
    page_count = SerializerMethodField()

    notes = NotesSerializer(many=True, required=False)

    custom_fields = CustomFieldInstanceSerializer(
        many=True,
        allow_null=False,
        required=False,
    )

    owner = serializers.PrimaryKeyRelatedField(
        queryset=User.objects.all(),
        required=False,
        allow_null=True,
    )

    remove_inbox_tags = serializers.BooleanField(
        default=False,
        write_only=True,
        allow_null=True,
        required=False,
    )

    def get_page_count(self, obj) -> int | None:
        return obj.page_count

    def get_original_file_name(self, obj) -> str | None:
        return obj.original_filename

    def get_archived_file_name(self, obj) -> str | None:
        if obj.has_archive_version:
            return obj.get_public_filename(archive=True)
        else:
            return None

    def to_representation(self, instance):
        doc = super().to_representation(instance)
        if self.truncate_content and "content" in self.fields:
            doc["content"] = doc.get("content")[0:550]
        return doc

    def validate(self, attrs):
        if (
            "archive_serial_number" in attrs
            and attrs["archive_serial_number"] is not None
            and len(str(attrs["archive_serial_number"])) > 0
            and Document.deleted_objects.filter(
                archive_serial_number=attrs["archive_serial_number"],
            ).exists()
        ):
            raise serializers.ValidationError(
                {
                    "archive_serial_number": [
                        "Document with this Archive Serial Number already exists in the trash.",
                    ],
                },
            )
        return super().validate(attrs)

    def update(self, instance: Document, validated_data):
        if "created_date" in validated_data and "created" not in validated_data:
            new_datetime = datetime.datetime.combine(
                validated_data.get("created_date"),
                datetime.time(0, 0, 0, 0, zoneinfo.ZoneInfo(settings.TIME_ZONE)),
            )
            instance.created = new_datetime
            instance.save()
        if "created_date" in validated_data:
            validated_data.pop("created_date")
        if instance.custom_fields.count() > 0 and "custom_fields" in validated_data:
            incoming_custom_fields = [
                field["field"] for field in validated_data["custom_fields"]
            ]
            for custom_field_instance in instance.custom_fields.filter(
                field__data_type=CustomField.FieldDataType.DOCUMENTLINK,
            ):
                if (
                    custom_field_instance.field not in incoming_custom_fields
                    and custom_field_instance.value is not None
                ):
                    # Doc link field is being removed entirely
                    for doc_id in custom_field_instance.value:
                        bulk_edit.remove_doclink(
                            instance,
                            custom_field_instance.field,
                            doc_id,
                        )
        if validated_data.get("remove_inbox_tags"):
            tag_ids_being_added = (
                [
                    tag.id
                    for tag in validated_data["tags"]
                    if tag not in instance.tags.all()
                ]
                if "tags" in validated_data
                else []
            )
            inbox_tags_not_being_added = Tag.objects.filter(is_inbox_tag=True).exclude(
                id__in=tag_ids_being_added,
            )
            if "tags" in validated_data:
                validated_data["tags"] = [
                    tag
                    for tag in validated_data["tags"]
                    if tag not in inbox_tags_not_being_added
                ]
            else:
                validated_data["tags"] = [
                    tag
                    for tag in instance.tags.all()
                    if tag not in inbox_tags_not_being_added
                ]
        if settings.AUDIT_LOG_ENABLED:
            with set_actor(self.user):
                super().update(instance, validated_data)
        else:
            super().update(instance, validated_data)
        # hard delete custom field instances that were soft deleted
        CustomFieldInstance.deleted_objects.filter(document=instance).delete()
        return instance

    def __init__(self, *args, **kwargs):
        self.truncate_content = kwargs.pop("truncate_content", False)

        # return full permissions if we're doing a PATCH or PUT
        context = kwargs.get("context")
        if context is not None and (
            context.get("request").method == "PATCH"
            or context.get("request").method == "PUT"
        ):
            kwargs["full_perms"] = True

        super().__init__(*args, **kwargs)

    class Meta:
        model = Document
        fields = (
            "id",
            "correspondent",
            "document_type",
            "storage_path",
            "title",
            "content",
            "tags",
            "created",
            "created_date",
            "modified",
            "added",
            "deleted_at",
            "archive_serial_number",
            "original_file_name",
            "archived_file_name",
            "owner",
            "permissions",
            "user_can_change",
            "is_shared_by_requester",
            "set_permissions",
            "notes",
            "custom_fields",
            "remove_inbox_tags",
            "page_count",
            "mime_type",
        )
        list_serializer_class = OwnedObjectListSerializer


class SearchResultListSerializer(serializers.ListSerializer):
    def to_representation(self, hits):
        document_ids = [hit["id"] for hit in hits]
        # Fetch all Document objects in the list in one SQL query.
        documents = self.child.fetch_documents(document_ids)
        self.child.context["documents"] = documents
        # Also check if they are shared with other users / groups.
        self.child.context["shared_object_pks"] = self.child.get_shared_object_pks(
            documents.values(),
        )

        return super().to_representation(hits)


class SearchResultSerializer(DocumentSerializer):
    @staticmethod
    def fetch_documents(ids):
        """
        Return a dict that maps given document IDs to Document objects.
        """
        return {
            document.id: document
            for document in Document.objects.select_related(
                "correspondent",
                "storage_path",
                "document_type",
                "owner",
            )
            .prefetch_related("tags", "custom_fields", "notes")
            .filter(id__in=ids)
        }

    def to_representation(self, hit):
        # Again we first check if the parent has already fetched the documents.
        documents = self.context.get("documents")
        # Otherwise we fetch this document.
        if documents is None:  # pragma: no cover
            # In practice we only serialize **lists** of whoosh.searching.Hit.
            # I'm keeping this check for completeness but marking it no cover for now.
            documents = self.fetch_documents([hit["id"]])
        document = documents[hit["id"]]

        notes = ",".join(
            [str(c.note) for c in document.notes.all()],
        )
        r = super().to_representation(document)
        r["__search_hit__"] = {
            "score": hit.score,
            "highlights": hit.highlights("content", text=document.content),
            "note_highlights": (
                hit.highlights("notes", text=notes) if document else None
            ),
            "rank": hit.rank,
        }

        return r

    class Meta(DocumentSerializer.Meta):
        list_serializer_class = SearchResultListSerializer


class SavedViewFilterRuleSerializer(serializers.ModelSerializer):
    class Meta:
        model = SavedViewFilterRule
        fields = ["rule_type", "value"]


class SavedViewSerializer(OwnedObjectSerializer):
    filter_rules = SavedViewFilterRuleSerializer(many=True)

    class Meta:
        model = SavedView
        fields = [
            "id",
            "name",
            "show_on_dashboard",
            "show_in_sidebar",
            "sort_field",
            "sort_reverse",
            "filter_rules",
            "page_size",
            "display_mode",
            "display_fields",
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        ]

    def validate(self, attrs):
        attrs = super().validate(attrs)
        if "display_fields" in attrs and attrs["display_fields"] is not None:
            for field in attrs["display_fields"]:
                if (
                    SavedView.DisplayFields.CUSTOM_FIELD[:-2] in field
                ):  # i.e. check for 'custom_field_' prefix
                    field_id = int(re.search(r"\d+", field)[0])
                    if not CustomField.objects.filter(id=field_id).exists():
                        raise serializers.ValidationError(
                            f"Invalid field: {field}",
                        )
                elif field not in SavedView.DisplayFields.values:
                    raise serializers.ValidationError(
                        f"Invalid field: {field}",
                    )
        return attrs

    def update(self, instance, validated_data):
        if "filter_rules" in validated_data:
            rules_data = validated_data.pop("filter_rules")
        else:
            rules_data = None
        if "user" in validated_data:
            # backwards compatibility
            validated_data["owner"] = validated_data.pop("user")
        if (
            "display_fields" in validated_data
            and isinstance(
                validated_data["display_fields"],
                list,
            )
            and len(validated_data["display_fields"]) == 0
        ):
            validated_data["display_fields"] = None
        super().update(instance, validated_data)
        if rules_data is not None:
            SavedViewFilterRule.objects.filter(saved_view=instance).delete()
            for rule_data in rules_data:
                SavedViewFilterRule.objects.create(saved_view=instance, **rule_data)
        return instance

    def create(self, validated_data):
        rules_data = validated_data.pop("filter_rules")
        if "user" in validated_data:
            # backwards compatibility
            validated_data["owner"] = validated_data.pop("user")
        saved_view = SavedView.objects.create(**validated_data)
        for rule_data in rules_data:
            SavedViewFilterRule.objects.create(saved_view=saved_view, **rule_data)
        return saved_view


class DocumentListSerializer(serializers.Serializer):
    documents = serializers.ListField(
        required=True,
        label="Documents",
        write_only=True,
        child=serializers.IntegerField(),
    )

    def _validate_document_id_list(self, documents, name="documents"):
        if not isinstance(documents, list):
            raise serializers.ValidationError(f"{name} must be a list")
        if not all(isinstance(i, int) for i in documents):
            raise serializers.ValidationError(f"{name} must be a list of integers")
        count = Document.objects.filter(id__in=documents).count()
        if not count == len(documents):
            raise serializers.ValidationError(
                f"Some documents in {name} don't exist or were specified twice.",
            )

    def validate_documents(self, documents):
        self._validate_document_id_list(documents)
        return documents


class BulkEditSerializer(
    SerializerWithPerms,
    DocumentListSerializer,
    SetPermissionsMixin,
):
    method = serializers.ChoiceField(
        choices=[
            "set_correspondent",
            "set_document_type",
            "set_storage_path",
            "add_tag",
            "remove_tag",
            "modify_tags",
            "modify_custom_fields",
            "delete",
            "reprocess",
            "set_permissions",
            "rotate",
            "merge",
            "split",
            "delete_pages",
        ],
        label="Method",
        write_only=True,
    )

    parameters = serializers.DictField(allow_empty=True, default={}, write_only=True)

    def _validate_tag_id_list(self, tags, name="tags"):
        if not isinstance(tags, list):
            raise serializers.ValidationError(f"{name} must be a list")
        if not all(isinstance(i, int) for i in tags):
            raise serializers.ValidationError(f"{name} must be a list of integers")
        count = Tag.objects.filter(id__in=tags).count()
        if not count == len(tags):
            raise serializers.ValidationError(
                f"Some tags in {name} don't exist or were specified twice.",
            )

    def _validate_custom_field_id_list_or_dict(
        self,
        custom_fields,
        name="custom_fields",
    ):
        ids = custom_fields
        if isinstance(custom_fields, dict):
            try:
                ids = [int(i[0]) for i in custom_fields.items()]
            except Exception as e:
                logger.exception(f"Error validating custom fields: {e}")
                raise serializers.ValidationError(
                    f"{name} must be a list of integers or a dict of id:value pairs, see the log for details",
                )
        elif not isinstance(custom_fields, list) or not all(
            isinstance(i, int) for i in ids
        ):
            raise serializers.ValidationError(
                f"{name} must be a list of integers or a dict of id:value pairs",
            )
        count = CustomField.objects.filter(id__in=ids).count()
        if not count == len(ids):
            raise serializers.ValidationError(
                f"Some custom fields in {name} don't exist or were specified twice.",
            )

    def validate_method(self, method):
        if method == "set_correspondent":
            return bulk_edit.set_correspondent
        elif method == "set_document_type":
            return bulk_edit.set_document_type
        elif method == "set_storage_path":
            return bulk_edit.set_storage_path
        elif method == "add_tag":
            return bulk_edit.add_tag
        elif method == "remove_tag":
            return bulk_edit.remove_tag
        elif method == "modify_tags":
            return bulk_edit.modify_tags
        elif method == "modify_custom_fields":
            return bulk_edit.modify_custom_fields
        elif method == "delete":
            return bulk_edit.delete
        elif method == "redo_ocr" or method == "reprocess":
            return bulk_edit.reprocess
        elif method == "set_permissions":
            return bulk_edit.set_permissions
        elif method == "rotate":
            return bulk_edit.rotate
        elif method == "merge":
            return bulk_edit.merge
        elif method == "split":
            return bulk_edit.split
        elif method == "delete_pages":
            return bulk_edit.delete_pages
        else:
            raise serializers.ValidationError("Unsupported method.")

    def _validate_parameters_tags(self, parameters):
        if "tag" in parameters:
            tag_id = parameters["tag"]
            try:
                Tag.objects.get(id=tag_id)
            except Tag.DoesNotExist:
                raise serializers.ValidationError("Tag does not exist")
        else:
            raise serializers.ValidationError("tag not specified")

    def _validate_parameters_document_type(self, parameters):
        if "document_type" in parameters:
            document_type_id = parameters["document_type"]
            if document_type_id is None:
                # None is ok
                return
            try:
                DocumentType.objects.get(id=document_type_id)
            except DocumentType.DoesNotExist:
                raise serializers.ValidationError("Document type does not exist")
        else:
            raise serializers.ValidationError("document_type not specified")

    def _validate_parameters_correspondent(self, parameters):
        if "correspondent" in parameters:
            correspondent_id = parameters["correspondent"]
            if correspondent_id is None:
                return
            try:
                Correspondent.objects.get(id=correspondent_id)
            except Correspondent.DoesNotExist:
                raise serializers.ValidationError("Correspondent does not exist")
        else:
            raise serializers.ValidationError("correspondent not specified")

    def _validate_storage_path(self, parameters):
        if "storage_path" in parameters:
            storage_path_id = parameters["storage_path"]
            if storage_path_id is None:
                return
            try:
                StoragePath.objects.get(id=storage_path_id)
            except StoragePath.DoesNotExist:
                raise serializers.ValidationError(
                    "Storage path does not exist",
                )
        else:
            raise serializers.ValidationError("storage path not specified")

    def _validate_parameters_modify_tags(self, parameters):
        if "add_tags" in parameters:
            self._validate_tag_id_list(parameters["add_tags"], "add_tags")
        else:
            raise serializers.ValidationError("add_tags not specified")

        if "remove_tags" in parameters:
            self._validate_tag_id_list(parameters["remove_tags"], "remove_tags")
        else:
            raise serializers.ValidationError("remove_tags not specified")

    def _validate_parameters_modify_custom_fields(self, parameters):
        if "add_custom_fields" in parameters:
            self._validate_custom_field_id_list_or_dict(
                parameters["add_custom_fields"],
                "add_custom_fields",
            )
        else:
            raise serializers.ValidationError("add_custom_fields not specified")

        if "remove_custom_fields" in parameters:
            self._validate_custom_field_id_list_or_dict(
                parameters["remove_custom_fields"],
                "remove_custom_fields",
            )
        else:
            raise serializers.ValidationError("remove_custom_fields not specified")

    def _validate_owner(self, owner):
        ownerUser = User.objects.get(pk=owner)
        if ownerUser is None:
            raise serializers.ValidationError("Specified owner cannot be found")
        return ownerUser

    def _validate_parameters_set_permissions(self, parameters):
        parameters["set_permissions"] = self.validate_set_permissions(
            parameters["set_permissions"],
        )
        if "owner" in parameters and parameters["owner"] is not None:
            self._validate_owner(parameters["owner"])
        if "merge" not in parameters:
            parameters["merge"] = False

    def _validate_parameters_rotate(self, parameters):
        try:
            if (
                "degrees" not in parameters
                or not float(parameters["degrees"]).is_integer()
            ):
                raise serializers.ValidationError("invalid rotation degrees")
        except ValueError:
            raise serializers.ValidationError("invalid rotation degrees")

    def _validate_parameters_split(self, parameters):
        if "pages" not in parameters:
            raise serializers.ValidationError("pages not specified")
        try:
            pages = []
            docs = parameters["pages"].split(",")
            for doc in docs:
                if "-" in doc:
                    pages.append(
                        [
                            x
                            for x in range(
                                int(doc.split("-")[0]),
                                int(doc.split("-")[1]) + 1,
                            )
                        ],
                    )
                else:
                    pages.append([int(doc)])
            parameters["pages"] = pages
        except ValueError:
            raise serializers.ValidationError("invalid pages specified")

        if "delete_originals" in parameters:
            if not isinstance(parameters["delete_originals"], bool):
                raise serializers.ValidationError("delete_originals must be a boolean")
        else:
            parameters["delete_originals"] = False

    def _validate_parameters_delete_pages(self, parameters):
        if "pages" not in parameters:
            raise serializers.ValidationError("pages not specified")
        if not isinstance(parameters["pages"], list):
            raise serializers.ValidationError("pages must be a list")
        if not all(isinstance(i, int) for i in parameters["pages"]):
            raise serializers.ValidationError("pages must be a list of integers")

    def _validate_parameters_merge(self, parameters):
        if "delete_originals" in parameters:
            if not isinstance(parameters["delete_originals"], bool):
                raise serializers.ValidationError("delete_originals must be a boolean")
        else:
            parameters["delete_originals"] = False
        if "archive_fallback" in parameters:
            if not isinstance(parameters["archive_fallback"], bool):
                raise serializers.ValidationError("archive_fallback must be a boolean")
        else:
            parameters["archive_fallback"] = False

    def validate(self, attrs):
        method = attrs["method"]
        parameters = attrs["parameters"]

        if method == bulk_edit.set_correspondent:
            self._validate_parameters_correspondent(parameters)
        elif method == bulk_edit.set_document_type:
            self._validate_parameters_document_type(parameters)
        elif method == bulk_edit.add_tag or method == bulk_edit.remove_tag:
            self._validate_parameters_tags(parameters)
        elif method == bulk_edit.modify_tags:
            self._validate_parameters_modify_tags(parameters)
        elif method == bulk_edit.set_storage_path:
            self._validate_storage_path(parameters)
        elif method == bulk_edit.modify_custom_fields:
            self._validate_parameters_modify_custom_fields(parameters)
        elif method == bulk_edit.set_permissions:
            self._validate_parameters_set_permissions(parameters)
        elif method == bulk_edit.rotate:
            self._validate_parameters_rotate(parameters)
        elif method == bulk_edit.split:
            if len(attrs["documents"]) > 1:
                raise serializers.ValidationError(
                    "Split method only supports one document",
                )
            self._validate_parameters_split(parameters)
        elif method == bulk_edit.delete_pages:
            if len(attrs["documents"]) > 1:
                raise serializers.ValidationError(
                    "Delete pages method only supports one document",
                )
            self._validate_parameters_delete_pages(parameters)
        elif method == bulk_edit.merge:
            self._validate_parameters_merge(parameters)

        return attrs


class PostDocumentSerializer(serializers.Serializer):
    created = serializers.DateTimeField(
        label="Created",
        allow_null=True,
        write_only=True,
        required=False,
    )

    document = serializers.FileField(
        label="Document",
        write_only=True,
    )

    title = serializers.CharField(
        label="Title",
        write_only=True,
        required=False,
    )

    correspondent = serializers.PrimaryKeyRelatedField(
        queryset=Correspondent.objects.all(),
        label="Correspondent",
        allow_null=True,
        write_only=True,
        required=False,
    )

    document_type = serializers.PrimaryKeyRelatedField(
        queryset=DocumentType.objects.all(),
        label="Document type",
        allow_null=True,
        write_only=True,
        required=False,
    )

    storage_path = serializers.PrimaryKeyRelatedField(
        queryset=StoragePath.objects.all(),
        label="Storage path",
        allow_null=True,
        write_only=True,
        required=False,
    )

    tags = serializers.PrimaryKeyRelatedField(
        many=True,
        queryset=Tag.objects.all(),
        label="Tags",
        write_only=True,
        required=False,
    )

    archive_serial_number = serializers.IntegerField(
        label="ASN",
        write_only=True,
        required=False,
        min_value=Document.ARCHIVE_SERIAL_NUMBER_MIN,
        max_value=Document.ARCHIVE_SERIAL_NUMBER_MAX,
    )

    custom_fields = serializers.PrimaryKeyRelatedField(
        many=True,
        queryset=CustomField.objects.all(),
        label="Custom fields",
        write_only=True,
        required=False,
    )

    from_webui = serializers.BooleanField(
        label="Documents are from Paperless-ngx WebUI",
        write_only=True,
        required=False,
    )

    def validate_document(self, document):
        document_data = document.file.read()
        mime_type = magic.from_buffer(document_data, mime=True)

        if not is_mime_type_supported(mime_type):
            if (
                mime_type in settings.CONSUMER_PDF_RECOVERABLE_MIME_TYPES
                and document.name.endswith(
                    ".pdf",
                )
            ):
                # If the file is an invalid PDF, we can try to recover it later in the consumer
                mime_type = "application/pdf"
            else:
                raise serializers.ValidationError(
                    _("File type %(type)s not supported") % {"type": mime_type},
                )

        return document.name, document_data

    def validate_correspondent(self, correspondent):
        if correspondent:
            return correspondent.id
        else:
            return None

    def validate_document_type(self, document_type):
        if document_type:
            return document_type.id
        else:
            return None

    def validate_storage_path(self, storage_path):
        if storage_path:
            return storage_path.id
        else:
            return None

    def validate_tags(self, tags):
        if tags:
            return [tag.id for tag in tags]
        else:
            return None

    def validate_custom_fields(self, custom_fields):
        if custom_fields:
            return [custom_field.id for custom_field in custom_fields]
        else:
            return None


class BulkDownloadSerializer(DocumentListSerializer):
    content = serializers.ChoiceField(
        choices=["archive", "originals", "both"],
        default="archive",
    )

    compression = serializers.ChoiceField(
        choices=["none", "deflated", "bzip2", "lzma"],
        default="none",
    )

    follow_formatting = serializers.BooleanField(
        default=False,
    )

    def validate_compression(self, compression):
        import zipfile

        return {
            "none": zipfile.ZIP_STORED,
            "deflated": zipfile.ZIP_DEFLATED,
            "bzip2": zipfile.ZIP_BZIP2,
            "lzma": zipfile.ZIP_LZMA,
        }[compression]


class StoragePathSerializer(MatchingModelSerializer, OwnedObjectSerializer):
    class Meta:
        model = StoragePath
        fields = (
            "id",
            "slug",
            "name",
            "path",
            "match",
            "matching_algorithm",
            "is_insensitive",
            "document_count",
            "owner",
            "permissions",
            "user_can_change",
            "set_permissions",
        )

    def validate_path(self, path: str):
        converted_path = convert_format_str_to_template_format(path)
        if converted_path != path:
            logger.warning(
                f"Storage path {path} is not using the new style format, consider updating",
            )
        result = validate_filepath_template_and_render(converted_path)

        if result is None:
            raise serializers.ValidationError(_("Invalid variable detected."))

        return converted_path

    def update(self, instance, validated_data):
        """
        When a storage path is updated, see if documents
        using it require a rename/move
        """
        doc_ids = [doc.id for doc in instance.documents.all()]
        if len(doc_ids):
            bulk_edit.bulk_update_documents.delay(doc_ids)

        return super().update(instance, validated_data)


class UiSettingsViewSerializer(serializers.ModelSerializer):
    class Meta:
        model = UiSettings
        depth = 1
        fields = [
            "id",
            "settings",
        ]

    def validate_settings(self, settings):
        # we never save update checking backend setting
        if "update_checking" in settings:
            try:
                settings["update_checking"].pop("backend_setting")
            except KeyError:
                pass
        return settings

    def create(self, validated_data):
        ui_settings = UiSettings.objects.update_or_create(
            user=validated_data.get("user"),
            defaults={"settings": validated_data.get("settings", None)},
        )
        return ui_settings


class TasksViewSerializer(OwnedObjectSerializer):
    class Meta:
        model = PaperlessTask
        fields = (
            "id",
            "task_id",
            "task_name",
            "task_file_name",
            "date_created",
            "date_done",
            "type",
            "status",
            "result",
            "acknowledged",
            "related_document",
            "owner",
        )

    related_document = serializers.SerializerMethodField()
    created_doc_re = re.compile(r"New document id (\d+) created")
    duplicate_doc_re = re.compile(r"It is a duplicate of .* \(#(\d+)\)")

    def get_related_document(self, obj) -> str | None:
        result = None
        re = None
        if obj.result:
            match obj.status:
                case states.SUCCESS:
                    re = self.created_doc_re
                case states.FAILURE:
                    re = (
                        self.duplicate_doc_re
                        if "existing document is in the trash" not in obj.result
                        else None
                    )
            if re is not None:
                try:
                    result = re.search(obj.result).group(1)
                except Exception:
                    pass

        return result


class RunTaskViewSerializer(serializers.Serializer):
    task_name = serializers.ChoiceField(
        choices=PaperlessTask.TaskName.choices,
        label="Task Name",
        write_only=True,
    )


class AcknowledgeTasksViewSerializer(serializers.Serializer):
    tasks = serializers.ListField(
        required=True,
        label="Tasks",
        write_only=True,
        child=serializers.IntegerField(),
    )

    def _validate_task_id_list(self, tasks, name="tasks"):
        if not isinstance(tasks, list):
            raise serializers.ValidationError(f"{name} must be a list")
        if not all(isinstance(i, int) for i in tasks):
            raise serializers.ValidationError(f"{name} must be a list of integers")
        count = PaperlessTask.objects.filter(id__in=tasks).count()
        if not count == len(tasks):
            raise serializers.ValidationError(
                f"Some tasks in {name} don't exist or were specified twice.",
            )

    def validate_tasks(self, tasks):
        self._validate_task_id_list(tasks)
        return tasks


class ShareLinkSerializer(OwnedObjectSerializer):
    class Meta:
        model = ShareLink
        fields = (
            "id",
            "created",
            "expiration",
            "slug",
            "document",
            "file_version",
        )

    def create(self, validated_data):
        validated_data["slug"] = get_random_string(50)
        return super().create(validated_data)


class BulkEditObjectsSerializer(SerializerWithPerms, SetPermissionsMixin):
    objects = serializers.ListField(
        required=True,
        allow_empty=False,
        label="Objects",
        write_only=True,
        child=serializers.IntegerField(),
    )

    object_type = serializers.ChoiceField(
        choices=[
            "tags",
            "correspondents",
            "document_types",
            "storage_paths",
        ],
        label="Object Type",
        write_only=True,
    )

    operation = serializers.ChoiceField(
        choices=[
            "set_permissions",
            "delete",
        ],
        label="Operation",
        required=True,
        write_only=True,
    )

    owner = serializers.PrimaryKeyRelatedField(
        queryset=User.objects.all(),
        required=False,
        allow_null=True,
    )

    permissions = serializers.DictField(
        label="Set permissions",
        allow_empty=False,
        required=False,
        write_only=True,
    )

    merge = serializers.BooleanField(
        default=False,
        write_only=True,
        required=False,
    )

    def get_object_class(self, object_type):
        object_class = None
        if object_type == "tags":
            object_class = Tag
        elif object_type == "correspondents":
            object_class = Correspondent
        elif object_type == "document_types":
            object_class = DocumentType
        elif object_type == "storage_paths":
            object_class = StoragePath
        return object_class

    def _validate_objects(self, objects, object_type):
        if not isinstance(objects, list):
            raise serializers.ValidationError("objects must be a list")
        if not all(isinstance(i, int) for i in objects):
            raise serializers.ValidationError("objects must be a list of integers")
        object_class = self.get_object_class(object_type)
        count = object_class.objects.filter(id__in=objects).count()
        if not count == len(objects):
            raise serializers.ValidationError(
                "Some ids in objects don't exist or were specified twice.",
            )
        return objects

    def _validate_permissions(self, permissions):
        self.validate_set_permissions(
            permissions,
        )

    def validate(self, attrs):
        object_type = attrs["object_type"]
        objects = attrs["objects"]
        operation = attrs.get("operation")

        self._validate_objects(objects, object_type)

        if operation == "set_permissions":
            permissions = attrs.get("permissions")
            if permissions is not None:
                self._validate_permissions(permissions)

        return attrs


class WorkflowTriggerSerializer(serializers.ModelSerializer):
    id = serializers.IntegerField(required=False, allow_null=True)
    sources = fields.MultipleChoiceField(
        choices=WorkflowTrigger.DocumentSourceChoices.choices,
        allow_empty=True,
        default={
            DocumentSource.ConsumeFolder,
            DocumentSource.ApiUpload,
            DocumentSource.MailFetch,
        },
    )

    type = serializers.ChoiceField(
        choices=WorkflowTrigger.WorkflowTriggerType.choices,
        label="Trigger Type",
    )

    class Meta:
        model = WorkflowTrigger
        fields = [
            "id",
            "sources",
            "type",
            "filter_path",
            "filter_filename",
            "filter_mailrule",
            "matching_algorithm",
            "match",
            "is_insensitive",
            "filter_has_tags",
            "filter_has_correspondent",
            "filter_has_document_type",
            "schedule_offset_days",
            "schedule_is_recurring",
            "schedule_recurring_interval_days",
            "schedule_date_field",
            "schedule_date_custom_field",
        ]

    def validate(self, attrs):
        # Empty strings treated as None to avoid unexpected behavior
        if (
            "filter_filename" in attrs
            and attrs["filter_filename"] is not None
            and len(attrs["filter_filename"]) == 0
        ):
            attrs["filter_filename"] = None
        if (
            "filter_path" in attrs
            and attrs["filter_path"] is not None
            and len(attrs["filter_path"]) == 0
        ):
            attrs["filter_path"] = None

        if (
            attrs["type"] == WorkflowTrigger.WorkflowTriggerType.CONSUMPTION
            and "filter_mailrule" not in attrs
            and ("filter_filename" not in attrs or attrs["filter_filename"] is None)
            and ("filter_path" not in attrs or attrs["filter_path"] is None)
        ):
            raise serializers.ValidationError(
                "File name, path or mail rule filter are required",
            )

        return attrs


class WorkflowActionEmailSerializer(serializers.ModelSerializer):
    id = serializers.IntegerField(allow_null=True, required=False)

    class Meta:
        model = WorkflowActionEmail
        fields = [
            "id",
            "subject",
            "body",
            "to",
            "include_document",
        ]


class WorkflowActionWebhookSerializer(serializers.ModelSerializer):
    id = serializers.IntegerField(allow_null=True, required=False)

    def validate_url(self, url):
        url_validator(url)
        return url

    class Meta:
        model = WorkflowActionWebhook
        fields = [
            "id",
            "url",
            "use_params",
            "as_json",
            "params",
            "body",
            "headers",
            "include_document",
        ]


class WorkflowActionSerializer(serializers.ModelSerializer):
    id = serializers.IntegerField(required=False, allow_null=True)
    assign_correspondent = CorrespondentField(allow_null=True, required=False)
    assign_tags = TagsField(many=True, allow_null=True, required=False)
    assign_document_type = DocumentTypeField(allow_null=True, required=False)
    assign_storage_path = StoragePathField(allow_null=True, required=False)
    email = WorkflowActionEmailSerializer(allow_null=True, required=False)
    webhook = WorkflowActionWebhookSerializer(allow_null=True, required=False)

    class Meta:
        model = WorkflowAction
        fields = [
            "id",
            "type",
            "assign_title",
            "assign_tags",
            "assign_correspondent",
            "assign_document_type",
            "assign_storage_path",
            "assign_owner",
            "assign_view_users",
            "assign_view_groups",
            "assign_change_users",
            "assign_change_groups",
            "assign_custom_fields",
            "assign_custom_fields_values",
            "remove_all_tags",
            "remove_tags",
            "remove_all_correspondents",
            "remove_correspondents",
            "remove_all_document_types",
            "remove_document_types",
            "remove_all_storage_paths",
            "remove_storage_paths",
            "remove_custom_fields",
            "remove_all_custom_fields",
            "remove_all_owners",
            "remove_owners",
            "remove_all_permissions",
            "remove_view_users",
            "remove_view_groups",
            "remove_change_users",
            "remove_change_groups",
            "email",
            "webhook",
        ]

    def validate(self, attrs):
        if "assign_title" in attrs and attrs["assign_title"] is not None:
            if len(attrs["assign_title"]) == 0:
                # Empty strings treated as None to avoid unexpected behavior
                attrs["assign_title"] = None
            else:
                try:
                    # test against all placeholders, see consumer.py `parse_doc_title_w_placeholders`
                    attrs["assign_title"].format(
                        correspondent="",
                        document_type="",
                        added="",
                        added_year="",
                        added_year_short="",
                        added_month="",
                        added_month_name="",
                        added_month_name_short="",
                        added_day="",
                        added_time="",
                        owner_username="",
                        original_filename="",
                        filename="",
                        created="",
                        created_year="",
                        created_year_short="",
                        created_month="",
                        created_month_name="",
                        created_month_name_short="",
                        created_day="",
                        created_time="",
                    )
                except (ValueError, KeyError) as e:
                    raise serializers.ValidationError(
                        {"assign_title": f'Invalid f-string detected: "{e.args[0]}"'},
                    )

        if (
            "type" in attrs
            and attrs["type"] == WorkflowAction.WorkflowActionType.EMAIL
            and "email" not in attrs
        ):
            raise serializers.ValidationError(
                "Email data is required for email actions",
            )

        if (
            "type" in attrs
            and attrs["type"] == WorkflowAction.WorkflowActionType.WEBHOOK
            and "webhook" not in attrs
        ):
            raise serializers.ValidationError(
                "Webhook data is required for webhook actions",
            )

        return attrs


class WorkflowSerializer(serializers.ModelSerializer):
    order = serializers.IntegerField(required=False)

    triggers = WorkflowTriggerSerializer(many=True)
    actions = WorkflowActionSerializer(many=True)

    class Meta:
        model = Workflow
        fields = [
            "id",
            "name",
            "order",
            "enabled",
            "triggers",
            "actions",
        ]

    def update_triggers_and_actions(self, instance: Workflow, triggers, actions):
        set_triggers = []
        set_actions = []

        if triggers is not None:
            for trigger in triggers:
                filter_has_tags = trigger.pop("filter_has_tags", None)
                trigger_instance, _ = WorkflowTrigger.objects.update_or_create(
                    id=trigger.get("id"),
                    defaults=trigger,
                )
                if filter_has_tags is not None:
                    trigger_instance.filter_has_tags.set(filter_has_tags)
                set_triggers.append(trigger_instance)

        if actions is not None:
            for action in actions:
                assign_tags = action.pop("assign_tags", None)
                assign_view_users = action.pop("assign_view_users", None)
                assign_view_groups = action.pop("assign_view_groups", None)
                assign_change_users = action.pop("assign_change_users", None)
                assign_change_groups = action.pop("assign_change_groups", None)
                assign_custom_fields = action.pop("assign_custom_fields", None)
                remove_tags = action.pop("remove_tags", None)
                remove_correspondents = action.pop("remove_correspondents", None)
                remove_document_types = action.pop("remove_document_types", None)
                remove_storage_paths = action.pop("remove_storage_paths", None)
                remove_custom_fields = action.pop("remove_custom_fields", None)
                remove_owners = action.pop("remove_owners", None)
                remove_view_users = action.pop("remove_view_users", None)
                remove_view_groups = action.pop("remove_view_groups", None)
                remove_change_users = action.pop("remove_change_users", None)
                remove_change_groups = action.pop("remove_change_groups", None)

                email_data = action.pop("email", None)
                webhook_data = action.pop("webhook", None)

                action_instance, _ = WorkflowAction.objects.update_or_create(
                    id=action.get("id"),
                    defaults=action,
                )

                if email_data is not None:
                    serializer = WorkflowActionEmailSerializer(data=email_data)
                    serializer.is_valid(raise_exception=True)
                    email, _ = WorkflowActionEmail.objects.update_or_create(
                        id=email_data.get("id"),
                        defaults=serializer.validated_data,
                    )
                    action_instance.email = email
                    action_instance.save()

                if webhook_data is not None:
                    serializer = WorkflowActionWebhookSerializer(data=webhook_data)
                    serializer.is_valid(raise_exception=True)
                    webhook, _ = WorkflowActionWebhook.objects.update_or_create(
                        id=webhook_data.get("id"),
                        defaults=serializer.validated_data,
                    )
                    action_instance.webhook = webhook
                    action_instance.save()

                if assign_tags is not None:
                    action_instance.assign_tags.set(assign_tags)
                if assign_view_users is not None:
                    action_instance.assign_view_users.set(assign_view_users)
                if assign_view_groups is not None:
                    action_instance.assign_view_groups.set(assign_view_groups)
                if assign_change_users is not None:
                    action_instance.assign_change_users.set(assign_change_users)
                if assign_change_groups is not None:
                    action_instance.assign_change_groups.set(assign_change_groups)
                if assign_custom_fields is not None:
                    action_instance.assign_custom_fields.set(assign_custom_fields)
                if remove_tags is not None:
                    action_instance.remove_tags.set(remove_tags)
                if remove_correspondents is not None:
                    action_instance.remove_correspondents.set(remove_correspondents)
                if remove_document_types is not None:
                    action_instance.remove_document_types.set(remove_document_types)
                if remove_storage_paths is not None:
                    action_instance.remove_storage_paths.set(remove_storage_paths)
                if remove_custom_fields is not None:
                    action_instance.remove_custom_fields.set(remove_custom_fields)
                if remove_owners is not None:
                    action_instance.remove_owners.set(remove_owners)
                if remove_view_users is not None:
                    action_instance.remove_view_users.set(remove_view_users)
                if remove_view_groups is not None:
                    action_instance.remove_view_groups.set(remove_view_groups)
                if remove_change_users is not None:
                    action_instance.remove_change_users.set(remove_change_users)
                if remove_change_groups is not None:
                    action_instance.remove_change_groups.set(remove_change_groups)

                set_actions.append(action_instance)

        instance.triggers.set(set_triggers)
        instance.actions.set(set_actions)
        instance.save()

    def prune_triggers_and_actions(self):
        """
        ManyToMany fields dont support e.g. on_delete so we need to discard unattached
        triggers and actionas manually
        """
        for trigger in WorkflowTrigger.objects.all():
            if trigger.workflows.all().count() == 0:
                trigger.delete()

        for action in WorkflowAction.objects.all():
            if action.workflows.all().count() == 0:
                action.delete()

        WorkflowActionEmail.objects.filter(action=None).delete()
        WorkflowActionWebhook.objects.filter(action=None).delete()

    def create(self, validated_data) -> Workflow:
        if "triggers" in validated_data:
            triggers = validated_data.pop("triggers")

        if "actions" in validated_data:
            actions = validated_data.pop("actions")

        instance = super().create(validated_data)

        self.update_triggers_and_actions(instance, triggers, actions)

        return instance

    def update(self, instance: Workflow, validated_data) -> Workflow:
        if "triggers" in validated_data:
            triggers = validated_data.pop("triggers")

        if "actions" in validated_data:
            actions = validated_data.pop("actions")

        instance = super().update(instance, validated_data)

        self.update_triggers_and_actions(instance, triggers, actions)

        self.prune_triggers_and_actions()

        return instance


class TrashSerializer(SerializerWithPerms):
    documents = serializers.ListField(
        required=False,
        label="Documents",
        write_only=True,
        child=serializers.IntegerField(),
    )

    action = serializers.ChoiceField(
        choices=["restore", "empty"],
        label="Action",
        write_only=True,
    )

    def validate_documents(self, documents):
        count = Document.deleted_objects.filter(id__in=documents).count()
        if not count == len(documents):
            raise serializers.ValidationError(
                "Some documents in the list have not yet been deleted.",
            )
        return documents


class StoragePathTestSerializer(SerializerWithPerms):
    path = serializers.CharField(
        required=True,
        label="Path",
        write_only=True,
    )

    document = serializers.PrimaryKeyRelatedField(
        queryset=Document.objects.all(),
        required=True,
        label="Document",
        write_only=True,
    )