diff --git a/src/documents/tests/test_api_document_versions.py b/src/documents/tests/test_api_document_versions.py index 38d5f5810..34228b209 100644 --- a/src/documents/tests/test_api_document_versions.py +++ b/src/documents/tests/test_api_document_versions.py @@ -1,17 +1,21 @@ from __future__ import annotations from typing import TYPE_CHECKING +from unittest import TestCase from unittest import mock from auditlog.models import LogEntry # type: ignore[import-untyped] from django.contrib.auth.models import Permission from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import FieldError from django.core.files.uploadedfile import SimpleUploadedFile from rest_framework import status from rest_framework.test import APITestCase from documents.data_models import DocumentSource +from documents.filters import EffectiveContentFilter +from documents.filters import TitleContentFilter from documents.models import Document from documents.tests.utils import DirectoriesMixin @@ -393,6 +397,28 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase): self.assertEqual(resp.status_code, status.HTTP_200_OK) self.assertTrue(metadata.called) + def test_metadata_version_param_errors(self) -> None: + root = self._create_pdf(title="root", checksum="root") + + resp = self.client.get( + f"/api/documents/{root.id}/metadata/?version=not-a-number", + ) + self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) + + resp = self.client.get(f"/api/documents/{root.id}/metadata/?version=9999") + self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) + + other_root = self._create_pdf(title="other", checksum="other") + other_version = self._create_pdf( + title="other-v1", + checksum="other-v1", + root_document=other_root, + ) + resp = self.client.get( + f"/api/documents/{root.id}/metadata/?version={other_version.id}", + ) + self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) + def test_metadata_returns_403_when_user_lacks_permission(self) -> None: owner = User.objects.create_user(username="owner") other = User.objects.create_user(username="other") @@ -613,3 +639,39 @@ class TestDocumentVersioningApi(DirectoriesMixin, APITestCase): self.assertEqual(resp.status_code, status.HTTP_200_OK) self.assertEqual(resp.data["content"], "v1-content") + + +class TestVersionAwareFilters(TestCase): + def test_title_content_filter_falls_back_to_content(self) -> None: + queryset = mock.Mock() + fallback_queryset = mock.Mock() + queryset.filter.side_effect = [FieldError("missing field"), fallback_queryset] + + result = TitleContentFilter().filter(queryset, " latest ") + + self.assertIs(result, fallback_queryset) + self.assertEqual(queryset.filter.call_count, 2) + + def test_effective_content_filter_falls_back_to_content_lookup(self) -> None: + queryset = mock.Mock() + fallback_queryset = mock.Mock() + queryset.filter.side_effect = [FieldError("missing field"), fallback_queryset] + + result = EffectiveContentFilter(lookup_expr="icontains").filter( + queryset, + " latest ", + ) + + self.assertIs(result, fallback_queryset) + first_kwargs = queryset.filter.call_args_list[0].kwargs + second_kwargs = queryset.filter.call_args_list[1].kwargs + self.assertEqual(first_kwargs, {"effective_content__icontains": "latest"}) + self.assertEqual(second_kwargs, {"content__icontains": "latest"}) + + def test_effective_content_filter_returns_input_for_empty_values(self) -> None: + queryset = mock.Mock() + + result = EffectiveContentFilter(lookup_expr="icontains").filter(queryset, " ") + + self.assertIs(result, queryset) + queryset.filter.assert_not_called() diff --git a/src/documents/tests/test_api_documents.py b/src/documents/tests/test_api_documents.py index f4640ccd5..83a62f767 100644 --- a/src/documents/tests/test_api_documents.py +++ b/src/documents/tests/test_api_documents.py @@ -1242,6 +1242,38 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase): self.assertIsNone(overrides.document_type_id) self.assertIsNone(overrides.tag_ids) + def test_document_filters_use_latest_version_content(self) -> None: + root = Document.objects.create( + title="versioned root", + checksum="root", + mime_type="application/pdf", + content="root-content", + ) + version = Document.objects.create( + title="versioned root", + checksum="v1", + mime_type="application/pdf", + root_document=root, + content="latest-version-content", + ) + + response = self.client.get( + "/api/documents/?content__icontains=latest-version-content", + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + results = response.data["results"] + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["id"], root.id) + self.assertEqual(results[0]["content"], version.content) + + response = self.client.get( + "/api/documents/?title_content=latest-version-content", + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + results = response.data["results"] + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["id"], root.id) + def test_create_wrong_endpoint(self) -> None: response = self.client.post( "/api/documents/", diff --git a/src/documents/tests/test_bulk_edit.py b/src/documents/tests/test_bulk_edit.py index 4a509d0fe..40e247412 100644 --- a/src/documents/tests/test_bulk_edit.py +++ b/src/documents/tests/test_bulk_edit.py @@ -381,6 +381,55 @@ class TestBulkEdit(DirectoriesMixin, TestCase): [self.doc3.id, self.doc4.id, self.doc5.id], ) + def test_delete_root_document_deletes_all_versions(self) -> None: + version = Document.objects.create( + checksum="A-v1", + title="A version", + root_document=self.doc1, + ) + + bulk_edit.delete([self.doc1.id]) + + self.assertFalse(Document.objects.filter(id=self.doc1.id).exists()) + self.assertFalse(Document.objects.filter(id=version.id).exists()) + + def test_delete_version_document_keeps_root(self) -> None: + version = Document.objects.create( + checksum="A-v1", + title="A version", + root_document=self.doc1, + ) + + bulk_edit.delete([version.id]) + + self.assertTrue(Document.objects.filter(id=self.doc1.id).exists()) + self.assertFalse(Document.objects.filter(id=version.id).exists()) + + def test_get_root_and_current_doc_mapping(self) -> None: + version1 = Document.objects.create( + checksum="B-v1", + title="B version 1", + root_document=self.doc2, + ) + version2 = Document.objects.create( + checksum="B-v2", + title="B version 2", + root_document=self.doc2, + ) + + root_ids_by_doc_id = bulk_edit._get_root_ids_by_doc_id( + [self.doc2.id, version1.id, version2.id], + ) + self.assertEqual(root_ids_by_doc_id[self.doc2.id], self.doc2.id) + self.assertEqual(root_ids_by_doc_id[version1.id], self.doc2.id) + self.assertEqual(root_ids_by_doc_id[version2.id], self.doc2.id) + + root_docs, current_docs = bulk_edit._get_root_and_current_docs_by_root_id( + {self.doc2.id}, + ) + self.assertEqual(root_docs[self.doc2.id].id, self.doc2.id) + self.assertEqual(current_docs[self.doc2.id].id, version2.id) + @mock.patch("documents.tasks.bulk_update_documents.delay") def test_set_permissions(self, m) -> None: doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id] diff --git a/src/documents/tests/test_consumer.py b/src/documents/tests/test_consumer.py index 554bf18a5..01cf93c62 100644 --- a/src/documents/tests/test_consumer.py +++ b/src/documents/tests/test_consumer.py @@ -98,11 +98,18 @@ class FaultyGenericExceptionParser(_BaseTestParser): raise Exception("Generic exception.") -def fake_magic_from_file(file, *, mime=False): +def fake_magic_from_file(file, *, mime=False): # NOSONAR if mime: filepath = Path(file) if filepath.name.startswith("invalid_pdf"): return "application/octet-stream" + if filepath.suffix == "": + try: + with Path(filepath).open("rb") as handle: + if handle.read(4) == b"%PDF": + return "application/pdf" + except OSError: + pass if filepath.suffix == ".pdf": return "application/pdf" elif filepath.suffix == ".png": @@ -747,6 +754,65 @@ class TestConsumer( self.assertTrue(version.original_filename.endswith("_v0.pdf")) self.assertTrue(bool(version.content)) + @override_settings(AUDIT_LOG_ENABLED=True) + @mock.patch("documents.consumer.load_classifier") + def test_consume_version_with_missing_actor_and_filename_without_suffix( + self, + m: mock.Mock, + ) -> None: + m.return_value = MagicMock() + + with self.get_consumer(self.get_test_file()) as consumer: + consumer.run() + + root_doc = Document.objects.first() + self.assertIsNotNone(root_doc) + assert root_doc is not None + + version_file = self.get_test_file2() + status = DummyProgressManager(version_file.name, None) + overrides = DocumentMetadataOverrides( + filename="version-upload", + actor_id=999999, + ) + doc = ConsumableDocument( + DocumentSource.ApiUpload, + original_file=version_file, + root_document_id=root_doc.pk, + ) + + preflight = ConsumerPreflightPlugin( + doc, + overrides, + status, # type: ignore[arg-type] + self.dirs.scratch_dir, + "task-id", + ) + preflight.setup() + preflight.run() + + consumer = ConsumerPlugin( + doc, + overrides, + status, # type: ignore[arg-type] + self.dirs.scratch_dir, + "task-id", + ) + consumer.setup() + try: + self.assertEqual(consumer.filename, "version-upload_v0") + consumer.run() + finally: + consumer.cleanup() + + version = ( + Document.objects.filter(root_document=root_doc).order_by("-id").first() + ) + self.assertIsNotNone(version) + assert version is not None + self.assertEqual(version.original_filename, "version-upload_v0") + self.assertTrue(bool(version.content)) + @mock.patch("documents.consumer.load_classifier") def testClassifyDocument(self, m) -> None: correspondent = Correspondent.objects.create( @@ -1359,6 +1425,19 @@ class TestMetadataOverrides(TestCase): base.update(incoming) self.assertTrue(base.skip_asn_if_exists) + def test_update_actor_and_version_label(self) -> None: + base = DocumentMetadataOverrides( + actor_id=1, + version_label="root", + ) + incoming = DocumentMetadataOverrides( + actor_id=2, + version_label="v2", + ) + base.update(incoming) + self.assertEqual(base.actor_id, 2) + self.assertEqual(base.version_label, "v2") + class TestBarcodeApplyDetectedASN(TestCase): """ diff --git a/src/documents/tests/test_version_conditionals.py b/src/documents/tests/test_version_conditionals.py new file mode 100644 index 000000000..af82fe4f0 --- /dev/null +++ b/src/documents/tests/test_version_conditionals.py @@ -0,0 +1,87 @@ +from types import SimpleNamespace +from unittest import mock + +from django.test import TestCase + +from documents.conditionals import _resolve_effective_doc +from documents.conditionals import metadata_etag +from documents.conditionals import preview_etag +from documents.conditionals import thumbnail_last_modified +from documents.models import Document +from documents.tests.utils import DirectoriesMixin + + +class TestConditionals(DirectoriesMixin, TestCase): + def test_metadata_etag_uses_latest_version_for_root_request(self) -> None: + root = Document.objects.create( + title="root", + checksum="root-checksum", + archive_checksum="root-archive", + mime_type="application/pdf", + ) + latest = Document.objects.create( + title="v1", + checksum="version-checksum", + archive_checksum="version-archive", + mime_type="application/pdf", + root_document=root, + ) + request = SimpleNamespace(query_params={}) + + self.assertEqual(metadata_etag(request, root.id), latest.checksum) + self.assertEqual(preview_etag(request, root.id), latest.archive_checksum) + + def test_resolve_effective_doc_returns_none_for_invalid_or_unrelated_version( + self, + ) -> None: + root = Document.objects.create( + title="root", + checksum="root", + mime_type="application/pdf", + ) + other_root = Document.objects.create( + title="other", + checksum="other", + mime_type="application/pdf", + ) + other_version = Document.objects.create( + title="other-v1", + checksum="other-v1", + mime_type="application/pdf", + root_document=other_root, + ) + + invalid_request = SimpleNamespace(query_params={"version": "not-a-number"}) + unrelated_request = SimpleNamespace( + query_params={"version": str(other_version.id)}, + ) + + self.assertIsNone(_resolve_effective_doc(root.id, invalid_request)) + self.assertIsNone(_resolve_effective_doc(root.id, unrelated_request)) + + def test_thumbnail_last_modified_uses_effective_document_for_cache_key( + self, + ) -> None: + root = Document.objects.create( + title="root", + checksum="root", + mime_type="application/pdf", + ) + latest = Document.objects.create( + title="v2", + checksum="v2", + mime_type="application/pdf", + root_document=root, + ) + latest.thumbnail_path.parent.mkdir(parents=True, exist_ok=True) + latest.thumbnail_path.write_bytes(b"thumb") + + request = SimpleNamespace(query_params={}) + with mock.patch( + "documents.conditionals.get_thumbnail_modified_key", + return_value="thumb-modified-key", + ) as get_thumb_key: + result = thumbnail_last_modified(request, root.id) + + self.assertIsNotNone(result) + get_thumb_key.assert_called_once_with(latest.id)