Handle doc updates, refactor

shamoon
2025-04-27 01:24:00 -07:00
parent b3b9a8fb5b
commit 8d48d398eb
9 changed files with 389 additions and 174 deletions

src/documents/apps.py

@@ -11,6 +11,7 @@ class DocumentsConfig(AppConfig):
         from documents.signals import document_consumption_finished
         from documents.signals import document_updated
         from documents.signals.handlers import add_inbox_tags
+        from documents.signals.handlers import add_or_update_document_in_llm_index
         from documents.signals.handlers import add_to_index
         from documents.signals.handlers import run_workflows_added
         from documents.signals.handlers import run_workflows_updated
@@ -26,6 +27,7 @@ class DocumentsConfig(AppConfig):
         document_consumption_finished.connect(set_storage_path)
         document_consumption_finished.connect(add_to_index)
         document_consumption_finished.connect(run_workflows_added)
+        document_consumption_finished.connect(add_or_update_document_in_llm_index)
         document_updated.connect(run_workflows_updated)
 
         import documents.schema  # noqa: F401
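
Note: `document_consumption_finished` is a standard `django.dispatch.Signal`, so the new receiver fires with the consumed document passed as a keyword argument. A minimal, self-contained sketch of the connect/send round trip (the `send()` call site and the stand-in document string are assumptions for illustration, not code from this commit):

from django.dispatch import Signal

document_consumption_finished = Signal()


def add_or_update_document_in_llm_index(sender, document, **kwargs):
    # Stand-in receiver; the real one queues a Celery task (see the handlers diff below).
    print(f"would queue LLM index update for {document}")


document_consumption_finished.connect(add_or_update_document_in_llm_index)

# The consumer emits the signal once a document has been stored:
document_consumption_finished.send(sender=None, document="<Document: invoice.pdf>")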

src/documents/signals/handlers.py

@@ -48,6 +48,7 @@ from documents.models import WorkflowTrigger
 from documents.permissions import get_objects_for_user_owner_aware
 from documents.permissions import set_permissions_for_object
 from documents.templating.workflows import parse_w_workflow_placeholders
+from paperless.config import AIConfig
 
 if TYPE_CHECKING:
     from pathlib import Path
@@ -1449,3 +1450,26 @@ def task_failure_handler(
             task_instance.save()
     except Exception:  # pragma: no cover
         logger.exception("Updating PaperlessTask failed")
+
+
+def add_or_update_document_in_llm_index(sender, document, **kwargs):
+    """
+    Add or update a document in the LLM index when it is created or updated.
+    """
+    ai_config = AIConfig()
+    if ai_config.llm_index_enabled():
+        from documents.tasks import update_document_in_llm_index
+
+        update_document_in_llm_index.delay(document)
+
+
+@receiver(models.signals.post_delete, sender=Document)
+def delete_document_from_llm_index(sender, instance: Document, **kwargs):
+    """
+    Delete a document from the LLM index when it is deleted.
+    """
+    ai_config = AIConfig()
+    if ai_config.llm_index_enabled():
+        from documents.tasks import remove_document_from_llm_index
+
+        remove_document_from_llm_index.delay(instance)
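
One thing to watch: `update_document_in_llm_index.delay(document)` hands a full model instance to Celery, which only works if the configured task serializer can encode it (Celery's default JSON serializer cannot). A common alternative — a hedged sketch, not what this commit does — is to enqueue the primary key and re-fetch inside the task:

from celery import shared_task


@shared_task
def update_document_in_llm_index_by_pk(document_pk: int):
    # Hypothetical pk-based variant of the task added in this commit.
    from documents.models import Document
    from paperless.ai.indexing import llm_index_add_or_update_document

    document = Document.objects.get(pk=document_pk)
    llm_index_add_or_update_document(document)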

src/documents/tasks.py

@@ -6,8 +6,6 @@ import uuid
 from pathlib import Path
 from tempfile import TemporaryDirectory
 
-import faiss
-import llama_index.core.settings as llama_settings
 import tqdm
 from celery import Task
 from celery import shared_task
@@ -19,13 +17,6 @@ from django.db import transaction
 from django.db.models.signals import post_save
 from django.utils import timezone
 from filelock import FileLock
-from llama_index.core import Document as LlamaDocument
-from llama_index.core import StorageContext
-from llama_index.core import VectorStoreIndex
-from llama_index.core.node_parser import SimpleNodeParser
-from llama_index.core.storage.docstore import SimpleDocumentStore
-from llama_index.core.storage.index_store import SimpleIndexStore
-from llama_index.vector_stores.faiss import FaissVectorStore
 from whoosh.writing import AsyncWriter
 
 from documents import index
@@ -63,9 +54,10 @@ from documents.sanity_checker import SanityCheckFailedException
 from documents.signals import document_updated
 from documents.signals.handlers import cleanup_document_deletion
 from documents.signals.handlers import run_workflows
-from paperless.ai.embedding import build_llm_index_text
-from paperless.ai.embedding import get_embedding_dim
-from paperless.ai.embedding import get_embedding_model
+from paperless.ai.indexing import llm_index_add_or_update_document
+from paperless.ai.indexing import llm_index_remove_document
+from paperless.ai.indexing import rebuild_llm_index
+from paperless.config import AIConfig
 
 if settings.AUDIT_LOG_ENABLED:
     from auditlog.models import LogEntry
@@ -254,6 +246,11 @@ def bulk_update_documents(document_ids):
         for doc in documents:
             index.update_document(writer, doc)
 
+    ai_config = AIConfig()
+    if ai_config.llm_index_enabled():
+        for doc in documents:
+            llm_index_add_or_update_document(doc)
+
 
 @shared_task
 def update_document_content_maybe_archive_file(document_id):
@@ -353,6 +350,10 @@ def update_document_content_maybe_archive_file(document_id):
         with index.open_index_writer() as writer:
             index.update_document(writer, document)
 
+        ai_config = AIConfig()
+        if ai_config.llm_index_enabled():
+            llm_index_add_or_update_document(document)
+
         clear_document_caches(document.pk)
 
     except Exception:
@@ -532,60 +533,25 @@ def check_scheduled_workflows():
 def llm_index_rebuild(*, progress_bar_disable=False, rebuild=False):
-    if rebuild:
-        shutil.rmtree(settings.LLM_INDEX_DIR, ignore_errors=True)
-        settings.LLM_INDEX_DIR.mkdir(parents=True, exist_ok=True)
-
-    embed_model = get_embedding_model()
-    llama_settings.Settings.embed_model = embed_model
-
-    if rebuild or not settings.LLM_INDEX_DIR.exists():
-        embedding_dim = get_embedding_dim()
-        faiss_index = faiss.IndexFlatL2(embedding_dim)
-        vector_store = FaissVectorStore(faiss_index=faiss_index)
-    else:
-        vector_store = FaissVectorStore.from_persist_dir(settings.LLM_INDEX_DIR)
-
-    docstore = SimpleDocumentStore()
-    index_store = SimpleIndexStore()
-
-    storage_context = StorageContext.from_defaults(
-        docstore=docstore,
-        index_store=index_store,
-        persist_dir=settings.LLM_INDEX_DIR,
-        vector_store=vector_store,
-    )
-
-    parser = SimpleNodeParser()
-    nodes = []
-
-    for document in tqdm.tqdm(Document.objects.all(), disable=progress_bar_disable):
-        if not document.content:
-            continue
-
-        text = build_llm_index_text(document)
-        metadata = {
-            "document_id": document.id,
-            "title": document.title,
-            "tags": [t.name for t in document.tags.all()],
-            "correspondent": document.correspondent.name
-            if document.correspondent
-            else None,
-            "document_type": document.document_type.name
-            if document.document_type
-            else None,
-            "created": document.created.isoformat() if document.created else None,
-            "added": document.added.isoformat() if document.added else None,
-        }
-
-        doc = LlamaDocument(text=text, metadata=metadata)
-        doc_nodes = parser.get_nodes_from_documents([doc])
-        nodes.extend(doc_nodes)
-
-    index = VectorStoreIndex(
-        nodes=nodes,
-        storage_context=storage_context,
-        embed_model=embed_model,
-    )
-
-    index.storage_context.persist(persist_dir=settings.LLM_INDEX_DIR)
+    rebuild_llm_index(
+        progress_bar_disable=progress_bar_disable,
+        rebuild=rebuild,
+    )
+
+
+@shared_task
+def update_document_in_llm_index(document):
+    llm_index_add_or_update_document(document)
+
+
+@shared_task
+def remove_document_from_llm_index(document):
+    llm_index_remove_document(document)
+
+
+# TODO: schedule to run periodically
+@shared_task
+def rebuild_llm_index_task():
+    from paperless.ai.indexing import rebuild_llm_index
+
+    rebuild_llm_index(rebuild=True)
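
For reference, this refactor moves the faiss/llama_index plumbing deleted above behind `paperless.ai.indexing`. Reconstructed from the call sites in this diff, the module's assumed interface looks roughly like this (a sketch with bodies elided; the real module is not shown in this commit):

# paperless/ai/indexing.py — assumed interface, inferred from the calls above
from documents.models import Document


def rebuild_llm_index(*, progress_bar_disable: bool = False, rebuild: bool = False) -> None:
    """Re-embed every document and persist the vector index."""
    ...


def llm_index_add_or_update_document(document: Document) -> None:
    """Embed a single document and upsert its nodes in the persisted index."""
    ...


def llm_index_remove_document(document: Document) -> None:
    """Remove a document's nodes from the persisted index."""
    ...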