DRY, nice

This commit is contained in:
shamoon
2026-02-12 18:55:47 -08:00
parent da865b85fa
commit f5ee86e778
5 changed files with 162 additions and 110 deletions

View File

@@ -13,59 +13,7 @@ from documents.caching import CLASSIFIER_VERSION_KEY
from documents.caching import get_thumbnail_modified_key
from documents.classifier import DocumentClassifier
from documents.models import Document
def _resolve_effective_doc(pk: int, request) -> Document | None:
    """
    Resolve which Document row should be considered for caching keys.

    - If a ``version`` query parameter is supplied, use that version, provided
      it is the root document itself or one of the root's versions.
    - If pk is a root doc, use its newest child version if present, else the root.
    - Else, pk is a version, use that version.

    Returns None if resolution fails (treat as no-cache): unknown pk, missing
    root, non-integer ``version``, unknown version id, or a version that does
    not belong to the same root. Unexpected errors are not swallowed.
    """
    request_doc = (
        Document.objects.only("id", "root_document_id").filter(pk=pk).first()
    )
    if request_doc is None:
        return None

    if request_doc.root_document_id is None:
        root_doc = request_doc
    else:
        # A missing root is a resolution failure: return None as documented
        # instead of letting Document.DoesNotExist escape to the caller.
        root_doc = (
            Document.objects.only("id")
            .filter(id=request_doc.root_document_id)
            .first()
        )
        if root_doc is None:
            return None

    version_param = (
        request.query_params.get("version")
        if hasattr(request, "query_params")
        else None
    )
    if version_param:
        # Only the parse can legitimately fail here; keep the try body minimal
        # so unrelated errors are not hidden behind a blanket handler.
        try:
            version_id = int(version_param)
        except (TypeError, ValueError):
            return None
        candidate = (
            Document.objects.only("id", "root_document_id")
            .filter(id=version_id)
            .first()
        )
        if candidate is None:
            return None
        # The requested version must be the root itself or one of its versions.
        if (
            candidate.id != root_doc.id
            and candidate.root_document_id != root_doc.id
        ):
            return None
        return candidate

    # Default behavior: if pk is a root doc, prefer its newest child version
    if request_doc.root_document_id is None:
        latest = (
            Document.objects.filter(root_document=root_doc)
            .only("id")
            .order_by("id")
            .last()
        )
        return latest or root_doc

    # pk is already a version
    return request_doc
from documents.versioning import resolve_effective_document_by_pk
def suggestions_etag(request, pk: int) -> str | None:
@@ -125,7 +73,7 @@ def metadata_etag(request, pk: int) -> str | None:
Metadata is extracted from the original file, so use its checksum as the
ETag
"""
doc = _resolve_effective_doc(pk, request)
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
return None
return doc.checksum
@@ -137,7 +85,7 @@ def metadata_last_modified(request, pk: int) -> datetime | None:
not the modification of the original file, but of the database object, but might as well
err on the side of caution
"""
doc = _resolve_effective_doc(pk, request)
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
return None
return doc.modified
@@ -147,7 +95,7 @@ def preview_etag(request, pk: int) -> str | None:
"""
ETag for the document preview, using the original or archive checksum, depending on the request
"""
doc = _resolve_effective_doc(pk, request)
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
return None
use_original = (
@@ -163,7 +111,7 @@ def preview_last_modified(request, pk: int) -> datetime | None:
Uses the document's modified time to set the Last-Modified header. Not strictly
speaking correct, but close enough and quick
"""
doc = _resolve_effective_doc(pk, request)
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
return None
return doc.modified
@@ -175,7 +123,7 @@ def thumbnail_last_modified(request: Any, pk: int) -> datetime | None:
Cache should be (slightly?) faster than filesystem
"""
try:
doc = _resolve_effective_doc(pk, request)
doc = resolve_effective_document_by_pk(pk, request).document
if doc is None:
return None
if not doc.thumbnail_path.exists():
@@ -195,5 +143,5 @@ def thumbnail_last_modified(request: Any, pk: int) -> datetime | None:
)
cache.set(doc_key, last_modified, CACHE_50_MINUTES)
return last_modified
except Document.DoesNotExist: # pragma: no cover
except (Document.DoesNotExist, OSError): # pragma: no cover
return None

View File

@@ -50,6 +50,7 @@ from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import Note
from documents.models import User
from documents.versioning import get_latest_version_for_root
if TYPE_CHECKING:
from django.db.models import QuerySet
@@ -185,16 +186,11 @@ def update_document(writer: AsyncWriter, doc: Document) -> None:
only_with_perms_in=["view_document"],
)
viewer_ids: str = ",".join([str(u.id) for u in users_with_perms])
effective_content = doc.content
if doc.root_document_id is None:
latest_version = (
Document.objects.filter(root_document=doc)
.only("content")
.order_by("-id")
.first()
)
if latest_version is not None:
effective_content = latest_version.content
effective_content = (
get_latest_version_for_root(doc).content
if doc.root_document_id is None
else doc.content
)
writer.update_document(
id=doc.pk,
title=doc.title,

View File

@@ -3,12 +3,12 @@ from unittest import mock
from django.test import TestCase
from documents.conditionals import _resolve_effective_doc
from documents.conditionals import metadata_etag
from documents.conditionals import preview_etag
from documents.conditionals import thumbnail_last_modified
from documents.models import Document
from documents.tests.utils import DirectoriesMixin
from documents.versioning import resolve_effective_document_by_pk
class TestConditionals(DirectoriesMixin, TestCase):
@@ -56,8 +56,12 @@ class TestConditionals(DirectoriesMixin, TestCase):
query_params={"version": str(other_version.id)},
)
self.assertIsNone(_resolve_effective_doc(root.id, invalid_request))
self.assertIsNone(_resolve_effective_doc(root.id, unrelated_request))
self.assertIsNone(
resolve_effective_document_by_pk(root.id, invalid_request).document,
)
self.assertIsNone(
resolve_effective_document_by_pk(root.id, unrelated_request).document,
)
def test_thumbnail_last_modified_uses_effective_document_for_cache_key(
self,

120
src/documents/versioning.py Normal file
View File

@@ -0,0 +1,120 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Any
from documents.models import Document
class VersionResolutionError(str, Enum):
    """Reasons a requested document version could not be resolved.

    Members are also ``str`` instances, so each compares equal to its value.
    """

    # The ``version`` request parameter could not be parsed as an integer.
    INVALID = "invalid"
    # No document matched, or the match is not the root or one of its versions.
    NOT_FOUND = "not_found"
@dataclass(frozen=True)
class VersionResolution:
    """Immutable outcome of working out which document version to use.

    A successful resolution carries the document and leaves ``error`` unset.
    """

    # The resolved document, or None when resolution failed.
    document: Document | None
    # Why resolution failed; stays None unless a caller supplies a reason.
    error: VersionResolutionError | None = None
def _document_manager(*, include_deleted: bool) -> Any:
    """Pick the Document manager to query.

    Returns ``Document.global_objects`` when ``include_deleted`` is true and
    ``Document.objects`` otherwise.
    """
    # NOTE(review): presumably ``global_objects`` also exposes soft-deleted
    # rows - confirm against the Document model.
    if include_deleted:
        return Document.global_objects
    return Document.objects
def get_request_version_param(request: Any) -> str | None:
    """Return the raw ``version`` query parameter of ``request``.

    Returns None when the request object has no ``query_params`` attribute or
    when the parameter is absent. The value is returned unparsed, so an empty
    string is passed through as-is.
    """
    if not hasattr(request, "query_params"):
        return None
    return request.query_params.get("version")
def get_root_document(doc: Document, *, include_deleted: bool = False) -> Document:
    """Return the root document that ``doc`` belongs to.

    A document without ``root_document_id`` is its own root. Otherwise the
    related ``root_document`` is used; if that relation yields nothing, the
    root is looked up by id, and ``doc`` itself is the final fallback.

    ``include_deleted`` only affects the manager used for the by-id lookup.
    """
    # Checking root_document_id first avoids a query when this is already a root.
    if doc.root_document_id is None:
        return doc

    related_root = doc.root_document
    if related_root is not None:
        return related_root

    looked_up = (
        _document_manager(include_deleted=include_deleted)
        .only("id")
        .filter(id=doc.root_document_id)
        .first()
    )
    # If the root can't be found, fall back to the document itself.
    return looked_up or doc
def get_latest_version_for_root(
    root_doc: Document,
    *,
    include_deleted: bool = False,
) -> Document:
    """Return the highest-id version of ``root_doc``, or ``root_doc`` itself.

    Versions are the documents whose ``root_document`` is ``root_doc``. When
    there are none, the root document is returned unchanged.
    """
    versions = _document_manager(include_deleted=include_deleted).filter(
        root_document=root_doc,
    )
    newest = versions.order_by("-id").first()
    return newest or root_doc
def resolve_requested_version_for_root(
    root_doc: Document,
    request: Any,
    *,
    include_deleted: bool = False,
) -> VersionResolution:
    """Resolve the version named by the request's ``version`` parameter.

    - Missing or empty parameter: the latest version of ``root_doc``.
    - Parameter that is not an integer: no document, ``INVALID``.
    - Unknown id, or a document that is neither ``root_doc`` nor one of its
      versions: no document, ``NOT_FOUND``.
    - Otherwise: the matching document, loaded with only ``id`` and
      ``root_document_id`` (other fields are deferred).
    """
    raw_version = get_request_version_param(request)
    if not raw_version:
        latest = get_latest_version_for_root(
            root_doc,
            include_deleted=include_deleted,
        )
        return VersionResolution(document=latest)

    try:
        requested_id = int(raw_version)
    except (TypeError, ValueError):
        return VersionResolution(document=None, error=VersionResolutionError.INVALID)

    documents = _document_manager(include_deleted=include_deleted)
    candidate_doc = (
        documents.only("id", "root_document_id").filter(id=requested_id).first()
    )
    # Accept only the root itself or a document that points at this root.
    belongs_to_root = candidate_doc is not None and (
        candidate_doc.id == root_doc.id
        or candidate_doc.root_document_id == root_doc.id
    )
    if not belongs_to_root:
        return VersionResolution(document=None, error=VersionResolutionError.NOT_FOUND)
    return VersionResolution(document=candidate_doc)
def resolve_effective_document(
    request_doc: Document,
    request: Any,
    *,
    include_deleted: bool = False,
) -> VersionResolution:
    """Resolve which document a request for ``request_doc`` should act on.

    - With a non-empty ``version`` query parameter, that version is resolved
      against the root of ``request_doc`` (and may fail with an error).
    - Without one, a root document resolves to its latest version (or itself
      when it has none) and a version document resolves to itself.

    ``include_deleted`` is forwarded to every lookup.
    """
    root_doc = get_root_document(request_doc, include_deleted=include_deleted)

    # Truthiness rather than ``is not None``: an empty ``?version=`` must be
    # treated as "no version requested". Otherwise a request for a specific
    # version pk would be routed to the requested-version path, which maps an
    # empty parameter to the root's latest version instead of ``request_doc``.
    if get_request_version_param(request):
        return resolve_requested_version_for_root(
            root_doc,
            request,
            include_deleted=include_deleted,
        )

    if request_doc.root_document_id is None:
        # pk is a root document: prefer its newest version.
        return VersionResolution(
            document=get_latest_version_for_root(
                root_doc,
                include_deleted=include_deleted,
            ),
        )

    # pk is already a version.
    return VersionResolution(document=request_doc)
def resolve_effective_document_by_pk(
    pk: int,
    request: Any,
    *,
    include_deleted: bool = False,
) -> VersionResolution:
    """Load the document with primary key ``pk`` and resolve its effective version.

    Returns a ``NOT_FOUND`` resolution with no document when ``pk`` matches
    nothing; otherwise returns whatever ``resolve_effective_document`` yields
    for the loaded document.
    """
    documents = _document_manager(include_deleted=include_deleted)
    # Only the ids are loaded up front; other fields are deferred.
    request_doc = documents.only("id", "root_document_id").filter(pk=pk).first()
    if request_doc is not None:
        return resolve_effective_document(
            request_doc,
            request,
            include_deleted=include_deleted,
        )
    return VersionResolution(document=None, error=VersionResolutionError.NOT_FOUND)

View File

@@ -206,6 +206,11 @@ from documents.tasks import sanity_check
from documents.tasks import train_classifier
from documents.tasks import update_document_parent_tags
from documents.utils import get_boolean
from documents.versioning import VersionResolutionError
from documents.versioning import get_latest_version_for_root
from documents.versioning import get_request_version_param
from documents.versioning import get_root_document
from documents.versioning import resolve_requested_version_for_root
from paperless import version
from paperless.celery import app as celery_app
from paperless.config import AIConfig
@@ -834,14 +839,6 @@ class DocumentViewSet(
)
return super().get_serializer(*args, **kwargs)
@staticmethod
def _get_root_doc(doc: Document) -> Document:
    """Return the root of ``doc``: its ``root_document``, or ``doc`` itself."""
    # Checking root_document_id first avoids a query when this is already a root.
    if doc.root_document_id is None:
        return doc
    root = doc.root_document
    # If root_document isn't available, fall back to the document itself.
    return root if root else doc
@extend_schema(
operation_id="documents_root",
responses=inline_serializer(
@@ -861,7 +858,7 @@ class DocumentViewSet(
except Document.DoesNotExist:
raise Http404
root_doc = self._get_root_doc(doc)
root_doc = get_root_document(doc)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
@@ -896,7 +893,7 @@ class DocumentViewSet(
content_doc = (
self._resolve_file_doc(root_doc, request)
if "version" in request.query_params
else self._get_latest_doc_for_root(root_doc)
else get_latest_version_for_root(root_doc)
)
content_updated = "content" in request.data
updated_content = request.data.get("content") if content_updated else None
@@ -967,31 +964,18 @@ class DocumentViewSet(
)
def _resolve_file_doc(self, root_doc: Document, request):
version_param = request.query_params.get("version")
if version_param:
try:
version_id = int(version_param)
except (TypeError, ValueError):
if get_request_version_param(request):
resolution = resolve_requested_version_for_root(
root_doc,
request,
include_deleted=True,
)
if resolution.error == VersionResolutionError.INVALID:
raise NotFound("Invalid version parameter")
try:
candidate = Document.global_objects.select_related("owner").get(
id=version_id,
)
except Document.DoesNotExist:
if resolution.document is None:
raise Http404
if (
candidate.id != root_doc.id
and candidate.root_document_id != root_doc.id
):
raise Http404
return candidate
latest = Document.objects.filter(root_document=root_doc).order_by("id").last()
return latest or root_doc
@staticmethod
def _get_latest_doc_for_root(root_doc: Document) -> Document:
    """Return the highest-id version of ``root_doc``, or ``root_doc`` if it has none."""
    versions = Document.objects.filter(root_document=root_doc)
    newest = versions.order_by("-id").first()
    return newest or root_doc
return resolution.document
return get_latest_version_for_root(root_doc)
def _get_effective_file_doc(
self,
@@ -1015,7 +999,7 @@ class DocumentViewSet(
"owner",
"root_document",
).get(id=pk)
root_doc = self._get_root_doc(request_doc)
root_doc = get_root_document(request_doc)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
@@ -1065,7 +1049,7 @@ class DocumentViewSet(
"owner",
"root_document",
).get(pk=pk)
root_doc = self._get_root_doc(request_doc)
root_doc = get_root_document(request_doc)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
@@ -1251,7 +1235,7 @@ class DocumentViewSet(
"owner",
"root_document",
).get(id=pk)
root_doc = self._get_root_doc(request_doc)
root_doc = get_root_document(request_doc)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
@@ -1279,7 +1263,7 @@ class DocumentViewSet(
"owner",
"root_document",
).get(id=pk)
root_doc = self._get_root_doc(request_doc)
root_doc = get_root_document(request_doc)
if request.user is not None and not has_perms_owner_aware(
request.user,
"view_document",
@@ -1672,7 +1656,7 @@ class DocumentViewSet(
"owner",
"root_document",
).get(pk=pk)
root_doc = self._get_root_doc(root_doc)
root_doc = get_root_document(root_doc)
except Document.DoesNotExist:
raise Http404