mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-08-12 00:19:48 +00:00
Feature: consumption templates (#4196)
* Initial implementation of consumption templates * Frontend implementation of consumption templates Testing * Support consumption template source * order templates, automatically add permissions * Support title assignment in consumption templates * Refactoring, filters to and, show sources on list Show sources on template list, update some translation strings Make filters and minor testing * Update strings * Only update django-multiselectfield * Basic docs, document some methods * Improve testing coverage, template multi-assignment merges
This commit is contained in:
@@ -32,6 +32,8 @@ from whoosh.writing import AsyncWriter
|
||||
|
||||
from documents import bulk_edit
|
||||
from documents import index
|
||||
from documents.data_models import DocumentSource
|
||||
from documents.models import ConsumptionTemplate
|
||||
from documents.models import Correspondent
|
||||
from documents.models import Document
|
||||
from documents.models import DocumentType
|
||||
@@ -45,6 +47,8 @@ from documents.models import Tag
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from documents.tests.utils import DocumentConsumeDelayMixin
|
||||
from paperless import version
|
||||
from paperless_mail.models import MailAccount
|
||||
from paperless_mail.models import MailRule
|
||||
|
||||
|
||||
class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
|
||||
@@ -5313,3 +5317,168 @@ class TestBulkEditObjectPermissions(APITestCase):
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
|
||||
class TestApiConsumptionTemplates(DirectoriesMixin, APITestCase):
|
||||
ENDPOINT = "/api/consumption_templates/"
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
|
||||
user = User.objects.create_superuser(username="temp_admin")
|
||||
self.client.force_authenticate(user=user)
|
||||
self.user2 = User.objects.create(username="user2")
|
||||
self.user3 = User.objects.create(username="user3")
|
||||
self.group1 = Group.objects.create(name="group1")
|
||||
|
||||
self.c = Correspondent.objects.create(name="Correspondent Name")
|
||||
self.c2 = Correspondent.objects.create(name="Correspondent Name 2")
|
||||
self.dt = DocumentType.objects.create(name="DocType Name")
|
||||
self.t1 = Tag.objects.create(name="t1")
|
||||
self.t2 = Tag.objects.create(name="t2")
|
||||
self.t3 = Tag.objects.create(name="t3")
|
||||
self.sp = StoragePath.objects.create(path="/test/")
|
||||
|
||||
self.ct = ConsumptionTemplate.objects.create(
|
||||
name="Template 1",
|
||||
order=0,
|
||||
sources=f"{int(DocumentSource.ApiUpload)},{int(DocumentSource.ConsumeFolder)},{int(DocumentSource.MailFetch)}",
|
||||
filter_filename="*simple*",
|
||||
filter_path="*/samples/*",
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c,
|
||||
assign_document_type=self.dt,
|
||||
assign_storage_path=self.sp,
|
||||
assign_owner=self.user2,
|
||||
)
|
||||
self.ct.assign_tags.add(self.t1)
|
||||
self.ct.assign_tags.add(self.t2)
|
||||
self.ct.assign_tags.add(self.t3)
|
||||
self.ct.assign_view_users.add(self.user3.pk)
|
||||
self.ct.assign_view_groups.add(self.group1.pk)
|
||||
self.ct.assign_change_users.add(self.user3.pk)
|
||||
self.ct.assign_change_groups.add(self.group1.pk)
|
||||
self.ct.save()
|
||||
|
||||
def test_api_get_consumption_template(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API request to get all consumption template
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- Existing consumption templates are returned
|
||||
"""
|
||||
response = self.client.get(self.ENDPOINT, format="json")
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data["count"], 1)
|
||||
|
||||
resp_consumption_template = response.data["results"][0]
|
||||
self.assertEqual(resp_consumption_template["id"], self.ct.id)
|
||||
self.assertEqual(
|
||||
resp_consumption_template["assign_correspondent"],
|
||||
self.ct.assign_correspondent.pk,
|
||||
)
|
||||
|
||||
def test_api_create_consumption_template(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API request to create a consumption template
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- Correct HTTP response
|
||||
- New template is created
|
||||
"""
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"name": "Template 2",
|
||||
"order": 1,
|
||||
"sources": [DocumentSource.ApiUpload],
|
||||
"filter_filename": "*test*",
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
|
||||
self.assertEqual(ConsumptionTemplate.objects.count(), 2)
|
||||
|
||||
def test_api_create_invalid_consumption_template(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API request to create a consumption template
|
||||
- Neither file name nor path filter are specified
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- Correct HTTP 400 response
|
||||
- No template is created
|
||||
"""
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"name": "Template 2",
|
||||
"order": 1,
|
||||
"sources": [DocumentSource.ApiUpload],
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertEqual(StoragePath.objects.count(), 1)
|
||||
|
||||
def test_api_create_consumption_template_with_mailrule(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API request to create a consumption template with a mail rule but no MailFetch source
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- Correct HTTP response
|
||||
- New template is created with MailFetch as source
|
||||
"""
|
||||
account1 = MailAccount.objects.create(
|
||||
name="Email1",
|
||||
username="username1",
|
||||
password="password1",
|
||||
imap_server="server.example.com",
|
||||
imap_port=443,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
)
|
||||
rule1 = MailRule.objects.create(
|
||||
name="Rule1",
|
||||
account=account1,
|
||||
folder="INBOX",
|
||||
filter_from="from@example.com",
|
||||
filter_to="someone@somewhere.com",
|
||||
filter_subject="subject",
|
||||
filter_body="body",
|
||||
filter_attachment_filename="file.pdf",
|
||||
maximum_age=30,
|
||||
action=MailRule.MailAction.MARK_READ,
|
||||
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
|
||||
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
|
||||
order=0,
|
||||
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
|
||||
)
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"name": "Template 2",
|
||||
"order": 1,
|
||||
"sources": [DocumentSource.ApiUpload],
|
||||
"filter_mailrule": rule1.pk,
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
|
||||
self.assertEqual(ConsumptionTemplate.objects.count(), 2)
|
||||
ct = ConsumptionTemplate.objects.get(name="Template 2")
|
||||
self.assertEqual(ct.sources, [int(DocumentSource.MailFetch).__str__()])
|
||||
|
@@ -11,9 +11,12 @@ from unittest.mock import MagicMock
|
||||
|
||||
from dateutil import tz
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import TestCase
|
||||
from django.test import override_settings
|
||||
from django.utils import timezone
|
||||
from guardian.core import ObjectPermissionChecker
|
||||
|
||||
from documents.consumer import Consumer
|
||||
from documents.consumer import ConsumerError
|
||||
@@ -22,6 +25,7 @@ from documents.models import Correspondent
|
||||
from documents.models import Document
|
||||
from documents.models import DocumentType
|
||||
from documents.models import FileInfo
|
||||
from documents.models import StoragePath
|
||||
from documents.models import Tag
|
||||
from documents.parsers import DocumentParser
|
||||
from documents.parsers import ParseError
|
||||
@@ -431,6 +435,16 @@ class TestConsumer(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
self.assertEqual(document.document_type.id, dt.id)
|
||||
self._assert_first_last_send_progress()
|
||||
|
||||
def testOverrideStoragePath(self):
|
||||
sp = StoragePath.objects.create(name="test")
|
||||
|
||||
document = self.consumer.try_consume_file(
|
||||
self.get_test_file(),
|
||||
override_storage_path_id=sp.pk,
|
||||
)
|
||||
self.assertEqual(document.storage_path.id, sp.id)
|
||||
self._assert_first_last_send_progress()
|
||||
|
||||
def testOverrideTags(self):
|
||||
t1 = Tag.objects.create(name="t1")
|
||||
t2 = Tag.objects.create(name="t2")
|
||||
@@ -445,6 +459,51 @@ class TestConsumer(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
self.assertIn(t3, document.tags.all())
|
||||
self._assert_first_last_send_progress()
|
||||
|
||||
def testOverrideAsn(self):
|
||||
document = self.consumer.try_consume_file(
|
||||
self.get_test_file(),
|
||||
override_asn=123,
|
||||
)
|
||||
self.assertEqual(document.archive_serial_number, 123)
|
||||
self._assert_first_last_send_progress()
|
||||
|
||||
def testOverrideTitlePlaceholders(self):
|
||||
c = Correspondent.objects.create(name="Correspondent Name")
|
||||
dt = DocumentType.objects.create(name="DocType Name")
|
||||
|
||||
document = self.consumer.try_consume_file(
|
||||
self.get_test_file(),
|
||||
override_correspondent_id=c.pk,
|
||||
override_document_type_id=dt.pk,
|
||||
override_title="{correspondent}{document_type} {added_month}-{added_year_short}",
|
||||
)
|
||||
now = timezone.now()
|
||||
self.assertEqual(document.title, f"{c.name}{dt.name} {now.strftime('%m-%y')}")
|
||||
self._assert_first_last_send_progress()
|
||||
|
||||
def testOverrideOwner(self):
|
||||
testuser = User.objects.create(username="testuser")
|
||||
document = self.consumer.try_consume_file(
|
||||
self.get_test_file(),
|
||||
override_owner_id=testuser.pk,
|
||||
)
|
||||
self.assertEqual(document.owner, testuser)
|
||||
self._assert_first_last_send_progress()
|
||||
|
||||
def testOverridePermissions(self):
|
||||
testuser = User.objects.create(username="testuser")
|
||||
testgroup = Group.objects.create(name="testgroup")
|
||||
document = self.consumer.try_consume_file(
|
||||
self.get_test_file(),
|
||||
override_view_users=[testuser.pk],
|
||||
override_view_groups=[testgroup.pk],
|
||||
)
|
||||
user_checker = ObjectPermissionChecker(testuser)
|
||||
self.assertTrue(user_checker.has_perm("view_document", document))
|
||||
group_checker = ObjectPermissionChecker(testgroup)
|
||||
self.assertTrue(group_checker.has_perm("view_document", document))
|
||||
self._assert_first_last_send_progress()
|
||||
|
||||
def testNotAFile(self):
|
||||
self.assertRaisesMessage(
|
||||
ConsumerError,
|
||||
|
476
src/documents/tests/test_consumption_templates.py
Normal file
476
src/documents/tests/test_consumption_templates.py
Normal file
@@ -0,0 +1,476 @@
|
||||
from pathlib import Path
|
||||
from unittest import TestCase
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.auth.models import User
|
||||
|
||||
from documents import tasks
|
||||
from documents.data_models import ConsumableDocument
|
||||
from documents.data_models import DocumentSource
|
||||
from documents.models import ConsumptionTemplate
|
||||
from documents.models import Correspondent
|
||||
from documents.models import DocumentType
|
||||
from documents.models import StoragePath
|
||||
from documents.models import Tag
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from documents.tests.utils import FileSystemAssertsMixin
|
||||
from paperless_mail.models import MailAccount
|
||||
from paperless_mail.models import MailRule
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestConsumptionTemplates(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
SAMPLE_DIR = Path(__file__).parent / "samples"
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.c = Correspondent.objects.create(name="Correspondent Name")
|
||||
self.c2 = Correspondent.objects.create(name="Correspondent Name 2")
|
||||
self.dt = DocumentType.objects.create(name="DocType Name")
|
||||
self.t1 = Tag.objects.create(name="t1")
|
||||
self.t2 = Tag.objects.create(name="t2")
|
||||
self.t3 = Tag.objects.create(name="t3")
|
||||
self.sp = StoragePath.objects.create(path="/test/")
|
||||
|
||||
self.user2 = User.objects.create(username="user2")
|
||||
self.user3 = User.objects.create(username="user3")
|
||||
self.group1 = Group.objects.create(name="group1")
|
||||
|
||||
account1 = MailAccount.objects.create(
|
||||
name="Email1",
|
||||
username="username1",
|
||||
password="password1",
|
||||
imap_server="server.example.com",
|
||||
imap_port=443,
|
||||
imap_security=MailAccount.ImapSecurity.SSL,
|
||||
character_set="UTF-8",
|
||||
)
|
||||
self.rule1 = MailRule.objects.create(
|
||||
name="Rule1",
|
||||
account=account1,
|
||||
folder="INBOX",
|
||||
filter_from="from@example.com",
|
||||
filter_to="someone@somewhere.com",
|
||||
filter_subject="subject",
|
||||
filter_body="body",
|
||||
filter_attachment_filename="file.pdf",
|
||||
maximum_age=30,
|
||||
action=MailRule.MailAction.MARK_READ,
|
||||
assign_title_from=MailRule.TitleSource.NONE,
|
||||
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
|
||||
order=0,
|
||||
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
|
||||
assign_owner_from_rule=False,
|
||||
)
|
||||
|
||||
return super().setUp()
|
||||
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consumption_template_match(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing consumption template
|
||||
WHEN:
|
||||
- File that matches is consumed
|
||||
THEN:
|
||||
- Template overrides are applied
|
||||
"""
|
||||
ct = ConsumptionTemplate.objects.create(
|
||||
name="Template 1",
|
||||
order=0,
|
||||
sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
|
||||
filter_filename="*simple*",
|
||||
filter_path="*/samples/*",
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c,
|
||||
assign_document_type=self.dt,
|
||||
assign_storage_path=self.sp,
|
||||
assign_owner=self.user2,
|
||||
)
|
||||
ct.assign_tags.add(self.t1)
|
||||
ct.assign_tags.add(self.t2)
|
||||
ct.assign_tags.add(self.t3)
|
||||
ct.assign_view_users.add(self.user3.pk)
|
||||
ct.assign_view_groups.add(self.group1.pk)
|
||||
ct.assign_change_users.add(self.user3.pk)
|
||||
ct.assign_change_groups.add(self.group1.pk)
|
||||
ct.save()
|
||||
|
||||
self.assertEqual(ct.__str__(), "Template 1")
|
||||
|
||||
test_file = self.SAMPLE_DIR / "simple.pdf"
|
||||
|
||||
with mock.patch("documents.tasks.async_to_sync"):
|
||||
with self.assertLogs("paperless.matching", level="INFO") as cm:
|
||||
tasks.consume_file(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ConsumeFolder,
|
||||
original_file=test_file,
|
||||
),
|
||||
None,
|
||||
)
|
||||
m.assert_called_once()
|
||||
_, overrides = m.call_args
|
||||
self.assertEqual(overrides["override_correspondent_id"], self.c.pk)
|
||||
self.assertEqual(overrides["override_document_type_id"], self.dt.pk)
|
||||
self.assertEqual(
|
||||
overrides["override_tag_ids"],
|
||||
[self.t1.pk, self.t2.pk, self.t3.pk],
|
||||
)
|
||||
self.assertEqual(overrides["override_storage_path_id"], self.sp.pk)
|
||||
self.assertEqual(overrides["override_owner_id"], self.user2.pk)
|
||||
self.assertEqual(overrides["override_view_users"], [self.user3.pk])
|
||||
self.assertEqual(overrides["override_view_groups"], [self.group1.pk])
|
||||
self.assertEqual(overrides["override_change_users"], [self.user3.pk])
|
||||
self.assertEqual(overrides["override_change_groups"], [self.group1.pk])
|
||||
self.assertEqual(
|
||||
overrides["override_title"],
|
||||
"Doc from {correspondent}",
|
||||
)
|
||||
|
||||
info = cm.output[0]
|
||||
expected_str = f"Document matched template {ct}"
|
||||
self.assertIn(expected_str, info)
|
||||
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consumption_template_match_mailrule(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing consumption template
|
||||
WHEN:
|
||||
- File that matches is consumed via mail rule
|
||||
THEN:
|
||||
- Template overrides are applied
|
||||
"""
|
||||
ct = ConsumptionTemplate.objects.create(
|
||||
name="Template 1",
|
||||
order=0,
|
||||
sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
|
||||
filter_mailrule=self.rule1,
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c,
|
||||
assign_document_type=self.dt,
|
||||
assign_storage_path=self.sp,
|
||||
assign_owner=self.user2,
|
||||
)
|
||||
ct.assign_tags.add(self.t1)
|
||||
ct.assign_tags.add(self.t2)
|
||||
ct.assign_tags.add(self.t3)
|
||||
ct.assign_view_users.add(self.user3.pk)
|
||||
ct.assign_view_groups.add(self.group1.pk)
|
||||
ct.assign_change_users.add(self.user3.pk)
|
||||
ct.assign_change_groups.add(self.group1.pk)
|
||||
ct.save()
|
||||
|
||||
self.assertEqual(ct.__str__(), "Template 1")
|
||||
|
||||
test_file = self.SAMPLE_DIR / "simple.pdf"
|
||||
with mock.patch("documents.tasks.async_to_sync"):
|
||||
with self.assertLogs("paperless.matching", level="INFO") as cm:
|
||||
tasks.consume_file(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ConsumeFolder,
|
||||
original_file=test_file,
|
||||
mailrule_id=self.rule1.pk,
|
||||
),
|
||||
None,
|
||||
)
|
||||
m.assert_called_once()
|
||||
_, overrides = m.call_args
|
||||
self.assertEqual(overrides["override_correspondent_id"], self.c.pk)
|
||||
self.assertEqual(overrides["override_document_type_id"], self.dt.pk)
|
||||
self.assertEqual(
|
||||
overrides["override_tag_ids"],
|
||||
[self.t1.pk, self.t2.pk, self.t3.pk],
|
||||
)
|
||||
self.assertEqual(overrides["override_storage_path_id"], self.sp.pk)
|
||||
self.assertEqual(overrides["override_owner_id"], self.user2.pk)
|
||||
self.assertEqual(overrides["override_view_users"], [self.user3.pk])
|
||||
self.assertEqual(overrides["override_view_groups"], [self.group1.pk])
|
||||
self.assertEqual(overrides["override_change_users"], [self.user3.pk])
|
||||
self.assertEqual(overrides["override_change_groups"], [self.group1.pk])
|
||||
self.assertEqual(
|
||||
overrides["override_title"],
|
||||
"Doc from {correspondent}",
|
||||
)
|
||||
|
||||
info = cm.output[0]
|
||||
expected_str = f"Document matched template {ct}"
|
||||
self.assertIn(expected_str, info)
|
||||
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consumption_template_match_multiple(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Multiple existing consumption template
|
||||
WHEN:
|
||||
- File that matches is consumed
|
||||
THEN:
|
||||
- Template overrides are applied with subsequent templates only overwriting empty values
|
||||
or merging if multiple
|
||||
"""
|
||||
ct1 = ConsumptionTemplate.objects.create(
|
||||
name="Template 1",
|
||||
order=0,
|
||||
sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
|
||||
filter_path="*/samples/*",
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c,
|
||||
assign_document_type=self.dt,
|
||||
)
|
||||
ct1.assign_tags.add(self.t1)
|
||||
ct1.assign_tags.add(self.t2)
|
||||
ct1.assign_view_users.add(self.user2)
|
||||
ct1.save()
|
||||
ct2 = ConsumptionTemplate.objects.create(
|
||||
name="Template 2",
|
||||
order=0,
|
||||
sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
|
||||
filter_filename="*simple*",
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c2,
|
||||
assign_storage_path=self.sp,
|
||||
)
|
||||
ct2.assign_tags.add(self.t3)
|
||||
ct1.assign_view_users.add(self.user3)
|
||||
ct2.save()
|
||||
|
||||
test_file = self.SAMPLE_DIR / "simple.pdf"
|
||||
|
||||
with mock.patch("documents.tasks.async_to_sync"):
|
||||
with self.assertLogs("paperless.matching", level="INFO") as cm:
|
||||
tasks.consume_file(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ConsumeFolder,
|
||||
original_file=test_file,
|
||||
),
|
||||
None,
|
||||
)
|
||||
m.assert_called_once()
|
||||
_, overrides = m.call_args
|
||||
# template 1
|
||||
self.assertEqual(overrides["override_correspondent_id"], self.c.pk)
|
||||
self.assertEqual(overrides["override_document_type_id"], self.dt.pk)
|
||||
# template 2
|
||||
self.assertEqual(overrides["override_storage_path_id"], self.sp.pk)
|
||||
# template 1 & 2
|
||||
self.assertEqual(
|
||||
overrides["override_tag_ids"],
|
||||
[self.t1.pk, self.t2.pk, self.t3.pk],
|
||||
)
|
||||
self.assertEqual(
|
||||
overrides["override_view_users"],
|
||||
[self.user2.pk, self.user3.pk],
|
||||
)
|
||||
|
||||
expected_str = f"Document matched template {ct1}"
|
||||
self.assertIn(expected_str, cm.output[0])
|
||||
expected_str = f"Document matched template {ct2}"
|
||||
self.assertIn(expected_str, cm.output[1])
|
||||
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consumption_template_no_match_filename(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing consumption template
|
||||
WHEN:
|
||||
- File that does not match on filename is consumed
|
||||
THEN:
|
||||
- Template overrides are not applied
|
||||
"""
|
||||
ct = ConsumptionTemplate.objects.create(
|
||||
name="Template 1",
|
||||
order=0,
|
||||
sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
|
||||
filter_filename="*foobar*",
|
||||
filter_path=None,
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c,
|
||||
assign_document_type=self.dt,
|
||||
assign_storage_path=self.sp,
|
||||
assign_owner=self.user2,
|
||||
)
|
||||
|
||||
test_file = self.SAMPLE_DIR / "simple.pdf"
|
||||
|
||||
with mock.patch("documents.tasks.async_to_sync"):
|
||||
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
|
||||
tasks.consume_file(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ConsumeFolder,
|
||||
original_file=test_file,
|
||||
),
|
||||
None,
|
||||
)
|
||||
m.assert_called_once()
|
||||
_, overrides = m.call_args
|
||||
self.assertIsNone(overrides["override_correspondent_id"])
|
||||
self.assertIsNone(overrides["override_document_type_id"])
|
||||
self.assertIsNone(overrides["override_tag_ids"])
|
||||
self.assertIsNone(overrides["override_storage_path_id"])
|
||||
self.assertIsNone(overrides["override_owner_id"])
|
||||
self.assertIsNone(overrides["override_view_users"])
|
||||
self.assertIsNone(overrides["override_view_groups"])
|
||||
self.assertIsNone(overrides["override_change_users"])
|
||||
self.assertIsNone(overrides["override_change_groups"])
|
||||
self.assertIsNone(overrides["override_title"])
|
||||
|
||||
expected_str = f"Document did not match template {ct}"
|
||||
self.assertIn(expected_str, cm.output[0])
|
||||
expected_str = f"Document filename {test_file.name} does not match"
|
||||
self.assertIn(expected_str, cm.output[1])
|
||||
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consumption_template_no_match_path(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing consumption template
|
||||
WHEN:
|
||||
- File that does not match on path is consumed
|
||||
THEN:
|
||||
- Template overrides are not applied
|
||||
"""
|
||||
ct = ConsumptionTemplate.objects.create(
|
||||
name="Template 1",
|
||||
order=0,
|
||||
sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
|
||||
filter_path="*foo/bar*",
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c,
|
||||
assign_document_type=self.dt,
|
||||
assign_storage_path=self.sp,
|
||||
assign_owner=self.user2,
|
||||
)
|
||||
|
||||
test_file = self.SAMPLE_DIR / "simple.pdf"
|
||||
|
||||
with mock.patch("documents.tasks.async_to_sync"):
|
||||
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
|
||||
tasks.consume_file(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ConsumeFolder,
|
||||
original_file=test_file,
|
||||
),
|
||||
None,
|
||||
)
|
||||
m.assert_called_once()
|
||||
_, overrides = m.call_args
|
||||
self.assertIsNone(overrides["override_correspondent_id"])
|
||||
self.assertIsNone(overrides["override_document_type_id"])
|
||||
self.assertIsNone(overrides["override_tag_ids"])
|
||||
self.assertIsNone(overrides["override_storage_path_id"])
|
||||
self.assertIsNone(overrides["override_owner_id"])
|
||||
self.assertIsNone(overrides["override_view_users"])
|
||||
self.assertIsNone(overrides["override_view_groups"])
|
||||
self.assertIsNone(overrides["override_change_users"])
|
||||
self.assertIsNone(overrides["override_change_groups"])
|
||||
self.assertIsNone(overrides["override_title"])
|
||||
|
||||
expected_str = f"Document did not match template {ct}"
|
||||
self.assertIn(expected_str, cm.output[0])
|
||||
expected_str = f"Document path {test_file} does not match"
|
||||
self.assertIn(expected_str, cm.output[1])
|
||||
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consumption_template_no_match_mail_rule(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing consumption template
|
||||
WHEN:
|
||||
- File that does not match on source is consumed
|
||||
THEN:
|
||||
- Template overrides are not applied
|
||||
"""
|
||||
ct = ConsumptionTemplate.objects.create(
|
||||
name="Template 1",
|
||||
order=0,
|
||||
sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
|
||||
filter_mailrule=self.rule1,
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c,
|
||||
assign_document_type=self.dt,
|
||||
assign_storage_path=self.sp,
|
||||
assign_owner=self.user2,
|
||||
)
|
||||
|
||||
test_file = self.SAMPLE_DIR / "simple.pdf"
|
||||
|
||||
with mock.patch("documents.tasks.async_to_sync"):
|
||||
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
|
||||
tasks.consume_file(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ConsumeFolder,
|
||||
original_file=test_file,
|
||||
mailrule_id=99,
|
||||
),
|
||||
None,
|
||||
)
|
||||
m.assert_called_once()
|
||||
_, overrides = m.call_args
|
||||
self.assertIsNone(overrides["override_correspondent_id"])
|
||||
self.assertIsNone(overrides["override_document_type_id"])
|
||||
self.assertIsNone(overrides["override_tag_ids"])
|
||||
self.assertIsNone(overrides["override_storage_path_id"])
|
||||
self.assertIsNone(overrides["override_owner_id"])
|
||||
self.assertIsNone(overrides["override_view_users"])
|
||||
self.assertIsNone(overrides["override_view_groups"])
|
||||
self.assertIsNone(overrides["override_change_users"])
|
||||
self.assertIsNone(overrides["override_change_groups"])
|
||||
self.assertIsNone(overrides["override_title"])
|
||||
|
||||
expected_str = f"Document did not match template {ct}"
|
||||
self.assertIn(expected_str, cm.output[0])
|
||||
expected_str = "Document mail rule 99 !="
|
||||
self.assertIn(expected_str, cm.output[1])
|
||||
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consumption_template_no_match_source(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing consumption template
|
||||
WHEN:
|
||||
- File that does not match on source is consumed
|
||||
THEN:
|
||||
- Template overrides are not applied
|
||||
"""
|
||||
ct = ConsumptionTemplate.objects.create(
|
||||
name="Template 1",
|
||||
order=0,
|
||||
sources=f"{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
|
||||
filter_path="*",
|
||||
assign_title="Doc from {correspondent}",
|
||||
assign_correspondent=self.c,
|
||||
assign_document_type=self.dt,
|
||||
assign_storage_path=self.sp,
|
||||
assign_owner=self.user2,
|
||||
)
|
||||
|
||||
test_file = self.SAMPLE_DIR / "simple.pdf"
|
||||
|
||||
with mock.patch("documents.tasks.async_to_sync"):
|
||||
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
|
||||
tasks.consume_file(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ApiUpload,
|
||||
original_file=test_file,
|
||||
),
|
||||
None,
|
||||
)
|
||||
m.assert_called_once()
|
||||
_, overrides = m.call_args
|
||||
self.assertIsNone(overrides["override_correspondent_id"])
|
||||
self.assertIsNone(overrides["override_document_type_id"])
|
||||
self.assertIsNone(overrides["override_tag_ids"])
|
||||
self.assertIsNone(overrides["override_storage_path_id"])
|
||||
self.assertIsNone(overrides["override_owner_id"])
|
||||
self.assertIsNone(overrides["override_view_users"])
|
||||
self.assertIsNone(overrides["override_view_groups"])
|
||||
self.assertIsNone(overrides["override_change_users"])
|
||||
self.assertIsNone(overrides["override_change_groups"])
|
||||
self.assertIsNone(overrides["override_title"])
|
||||
|
||||
expected_str = f"Document did not match template {ct}"
|
||||
self.assertIn(expected_str, cm.output[0])
|
||||
expected_str = f"Document source {DocumentSource.ApiUpload.name} not in ['{DocumentSource.ConsumeFolder.name}', '{DocumentSource.MailFetch.name}']"
|
||||
self.assertIn(expected_str, cm.output[1])
|
@@ -153,7 +153,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
|
||||
manifest = self._do_export(use_filename_format=use_filename_format)
|
||||
|
||||
self.assertEqual(len(manifest), 154)
|
||||
self.assertEqual(len(manifest), 159)
|
||||
|
||||
# dont include consumer or AnonymousUser users
|
||||
self.assertEqual(
|
||||
@@ -247,7 +247,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
self.assertEqual(Document.objects.get(id=self.d4.id).title, "wow_dec")
|
||||
self.assertEqual(GroupObjectPermission.objects.count(), 1)
|
||||
self.assertEqual(UserObjectPermission.objects.count(), 1)
|
||||
self.assertEqual(Permission.objects.count(), 112)
|
||||
self.assertEqual(Permission.objects.count(), 116)
|
||||
messages = check_sanity()
|
||||
# everything is alright after the test
|
||||
self.assertEqual(len(messages), 0)
|
||||
@@ -676,15 +676,15 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
)
|
||||
|
||||
self.assertEqual(ContentType.objects.count(), 28)
|
||||
self.assertEqual(Permission.objects.count(), 112)
|
||||
self.assertEqual(ContentType.objects.count(), 29)
|
||||
self.assertEqual(Permission.objects.count(), 116)
|
||||
|
||||
manifest = self._do_export()
|
||||
|
||||
with paperless_environment():
|
||||
self.assertEqual(
|
||||
len(list(filter(lambda e: e["model"] == "auth.permission", manifest))),
|
||||
112,
|
||||
116,
|
||||
)
|
||||
# add 1 more to db to show objects are not re-created by import
|
||||
Permission.objects.create(
|
||||
@@ -692,7 +692,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
codename="test_perm",
|
||||
content_type_id=1,
|
||||
)
|
||||
self.assertEqual(Permission.objects.count(), 113)
|
||||
self.assertEqual(Permission.objects.count(), 117)
|
||||
|
||||
# will cause an import error
|
||||
self.user.delete()
|
||||
@@ -701,5 +701,5 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
with self.assertRaises(IntegrityError):
|
||||
call_command("document_importer", "--no-progress-bar", self.target)
|
||||
|
||||
self.assertEqual(ContentType.objects.count(), 28)
|
||||
self.assertEqual(Permission.objects.count(), 113)
|
||||
self.assertEqual(ContentType.objects.count(), 29)
|
||||
self.assertEqual(Permission.objects.count(), 117)
|
||||
|
43
src/documents/tests/test_migration_consumption_templates.py
Normal file
43
src/documents/tests/test_migration_consumption_templates.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
from documents.tests.utils import TestMigrations
|
||||
|
||||
|
||||
class TestMigrateConsumptionTemplate(TestMigrations):
|
||||
migrate_from = "1038_sharelink"
|
||||
migrate_to = "1039_consumptiontemplate"
|
||||
|
||||
def setUpBeforeMigration(self, apps):
|
||||
User = get_user_model()
|
||||
Group = apps.get_model("auth.Group")
|
||||
self.Permission = apps.get_model("auth", "Permission")
|
||||
self.user = User.objects.create(username="user1")
|
||||
self.group = Group.objects.create(name="group1")
|
||||
permission = self.Permission.objects.get(codename="add_document")
|
||||
self.user.user_permissions.add(permission.id)
|
||||
self.group.permissions.add(permission.id)
|
||||
|
||||
def test_users_with_add_documents_get_add_consumptiontemplate(self):
|
||||
permission = self.Permission.objects.get(codename="add_consumptiontemplate")
|
||||
self.assertTrue(self.user.has_perm(f"documents.{permission.codename}"))
|
||||
self.assertTrue(permission in self.group.permissions.all())
|
||||
|
||||
|
||||
class TestReverseMigrateConsumptionTemplate(TestMigrations):
|
||||
migrate_from = "1039_consumptiontemplate"
|
||||
migrate_to = "1038_sharelink"
|
||||
|
||||
def setUpBeforeMigration(self, apps):
|
||||
User = get_user_model()
|
||||
Group = apps.get_model("auth.Group")
|
||||
self.Permission = apps.get_model("auth", "Permission")
|
||||
self.user = User.objects.create(username="user1")
|
||||
self.group = Group.objects.create(name="group1")
|
||||
permission = self.Permission.objects.get(codename="add_consumptiontemplate")
|
||||
self.user.user_permissions.add(permission.id)
|
||||
self.group.permissions.add(permission.id)
|
||||
|
||||
def test_remove_consumptiontemplate_permissions(self):
|
||||
permission = self.Permission.objects.get(codename="add_consumptiontemplate")
|
||||
self.assertFalse(self.user.has_perm(f"documents.{permission.codename}"))
|
||||
self.assertFalse(permission in self.group.permissions.all())
|
Reference in New Issue
Block a user