mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-10-02 01:42:50 -05:00
Merge branch 'dev' into feature-ai
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import types
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.contrib.admin.sites import AdminSite
|
||||
from django.contrib.auth.models import User
|
||||
@@ -7,7 +8,9 @@ from django.utils import timezone
|
||||
|
||||
from documents import index
|
||||
from documents.admin import DocumentAdmin
|
||||
from documents.admin import TagAdmin
|
||||
from documents.models import Document
|
||||
from documents.models import Tag
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from paperless.admin import PaperlessUserAdmin
|
||||
|
||||
@@ -70,6 +73,24 @@ class TestDocumentAdmin(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(self.doc_admin.created_(doc), "2020-04-12")
|
||||
|
||||
|
||||
class TestTagAdmin(DirectoriesMixin, TestCase):
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.tag_admin = TagAdmin(model=Tag, admin_site=AdminSite())
|
||||
|
||||
@patch("documents.tasks.bulk_update_documents")
|
||||
def test_parent_tags_get_added(self, mock_bulk_update):
|
||||
document = Document.objects.create(title="test")
|
||||
parent = Tag.objects.create(name="parent")
|
||||
child = Tag.objects.create(name="child")
|
||||
document.tags.add(child)
|
||||
|
||||
child.tn_parent = parent
|
||||
self.tag_admin.save_model(None, child, None, change=True)
|
||||
document.refresh_from_db()
|
||||
self.assertIn(parent, document.tags.all())
|
||||
|
||||
|
||||
class TestPaperlessAdmin(DirectoriesMixin, TestCase):
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
|
@@ -839,7 +839,7 @@ class TestBulkEditAPI(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
m.assert_called()
|
||||
args, kwargs = m.call_args
|
||||
_, kwargs = m.call_args
|
||||
self.assertEqual(kwargs["merge"], False)
|
||||
|
||||
response = self.client.post(
|
||||
@@ -857,7 +857,7 @@ class TestBulkEditAPI(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
m.assert_called()
|
||||
args, kwargs = m.call_args
|
||||
_, kwargs = m.call_args
|
||||
self.assertEqual(kwargs["merge"], True)
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import datetime
|
||||
import json
|
||||
import shutil
|
||||
import tempfile
|
||||
import uuid
|
||||
@@ -1528,7 +1529,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
|
||||
input_doc, overrides = self.get_last_consume_delay_call_args()
|
||||
|
||||
new_overrides, msg = run_workflows(
|
||||
new_overrides, _ = run_workflows(
|
||||
trigger_type=WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
|
||||
document=input_doc,
|
||||
logging_group=None,
|
||||
@@ -1537,6 +1538,86 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
overrides.update(new_overrides)
|
||||
self.assertEqual(overrides.custom_fields, {cf.id: None, cf2.id: 123})
|
||||
|
||||
def test_upload_with_custom_field_values(self):
|
||||
"""
|
||||
GIVEN: A document with a source file
|
||||
WHEN: Upload the document with custom fields and values
|
||||
THEN: Metadata is set correctly
|
||||
"""
|
||||
self.consume_file_mock.return_value = celery.result.AsyncResult(
|
||||
id=str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
cf_string = CustomField.objects.create(
|
||||
name="stringfield",
|
||||
data_type=CustomField.FieldDataType.STRING,
|
||||
)
|
||||
cf_int = CustomField.objects.create(
|
||||
name="intfield",
|
||||
data_type=CustomField.FieldDataType.INT,
|
||||
)
|
||||
|
||||
with (Path(__file__).parent / "samples" / "simple.pdf").open("rb") as f:
|
||||
response = self.client.post(
|
||||
"/api/documents/post_document/",
|
||||
{
|
||||
"document": f,
|
||||
"custom_fields": json.dumps(
|
||||
{
|
||||
str(cf_string.id): "a string",
|
||||
str(cf_int.id): 123,
|
||||
},
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
self.consume_file_mock.assert_called_once()
|
||||
|
||||
input_doc, overrides = self.get_last_consume_delay_call_args()
|
||||
|
||||
self.assertEqual(input_doc.original_file.name, "simple.pdf")
|
||||
self.assertEqual(overrides.filename, "simple.pdf")
|
||||
self.assertEqual(
|
||||
overrides.custom_fields,
|
||||
{cf_string.id: "a string", cf_int.id: 123},
|
||||
)
|
||||
|
||||
def test_upload_with_custom_fields_errors(self):
|
||||
"""
|
||||
GIVEN: A document with a source file
|
||||
WHEN: Upload the document with invalid custom fields payloads
|
||||
THEN: The upload is rejected
|
||||
"""
|
||||
self.consume_file_mock.return_value = celery.result.AsyncResult(
|
||||
id=str(uuid.uuid4()),
|
||||
)
|
||||
|
||||
error_payloads = [
|
||||
# Non-integer key in mapping
|
||||
{"custom_fields": json.dumps({"abc": "a string"})},
|
||||
# List with non-integer entry
|
||||
{"custom_fields": json.dumps(["abc"])},
|
||||
# Nonexistent id in mapping
|
||||
{"custom_fields": json.dumps({99999999: "a string"})},
|
||||
# Nonexistent id in list
|
||||
{"custom_fields": json.dumps([99999999])},
|
||||
# Invalid type (JSON string, not list/dict/int)
|
||||
{"custom_fields": json.dumps("not-a-supported-structure")},
|
||||
]
|
||||
|
||||
for payload in error_payloads:
|
||||
with (Path(__file__).parent / "samples" / "simple.pdf").open("rb") as f:
|
||||
data = {"document": f, **payload}
|
||||
response = self.client.post(
|
||||
"/api/documents/post_document/",
|
||||
data,
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
self.consume_file_mock.assert_not_called()
|
||||
|
||||
def test_upload_with_webui_source(self):
|
||||
"""
|
||||
GIVEN: A document with a source file
|
||||
@@ -1557,7 +1638,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
|
||||
self.consume_file_mock.assert_called_once()
|
||||
|
||||
input_doc, overrides = self.get_last_consume_delay_call_args()
|
||||
input_doc, _ = self.get_last_consume_delay_call_args()
|
||||
|
||||
self.assertEqual(input_doc.source, WorkflowTrigger.DocumentSourceChoices.WEB_UI)
|
||||
|
||||
|
@@ -614,14 +614,16 @@ class TestBarcodeNewConsume(
|
||||
self.assertIsNotFile(temp_copy)
|
||||
|
||||
# Check the split files exist
|
||||
# Check the original_path is set
|
||||
# Check the source is unchanged
|
||||
# Check the overrides are unchanged
|
||||
for (
|
||||
new_input_doc,
|
||||
new_doc_overrides,
|
||||
) in self.get_all_consume_delay_call_args():
|
||||
self.assertEqual(new_input_doc.source, DocumentSource.ConsumeFolder)
|
||||
self.assertIsFile(new_input_doc.original_file)
|
||||
self.assertEqual(new_input_doc.original_path, temp_copy)
|
||||
self.assertEqual(new_input_doc.source, DocumentSource.ConsumeFolder)
|
||||
self.assertEqual(overrides, new_doc_overrides)
|
||||
|
||||
|
||||
|
@@ -74,7 +74,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
)
|
||||
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 3)
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
|
||||
|
||||
def test_unset_correspondent(self):
|
||||
@@ -82,7 +82,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
bulk_edit.set_correspondent([self.doc1.id, self.doc2.id, self.doc3.id], None)
|
||||
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 0)
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
|
||||
|
||||
def test_set_document_type(self):
|
||||
@@ -93,7 +93,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
)
|
||||
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 3)
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
|
||||
|
||||
def test_unset_document_type(self):
|
||||
@@ -101,7 +101,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
bulk_edit.set_document_type([self.doc1.id, self.doc2.id, self.doc3.id], None)
|
||||
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 0)
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
|
||||
|
||||
def test_set_document_storage_path(self):
|
||||
@@ -123,7 +123,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(Document.objects.filter(storage_path=None).count(), 4)
|
||||
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id])
|
||||
|
||||
@@ -154,7 +154,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
|
||||
|
||||
self.async_task.assert_called()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id])
|
||||
|
||||
@@ -166,7 +166,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
)
|
||||
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 4)
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc3.id])
|
||||
|
||||
def test_remove_tag(self):
|
||||
@@ -174,7 +174,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
bulk_edit.remove_tag([self.doc1.id, self.doc3.id, self.doc4.id], self.t1.id)
|
||||
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 1)
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc4.id])
|
||||
|
||||
def test_modify_tags(self):
|
||||
@@ -191,7 +191,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
self.assertCountEqual(list(self.doc3.tags.all()), [self.t2, tag_unrelated])
|
||||
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
# TODO: doc3 should not be affected, but the query for that is rather complicated
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
|
||||
|
||||
@@ -248,7 +248,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
)
|
||||
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
|
||||
|
||||
def test_modify_custom_fields_with_values(self):
|
||||
@@ -325,7 +325,7 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
)
|
||||
|
||||
self.async_task.assert_called_once()
|
||||
args, kwargs = self.async_task.call_args
|
||||
_, kwargs = self.async_task.call_args
|
||||
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
|
||||
|
||||
# removal of document link cf, should also remove symmetric link
|
||||
|
@@ -209,6 +209,26 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
# assert that we have an error logged with this invalid file.
|
||||
error_logger.assert_called_once()
|
||||
|
||||
@mock.patch("documents.management.commands.document_consumer.logger.warning")
|
||||
def test_permission_error_on_prechecks(self, warning_logger):
|
||||
filepath = Path(self.dirs.consumption_dir) / "selinux.txt"
|
||||
filepath.touch()
|
||||
|
||||
original_stat = Path.stat
|
||||
|
||||
def raising_stat(self, *args, **kwargs):
|
||||
if self == filepath:
|
||||
raise PermissionError("Permission denied")
|
||||
return original_stat(self, *args, **kwargs)
|
||||
|
||||
with mock.patch("pathlib.Path.stat", new=raising_stat):
|
||||
document_consumer._consume(filepath)
|
||||
|
||||
warning_logger.assert_called_once()
|
||||
(args, _) = warning_logger.call_args
|
||||
self.assertIn("Permission denied", args[0])
|
||||
self.consume_file_mock.assert_not_called()
|
||||
|
||||
@override_settings(CONSUMPTION_DIR="does_not_exist")
|
||||
def test_consumption_directory_invalid(self):
|
||||
self.assertRaises(CommandError, call_command, "document_consumer", "--oneshot")
|
||||
|
@@ -206,3 +206,29 @@ class TestFuzzyMatchCommand(TestCase):
|
||||
self.assertEqual(Document.objects.count(), 2)
|
||||
self.assertIsNotNone(Document.objects.get(pk=1))
|
||||
self.assertIsNotNone(Document.objects.get(pk=2))
|
||||
|
||||
def test_empty_content(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- 2 documents exist, content is empty (pw-protected)
|
||||
WHEN:
|
||||
- Command is called
|
||||
THEN:
|
||||
- No matches are found
|
||||
"""
|
||||
Document.objects.create(
|
||||
checksum="BEEFCAFE",
|
||||
title="A",
|
||||
content="",
|
||||
mime_type="application/pdf",
|
||||
filename="test.pdf",
|
||||
)
|
||||
Document.objects.create(
|
||||
checksum="DEADBEAF",
|
||||
title="A",
|
||||
content="",
|
||||
mime_type="application/pdf",
|
||||
filename="other_test.pdf",
|
||||
)
|
||||
stdout, _ = self.call_command()
|
||||
self.assertIn("No matches found", stdout)
|
||||
|
@@ -123,14 +123,14 @@ class TestRetagger(DirectoriesMixin, TestCase):
|
||||
|
||||
def test_add_type(self):
|
||||
call_command("document_retagger", "--document_type")
|
||||
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
|
||||
d_first, d_second, _, _ = self.get_updated_docs()
|
||||
|
||||
self.assertEqual(d_first.document_type, self.doctype_first)
|
||||
self.assertEqual(d_second.document_type, self.doctype_second)
|
||||
|
||||
def test_add_correspondent(self):
|
||||
call_command("document_retagger", "--correspondent")
|
||||
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
|
||||
d_first, d_second, _, _ = self.get_updated_docs()
|
||||
|
||||
self.assertEqual(d_first.correspondent, self.correspondent_first)
|
||||
self.assertEqual(d_second.correspondent, self.correspondent_second)
|
||||
@@ -160,7 +160,7 @@ class TestRetagger(DirectoriesMixin, TestCase):
|
||||
|
||||
def test_add_tags_suggest(self):
|
||||
call_command("document_retagger", "--tags", "--suggest")
|
||||
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
|
||||
d_first, d_second, _, d_auto = self.get_updated_docs()
|
||||
|
||||
self.assertEqual(d_first.tags.count(), 0)
|
||||
self.assertEqual(d_second.tags.count(), 0)
|
||||
@@ -168,14 +168,14 @@ class TestRetagger(DirectoriesMixin, TestCase):
|
||||
|
||||
def test_add_type_suggest(self):
|
||||
call_command("document_retagger", "--document_type", "--suggest")
|
||||
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
|
||||
d_first, d_second, _, _ = self.get_updated_docs()
|
||||
|
||||
self.assertIsNone(d_first.document_type)
|
||||
self.assertIsNone(d_second.document_type)
|
||||
|
||||
def test_add_correspondent_suggest(self):
|
||||
call_command("document_retagger", "--correspondent", "--suggest")
|
||||
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
|
||||
d_first, d_second, _, _ = self.get_updated_docs()
|
||||
|
||||
self.assertIsNone(d_first.correspondent)
|
||||
self.assertIsNone(d_second.correspondent)
|
||||
@@ -187,7 +187,7 @@ class TestRetagger(DirectoriesMixin, TestCase):
|
||||
"--suggest",
|
||||
"--base-url=http://localhost",
|
||||
)
|
||||
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
|
||||
d_first, d_second, _, d_auto = self.get_updated_docs()
|
||||
|
||||
self.assertEqual(d_first.tags.count(), 0)
|
||||
self.assertEqual(d_second.tags.count(), 0)
|
||||
@@ -200,7 +200,7 @@ class TestRetagger(DirectoriesMixin, TestCase):
|
||||
"--suggest",
|
||||
"--base-url=http://localhost",
|
||||
)
|
||||
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
|
||||
d_first, d_second, _, _ = self.get_updated_docs()
|
||||
|
||||
self.assertIsNone(d_first.document_type)
|
||||
self.assertIsNone(d_second.document_type)
|
||||
@@ -212,7 +212,7 @@ class TestRetagger(DirectoriesMixin, TestCase):
|
||||
"--suggest",
|
||||
"--base-url=http://localhost",
|
||||
)
|
||||
d_first, d_second, d_unrelated, d_auto = self.get_updated_docs()
|
||||
d_first, d_second, _, _ = self.get_updated_docs()
|
||||
|
||||
self.assertIsNone(d_first.correspondent)
|
||||
self.assertIsNone(d_second.correspondent)
|
||||
|
@@ -4,6 +4,7 @@ import shutil
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from django.conf import settings
|
||||
from django.test import override_settings
|
||||
|
||||
@@ -281,6 +282,7 @@ class TestMigrateArchiveFilesErrors(DirectoriesMixin, TestMigrations):
|
||||
migrate_to = "1012_fix_archive_files"
|
||||
auto_migrate = False
|
||||
|
||||
@pytest.mark.skip(reason="Fails with migration tearDown util. Needs investigation.")
|
||||
def test_archive_missing(self):
|
||||
Document = self.apps.get_model("documents", "Document")
|
||||
|
||||
@@ -300,6 +302,7 @@ class TestMigrateArchiveFilesErrors(DirectoriesMixin, TestMigrations):
|
||||
self.performMigration,
|
||||
)
|
||||
|
||||
@pytest.mark.skip(reason="Fails with migration tearDown util. Needs investigation.")
|
||||
def test_parser_missing(self):
|
||||
Document = self.apps.get_model("documents", "Document")
|
||||
|
||||
|
205
src/documents/tests/test_tag_hierarchy.py
Normal file
205
src/documents/tests/test_tag_hierarchy.py
Normal file
@@ -0,0 +1,205 @@
|
||||
from unittest import mock
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from documents import bulk_edit
|
||||
from documents.models import Document
|
||||
from documents.models import Tag
|
||||
from documents.models import Workflow
|
||||
from documents.models import WorkflowAction
|
||||
from documents.models import WorkflowTrigger
|
||||
from documents.signals.handlers import run_workflows
|
||||
|
||||
|
||||
class TestTagHierarchy(APITestCase):
|
||||
def setUp(self):
|
||||
self.user = User.objects.create_superuser(username="admin")
|
||||
self.client.force_authenticate(user=self.user)
|
||||
|
||||
self.parent = Tag.objects.create(name="Parent")
|
||||
self.child = Tag.objects.create(name="Child", tn_parent=self.parent)
|
||||
|
||||
patcher = mock.patch("documents.bulk_edit.bulk_update_documents.delay")
|
||||
self.async_task = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
self.document = Document.objects.create(
|
||||
title="doc",
|
||||
content="",
|
||||
checksum="1",
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
|
||||
def test_document_api_add_child_adds_parent(self):
|
||||
self.client.patch(
|
||||
f"/api/documents/{self.document.pk}/",
|
||||
{"tags": [self.child.pk]},
|
||||
format="json",
|
||||
)
|
||||
self.document.refresh_from_db()
|
||||
tags = set(self.document.tags.values_list("pk", flat=True))
|
||||
assert tags == {self.parent.pk, self.child.pk}
|
||||
|
||||
def test_document_api_remove_parent_removes_children(self):
|
||||
self.document.add_nested_tags([self.parent, self.child])
|
||||
self.client.patch(
|
||||
f"/api/documents/{self.document.pk}/",
|
||||
{"tags": [self.child.pk]},
|
||||
format="json",
|
||||
)
|
||||
self.document.refresh_from_db()
|
||||
assert self.document.tags.count() == 0
|
||||
|
||||
def test_document_api_remove_parent_removes_child(self):
|
||||
self.document.add_nested_tags([self.child])
|
||||
self.client.patch(
|
||||
f"/api/documents/{self.document.pk}/",
|
||||
{"tags": []},
|
||||
format="json",
|
||||
)
|
||||
self.document.refresh_from_db()
|
||||
assert self.document.tags.count() == 0
|
||||
|
||||
def test_bulk_edit_respects_hierarchy(self):
|
||||
bulk_edit.add_tag([self.document.pk], self.child.pk)
|
||||
self.document.refresh_from_db()
|
||||
tags = set(self.document.tags.values_list("pk", flat=True))
|
||||
assert tags == {self.parent.pk, self.child.pk}
|
||||
|
||||
bulk_edit.remove_tag([self.document.pk], self.parent.pk)
|
||||
self.document.refresh_from_db()
|
||||
assert self.document.tags.count() == 0
|
||||
|
||||
bulk_edit.modify_tags([self.document.pk], [self.child.pk], [])
|
||||
self.document.refresh_from_db()
|
||||
tags = set(self.document.tags.values_list("pk", flat=True))
|
||||
assert tags == {self.parent.pk, self.child.pk}
|
||||
|
||||
bulk_edit.modify_tags([self.document.pk], [], [self.parent.pk])
|
||||
self.document.refresh_from_db()
|
||||
assert self.document.tags.count() == 0
|
||||
|
||||
def test_workflow_actions(self):
|
||||
workflow = Workflow.objects.create(name="wf", order=0)
|
||||
trigger = WorkflowTrigger.objects.create(
|
||||
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
|
||||
)
|
||||
assign_action = WorkflowAction.objects.create()
|
||||
assign_action.assign_tags.add(self.child)
|
||||
workflow.triggers.add(trigger)
|
||||
workflow.actions.add(assign_action)
|
||||
|
||||
run_workflows(trigger.type, self.document)
|
||||
self.document.refresh_from_db()
|
||||
tags = set(self.document.tags.values_list("pk", flat=True))
|
||||
assert tags == {self.parent.pk, self.child.pk}
|
||||
|
||||
# removal
|
||||
removal_action = WorkflowAction.objects.create(
|
||||
type=WorkflowAction.WorkflowActionType.REMOVAL,
|
||||
)
|
||||
removal_action.remove_tags.add(self.parent)
|
||||
workflow.actions.clear()
|
||||
workflow.actions.add(removal_action)
|
||||
|
||||
run_workflows(trigger.type, self.document)
|
||||
self.document.refresh_from_db()
|
||||
assert self.document.tags.count() == 0
|
||||
|
||||
def test_tag_view_parent_update_adds_parent_to_docs(self):
|
||||
orphan = Tag.objects.create(name="Orphan")
|
||||
self.document.tags.add(orphan)
|
||||
|
||||
self.client.patch(
|
||||
f"/api/tags/{orphan.pk}/",
|
||||
{"parent": self.parent.pk},
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.document.refresh_from_db()
|
||||
tags = set(self.document.tags.values_list("pk", flat=True))
|
||||
assert tags == {self.parent.pk, orphan.pk}
|
||||
|
||||
def test_cannot_set_parent_to_self(self):
|
||||
tag = Tag.objects.create(name="Selfie")
|
||||
resp = self.client.patch(
|
||||
f"/api/tags/{tag.pk}/",
|
||||
{"parent": tag.pk},
|
||||
format="json",
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "Cannot set itself as parent" in str(resp.data["parent"])
|
||||
|
||||
def test_cannot_set_parent_to_descendant(self):
|
||||
a = Tag.objects.create(name="A")
|
||||
b = Tag.objects.create(name="B", tn_parent=a)
|
||||
c = Tag.objects.create(name="C", tn_parent=b)
|
||||
|
||||
# Attempt to set A's parent to C (descendant) should fail
|
||||
resp = self.client.patch(
|
||||
f"/api/tags/{a.pk}/",
|
||||
{"parent": c.pk},
|
||||
format="json",
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "Cannot set parent to a descendant" in str(resp.data["parent"])
|
||||
|
||||
def test_max_depth_on_create(self):
|
||||
a = Tag.objects.create(name="A1")
|
||||
b = Tag.objects.create(name="B1", tn_parent=a)
|
||||
c = Tag.objects.create(name="C1", tn_parent=b)
|
||||
d = Tag.objects.create(name="D1", tn_parent=c)
|
||||
|
||||
# Creating E under D yields depth 5: allowed
|
||||
resp_ok = self.client.post(
|
||||
"/api/tags/",
|
||||
{"name": "E1", "parent": d.pk},
|
||||
format="json",
|
||||
)
|
||||
assert resp_ok.status_code in (200, 201)
|
||||
e_id = (
|
||||
resp_ok.data["id"] if resp_ok.status_code == 201 else resp_ok.data.get("id")
|
||||
)
|
||||
assert e_id is not None
|
||||
|
||||
# Creating F under E would yield depth 6: rejected
|
||||
resp_fail = self.client.post(
|
||||
"/api/tags/",
|
||||
{"name": "F1", "parent": e_id},
|
||||
format="json",
|
||||
)
|
||||
assert resp_fail.status_code == 400
|
||||
assert "parent" in resp_fail.data
|
||||
assert "Invalid" in str(resp_fail.data["parent"])
|
||||
|
||||
def test_max_depth_on_move_subtree(self):
|
||||
a = Tag.objects.create(name="A2")
|
||||
b = Tag.objects.create(name="B2", tn_parent=a)
|
||||
c = Tag.objects.create(name="C2", tn_parent=b)
|
||||
d = Tag.objects.create(name="D2", tn_parent=c)
|
||||
|
||||
x = Tag.objects.create(name="X2")
|
||||
y = Tag.objects.create(name="Y2", tn_parent=x)
|
||||
assert y.parent_pk == x.pk
|
||||
|
||||
# Moving X under D would make deepest node Y exceed depth 5 -> reject
|
||||
resp_fail = self.client.patch(
|
||||
f"/api/tags/{x.pk}/",
|
||||
{"parent": d.pk},
|
||||
format="json",
|
||||
)
|
||||
assert resp_fail.status_code == 400
|
||||
assert "Maximum nesting depth exceeded" in str(
|
||||
resp_fail.data["non_field_errors"],
|
||||
)
|
||||
|
||||
# Moving X under C (depth 3) should be allowed (deepest becomes 5)
|
||||
resp_ok = self.client.patch(
|
||||
f"/api/tags/{x.pk}/",
|
||||
{"parent": c.pk},
|
||||
format="json",
|
||||
)
|
||||
assert resp_ok.status_code in (200, 202)
|
||||
x.refresh_from_db()
|
||||
assert x.parent_pk == c.id
|
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
import tempfile
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
@@ -5,11 +6,15 @@ from unittest.mock import MagicMock
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
from django.db import connection
|
||||
from django.test import TestCase
|
||||
from django.test import override_settings
|
||||
from django.test.utils import CaptureQueriesContext
|
||||
from django.utils import timezone
|
||||
from guardian.shortcuts import assign_perm
|
||||
from rest_framework import status
|
||||
|
||||
from documents.caching import get_llm_suggestion_cache
|
||||
@@ -164,6 +169,116 @@ class TestViews(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(response.request["PATH_INFO"], "/accounts/login/")
|
||||
self.assertContains(response, b"Share link has expired")
|
||||
|
||||
def test_list_with_full_permissions(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- Tags with different permissions
|
||||
WHEN:
|
||||
- Request to get tag list with full permissions is made
|
||||
THEN:
|
||||
- Tag list is returned with the right permission information
|
||||
"""
|
||||
user2 = User.objects.create(username="user2")
|
||||
user3 = User.objects.create(username="user3")
|
||||
group1 = Group.objects.create(name="group1")
|
||||
group2 = Group.objects.create(name="group2")
|
||||
group3 = Group.objects.create(name="group3")
|
||||
t1 = Tag.objects.create(name="invoice", pk=1)
|
||||
assign_perm("view_tag", self.user, t1)
|
||||
assign_perm("view_tag", user2, t1)
|
||||
assign_perm("view_tag", user3, t1)
|
||||
assign_perm("view_tag", group1, t1)
|
||||
assign_perm("view_tag", group2, t1)
|
||||
assign_perm("view_tag", group3, t1)
|
||||
assign_perm("change_tag", self.user, t1)
|
||||
assign_perm("change_tag", user2, t1)
|
||||
assign_perm("change_tag", group1, t1)
|
||||
assign_perm("change_tag", group2, t1)
|
||||
|
||||
Tag.objects.create(name="bank statement", pk=2)
|
||||
d1 = Document.objects.create(
|
||||
title="Invoice 1",
|
||||
content="This is the invoice of a very expensive item",
|
||||
checksum="A",
|
||||
)
|
||||
d1.tags.add(t1)
|
||||
d2 = Document.objects.create(
|
||||
title="Invoice 2",
|
||||
content="Internet invoice, I should pay it to continue contributing",
|
||||
checksum="B",
|
||||
)
|
||||
d2.tags.add(t1)
|
||||
|
||||
view_permissions = Permission.objects.filter(
|
||||
codename__contains="view_tag",
|
||||
)
|
||||
self.user.user_permissions.add(*view_permissions)
|
||||
self.user.save()
|
||||
|
||||
self.client.force_login(self.user)
|
||||
response = self.client.get("/api/tags/?page=1&full_perms=true")
|
||||
results = json.loads(response.content)["results"]
|
||||
for tag in results:
|
||||
if tag["name"] == "invoice":
|
||||
assert tag["permissions"] == {
|
||||
"view": {
|
||||
"users": [self.user.pk, user2.pk, user3.pk],
|
||||
"groups": [group1.pk, group2.pk, group3.pk],
|
||||
},
|
||||
"change": {
|
||||
"users": [self.user.pk, user2.pk],
|
||||
"groups": [group1.pk, group2.pk],
|
||||
},
|
||||
}
|
||||
elif tag["name"] == "bank statement":
|
||||
assert tag["permissions"] == {
|
||||
"view": {"users": [], "groups": []},
|
||||
"change": {"users": [], "groups": []},
|
||||
}
|
||||
else:
|
||||
assert False, f"Unexpected tag found: {tag['name']}"
|
||||
|
||||
def test_list_no_n_plus_1_queries(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- Tags with different permissions
|
||||
WHEN:
|
||||
- Request to get tag list with full permissions is made
|
||||
THEN:
|
||||
- Permissions are not queries in database tag by tag,
|
||||
i.e. there are no N+1 queries
|
||||
"""
|
||||
view_permissions = Permission.objects.filter(
|
||||
codename__contains="view_tag",
|
||||
)
|
||||
self.user.user_permissions.add(*view_permissions)
|
||||
self.user.save()
|
||||
self.client.force_login(self.user)
|
||||
|
||||
# Start by a small list, and count the number of SQL queries
|
||||
for i in range(2):
|
||||
Tag.objects.create(name=f"tag_{i}")
|
||||
|
||||
with CaptureQueriesContext(connection) as ctx_small:
|
||||
response_small = self.client.get("/api/tags/?full_perms=true")
|
||||
assert response_small.status_code == 200
|
||||
num_queries_small = len(ctx_small.captured_queries)
|
||||
|
||||
# Complete the list, and count the number of SQL queries again
|
||||
for i in range(2, 50):
|
||||
Tag.objects.create(name=f"tag_{i}")
|
||||
|
||||
with CaptureQueriesContext(connection) as ctx_large:
|
||||
response_large = self.client.get("/api/tags/?full_perms=true")
|
||||
assert response_large.status_code == 200
|
||||
num_queries_large = len(ctx_large.captured_queries)
|
||||
|
||||
# A few additional queries are allowed, but not a linear explosion
|
||||
assert num_queries_large <= num_queries_small + 5, (
|
||||
f"Possible N+1 queries detected: {num_queries_small} queries for 2 tags, "
|
||||
f"but {num_queries_large} queries for 50 tags"
|
||||
)
|
||||
|
||||
|
||||
class TestAISuggestions(DirectoriesMixin, TestCase):
|
||||
def setUp(self):
|
||||
|
@@ -327,6 +327,19 @@ class TestMigrations(TransactionTestCase):
|
||||
def setUpBeforeMigration(self, apps):
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
"""
|
||||
Ensure the database schema is restored to the latest migration after
|
||||
each migration test, so subsequent tests run against HEAD.
|
||||
"""
|
||||
try:
|
||||
executor = MigrationExecutor(connection)
|
||||
executor.loader.build_graph()
|
||||
targets = executor.loader.graph.leaf_nodes()
|
||||
executor.migrate(targets)
|
||||
finally:
|
||||
super().tearDown()
|
||||
|
||||
|
||||
class SampleDirMixin:
|
||||
SAMPLE_DIR = Path(__file__).parent / "samples"
|
||||
|
Reference in New Issue
Block a user