Merge pull request #3359 from paperless-ngx/feature-fix-autocomplete-respect-perms

Fix: respect permissions for autocomplete suggestions
This commit is contained in:
shamoon
2023-05-12 13:35:45 -07:00
committed by GitHub
4 changed files with 124 additions and 34 deletions

View File

@@ -1,6 +1,7 @@
import logging
import math
import os
from collections import Counter
from contextlib import contextmanager
from dateutil.parser import isoparse
@@ -17,17 +18,21 @@ from whoosh.fields import NUMERIC
from whoosh.fields import TEXT
from whoosh.fields import Schema
from whoosh.highlight import HtmlFormatter
from whoosh.index import FileIndex
from whoosh.index import create_in
from whoosh.index import exists_in
from whoosh.index import open_dir
from whoosh.qparser import MultifieldParser
from whoosh.qparser import QueryParser
from whoosh.qparser.dateparse import DateParserPlugin
from whoosh.scoring import TF_IDF
from whoosh.searching import ResultsPage
from whoosh.searching import Searcher
from whoosh.writing import AsyncWriter
from documents.models import Document
from documents.models import Note
from documents.models import User
logger = logging.getLogger("paperless.index")
@@ -252,15 +257,9 @@ class DelayedQuery:
for owner_id in v.split(","):
criterias.append(query.Not(query.Term("owner_id", owner_id)))
user_criterias = [query.Term("has_owner", False)]
if "user" in self.query_params:
if self.query_params["is_superuser"]: # superusers see all docs
user_criterias = []
else:
user_criterias.append(query.Term("owner_id", self.query_params["user"]))
user_criterias.append(
query.Term("viewer_id", str(self.query_params["user"])),
)
user_criterias = get_permissions_criterias(
user=self.user,
)
if len(criterias) > 0:
if len(user_criterias) > 0:
criterias.append(query.Or(user_criterias))
@@ -300,12 +299,13 @@ class DelayedQuery:
else:
return sort_fields_map[field], reverse
def __init__(self, searcher: Searcher, query_params, page_size):
def __init__(self, searcher: Searcher, query_params, page_size, user):
self.searcher = searcher
self.query_params = query_params
self.page_size = page_size
self.saved_results = dict()
self.first_score = None
self.user = user
def __len__(self):
page = self[0:1]
@@ -386,13 +386,42 @@ class DelayedMoreLikeThisQuery(DelayedQuery):
return q, mask
def autocomplete(ix, term, limit=10):
with ix.reader() as reader:
terms = []
for score, t in reader.most_distinctive_terms(
"content",
number=limit,
prefix=term.lower(),
):
terms.append(t)
return terms
def autocomplete(ix: FileIndex, term: str, limit: int = 10, user: User = None):
"""
Mimics whoosh.reading.IndexReader.most_distinctive_terms with permissions
and without scoring
"""
terms = []
with ix.searcher(weighting=TF_IDF()) as s:
qp = QueryParser("content", schema=ix.schema)
q = qp.parse(f"{term.lower()}*")
user_criterias = get_permissions_criterias(user)
results = s.search(
q,
terms=True,
filter=query.Or(user_criterias) if user_criterias is not None else None,
)
termCounts = Counter()
if results.has_matched_terms():
for hit in results:
for _, term in hit.matched_terms():
termCounts[term] += 1
terms = [t for t, _ in termCounts.most_common(limit)]
return terms
def get_permissions_criterias(user: User = None):
user_criterias = [query.Term("has_owner", False)]
if user is not None:
if user.is_superuser: # superusers see all docs
user_criterias = []
else:
user_criterias.append(query.Term("owner_id", user.id))
user_criterias.append(
query.Term("viewer_id", str(user.id)),
)
return user_criterias