Merge branch 'dev' into feature-remote-ocr-2

This commit is contained in:
shamoon
2025-09-11 13:25:53 -07:00
committed by GitHub
36 changed files with 1891 additions and 964 deletions

View File

@@ -181,6 +181,7 @@ def modify_custom_fields(
defaults[value_field] = value
if (
custom_field.data_type == CustomField.FieldDataType.DOCUMENTLINK
and value
and doc_id in value
):
# Prevent self-linking

View File

@@ -195,6 +195,7 @@ def update_document(writer: AsyncWriter, doc: Document) -> None:
original_filename=doc.original_filename,
is_shared=len(viewer_ids) > 0,
)
logger.debug(f"Index updated for document {doc.pk}.")
def remove_document(writer: AsyncWriter, doc: Document) -> None:

View File

@@ -41,7 +41,11 @@ def log_reason(
def match_correspondents(document: Document, classifier: DocumentClassifier, user=None):
pred_id = classifier.predict_correspondent(document.content) if classifier else None
pred_id = (
classifier.predict_correspondent(document.suggestion_content)
if classifier
else None
)
if user is None and document.owner is not None:
user = document.owner
@@ -65,8 +69,11 @@ def match_correspondents(document: Document, classifier: DocumentClassifier, use
def match_document_types(document: Document, classifier: DocumentClassifier, user=None):
pred_id = classifier.predict_document_type(document.content) if classifier else None
pred_id = (
classifier.predict_document_type(document.suggestion_content)
if classifier
else None
)
if user is None and document.owner is not None:
user = document.owner
@@ -89,7 +96,9 @@ def match_document_types(document: Document, classifier: DocumentClassifier, use
def match_tags(document: Document, classifier: DocumentClassifier, user=None):
predicted_tag_ids = classifier.predict_tags(document.content) if classifier else []
predicted_tag_ids = (
classifier.predict_tags(document.suggestion_content) if classifier else []
)
if user is None and document.owner is not None:
user = document.owner
@@ -112,7 +121,11 @@ def match_tags(document: Document, classifier: DocumentClassifier, user=None):
def match_storage_paths(document: Document, classifier: DocumentClassifier, user=None):
pred_id = classifier.predict_storage_path(document.content) if classifier else None
pred_id = (
classifier.predict_storage_path(document.suggestion_content)
if classifier
else None
)
if user is None and document.owner is not None:
user = document.owner
@@ -373,6 +386,16 @@ def existing_document_matches_workflow(
)
trigger_matched = False
# Document storage_path vs trigger has_storage_path
if (
trigger.filter_has_storage_path is not None
and document.storage_path != trigger.filter_has_storage_path
):
reason = (
f"Document storage path {document.storage_path} does not match {trigger.filter_has_storage_path}",
)
trigger_matched = False
# Document original_filename vs trigger filename
if (
trigger.filter_filename is not None
@@ -417,6 +440,11 @@ def prefilter_documents_by_workflowtrigger(
document_type=trigger.filter_has_document_type,
)
if trigger.filter_has_storage_path is not None:
documents = documents.filter(
storage_path=trigger.filter_has_storage_path,
)
if trigger.filter_filename is not None and len(trigger.filter_filename) > 0:
# the true fnmatch will actually run later so we just want a loose filter here
regex = fnmatch_translate(trigger.filter_filename).lstrip("^").rstrip("$")

View File

@@ -0,0 +1,35 @@
# Generated by Django 5.2.6 on 2025-09-11 17:29
import django.db.models.deletion
from django.db import migrations
from django.db import models
# Schema migration for the `documents` app. It performs two operations:
#   1. adds the optional `filter_has_storage_path` foreign key to
#      `workflowtrigger`, and
#   2. alters `workflowaction.assign_title` to a nullable, blank-able TextField
#      with an updated help text.
class Migration(migrations.Migration):
# Must be applied after the listed `documents` migration.
dependencies = [
("documents", "1068_alter_document_created"),
]
operations = [
# New nullable FK to `documents.storagepath`; SET_NULL means deleting the
# referenced storage path clears this column instead of deleting the trigger row.
migrations.AddField(
model_name="workflowtrigger",
name="filter_has_storage_path",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="documents.storagepath",
verbose_name="has this storage path",
),
),
# `assign_title` becomes a TextField; the help text now states that the value
# must be a Jinja2 template.
migrations.AlterField(
model_name="workflowaction",
name="assign_title",
field=models.TextField(
blank=True,
help_text="Assign a document title, must be a Jinja2 template, see documentation.",
null=True,
verbose_name="assign title",
),
),
]

View File

@@ -305,6 +305,28 @@ class Document(SoftDeleteModel, ModelWithOwner):
res += f" {self.title}"
return res
@property
def suggestion_content(self):
    """
    Text handed to the suggestion/classification code for this document.

    Content of up to 1,200,000 characters (and empty or missing content) is
    returned unchanged. Longer content is cropped to its first 800,000 and
    last 200,000 characters, joined by a single space, so that very large
    documents are cheaper to process while still contributing both their
    opening and closing context.
    """
    text = self.content

    # Nothing to crop: no content at all, or content within the size limit.
    if not text or len(text) <= 1200000:
        return text

    # Keep 80% of the cropped budget from the start and 20% from the end.
    leading = text[:800000]
    trailing = text[-200000:]
    return leading + " " + trailing
@property
def source_path(self) -> Path:
if self.filename:
@@ -1022,6 +1044,14 @@ class WorkflowTrigger(models.Model):
verbose_name=_("has this correspondent"),
)
filter_has_storage_path = models.ForeignKey(
StoragePath,
null=True,
blank=True,
on_delete=models.SET_NULL,
verbose_name=_("has this storage path"),
)
schedule_offset_days = models.IntegerField(
_("schedule offset days"),
default=0,
@@ -1185,14 +1215,12 @@ class WorkflowAction(models.Model):
default=WorkflowActionType.ASSIGNMENT,
)
assign_title = models.CharField(
assign_title = models.TextField(
_("assign title"),
max_length=256,
null=True,
blank=True,
help_text=_(
"Assign a document title, can include some placeholders, "
"see documentation.",
"Assign a document title, must be a Jinja2 template, see documentation.",
),
)

View File

@@ -2054,6 +2054,7 @@ class WorkflowTriggerSerializer(serializers.ModelSerializer):
"filter_has_tags",
"filter_has_correspondent",
"filter_has_document_type",
"filter_has_storage_path",
"schedule_offset_days",
"schedule_is_recurring",
"schedule_recurring_interval_days",

View File

@@ -0,0 +1,27 @@
from jinja2.sandbox import SandboxedEnvironment
class JinjaEnvironment(SandboxedEnvironment):
    """
    Sandboxed Jinja2 environment with an extra deny-list for callables.

    On top of the checks done by ``SandboxedEnvironment``, any callable whose
    ``__name__`` is ``save``, ``delete`` or ``update`` is reported as unsafe,
    so templates cannot invoke it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.undefined_tracker = None

    def is_safe_callable(self, obj):
        """Return False for deny-listed callables, else defer to the sandbox."""
        blocked_names = ("save", "delete", "update")
        if callable(obj):
            # Matching is purely by name, so every callable called save/delete/update
            # is rejected regardless of the object it belongs to.
            if getattr(obj, "__name__", None) in blocked_names:
                return False
        # Everything else is judged by the stock sandbox rules.
        return super().is_safe_callable(obj)
# Module-level environment instance. Other templating modules import this object
# (`from documents.templating.environment import _template_environment`) and
# attach their filters and `undefined` handler to it.
_template_environment = JinjaEnvironment(
# Whitespace control: drop the newline after a block tag and strip leading
# whitespace in front of block tags; do not keep a trailing newline.
trim_blocks=True,
lstrip_blocks=True,
keep_trailing_newline=False,
# NOTE(review): presumably disabled because rendered output is plain text
# (titles, file paths) rather than HTML - confirm.
autoescape=False,
# Adds `{% break %}` / `{% continue %}` support inside template loops.
extensions=["jinja2.ext.loopcontrols"],
)

View File

@@ -2,22 +2,16 @@ import logging
import os
import re
from collections.abc import Iterable
from datetime import date
from datetime import datetime
from pathlib import PurePath
import pathvalidate
from babel import Locale
from babel import dates
from django.utils import timezone
from django.utils.dateparse import parse_date
from django.utils.text import slugify as django_slugify
from jinja2 import StrictUndefined
from jinja2 import Template
from jinja2 import TemplateSyntaxError
from jinja2 import UndefinedError
from jinja2 import make_logging_undefined
from jinja2.sandbox import SandboxedEnvironment
from jinja2.sandbox import SecurityError
from documents.models import Correspondent
@@ -27,39 +21,16 @@ from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
from documents.templating.environment import _template_environment
from documents.templating.filters import format_datetime
from documents.templating.filters import get_cf_value
from documents.templating.filters import localize_date
logger = logging.getLogger("paperless.templating")
_LogStrictUndefined = make_logging_undefined(logger, StrictUndefined)
class FilePathEnvironment(SandboxedEnvironment):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.undefined_tracker = None
def is_safe_callable(self, obj):
# Block access to .save() and .delete() methods
if callable(obj) and getattr(obj, "__name__", None) in (
"save",
"delete",
"update",
):
return False
# Call the parent method for other cases
return super().is_safe_callable(obj)
_template_environment = FilePathEnvironment(
trim_blocks=True,
lstrip_blocks=True,
keep_trailing_newline=False,
autoescape=False,
extensions=["jinja2.ext.loopcontrols"],
undefined=_LogStrictUndefined,
)
class FilePathTemplate(Template):
def render(self, *args, **kwargs) -> str:
def clean_filepath(value: str) -> str:
@@ -81,54 +52,7 @@ class FilePathTemplate(Template):
return clean_filepath(original_render)
def get_cf_value(
custom_field_data: dict[str, dict[str, str]],
name: str,
default: str | None = None,
) -> str | None:
if name in custom_field_data and custom_field_data[name]["value"] is not None:
return custom_field_data[name]["value"]
elif default is not None:
return default
return None
def format_datetime(value: str | datetime, format: str) -> str:
if isinstance(value, str):
value = parse_date(value)
return value.strftime(format=format)
def localize_date(value: date | datetime, format: str, locale: str) -> str:
"""
Format a date or datetime object into a localized string using Babel.
Args:
value (date | datetime): The date or datetime to format. If a datetime
is provided, it should be timezone-aware (e.g., UTC from a Django DB object).
format (str): The format to use. Can be one of Babel's preset formats
('short', 'medium', 'long', 'full') or a custom pattern string.
locale (str): The locale code (e.g., 'en_US', 'fr_FR') to use for
localization.
Returns:
str: The localized, formatted date string.
Raises:
TypeError: If `value` is not a date or datetime instance.
"""
try:
Locale.parse(locale)
except Exception as e:
raise ValueError(f"Invalid locale identifier: {locale}") from e
if isinstance(value, datetime):
return dates.format_datetime(value, format=format, locale=locale)
elif isinstance(value, date):
return dates.format_date(value, format=format, locale=locale)
else:
raise TypeError(f"Unsupported type {type(value)} for localize_date")
_template_environment.undefined = _LogStrictUndefined
_template_environment.filters["get_cf_value"] = get_cf_value

View File

@@ -0,0 +1,60 @@
from datetime import date
from datetime import datetime
from babel import Locale
from babel import dates
from django.utils.dateparse import parse_date
from django.utils.dateparse import parse_datetime
def localize_date(value: date | datetime | str, format: str, locale: str) -> str:
    """
    Format a date, datetime or string value as a localized string using Babel.

    Args:
        value (date | datetime | str): The value to format. If a datetime is
            provided, it should be timezone-aware (e.g., UTC from a Django DB
            object). A string is first converted with Django's
            ``parse_datetime``.
        format (str): The format to use. Can be one of Babel's preset formats
            ('short', 'medium', 'long', 'full') or a custom pattern string.
        locale (str): The locale code (e.g., 'en_US', 'fr_FR') to use for
            localization.

    Returns:
        str: The localized, formatted date string.

    Raises:
        ValueError: If `locale` is not a locale identifier Babel can parse.
        TypeError: If `value` (after string parsing) is neither a date nor a
            datetime instance.
    """
    if isinstance(value, str):
        value = parse_datetime(value)

    # Validate the locale up front so the caller gets a uniform error message.
    try:
        Locale.parse(locale)
    except Exception as e:
        raise ValueError(f"Invalid locale identifier: {locale}") from e

    # datetime has to be tested before date because it is a subclass of date.
    if isinstance(value, datetime):
        formatter = dates.format_datetime
    elif isinstance(value, date):
        formatter = dates.format_date
    else:
        raise TypeError(f"Unsupported type {type(value)} for localize_date")

    return formatter(value, format=format, locale=locale)
def format_datetime(value: str | datetime, format: str) -> str:
if isinstance(value, str):
value = parse_date(value)
return value.strftime(format=format)
def get_cf_value(
custom_field_data: dict[str, dict[str, str]],
name: str,
default: str | None = None,
) -> str | None:
if name in custom_field_data and custom_field_data[name]["value"] is not None:
return custom_field_data[name]["value"]
elif default is not None:
return default
return None

View File

@@ -1,7 +1,33 @@
import logging
from datetime import date
from datetime import datetime
from pathlib import Path
from django.utils.text import slugify as django_slugify
from jinja2 import StrictUndefined
from jinja2 import Template
from jinja2 import TemplateSyntaxError
from jinja2 import UndefinedError
from jinja2 import make_logging_undefined
from jinja2.sandbox import SecurityError
from documents.templating.environment import _template_environment
from documents.templating.filters import format_datetime
from documents.templating.filters import localize_date
logger = logging.getLogger("paperless.templating")
_LogStrictUndefined = make_logging_undefined(logger, StrictUndefined)
_template_environment.undefined = _LogStrictUndefined
_template_environment.filters["datetime"] = format_datetime
_template_environment.filters["slugify"] = django_slugify
_template_environment.filters["localize_date"] = localize_date
def parse_w_workflow_placeholders(
text: str,
@@ -20,6 +46,7 @@ def parse_w_workflow_placeholders(
e.g. for pre-consumption triggers created will not have been parsed yet, but it will
for added / updated triggers
"""
formatting = {
"correspondent": correspondent_name,
"document_type": doc_type_name,
@@ -52,4 +79,28 @@ def parse_w_workflow_placeholders(
formatting.update({"doc_title": doc_title})
if doc_url is not None:
formatting.update({"doc_url": doc_url})
return text.format(**formatting).strip()
logger.debug(f"Jinja Template is : {text}")
try:
template = _template_environment.from_string(
text,
template_class=Template,
)
rendered_template = template.render(formatting)
# We're good!
return rendered_template
except UndefinedError as e:
# The undefined class logs this already for us
raise e
except TemplateSyntaxError as e:
logger.warning(f"Template syntax error in title generation: {e}")
except SecurityError as e:
logger.warning(f"Template attempted restricted operation: {e}")
except Exception as e:
logger.warning(f"Unknown error in title generation: {e}")
logger.warning(
f"Invalid title format '{text}', workflow not applied: {e}",
)
raise e
return None

View File

@@ -186,6 +186,7 @@ class TestApiWorkflows(DirectoriesMixin, APITestCase):
"filter_has_tags": [self.t1.id],
"filter_has_document_type": self.dt.id,
"filter_has_correspondent": self.c.id,
"filter_has_storage_path": self.sp.id,
},
],
"actions": [

View File

@@ -304,22 +304,6 @@ class TestConsumer(
self.assertEqual(document.title, "Override Title")
self._assert_first_last_send_progress()
def testOverrideTitleInvalidPlaceholders(self):
with self.assertLogs("paperless.consumer", level="ERROR") as cm:
with self.get_consumer(
self.get_test_file(),
DocumentMetadataOverrides(title="Override {correspondent]"),
) as consumer:
consumer.run()
document = Document.objects.first()
self.assertIsNotNone(document)
self.assertEqual(document.title, "sample")
expected_str = "Error occurred parsing title override 'Override {correspondent]', falling back to original"
self.assertIn(expected_str, cm.output[0])
def testOverrideCorrespondent(self):
c = Correspondent.objects.create(name="test")
@@ -437,7 +421,7 @@ class TestConsumer(
DocumentMetadataOverrides(
correspondent_id=c.pk,
document_type_id=dt.pk,
title="{correspondent}{document_type} {added_month}-{added_year_short}",
title="{{correspondent}}{{document_type}} {{added_month}}-{{added_year_short}}",
),
) as consumer:
consumer.run()

View File

@@ -6,6 +6,7 @@ from unittest import mock
from django.test import TestCase
from django.test import override_settings
from faker import Faker
from documents.models import Correspondent
from documents.models import Document
@@ -105,3 +106,27 @@ class TestDocument(TestCase):
created=date(2020, 12, 25),
)
self.assertEqual(doc.get_public_filename(), "2020-12-25 test")
def test_suggestion_content():
    """
    The suggestion text equals the full content up to the length limit and is
    cropped to a head/tail combination once the limit is exceeded.
    """
    limit = 1200000
    sample = Faker().text(max_nb_chars=1201000)

    # Exactly at the 1.2M character limit: content is returned verbatim.
    at_limit = sample[:limit]
    doc = Document(
        title="test",
        created=date(2025, 6, 1),
        content=at_limit,
    )
    assert doc.suggestion_content == at_limit

    # One character over the limit: 800K from the start and 200K from the end,
    # separated by a single space.
    beyond_limit = sample[: limit + 1]
    doc.content = beyond_limit
    cropped = " ".join((beyond_limit[:800000], beyond_limit[-200000:]))
    assert doc.suggestion_content == cropped

View File

@@ -23,7 +23,6 @@ from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.tasks import empty_trash
from documents.templating.filepath import localize_date
from documents.tests.factories import DocumentFactory
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
@@ -1591,166 +1590,13 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
)
class TestDateLocalization:
class TestPathDateLocalization:
"""
Groups all tests related to the `localize_date` function.
"""
TEST_DATE = datetime.date(2023, 10, 26)
TEST_DATETIME = datetime.datetime(
2023,
10,
26,
14,
30,
5,
tzinfo=datetime.timezone.utc,
)
@pytest.mark.parametrize(
"value, format_style, locale_str, expected_output",
[
pytest.param(
TEST_DATE,
"EEEE, MMM d, yyyy",
"en_US",
"Thursday, Oct 26, 2023",
id="date-en_US-custom",
),
pytest.param(
TEST_DATE,
"dd.MM.yyyy",
"de_DE",
"26.10.2023",
id="date-de_DE-custom",
),
# German weekday and month name translation
pytest.param(
TEST_DATE,
"EEEE",
"de_DE",
"Donnerstag",
id="weekday-de_DE",
),
pytest.param(
TEST_DATE,
"MMMM",
"de_DE",
"Oktober",
id="month-de_DE",
),
# French weekday and month name translation
pytest.param(
TEST_DATE,
"EEEE",
"fr_FR",
"jeudi",
id="weekday-fr_FR",
),
pytest.param(
TEST_DATE,
"MMMM",
"fr_FR",
"octobre",
id="month-fr_FR",
),
],
)
def test_localize_date_with_date_objects(
self,
value: datetime.date,
format_style: str,
locale_str: str,
expected_output: str,
):
"""
Tests `localize_date` with `date` objects across different locales and formats.
"""
assert localize_date(value, format_style, locale_str) == expected_output
@pytest.mark.parametrize(
"value, format_style, locale_str, expected_output",
[
pytest.param(
TEST_DATETIME,
"yyyy.MM.dd G 'at' HH:mm:ss zzz",
"en_US",
"2023.10.26 AD at 14:30:05 UTC",
id="datetime-en_US-custom",
),
pytest.param(
TEST_DATETIME,
"dd.MM.yyyy",
"fr_FR",
"26.10.2023",
id="date-fr_FR-custom",
),
# Spanish weekday and month translation
pytest.param(
TEST_DATETIME,
"EEEE",
"es_ES",
"jueves",
id="weekday-es_ES",
),
pytest.param(
TEST_DATETIME,
"MMMM",
"es_ES",
"octubre",
id="month-es_ES",
),
# Italian weekday and month translation
pytest.param(
TEST_DATETIME,
"EEEE",
"it_IT",
"giovedì",
id="weekday-it_IT",
),
pytest.param(
TEST_DATETIME,
"MMMM",
"it_IT",
"ottobre",
id="month-it_IT",
),
],
)
def test_localize_date_with_datetime_objects(
self,
value: datetime.datetime,
format_style: str,
locale_str: str,
expected_output: str,
):
# To handle the non-breaking space in French and other locales
result = localize_date(value, format_style, locale_str)
assert result.replace("\u202f", " ") == expected_output.replace("\u202f", " ")
@pytest.mark.parametrize(
"invalid_value",
[
"2023-10-26",
1698330605,
None,
[],
{},
],
)
def test_localize_date_raises_type_error_for_invalid_input(self, invalid_value):
with pytest.raises(TypeError) as excinfo:
localize_date(invalid_value, "medium", "en_US")
assert f"Unsupported type {type(invalid_value)}" in str(excinfo.value)
def test_localize_date_raises_error_for_invalid_locale(self):
with pytest.raises(ValueError) as excinfo:
localize_date(self.TEST_DATE, "medium", "invalid_locale_code")
assert "Invalid locale identifier" in str(excinfo.value)
@pytest.mark.django_db
@pytest.mark.parametrize(
"filename_format,expected_filename",

View File

@@ -0,0 +1,296 @@
import datetime
from typing import Any
from typing import Literal
import pytest
from documents.templating.filters import localize_date
class TestDateLocalization:
"""
Groups all tests related to the `localize_date` function.
"""
TEST_DATE = datetime.date(2023, 10, 26)
TEST_DATETIME = datetime.datetime(
2023,
10,
26,
14,
30,
5,
tzinfo=datetime.timezone.utc,
)
TEST_DATETIME_STRING: str = "2023-10-26T14:30:05+00:00"
TEST_DATE_STRING: str = "2023-10-26"
@pytest.mark.parametrize(
"value, format_style, locale_str, expected_output",
[
pytest.param(
TEST_DATE,
"EEEE, MMM d, yyyy",
"en_US",
"Thursday, Oct 26, 2023",
id="date-en_US-custom",
),
pytest.param(
TEST_DATE,
"dd.MM.yyyy",
"de_DE",
"26.10.2023",
id="date-de_DE-custom",
),
# German weekday and month name translation
pytest.param(
TEST_DATE,
"EEEE",
"de_DE",
"Donnerstag",
id="weekday-de_DE",
),
pytest.param(
TEST_DATE,
"MMMM",
"de_DE",
"Oktober",
id="month-de_DE",
),
# French weekday and month name translation
pytest.param(
TEST_DATE,
"EEEE",
"fr_FR",
"jeudi",
id="weekday-fr_FR",
),
pytest.param(
TEST_DATE,
"MMMM",
"fr_FR",
"octobre",
id="month-fr_FR",
),
],
)
def test_localize_date_with_date_objects(
self,
value: datetime.date,
format_style: str,
locale_str: str,
expected_output: str,
):
"""
Tests `localize_date` with `date` objects across different locales and formats.
"""
assert localize_date(value, format_style, locale_str) == expected_output
@pytest.mark.parametrize(
"value, format_style, locale_str, expected_output",
[
pytest.param(
TEST_DATETIME,
"yyyy.MM.dd G 'at' HH:mm:ss zzz",
"en_US",
"2023.10.26 AD at 14:30:05 UTC",
id="datetime-en_US-custom",
),
pytest.param(
TEST_DATETIME,
"dd.MM.yyyy",
"fr_FR",
"26.10.2023",
id="date-fr_FR-custom",
),
# Spanish weekday and month translation
pytest.param(
TEST_DATETIME,
"EEEE",
"es_ES",
"jueves",
id="weekday-es_ES",
),
pytest.param(
TEST_DATETIME,
"MMMM",
"es_ES",
"octubre",
id="month-es_ES",
),
# Italian weekday and month translation
pytest.param(
TEST_DATETIME,
"EEEE",
"it_IT",
"giovedì",
id="weekday-it_IT",
),
pytest.param(
TEST_DATETIME,
"MMMM",
"it_IT",
"ottobre",
id="month-it_IT",
),
],
)
def test_localize_date_with_datetime_objects(
self,
value: datetime.datetime,
format_style: str,
locale_str: str,
expected_output: str,
):
# To handle the non-breaking space in French and other locales
result = localize_date(value, format_style, locale_str)
assert result.replace("\u202f", " ") == expected_output.replace("\u202f", " ")
@pytest.mark.parametrize(
"invalid_value",
[
1698330605,
None,
[],
{},
],
)
def test_localize_date_raises_type_error_for_invalid_input(
self,
invalid_value: None | list[object] | dict[Any, Any] | Literal[1698330605],
):
with pytest.raises(TypeError) as excinfo:
localize_date(invalid_value, "medium", "en_US")
assert f"Unsupported type {type(invalid_value)}" in str(excinfo.value)
def test_localize_date_raises_error_for_invalid_locale(self):
with pytest.raises(ValueError) as excinfo:
localize_date(self.TEST_DATE, "medium", "invalid_locale_code")
assert "Invalid locale identifier" in str(excinfo.value)
# Every case below passes TEST_DATETIME_STRING, i.e. the string input path of
# `localize_date`, and expects the same output as the equivalent date object.
@pytest.mark.parametrize(
"value, format_style, locale_str, expected_output",
[
pytest.param(
TEST_DATETIME_STRING,
"EEEE, MMM d, yyyy",
"en_US",
"Thursday, Oct 26, 2023",
id="date-en_US-custom",
),
pytest.param(
TEST_DATETIME_STRING,
"dd.MM.yyyy",
"de_DE",
"26.10.2023",
id="date-de_DE-custom",
),
# German weekday and month name translation
pytest.param(
TEST_DATETIME_STRING,
"EEEE",
"de_DE",
"Donnerstag",
id="weekday-de_DE",
),
pytest.param(
TEST_DATETIME_STRING,
"MMMM",
"de_DE",
"Oktober",
id="month-de_DE",
),
# French weekday and month name translation
pytest.param(
TEST_DATETIME_STRING,
"EEEE",
"fr_FR",
"jeudi",
id="weekday-fr_FR",
),
pytest.param(
TEST_DATETIME_STRING,
"MMMM",
"fr_FR",
"octobre",
id="month-fr_FR",
),
],
)
def test_localize_date_with_datetime_string(
self,
value: str,
format_style: str,
locale_str: str,
expected_output: str,
):
"""
Tests `localize_date` with ISO datetime strings across different locales and formats.
"""
assert localize_date(value, format_style, locale_str) == expected_output
@pytest.mark.parametrize(
"value, format_style, locale_str, expected_output",
[
pytest.param(
TEST_DATE_STRING,
"EEEE, MMM d, yyyy",
"en_US",
"Thursday, Oct 26, 2023",
id="date-en_US-custom",
),
pytest.param(
TEST_DATE_STRING,
"dd.MM.yyyy",
"de_DE",
"26.10.2023",
id="date-de_DE-custom",
),
# German weekday and month name translation
pytest.param(
TEST_DATE_STRING,
"EEEE",
"de_DE",
"Donnerstag",
id="weekday-de_DE",
),
pytest.param(
TEST_DATE_STRING,
"MMMM",
"de_DE",
"Oktober",
id="month-de_DE",
),
# French weekday and month name translation
pytest.param(
TEST_DATE_STRING,
"EEEE",
"fr_FR",
"jeudi",
id="weekday-fr_FR",
),
pytest.param(
TEST_DATE_STRING,
"MMMM",
"fr_FR",
"octobre",
id="month-fr_FR",
),
],
)
def test_localize_date_with_date_string(
self,
value: str,
format_style: str,
locale_str: str,
expected_output: str,
):
"""
Tests `localize_date` with `date` string across different locales and formats.
"""
assert localize_date(value, format_style, locale_str) == expected_output

View File

@@ -1,6 +1,8 @@
import datetime
import shutil
import socket
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING
from unittest import mock
@@ -15,6 +17,7 @@ from guardian.shortcuts import get_users_with_perms
from httpx import HTTPError
from httpx import HTTPStatusError
from pytest_httpx import HTTPXMock
from rest_framework.test import APIClient
from rest_framework.test import APITestCase
from documents.signals.handlers import run_workflows
@@ -22,7 +25,7 @@ from documents.signals.handlers import send_webhook
if TYPE_CHECKING:
from django.db.models import QuerySet
from pytest_django.fixtures import SettingsWrapper
from documents import tasks
from documents.data_models import ConsumableDocument
@@ -122,7 +125,7 @@ class TestWorkflows(
filter_path=f"*/{self.dirs.scratch_dir.parts[-1]}/*",
)
action = WorkflowAction.objects.create(
assign_title="Doc from {correspondent}",
assign_title="Doc from {{correspondent}}",
assign_correspondent=self.c,
assign_document_type=self.dt,
assign_storage_path=self.sp,
@@ -241,7 +244,7 @@ class TestWorkflows(
)
action = WorkflowAction.objects.create(
assign_title="Doc from {correspondent}",
assign_title="Doc from {{correspondent}}",
assign_correspondent=self.c,
assign_document_type=self.dt,
assign_storage_path=self.sp,
@@ -892,7 +895,7 @@ class TestWorkflows(
filter_filename="*sample*",
)
action = WorkflowAction.objects.create(
assign_title="Doc created in {created_year}",
assign_title="Doc created in {{created_year}}",
assign_correspondent=self.c2,
assign_document_type=self.dt,
assign_storage_path=self.sp,
@@ -1147,6 +1150,38 @@ class TestWorkflows(
expected_str = f"Document correspondent {doc.correspondent} does not match {trigger.filter_has_correspondent}"
self.assertIn(expected_str, cm.output[1])
def test_document_added_no_match_storage_path(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
filter_has_storage_path=self.sp,
)
action = WorkflowAction.objects.create(
assign_title="Doc assign owner",
assign_owner=self.user2,
)
w = Workflow.objects.create(
name="Workflow 1",
order=0,
)
w.triggers.add(trigger)
w.actions.add(action)
w.save()
doc = Document.objects.create(
title="sample test",
original_filename="sample.pdf",
)
with self.assertLogs("paperless.matching", level="DEBUG") as cm:
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
expected_str = f"Document did not match {w}"
self.assertIn(expected_str, cm.output[0])
expected_str = f"Document storage path {doc.storage_path} does not match {trigger.filter_has_storage_path}"
self.assertIn(expected_str, cm.output[1])
def test_document_added_invalid_title_placeholders(self):
"""
GIVEN:
@@ -1155,7 +1190,7 @@ class TestWorkflows(
WHEN:
- File that matches is added
THEN:
- Title is not updated, error is output
- Title is updated but the placeholder isn't replaced
"""
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
@@ -1181,15 +1216,12 @@ class TestWorkflows(
created=created,
)
with self.assertLogs("paperless.handlers", level="ERROR") as cm:
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
expected_str = f"Error occurred parsing title assignment '{action.assign_title}', falling back to original"
self.assertIn(expected_str, cm.output[0])
document_consumption_finished.send(
sender=self.__class__,
document=doc,
)
self.assertEqual(doc.title, "sample test")
self.assertEqual(doc.title, "Doc {created_year]")
def test_document_updated_workflow(self):
trigger = WorkflowTrigger.objects.create(
@@ -1223,6 +1255,45 @@ class TestWorkflows(
self.assertEqual(doc.custom_fields.all().count(), 1)
def test_document_consumption_workflow_month_placeholder_addded(self):
trigger = WorkflowTrigger.objects.create(
type=WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
sources=f"{DocumentSource.ApiUpload}",
filter_filename="simple*",
)
action = WorkflowAction.objects.create(
assign_title="Doc added in {{added_month_name_short}}",
)
w = Workflow.objects.create(
name="Workflow 1",
order=0,
)
w.triggers.add(trigger)
w.actions.add(action)
w.save()
superuser = User.objects.create_superuser("superuser")
self.client.force_authenticate(user=superuser)
test_file = shutil.copy(
self.SAMPLE_DIR / "simple.pdf",
self.dirs.scratch_dir / "simple.pdf",
)
with mock.patch("documents.tasks.ProgressManager", DummyProgressManager):
tasks.consume_file(
ConsumableDocument(
source=DocumentSource.ApiUpload,
original_file=test_file,
),
None,
)
document = Document.objects.first()
self.assertRegex(
document.title,
r"Doc added in \w{3,}",
) # Match any 3-letter month name
def test_document_updated_workflow_existing_custom_field(self):
"""
GIVEN:
@@ -1777,6 +1848,7 @@ class TestWorkflows(
filter_filename="*sample*",
filter_has_document_type=self.dt,
filter_has_correspondent=self.c,
filter_has_storage_path=self.sp,
)
trigger.filter_has_tags.set([self.t1])
trigger.save()
@@ -1797,6 +1869,7 @@ class TestWorkflows(
title=f"sample test {i}",
checksum=f"checksum{i}",
correspondent=self.c,
storage_path=self.sp,
original_filename=f"sample_{i}.pdf",
document_type=self.dt if i % 2 == 0 else None,
)
@@ -2035,7 +2108,7 @@ class TestWorkflows(
filter_filename="*simple*",
)
action = WorkflowAction.objects.create(
assign_title="Doc from {correspondent}",
assign_title="Doc from {{correspondent}}",
assign_correspondent=self.c,
assign_document_type=self.dt,
assign_storage_path=self.sp,
@@ -2614,7 +2687,7 @@ class TestWorkflows(
)
webhook_action = WorkflowActionWebhook.objects.create(
use_params=False,
body="Test message: {doc_url}",
body="Test message: {{doc_url}}",
url="http://paperless-ngx.com",
include_document=False,
)
@@ -2673,7 +2746,7 @@ class TestWorkflows(
)
webhook_action = WorkflowActionWebhook.objects.create(
use_params=False,
body="Test message: {doc_url}",
body="Test message: {{doc_url}}",
url="http://paperless-ngx.com",
include_document=True,
)
@@ -3130,3 +3203,234 @@ class TestWebhookSecurity:
req = httpx_mock.get_request()
assert req.headers["Host"] == "paperless-ngx.com"
assert "evil.test" not in req.headers.get("Host", "")
@pytest.mark.django_db
class TestDateWorkflowLocalization(SampleDirMixin):
    """
    Workflow title templates that format dates with the localize_date filter.

    Every test assigns a document title from a template and compares it with
    the expected rendering for a Spanish, a German and a British locale.
    """

    # Fixed, timezone-aware timestamp (26 June 2023, 14:30:05 UTC). It is used
    # either as the document's created date or as the patched "now".
    TEST_DATETIME = datetime.datetime(
        year=2023,
        month=6,
        day=26,
        hour=14,
        minute=30,
        second=5,
        tzinfo=datetime.timezone.utc,
    )

    @pytest.mark.parametrize(
        "title_template,expected_title",
        [
            pytest.param(
                "Created at {{ created | localize_date('MMMM', 'es_ES') }}",
                "Created at junio",
                id="spanish_month",
            ),
            pytest.param(
                "Created at {{ created | localize_date('MMMM', 'de_DE') }}",
                "Created at Juni",  # codespell:ignore
                id="german_month",
            ),
            pytest.param(
                "Created at {{ created | localize_date('dd/MM/yyyy', 'en_GB') }}",
                "Created at 26/06/2023",
                id="british_date_format",
            ),
        ],
    )
    def test_document_added_workflow_localization(
        self,
        title_template: str,
        expected_title: str,
    ) -> None:
        """
        GIVEN:
            - A "document added" workflow matching filenames like *sample*
            - Its action assigns a title template using the localize_date filter
        WHEN:
            - The document_consumption_finished signal is sent for a document
              created at TEST_DATETIME
        THEN:
            - The stored document title is the localized rendering
        """
        added_trigger = WorkflowTrigger.objects.create(
            type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED,
            filter_filename="*sample*",
        )
        title_action = WorkflowAction.objects.create(assign_title=title_template)
        workflow = Workflow.objects.create(name="Workflow 1", order=0)
        workflow.triggers.add(added_trigger)
        workflow.actions.add(title_action)
        workflow.save()

        document = Document.objects.create(
            title="sample test",
            correspondent=None,
            original_filename="sample.pdf",
            created=self.TEST_DATETIME,
        )

        document_consumption_finished.send(
            sender=self.__class__,
            document=document,
        )

        # Reload so the assertion sees what was persisted, not the local copy.
        document.refresh_from_db()
        assert document.title == expected_title

    @pytest.mark.parametrize(
        "title_template,expected_title",
        [
            pytest.param(
                "Created at {{ created | localize_date('MMMM', 'es_ES') }}",
                "Created at junio",
                id="spanish_month",
            ),
            pytest.param(
                "Created at {{ created | localize_date('MMMM', 'de_DE') }}",
                "Created at Juni",  # codespell:ignore
                id="german_month",
            ),
            pytest.param(
                "Created at {{ created | localize_date('dd/MM/yyyy', 'en_GB') }}",
                "Created at 26/06/2023",
                id="british_date_format",
            ),
        ],
    )
    def test_document_updated_workflow_localization(
        self,
        title_template: str,
        expected_title: str,
    ) -> None:
        """
        GIVEN:
            - A "document updated" workflow filtered on a document type
            - Its action assigns a title template using the localize_date filter
            - An existing document, created at TEST_DATETIME, without that type
        WHEN:
            - The document type is set on the document through the API
        THEN:
            - The stored document title is the localized rendering
        """
        document_type = DocumentType.objects.create(name="DocType Name")
        correspondent = Correspondent.objects.create(name="Correspondent Name")

        api_client = APIClient()
        superuser = User.objects.create_superuser("superuser")
        api_client.force_authenticate(user=superuser)

        updated_trigger = WorkflowTrigger.objects.create(
            type=WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
            filter_has_document_type=document_type,
        )
        document = Document.objects.create(
            title="sample test",
            correspondent=correspondent,
            original_filename="sample.pdf",
            created=self.TEST_DATETIME,
        )
        title_action = WorkflowAction.objects.create(assign_title=title_template)
        workflow = Workflow.objects.create(name="Workflow 1", order=0)
        workflow.triggers.add(updated_trigger)
        workflow.actions.add(title_action)
        workflow.save()

        # Assigning the filtered document type is the update under test.
        api_client.patch(
            f"/api/documents/{document.id}/",
            {"document_type": document_type.id},
            format="json",
        )

        document.refresh_from_db()
        assert document.title == expected_title

    @pytest.mark.parametrize(
        "title_template,expected_title",
        [
            pytest.param(
                "Added at {{ added | localize_date('MMMM', 'es_ES') }}",
                "Added at junio",
                id="spanish_month",
            ),
            pytest.param(
                "Added at {{ added | localize_date('MMMM', 'de_DE') }}",
                "Added at Juni",  # codespell:ignore
                id="german_month",
            ),
            pytest.param(
                "Added at {{ added | localize_date('dd/MM/yyyy', 'en_GB') }}",
                "Added at 26/06/2023",
                id="british_date_format",
            ),
        ],
    )
    def test_document_consumption_workflow_localization(
        self,
        tmp_path: Path,
        settings: SettingsWrapper,
        title_template: str,
        expected_title: str,
    ) -> None:
        """
        GIVEN:
            - A consumption workflow for API uploads matching filenames simple*
            - Its action assigns a title template that formats `added` with
              the localize_date filter
            - django.utils.timezone.now is patched to return TEST_DATETIME
        WHEN:
            - A copy of simple.pdf is consumed via tasks.consume_file
        THEN:
            - The first stored document's title is the localized rendering
        """
        consumption_trigger = WorkflowTrigger.objects.create(
            type=WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
            sources=f"{DocumentSource.ApiUpload}",
            filter_filename="simple*",
        )

        # Consume a copy placed in the per-test temporary directory.
        test_file = shutil.copy(
            self.SAMPLE_DIR / "simple.pdf",
            tmp_path / "simple.pdf",
        )

        title_action = WorkflowAction.objects.create(assign_title=title_template)
        workflow = Workflow.objects.create(name="Workflow 1", order=0)
        workflow.triggers.add(consumption_trigger)
        workflow.actions.add(title_action)
        workflow.save()

        scratch_dir = tmp_path / "scratch"
        settings.SCRATCH_DIR = scratch_dir
        scratch_dir.mkdir(parents=True, exist_ok=True)

        # Pin "now" to TEST_DATETIME for the duration of the consume call so
        # that templates using the added/created placeholders render as if the
        # system date were 26 June 2023, as the expected titles require.
        with (
            mock.patch("documents.tasks.ProgressManager", DummyProgressManager),
            mock.patch(
                "django.utils.timezone.now",
                return_value=self.TEST_DATETIME,
            ),
        ):
            tasks.consume_file(
                ConsumableDocument(
                    source=DocumentSource.ApiUpload,
                    original_file=test_file,
                ),
                None,
            )

        consumed = Document.objects.first()
        assert consumed.title == expected_title

File diff suppressed because it is too large Load Diff

View File

@@ -468,7 +468,12 @@ class MailAccountHandler(LoggingMixin):
def _correspondent_from_name(self, name: str) -> Correspondent | None:
    """
    Return the correspondent called ``name``, creating it if it is missing.

    A correspondent created here gets its ``match`` field set to ``name``.
    ``match`` is passed through ``defaults``, so it is applied only when a
    new row is created and takes no part in the lookup.

    :param name: name to look up, and to use as ``match`` on creation
    :return: the existing or newly created correspondent, or ``None`` when
        the database operation raises ``DatabaseError`` (the error is logged)
    """
    try:
        # Only the get_or_create variant with ``defaults`` is kept: a plain
        # ``get_or_create(name=name)`` returning first would leave ``match``
        # unset on newly created correspondents.
        correspondent, _created = Correspondent.objects.get_or_create(
            name=name,
            defaults={
                "match": name,
            },
        )
    except DatabaseError as e:
        self.log.error(f"Error while retrieving correspondent {name}: {e}")
        return None
    return correspondent

View File

@@ -26,6 +26,7 @@ from rest_framework import status
from rest_framework.test import APITestCase
from documents.models import Correspondent
from documents.models import MatchingModel
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from paperless_mail import tasks
@@ -446,6 +447,8 @@ class TestMail(
c = handler._get_correspondent(message, rule)
self.assertIsNotNone(c)
self.assertEqual(c.name, "someone@somewhere.com")
self.assertEqual(c.matching_algorithm, MatchingModel.MATCH_ANY)
self.assertEqual(c.match, "someone@somewhere.com")
c = handler._get_correspondent(message2, rule)
self.assertIsNotNone(c)
self.assertEqual(c.name, "me@localhost.com")