Mirror of https://github.com/paperless-ngx/paperless-ngx.git (synced 2026-02-16 00:19:32 -06:00)
Merge branch 'dev' into feature-document-versions-1218
@@ -3,8 +3,10 @@ import hashlib
import logging
import shutil
import uuid
import zipfile
from pathlib import Path
from tempfile import TemporaryDirectory
from tempfile import mkstemp

import tqdm
from celery import Task
@@ -22,9 +24,12 @@ from whoosh.writing import AsyncWriter
from documents import index
from documents import sanity_checker
from documents.barcodes import BarcodePlugin
from documents.bulk_download import ArchiveOnlyStrategy
from documents.bulk_download import OriginalsOnlyStrategy
from documents.caching import clear_document_caches
from documents.classifier import DocumentClassifier
from documents.classifier import load_classifier
from documents.consumer import AsnCheckPlugin
from documents.consumer import ConsumerPlugin
from documents.consumer import ConsumerPreflightPlugin
from documents.consumer import WorkflowTriggerPlugin
@@ -39,9 +44,10 @@ from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import PaperlessTask
from documents.models import ShareLink
from documents.models import ShareLinkBundle
from documents.models import StoragePath
from documents.models import Tag
from documents.models import Workflow
from documents.models import WorkflowRun
from documents.models import WorkflowTrigger
from documents.parsers import DocumentParser
@@ -54,6 +60,11 @@ from documents.sanity_checker import SanityCheckFailedException
from documents.signals import document_updated
from documents.signals.handlers import cleanup_document_deletion
from documents.signals.handlers import run_workflows
from documents.workflows.utils import get_workflows_for_trigger
from paperless.config import AIConfig
from paperless_ai.indexing import llm_index_add_or_update_document
from paperless_ai.indexing import llm_index_remove_document
from paperless_ai.indexing import update_llm_index

if settings.AUDIT_LOG_ENABLED:
    from auditlog.models import LogEntry
@@ -61,13 +72,13 @@ logger = logging.getLogger("paperless.tasks")


@shared_task
def index_optimize():
def index_optimize() -> None:
    ix = index.open_index()
    writer = AsyncWriter(ix)
    writer.commit(optimize=True)


def index_reindex(*, progress_bar_disable=False):
def index_reindex(*, progress_bar_disable=False) -> None:
    documents = Document.objects.all()

    ix = index.open_index(recreate=True)
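Note: only the return-type annotations change here. A minimal dispatch sketch (the documents.tasks module path is an assumption; index_optimize is a Celery task, while index_reindex carries no decorator in this hunk and is called directly):

from documents.tasks import index_optimize, index_reindex

index_optimize.delay()                    # queue the optimize pass for a worker
index_reindex(progress_bar_disable=True)  # run a full reindex inline, without a progress bar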
@@ -78,7 +89,7 @@ def index_reindex(*, progress_bar_disable=False):


@shared_task
def train_classifier(*, scheduled=True):
def train_classifier(*, scheduled=True) -> None:
    task = PaperlessTask.objects.create(
        type=PaperlessTask.TaskType.SCHEDULED_TASK
        if scheduled
@@ -150,8 +161,10 @@ def consume_file(
        if input_doc.head_version_id is not None
        else [
            ConsumerPreflightPlugin,
            AsnCheckPlugin,
            CollatePlugin,
            BarcodePlugin,
            AsnCheckPlugin,  # Re-run ASN check after barcode reading
            WorkflowTriggerPlugin,
            ConsumerPlugin,
        ]
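Note: the duplicated AsnCheckPlugin entry is deliberate; the ASN is validated once up front and again after BarcodePlugin may have read an ASN off the page. A sketch of the surrounding expression (the reduced chain used when head_version_id is set is an assumption, not verbatim source):

plugins = (
    [ConsumerPreflightPlugin, ConsumerPlugin]  # hypothetical reduced chain for new versions
    if input_doc.head_version_id is not None
    else [
        ConsumerPreflightPlugin,
        AsnCheckPlugin,
        CollatePlugin,
        BarcodePlugin,
        AsnCheckPlugin,  # re-run after barcodes may have supplied an ASN
        WorkflowTriggerPlugin,
        ConsumerPlugin,
    ]
)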
@@ -228,7 +241,7 @@ def sanity_check(*, scheduled=True, raise_on_error=True):


@shared_task
def bulk_update_documents(document_ids):
def bulk_update_documents(document_ids) -> None:
    documents = Document.objects.filter(id__in=document_ids)

    ix = index.open_index()
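Note: the task keeps its keyword dispatch style; later in this diff it is queued as bulk_update_documents.delay(document_ids=list(affected)). A minimal usage sketch:

# Queue an index refresh for a handful of documents; document_ids is any iterable of primary keys.
bulk_update_documents.delay(document_ids=[10, 11, 12])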
@@ -246,9 +259,16 @@ def bulk_update_documents(document_ids):
        for doc in documents:
            index.update_document(writer, doc)

    ai_config = AIConfig()
    if ai_config.llm_index_enabled:
        update_llm_index(
            progress_bar_disable=True,
            rebuild=False,
        )


@shared_task
def update_document_content_maybe_archive_file(document_id):
def update_document_content_maybe_archive_file(document_id) -> None:
    """
    Re-creates OCR content and thumbnail for a document, and archive file if
    it exists.
@@ -345,6 +365,10 @@ def update_document_content_maybe_archive_file(document_id):
        with index.open_index_writer() as writer:
            index.update_document(writer, document)

        ai_config = AIConfig()
        if ai_config.llm_index_enabled:
            llm_index_add_or_update_document(document)

        clear_document_caches(document.pk)

    except Exception:
@@ -356,7 +380,7 @@ def update_document_content_maybe_archive_file(document_id):


@shared_task
def empty_trash(doc_ids=None):
def empty_trash(doc_ids=None) -> None:
    if doc_ids is None:
        logger.info("Emptying trash of all expired documents")
        documents = (
@@ -393,7 +417,7 @@ def empty_trash(doc_ids=None):


@shared_task
def check_scheduled_workflows():
def check_scheduled_workflows() -> None:
    """
    Check and run all enabled scheduled workflows.

@@ -404,13 +428,8 @@ def check_scheduled_workflows():

    Once a document satisfies this condition, and recurring/non-recurring constraints are met, the workflow is run.
    """
    scheduled_workflows: list[Workflow] = (
        Workflow.objects.filter(
            triggers__type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
            enabled=True,
        )
        .distinct()
        .prefetch_related("triggers")
    scheduled_workflows = get_workflows_for_trigger(
        WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
    )
    if scheduled_workflows.count() > 0:
        logger.debug(f"Checking {len(scheduled_workflows)} scheduled workflows")
@@ -502,7 +521,7 @@ def check_scheduled_workflows():
                trigger.schedule_is_recurring
                and workflow_runs.exists()
                and (
                    workflow_runs.last().run_at
                    workflow_runs.first().run_at
                    > now
                    - datetime.timedelta(
                        days=trigger.schedule_recurring_interval_days,
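Note: the recurrence check now reads run_at from workflow_runs.first() instead of .last(); a run newer than now minus the recurring interval means the trigger is skipped this cycle. A worked sketch of that window arithmetic (values are illustrative):

import datetime
from django.utils import timezone

now = timezone.now()
interval = datetime.timedelta(days=7)           # schedule_recurring_interval_days = 7
last_run_at = now - datetime.timedelta(days=3)  # most recent matching WorkflowRun

# Mirrors the condition above: True means the workflow already ran inside the
# window and is not run again yet.
ran_within_interval = last_run_at > now - interval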
@@ -567,3 +586,169 @@ def update_document_parent_tags(tag: Tag, new_parent: Tag) -> None:

    if affected:
        bulk_update_documents.delay(document_ids=list(affected))


@shared_task
def llmindex_index(
    *,
    progress_bar_disable=True,
    rebuild=False,
    scheduled=True,
    auto=False,
) -> None:
    ai_config = AIConfig()
    if ai_config.llm_index_enabled:
        task = PaperlessTask.objects.create(
            type=PaperlessTask.TaskType.SCHEDULED_TASK
            if scheduled
            else PaperlessTask.TaskType.AUTO
            if auto
            else PaperlessTask.TaskType.MANUAL_TASK,
            task_id=uuid.uuid4(),
            task_name=PaperlessTask.TaskName.LLMINDEX_UPDATE,
            status=states.STARTED,
            date_created=timezone.now(),
            date_started=timezone.now(),
        )
        from paperless_ai.indexing import update_llm_index

        try:
            result = update_llm_index(
                progress_bar_disable=progress_bar_disable,
                rebuild=rebuild,
            )
            task.status = states.SUCCESS
            task.result = result
        except Exception as e:
            logger.error("LLM index error: " + str(e))
            task.status = states.FAILURE
            task.result = str(e)

        task.date_done = timezone.now()
        task.save(update_fields=["status", "result", "date_done"])
    else:
        logger.info("LLM index is disabled, skipping update.")


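Note: llmindex_index wraps update_llm_index in a PaperlessTask row so scheduled, automatic, and manual runs all show up in the tasks UI with their outcome. A dispatch sketch (all parameters are the keyword-only flags defined above):

# Queue a full rebuild, recorded as a manually triggered task.
llmindex_index.delay(rebuild=True, scheduled=False, auto=False)

# Or run inline (e.g. from a management command) with a visible progress bar.
llmindex_index(progress_bar_disable=False, scheduled=False)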
@shared_task
def update_document_in_llm_index(document) -> None:
    llm_index_add_or_update_document(document)


@shared_task
def remove_document_from_llm_index(document) -> None:
    llm_index_remove_document(document)


@shared_task
def build_share_link_bundle(bundle_id: int) -> None:
    try:
        bundle = (
            ShareLinkBundle.objects.filter(pk=bundle_id)
            .prefetch_related("documents")
            .get()
        )
    except ShareLinkBundle.DoesNotExist:
        logger.warning("Share link bundle %s no longer exists.", bundle_id)
        return

    bundle.remove_file()
    bundle.status = ShareLinkBundle.Status.PROCESSING
    bundle.last_error = None
    bundle.size_bytes = None
    bundle.built_at = None
    bundle.file_path = ""
    bundle.save(
        update_fields=[
            "status",
            "last_error",
            "size_bytes",
            "built_at",
            "file_path",
        ],
    )

    documents = list(bundle.documents.all().order_by("pk"))

    _, temp_zip_path_str = mkstemp(suffix=".zip", dir=settings.SCRATCH_DIR)
    temp_zip_path = Path(temp_zip_path_str)

    try:
        strategy_class = (
            ArchiveOnlyStrategy
            if bundle.file_version == ShareLink.FileVersion.ARCHIVE
            else OriginalsOnlyStrategy
        )
        with zipfile.ZipFile(temp_zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            strategy = strategy_class(zipf)
            for document in documents:
                strategy.add_document(document)

        output_dir = settings.SHARE_LINK_BUNDLE_DIR
        output_dir.mkdir(parents=True, exist_ok=True)
        final_path = (output_dir / f"{bundle.slug}.zip").resolve()
        if final_path.exists():
            final_path.unlink()
        shutil.move(temp_zip_path, final_path)

        bundle.file_path = f"{bundle.slug}.zip"
        bundle.size_bytes = final_path.stat().st_size
        bundle.status = ShareLinkBundle.Status.READY
        bundle.built_at = timezone.now()
        bundle.last_error = None
        bundle.save(
            update_fields=[
                "file_path",
                "size_bytes",
                "status",
                "built_at",
                "last_error",
            ],
        )
        logger.info("Built share link bundle %s", bundle.pk)
    except Exception as exc:
        logger.exception(
            "Failed to build share link bundle %s: %s",
            bundle_id,
            exc,
        )
        bundle.status = ShareLinkBundle.Status.FAILED
        bundle.last_error = {
            "bundle_id": bundle_id,
            "exception_type": exc.__class__.__name__,
            "message": str(exc),
            "timestamp": timezone.now().isoformat(),
        }
        bundle.save(update_fields=["status", "last_error"])
        try:
            temp_zip_path.unlink()
        except OSError:
            pass
        raise
    finally:
        try:
            temp_zip_path.unlink(missing_ok=True)
        except OSError:
            pass


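Note: build_share_link_bundle resets the bundle row, rebuilds the zip in SCRATCH_DIR with the strategy matching the bundle's file_version, then moves it into SHARE_LINK_BUNDLE_DIR; failures land in last_error and re-raise. A hypothetical caller sketch (the API layer that creates bundles is not part of this diff):

# Queue the build for an existing bundle and let a worker fill in the file fields.
build_share_link_bundle.delay(bundle.pk)

# Later, e.g. when serving a bundle status endpoint:
bundle.refresh_from_db()
if bundle.status == ShareLinkBundle.Status.READY:
    print(f"{bundle.file_path} ({bundle.size_bytes} bytes), built {bundle.built_at}")
elif bundle.status == ShareLinkBundle.Status.FAILED:
    print(bundle.last_error)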
@shared_task
def cleanup_expired_share_link_bundles() -> None:
    now = timezone.now()
    expired_qs = ShareLinkBundle.objects.filter(
        expiration__isnull=False,
        expiration__lt=now,
    )
    count = 0
    for bundle in expired_qs.iterator():
        count += 1
        try:
            bundle.delete()
        except Exception as exc:
            logger.warning(
                "Failed to delete expired share link bundle %s: %s",
                bundle.pk,
                exc,
            )
    if count:
        logger.info("Deleted %s expired share link bundle(s)", count)

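Note: cleanup_expired_share_link_bundles is only useful on a schedule. A hedged celery beat entry (the schedule key, cadence, and module path are assumptions, not taken from this diff; paperless-ngx defines its periodic tasks elsewhere):

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "cleanup-expired-share-link-bundles": {
        "task": "documents.tasks.cleanup_expired_share_link_bundles",
        "schedule": crontab(minute=0, hour=3),  # once a day at 03:00
    },
}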