Merge branch 'dev' into feature-ai

This commit is contained in:
shamoon
2025-10-15 16:32:55 -07:00
40 changed files with 3415 additions and 615 deletions

View File

@@ -3022,7 +3022,8 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
)
self.assertEqual(len(mail.outbox), 1)
self.assertEqual(mail.outbox[0].attachments[0][0], "archive.pdf")
expected_filename = f"{doc.created} test.pdf"
self.assertEqual(mail.outbox[0].attachments[0][0], expected_filename)
self.client.post(
f"/api/documents/{doc2.pk}/email/",
@@ -3035,7 +3036,8 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
)
self.assertEqual(len(mail.outbox), 2)
self.assertEqual(mail.outbox[1].attachments[0][0], "test2.pdf")
expected_filename2 = f"{doc2.created} test2.pdf"
self.assertEqual(mail.outbox[1].attachments[0][0], expected_filename2)
@mock.patch("django.core.mail.message.EmailMessage.send", side_effect=Exception)
def test_email_document_errors(self, mocked_send):
@@ -3093,7 +3095,7 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
"message": "hello",
},
)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
resp = self.client.post(
f"/api/documents/{doc.pk}/email/",

View File

@@ -0,0 +1,411 @@
import json
import shutil
from unittest import mock
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from django.core import mail
from django.test import override_settings
from rest_framework import status
from rest_framework.test import APITestCase
from documents.models import Document
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import SampleDirMixin
class TestEmail(DirectoriesMixin, SampleDirMixin, APITestCase):
ENDPOINT = "/api/documents/email/"
def setUp(self):
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
self.doc1 = Document.objects.create(
title="test1",
mime_type="application/pdf",
content="this is document 1",
checksum="1",
filename="test1.pdf",
archive_checksum="A1",
archive_filename="archive1.pdf",
)
self.doc2 = Document.objects.create(
title="test2",
mime_type="application/pdf",
content="this is document 2",
checksum="2",
filename="test2.pdf",
)
# Copy sample files to document paths (using different files to distinguish versions)
shutil.copy(
self.SAMPLE_DIR / "documents" / "originals" / "0000001.pdf",
self.doc1.archive_path,
)
shutil.copy(
self.SAMPLE_DIR / "documents" / "originals" / "0000002.pdf",
self.doc1.source_path,
)
shutil.copy(
self.SAMPLE_DIR / "documents" / "originals" / "0000003.pdf",
self.doc2.source_path,
)
@override_settings(
EMAIL_ENABLED=True,
EMAIL_BACKEND="django.core.mail.backends.locmem.EmailBackend",
)
def test_email_success(self):
"""
GIVEN:
- Multiple existing documents (doc1 with archive, doc2 without)
WHEN:
- API request is made to bulk email documents
THEN:
- Email is sent with all documents attached
- Archive version used by default for doc1
- Original version used for doc2 (no archive available)
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk, self.doc2.pk],
"addresses": "hello@paperless-ngx.com,test@example.com",
"subject": "Bulk email test",
"message": "Here are your documents",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["message"], "Email sent")
self.assertEqual(len(mail.outbox), 1)
email = mail.outbox[0]
self.assertEqual(email.to, ["hello@paperless-ngx.com", "test@example.com"])
self.assertEqual(email.subject, "Bulk email test")
self.assertEqual(email.body, "Here are your documents")
self.assertEqual(len(email.attachments), 2)
attachment_names = [att[0] for att in email.attachments]
self.assertEqual(len(attachment_names), 2)
self.assertIn(f"{self.doc1!s}.pdf", attachment_names)
self.assertIn(f"{self.doc2!s}.pdf", attachment_names)
doc1_attachment = next(
att for att in email.attachments if att[0] == f"{self.doc1!s}.pdf"
)
archive_size = self.doc1.archive_path.stat().st_size
self.assertEqual(len(doc1_attachment[1]), archive_size)
doc2_attachment = next(
att for att in email.attachments if att[0] == f"{self.doc2!s}.pdf"
)
original_size = self.doc2.source_path.stat().st_size
self.assertEqual(len(doc2_attachment[1]), original_size)
@override_settings(
EMAIL_ENABLED=True,
EMAIL_BACKEND="django.core.mail.backends.locmem.EmailBackend",
)
def test_email_use_original_version(self):
"""
GIVEN:
- Documents with archive versions
WHEN:
- API request is made to bulk email with use_archive_version=False
THEN:
- Original files are attached instead of archive versions
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk],
"addresses": "test@example.com",
"subject": "Test",
"message": "Test message",
"use_archive_version": False,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(mail.outbox), 1)
attachment = mail.outbox[0].attachments[0]
self.assertEqual(attachment[0], f"{self.doc1!s}.pdf")
original_size = self.doc1.source_path.stat().st_size
self.assertEqual(len(attachment[1]), original_size)
def test_email_missing_required_fields(self):
"""
GIVEN:
- Request with missing required fields
WHEN:
- API request is made to bulk email endpoint
THEN:
- Bad request response is returned
"""
# Missing addresses
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk],
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# Missing subject
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk],
"addresses": "test@example.com",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# Missing message
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk],
"addresses": "test@example.com",
"subject": "Test",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# Missing documents
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"addresses": "test@example.com",
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_email_empty_document_list(self):
"""
GIVEN:
- Request with empty document list
WHEN:
- API request is made to bulk email endpoint
THEN:
- Bad request response is returned
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [],
"addresses": "test@example.com",
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_email_invalid_document_id(self):
"""
GIVEN:
- Request with non-existent document ID
WHEN:
- API request is made to bulk email endpoint
THEN:
- Bad request response is returned
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [999],
"addresses": "test@example.com",
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_email_invalid_email_address(self):
"""
GIVEN:
- Request with invalid email address
WHEN:
- API request is made to bulk email endpoint
THEN:
- Bad request response is returned
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk],
"addresses": "invalid-email",
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# Test multiple addresses with one invalid
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk],
"addresses": "valid@example.com,invalid-email",
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_email_insufficient_permissions(self):
"""
GIVEN:
- User without permissions to view document
WHEN:
- API request is made to bulk email documents
THEN:
- Forbidden response is returned
"""
user1 = User.objects.create_user(username="test1")
user1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
doc_owned = Document.objects.create(
title="owned_doc",
mime_type="application/pdf",
checksum="owned",
owner=self.user,
)
self.client.force_authenticate(user1)
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk, doc_owned.pk],
"addresses": "test@example.com",
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@override_settings(
EMAIL_ENABLED=True,
EMAIL_BACKEND="django.core.mail.backends.locmem.EmailBackend",
)
def test_email_duplicate_filenames(self):
"""
GIVEN:
- Multiple documents with the same title
WHEN:
- API request is made to bulk email documents
THEN:
- Filenames are made unique with counters
"""
doc3 = Document.objects.create(
title="test1",
mime_type="application/pdf",
content="this is document 3",
checksum="3",
filename="test3.pdf",
)
shutil.copy(self.SAMPLE_DIR / "simple.pdf", doc3.source_path)
doc4 = Document.objects.create(
title="test1",
mime_type="application/pdf",
content="this is document 4",
checksum="4",
filename="test4.pdf",
)
shutil.copy(self.SAMPLE_DIR / "simple.pdf", doc4.source_path)
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk, doc3.pk, doc4.pk],
"addresses": "test@example.com",
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(mail.outbox), 1)
attachment_names = [att[0] for att in mail.outbox[0].attachments]
self.assertEqual(len(attachment_names), 3)
self.assertIn(f"{self.doc1!s}.pdf", attachment_names)
self.assertIn(f"{doc3!s}_01.pdf", attachment_names)
self.assertIn(f"{doc3!s}_02.pdf", attachment_names)
@mock.patch(
"django.core.mail.message.EmailMessage.send",
side_effect=Exception("Email error"),
)
def test_email_send_error(self, mocked_send):
"""
GIVEN:
- Existing documents
WHEN:
- API request is made to bulk email and error occurs during email send
THEN:
- Server error response is returned
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc1.pk],
"addresses": "test@example.com",
"subject": "Test",
"message": "Test message",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
self.assertIn("Error emailing documents", response.content.decode())

View File

@@ -184,6 +184,17 @@ class TestApiWorkflows(DirectoriesMixin, APITestCase):
"filter_filename": "*",
"filter_path": "*/samples/*",
"filter_has_tags": [self.t1.id],
"filter_has_all_tags": [self.t2.id],
"filter_has_not_tags": [self.t3.id],
"filter_has_not_correspondents": [self.c2.id],
"filter_has_not_document_types": [self.dt2.id],
"filter_has_not_storage_paths": [self.sp2.id],
"filter_custom_field_query": json.dumps(
[
"AND",
[[self.cf1.id, "exact", "value"]],
],
),
"filter_has_document_type": self.dt.id,
"filter_has_correspondent": self.c.id,
"filter_has_storage_path": self.sp.id,
@@ -223,6 +234,36 @@ class TestApiWorkflows(DirectoriesMixin, APITestCase):
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(Workflow.objects.count(), 2)
workflow = Workflow.objects.get(name="Workflow 2")
trigger = workflow.triggers.first()
self.assertSetEqual(
set(trigger.filter_has_tags.values_list("id", flat=True)),
{self.t1.id},
)
self.assertSetEqual(
set(trigger.filter_has_all_tags.values_list("id", flat=True)),
{self.t2.id},
)
self.assertSetEqual(
set(trigger.filter_has_not_tags.values_list("id", flat=True)),
{self.t3.id},
)
self.assertSetEqual(
set(trigger.filter_has_not_correspondents.values_list("id", flat=True)),
{self.c2.id},
)
self.assertSetEqual(
set(trigger.filter_has_not_document_types.values_list("id", flat=True)),
{self.dt2.id},
)
self.assertSetEqual(
set(trigger.filter_has_not_storage_paths.values_list("id", flat=True)),
{self.sp2.id},
)
self.assertEqual(
trigger.filter_custom_field_query,
json.dumps(["AND", [[self.cf1.id, "exact", "value"]]]),
)
def test_api_create_invalid_workflow_trigger(self):
"""
@@ -376,6 +417,14 @@ class TestApiWorkflows(DirectoriesMixin, APITestCase):
{
"type": WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
"filter_has_tags": [self.t1.id],
"filter_has_all_tags": [self.t2.id],
"filter_has_not_tags": [self.t3.id],
"filter_has_not_correspondents": [self.c2.id],
"filter_has_not_document_types": [self.dt2.id],
"filter_has_not_storage_paths": [self.sp2.id],
"filter_custom_field_query": json.dumps(
["AND", [[self.cf1.id, "exact", "value"]]],
),
"filter_has_correspondent": self.c.id,
"filter_has_document_type": self.dt.id,
},
@@ -393,6 +442,30 @@ class TestApiWorkflows(DirectoriesMixin, APITestCase):
workflow = Workflow.objects.get(id=response.data["id"])
self.assertEqual(workflow.name, "Workflow Updated")
self.assertEqual(workflow.triggers.first().filter_has_tags.first(), self.t1)
self.assertEqual(
workflow.triggers.first().filter_has_all_tags.first(),
self.t2,
)
self.assertEqual(
workflow.triggers.first().filter_has_not_tags.first(),
self.t3,
)
self.assertEqual(
workflow.triggers.first().filter_has_not_correspondents.first(),
self.c2,
)
self.assertEqual(
workflow.triggers.first().filter_has_not_document_types.first(),
self.dt2,
)
self.assertEqual(
workflow.triggers.first().filter_has_not_storage_paths.first(),
self.sp2,
)
self.assertEqual(
workflow.triggers.first().filter_custom_field_query,
json.dumps(["AND", [[self.cf1.id, "exact", "value"]]]),
)
self.assertEqual(workflow.actions.first().assign_title, "Action New Title")
def test_api_update_workflow_no_trigger_actions(self):

View File

@@ -1,4 +1,5 @@
import datetime
import json
import shutil
import socket
from datetime import timedelta
@@ -31,6 +32,7 @@ from documents import tasks
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentSource
from documents.matching import document_matches_workflow
from documents.matching import existing_document_matches_workflow
from documents.matching import prefilter_documents_by_workflowtrigger
from documents.models import Correspondent
from documents.models import CustomField
@@ -46,6 +48,7 @@ from documents.models import WorkflowActionEmail
from documents.models import WorkflowActionWebhook
from documents.models import WorkflowRun
from documents.models import WorkflowTrigger
from documents.serialisers import WorkflowTriggerSerializer
from documents.signals import document_consumption_finished
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import DummyProgressManager
@@ -1080,9 +1083,409 @@ class TestWorkflows(
)
expected_str = f"Document did not match {w}"
self.assertIn(expected_str, cm.output[0])
expected_str = f"Document tags {doc.tags.all()} do not include {trigger.filter_has_tags.all()}"
expected_str = f"Document tags {list(doc.tags.all())} do not include {list(trigger.filter_has_tags.all())}"
self.assertIn(expected_str, cm.output[1])
def test_document_added_no_match_all_tags(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
)
trigger.filter_has_all_tags.set([self.t1, self.t2])
action = WorkflowAction.objects.create(
assign_title="Doc assign owner",
assign_owner=self.user2,
)
w = Workflow.objects.create(
name="Workflow 1",
order=0,
)
w.triggers.add(trigger)
w.actions.add(action)
w.save()
doc = Document.objects.create(
title="sample test",
correspondent=self.c,
original_filename="sample.pdf",
)
doc.tags.set([self.t1])
doc.save()
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
expected_str = f"Document did not match {w}"
self.assertIn(expected_str, cm.output[0])
expected_str = (
f"Document tags {list(doc.tags.all())} do not contain all of"
f" {list(trigger.filter_has_all_tags.all())}"
)
self.assertIn(expected_str, cm.output[1])
def test_document_added_excluded_tags(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
)
trigger.filter_has_not_tags.set([self.t3])
action = WorkflowAction.objects.create(
assign_title="Doc assign owner",
assign_owner=self.user2,
)
w = Workflow.objects.create(
name="Workflow 1",
order=0,
)
w.triggers.add(trigger)
w.actions.add(action)
w.save()
doc = Document.objects.create(
title="sample test",
correspondent=self.c,
original_filename="sample.pdf",
)
doc.tags.set([self.t3])
doc.save()
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
expected_str = f"Document did not match {w}"
self.assertIn(expected_str, cm.output[0])
expected_str = (
f"Document tags {list(doc.tags.all())} include excluded tags"
f" {list(trigger.filter_has_not_tags.all())}"
)
self.assertIn(expected_str, cm.output[1])
def test_document_added_excluded_correspondent(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
)
trigger.filter_has_not_correspondents.set([self.c])
action = WorkflowAction.objects.create(
assign_title="Doc assign owner",
assign_owner=self.user2,
)
w = Workflow.objects.create(
name="Workflow 1",
order=0,
)
w.triggers.add(trigger)
w.actions.add(action)
w.save()
doc = Document.objects.create(
title="sample test",
correspondent=self.c,
original_filename="sample.pdf",
)
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
expected_str = f"Document did not match {w}"
self.assertIn(expected_str, cm.output[0])
expected_str = (
f"Document correspondent {doc.correspondent} is excluded by"
f" {list(trigger.filter_has_not_correspondents.all())}"
)
self.assertIn(expected_str, cm.output[1])
def test_document_added_excluded_document_types(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
)
trigger.filter_has_not_document_types.set([self.dt])
action = WorkflowAction.objects.create(
assign_title="Doc assign owner",
assign_owner=self.user2,
)
w = Workflow.objects.create(
name="Workflow 1",
order=0,
)
w.triggers.add(trigger)
w.actions.add(action)
w.save()
doc = Document.objects.create(
title="sample test",
document_type=self.dt,
original_filename="sample.pdf",
)
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
expected_str = f"Document did not match {w}"
self.assertIn(expected_str, cm.output[0])
expected_str = (
f"Document doc type {doc.document_type} is excluded by"
f" {list(trigger.filter_has_not_document_types.all())}"
)
self.assertIn(expected_str, cm.output[1])
def test_document_added_excluded_storage_paths(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
)
trigger.filter_has_not_storage_paths.set([self.sp])
action = WorkflowAction.objects.create(
assign_title="Doc assign owner",
assign_owner=self.user2,
)
w = Workflow.objects.create(
name="Workflow 1",
order=0,
)
w.triggers.add(trigger)
w.actions.add(action)
w.save()
doc = Document.objects.create(
title="sample test",
storage_path=self.sp,
original_filename="sample.pdf",
)
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
expected_str = f"Document did not match {w}"
self.assertIn(expected_str, cm.output[0])
expected_str = (
f"Document storage path {doc.storage_path} is excluded by"
f" {list(trigger.filter_has_not_storage_paths.all())}"
)
self.assertIn(expected_str, cm.output[1])
def test_document_added_custom_field_query_no_match(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
filter_custom_field_query=json.dumps(
[
"AND",
[[self.cf1.id, "exact", "expected"]],
],
),
)
action = WorkflowAction.objects.create(
assign_title="Doc assign owner",
assign_owner=self.user2,
)
workflow = Workflow.objects.create(name="Workflow 1", order=0)
workflow.triggers.add(trigger)
workflow.actions.add(action)
workflow.save()
doc = Document.objects.create(
title="sample test",
correspondent=self.c,
original_filename="sample.pdf",
)
CustomFieldInstance.objects.create(
document=doc,
field=self.cf1,
value_text="other",
)
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
expected_str = f"Document did not match {workflow}"
self.assertIn(expected_str, cm.output[0])
self.assertIn(
"Document custom fields do not match the configured custom field query",
cm.output[1],
)
def test_document_added_custom_field_query_match(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
filter_custom_field_query=json.dumps(
[
"AND",
[[self.cf1.id, "exact", "expected"]],
],
),
)
doc = Document.objects.create(
title="sample test",
correspondent=self.c,
original_filename="sample.pdf",
)
CustomFieldInstance.objects.create(
document=doc,
field=self.cf1,
value_text="expected",
)
matched, reason = existing_document_matches_workflow(doc, trigger)
self.assertTrue(matched)
self.assertIsNone(reason)
def test_prefilter_documents_custom_field_query(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
filter_custom_field_query=json.dumps(
[
"AND",
[[self.cf1.id, "exact", "match"]],
],
),
)
doc1 = Document.objects.create(
title="doc 1",
correspondent=self.c,
original_filename="doc1.pdf",
checksum="checksum1",
)
CustomFieldInstance.objects.create(
document=doc1,
field=self.cf1,
value_text="match",
)
doc2 = Document.objects.create(
title="doc 2",
correspondent=self.c,
original_filename="doc2.pdf",
checksum="checksum2",
)
CustomFieldInstance.objects.create(
document=doc2,
field=self.cf1,
value_text="different",
)
filtered = prefilter_documents_by_workflowtrigger(
Document.objects.all(),
trigger,
)
self.assertIn(doc1, filtered)
self.assertNotIn(doc2, filtered)
def test_consumption_trigger_requires_filter_configuration(self):
serializer = WorkflowTriggerSerializer(
data={
"type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
},
)
self.assertFalse(serializer.is_valid())
errors = serializer.errors.get("non_field_errors", [])
self.assertIn(
"File name, path or mail rule filter are required",
[str(error) for error in errors],
)
def test_workflow_trigger_serializer_clears_empty_custom_field_query(self):
serializer = WorkflowTriggerSerializer(
data={
"type": WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
"filter_custom_field_query": "",
},
)
self.assertTrue(serializer.is_valid(), serializer.errors)
self.assertIsNone(serializer.validated_data.get("filter_custom_field_query"))
def test_existing_document_invalid_custom_field_query_configuration(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
filter_custom_field_query="{ not json",
)
document = Document.objects.create(
title="doc invalid query",
original_filename="invalid.pdf",
checksum="checksum-invalid-query",
)
matched, reason = existing_document_matches_workflow(document, trigger)
self.assertFalse(matched)
self.assertEqual(reason, "Invalid custom field query configuration")
def test_prefilter_documents_returns_none_for_invalid_custom_field_query(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
filter_custom_field_query="{ not json",
)
Document.objects.create(
title="doc",
original_filename="doc.pdf",
checksum="checksum-prefilter-invalid",
)
filtered = prefilter_documents_by_workflowtrigger(
Document.objects.all(),
trigger,
)
self.assertEqual(list(filtered), [])
def test_prefilter_documents_applies_all_filters(self):
other_document_type = DocumentType.objects.create(name="Other Type")
other_storage_path = StoragePath.objects.create(
name="Blocked path",
path="/blocked/",
)
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
filter_has_correspondent=self.c,
filter_has_document_type=self.dt,
filter_has_storage_path=self.sp,
)
trigger.filter_has_tags.set([self.t1])
trigger.filter_has_all_tags.set([self.t1, self.t2])
trigger.filter_has_not_tags.set([self.t3])
trigger.filter_has_not_correspondents.set([self.c2])
trigger.filter_has_not_document_types.set([other_document_type])
trigger.filter_has_not_storage_paths.set([other_storage_path])
allowed_document = Document.objects.create(
title="allowed",
correspondent=self.c,
document_type=self.dt,
storage_path=self.sp,
original_filename="allow.pdf",
checksum="checksum-prefilter-allowed",
)
allowed_document.tags.set([self.t1, self.t2])
blocked_document = Document.objects.create(
title="blocked",
correspondent=self.c2,
document_type=other_document_type,
storage_path=other_storage_path,
original_filename="block.pdf",
checksum="checksum-prefilter-blocked",
)
blocked_document.tags.set([self.t1, self.t3])
filtered = prefilter_documents_by_workflowtrigger(
Document.objects.all(),
trigger,
)
self.assertIn(allowed_document, filtered)
self.assertNotIn(blocked_document, filtered)
def test_document_added_no_match_doctype(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,