mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 03:16:10 -06:00 
			
		
		
		
	Feature: Allow to filter documents by original filename and checksum (#3485)
* Allow to filter documents by original filename and checksum This adds filters for the original filename and checksum of documents to be able to to lazy checks if the file is already stored in paperless. * Add tests for DelayedQuery * Add checksum and original_filename to whoosh index and DelayedQuery * Refactored DelayedQuery to reduce duplicate code * Choose icontains for checksums as whoosh has no exact match query term * Bumped index version * Revert whoosh filtering logic to simpler structure, remove redundant tests Revert "Revert whoosh filtering logic to simpler structure, remove redundant tests" This reverts commit 86792174bfbc697f42b72c4b39ee9eba483bb425. --------- Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com>
This commit is contained in:
		@@ -80,7 +80,7 @@ django_checks() {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
search_index() {
 | 
					search_index() {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	local -r index_version=5
 | 
						local -r index_version=6
 | 
				
			||||||
	local -r index_version_file=${DATA_DIR}/.index_version
 | 
						local -r index_version_file=${DATA_DIR}/.index_version
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if [[ (! -f "${index_version_file}") || $(<"${index_version_file}") != "$index_version" ]]; then
 | 
						if [[ (! -f "${index_version_file}") || $(<"${index_version_file}") != "$index_version" ]]; then
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -116,6 +116,8 @@ class DocumentFilterSet(FilterSet):
 | 
				
			|||||||
            "created": DATE_KWARGS,
 | 
					            "created": DATE_KWARGS,
 | 
				
			||||||
            "added": DATE_KWARGS,
 | 
					            "added": DATE_KWARGS,
 | 
				
			||||||
            "modified": DATE_KWARGS,
 | 
					            "modified": DATE_KWARGS,
 | 
				
			||||||
 | 
					            "original_filename": CHAR_KWARGS,
 | 
				
			||||||
 | 
					            "checksum": CHAR_KWARGS,
 | 
				
			||||||
            "correspondent": ["isnull"],
 | 
					            "correspondent": ["isnull"],
 | 
				
			||||||
            "correspondent__id": ID_KWARGS,
 | 
					            "correspondent__id": ID_KWARGS,
 | 
				
			||||||
            "correspondent__name": CHAR_KWARGS,
 | 
					            "correspondent__name": CHAR_KWARGS,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -64,6 +64,8 @@ def get_schema():
 | 
				
			|||||||
        owner_id=NUMERIC(),
 | 
					        owner_id=NUMERIC(),
 | 
				
			||||||
        has_owner=BOOLEAN(),
 | 
					        has_owner=BOOLEAN(),
 | 
				
			||||||
        viewer_id=KEYWORD(commas=True),
 | 
					        viewer_id=KEYWORD(commas=True),
 | 
				
			||||||
 | 
					        checksum=TEXT(),
 | 
				
			||||||
 | 
					        original_filename=TEXT(sortable=True),
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -149,6 +151,8 @@ def update_document(writer: AsyncWriter, doc: Document):
 | 
				
			|||||||
        owner_id=doc.owner.id if doc.owner else None,
 | 
					        owner_id=doc.owner.id if doc.owner else None,
 | 
				
			||||||
        has_owner=doc.owner is not None,
 | 
					        has_owner=doc.owner is not None,
 | 
				
			||||||
        viewer_id=viewer_ids if viewer_ids else None,
 | 
					        viewer_id=viewer_ids if viewer_ids else None,
 | 
				
			||||||
 | 
					        checksum=doc.checksum,
 | 
				
			||||||
 | 
					        original_filename=doc.original_filename,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -171,91 +175,85 @@ def remove_document_from_index(document):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class DelayedQuery:
 | 
					class DelayedQuery:
 | 
				
			||||||
 | 
					    param_map = {
 | 
				
			||||||
 | 
					        "correspondent": ("correspondent", ["id", "id__in", "id__none", "isnull"]),
 | 
				
			||||||
 | 
					        "document_type": ("type", ["id", "id__in", "id__none", "isnull"]),
 | 
				
			||||||
 | 
					        "storage_path": ("path", ["id", "id__in", "id__none", "isnull"]),
 | 
				
			||||||
 | 
					        "owner": ("owner", ["id", "id__in", "id__none", "isnull"]),
 | 
				
			||||||
 | 
					        "tags": ("tag", ["id__all", "id__in", "id__none"]),
 | 
				
			||||||
 | 
					        "added": ("added", ["date__lt", "date__gt"]),
 | 
				
			||||||
 | 
					        "created": ("created", ["date__lt", "date__gt"]),
 | 
				
			||||||
 | 
					        "checksum": ("checksum", ["icontains", "istartswith"]),
 | 
				
			||||||
 | 
					        "original_filename": ("original_filename", ["icontains", "istartswith"]),
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _get_query(self):
 | 
					    def _get_query(self):
 | 
				
			||||||
        raise NotImplementedError
 | 
					        raise NotImplementedError
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _get_query_filter(self):
 | 
					    def _get_query_filter(self):
 | 
				
			||||||
        criterias = []
 | 
					        criterias = []
 | 
				
			||||||
        for k, v in self.query_params.items():
 | 
					        for key, value in self.query_params.items():
 | 
				
			||||||
            if k == "correspondent__id":
 | 
					            # is_tagged is a special case
 | 
				
			||||||
                criterias.append(query.Term("correspondent_id", v))
 | 
					            if key == "is_tagged":
 | 
				
			||||||
            elif k == "correspondent__id__in":
 | 
					                criterias.append(query.Term("has_tag", self.evalBoolean(value)))
 | 
				
			||||||
                correspondents_in = []
 | 
					                continue
 | 
				
			||||||
                for correspondent_id in v.split(","):
 | 
					
 | 
				
			||||||
                    correspondents_in.append(
 | 
					            # Don't process query params without a filter
 | 
				
			||||||
                        query.Term("correspondent_id", correspondent_id),
 | 
					            if "__" not in key:
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # All other query params consist of a parameter and a query filter
 | 
				
			||||||
 | 
					            param, query_filter = key.split("__", 1)
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                field, supported_query_filters = self.param_map[param]
 | 
				
			||||||
 | 
					            except KeyError:
 | 
				
			||||||
 | 
					                logger.error("Unable to build a query filter for parameter %s", key)
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # We only support certain filters per parameter
 | 
				
			||||||
 | 
					            if query_filter not in supported_query_filters:
 | 
				
			||||||
 | 
					                logger.info(
 | 
				
			||||||
 | 
					                    f"Query filter {query_filter} not supported for parameter {param}",
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
                criterias.append(query.Or(correspondents_in))
 | 
					                continue
 | 
				
			||||||
            elif k == "correspondent__id__none":
 | 
					
 | 
				
			||||||
                for correspondent_id in v.split(","):
 | 
					            if query_filter == "id":
 | 
				
			||||||
 | 
					                criterias.append(query.Term(f"{field}_id", value))
 | 
				
			||||||
 | 
					            elif query_filter == "id__in":
 | 
				
			||||||
 | 
					                in_filter = []
 | 
				
			||||||
 | 
					                for object_id in value.split(","):
 | 
				
			||||||
 | 
					                    in_filter.append(
 | 
				
			||||||
 | 
					                        query.Term(f"{field}_id", object_id),
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                criterias.append(query.Or(in_filter))
 | 
				
			||||||
 | 
					            elif query_filter == "id__none":
 | 
				
			||||||
 | 
					                for object_id in value.split(","):
 | 
				
			||||||
                    criterias.append(
 | 
					                    criterias.append(
 | 
				
			||||||
                        query.Not(query.Term("correspondent_id", correspondent_id)),
 | 
					                        query.Not(query.Term(f"{field}_id", object_id)),
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
            elif k == "tags__id__all":
 | 
					            elif query_filter == "isnull":
 | 
				
			||||||
                for tag_id in v.split(","):
 | 
					 | 
				
			||||||
                    criterias.append(query.Term("tag_id", tag_id))
 | 
					 | 
				
			||||||
            elif k == "tags__id__none":
 | 
					 | 
				
			||||||
                for tag_id in v.split(","):
 | 
					 | 
				
			||||||
                    criterias.append(query.Not(query.Term("tag_id", tag_id)))
 | 
					 | 
				
			||||||
            elif k == "tags__id__in":
 | 
					 | 
				
			||||||
                tags_in = []
 | 
					 | 
				
			||||||
                for tag_id in v.split(","):
 | 
					 | 
				
			||||||
                    tags_in.append(query.Term("tag_id", tag_id))
 | 
					 | 
				
			||||||
                criterias.append(query.Or(tags_in))
 | 
					 | 
				
			||||||
            elif k == "document_type__id":
 | 
					 | 
				
			||||||
                criterias.append(query.Term("type_id", v))
 | 
					 | 
				
			||||||
            elif k == "document_type__id__in":
 | 
					 | 
				
			||||||
                document_types_in = []
 | 
					 | 
				
			||||||
                for document_type_id in v.split(","):
 | 
					 | 
				
			||||||
                    document_types_in.append(query.Term("type_id", document_type_id))
 | 
					 | 
				
			||||||
                criterias.append(query.Or(document_types_in))
 | 
					 | 
				
			||||||
            elif k == "document_type__id__none":
 | 
					 | 
				
			||||||
                for document_type_id in v.split(","):
 | 
					 | 
				
			||||||
                    criterias.append(query.Not(query.Term("type_id", document_type_id)))
 | 
					 | 
				
			||||||
            elif k == "correspondent__isnull":
 | 
					 | 
				
			||||||
                criterias.append(
 | 
					                criterias.append(
 | 
				
			||||||
                    query.Term("has_correspondent", self.evalBoolean(v) is False),
 | 
					                    query.Term(f"has_{field}", self.evalBoolean(value) is False),
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
            elif k == "is_tagged":
 | 
					            elif query_filter == "id__all":
 | 
				
			||||||
                criterias.append(query.Term("has_tag", self.evalBoolean(v)))
 | 
					                for object_id in value.split(","):
 | 
				
			||||||
            elif k == "document_type__isnull":
 | 
					                    criterias.append(query.Term(f"{field}_id", object_id))
 | 
				
			||||||
                criterias.append(query.Term("has_type", self.evalBoolean(v) is False))
 | 
					            elif query_filter == "date__lt":
 | 
				
			||||||
            elif k == "created__date__lt":
 | 
					 | 
				
			||||||
                criterias.append(
 | 
					                criterias.append(
 | 
				
			||||||
                    query.DateRange("created", start=None, end=isoparse(v)),
 | 
					                    query.DateRange(field, start=None, end=isoparse(value)),
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
            elif k == "created__date__gt":
 | 
					            elif query_filter == "date__gt":
 | 
				
			||||||
                criterias.append(
 | 
					                criterias.append(
 | 
				
			||||||
                    query.DateRange("created", start=isoparse(v), end=None),
 | 
					                    query.DateRange(field, start=isoparse(value), end=None),
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					            elif query_filter == "icontains":
 | 
				
			||||||
 | 
					                criterias.append(
 | 
				
			||||||
 | 
					                    query.Term(field, value),
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					            elif query_filter == "istartswith":
 | 
				
			||||||
 | 
					                criterias.append(
 | 
				
			||||||
 | 
					                    query.Prefix(field, value),
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
            elif k == "added__date__gt":
 | 
					 | 
				
			||||||
                criterias.append(query.DateRange("added", start=isoparse(v), end=None))
 | 
					 | 
				
			||||||
            elif k == "added__date__lt":
 | 
					 | 
				
			||||||
                criterias.append(query.DateRange("added", start=None, end=isoparse(v)))
 | 
					 | 
				
			||||||
            elif k == "storage_path__id":
 | 
					 | 
				
			||||||
                criterias.append(query.Term("path_id", v))
 | 
					 | 
				
			||||||
            elif k == "storage_path__id__in":
 | 
					 | 
				
			||||||
                storage_paths_in = []
 | 
					 | 
				
			||||||
                for storage_path_id in v.split(","):
 | 
					 | 
				
			||||||
                    storage_paths_in.append(query.Term("path_id", storage_path_id))
 | 
					 | 
				
			||||||
                criterias.append(query.Or(storage_paths_in))
 | 
					 | 
				
			||||||
            elif k == "storage_path__id__none":
 | 
					 | 
				
			||||||
                for storage_path_id in v.split(","):
 | 
					 | 
				
			||||||
                    criterias.append(query.Not(query.Term("path_id", storage_path_id)))
 | 
					 | 
				
			||||||
            elif k == "storage_path__isnull":
 | 
					 | 
				
			||||||
                criterias.append(query.Term("has_path", self.evalBoolean(v) is False))
 | 
					 | 
				
			||||||
            elif k == "owner__isnull":
 | 
					 | 
				
			||||||
                criterias.append(query.Term("has_owner", self.evalBoolean(v) is False))
 | 
					 | 
				
			||||||
            elif k == "owner__id":
 | 
					 | 
				
			||||||
                criterias.append(query.Term("owner_id", v))
 | 
					 | 
				
			||||||
            elif k == "owner__id__in":
 | 
					 | 
				
			||||||
                owners_in = []
 | 
					 | 
				
			||||||
                for owner_id in v.split(","):
 | 
					 | 
				
			||||||
                    owners_in.append(query.Term("owner_id", owner_id))
 | 
					 | 
				
			||||||
                criterias.append(query.Or(owners_in))
 | 
					 | 
				
			||||||
            elif k == "owner__id__none":
 | 
					 | 
				
			||||||
                for owner_id in v.split(","):
 | 
					 | 
				
			||||||
                    criterias.append(query.Not(query.Term("owner_id", owner_id)))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        user_criterias = get_permissions_criterias(
 | 
					        user_criterias = get_permissions_criterias(
 | 
				
			||||||
            user=self.user,
 | 
					            user=self.user,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -420,6 +420,74 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
 | 
				
			|||||||
        results = response.data["results"]
 | 
					        results = response.data["results"]
 | 
				
			||||||
        self.assertEqual(len(results), 0)
 | 
					        self.assertEqual(len(results), 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_document_checksum_filter(self):
 | 
				
			||||||
 | 
					        Document.objects.create(
 | 
				
			||||||
 | 
					            title="none1",
 | 
				
			||||||
 | 
					            checksum="A",
 | 
				
			||||||
 | 
					            mime_type="application/pdf",
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        doc2 = Document.objects.create(
 | 
				
			||||||
 | 
					            title="none2",
 | 
				
			||||||
 | 
					            checksum="B",
 | 
				
			||||||
 | 
					            mime_type="application/pdf",
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        Document.objects.create(
 | 
				
			||||||
 | 
					            title="none3",
 | 
				
			||||||
 | 
					            checksum="C",
 | 
				
			||||||
 | 
					            mime_type="application/pdf",
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        response = self.client.get("/api/documents/?checksum__iexact=B")
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, status.HTTP_200_OK)
 | 
				
			||||||
 | 
					        results = response.data["results"]
 | 
				
			||||||
 | 
					        self.assertEqual(len(results), 1)
 | 
				
			||||||
 | 
					        self.assertEqual(results[0]["id"], doc2.id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        response = self.client.get("/api/documents/?checksum__iexact=X")
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, status.HTTP_200_OK)
 | 
				
			||||||
 | 
					        results = response.data["results"]
 | 
				
			||||||
 | 
					        self.assertEqual(len(results), 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_document_original_filename_filter(self):
 | 
				
			||||||
 | 
					        doc1 = Document.objects.create(
 | 
				
			||||||
 | 
					            title="none1",
 | 
				
			||||||
 | 
					            checksum="A",
 | 
				
			||||||
 | 
					            mime_type="application/pdf",
 | 
				
			||||||
 | 
					            original_filename="docA.pdf",
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        doc2 = Document.objects.create(
 | 
				
			||||||
 | 
					            title="none2",
 | 
				
			||||||
 | 
					            checksum="B",
 | 
				
			||||||
 | 
					            mime_type="application/pdf",
 | 
				
			||||||
 | 
					            original_filename="docB.pdf",
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        doc3 = Document.objects.create(
 | 
				
			||||||
 | 
					            title="none3",
 | 
				
			||||||
 | 
					            checksum="C",
 | 
				
			||||||
 | 
					            mime_type="application/pdf",
 | 
				
			||||||
 | 
					            original_filename="docC.pdf",
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        response = self.client.get("/api/documents/?original_filename__iexact=DOCa.pdf")
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, status.HTTP_200_OK)
 | 
				
			||||||
 | 
					        results = response.data["results"]
 | 
				
			||||||
 | 
					        self.assertEqual(len(results), 1)
 | 
				
			||||||
 | 
					        self.assertEqual(results[0]["id"], doc1.id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        response = self.client.get("/api/documents/?original_filename__iexact=docx.pdf")
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, status.HTTP_200_OK)
 | 
				
			||||||
 | 
					        results = response.data["results"]
 | 
				
			||||||
 | 
					        self.assertEqual(len(results), 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        response = self.client.get("/api/documents/?original_filename__istartswith=dOc")
 | 
				
			||||||
 | 
					        self.assertEqual(response.status_code, status.HTTP_200_OK)
 | 
				
			||||||
 | 
					        results = response.data["results"]
 | 
				
			||||||
 | 
					        self.assertEqual(len(results), 3)
 | 
				
			||||||
 | 
					        self.assertCountEqual(
 | 
				
			||||||
 | 
					            [results[0]["id"], results[1]["id"], results[2]["id"]],
 | 
				
			||||||
 | 
					            [doc1.id, doc2.id, doc3.id],
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_documents_title_content_filter(self):
 | 
					    def test_documents_title_content_filter(self):
 | 
				
			||||||
        doc1 = Document.objects.create(
 | 
					        doc1 = Document.objects.create(
 | 
				
			||||||
            title="title A",
 | 
					            title="title A",
 | 
				
			||||||
@@ -1086,17 +1154,19 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
 | 
				
			|||||||
            checksum="4",
 | 
					            checksum="4",
 | 
				
			||||||
            created=timezone.make_aware(datetime.datetime(2020, 7, 13)),
 | 
					            created=timezone.make_aware(datetime.datetime(2020, 7, 13)),
 | 
				
			||||||
            content="test",
 | 
					            content="test",
 | 
				
			||||||
 | 
					            original_filename="doc4.pdf",
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        d4.tags.add(t2)
 | 
					        d4.tags.add(t2)
 | 
				
			||||||
        d5 = Document.objects.create(
 | 
					        d5 = Document.objects.create(
 | 
				
			||||||
            checksum="5",
 | 
					            checksum="5",
 | 
				
			||||||
            added=timezone.make_aware(datetime.datetime(2020, 7, 13)),
 | 
					            added=timezone.make_aware(datetime.datetime(2020, 7, 13)),
 | 
				
			||||||
            content="test",
 | 
					            content="test",
 | 
				
			||||||
 | 
					            original_filename="doc5.pdf",
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        Document.objects.create(checksum="6", content="test2")
 | 
					        Document.objects.create(checksum="6", content="test2")
 | 
				
			||||||
        d7 = Document.objects.create(checksum="7", storage_path=sp, content="test")
 | 
					        d7 = Document.objects.create(checksum="7", storage_path=sp, content="test")
 | 
				
			||||||
        d8 = Document.objects.create(
 | 
					        d8 = Document.objects.create(
 | 
				
			||||||
            checksum="8",
 | 
					            checksum="foo",
 | 
				
			||||||
            correspondent=c2,
 | 
					            correspondent=c2,
 | 
				
			||||||
            document_type=dt2,
 | 
					            document_type=dt2,
 | 
				
			||||||
            storage_path=sp2,
 | 
					            storage_path=sp2,
 | 
				
			||||||
@@ -1239,6 +1309,16 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
 | 
				
			|||||||
            ),
 | 
					            ),
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertEqual(
 | 
				
			||||||
 | 
					            search_query("&checksum__icontains=foo"),
 | 
				
			||||||
 | 
					            [d8.id],
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertCountEqual(
 | 
				
			||||||
 | 
					            search_query("&original_filename__istartswith=doc"),
 | 
				
			||||||
 | 
					            [d4.id, d5.id],
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def test_search_filtering_respect_owner(self):
 | 
					    def test_search_filtering_respect_owner(self):
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        GIVEN:
 | 
					        GIVEN:
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										219
									
								
								src/documents/tests/test_delayedquery.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										219
									
								
								src/documents/tests/test_delayedquery.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,219 @@
 | 
				
			|||||||
 | 
					from dateutil.parser import isoparse
 | 
				
			||||||
 | 
					from django.test import TestCase
 | 
				
			||||||
 | 
					from whoosh import query
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from documents.index import DelayedQuery
 | 
				
			||||||
 | 
					from documents.index import get_permissions_criterias
 | 
				
			||||||
 | 
					from documents.models import User
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class TestDelayedQuery(TestCase):
 | 
				
			||||||
 | 
					    def setUp(self):
 | 
				
			||||||
 | 
					        super().setUp()
 | 
				
			||||||
 | 
					        # all tests run without permission criteria, so has_no_owner query will always
 | 
				
			||||||
 | 
					        # be appended.
 | 
				
			||||||
 | 
					        self.has_no_owner = query.Or([query.Term("has_owner", False)])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _get_testset__id__in(self, param, field):
 | 
				
			||||||
 | 
					        return (
 | 
				
			||||||
 | 
					            {f"{param}__id__in": "42,43"},
 | 
				
			||||||
 | 
					            query.And(
 | 
				
			||||||
 | 
					                [
 | 
				
			||||||
 | 
					                    query.Or(
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            query.Term(f"{field}_id", "42"),
 | 
				
			||||||
 | 
					                            query.Term(f"{field}_id", "43"),
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    ),
 | 
				
			||||||
 | 
					                    self.has_no_owner,
 | 
				
			||||||
 | 
					                ],
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _get_testset__id__none(self, param, field):
 | 
				
			||||||
 | 
					        return (
 | 
				
			||||||
 | 
					            {f"{param}__id__none": "42,43"},
 | 
				
			||||||
 | 
					            query.And(
 | 
				
			||||||
 | 
					                [
 | 
				
			||||||
 | 
					                    query.Not(query.Term(f"{field}_id", "42")),
 | 
				
			||||||
 | 
					                    query.Not(query.Term(f"{field}_id", "43")),
 | 
				
			||||||
 | 
					                    self.has_no_owner,
 | 
				
			||||||
 | 
					                ],
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_get_permission_criteria(self):
 | 
				
			||||||
 | 
					        # tests contains touples of user instances and the expected filter
 | 
				
			||||||
 | 
					        tests = (
 | 
				
			||||||
 | 
					            (None, [query.Term("has_owner", False)]),
 | 
				
			||||||
 | 
					            (User(42, username="foo", is_superuser=True), []),
 | 
				
			||||||
 | 
					            (
 | 
				
			||||||
 | 
					                User(42, username="foo", is_superuser=False),
 | 
				
			||||||
 | 
					                [
 | 
				
			||||||
 | 
					                    query.Term("has_owner", False),
 | 
				
			||||||
 | 
					                    query.Term("owner_id", 42),
 | 
				
			||||||
 | 
					                    query.Term("viewer_id", "42"),
 | 
				
			||||||
 | 
					                ],
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        for user, expected in tests:
 | 
				
			||||||
 | 
					            self.assertEqual(get_permissions_criterias(user), expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_no_query_filters(self):
 | 
				
			||||||
 | 
					        dq = DelayedQuery(None, {}, None, None)
 | 
				
			||||||
 | 
					        self.assertEqual(dq._get_query_filter(), self.has_no_owner)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_date_query_filters(self):
 | 
				
			||||||
 | 
					        def _get_testset(param: str):
 | 
				
			||||||
 | 
					            date_str = "1970-01-01T02:44"
 | 
				
			||||||
 | 
					            date_obj = isoparse(date_str)
 | 
				
			||||||
 | 
					            return (
 | 
				
			||||||
 | 
					                (
 | 
				
			||||||
 | 
					                    {f"{param}__date__lt": date_str},
 | 
				
			||||||
 | 
					                    query.And(
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            query.DateRange(param, start=None, end=date_obj),
 | 
				
			||||||
 | 
					                            self.has_no_owner,
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    ),
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					                (
 | 
				
			||||||
 | 
					                    {f"{param}__date__gt": date_str},
 | 
				
			||||||
 | 
					                    query.And(
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            query.DateRange(param, start=date_obj, end=None),
 | 
				
			||||||
 | 
					                            self.has_no_owner,
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    ),
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        query_params = ["created", "added"]
 | 
				
			||||||
 | 
					        for param in query_params:
 | 
				
			||||||
 | 
					            for params, expected in _get_testset(param):
 | 
				
			||||||
 | 
					                dq = DelayedQuery(None, params, None, None)
 | 
				
			||||||
 | 
					                got = dq._get_query_filter()
 | 
				
			||||||
 | 
					                self.assertCountEqual(got, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_is_tagged_query_filter(self):
 | 
				
			||||||
 | 
					        tests = (
 | 
				
			||||||
 | 
					            ("True", True),
 | 
				
			||||||
 | 
					            ("true", True),
 | 
				
			||||||
 | 
					            ("1", True),
 | 
				
			||||||
 | 
					            ("False", False),
 | 
				
			||||||
 | 
					            ("false", False),
 | 
				
			||||||
 | 
					            ("0", False),
 | 
				
			||||||
 | 
					            ("foo", False),
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        for param, expected in tests:
 | 
				
			||||||
 | 
					            dq = DelayedQuery(None, {"is_tagged": param}, None, None)
 | 
				
			||||||
 | 
					            self.assertEqual(
 | 
				
			||||||
 | 
					                dq._get_query_filter(),
 | 
				
			||||||
 | 
					                query.And([query.Term("has_tag", expected), self.has_no_owner]),
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_tags_query_filters(self):
 | 
				
			||||||
 | 
					        # tests contains touples of query_parameter dics and the expected whoosh query
 | 
				
			||||||
 | 
					        param = "tags"
 | 
				
			||||||
 | 
					        field, _ = DelayedQuery.param_map[param]
 | 
				
			||||||
 | 
					        tests = (
 | 
				
			||||||
 | 
					            (
 | 
				
			||||||
 | 
					                {f"{param}__id__all": "42,43"},
 | 
				
			||||||
 | 
					                query.And(
 | 
				
			||||||
 | 
					                    [
 | 
				
			||||||
 | 
					                        query.Term(f"{field}_id", "42"),
 | 
				
			||||||
 | 
					                        query.Term(f"{field}_id", "43"),
 | 
				
			||||||
 | 
					                        self.has_no_owner,
 | 
				
			||||||
 | 
					                    ],
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					            # tags does not allow __id
 | 
				
			||||||
 | 
					            (
 | 
				
			||||||
 | 
					                {f"{param}__id": "42"},
 | 
				
			||||||
 | 
					                self.has_no_owner,
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					            # tags does not allow __isnull
 | 
				
			||||||
 | 
					            (
 | 
				
			||||||
 | 
					                {f"{param}__isnull": "true"},
 | 
				
			||||||
 | 
					                self.has_no_owner,
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					            self._get_testset__id__in(param, field),
 | 
				
			||||||
 | 
					            self._get_testset__id__none(param, field),
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for params, expected in tests:
 | 
				
			||||||
 | 
					            dq = DelayedQuery(None, params, None, None)
 | 
				
			||||||
 | 
					            got = dq._get_query_filter()
 | 
				
			||||||
 | 
					            self.assertCountEqual(got, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_generic_query_filters(self):
 | 
				
			||||||
 | 
					        def _get_testset(param: str):
 | 
				
			||||||
 | 
					            field, _ = DelayedQuery.param_map[param]
 | 
				
			||||||
 | 
					            return (
 | 
				
			||||||
 | 
					                (
 | 
				
			||||||
 | 
					                    {f"{param}__id": "42"},
 | 
				
			||||||
 | 
					                    query.And(
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            query.Term(f"{field}_id", "42"),
 | 
				
			||||||
 | 
					                            self.has_no_owner,
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    ),
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					                self._get_testset__id__in(param, field),
 | 
				
			||||||
 | 
					                self._get_testset__id__none(param, field),
 | 
				
			||||||
 | 
					                (
 | 
				
			||||||
 | 
					                    {f"{param}__isnull": "true"},
 | 
				
			||||||
 | 
					                    query.And(
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            query.Term(f"has_{field}", False),
 | 
				
			||||||
 | 
					                            self.has_no_owner,
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    ),
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					                (
 | 
				
			||||||
 | 
					                    {f"{param}__isnull": "false"},
 | 
				
			||||||
 | 
					                    query.And(
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            query.Term(f"has_{field}", True),
 | 
				
			||||||
 | 
					                            self.has_no_owner,
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    ),
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        query_params = ["correspondent", "document_type", "storage_path", "owner"]
 | 
				
			||||||
 | 
					        for param in query_params:
 | 
				
			||||||
 | 
					            for params, expected in _get_testset(param):
 | 
				
			||||||
 | 
					                dq = DelayedQuery(None, params, None, None)
 | 
				
			||||||
 | 
					                got = dq._get_query_filter()
 | 
				
			||||||
 | 
					                self.assertCountEqual(got, expected)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_char_query_filter(self):
 | 
				
			||||||
 | 
					        def _get_testset(param: str):
 | 
				
			||||||
 | 
					            return (
 | 
				
			||||||
 | 
					                (
 | 
				
			||||||
 | 
					                    {f"{param}__icontains": "foo"},
 | 
				
			||||||
 | 
					                    query.And(
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            query.Term(f"{param}", "foo"),
 | 
				
			||||||
 | 
					                            self.has_no_owner,
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    ),
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					                (
 | 
				
			||||||
 | 
					                    {f"{param}__istartswith": "foo"},
 | 
				
			||||||
 | 
					                    query.And(
 | 
				
			||||||
 | 
					                        [
 | 
				
			||||||
 | 
					                            query.Prefix(f"{param}", "foo"),
 | 
				
			||||||
 | 
					                            self.has_no_owner,
 | 
				
			||||||
 | 
					                        ],
 | 
				
			||||||
 | 
					                    ),
 | 
				
			||||||
 | 
					                ),
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        query_params = ["checksum", "original_filename"]
 | 
				
			||||||
 | 
					        for param in query_params:
 | 
				
			||||||
 | 
					            for params, expected in _get_testset(param):
 | 
				
			||||||
 | 
					                dq = DelayedQuery(None, params, None, None)
 | 
				
			||||||
 | 
					                got = dq._get_query_filter()
 | 
				
			||||||
 | 
					                self.assertCountEqual(got, expected)
 | 
				
			||||||
		Reference in New Issue
	
	Block a user