import itertools import logging import os import platform import re import tempfile import zipfile from collections import OrderedDict from datetime import datetime from pathlib import Path from time import mktime from unicodedata import normalize from urllib.parse import quote from urllib.parse import urlparse import httpx import pathvalidate from allauth.mfa import signals from allauth.mfa.adapter import get_adapter as get_mfa_adapter from allauth.mfa.base.internal.flows import delete_and_cleanup from allauth.mfa.models import Authenticator from allauth.mfa.recovery_codes.internal.flows import auto_generate_recovery_codes from allauth.mfa.totp.internal import auth as totp_auth from allauth.socialaccount.adapter import get_adapter from allauth.socialaccount.models import SocialAccount from celery import states from django.conf import settings from django.contrib.auth.models import Group from django.contrib.auth.models import User from django.db import connections from django.db.migrations.loader import MigrationLoader from django.db.migrations.recorder import MigrationRecorder from django.db.models import Case from django.db.models import Count from django.db.models import IntegerField from django.db.models import Max from django.db.models import Model from django.db.models import Q from django.db.models import Sum from django.db.models import When from django.db.models.functions import Length from django.db.models.functions import Lower from django.db.models.manager import Manager from django.http import Http404 from django.http import HttpResponse from django.http import HttpResponseBadRequest from django.http import HttpResponseForbidden from django.http import HttpResponseNotFound from django.http import HttpResponseRedirect from django.http import HttpResponseServerError from django.shortcuts import get_object_or_404 from django.utils import timezone from django.utils.decorators import method_decorator from django.utils.timezone import make_aware from django.utils.translation import get_language from django.views.decorators.cache import cache_control from django.views.decorators.http import condition from django.views.decorators.http import last_modified from django.views.generic import TemplateView from django.views.generic import View from django_filters.rest_framework import DjangoFilterBackend from drf_spectacular.types import OpenApiTypes from drf_spectacular.utils import OpenApiParameter from drf_spectacular.utils import extend_schema from drf_spectacular.utils import extend_schema_view from drf_spectacular.utils import inline_serializer from langdetect import detect from packaging import version as packaging_version from redis import Redis from rest_framework import parsers from rest_framework import serializers from rest_framework.authtoken.models import Token from rest_framework.authtoken.views import ObtainAuthToken from rest_framework.decorators import action from rest_framework.exceptions import NotFound from rest_framework.filters import OrderingFilter from rest_framework.filters import SearchFilter from rest_framework.generics import GenericAPIView from rest_framework.mixins import DestroyModelMixin from rest_framework.mixins import ListModelMixin from rest_framework.mixins import RetrieveModelMixin from rest_framework.mixins import UpdateModelMixin from rest_framework.pagination import PageNumberPagination from rest_framework.permissions import DjangoModelPermissions from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.viewsets import GenericViewSet from rest_framework.viewsets import ModelViewSet from rest_framework.viewsets import ReadOnlyModelViewSet from rest_framework.viewsets import ViewSet from paperless import bulk_edit from paperless import index from paperless import version from paperless.bulk_download import ArchiveOnlyStrategy from paperless.bulk_download import OriginalAndArchiveStrategy from paperless.bulk_download import OriginalsOnlyStrategy from paperless.caching import get_metadata_cache from paperless.caching import get_suggestion_cache from paperless.caching import refresh_metadata_cache from paperless.caching import refresh_suggestions_cache from paperless.caching import set_metadata_cache from paperless.caching import set_suggestions_cache from paperless.celery import app as celery_app from paperless.classifier import load_classifier from paperless.conditionals import metadata_etag from paperless.conditionals import metadata_last_modified from paperless.conditionals import preview_etag from paperless.conditionals import preview_last_modified from paperless.conditionals import suggestions_etag from paperless.conditionals import suggestions_last_modified from paperless.conditionals import thumbnail_last_modified from paperless.config import GeneralConfig from paperless.data_models import ConsumableDocument from paperless.data_models import DocumentMetadataOverrides from paperless.data_models import DocumentSource from paperless.db import GnuPG from paperless.filters import CorrespondentFilterSet from paperless.filters import CustomFieldFilterSet from paperless.filters import DocumentFilterSet from paperless.filters import DocumentsOrderingFilter from paperless.filters import DocumentTypeFilterSet from paperless.filters import GroupFilterSet from paperless.filters import ObjectOwnedOrGrantedPermissionsFilter from paperless.filters import ObjectOwnedPermissionsFilter from paperless.filters import PaperlessTaskFilterSet from paperless.filters import ShareLinkFilterSet from paperless.filters import StoragePathFilterSet from paperless.filters import TagFilterSet from paperless.filters import UserFilterSet from paperless.index import DelayedQuery from paperless.mail import send_email from paperless.matching import match_correspondents from paperless.matching import match_document_types from paperless.matching import match_storage_paths from paperless.matching import match_tags from paperless.models import ApplicationConfiguration from paperless.models import Correspondent from paperless.models import CustomField from paperless.models import Document from paperless.models import DocumentType from paperless.models import Note from paperless.models import PaperlessTask from paperless.models import SavedView from paperless.models import ShareLink from paperless.models import StoragePath from paperless.models import Tag from paperless.models import UiSettings from paperless.models import Workflow from paperless.models import WorkflowAction from paperless.models import WorkflowTrigger from paperless.parsers import get_parser_class_for_mime_type from paperless.parsers import parse_date_generator from paperless.permissions import PaperlessAdminPermissions from paperless.permissions import PaperlessNotePermissions from paperless.permissions import PaperlessObjectPermissions from paperless.permissions import get_objects_for_user_owner_aware from paperless.permissions import has_perms_owner_aware from paperless.permissions import set_permissions_for_object from paperless.schema import generate_object_with_permissions_schema from paperless.serialisers import AcknowledgeTasksViewSerializer from paperless.serialisers import ApplicationConfigurationSerializer from paperless.serialisers import BulkDownloadSerializer from paperless.serialisers import BulkEditObjectsSerializer from paperless.serialisers import BulkEditSerializer from paperless.serialisers import CorrespondentSerializer from paperless.serialisers import CustomFieldSerializer from paperless.serialisers import DocumentListSerializer from paperless.serialisers import DocumentSerializer from paperless.serialisers import DocumentTypeSerializer from paperless.serialisers import GroupSerializer from paperless.serialisers import PaperlessAuthTokenSerializer from paperless.serialisers import PostDocumentSerializer from paperless.serialisers import ProfileSerializer from paperless.serialisers import RunTaskViewSerializer from paperless.serialisers import SavedViewSerializer from paperless.serialisers import SearchResultSerializer from paperless.serialisers import ShareLinkSerializer from paperless.serialisers import StoragePathSerializer from paperless.serialisers import StoragePathTestSerializer from paperless.serialisers import TagSerializer from paperless.serialisers import TagSerializerVersion1 from paperless.serialisers import TasksViewSerializer from paperless.serialisers import TrashSerializer from paperless.serialisers import UiSettingsViewSerializer from paperless.serialisers import UserSerializer from paperless.serialisers import WorkflowActionSerializer from paperless.serialisers import WorkflowSerializer from paperless.serialisers import WorkflowTriggerSerializer from paperless.signals import document_updated from paperless.tasks import consume_file from paperless.tasks import empty_trash from paperless.tasks import index_optimize from paperless.tasks import sanity_check from paperless.tasks import train_classifier from paperless.templating.filepath import validate_filepath_template_and_render from paperless_mail.models import MailAccount from paperless_mail.models import MailRule from paperless_mail.oauth import PaperlessMailOAuth2Manager from paperless_mail.serialisers import MailAccountSerializer from paperless_mail.serialisers import MailRuleSerializer if settings.AUDIT_LOG_ENABLED: from auditlog.models import LogEntry logger = logging.getLogger("paperless.api") class PaperlessObtainAuthTokenView(ObtainAuthToken): serializer_class = PaperlessAuthTokenSerializer class StandardPagination(PageNumberPagination): page_size = 25 page_size_query_param = "page_size" max_page_size = 100000 def get_paginated_response(self, data): return Response( OrderedDict( [ ("count", self.page.paginator.count), ("next", self.get_next_link()), ("previous", self.get_previous_link()), ("all", self.get_all_result_ids()), ("results", data), ], ), ) def get_all_result_ids(self): query = self.page.paginator.object_list if isinstance(query, DelayedQuery): try: ids = [ query.searcher.ixreader.stored_fields( doc_num, )["id"] for doc_num in query.saved_results.get(0).results.docs() ] except Exception: pass else: ids = self.page.paginator.object_list.values_list("pk", flat=True) return ids def get_paginated_response_schema(self, schema): response_schema = super().get_paginated_response_schema(schema) response_schema["properties"]["all"] = { "type": "array", "example": "[1, 2, 3]", } return response_schema class FaviconView(View): def get(self, request, *args, **kwargs): # pragma: no cover favicon = os.path.join( os.path.dirname(__file__), "static", "paperless", "img", "favicon.ico", ) with open(favicon, "rb") as f: return HttpResponse(f, content_type="image/x-icon") class UserViewSet(ModelViewSet): model = User queryset = User.objects.exclude( username__in=["consumer", "AnonymousUser"], ).order_by(Lower("username")) serializer_class = UserSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) filter_backends = (DjangoFilterBackend, OrderingFilter) filterset_class = UserFilterSet ordering_fields = ("username",) def create(self, request, *args, **kwargs): if not request.user.is_superuser and request.data.get("is_superuser") is True: return HttpResponseForbidden( "Superuser status can only be granted by a superuser", ) return super().create(request, *args, **kwargs) def update(self, request, *args, **kwargs): user_to_update: User = self.get_object() if ( not request.user.is_superuser and request.data.get("is_superuser") is not None and request.data.get("is_superuser") != user_to_update.is_superuser ): return HttpResponseForbidden( "Superuser status can only be changed by a superuser", ) return super().update(request, *args, **kwargs) @action(detail=True, methods=["post"]) def deactivate_totp(self, request, pk=None): request_user = request.user user = User.objects.get(pk=pk) if not request_user.is_superuser and request_user != user: return HttpResponseForbidden( "You do not have permission to deactivate TOTP for this user", ) authenticator = Authenticator.objects.filter( user=user, type=Authenticator.Type.TOTP, ).first() if authenticator is not None: delete_and_cleanup(request, authenticator) return Response(data=True) else: return HttpResponseNotFound("TOTP not found") class GroupViewSet(ModelViewSet): model = Group queryset = Group.objects.order_by(Lower("name")) serializer_class = GroupSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) filter_backends = (DjangoFilterBackend, OrderingFilter) filterset_class = GroupFilterSet ordering_fields = ("name",) class ProfileView(GenericAPIView): """ User profile view, only available when logged in """ permission_classes = [IsAuthenticated] serializer_class = ProfileSerializer def get(self, request, *args, **kwargs): user = self.request.user serializer = self.get_serializer(data=request.data) return Response(serializer.to_representation(user)) def patch(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) user = self.request.user if hasattr(self.request, "user") else None if len(serializer.validated_data.get("password").replace("*", "")) > 0: user.set_password(serializer.validated_data.get("password")) user.save() serializer.validated_data.pop("password") for key, value in serializer.validated_data.items(): setattr(user, key, value) user.save() return Response(serializer.to_representation(user)) @extend_schema_view( get=extend_schema( responses={ (200, "application/json"): OpenApiTypes.OBJECT, }, ), post=extend_schema( request={ "application/json": { "type": "object", "properties": { "secret": {"type": "string"}, "code": {"type": "string"}, }, "required": ["secret", "code"], }, }, responses={ (200, "application/json"): OpenApiTypes.OBJECT, }, ), delete=extend_schema( responses={ (200, "application/json"): OpenApiTypes.BOOL, 404: OpenApiTypes.STR, }, ), ) class TOTPView(GenericAPIView): """ TOTP views """ permission_classes = [IsAuthenticated] def get(self, request, *args, **kwargs): """ Generates a new TOTP secret and returns the URL and SVG """ user = self.request.user mfa_adapter = get_mfa_adapter() secret = totp_auth.get_totp_secret(regenerate=True) url = mfa_adapter.build_totp_url(user, secret) svg = mfa_adapter.build_totp_svg(url) return Response( { "url": url, "qr_svg": svg, "secret": secret, }, ) def post(self, request, *args, **kwargs): """ Validates a TOTP code and activates the TOTP authenticator """ valid = totp_auth.validate_totp_code( request.data["secret"], request.data["code"], ) recovery_codes = None if valid: auth = totp_auth.TOTP.activate( request.user, request.data["secret"], ).instance signals.authenticator_added.send( sender=Authenticator, request=request, user=request.user, authenticator=auth, ) rc_auth: Authenticator = auto_generate_recovery_codes(request) if rc_auth: recovery_codes = rc_auth.wrap().get_unused_codes() return Response( { "success": valid, "recovery_codes": recovery_codes, }, ) def delete(self, request, *args, **kwargs): """ Deactivates the TOTP authenticator """ user = self.request.user authenticator = Authenticator.objects.filter( user=user, type=Authenticator.Type.TOTP, ).first() if authenticator is not None: delete_and_cleanup(request, authenticator) return Response(data=True) else: return HttpResponseNotFound("TOTP not found") @extend_schema_view( post=extend_schema( request={ "application/json": None, }, responses={ (200, "application/json"): OpenApiTypes.STR, }, ), ) class GenerateAuthTokenView(GenericAPIView): """ Generates (or re-generates) an auth token, requires a logged in user unlike the default DRF endpoint """ permission_classes = [IsAuthenticated] def post(self, request, *args, **kwargs): user = self.request.user existing_token = Token.objects.filter(user=user).first() if existing_token is not None: existing_token.delete() token = Token.objects.create(user=user) return Response( token.key, ) @extend_schema_view( list=extend_schema( description="Get the application configuration", external_docs={ "description": "Application Configuration", "url": "https://docs.paperless-ngx.com/configuration/", }, ), ) class ApplicationConfigurationViewSet(ModelViewSet): model = ApplicationConfiguration queryset = ApplicationConfiguration.objects serializer_class = ApplicationConfigurationSerializer permission_classes = (IsAuthenticated, DjangoModelPermissions) @extend_schema_view( post=extend_schema( request={ "application/json": { "type": "object", "properties": { "id": {"type": "integer"}, }, "required": ["id"], }, }, responses={ (200, "application/json"): OpenApiTypes.INT, 400: OpenApiTypes.STR, }, ), ) class DisconnectSocialAccountView(GenericAPIView): """ Disconnects a social account provider from the user account """ permission_classes = [IsAuthenticated] def post(self, request, *args, **kwargs): user = self.request.user try: account = user.socialaccount_set.get(pk=request.data["id"]) account_id = account.id account.delete() return Response(account_id) except SocialAccount.DoesNotExist: return HttpResponseBadRequest("Social account not found") @extend_schema_view( get=extend_schema( responses={ (200, "application/json"): OpenApiTypes.OBJECT, }, ), ) class SocialAccountProvidersView(GenericAPIView): """ List of social account providers """ permission_classes = [IsAuthenticated] def get(self, request, *args, **kwargs): adapter = get_adapter() providers = adapter.list_providers(request) resp = [ {"name": p.name, "login_url": p.get_login_url(request, process="connect")} for p in providers if p.id != "openid" ] for openid_provider in filter(lambda p: p.id == "openid", providers): resp += [ { "name": b["name"], "login_url": openid_provider.get_login_url( request, process="connect", openid=b["openid_url"], ), } for b in openid_provider.get_brands() ] return Response(sorted(resp, key=lambda p: p["name"])) class IndexView(TemplateView): template_name = "index.html" def get_frontend_language(self): if hasattr( self.request.user, "ui_settings", ) and self.request.user.ui_settings.settings.get("language"): lang = self.request.user.ui_settings.settings.get("language") else: lang = get_language() # This is here for the following reason: # Django identifies languages in the form "en-us" # However, angular generates locales as "en-US". # this translates between these two forms. if "-" in lang: first = lang[: lang.index("-")] second = lang[lang.index("-") + 1 :] return f"{first}-{second.upper()}" else: return lang def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["cookie_prefix"] = settings.COOKIE_PREFIX context["username"] = self.request.user.username context["full_name"] = self.request.user.get_full_name() context["styles_css"] = f"frontend/{self.get_frontend_language()}/styles.css" context["runtime_js"] = f"frontend/{self.get_frontend_language()}/runtime.js" context["polyfills_js"] = ( f"frontend/{self.get_frontend_language()}/polyfills.js" ) context["main_js"] = f"frontend/{self.get_frontend_language()}/main.js" context["webmanifest"] = ( f"frontend/{self.get_frontend_language()}/manifest.webmanifest" ) context["apple_touch_icon"] = ( f"frontend/{self.get_frontend_language()}/apple-touch-icon.png" ) return context class PassUserMixin(GenericAPIView): """ Pass a user object to serializer """ def get_serializer(self, *args, **kwargs): kwargs.setdefault("user", self.request.user) kwargs.setdefault( "full_perms", self.request.query_params.get("full_perms", False), ) return super().get_serializer(*args, **kwargs) class PermissionsAwareDocumentCountMixin(PassUserMixin): """ Mixin to add document count to queryset, permissions-aware if needed """ def get_queryset(self): filter = ( Q(documents__deleted_at__isnull=True) if self.request.user is None or self.request.user.is_superuser else ( Q( documents__deleted_at__isnull=True, documents__id__in=get_objects_for_user_owner_aware( self.request.user, "documents.view_document", Document, ).values_list("id", flat=True), ) ) ) return ( super() .get_queryset() .annotate(document_count=Count("documents", filter=filter)) ) @extend_schema_view(**generate_object_with_permissions_schema(CorrespondentSerializer)) class CorrespondentViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin): model = Correspondent queryset = Correspondent.objects.select_related("owner").order_by(Lower("name")) serializer_class = CorrespondentSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) filter_backends = ( DjangoFilterBackend, OrderingFilter, ObjectOwnedOrGrantedPermissionsFilter, ) filterset_class = CorrespondentFilterSet ordering_fields = ( "name", "matching_algorithm", "match", "document_count", "last_correspondence", ) def list(self, request, *args, **kwargs): if request.query_params.get("last_correspondence", None): self.queryset = self.queryset.annotate( last_correspondence=Max("documents__created"), ) return super().list(request, *args, **kwargs) def retrieve(self, request, *args, **kwargs): self.queryset = self.queryset.annotate( last_correspondence=Max("documents__created"), ) return super().retrieve(request, *args, **kwargs) @extend_schema_view(**generate_object_with_permissions_schema(TagSerializer)) class TagViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin): model = Tag queryset = Tag.objects.select_related("owner").order_by( Lower("name"), ) def get_serializer_class(self, *args, **kwargs): if int(self.request.version) == 1: return TagSerializerVersion1 else: return TagSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) filter_backends = ( DjangoFilterBackend, OrderingFilter, ObjectOwnedOrGrantedPermissionsFilter, ) filterset_class = TagFilterSet ordering_fields = ("color", "name", "matching_algorithm", "match", "document_count") @extend_schema_view(**generate_object_with_permissions_schema(DocumentTypeSerializer)) class DocumentTypeViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin): model = DocumentType queryset = DocumentType.objects.select_related("owner").order_by(Lower("name")) serializer_class = DocumentTypeSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) filter_backends = ( DjangoFilterBackend, OrderingFilter, ObjectOwnedOrGrantedPermissionsFilter, ) filterset_class = DocumentTypeFilterSet ordering_fields = ("name", "matching_algorithm", "match", "document_count") @extend_schema_view( retrieve=extend_schema( description="Retrieve a single document", responses={ 200: DocumentSerializer(all_fields=True), 400: None, }, parameters=[ OpenApiParameter( name="full_perms", type=OpenApiTypes.BOOL, location=OpenApiParameter.QUERY, ), OpenApiParameter( name="fields", type=OpenApiTypes.STR, many=True, location=OpenApiParameter.QUERY, ), ], ), download=extend_schema( description="Download the document", parameters=[ OpenApiParameter( name="original", type=OpenApiTypes.BOOL, location=OpenApiParameter.QUERY, ), ], responses={200: OpenApiTypes.BINARY}, ), history=extend_schema( description="View the document history", responses={ 200: inline_serializer( name="LogEntry", many=True, fields={ "id": serializers.IntegerField(), "timestamp": serializers.DateTimeField(), "action": serializers.CharField(), "changes": serializers.DictField(), "actor": inline_serializer( name="Actor", fields={ "id": serializers.IntegerField(), "username": serializers.CharField(), }, ), }, ), 400: None, 403: None, 404: None, }, ), metadata=extend_schema( description="View the document metadata", responses={ 200: inline_serializer( name="Metadata", fields={ "original_checksum": serializers.CharField(), "original_size": serializers.IntegerField(), "original_mime_type": serializers.CharField(), "media_filename": serializers.CharField(), "has_archive_version": serializers.BooleanField(), "original_metadata": serializers.DictField(), "archive_checksum": serializers.CharField(), "archive_media_filename": serializers.CharField(), "original_filename": serializers.CharField(), "archive_size": serializers.IntegerField(), "archive_metadata": serializers.DictField(), "lang": serializers.CharField(), }, ), 400: None, 403: None, 404: None, }, ), notes=extend_schema( description="View, add, or delete notes for the document", responses={ 200: { "type": "array", "items": { "type": "object", "properties": { "id": {"type": "integer"}, "note": {"type": "string"}, "created": {"type": "string", "format": "date-time"}, "user": { "type": "object", "properties": { "id": {"type": "integer"}, "username": {"type": "string"}, "first_name": {"type": "string"}, "last_name": {"type": "string"}, }, }, }, }, }, 400: None, 403: None, 404: None, }, ), suggestions=extend_schema( description="View suggestions for the document", responses={ 200: inline_serializer( name="Suggestions", fields={ "correspondents": serializers.ListField( child=serializers.IntegerField(), ), "tags": serializers.ListField(child=serializers.IntegerField()), "document_types": serializers.ListField( child=serializers.IntegerField(), ), "storage_paths": serializers.ListField( child=serializers.IntegerField(), ), "dates": serializers.ListField(child=serializers.CharField()), }, ), 400: None, 403: None, 404: None, }, ), thumb=extend_schema( description="View the document thumbnail", responses={200: OpenApiTypes.BINARY}, ), preview=extend_schema( description="View the document preview", responses={200: OpenApiTypes.BINARY}, ), share_links=extend_schema( operation_id="document_share_links", description="View share links for the document", parameters=[ OpenApiParameter( name="id", type=OpenApiTypes.STR, location=OpenApiParameter.PATH, ), ], responses={ 200: { "type": "array", "items": { "type": "object", "properties": { "id": {"type": "integer"}, "created": {"type": "string", "format": "date-time"}, "expiration": {"type": "string", "format": "date-time"}, "slug": {"type": "string"}, }, }, }, 400: None, 403: None, 404: None, }, ), ) class DocumentViewSet( PassUserMixin, RetrieveModelMixin, UpdateModelMixin, DestroyModelMixin, ListModelMixin, GenericViewSet, ): model = Document queryset = Document.objects.annotate(num_notes=Count("notes")) serializer_class = DocumentSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) filter_backends = ( DjangoFilterBackend, SearchFilter, DocumentsOrderingFilter, ObjectOwnedOrGrantedPermissionsFilter, ) filterset_class = DocumentFilterSet search_fields = ("title", "correspondent__name", "content") ordering_fields = ( "id", "title", "correspondent__name", "document_type__name", "created", "modified", "added", "archive_serial_number", "num_notes", "owner", "page_count", "custom_field_", ) def get_queryset(self): return ( Document.objects.distinct() .order_by("-created") .annotate(num_notes=Count("notes")) .select_related("correspondent", "storage_path", "document_type", "owner") .prefetch_related("tags", "custom_fields", "notes") ) def get_serializer(self, *args, **kwargs): fields_param = self.request.query_params.get("fields", None) fields = fields_param.split(",") if fields_param else None truncate_content = self.request.query_params.get("truncate_content", "False") kwargs.setdefault("context", self.get_serializer_context()) kwargs.setdefault("fields", fields) kwargs.setdefault("truncate_content", truncate_content.lower() in ["true", "1"]) kwargs.setdefault( "full_perms", self.request.query_params.get("full_perms", False), ) return super().get_serializer(*args, **kwargs) def update(self, request, *args, **kwargs): response = super().update(request, *args, **kwargs) from paperless import index index.add_or_update_document(self.get_object()) document_updated.send( sender=self.__class__, document=self.get_object(), ) return response def destroy(self, request, *args, **kwargs): from paperless import index index.remove_document_from_index(self.get_object()) try: return super().destroy(request, *args, **kwargs) except Exception as e: if "Data too long for column" in str(e): logger.warning( "Detected a possible incompatible database column. See https://docs.paperless-ngx.com/troubleshooting/#convert-uuid-field", ) logger.error(f"Error deleting document: {e!s}") return HttpResponseBadRequest( "Error deleting document, check logs for more detail.", ) @staticmethod def original_requested(request): return ( "original" in request.query_params and request.query_params["original"] == "true" ) def file_response(self, pk, request, disposition): doc = Document.global_objects.select_related("owner").get(id=pk) if request.user is not None and not has_perms_owner_aware( request.user, "view_document", doc, ): return HttpResponseForbidden("Insufficient permissions") return serve_file( doc=doc, use_archive=not self.original_requested(request) and doc.has_archive_version, disposition=disposition, ) def get_metadata(self, file, mime_type): if not os.path.isfile(file): return None parser_class = get_parser_class_for_mime_type(mime_type) if parser_class: parser = parser_class(progress_callback=None, logging_group=None) try: return parser.extract_metadata(file, mime_type) except Exception: # pragma: no cover logger.exception(f"Issue getting metadata for {file}") # TODO: cover GPG errors, remove later. return [] else: # pragma: no cover logger.warning(f"No parser for {mime_type}") return [] def get_filesize(self, filename): if os.path.isfile(filename): return os.stat(filename).st_size else: return None @action(methods=["get"], detail=True, filter_backends=[]) @method_decorator(cache_control(no_cache=True)) @method_decorator( condition(etag_func=metadata_etag, last_modified_func=metadata_last_modified), ) def metadata(self, request, pk=None): try: doc = Document.objects.select_related("owner").get(pk=pk) if request.user is not None and not has_perms_owner_aware( request.user, "view_document", doc, ): return HttpResponseForbidden("Insufficient permissions") except Document.DoesNotExist: raise Http404 document_cached_metadata = get_metadata_cache(doc.pk) archive_metadata = None archive_filesize = None if document_cached_metadata is not None: original_metadata = document_cached_metadata.original_metadata archive_metadata = document_cached_metadata.archive_metadata refresh_metadata_cache(doc.pk) else: original_metadata = self.get_metadata(doc.source_path, doc.mime_type) if doc.has_archive_version: archive_filesize = self.get_filesize(doc.archive_path) archive_metadata = self.get_metadata( doc.archive_path, "application/pdf", ) set_metadata_cache(doc, original_metadata, archive_metadata) meta = { "original_checksum": doc.checksum, "original_size": self.get_filesize(doc.source_path), "original_mime_type": doc.mime_type, "media_filename": doc.filename, "has_archive_version": doc.has_archive_version, "original_metadata": original_metadata, "archive_checksum": doc.archive_checksum, "archive_media_filename": doc.archive_filename, "original_filename": doc.original_filename, "archive_size": archive_filesize, "archive_metadata": archive_metadata, } lang = "en" try: lang = detect(doc.content) except Exception: pass meta["lang"] = lang return Response(meta) @action(methods=["get"], detail=True, filter_backends=[]) @method_decorator(cache_control(no_cache=True)) @method_decorator( condition( etag_func=suggestions_etag, last_modified_func=suggestions_last_modified, ), ) def suggestions(self, request, pk=None): doc = get_object_or_404(Document.objects.select_related("owner"), pk=pk) if request.user is not None and not has_perms_owner_aware( request.user, "view_document", doc, ): return HttpResponseForbidden("Insufficient permissions") document_suggestions = get_suggestion_cache(doc.pk) if document_suggestions is not None: refresh_suggestions_cache(doc.pk) return Response(document_suggestions.suggestions) classifier = load_classifier() dates = [] if settings.NUMBER_OF_SUGGESTED_DATES > 0: gen = parse_date_generator(doc.filename, doc.content) dates = sorted( {i for i in itertools.islice(gen, settings.NUMBER_OF_SUGGESTED_DATES)}, ) resp_data = { "correspondents": [ c.id for c in match_correspondents(doc, classifier, request.user) ], "tags": [t.id for t in match_tags(doc, classifier, request.user)], "document_types": [ dt.id for dt in match_document_types(doc, classifier, request.user) ], "storage_paths": [ dt.id for dt in match_storage_paths(doc, classifier, request.user) ], "dates": [date.strftime("%Y-%m-%d") for date in dates if date is not None], } # Cache the suggestions and the classifier hash for later set_suggestions_cache(doc.pk, resp_data, classifier) return Response(resp_data) @action(methods=["get"], detail=True, filter_backends=[]) @method_decorator(cache_control(no_cache=True)) @method_decorator( condition(etag_func=preview_etag, last_modified_func=preview_last_modified), ) def preview(self, request, pk=None): try: response = self.file_response(pk, request, "inline") return response except (FileNotFoundError, Document.DoesNotExist): raise Http404 @action(methods=["get"], detail=True, filter_backends=[]) @method_decorator(cache_control(no_cache=True)) @method_decorator(last_modified(thumbnail_last_modified)) def thumb(self, request, pk=None): try: doc = Document.objects.select_related("owner").get(id=pk) if request.user is not None and not has_perms_owner_aware( request.user, "view_document", doc, ): return HttpResponseForbidden("Insufficient permissions") if doc.storage_type == Document.STORAGE_TYPE_GPG: handle = GnuPG.decrypted(doc.thumbnail_file) else: handle = doc.thumbnail_file return HttpResponse(handle, content_type="image/webp") except (FileNotFoundError, Document.DoesNotExist): raise Http404 @action(methods=["get"], detail=True) def download(self, request, pk=None): try: return self.file_response(pk, request, "attachment") except (FileNotFoundError, Document.DoesNotExist): raise Http404 @action( methods=["get", "post", "delete"], detail=True, permission_classes=[PaperlessNotePermissions], filter_backends=[], ) def notes(self, request, pk=None): currentUser = request.user try: doc = ( Document.objects.select_related("owner") .prefetch_related("notes") .only("pk", "owner__id") .get(pk=pk) ) if currentUser is not None and not has_perms_owner_aware( currentUser, "view_document", doc, ): return HttpResponseForbidden("Insufficient permissions to view notes") except Document.DoesNotExist: raise Http404 serializer = self.get_serializer(doc) if request.method == "GET": try: notes = serializer.to_representation(doc).get("notes") return Response(notes) except Exception as e: logger.warning(f"An error occurred retrieving notes: {e!s}") return Response( {"error": "Error retrieving notes, check logs for more detail."}, ) elif request.method == "POST": try: if currentUser is not None and not has_perms_owner_aware( currentUser, "change_document", doc, ): return HttpResponseForbidden( "Insufficient permissions to create notes", ) c = Note.objects.create( document=doc, note=request.data["note"], user=currentUser, ) # If audit log is enabled make an entry in the log # about this note change if settings.AUDIT_LOG_ENABLED: LogEntry.objects.log_create( instance=doc, changes={ "Note Added": ["None", c.id], }, action=LogEntry.Action.UPDATE, ) doc.modified = timezone.now() doc.save() from paperless import index index.add_or_update_document(doc) notes = serializer.to_representation(doc).get("notes") return Response(notes) except Exception as e: logger.warning(f"An error occurred saving note: {e!s}") return Response( { "error": "Error saving note, check logs for more detail.", }, ) elif request.method == "DELETE": if currentUser is not None and not has_perms_owner_aware( currentUser, "change_document", doc, ): return HttpResponseForbidden("Insufficient permissions to delete notes") note = Note.objects.get(id=int(request.GET.get("id"))) if settings.AUDIT_LOG_ENABLED: LogEntry.objects.log_create( instance=doc, changes={ "Note Deleted": [note.id, "None"], }, action=LogEntry.Action.UPDATE, ) note.delete() doc.modified = timezone.now() doc.save() from paperless import index index.add_or_update_document(doc) notes = serializer.to_representation(doc).get("notes") return Response(notes) return Response( { "error": "error", }, ) @action(methods=["get"], detail=True, filter_backends=[]) def share_links(self, request, pk=None): currentUser = request.user try: doc = Document.objects.select_related("owner").get(pk=pk) if currentUser is not None and not has_perms_owner_aware( currentUser, "change_document", doc, ): return HttpResponseForbidden( "Insufficient permissions to add share link", ) except Document.DoesNotExist: raise Http404 if request.method == "GET": now = timezone.now() links = ( ShareLink.objects.filter(document=doc) .only("pk", "created", "expiration", "slug") .exclude(expiration__lt=now) .order_by("-created") ) serializer = ShareLinkSerializer(links, many=True) return Response(serializer.data) @action(methods=["get"], detail=True, name="Audit Trail", filter_backends=[]) def history(self, request, pk=None): if not settings.AUDIT_LOG_ENABLED: return HttpResponseBadRequest("Audit log is disabled") try: doc = Document.objects.get(pk=pk) if not request.user.has_perm("auditlog.view_logentry") or ( doc.owner is not None and doc.owner != request.user and not request.user.is_superuser ): return HttpResponseForbidden( "Insufficient permissions", ) except Document.DoesNotExist: # pragma: no cover raise Http404 # documents entries = [ { "id": entry.id, "timestamp": entry.timestamp, "action": entry.get_action_display(), "changes": entry.changes, "actor": ( {"id": entry.actor.id, "username": entry.actor.username} if entry.actor else None ), } for entry in LogEntry.objects.get_for_object(doc).select_related( "actor", ) ] # custom fields for entry in LogEntry.objects.get_for_objects( doc.custom_fields.all(), ).select_related("actor"): entries.append( { "id": entry.id, "timestamp": entry.timestamp, "action": entry.get_action_display(), "changes": { "custom_fields": { "type": "custom_field", "field": str(entry.object_repr).split(":")[0].strip(), "value": str(entry.object_repr).split(":")[1].strip(), }, }, "actor": ( {"id": entry.actor.id, "username": entry.actor.username} if entry.actor else None ), }, ) return Response(sorted(entries, key=lambda x: x["timestamp"], reverse=True)) @action(methods=["post"], detail=True) def email(self, request, pk=None): try: doc = Document.objects.select_related("owner").get(pk=pk) if request.user is not None and not has_perms_owner_aware( request.user, "view_document", doc, ): return HttpResponseForbidden("Insufficient permissions") except Document.DoesNotExist: raise Http404 try: if ( "addresses" not in request.data or "subject" not in request.data or "message" not in request.data ): return HttpResponseBadRequest("Missing required fields") use_archive_version = request.data.get("use_archive_version", True) addresses = request.data.get("addresses").split(",") if not all( re.match(r"[^@]+@[^@]+\.[^@]+", address.strip()) for address in addresses ): return HttpResponseBadRequest("Invalid email address found") send_email( subject=request.data.get("subject"), body=request.data.get("message"), to=addresses, attachment=( doc.archive_path if use_archive_version and doc.has_archive_version else doc.source_path ), attachment_mime_type=doc.mime_type, ) logger.debug( f"Sent document {doc.id} via email to {addresses}", ) return Response({"message": "Email sent"}) except Exception as e: logger.warning(f"An error occurred emailing document: {e!s}") return HttpResponseServerError( "Error emailing document, check logs for more detail.", ) @extend_schema_view( list=extend_schema( parameters=[ OpenApiParameter( name="full_perms", type=OpenApiTypes.BOOL, location=OpenApiParameter.QUERY, ), OpenApiParameter( name="fields", type=OpenApiTypes.STR, many=True, location=OpenApiParameter.QUERY, ), ], responses={ 200: DocumentSerializer(many=True, all_fields=True), }, ), ) class UnifiedSearchViewSet(DocumentViewSet): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.searcher = None def get_serializer_class(self): if self._is_search_request(): return SearchResultSerializer else: return DocumentSerializer def _is_search_request(self): return ( "query" in self.request.query_params or "more_like_id" in self.request.query_params ) def filter_queryset(self, queryset): filtered_queryset = super().filter_queryset(queryset) if self._is_search_request(): from paperless import index if "query" in self.request.query_params: query_class = index.DelayedFullTextQuery elif "more_like_id" in self.request.query_params: query_class = index.DelayedMoreLikeThisQuery else: raise ValueError return query_class( self.searcher, self.request.query_params, self.paginator.get_page_size(self.request), filter_queryset=filtered_queryset, ) else: return filtered_queryset def list(self, request, *args, **kwargs): if self._is_search_request(): from paperless import index try: with index.open_index_searcher() as s: self.searcher = s return super().list(request) except NotFound: raise except Exception as e: logger.warning(f"An error occurred listing search results: {e!s}") return HttpResponseBadRequest( "Error listing search results, check logs for more detail.", ) else: return super().list(request) @action(detail=False, methods=["GET"], name="Get Next ASN") def next_asn(self, request, *args, **kwargs): max_asn = Document.objects.aggregate( Max("archive_serial_number", default=0), ).get( "archive_serial_number__max", ) return Response(max_asn + 1) @extend_schema_view( list=extend_schema( description="Logs view", responses={ (200, "application/json"): serializers.ListSerializer( child=serializers.CharField(), ), }, ), retrieve=extend_schema( description="Single log view", operation_id="retrieve_log", parameters=[ OpenApiParameter( name="id", type=OpenApiTypes.STR, location=OpenApiParameter.PATH, ), ], responses={ (200, "application/json"): serializers.ListSerializer( child=serializers.CharField(), ), (404, "application/json"): None, }, ), ) class LogViewSet(ViewSet): permission_classes = (IsAuthenticated, PaperlessAdminPermissions) log_files = ["paperless", "mail", "celery"] def get_log_filename(self, log): return os.path.join(settings.LOGGING_DIR, f"{log}.log") def retrieve(self, request, *args, **kwargs): log_file = kwargs.get("pk") if log_file not in self.log_files: raise Http404 filename = self.get_log_filename(log_file) if not os.path.isfile(filename): raise Http404 with open(filename) as f: lines = [line.rstrip() for line in f.readlines()] return Response(lines) def list(self, request, *args, **kwargs): exist = [ log for log in self.log_files if os.path.isfile(self.get_log_filename(log)) ] return Response(exist) class SavedViewViewSet(ModelViewSet, PassUserMixin): model = SavedView queryset = SavedView.objects.all() serializer_class = SavedViewSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) def get_queryset(self): user = self.request.user return ( SavedView.objects.filter(owner=user) .select_related("owner") .prefetch_related("filter_rules") ) def perform_create(self, serializer): serializer.save(owner=self.request.user) @extend_schema_view( post=extend_schema( operation_id="bulk_edit", description="Perform a bulk edit operation on a list of documents", external_docs={ "description": "Further documentation", "url": "https://docs.paperless-ngx.com/api/#bulk-editing", }, responses={ 200: inline_serializer( name="BulkEditDocumentsResult", fields={ "result": serializers.CharField(), }, ), }, ), ) class BulkEditView(PassUserMixin): MODIFIED_FIELD_BY_METHOD = { "set_correspondent": "correspondent", "set_document_type": "document_type", "set_storage_path": "storage_path", "add_tag": "tags", "remove_tag": "tags", "modify_tags": "tags", "modify_custom_fields": "custom_fields", "set_permissions": None, "delete": "deleted_at", "rotate": "checksum", "delete_pages": "checksum", "split": None, "merge": None, "reprocess": "checksum", } permission_classes = (IsAuthenticated,) serializer_class = BulkEditSerializer parser_classes = (parsers.JSONParser,) def post(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) user = self.request.user method = serializer.validated_data.get("method") parameters = serializer.validated_data.get("parameters") documents = serializer.validated_data.get("documents") if method in [ bulk_edit.split, bulk_edit.merge, ]: parameters["user"] = user if not user.is_superuser: document_objs = Document.objects.select_related("owner").filter( pk__in=documents, ) user_is_owner_of_all_documents = all( (doc.owner == user or doc.owner is None) for doc in document_objs ) # check global and object permissions for all documents has_perms = user.has_perm("documents.change_document") and all( has_perms_owner_aware(user, "change_document", doc) for doc in document_objs ) # check ownership for methods that change original document if ( has_perms and method in [ bulk_edit.set_permissions, bulk_edit.delete, bulk_edit.rotate, bulk_edit.delete_pages, ] ) or ( method in [bulk_edit.merge, bulk_edit.split] and parameters["delete_originals"] ): has_perms = user_is_owner_of_all_documents # check global add permissions for methods that create documents if ( has_perms and method in [bulk_edit.split, bulk_edit.merge] and not user.has_perm( "documents.add_document", ) ): has_perms = False # check global delete permissions for methods that delete documents if ( has_perms and ( method == bulk_edit.delete or ( method in [bulk_edit.merge, bulk_edit.split] and parameters["delete_originals"] ) ) and not user.has_perm("documents.delete_document") ): has_perms = False if not has_perms: return HttpResponseForbidden("Insufficient permissions") try: modified_field = self.MODIFIED_FIELD_BY_METHOD.get(method.__name__, None) if settings.AUDIT_LOG_ENABLED and modified_field: old_documents = { obj["pk"]: obj for obj in Document.objects.filter(pk__in=documents).values( "pk", "correspondent", "document_type", "storage_path", "tags", "custom_fields", "deleted_at", "checksum", ) } # TODO: parameter validation result = method(documents, **parameters) if settings.AUDIT_LOG_ENABLED and modified_field: new_documents = Document.objects.filter(pk__in=documents) for doc in new_documents: old_value = old_documents[doc.pk][modified_field] new_value = getattr(doc, modified_field) if isinstance(new_value, Model): # correspondent, document type, etc. new_value = new_value.pk elif isinstance(new_value, Manager): # tags, custom fields new_value = list(new_value.values_list("pk", flat=True)) LogEntry.objects.log_create( instance=doc, changes={ modified_field: [ old_value, new_value, ], }, action=LogEntry.Action.UPDATE, actor=user, additional_data={ "reason": f"Bulk edit: {method.__name__}", }, ) return Response({"result": result}) except Exception as e: logger.warning(f"An error occurred performing bulk edit: {e!s}") return HttpResponseBadRequest( "Error performing bulk edit, check logs for more detail.", ) @extend_schema_view( post=extend_schema( description="Upload a document via the API", external_docs={ "description": "Further documentation", "url": "https://docs.paperless-ngx.com/api/#file-uploads", }, responses={ (200, "application/json"): OpenApiTypes.STR, }, ), ) class PostDocumentView(GenericAPIView): permission_classes = (IsAuthenticated,) serializer_class = PostDocumentSerializer parser_classes = (parsers.MultiPartParser,) def post(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) doc_name, doc_data = serializer.validated_data.get("document") correspondent_id = serializer.validated_data.get("correspondent") document_type_id = serializer.validated_data.get("document_type") storage_path_id = serializer.validated_data.get("storage_path") tag_ids = serializer.validated_data.get("tags") title = serializer.validated_data.get("title") created = serializer.validated_data.get("created") archive_serial_number = serializer.validated_data.get("archive_serial_number") custom_field_ids = serializer.validated_data.get("custom_fields") from_webui = serializer.validated_data.get("from_webui") t = int(mktime(datetime.now().timetuple())) settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True) temp_file_path = Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR)) / Path( pathvalidate.sanitize_filename(doc_name), ) temp_file_path.write_bytes(doc_data) os.utime(temp_file_path, times=(t, t)) input_doc = ConsumableDocument( source=DocumentSource.WebUI if from_webui else DocumentSource.ApiUpload, original_file=temp_file_path, ) input_doc_overrides = DocumentMetadataOverrides( filename=doc_name, title=title, correspondent_id=correspondent_id, document_type_id=document_type_id, storage_path_id=storage_path_id, tag_ids=tag_ids, created=created, asn=archive_serial_number, owner_id=request.user.id, # TODO: set values custom_fields={cf_id: None for cf_id in custom_field_ids} if custom_field_ids else None, ) async_task = consume_file.delay( input_doc, input_doc_overrides, ) return Response(async_task.id) @extend_schema_view( post=extend_schema( description="Get selection data for the selected documents", responses={ (200, "application/json"): inline_serializer( name="SelectionData", fields={ "selected_correspondents": serializers.ListSerializer( child=inline_serializer( name="CorrespondentCounts", fields={ "id": serializers.IntegerField(), "document_count": serializers.IntegerField(), }, ), ), "selected_tags": serializers.ListSerializer( child=inline_serializer( name="TagCounts", fields={ "id": serializers.IntegerField(), "document_count": serializers.IntegerField(), }, ), ), "selected_document_types": serializers.ListSerializer( child=inline_serializer( name="DocumentTypeCounts", fields={ "id": serializers.IntegerField(), "document_count": serializers.IntegerField(), }, ), ), "selected_storage_paths": serializers.ListSerializer( child=inline_serializer( name="StoragePathCounts", fields={ "id": serializers.IntegerField(), "document_count": serializers.IntegerField(), }, ), ), "selected_custom_fields": serializers.ListSerializer( child=inline_serializer( name="CustomFieldCounts", fields={ "id": serializers.IntegerField(), "document_count": serializers.IntegerField(), }, ), ), }, ), }, ), ) class SelectionDataView(GenericAPIView): permission_classes = (IsAuthenticated,) serializer_class = DocumentListSerializer parser_classes = (parsers.MultiPartParser, parsers.JSONParser) def post(self, request, format=None): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) ids = serializer.validated_data.get("documents") correspondents = Correspondent.objects.annotate( document_count=Count( Case(When(documents__id__in=ids, then=1), output_field=IntegerField()), ), ) tags = Tag.objects.annotate( document_count=Count( Case(When(documents__id__in=ids, then=1), output_field=IntegerField()), ), ) types = DocumentType.objects.annotate( document_count=Count( Case(When(documents__id__in=ids, then=1), output_field=IntegerField()), ), ) storage_paths = StoragePath.objects.annotate( document_count=Count( Case(When(documents__id__in=ids, then=1), output_field=IntegerField()), ), ) custom_fields = CustomField.objects.annotate( document_count=Count( Case( When( fields__document__id__in=ids, then=1, ), output_field=IntegerField(), ), ), ) r = Response( { "selected_correspondents": [ {"id": t.id, "document_count": t.document_count} for t in correspondents ], "selected_tags": [ {"id": t.id, "document_count": t.document_count} for t in tags ], "selected_document_types": [ {"id": t.id, "document_count": t.document_count} for t in types ], "selected_storage_paths": [ {"id": t.id, "document_count": t.document_count} for t in storage_paths ], "selected_custom_fields": [ {"id": t.id, "document_count": t.document_count} for t in custom_fields ], }, ) return r @extend_schema_view( get=extend_schema( description="Get a list of all available tags", parameters=[ OpenApiParameter( name="term", required=False, type=str, description="Term to search for", ), OpenApiParameter( name="limit", required=False, type=int, description="Number of completions to return", ), ], responses={ (200, "application/json"): serializers.ListSerializer( child=serializers.CharField(), ), }, ), ) class SearchAutoCompleteView(GenericAPIView): permission_classes = (IsAuthenticated,) def get(self, request, format=None): user = self.request.user if hasattr(self.request, "user") else None if "term" in request.query_params: term = request.query_params["term"] else: return HttpResponseBadRequest("Term required") if "limit" in request.query_params: limit = int(request.query_params["limit"]) if limit <= 0: return HttpResponseBadRequest("Invalid limit") else: limit = 10 from paperless import index ix = index.open_index() return Response( index.autocomplete( ix, term, limit, user, ), ) @extend_schema_view( get=extend_schema( description="Global search", parameters=[ OpenApiParameter( name="query", required=True, type=str, description="Query to search for", ), OpenApiParameter( name="db_only", required=False, type=bool, description="Search only the database", ), ], responses={ (200, "application/json"): inline_serializer( name="SearchResult", fields={ "total": serializers.IntegerField(), "documents": DocumentSerializer(many=True), "saved_views": SavedViewSerializer(many=True), "tags": TagSerializer(many=True), "correspondents": CorrespondentSerializer(many=True), "document_types": DocumentTypeSerializer(many=True), "storage_paths": StoragePathSerializer(many=True), "users": UserSerializer(many=True), "groups": GroupSerializer(many=True), "mail_rules": MailRuleSerializer(many=True), "mail_accounts": MailAccountSerializer(many=True), "workflows": WorkflowSerializer(many=True), "custom_fields": CustomFieldSerializer(many=True), }, ), }, ), ) class GlobalSearchView(PassUserMixin): permission_classes = (IsAuthenticated,) serializer_class = SearchResultSerializer def get(self, request, *args, **kwargs): query = request.query_params.get("query", None) if query is None: return HttpResponseBadRequest("Query required") elif len(query) < 3: return HttpResponseBadRequest("Query must be at least 3 characters") db_only = request.query_params.get("db_only", False) OBJECT_LIMIT = 3 docs = [] if request.user.has_perm("documents.view_document"): all_docs = get_objects_for_user_owner_aware( request.user, "view_document", Document, ) # First search by title docs = all_docs.filter(title__icontains=query) if not db_only and len(docs) < OBJECT_LIMIT: # If we don't have enough results, search by content from paperless import index with index.open_index_searcher() as s: fts_query = index.DelayedFullTextQuery( s, request.query_params, OBJECT_LIMIT, filter_queryset=all_docs, ) results = fts_query[0:1] docs = docs | Document.objects.filter( id__in=[r["id"] for r in results], ) docs = docs[:OBJECT_LIMIT] saved_views = ( SavedView.objects.filter(owner=request.user, name__icontains=query) if request.user.has_perm("documents.view_savedview") else [] ) saved_views = saved_views[:OBJECT_LIMIT] tags = ( get_objects_for_user_owner_aware(request.user, "view_tag", Tag).filter( name__icontains=query, ) if request.user.has_perm("documents.view_tag") else [] ) tags = tags[:OBJECT_LIMIT] correspondents = ( get_objects_for_user_owner_aware( request.user, "view_correspondent", Correspondent, ).filter(name__icontains=query) if request.user.has_perm("documents.view_correspondent") else [] ) correspondents = correspondents[:OBJECT_LIMIT] document_types = ( get_objects_for_user_owner_aware( request.user, "view_documenttype", DocumentType, ).filter(name__icontains=query) if request.user.has_perm("documents.view_documenttype") else [] ) document_types = document_types[:OBJECT_LIMIT] storage_paths = ( get_objects_for_user_owner_aware( request.user, "view_storagepath", StoragePath, ).filter(name__icontains=query) if request.user.has_perm("documents.view_storagepath") else [] ) storage_paths = storage_paths[:OBJECT_LIMIT] users = ( User.objects.filter(username__icontains=query) if request.user.has_perm("auth.view_user") else [] ) users = users[:OBJECT_LIMIT] groups = ( Group.objects.filter(name__icontains=query) if request.user.has_perm("auth.view_group") else [] ) groups = groups[:OBJECT_LIMIT] mail_rules = ( MailRule.objects.filter(name__icontains=query) if request.user.has_perm("paperless_mail.view_mailrule") else [] ) mail_rules = mail_rules[:OBJECT_LIMIT] mail_accounts = ( MailAccount.objects.filter(name__icontains=query) if request.user.has_perm("paperless_mail.view_mailaccount") else [] ) mail_accounts = mail_accounts[:OBJECT_LIMIT] workflows = ( Workflow.objects.filter(name__icontains=query) if request.user.has_perm("documents.view_workflow") else [] ) workflows = workflows[:OBJECT_LIMIT] custom_fields = ( CustomField.objects.filter(name__icontains=query) if request.user.has_perm("documents.view_customfield") else [] ) custom_fields = custom_fields[:OBJECT_LIMIT] context = { "request": request, } docs_serializer = DocumentSerializer(docs, many=True, context=context) saved_views_serializer = SavedViewSerializer( saved_views, many=True, context=context, ) tags_serializer = TagSerializer(tags, many=True, context=context) correspondents_serializer = CorrespondentSerializer( correspondents, many=True, context=context, ) document_types_serializer = DocumentTypeSerializer( document_types, many=True, context=context, ) storage_paths_serializer = StoragePathSerializer( storage_paths, many=True, context=context, ) users_serializer = UserSerializer(users, many=True, context=context) groups_serializer = GroupSerializer(groups, many=True, context=context) mail_rules_serializer = MailRuleSerializer( mail_rules, many=True, context=context, ) mail_accounts_serializer = MailAccountSerializer( mail_accounts, many=True, context=context, ) workflows_serializer = WorkflowSerializer(workflows, many=True, context=context) custom_fields_serializer = CustomFieldSerializer( custom_fields, many=True, context=context, ) return Response( { "total": len(docs) + len(saved_views) + len(tags) + len(correspondents) + len(document_types) + len(storage_paths) + len(users) + len(groups) + len(mail_rules) + len(mail_accounts) + len(workflows) + len(custom_fields), "documents": docs_serializer.data, "saved_views": saved_views_serializer.data, "tags": tags_serializer.data, "correspondents": correspondents_serializer.data, "document_types": document_types_serializer.data, "storage_paths": storage_paths_serializer.data, "users": users_serializer.data, "groups": groups_serializer.data, "mail_rules": mail_rules_serializer.data, "mail_accounts": mail_accounts_serializer.data, "workflows": workflows_serializer.data, "custom_fields": custom_fields_serializer.data, }, ) @extend_schema_view( get=extend_schema( description="Get statistics for the current user", responses={ (200, "application/json"): OpenApiTypes.OBJECT, }, ), ) class StatisticsView(GenericAPIView): permission_classes = (IsAuthenticated,) def get(self, request, format=None): user = request.user if request.user is not None else None documents = ( ( Document.objects.all() if user is None else get_objects_for_user_owner_aware( user, "documents.view_document", Document, ) ) .only("mime_type", "content") .prefetch_related("tags") ) tags = ( Tag.objects.all() if user is None else get_objects_for_user_owner_aware(user, "documents.view_tag", Tag) ) correspondent_count = ( Correspondent.objects.count() if user is None else get_objects_for_user_owner_aware( user, "documents.view_correspondent", Correspondent, ).count() ) document_type_count = ( DocumentType.objects.count() if user is None else get_objects_for_user_owner_aware( user, "documents.view_documenttype", DocumentType, ).count() ) storage_path_count = ( StoragePath.objects.count() if user is None else get_objects_for_user_owner_aware( user, "documents.view_storagepath", StoragePath, ).count() ) documents_total = documents.count() inbox_tags = tags.filter(is_inbox_tag=True) documents_inbox = ( documents.filter(tags__id__in=inbox_tags).distinct().count() if inbox_tags.exists() else None ) document_file_type_counts = ( documents.values("mime_type") .annotate(mime_type_count=Count("mime_type")) .order_by("-mime_type_count") if documents_total > 0 else [] ) character_count = ( documents.annotate( characters=Length("content"), ) .aggregate(Sum("characters")) .get("characters__sum") ) current_asn = Document.objects.aggregate( Max("archive_serial_number", default=0), ).get( "archive_serial_number__max", ) return Response( { "documents_total": documents_total, "documents_inbox": documents_inbox, "inbox_tag": ( inbox_tags.first().pk if inbox_tags.exists() else None ), # backwards compatibility "inbox_tags": ( [tag.pk for tag in inbox_tags] if inbox_tags.exists() else None ), "document_file_type_counts": document_file_type_counts, "character_count": character_count, "tag_count": len(tags), "correspondent_count": correspondent_count, "document_type_count": document_type_count, "storage_path_count": storage_path_count, "current_asn": current_asn, }, ) class BulkDownloadView(GenericAPIView): permission_classes = (IsAuthenticated,) serializer_class = BulkDownloadSerializer parser_classes = (parsers.JSONParser,) def post(self, request, format=None): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) ids = serializer.validated_data.get("documents") documents = Document.objects.filter(pk__in=ids) compression = serializer.validated_data.get("compression") content = serializer.validated_data.get("content") follow_filename_format = serializer.validated_data.get("follow_formatting") for document in documents: if not has_perms_owner_aware(request.user, "view_document", document): return HttpResponseForbidden("Insufficient permissions") settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True) temp = tempfile.NamedTemporaryFile( # noqa: SIM115 dir=settings.SCRATCH_DIR, suffix="-compressed-archive", delete=False, ) if content == "both": strategy_class = OriginalAndArchiveStrategy elif content == "originals": strategy_class = OriginalsOnlyStrategy else: strategy_class = ArchiveOnlyStrategy with zipfile.ZipFile(temp.name, "w", compression) as zipf: strategy = strategy_class(zipf, follow_formatting=follow_filename_format) for document in documents: strategy.add_document(document) # TODO(stumpylog): Investigate using FileResponse here with open(temp.name, "rb") as f: response = HttpResponse(f, content_type="application/zip") response["Content-Disposition"] = '{}; filename="{}"'.format( "attachment", "documents.zip", ) return response @extend_schema_view(**generate_object_with_permissions_schema(StoragePathSerializer)) class StoragePathViewSet(ModelViewSet, PermissionsAwareDocumentCountMixin): model = StoragePath queryset = StoragePath.objects.select_related("owner").order_by( Lower("name"), ) serializer_class = StoragePathSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) filter_backends = ( DjangoFilterBackend, OrderingFilter, ObjectOwnedOrGrantedPermissionsFilter, ) filterset_class = StoragePathFilterSet ordering_fields = ("name", "path", "matching_algorithm", "match", "document_count") def get_permissions(self): if self.action == "test": # Test action does not require object level permissions self.permission_classes = (IsAuthenticated,) return super().get_permissions() def destroy(self, request, *args, **kwargs): """ When a storage path is deleted, see if documents using it require a rename/move """ instance = self.get_object() doc_ids = [doc.id for doc in instance.documents.all()] # perform the deletion so renaming/moving can happen response = super().destroy(request, *args, **kwargs) if len(doc_ids): bulk_edit.bulk_update_documents.delay(doc_ids) return response @action(methods=["post"], detail=False) def test(self, request): """ Test storage path against a document """ serializer = StoragePathTestSerializer(data=request.data) serializer.is_valid(raise_exception=True) document = serializer.validated_data.get("document") path = serializer.validated_data.get("path") result = validate_filepath_template_and_render(path, document) return Response(result) class UiSettingsView(GenericAPIView): queryset = UiSettings.objects.all() permission_classes = (IsAuthenticated, PaperlessObjectPermissions) serializer_class = UiSettingsViewSerializer def get(self, request, format=None): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) user = User.objects.select_related("ui_settings").get(pk=request.user.id) ui_settings = {} if hasattr(user, "ui_settings"): ui_settings = user.ui_settings.settings if "update_checking" in ui_settings: ui_settings["update_checking"]["backend_setting"] = ( settings.ENABLE_UPDATE_CHECK ) else: ui_settings["update_checking"] = { "backend_setting": settings.ENABLE_UPDATE_CHECK, } ui_settings["trash_delay"] = settings.EMPTY_TRASH_DELAY general_config = GeneralConfig() ui_settings["app_title"] = settings.APP_TITLE if general_config.app_title is not None and len(general_config.app_title) > 0: ui_settings["app_title"] = general_config.app_title ui_settings["app_logo"] = settings.APP_LOGO if general_config.app_logo is not None and len(general_config.app_logo) > 0: ui_settings["app_logo"] = general_config.app_logo ui_settings["auditlog_enabled"] = settings.AUDIT_LOG_ENABLED if settings.GMAIL_OAUTH_ENABLED or settings.OUTLOOK_OAUTH_ENABLED: manager = PaperlessMailOAuth2Manager() if settings.GMAIL_OAUTH_ENABLED: ui_settings["gmail_oauth_url"] = manager.get_gmail_authorization_url() request.session["oauth_state"] = manager.state if settings.OUTLOOK_OAUTH_ENABLED: ui_settings["outlook_oauth_url"] = ( manager.get_outlook_authorization_url() ) request.session["oauth_state"] = manager.state ui_settings["email_enabled"] = settings.EMAIL_ENABLED user_resp = { "id": user.id, "username": user.username, "is_staff": user.is_staff, "is_superuser": user.is_superuser, "groups": list(user.groups.values_list("id", flat=True)), } if len(user.first_name) > 0: user_resp["first_name"] = user.first_name if len(user.last_name) > 0: user_resp["last_name"] = user.last_name # strip . roles = map(lambda perm: re.sub(r"^\w+.", "", perm), user.get_all_permissions()) return Response( { "user": user_resp, "settings": ui_settings, "permissions": roles, }, ) def post(self, request, format=None): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) serializer.save(user=self.request.user) return Response( { "success": True, }, ) @extend_schema_view( get=extend_schema( description="Get the current version of the Paperless-NGX server", responses={ (200, "application/json"): OpenApiTypes.OBJECT, }, ), ) class RemoteVersionView(GenericAPIView): def get(self, request, format=None): remote_version = "0.0.0" is_greater_than_current = False current_version = packaging_version.parse(version.__full_version_str__) try: resp = httpx.get( "https://api.github.com/repos/paperless-ngx/paperless-ngx/releases/latest", headers={"Accept": "application/json"}, ) resp.raise_for_status() try: data = resp.json() remote_version = data["tag_name"] # Some early tags used ngx-x.y.z remote_version = remote_version.removeprefix("ngx-") except ValueError as e: logger.debug(f"An error occurred parsing remote version json: {e}") except httpx.HTTPError as e: logger.debug(f"An error occurred checking for available updates: {e}") is_greater_than_current = ( packaging_version.parse( remote_version, ) > current_version ) return Response( { "version": remote_version, "update_available": is_greater_than_current, }, ) @extend_schema_view( acknowledge=extend_schema( operation_id="acknowledge_tasks", description="Acknowledge a list of tasks", request={ "application/json": { "type": "object", "properties": { "tasks": { "type": "array", "items": {"type": "integer"}, }, }, "required": ["tasks"], }, }, responses={ (200, "application/json"): inline_serializer( name="AcknowledgeTasks", fields={ "result": serializers.IntegerField(), }, ), (400, "application/json"): None, }, ), ) class TasksViewSet(ReadOnlyModelViewSet): permission_classes = (IsAuthenticated, PaperlessObjectPermissions) serializer_class = TasksViewSerializer filter_backends = ( DjangoFilterBackend, OrderingFilter, ObjectOwnedOrGrantedPermissionsFilter, ) filterset_class = PaperlessTaskFilterSet TASK_AND_ARGS_BY_NAME = { PaperlessTask.TaskName.INDEX_OPTIMIZE: (index_optimize, {}), PaperlessTask.TaskName.TRAIN_CLASSIFIER: ( train_classifier, {"scheduled": False}, ), PaperlessTask.TaskName.CHECK_SANITY: ( sanity_check, {"scheduled": False, "raise_on_error": False}, ), } def get_queryset(self): queryset = PaperlessTask.objects.all().order_by("-date_created") task_id = self.request.query_params.get("task_id") if task_id is not None: queryset = PaperlessTask.objects.filter(task_id=task_id) return queryset @action(methods=["post"], detail=False) def acknowledge(self, request): serializer = AcknowledgeTasksViewSerializer(data=request.data) serializer.is_valid(raise_exception=True) task_ids = serializer.validated_data.get("tasks") try: tasks = PaperlessTask.objects.filter(id__in=task_ids) if request.user is not None and not request.user.is_superuser: tasks = tasks.filter(owner=request.user) | tasks.filter(owner=None) result = tasks.update( acknowledged=True, ) return Response({"result": result}) except Exception: return HttpResponseBadRequest() @action(methods=["post"], detail=False) def run(self, request): serializer = RunTaskViewSerializer(data=request.data) serializer.is_valid(raise_exception=True) task_name = serializer.validated_data.get("task_name") if not request.user.is_superuser: return HttpResponseForbidden("Insufficient permissions") try: task_func, task_args = self.TASK_AND_ARGS_BY_NAME[task_name] result = task_func(**task_args) return Response({"result": result}) except Exception as e: logger.warning(f"An error occurred running task: {e!s}") return HttpResponseServerError( "Error running task, check logs for more detail.", ) class ShareLinkViewSet(ModelViewSet, PassUserMixin): model = ShareLink queryset = ShareLink.objects.all() serializer_class = ShareLinkSerializer pagination_class = StandardPagination permission_classes = (IsAuthenticated, PaperlessObjectPermissions) filter_backends = ( DjangoFilterBackend, OrderingFilter, ObjectOwnedOrGrantedPermissionsFilter, ) filterset_class = ShareLinkFilterSet ordering_fields = ("created", "expiration", "document") class SharedLinkView(View): authentication_classes = [] permission_classes = [] def get(self, request, slug): share_link = ShareLink.objects.filter(slug=slug).first() if share_link is None: return HttpResponseRedirect("/accounts/login/?sharelink_notfound=1") if share_link.expiration is not None and share_link.expiration < timezone.now(): return HttpResponseRedirect("/accounts/login/?sharelink_expired=1") return serve_file( doc=share_link.document, use_archive=share_link.file_version == "archive", disposition="inline", ) def serve_file(*, doc: Document, use_archive: bool, disposition: str): if use_archive: file_handle = doc.archive_file filename = doc.get_public_filename(archive=True) mime_type = "application/pdf" else: file_handle = doc.source_file filename = doc.get_public_filename() mime_type = doc.mime_type # Support browser previewing csv files by using text mime type if mime_type in {"application/csv", "text/csv"} and disposition == "inline": mime_type = "text/plain" if doc.storage_type == Document.STORAGE_TYPE_GPG: file_handle = GnuPG.decrypted(file_handle) response = HttpResponse(file_handle, content_type=mime_type) # Firefox is not able to handle unicode characters in filename field # RFC 5987 addresses this issue # see https://datatracker.ietf.org/doc/html/rfc5987#section-4.2 # Chromium cannot handle commas in the filename filename_normalized = ( normalize("NFKD", filename.replace(",", "_")) .encode( "ascii", "ignore", ) .decode("ascii") ) filename_encoded = quote(filename) content_disposition = ( f"{disposition}; " f'filename="{filename_normalized}"; ' f"filename*=utf-8''{filename_encoded}" ) response["Content-Disposition"] = content_disposition return response @extend_schema_view( post=extend_schema( operation_id="bulk_edit_objects", description="Perform a bulk edit operation on a list of objects", external_docs={ "description": "Further documentation", "url": "https://docs.paperless-ngx.com/api/#objects", }, responses={ 200: inline_serializer( name="BulkEditResult", fields={ "result": serializers.CharField(), }, ), }, ), ) class BulkEditObjectsView(PassUserMixin): permission_classes = (IsAuthenticated,) serializer_class = BulkEditObjectsSerializer parser_classes = (parsers.JSONParser,) def post(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) user = self.request.user object_type = serializer.validated_data.get("object_type") object_ids = serializer.validated_data.get("objects") object_class = serializer.get_object_class(object_type) operation = serializer.validated_data.get("operation") objs = object_class.objects.select_related("owner").filter(pk__in=object_ids) if not user.is_superuser: model_name = object_class._meta.verbose_name perm = ( f"documents.change_{model_name}" if operation == "set_permissions" else f"documents.delete_{model_name}" ) has_perms = user.has_perm(perm) and all( (obj.owner == user or obj.owner is None) for obj in objs ) if not has_perms: return HttpResponseForbidden("Insufficient permissions") if operation == "set_permissions": permissions = serializer.validated_data.get("permissions") owner = serializer.validated_data.get("owner") merge = serializer.validated_data.get("merge") try: qs = object_class.objects.filter(id__in=object_ids) # if merge is true, we dont want to remove the owner if "owner" in serializer.validated_data and ( not merge or (merge and owner is not None) ): # if merge is true, we dont want to overwrite the owner qs_owner_update = qs.filter(owner__isnull=True) if merge else qs qs_owner_update.update(owner=owner) if "permissions" in serializer.validated_data: for obj in qs: set_permissions_for_object( permissions=permissions, object=obj, merge=merge, ) except Exception as e: logger.warning( f"An error occurred performing bulk permissions edit: {e!s}", ) return HttpResponseBadRequest( "Error performing bulk permissions edit, check logs for more detail.", ) elif operation == "delete": objs.delete() return Response({"result": "OK"}) class WorkflowTriggerViewSet(ModelViewSet): permission_classes = (IsAuthenticated, PaperlessObjectPermissions) serializer_class = WorkflowTriggerSerializer pagination_class = StandardPagination model = WorkflowTrigger queryset = WorkflowTrigger.objects.all() class WorkflowActionViewSet(ModelViewSet): permission_classes = (IsAuthenticated, PaperlessObjectPermissions) serializer_class = WorkflowActionSerializer pagination_class = StandardPagination model = WorkflowAction queryset = WorkflowAction.objects.all().prefetch_related( "assign_tags", "assign_view_users", "assign_view_groups", "assign_change_users", "assign_change_groups", "assign_custom_fields", ) class WorkflowViewSet(ModelViewSet): permission_classes = (IsAuthenticated, PaperlessObjectPermissions) serializer_class = WorkflowSerializer pagination_class = StandardPagination model = Workflow queryset = ( Workflow.objects.all() .order_by("order") .prefetch_related( "triggers", "actions", ) ) class CustomFieldViewSet(ModelViewSet): permission_classes = (IsAuthenticated, PaperlessObjectPermissions) serializer_class = CustomFieldSerializer pagination_class = StandardPagination filter_backends = ( DjangoFilterBackend, OrderingFilter, ) filterset_class = CustomFieldFilterSet model = CustomField queryset = CustomField.objects.all().order_by("-created") def get_queryset(self): filter = ( Q(fields__document__deleted_at__isnull=True) if self.request.user is None or self.request.user.is_superuser else ( Q( fields__document__deleted_at__isnull=True, fields__document__id__in=get_objects_for_user_owner_aware( self.request.user, "documents.view_document", Document, ).values_list("id", flat=True), ) ) ) return ( super() .get_queryset() .annotate( document_count=Count( "fields", filter=filter, ), ) ) @extend_schema_view( get=extend_schema( description="Get the current system status of the Paperless-NGX server", responses={ (200, "application/json"): inline_serializer( name="SystemStatus", fields={ "pngx_version": serializers.CharField(), "server_os": serializers.CharField(), "install_type": serializers.CharField(), "storage": inline_serializer( name="Storage", fields={ "total": serializers.IntegerField(), "available": serializers.IntegerField(), }, ), "database": inline_serializer( name="Database", fields={ "type": serializers.CharField(), "url": serializers.CharField(), "status": serializers.CharField(), "error": serializers.CharField(), "migration_status": inline_serializer( name="MigrationStatus", fields={ "latest_migration": serializers.CharField(), "unapplied_migrations": serializers.ListSerializer( child=serializers.CharField(), ), }, ), }, ), "tasks": inline_serializer( name="Tasks", fields={ "redis_url": serializers.CharField(), "redis_status": serializers.CharField(), "redis_error": serializers.CharField(), "celery_status": serializers.CharField(), }, ), "index": inline_serializer( name="Index", fields={ "status": serializers.CharField(), "error": serializers.CharField(), "last_modified": serializers.DateTimeField(), }, ), "classifier": inline_serializer( name="Classifier", fields={ "status": serializers.CharField(), "error": serializers.CharField(), "last_trained": serializers.DateTimeField(), }, ), "sanity_check": inline_serializer( name="SanityCheck", fields={ "status": serializers.CharField(), "error": serializers.CharField(), "last_run": serializers.DateTimeField(), }, ), }, ), }, ), ) class SystemStatusView(PassUserMixin): permission_classes = (IsAuthenticated,) def get(self, request, format=None): if not request.user.is_staff: return HttpResponseForbidden("Insufficient permissions") current_version = version.__full_version_str__ install_type = "bare-metal" if os.environ.get("KUBERNETES_SERVICE_HOST") is not None: install_type = "kubernetes" elif os.environ.get("PNGX_CONTAINERIZED") == "1": install_type = "docker" db_conn = connections["default"] db_url = db_conn.settings_dict["NAME"] db_error = None try: db_conn.ensure_connection() db_status = "OK" loader = MigrationLoader(connection=db_conn) all_migrations = [f"{app}.{name}" for app, name in loader.graph.nodes] applied_migrations = [ f"{m.app}.{m.name}" for m in MigrationRecorder.Migration.objects.all().order_by("id") ] except Exception as e: # pragma: no cover applied_migrations = [] db_status = "ERROR" logger.exception( f"System status detected a possible problem while connecting to the database: {e}", ) db_error = "Error connecting to database, check logs for more detail." media_stats = os.statvfs(settings.MEDIA_ROOT) redis_url = settings._CHANNELS_REDIS_URL redis_url_parsed = urlparse(redis_url) redis_constructed_url = f"{redis_url_parsed.scheme}://{redis_url_parsed.path or redis_url_parsed.hostname}" if redis_url_parsed.hostname is not None: redis_constructed_url += f":{redis_url_parsed.port}" redis_error = None with Redis.from_url(url=redis_url) as client: try: client.ping() redis_status = "OK" except Exception as e: redis_status = "ERROR" logger.exception( f"System status detected a possible problem while connecting to redis: {e}", ) redis_error = "Error connecting to redis, check logs for more detail." celery_error = None celery_url = None try: celery_ping = celery_app.control.inspect().ping() celery_url = next(iter(celery_ping.keys())) first_worker_ping = celery_ping[celery_url] if first_worker_ping["ok"] == "pong": celery_active = "OK" except Exception as e: celery_active = "ERROR" logger.exception( f"System status detected a possible problem while connecting to celery: {e}", ) celery_error = "Error connecting to celery, check logs for more detail." index_error = None try: ix = index.open_index() index_status = "OK" index_last_modified = make_aware( datetime.fromtimestamp(ix.last_modified()), ) except Exception as e: index_status = "ERROR" index_error = "Error opening index, check logs for more detail." logger.exception( f"System status detected a possible problem while opening the index: {e}", ) index_last_modified = None last_trained_task = ( PaperlessTask.objects.filter( task_name=PaperlessTask.TaskName.TRAIN_CLASSIFIER, ) .order_by("-date_done") .first() ) classifier_status = "OK" classifier_error = None if last_trained_task is None: classifier_status = "WARNING" classifier_error = "No classifier training tasks found" elif last_trained_task and last_trained_task.status == states.FAILURE: classifier_status = "ERROR" classifier_error = last_trained_task.result classifier_last_trained = ( last_trained_task.date_done if last_trained_task else None ) last_sanity_check = ( PaperlessTask.objects.filter( task_name=PaperlessTask.TaskName.CHECK_SANITY, ) .order_by("-date_done") .first() ) sanity_check_status = "OK" sanity_check_error = None if last_sanity_check is None: sanity_check_status = "WARNING" sanity_check_error = "No sanity check tasks found" elif last_sanity_check and last_sanity_check.status == states.FAILURE: sanity_check_status = "ERROR" sanity_check_error = last_sanity_check.result sanity_check_last_run = ( last_sanity_check.date_done if last_sanity_check else None ) return Response( { "pngx_version": current_version, "server_os": platform.platform(), "install_type": install_type, "storage": { "total": media_stats.f_frsize * media_stats.f_blocks, "available": media_stats.f_frsize * media_stats.f_bavail, }, "database": { "type": db_conn.vendor, "url": db_url, "status": db_status, "error": db_error, "migration_status": { "latest_migration": applied_migrations[-1], "unapplied_migrations": [ m for m in all_migrations if m not in applied_migrations ], }, }, "tasks": { "redis_url": redis_constructed_url, "redis_status": redis_status, "redis_error": redis_error, "celery_status": celery_active, "celery_url": celery_url, "celery_error": celery_error, "index_status": index_status, "index_last_modified": index_last_modified, "index_error": index_error, "classifier_status": classifier_status, "classifier_last_trained": classifier_last_trained, "classifier_error": classifier_error, "sanity_check_status": sanity_check_status, "sanity_check_last_run": sanity_check_last_run, "sanity_check_error": sanity_check_error, }, }, ) class TrashView(ListModelMixin, PassUserMixin): permission_classes = (IsAuthenticated,) serializer_class = TrashSerializer filter_backends = (ObjectOwnedPermissionsFilter,) pagination_class = StandardPagination model = Document queryset = Document.deleted_objects.all() def get(self, request, format=None): self.serializer_class = DocumentSerializer return self.list(request, format) def post(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) doc_ids = serializer.validated_data.get("documents") docs = ( Document.global_objects.filter(id__in=doc_ids) if doc_ids is not None else self.filter_queryset(self.get_queryset()).all() ) for doc in docs: if not has_perms_owner_aware(request.user, "delete_document", doc): return HttpResponseForbidden("Insufficient permissions") action = serializer.validated_data.get("action") if action == "restore": for doc in Document.deleted_objects.filter(id__in=doc_ids).all(): doc.restore(strict=False) elif action == "empty": if doc_ids is None: doc_ids = [doc.id for doc in docs] empty_trash(doc_ids=doc_ids) return Response({"result": "OK", "doc_ids": doc_ids})