mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-10-08 02:06:16 -05:00
More backend coverage
This commit is contained in:
@@ -48,6 +48,7 @@ from documents.models import WorkflowActionEmail
|
|||||||
from documents.models import WorkflowActionWebhook
|
from documents.models import WorkflowActionWebhook
|
||||||
from documents.models import WorkflowRun
|
from documents.models import WorkflowRun
|
||||||
from documents.models import WorkflowTrigger
|
from documents.models import WorkflowTrigger
|
||||||
|
from documents.serialisers import WorkflowTriggerSerializer
|
||||||
from documents.signals import document_consumption_finished
|
from documents.signals import document_consumption_finished
|
||||||
from documents.tests.utils import DirectoriesMixin
|
from documents.tests.utils import DirectoriesMixin
|
||||||
from documents.tests.utils import DummyProgressManager
|
from documents.tests.utils import DummyProgressManager
|
||||||
@@ -1377,6 +1378,103 @@ class TestWorkflows(
|
|||||||
self.assertIn(doc1, filtered)
|
self.assertIn(doc1, filtered)
|
||||||
self.assertNotIn(doc2, filtered)
|
self.assertNotIn(doc2, filtered)
|
||||||
|
|
||||||
|
def test_consumption_trigger_requires_filter_configuration(self):
|
||||||
|
serializer = WorkflowTriggerSerializer(
|
||||||
|
data={
|
||||||
|
"type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertFalse(serializer.is_valid())
|
||||||
|
errors = serializer.errors.get("non_field_errors", [])
|
||||||
|
self.assertIn(
|
||||||
|
"File name, path or mail rule filter are required",
|
||||||
|
[str(error) for error in errors],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_existing_document_invalid_custom_field_query_configuration(self):
|
||||||
|
trigger = WorkflowTrigger.objects.create(
|
||||||
|
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
|
||||||
|
filter_custom_field_query="{ not json",
|
||||||
|
)
|
||||||
|
|
||||||
|
document = Document.objects.create(
|
||||||
|
title="doc invalid query",
|
||||||
|
original_filename="invalid.pdf",
|
||||||
|
checksum="checksum-invalid-query",
|
||||||
|
)
|
||||||
|
|
||||||
|
matched, reason = existing_document_matches_workflow(document, trigger)
|
||||||
|
self.assertFalse(matched)
|
||||||
|
self.assertEqual(reason, "Invalid custom field query configuration")
|
||||||
|
|
||||||
|
def test_prefilter_documents_returns_none_for_invalid_custom_field_query(self):
|
||||||
|
trigger = WorkflowTrigger.objects.create(
|
||||||
|
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
|
||||||
|
filter_custom_field_query="{ not json",
|
||||||
|
)
|
||||||
|
|
||||||
|
Document.objects.create(
|
||||||
|
title="doc",
|
||||||
|
original_filename="doc.pdf",
|
||||||
|
checksum="checksum-prefilter-invalid",
|
||||||
|
)
|
||||||
|
|
||||||
|
filtered = prefilter_documents_by_workflowtrigger(
|
||||||
|
Document.objects.all(),
|
||||||
|
trigger,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(list(filtered), [])
|
||||||
|
|
||||||
|
def test_prefilter_documents_applies_all_filters(self):
|
||||||
|
other_document_type = DocumentType.objects.create(name="Other Type")
|
||||||
|
other_storage_path = StoragePath.objects.create(
|
||||||
|
name="Blocked path",
|
||||||
|
path="/blocked/",
|
||||||
|
)
|
||||||
|
|
||||||
|
trigger = WorkflowTrigger.objects.create(
|
||||||
|
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
|
||||||
|
filter_has_correspondent=self.c,
|
||||||
|
filter_has_document_type=self.dt,
|
||||||
|
filter_has_storage_path=self.sp,
|
||||||
|
)
|
||||||
|
trigger.filter_has_tags.set([self.t1])
|
||||||
|
trigger.filter_has_all_tags.set([self.t1, self.t2])
|
||||||
|
trigger.filter_has_not_tags.set([self.t3])
|
||||||
|
trigger.filter_has_not_correspondents.set([self.c2])
|
||||||
|
trigger.filter_has_not_document_types.set([other_document_type])
|
||||||
|
trigger.filter_has_not_storage_paths.set([other_storage_path])
|
||||||
|
|
||||||
|
allowed_document = Document.objects.create(
|
||||||
|
title="allowed",
|
||||||
|
correspondent=self.c,
|
||||||
|
document_type=self.dt,
|
||||||
|
storage_path=self.sp,
|
||||||
|
original_filename="allow.pdf",
|
||||||
|
checksum="checksum-prefilter-allowed",
|
||||||
|
)
|
||||||
|
allowed_document.tags.set([self.t1, self.t2])
|
||||||
|
|
||||||
|
blocked_document = Document.objects.create(
|
||||||
|
title="blocked",
|
||||||
|
correspondent=self.c2,
|
||||||
|
document_type=other_document_type,
|
||||||
|
storage_path=other_storage_path,
|
||||||
|
original_filename="block.pdf",
|
||||||
|
checksum="checksum-prefilter-blocked",
|
||||||
|
)
|
||||||
|
blocked_document.tags.set([self.t1, self.t3])
|
||||||
|
|
||||||
|
filtered = prefilter_documents_by_workflowtrigger(
|
||||||
|
Document.objects.all(),
|
||||||
|
trigger,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIn(allowed_document, filtered)
|
||||||
|
self.assertNotIn(blocked_document, filtered)
|
||||||
|
|
||||||
def test_document_added_no_match_doctype(self):
|
def test_document_added_no_match_doctype(self):
|
||||||
trigger = WorkflowTrigger.objects.create(
|
trigger = WorkflowTrigger.objects.create(
|
||||||
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
|
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
|
||||||
|
Reference in New Issue
Block a user