Connects up the celery signals to support pending, started and success/failure, without relying on django-celery-results

This commit is contained in:
Trenton H
2022-10-17 12:42:08 -07:00
parent b87757a0e2
commit 20b7287dc2
8 changed files with 479 additions and 173 deletions

View File

@@ -1,7 +1,13 @@
import logging
import os
import shutil
from ast import literal_eval
from pathlib import Path
from celery import states
from celery.signals import before_task_publish
from celery.signals import task_postrun
from celery.signals import task_prerun
from django.conf import settings
from django.contrib.admin.models import ADDITION
from django.contrib.admin.models import LogEntry
@@ -13,7 +19,6 @@ from django.db.models import Q
from django.dispatch import receiver
from django.utils import termcolors
from django.utils import timezone
from django_celery_results.models import TaskResult
from filelock import FileLock
from .. import matching
@@ -502,19 +507,94 @@ def add_to_index(sender, document, **kwargs):
index.add_or_update_document(document)
@receiver(models.signals.post_save, sender=TaskResult)
def update_paperless_task(sender, instance: TaskResult, **kwargs):
@before_task_publish.connect
def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
    """
    Creates the PaperlessTask object in a pending state.

    Only messages for documents.tasks.consume_file are recorded; every other
    task is ignored. The signal is dispatched before the task is published,
    in the process that sends the task.

    https://docs.celeryq.dev/en/stable/userguide/signals.html#before-task-publish

    The "task", "id", "argsrepr" and "kwargsrepr" entries of ``headers`` are
    read. Nothing is returned, and any error raised while creating the record
    is logged instead of propagated.
    """
    # headers defaults to None, so guard before looking anything up in it.
    if not headers or headers.get("task") != "documents.tasks.consume_file":
        # Assumption: this is only ever a v2 message
        return

    try:
        # NOTE(review): argsrepr/kwargsrepr are reprs of the call arguments;
        # if one of them cannot be parsed by literal_eval the error is caught
        # below and no PaperlessTask is created - confirm this is acceptable.
        kwargs_repr = headers.get("kwargsrepr")
        task_kwargs = literal_eval(kwargs_repr) if kwargs_repr is not None else None
        task_args = literal_eval(headers["argsrepr"])

        # Prefer an explicit override; a missing, None or empty override
        # falls through to the positional filename below.
        task_file_name = (task_kwargs or {}).get("override_filename") or ""

        # Nothing was found, report the task first argument
        if not task_file_name and task_args:
            # The first positional argument of consume_file is the file path
            task_file_name = Path(task_args[0]).name

        PaperlessTask.objects.create(
            task_id=headers["id"],
            status=states.PENDING,
            task_file_name=task_file_name,
            task_name=headers["task"],
            task_args=task_args,
            task_kwargs=task_kwargs,
            result=None,
            date_created=timezone.now(),
            date_started=None,
            date_done=None,
        )
    except Exception as e:  # pragma: no cover
        # Don't let an exception in the signal handlers prevent
        # a document from being consumed.
        logger.error(f"Creating PaperlessTask failed: {e}")
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    """
    Marks the PaperlessTask matching ``task_id`` as started and stamps its
    start time. Sent before the task begins execution on a worker.

    If no PaperlessTask exists for ``task_id`` nothing is changed. Errors are
    logged rather than raised.

    https://docs.celeryq.dev/en/stable/userguide/signals.html#task-prerun
    """
    try:
        paperless_task = PaperlessTask.objects.filter(task_id=task_id).first()
        if paperless_task is None:
            # No record was created for this task, so there is nothing to update
            return

        paperless_task.status = states.STARTED
        paperless_task.date_started = timezone.now()
        paperless_task.save()
    except Exception as e:  # pragma: no cover
        # Don't let an exception in the signal handlers prevent
        # a document from being consumed.
        logger.error(f"Setting PaperlessTask started failed: {e}")
@task_postrun.connect
def task_postrun_handler(
    sender=None,
    task_id=None,
    task=None,
    retval=None,
    state=None,
    **kwargs,
):
    """
    Records the outcome of a task on its PaperlessTask: the final ``state``,
    the ``retval`` as the result, and the completion time.

    If no PaperlessTask exists for ``task_id`` nothing is changed. Errors are
    logged rather than raised.

    https://docs.celeryq.dev/en/stable/userguide/signals.html#task-postrun
    """
    try:
        paperless_task = PaperlessTask.objects.filter(task_id=task_id).first()
        if paperless_task is None:
            # No record was created for this task, so there is nothing to update
            return

        paperless_task.status = state
        paperless_task.result = retval
        paperless_task.date_done = timezone.now()
        paperless_task.save()
    except Exception as e:  # pragma: no cover
        # Don't let an exception in the signal handlers prevent
        # a document from being consumed.
        logger.error(f"Updating PaperlessTask failed: {e}")