mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-02-14 00:09:35 -06:00
Bit more coverage
This commit is contained in:
@@ -1,17 +1,21 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
from unittest import TestCase
|
||||
from unittest import mock
|
||||
|
||||
from auditlog.models import LogEntry # type: ignore[import-untyped]
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.exceptions import FieldError
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from documents.data_models import DocumentSource
|
||||
from documents.filters import EffectiveContentFilter
|
||||
from documents.filters import TitleContentFilter
|
||||
from documents.models import Document
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
|
||||
@@ -393,6 +397,28 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
self.assertTrue(metadata.called)
|
||||
|
||||
def test_metadata_version_param_errors(self) -> None:
|
||||
root = self._create_pdf(title="root", checksum="root")
|
||||
|
||||
resp = self.client.get(
|
||||
f"/api/documents/{root.id}/metadata/?version=not-a-number",
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
resp = self.client.get(f"/api/documents/{root.id}/metadata/?version=9999")
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
other_root = self._create_pdf(title="other", checksum="other")
|
||||
other_version = self._create_pdf(
|
||||
title="other-v1",
|
||||
checksum="other-v1",
|
||||
root_document=other_root,
|
||||
)
|
||||
resp = self.client.get(
|
||||
f"/api/documents/{root.id}/metadata/?version={other_version.id}",
|
||||
)
|
||||
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
def test_metadata_returns_403_when_user_lacks_permission(self) -> None:
|
||||
owner = User.objects.create_user(username="owner")
|
||||
other = User.objects.create_user(username="other")
|
||||
@@ -613,3 +639,39 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase):
|
||||
|
||||
self.assertEqual(resp.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(resp.data["content"], "v1-content")
|
||||
|
||||
|
||||
class TestVersionAwareFilters(TestCase):
|
||||
def test_title_content_filter_falls_back_to_content(self) -> None:
|
||||
queryset = mock.Mock()
|
||||
fallback_queryset = mock.Mock()
|
||||
queryset.filter.side_effect = [FieldError("missing field"), fallback_queryset]
|
||||
|
||||
result = TitleContentFilter().filter(queryset, " latest ")
|
||||
|
||||
self.assertIs(result, fallback_queryset)
|
||||
self.assertEqual(queryset.filter.call_count, 2)
|
||||
|
||||
def test_effective_content_filter_falls_back_to_content_lookup(self) -> None:
|
||||
queryset = mock.Mock()
|
||||
fallback_queryset = mock.Mock()
|
||||
queryset.filter.side_effect = [FieldError("missing field"), fallback_queryset]
|
||||
|
||||
result = EffectiveContentFilter(lookup_expr="icontains").filter(
|
||||
queryset,
|
||||
" latest ",
|
||||
)
|
||||
|
||||
self.assertIs(result, fallback_queryset)
|
||||
first_kwargs = queryset.filter.call_args_list[0].kwargs
|
||||
second_kwargs = queryset.filter.call_args_list[1].kwargs
|
||||
self.assertEqual(first_kwargs, {"effective_content__icontains": "latest"})
|
||||
self.assertEqual(second_kwargs, {"content__icontains": "latest"})
|
||||
|
||||
def test_effective_content_filter_returns_input_for_empty_values(self) -> None:
|
||||
queryset = mock.Mock()
|
||||
|
||||
result = EffectiveContentFilter(lookup_expr="icontains").filter(queryset, " ")
|
||||
|
||||
self.assertIs(result, queryset)
|
||||
queryset.filter.assert_not_called()
|
||||
|
||||
@@ -1242,6 +1242,38 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
self.assertIsNone(overrides.document_type_id)
|
||||
self.assertIsNone(overrides.tag_ids)
|
||||
|
||||
def test_document_filters_use_latest_version_content(self) -> None:
|
||||
root = Document.objects.create(
|
||||
title="versioned root",
|
||||
checksum="root",
|
||||
mime_type="application/pdf",
|
||||
content="root-content",
|
||||
)
|
||||
version = Document.objects.create(
|
||||
title="versioned root",
|
||||
checksum="v1",
|
||||
mime_type="application/pdf",
|
||||
root_document=root,
|
||||
content="latest-version-content",
|
||||
)
|
||||
|
||||
response = self.client.get(
|
||||
"/api/documents/?content__icontains=latest-version-content",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
results = response.data["results"]
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(results[0]["id"], root.id)
|
||||
self.assertEqual(results[0]["content"], version.content)
|
||||
|
||||
response = self.client.get(
|
||||
"/api/documents/?title_content=latest-version-content",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
results = response.data["results"]
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(results[0]["id"], root.id)
|
||||
|
||||
def test_create_wrong_endpoint(self) -> None:
|
||||
response = self.client.post(
|
||||
"/api/documents/",
|
||||
|
||||
@@ -381,6 +381,55 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
[self.doc3.id, self.doc4.id, self.doc5.id],
|
||||
)
|
||||
|
||||
def test_delete_root_document_deletes_all_versions(self) -> None:
|
||||
version = Document.objects.create(
|
||||
checksum="A-v1",
|
||||
title="A version",
|
||||
root_document=self.doc1,
|
||||
)
|
||||
|
||||
bulk_edit.delete([self.doc1.id])
|
||||
|
||||
self.assertFalse(Document.objects.filter(id=self.doc1.id).exists())
|
||||
self.assertFalse(Document.objects.filter(id=version.id).exists())
|
||||
|
||||
def test_delete_version_document_keeps_root(self) -> None:
|
||||
version = Document.objects.create(
|
||||
checksum="A-v1",
|
||||
title="A version",
|
||||
root_document=self.doc1,
|
||||
)
|
||||
|
||||
bulk_edit.delete([version.id])
|
||||
|
||||
self.assertTrue(Document.objects.filter(id=self.doc1.id).exists())
|
||||
self.assertFalse(Document.objects.filter(id=version.id).exists())
|
||||
|
||||
def test_get_root_and_current_doc_mapping(self) -> None:
|
||||
version1 = Document.objects.create(
|
||||
checksum="B-v1",
|
||||
title="B version 1",
|
||||
root_document=self.doc2,
|
||||
)
|
||||
version2 = Document.objects.create(
|
||||
checksum="B-v2",
|
||||
title="B version 2",
|
||||
root_document=self.doc2,
|
||||
)
|
||||
|
||||
root_ids_by_doc_id = bulk_edit._get_root_ids_by_doc_id(
|
||||
[self.doc2.id, version1.id, version2.id],
|
||||
)
|
||||
self.assertEqual(root_ids_by_doc_id[self.doc2.id], self.doc2.id)
|
||||
self.assertEqual(root_ids_by_doc_id[version1.id], self.doc2.id)
|
||||
self.assertEqual(root_ids_by_doc_id[version2.id], self.doc2.id)
|
||||
|
||||
root_docs, current_docs = bulk_edit._get_root_and_current_docs_by_root_id(
|
||||
{self.doc2.id},
|
||||
)
|
||||
self.assertEqual(root_docs[self.doc2.id].id, self.doc2.id)
|
||||
self.assertEqual(current_docs[self.doc2.id].id, version2.id)
|
||||
|
||||
@mock.patch("documents.tasks.bulk_update_documents.delay")
|
||||
def test_set_permissions(self, m) -> None:
|
||||
doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id]
|
||||
|
||||
@@ -98,11 +98,18 @@ class FaultyGenericExceptionParser(_BaseTestParser):
|
||||
raise Exception("Generic exception.")
|
||||
|
||||
|
||||
def fake_magic_from_file(file, *, mime=False):
|
||||
def fake_magic_from_file(file, *, mime=False): # NOSONAR
|
||||
if mime:
|
||||
filepath = Path(file)
|
||||
if filepath.name.startswith("invalid_pdf"):
|
||||
return "application/octet-stream"
|
||||
if filepath.suffix == "":
|
||||
try:
|
||||
with Path(filepath).open("rb") as handle:
|
||||
if handle.read(4) == b"%PDF":
|
||||
return "application/pdf"
|
||||
except OSError:
|
||||
pass
|
||||
if filepath.suffix == ".pdf":
|
||||
return "application/pdf"
|
||||
elif filepath.suffix == ".png":
|
||||
@@ -747,6 +754,65 @@ class TestConsumer(
|
||||
self.assertTrue(version.original_filename.endswith("_v0.pdf"))
|
||||
self.assertTrue(bool(version.content))
|
||||
|
||||
@override_settings(AUDIT_LOG_ENABLED=True)
|
||||
@mock.patch("documents.consumer.load_classifier")
|
||||
def test_consume_version_with_missing_actor_and_filename_without_suffix(
|
||||
self,
|
||||
m: mock.Mock,
|
||||
) -> None:
|
||||
m.return_value = MagicMock()
|
||||
|
||||
with self.get_consumer(self.get_test_file()) as consumer:
|
||||
consumer.run()
|
||||
|
||||
root_doc = Document.objects.first()
|
||||
self.assertIsNotNone(root_doc)
|
||||
assert root_doc is not None
|
||||
|
||||
version_file = self.get_test_file2()
|
||||
status = DummyProgressManager(version_file.name, None)
|
||||
overrides = DocumentMetadataOverrides(
|
||||
filename="version-upload",
|
||||
actor_id=999999,
|
||||
)
|
||||
doc = ConsumableDocument(
|
||||
DocumentSource.ApiUpload,
|
||||
original_file=version_file,
|
||||
root_document_id=root_doc.pk,
|
||||
)
|
||||
|
||||
preflight = ConsumerPreflightPlugin(
|
||||
doc,
|
||||
overrides,
|
||||
status, # type: ignore[arg-type]
|
||||
self.dirs.scratch_dir,
|
||||
"task-id",
|
||||
)
|
||||
preflight.setup()
|
||||
preflight.run()
|
||||
|
||||
consumer = ConsumerPlugin(
|
||||
doc,
|
||||
overrides,
|
||||
status, # type: ignore[arg-type]
|
||||
self.dirs.scratch_dir,
|
||||
"task-id",
|
||||
)
|
||||
consumer.setup()
|
||||
try:
|
||||
self.assertEqual(consumer.filename, "version-upload_v0")
|
||||
consumer.run()
|
||||
finally:
|
||||
consumer.cleanup()
|
||||
|
||||
version = (
|
||||
Document.objects.filter(root_document=root_doc).order_by("-id").first()
|
||||
)
|
||||
self.assertIsNotNone(version)
|
||||
assert version is not None
|
||||
self.assertEqual(version.original_filename, "version-upload_v0")
|
||||
self.assertTrue(bool(version.content))
|
||||
|
||||
@mock.patch("documents.consumer.load_classifier")
|
||||
def testClassifyDocument(self, m) -> None:
|
||||
correspondent = Correspondent.objects.create(
|
||||
@@ -1359,6 +1425,19 @@ class TestMetadataOverrides(TestCase):
|
||||
base.update(incoming)
|
||||
self.assertTrue(base.skip_asn_if_exists)
|
||||
|
||||
def test_update_actor_and_version_label(self) -> None:
|
||||
base = DocumentMetadataOverrides(
|
||||
actor_id=1,
|
||||
version_label="root",
|
||||
)
|
||||
incoming = DocumentMetadataOverrides(
|
||||
actor_id=2,
|
||||
version_label="v2",
|
||||
)
|
||||
base.update(incoming)
|
||||
self.assertEqual(base.actor_id, 2)
|
||||
self.assertEqual(base.version_label, "v2")
|
||||
|
||||
|
||||
class TestBarcodeApplyDetectedASN(TestCase):
|
||||
"""
|
||||
|
||||
87
src/documents/tests/test_version_conditionals.py
Normal file
87
src/documents/tests/test_version_conditionals.py
Normal file
@@ -0,0 +1,87 @@
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
from documents.conditionals import _resolve_effective_doc
|
||||
from documents.conditionals import metadata_etag
|
||||
from documents.conditionals import preview_etag
|
||||
from documents.conditionals import thumbnail_last_modified
|
||||
from documents.models import Document
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
|
||||
|
||||
class TestConditionals(DirectoriesMixin, TestCase):
|
||||
def test_metadata_etag_uses_latest_version_for_root_request(self) -> None:
|
||||
root = Document.objects.create(
|
||||
title="root",
|
||||
checksum="root-checksum",
|
||||
archive_checksum="root-archive",
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
latest = Document.objects.create(
|
||||
title="v1",
|
||||
checksum="version-checksum",
|
||||
archive_checksum="version-archive",
|
||||
mime_type="application/pdf",
|
||||
root_document=root,
|
||||
)
|
||||
request = SimpleNamespace(query_params={})
|
||||
|
||||
self.assertEqual(metadata_etag(request, root.id), latest.checksum)
|
||||
self.assertEqual(preview_etag(request, root.id), latest.archive_checksum)
|
||||
|
||||
def test_resolve_effective_doc_returns_none_for_invalid_or_unrelated_version(
|
||||
self,
|
||||
) -> None:
|
||||
root = Document.objects.create(
|
||||
title="root",
|
||||
checksum="root",
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
other_root = Document.objects.create(
|
||||
title="other",
|
||||
checksum="other",
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
other_version = Document.objects.create(
|
||||
title="other-v1",
|
||||
checksum="other-v1",
|
||||
mime_type="application/pdf",
|
||||
root_document=other_root,
|
||||
)
|
||||
|
||||
invalid_request = SimpleNamespace(query_params={"version": "not-a-number"})
|
||||
unrelated_request = SimpleNamespace(
|
||||
query_params={"version": str(other_version.id)},
|
||||
)
|
||||
|
||||
self.assertIsNone(_resolve_effective_doc(root.id, invalid_request))
|
||||
self.assertIsNone(_resolve_effective_doc(root.id, unrelated_request))
|
||||
|
||||
def test_thumbnail_last_modified_uses_effective_document_for_cache_key(
|
||||
self,
|
||||
) -> None:
|
||||
root = Document.objects.create(
|
||||
title="root",
|
||||
checksum="root",
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
latest = Document.objects.create(
|
||||
title="v2",
|
||||
checksum="v2",
|
||||
mime_type="application/pdf",
|
||||
root_document=root,
|
||||
)
|
||||
latest.thumbnail_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
latest.thumbnail_path.write_bytes(b"thumb")
|
||||
|
||||
request = SimpleNamespace(query_params={})
|
||||
with mock.patch(
|
||||
"documents.conditionals.get_thumbnail_modified_key",
|
||||
return_value="thumb-modified-key",
|
||||
) as get_thumb_key:
|
||||
result = thumbnail_last_modified(request, root.id)
|
||||
|
||||
self.assertIsNotNone(result)
|
||||
get_thumb_key.assert_called_once_with(latest.id)
|
||||
Reference in New Issue
Block a user