import hashlib
import logging
import shutil
import uuid
from datetime import timedelta
from pathlib import Path
from tempfile import TemporaryDirectory

import tqdm
from celery import Task
from celery import shared_task
from django.conf import settings
from django.db import models
from django.db import transaction
from django.db.models.signals import post_save
from django.utils import timezone
from filelock import FileLock
from whoosh.writing import AsyncWriter

from documents import index
from documents import sanity_checker
from documents.barcodes import BarcodePlugin
from documents.caching import clear_document_caches
from documents.classifier import DocumentClassifier
from documents.classifier import load_classifier
from documents.consumer import ConsumerPlugin
from documents.consumer import WorkflowTriggerPlugin
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
from documents.double_sided import CollatePlugin
from documents.file_handling import create_source_path_directory
from documents.file_handling import generate_unique_filename
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
from documents.parsers import DocumentParser
from documents.parsers import get_parser_class_for_mime_type
from documents.plugins.base import ConsumeTaskPlugin
from documents.plugins.base import ProgressManager
from documents.plugins.base import StopConsumeTaskError
from documents.plugins.helpers import ProgressStatusOptions
from documents.sanity_checker import SanityCheckFailedException
from documents.signals import document_updated
from documents.signals.handlers import cleanup_document_deletion

# LogEntry is only imported (and only used below) when audit logging is on.
if settings.AUDIT_LOG_ENABLED:
    from auditlog.models import LogEntry

logger = logging.getLogger("paperless.tasks")


@shared_task
def index_optimize():
    """Open the search index and commit an ``AsyncWriter`` with ``optimize=True``."""
    ix = index.open_index()
    writer = AsyncWriter(ix)
    writer.commit(optimize=True)


def index_reindex(progress_bar_disable=False):
    """
    Recreate the search index and add every document to it.

    ``progress_bar_disable`` is forwarded to ``tqdm`` as its ``disable`` flag.
    """
    documents = Document.objects.all()

    ix = index.open_index(recreate=True)

    with AsyncWriter(ix) as writer:
        for document in tqdm.tqdm(documents, disable=progress_bar_disable):
            index.update_document(writer, document)


@shared_task
def train_classifier():
    """
    Train the document classifier and save it when training reports a change.

    If no tag, document type, correspondent or storage path uses
    ``Tag.MATCH_AUTO``, nothing is trained and an existing model file is
    removed. Any exception raised while training or saving is logged as a
    warning and not re-raised.
    """
    if (
        not Tag.objects.filter(matching_algorithm=Tag.MATCH_AUTO).exists()
        and not DocumentType.objects.filter(matching_algorithm=Tag.MATCH_AUTO).exists()
        and not Correspondent.objects.filter(matching_algorithm=Tag.MATCH_AUTO).exists()
        and not StoragePath.objects.filter(matching_algorithm=Tag.MATCH_AUTO).exists()
    ):
        logger.info("No automatic matching items, not training")
        # Special case, items were once auto and trained, so remove the model
        # and prevent its use again
        if settings.MODEL_FILE.exists():
            logger.info(f"Removing {settings.MODEL_FILE} so it won't be used")
            settings.MODEL_FILE.unlink()
        return

    classifier = load_classifier()

    # Fall back to a fresh classifier when load_classifier() returned nothing
    if not classifier:
        classifier = DocumentClassifier()

    try:
        if classifier.train():
            logger.info(
                f"Saving updated classifier model to {settings.MODEL_FILE}...",
            )
            classifier.save()
        else:
            logger.debug("Training data unchanged.")

    except Exception as e:
        logger.warning("Classifier error: " + str(e))


@shared_task(bind=True)
def consume_file(
    self: Task,
    input_doc: ConsumableDocument,
    overrides: DocumentMetadataOverrides | None = None,
):
    """
    Run the consume plugins, in order, against ``input_doc``.

    Each plugin is constructed with the current overrides, the progress
    manager, a scratch directory and the task id. Plugins that report they
    are not able to run are skipped. After a plugin runs, its ``metadata``
    becomes the overrides passed to the next plugin.

    Returns the message of the last plugin that ran (``None`` if no plugin
    ran or the last one returned no message), or the message carried by a
    ``StopConsumeTaskError`` when a plugin requests an early exit.

    Any other exception from a plugin is logged, reported as a FAILED
    progress status and re-raised. ``plugin.cleanup()`` is always called for
    a plugin that was started.
    """
    # Default no overrides
    if overrides is None:
        overrides = DocumentMetadataOverrides()

    plugins: list[type[ConsumeTaskPlugin]] = [
        CollatePlugin,
        BarcodePlugin,
        WorkflowTriggerPlugin,
        ConsumerPlugin,
    ]

    # Bound up front so the final return cannot hit an unbound local when
    # every plugin is skipped.
    msg = None

    with (
        ProgressManager(
            overrides.filename or input_doc.original_file.name,
            self.request.id,
        ) as status_mgr,
        TemporaryDirectory(dir=settings.SCRATCH_DIR) as tmp_dir,
    ):
        tmp_dir = Path(tmp_dir)
        for plugin_class in plugins:
            plugin_name = plugin_class.NAME

            plugin = plugin_class(
                input_doc,
                overrides,
                status_mgr,
                tmp_dir,
                self.request.id,
            )

            if not plugin.able_to_run:
                logger.debug(f"Skipping plugin {plugin_name}")
                continue

            try:
                logger.debug(f"Executing plugin {plugin_name}")
                plugin.setup()

                msg = plugin.run()

                if msg is not None:
                    logger.info(f"{plugin_name} completed with: {msg}")
                else:
                    logger.info(f"{plugin_name} completed with no message")

                # Carry this plugin's metadata forward to the next plugin
                overrides = plugin.metadata

            except StopConsumeTaskError as e:
                logger.info(f"{plugin_name} requested task exit: {e.message}")
                return e.message

            except Exception as e:
                logger.exception(f"{plugin_name} failed: {e}")
                status_mgr.send_progress(ProgressStatusOptions.FAILED, f"{e}", 100, 100)
                raise

            finally:
                plugin.cleanup()

    return msg


@shared_task
def sanity_check():
    """
    Run the sanity checker, log its messages and summarise the outcome.

    Returns a short status string for warnings, infos or no issues.

    Raises:
        SanityCheckFailedException: if the checker reported any error.
    """
    messages = sanity_checker.check_sanity()

    messages.log_messages()

    if messages.has_error:
        raise SanityCheckFailedException("Sanity check failed with errors. See log.")
    elif messages.has_warning:
        return "Sanity check exited with warnings. See log."
    elif len(messages) > 0:
        return "Sanity check exited with infos. See log."
    else:
        return "No issues detected."


@shared_task
def bulk_update_documents(document_ids):
    """
    Refresh caches, signals and the search index for the given document ids.

    For each matching document the document caches are cleared and the
    ``document_updated`` and ``post_save`` signals are sent; afterwards every
    document is written to the search index.
    """
    documents = Document.objects.filter(id__in=document_ids)

    ix = index.open_index()

    for doc in documents:
        clear_document_caches(doc.pk)
        document_updated.send(
            sender=None,
            document=doc,
            logging_group=uuid.uuid4(),
        )
        post_save.send(Document, instance=doc, created=False)

    with AsyncWriter(ix) as writer:
        for doc in documents:
            index.update_document(writer, doc)


@shared_task
def update_document_content_maybe_archive_file(document_id):
    """
    Re-creates OCR content and thumbnail for a document, and archive file if
    it exists.

    If no parser is found for the document's mime type, an error is logged
    and nothing is changed. Exceptions raised while parsing or updating are
    logged and not re-raised; ``parser.cleanup()`` is always called.
    """
    document = Document.objects.get(id=document_id)

    mime_type = document.mime_type

    parser_class: type[DocumentParser] = get_parser_class_for_mime_type(mime_type)

    if not parser_class:
        logger.error(
            f"No parser found for mime type {mime_type}, cannot "
            f"archive document {document} (ID: {document_id})",
        )
        return

    parser: DocumentParser = parser_class(logging_group=uuid.uuid4())

    try:
        parser.parse(document.source_path, mime_type, document.get_public_filename())

        thumbnail = parser.get_thumbnail(
            document.source_path,
            mime_type,
            document.get_public_filename(),
        )

        with transaction.atomic():
            # Snapshot of the stored row, used for the audit log "before" values
            oldDocument = Document.objects.get(pk=document.pk)
            if parser.get_archive_path():
                with open(parser.get_archive_path(), "rb") as f:
                    checksum = hashlib.md5(f.read()).hexdigest()
                # I'm going to save first so that in case the file move
                # fails, the database is rolled back.
                # We also don't use save() since that triggers the filehandling
                # logic, and we don't want that yet (file not yet in place)
                document.archive_filename = generate_unique_filename(
                    document,
                    archive_filename=True,
                )
                Document.objects.filter(pk=document.pk).update(
                    archive_checksum=checksum,
                    content=parser.get_text(),
                    archive_filename=document.archive_filename,
                )
                newDocument = Document.objects.get(pk=document.pk)
                if settings.AUDIT_LOG_ENABLED:
                    LogEntry.objects.log_create(
                        instance=oldDocument,
                        changes={
                            "content": [oldDocument.content, newDocument.content],
                            "archive_checksum": [
                                oldDocument.archive_checksum,
                                newDocument.archive_checksum,
                            ],
                            "archive_filename": [
                                oldDocument.archive_filename,
                                newDocument.archive_filename,
                            ],
                        },
                        additional_data={
                            "reason": "Update document content",
                        },
                        action=LogEntry.Action.UPDATE,
                    )
            else:
                # No archive file was produced: only the content is updated
                Document.objects.filter(pk=document.pk).update(
                    content=parser.get_text(),
                )

                if settings.AUDIT_LOG_ENABLED:
                    LogEntry.objects.log_create(
                        instance=oldDocument,
                        changes={
                            "content": [oldDocument.content, parser.get_text()],
                        },
                        additional_data={
                            "reason": "Update document content",
                        },
                        action=LogEntry.Action.UPDATE,
                    )

            # Files are moved while still inside the transaction so that a
            # failed move rolls the database changes back.
            with FileLock(settings.MEDIA_LOCK):
                if parser.get_archive_path():
                    create_source_path_directory(document.archive_path)
                    shutil.move(parser.get_archive_path(), document.archive_path)
                shutil.move(thumbnail, document.thumbnail_path)

        # The row was changed via queryset update(), so reload the instance
        document.refresh_from_db()
        logger.info(
            f"Updating index for document {document_id} ({document.archive_checksum})",
        )
        with index.open_index_writer() as writer:
            index.update_document(writer, document)

        clear_document_caches(document.pk)

    except Exception:
        logger.exception(
            f"Error while parsing document {document} (ID: {document_id})",
        )
    finally:
        parser.cleanup()


@shared_task
def empty_trash(doc_ids=None):
    """
    Delete documents from ``Document.deleted_objects``.

    When ``doc_ids`` is given, only those ids are deleted; otherwise every
    entry whose ``deleted_at`` is older than ``settings.EMPTY_TRASH_DELAY``
    days is deleted. The ``cleanup_document_deletion`` handler is connected to
    ``post_delete`` only for the duration of the delete. Exceptions are logged
    and not re-raised.
    """
    documents = (
        Document.deleted_objects.filter(id__in=doc_ids)
        if doc_ids is not None
        else Document.deleted_objects.filter(
            deleted_at__lt=timezone.localtime(timezone.now())
            - timedelta(
                days=settings.EMPTY_TRASH_DELAY,
            ),
        )
    )

    try:
        # Temporarily connect the cleanup handler
        models.signals.post_delete.connect(cleanup_document_deletion, sender=Document)
        documents.delete()  # this is effectively a hard delete
    except Exception as e:  # pragma: no cover
        logger.exception(f"Error while emptying trash: {e}")
    finally:
        models.signals.post_delete.disconnect(
            cleanup_document_deletion,
            sender=Document,
        )