325 lines
11 KiB
Python

import hashlib
import logging
import shutil
import uuid
from datetime import timedelta
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional
import tqdm
from celery import Task
from celery import shared_task
from django.conf import settings
from django.db import models
from django.db import transaction
from django.db.models.signals import post_save
from django.utils import timezone
from filelock import FileLock
from whoosh.writing import AsyncWriter
from documents import index
from documents import sanity_checker
from documents.barcodes import BarcodePlugin
from documents.caching import clear_document_caches
from documents.classifier import DocumentClassifier
from documents.classifier import load_classifier
from documents.consumer import ConsumerPlugin
from documents.consumer import WorkflowTriggerPlugin
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
from documents.double_sided import CollatePlugin
from documents.file_handling import create_source_path_directory
from documents.file_handling import generate_unique_filename
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
from documents.parsers import DocumentParser
from documents.parsers import get_parser_class_for_mime_type
from documents.plugins.base import ConsumeTaskPlugin
from documents.plugins.base import ProgressManager
from documents.plugins.base import StopConsumeTaskError
from documents.plugins.helpers import ProgressStatusOptions
from documents.sanity_checker import SanityCheckFailedException
from documents.signals import document_updated
from documents.signals.handlers import cleanup_document_deletion
if settings.AUDIT_LOG_ENABLED:
import json
from auditlog.models import LogEntry
logger = logging.getLogger("paperless.tasks")
@shared_task
def index_optimize():
    """
    Opens the search index and commits a writer with optimize=True.
    """
    AsyncWriter(index.open_index()).commit(optimize=True)
def index_reindex(progress_bar_disable=False):
    """
    Recreates the search index and writes every document into it.

    progress_bar_disable is handed to tqdm as its "disable" flag.
    """
    all_documents = Document.objects.all()
    fresh_index = index.open_index(recreate=True)

    with AsyncWriter(fresh_index) as index_writer:
        progress = tqdm.tqdm(all_documents, disable=progress_bar_disable)
        for doc in progress:
            index.update_document(index_writer, doc)
@shared_task
def train_classifier():
    """
    Trains the document classifier and saves it when training reports a change.

    If no tag, document type, correspondent or storage path uses automatic
    matching, nothing is trained and an existing model file is removed.
    Exceptions raised while training or saving are logged as warnings and
    not re-raised.
    """
    auto_matching_models = (Tag, DocumentType, Correspondent, StoragePath)
    has_auto_items = any(
        model.objects.filter(matching_algorithm=Tag.MATCH_AUTO).exists()
        for model in auto_matching_models
    )

    if not has_auto_items:
        logger.info("No automatic matching items, not training")
        # Special case, items were once auto and trained, so remove the model
        # and prevent its use again
        if settings.MODEL_FILE.exists():
            logger.info(f"Removing {settings.MODEL_FILE} so it won't be used")
            settings.MODEL_FILE.unlink()
        return

    # Fall back to a fresh classifier when none could be loaded
    classifier = load_classifier() or DocumentClassifier()

    try:
        model_changed = classifier.train()
        if not model_changed:
            logger.debug("Training data unchanged.")
        else:
            logger.info(
                f"Saving updated classifier model to {settings.MODEL_FILE}...",
            )
            classifier.save()
    except Exception as e:
        logger.warning("Classifier error: " + str(e))
@shared_task(bind=True)
def consume_file(
    self: Task,
    input_doc: ConsumableDocument,
    overrides: Optional[DocumentMetadataOverrides] = None,
):
    """
    Runs the consumption plugins, in order, for a single incoming document.

    Each plugin is constructed with the input document, the current metadata
    overrides, a shared progress manager, a temporary working directory and
    the task id. Plugins reporting that they are not able to run are skipped.
    After a plugin has run, its metadata replaces the overrides handed to the
    plugins that follow. Every plugin that was set up is cleaned up again,
    whether it succeeded or not.

    Returns the message of the last plugin that ran (None if no plugin ran or
    it produced no message). If a plugin raises StopConsumeTaskError, its
    message is returned immediately. Any other exception is logged, reported
    as a failed progress status and re-raised.
    """
    # Default no overrides
    if overrides is None:
        overrides = DocumentMetadataOverrides()

    plugins: list[type[ConsumeTaskPlugin]] = [
        CollatePlugin,
        BarcodePlugin,
        WorkflowTriggerPlugin,
        ConsumerPlugin,
    ]

    # Bound up front so the final return is well defined even if every
    # plugin is skipped (previously an UnboundLocalError in that case)
    msg = None

    with (
        ProgressManager(
            overrides.filename or input_doc.original_file.name,
            self.request.id,
        ) as status_mgr,
        TemporaryDirectory(dir=settings.SCRATCH_DIR) as tmp_dir,
    ):
        tmp_dir = Path(tmp_dir)

        for plugin_class in plugins:
            plugin_name = plugin_class.NAME

            plugin = plugin_class(
                input_doc,
                overrides,
                status_mgr,
                tmp_dir,
                self.request.id,
            )

            if not plugin.able_to_run:
                logger.debug(f"Skipping plugin {plugin_name}")
                continue

            try:
                logger.debug(f"Executing plugin {plugin_name}")
                plugin.setup()

                msg = plugin.run()

                if msg is not None:
                    logger.info(f"{plugin_name} completed with: {msg}")
                else:
                    logger.info(f"{plugin_name} completed with no message")

                # Later plugins see whatever metadata this plugin produced
                overrides = plugin.metadata

            except StopConsumeTaskError as e:
                logger.info(f"{plugin_name} requested task exit: {e.message}")
                return e.message

            except Exception as e:
                logger.exception(f"{plugin_name} failed: {e}")
                status_mgr.send_progress(ProgressStatusOptions.FAILED, f"{e}", 100, 100)
                raise

            finally:
                plugin.cleanup()

    return msg
@shared_task
def sanity_check():
    """
    Runs the sanity checker, logs its messages and reports the outcome.

    Raises SanityCheckFailedException when the messages contain an error,
    otherwise returns a short summary string.
    """
    results = sanity_checker.check_sanity()
    results.log_messages()

    if results.has_error:
        raise SanityCheckFailedException("Sanity check failed with errors. See log.")
    if results.has_warning:
        return "Sanity check exited with warnings. See log."
    if len(results) > 0:
        return "Sanity check exited with infos. See log."
    return "No issues detected."
@shared_task
def bulk_update_documents(document_ids):
    """
    Announces an update for each of the given documents and re-indexes them.

    For every document its caches are cleared and the document_updated and
    post_save signals are sent; afterwards all of the documents are written
    to the search index.
    """
    selected = Document.objects.filter(id__in=document_ids)
    search_index = index.open_index()

    for document in selected:
        clear_document_caches(document.pk)
        document_updated.send(
            sender=None,
            document=document,
            logging_group=uuid.uuid4(),
        )
        post_save.send(Document, instance=document, created=False)

    with AsyncWriter(search_index) as index_writer:
        for document in selected:
            index.update_document(index_writer, document)
@shared_task
def update_document_archive_file(document_id):
    """
    Regenerates the archive file of a document.

    The original file is parsed again. If the parser produced an archive
    file, its checksum, the parsed text and a new archive filename are stored
    on the document, the archive file and thumbnail are moved into place, the
    search index entry is updated and the document caches are cleared.

    Does nothing beyond logging an error when no parser exists for the mime
    type. Exceptions raised during processing are logged and not re-raised;
    the parser is always cleaned up.
    """
    document = Document.objects.get(id=document_id)
    mime_type = document.mime_type

    parser_class: type[DocumentParser] = get_parser_class_for_mime_type(mime_type)
    if not parser_class:
        logger.error(
            f"No parser found for mime type {mime_type}, cannot "
            f"archive document {document} (ID: {document_id})",
        )
        return

    parser: DocumentParser = parser_class(logging_group=uuid.uuid4())

    try:
        parser.parse(document.source_path, mime_type, document.get_public_filename())
        thumbnail = parser.get_thumbnail(
            document.source_path,
            mime_type,
            document.get_public_filename(),
        )

        # No archive version was produced: nothing to store
        # (the finally clause below still cleans up the parser)
        if not parser.get_archive_path():
            return

        with transaction.atomic():
            with open(parser.get_archive_path(), "rb") as archive_file:
                archive_checksum = hashlib.md5(archive_file.read()).hexdigest()

            # The database is written before the files are moved, so that a
            # failing move rolls the database change back.
            # save() is avoided on purpose: it triggers the file handling
            # logic, which must not run while the file is not yet in place.
            document.archive_filename = generate_unique_filename(
                document,
                archive_filename=True,
            )
            before = Document.objects.get(pk=document.pk)
            Document.objects.filter(pk=document.pk).update(
                archive_checksum=archive_checksum,
                content=parser.get_text(),
                archive_filename=document.archive_filename,
            )
            after = Document.objects.get(pk=document.pk)

            if settings.AUDIT_LOG_ENABLED:
                # Record [old, new] pairs for the fields rewritten above
                audited_fields = ("content", "archive_checksum", "archive_filename")
                field_changes = {
                    field: [getattr(before, field), getattr(after, field)]
                    for field in audited_fields
                }
                LogEntry.objects.log_create(
                    instance=before,
                    changes=json.dumps(field_changes),
                    additional_data=json.dumps({"reason": "Redo OCR called"}),
                    action=LogEntry.Action.UPDATE,
                )

            with FileLock(settings.MEDIA_LOCK):
                create_source_path_directory(document.archive_path)
                shutil.move(parser.get_archive_path(), document.archive_path)
                shutil.move(thumbnail, document.thumbnail_path)

        with index.open_index_writer() as index_writer:
            index.update_document(index_writer, document)

        clear_document_caches(document.pk)

    except Exception:
        logger.exception(
            f"Error while parsing document {document} (ID: {document_id})",
        )
    finally:
        parser.cleanup()
@shared_task
def empty_trash(doc_ids=None):
    """
    Permanently deletes documents from the trash.

    When doc_ids is given, the trashed documents with those ids are deleted.
    Otherwise every trashed document whose deleted_at lies more than
    settings.EMPTY_TRASH_DELAY days in the past is deleted. Errors during
    deletion are logged and not re-raised.
    """
    if doc_ids is not None:
        documents = Document.deleted_objects.filter(id__in=doc_ids)
    else:
        cutoff = timezone.localtime(timezone.now()) - timedelta(
            days=settings.EMPTY_TRASH_DELAY,
        )
        documents = Document.deleted_objects.filter(deleted_at__lt=cutoff)

    try:
        # The cleanup handler is only connected for the duration of this delete
        models.signals.post_delete.connect(cleanup_document_deletion, sender=Document)
        # Deleting from the trash is effectively a hard delete
        documents.delete()
    except Exception as e:  # pragma: no cover
        logger.exception(f"Error while emptying trash: {e}")
    finally:
        models.signals.post_delete.disconnect(
            cleanup_document_deletion,
            sender=Document,
        )