mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-08-16 00:36:22 +00:00
Refactor: Use django-filter logic for filtering full text search queries (#7507)
This commit is contained in:
@@ -15,6 +15,7 @@ from rest_framework.test import APITestCase
|
||||
from whoosh.writing import AsyncWriter
|
||||
|
||||
from documents import index
|
||||
from documents.bulk_edit import set_permissions
|
||||
from documents.models import Correspondent
|
||||
from documents.models import CustomField
|
||||
from documents.models import CustomFieldInstance
|
||||
@@ -1159,7 +1160,8 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
|
||||
[d3.id, d2.id, d1.id],
|
||||
)
|
||||
|
||||
def test_global_search(self):
|
||||
@mock.patch("documents.bulk_edit.bulk_update_documents")
|
||||
def test_global_search(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- Multiple documents and objects
|
||||
@@ -1186,11 +1188,38 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
|
||||
checksum="C",
|
||||
pk=3,
|
||||
)
|
||||
# The below two documents are owned by user2 and shouldn't show up in results!
|
||||
d4 = Document.objects.create(
|
||||
title="doc 4 owned by user2",
|
||||
content="bank bank bank bank 4",
|
||||
checksum="D",
|
||||
pk=4,
|
||||
)
|
||||
d5 = Document.objects.create(
|
||||
title="doc 5 owned by user2",
|
||||
content="bank bank bank bank 5",
|
||||
checksum="E",
|
||||
pk=5,
|
||||
)
|
||||
|
||||
user1 = User.objects.create_user("bank user1")
|
||||
user2 = User.objects.create_superuser("user2")
|
||||
group1 = Group.objects.create(name="bank group1")
|
||||
Group.objects.create(name="group2")
|
||||
|
||||
user1.user_permissions.add(
|
||||
*Permission.objects.filter(codename__startswith="view_").exclude(
|
||||
content_type__app_label="admin",
|
||||
),
|
||||
)
|
||||
set_permissions([4, 5], set_permissions=[], owner=user2, merge=False)
|
||||
|
||||
with index.open_index_writer() as writer:
|
||||
index.update_document(writer, d1)
|
||||
index.update_document(writer, d2)
|
||||
index.update_document(writer, d3)
|
||||
index.update_document(writer, d4)
|
||||
index.update_document(writer, d5)
|
||||
|
||||
correspondent1 = Correspondent.objects.create(name="bank correspondent 1")
|
||||
Correspondent.objects.create(name="correspondent 2")
|
||||
@@ -1200,10 +1229,7 @@ class TestDocumentSearchApi(DirectoriesMixin, APITestCase):
|
||||
StoragePath.objects.create(name="path 2", path="path2")
|
||||
tag1 = Tag.objects.create(name="bank tag1")
|
||||
Tag.objects.create(name="tag2")
|
||||
user1 = User.objects.create_superuser("bank user1")
|
||||
User.objects.create_user("user2")
|
||||
group1 = Group.objects.create(name="bank group1")
|
||||
Group.objects.create(name="group2")
|
||||
|
||||
SavedView.objects.create(
|
||||
name="bank view",
|
||||
show_on_dashboard=True,
|
||||
|
@@ -1,8 +1,6 @@
|
||||
from dateutil.parser import isoparse
|
||||
from django.test import TestCase
|
||||
from whoosh import query
|
||||
|
||||
from documents.index import DelayedQuery
|
||||
from documents.index import get_permissions_criterias
|
||||
from documents.models import User
|
||||
|
||||
@@ -58,162 +56,3 @@ class TestDelayedQuery(TestCase):
|
||||
)
|
||||
for user, expected in tests:
|
||||
self.assertEqual(get_permissions_criterias(user), expected)
|
||||
|
||||
def test_no_query_filters(self):
|
||||
dq = DelayedQuery(None, {}, None, None)
|
||||
self.assertEqual(dq._get_query_filter(), self.has_no_owner)
|
||||
|
||||
def test_date_query_filters(self):
|
||||
def _get_testset(param: str):
|
||||
date_str = "1970-01-01T02:44"
|
||||
date_obj = isoparse(date_str)
|
||||
return (
|
||||
(
|
||||
{f"{param}__date__lt": date_str},
|
||||
query.And(
|
||||
[
|
||||
query.DateRange(param, start=None, end=date_obj),
|
||||
self.has_no_owner,
|
||||
],
|
||||
),
|
||||
),
|
||||
(
|
||||
{f"{param}__date__gt": date_str},
|
||||
query.And(
|
||||
[
|
||||
query.DateRange(param, start=date_obj, end=None),
|
||||
self.has_no_owner,
|
||||
],
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
query_params = ["created", "added"]
|
||||
for param in query_params:
|
||||
for params, expected in _get_testset(param):
|
||||
dq = DelayedQuery(None, params, None, None)
|
||||
got = dq._get_query_filter()
|
||||
self.assertCountEqual(got, expected)
|
||||
|
||||
def test_is_tagged_query_filter(self):
|
||||
tests = (
|
||||
("True", True),
|
||||
("true", True),
|
||||
("1", True),
|
||||
("False", False),
|
||||
("false", False),
|
||||
("0", False),
|
||||
("foo", False),
|
||||
)
|
||||
for param, expected in tests:
|
||||
dq = DelayedQuery(None, {"is_tagged": param}, None, None)
|
||||
self.assertEqual(
|
||||
dq._get_query_filter(),
|
||||
query.And([query.Term("has_tag", expected), self.has_no_owner]),
|
||||
)
|
||||
|
||||
def test_tags_query_filters(self):
|
||||
# tests contains tuples of query_parameter dics and the expected whoosh query
|
||||
param = "tags"
|
||||
field, _ = DelayedQuery.param_map[param]
|
||||
tests = (
|
||||
(
|
||||
{f"{param}__id__all": "42,43"},
|
||||
query.And(
|
||||
[
|
||||
query.Term(f"{field}_id", "42"),
|
||||
query.Term(f"{field}_id", "43"),
|
||||
self.has_no_owner,
|
||||
],
|
||||
),
|
||||
),
|
||||
# tags does not allow __id
|
||||
(
|
||||
{f"{param}__id": "42"},
|
||||
self.has_no_owner,
|
||||
),
|
||||
# tags does not allow __isnull
|
||||
(
|
||||
{f"{param}__isnull": "true"},
|
||||
self.has_no_owner,
|
||||
),
|
||||
self._get_testset__id__in(param, field),
|
||||
self._get_testset__id__none(param, field),
|
||||
)
|
||||
|
||||
for params, expected in tests:
|
||||
dq = DelayedQuery(None, params, None, None)
|
||||
got = dq._get_query_filter()
|
||||
self.assertCountEqual(got, expected)
|
||||
|
||||
def test_generic_query_filters(self):
|
||||
def _get_testset(param: str):
|
||||
field, _ = DelayedQuery.param_map[param]
|
||||
return (
|
||||
(
|
||||
{f"{param}__id": "42"},
|
||||
query.And(
|
||||
[
|
||||
query.Term(f"{field}_id", "42"),
|
||||
self.has_no_owner,
|
||||
],
|
||||
),
|
||||
),
|
||||
self._get_testset__id__in(param, field),
|
||||
self._get_testset__id__none(param, field),
|
||||
(
|
||||
{f"{param}__isnull": "true"},
|
||||
query.And(
|
||||
[
|
||||
query.Term(f"has_{field}", False),
|
||||
self.has_no_owner,
|
||||
],
|
||||
),
|
||||
),
|
||||
(
|
||||
{f"{param}__isnull": "false"},
|
||||
query.And(
|
||||
[
|
||||
query.Term(f"has_{field}", True),
|
||||
self.has_no_owner,
|
||||
],
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
query_params = ["correspondent", "document_type", "storage_path", "owner"]
|
||||
for param in query_params:
|
||||
for params, expected in _get_testset(param):
|
||||
dq = DelayedQuery(None, params, None, None)
|
||||
got = dq._get_query_filter()
|
||||
self.assertCountEqual(got, expected)
|
||||
|
||||
def test_char_query_filter(self):
|
||||
def _get_testset(param: str):
|
||||
return (
|
||||
(
|
||||
{f"{param}__icontains": "foo"},
|
||||
query.And(
|
||||
[
|
||||
query.Term(f"{param}", "foo"),
|
||||
self.has_no_owner,
|
||||
],
|
||||
),
|
||||
),
|
||||
(
|
||||
{f"{param}__istartswith": "foo"},
|
||||
query.And(
|
||||
[
|
||||
query.Prefix(f"{param}", "foo"),
|
||||
self.has_no_owner,
|
||||
],
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
query_params = ["checksum", "original_filename"]
|
||||
for param in query_params:
|
||||
for params, expected in _get_testset(param):
|
||||
dq = DelayedQuery(None, params, None, None)
|
||||
got = dq._get_query_filter()
|
||||
self.assertCountEqual(got, expected)
|
||||
|
Reference in New Issue
Block a user