mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-12-12 01:11:17 -06:00
Compare commits
1 Commits
feature/cf
...
feature/ne
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26ebd6ddcf |
@@ -21,6 +21,7 @@ from django.core.validators import MaxLengthValidator
|
||||
from django.core.validators import RegexValidator
|
||||
from django.core.validators import integer_validator
|
||||
from django.db.models import Count
|
||||
from django.db.models.functions import Lower
|
||||
from django.utils.crypto import get_random_string
|
||||
from django.utils.dateparse import parse_datetime
|
||||
from django.utils.text import slugify
|
||||
@@ -38,6 +39,7 @@ from guardian.utils import get_user_obj_perms_model
|
||||
from rest_framework import fields
|
||||
from rest_framework import serializers
|
||||
from rest_framework.fields import SerializerMethodField
|
||||
from rest_framework.filters import OrderingFilter
|
||||
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
from auditlog.context import set_actor
|
||||
@@ -575,15 +577,29 @@ class TagSerializer(MatchingModelSerializer, OwnedObjectSerializer):
|
||||
)
|
||||
def get_children(self, obj):
|
||||
filter_q = self.context.get("document_count_filter")
|
||||
if filter_q is None:
|
||||
request = self.context.get("request")
|
||||
if filter_q is None:
|
||||
user = getattr(request, "user", None) if request else None
|
||||
filter_q = get_document_count_filter_for_user(user)
|
||||
self.context["document_count_filter"] = filter_q
|
||||
serializer = TagSerializer(
|
||||
|
||||
children_queryset = (
|
||||
obj.get_children_queryset()
|
||||
.select_related("owner")
|
||||
.annotate(document_count=Count("documents", filter=filter_q)),
|
||||
.annotate(document_count=Count("documents", filter=filter_q))
|
||||
)
|
||||
|
||||
view = self.context.get("view")
|
||||
ordering = (
|
||||
OrderingFilter().get_ordering(request, children_queryset, view)
|
||||
if request and view
|
||||
else None
|
||||
)
|
||||
ordering = ordering or (Lower("name"),)
|
||||
children_queryset = children_queryset.order_by(*ordering)
|
||||
|
||||
serializer = TagSerializer(
|
||||
children_queryset,
|
||||
many=True,
|
||||
user=self.user,
|
||||
full_perms=self.full_perms,
|
||||
|
||||
@@ -28,7 +28,6 @@ from documents import matching
|
||||
from documents.caching import clear_document_caches
|
||||
from documents.file_handling import create_source_path_directory
|
||||
from documents.file_handling import delete_empty_directories
|
||||
from documents.file_handling import generate_filename
|
||||
from documents.file_handling import generate_unique_filename
|
||||
from documents.models import CustomField
|
||||
from documents.models import CustomFieldInstance
|
||||
@@ -43,7 +42,6 @@ from documents.models import WorkflowAction
|
||||
from documents.models import WorkflowRun
|
||||
from documents.models import WorkflowTrigger
|
||||
from documents.permissions import get_objects_for_user_owner_aware
|
||||
from documents.templating.utils import convert_format_str_to_template_format
|
||||
from documents.workflows.actions import build_workflow_action_context
|
||||
from documents.workflows.actions import execute_email_action
|
||||
from documents.workflows.actions import execute_webhook_action
|
||||
@@ -391,19 +389,6 @@ class CannotMoveFilesException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _filename_template_uses_custom_fields(doc: Document) -> bool:
|
||||
template = None
|
||||
if doc.storage_path is not None:
|
||||
template = doc.storage_path.path
|
||||
elif settings.FILENAME_FORMAT is not None:
|
||||
template = convert_format_str_to_template_format(settings.FILENAME_FORMAT)
|
||||
|
||||
if not template:
|
||||
return False
|
||||
|
||||
return "custom_fields" in template
|
||||
|
||||
|
||||
# should be disabled in /src/documents/management/commands/document_importer.py handle
|
||||
@receiver(models.signals.post_save, sender=CustomFieldInstance, weak=False)
|
||||
@receiver(models.signals.m2m_changed, sender=Document.tags.through, weak=False)
|
||||
@@ -414,8 +399,6 @@ def update_filename_and_move_files(
|
||||
**kwargs,
|
||||
):
|
||||
if isinstance(instance, CustomFieldInstance):
|
||||
if not _filename_template_uses_custom_fields(instance.document):
|
||||
return
|
||||
instance = instance.document
|
||||
|
||||
def validate_move(instance, old_path: Path, new_path: Path):
|
||||
@@ -453,47 +436,21 @@ def update_filename_and_move_files(
|
||||
old_filename = instance.filename
|
||||
old_source_path = instance.source_path
|
||||
|
||||
candidate_filename = generate_filename(instance)
|
||||
candidate_source_path = (
|
||||
settings.ORIGINALS_DIR / candidate_filename
|
||||
).resolve()
|
||||
if candidate_filename == Path(old_filename):
|
||||
new_filename = Path(old_filename)
|
||||
elif (
|
||||
candidate_source_path.exists()
|
||||
and candidate_source_path != old_source_path
|
||||
):
|
||||
# Only fall back to unique search when there is an actual conflict
|
||||
new_filename = generate_unique_filename(instance)
|
||||
else:
|
||||
new_filename = candidate_filename
|
||||
|
||||
# Need to convert to string to be able to save it to the db
|
||||
instance.filename = str(new_filename)
|
||||
instance.filename = str(generate_unique_filename(instance))
|
||||
move_original = old_filename != instance.filename
|
||||
|
||||
old_archive_filename = instance.archive_filename
|
||||
old_archive_path = instance.archive_path
|
||||
|
||||
if instance.has_archive_version:
|
||||
archive_candidate = generate_filename(instance, archive_filename=True)
|
||||
archive_candidate_path = (
|
||||
settings.ARCHIVE_DIR / archive_candidate
|
||||
).resolve()
|
||||
if archive_candidate == Path(old_archive_filename):
|
||||
new_archive_filename = Path(old_archive_filename)
|
||||
elif (
|
||||
archive_candidate_path.exists()
|
||||
and archive_candidate_path != old_archive_path
|
||||
):
|
||||
new_archive_filename = generate_unique_filename(
|
||||
# Need to convert to string to be able to save it to the db
|
||||
instance.archive_filename = str(
|
||||
generate_unique_filename(
|
||||
instance,
|
||||
archive_filename=True,
|
||||
),
|
||||
)
|
||||
else:
|
||||
new_archive_filename = archive_candidate
|
||||
|
||||
instance.archive_filename = str(new_archive_filename)
|
||||
|
||||
move_archive = old_archive_filename != instance.archive_filename
|
||||
else:
|
||||
|
||||
@@ -16,7 +16,6 @@ from django.utils import timezone
|
||||
from documents.file_handling import create_source_path_directory
|
||||
from documents.file_handling import delete_empty_directories
|
||||
from documents.file_handling import generate_filename
|
||||
from documents.file_handling import generate_unique_filename
|
||||
from documents.models import Correspondent
|
||||
from documents.models import CustomField
|
||||
from documents.models import CustomFieldInstance
|
||||
@@ -1633,73 +1632,6 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestCustomFieldFilenameUpdates(
|
||||
DirectoriesMixin,
|
||||
FileSystemAssertsMixin,
|
||||
TestCase,
|
||||
):
|
||||
def setUp(self):
|
||||
self.cf = CustomField.objects.create(
|
||||
name="flavor",
|
||||
data_type=CustomField.FieldDataType.STRING,
|
||||
)
|
||||
self.doc = Document.objects.create(
|
||||
title="document",
|
||||
mime_type="application/pdf",
|
||||
checksum="abc123",
|
||||
)
|
||||
self.cfi = CustomFieldInstance.objects.create(
|
||||
field=self.cf,
|
||||
document=self.doc,
|
||||
value_text="initial",
|
||||
)
|
||||
return super().setUp()
|
||||
|
||||
@override_settings(FILENAME_FORMAT=None)
|
||||
def test_custom_field_not_in_template_skips_filename_work(self):
|
||||
storage_path = StoragePath.objects.create(path="{{created}}/{{ title }}")
|
||||
self.doc.storage_path = storage_path
|
||||
self.doc.save()
|
||||
initial_filename = generate_filename(self.doc)
|
||||
Document.objects.filter(pk=self.doc.pk).update(filename=str(initial_filename))
|
||||
self.doc.refresh_from_db()
|
||||
Path(self.doc.source_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
Path(self.doc.source_path).touch()
|
||||
|
||||
with mock.patch("documents.signals.handlers.generate_unique_filename") as m:
|
||||
m.side_effect = generate_unique_filename
|
||||
self.cfi.value_text = "updated"
|
||||
self.cfi.save()
|
||||
|
||||
self.doc.refresh_from_db()
|
||||
self.assertEqual(Path(self.doc.filename), initial_filename)
|
||||
self.assertEqual(m.call_count, 0)
|
||||
|
||||
@override_settings(FILENAME_FORMAT=None)
|
||||
def test_custom_field_in_template_triggers_filename_update(self):
|
||||
storage_path = StoragePath.objects.create(
|
||||
path="{{ custom_fields|get_cf_value('flavor') }}/{{ title }}",
|
||||
)
|
||||
self.doc.storage_path = storage_path
|
||||
self.doc.save()
|
||||
initial_filename = generate_filename(self.doc)
|
||||
Document.objects.filter(pk=self.doc.pk).update(filename=str(initial_filename))
|
||||
self.doc.refresh_from_db()
|
||||
Path(self.doc.source_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
Path(self.doc.source_path).touch()
|
||||
|
||||
with mock.patch("documents.signals.handlers.generate_unique_filename") as m:
|
||||
m.side_effect = generate_unique_filename
|
||||
self.cfi.value_text = "updated"
|
||||
self.cfi.save()
|
||||
|
||||
self.doc.refresh_from_db()
|
||||
expected_filename = Path("updated/document.pdf")
|
||||
self.assertEqual(Path(self.doc.filename), expected_filename)
|
||||
self.assertTrue(Path(self.doc.source_path).is_file())
|
||||
self.assertLessEqual(m.call_count, 1)
|
||||
|
||||
|
||||
class TestPathDateLocalization:
|
||||
"""
|
||||
Groups all tests related to the `localize_date` function.
|
||||
|
||||
Reference in New Issue
Block a user