mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	Connects up the celery signals to support pending, started and success/failure, without relying on django-celery-results
This commit is contained in:
		| @@ -21,7 +21,7 @@ export interface PaperlessTask extends ObjectWithId { | |||||||
|  |  | ||||||
|   task_id: string |   task_id: string | ||||||
|  |  | ||||||
|   name: string |   task_file_name: string | ||||||
|  |  | ||||||
|   date_created: Date |   date_created: Date | ||||||
|  |  | ||||||
|   | |||||||
| @@ -0,0 +1,134 @@ | |||||||
|  | # Generated by Django 4.1.2 on 2022-10-17 16:31 | ||||||
|  |  | ||||||
|  | from django.db import migrations, models | ||||||
|  | import django.utils.timezone | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Migration(migrations.Migration): | ||||||
|  |  | ||||||
|  |     dependencies = [ | ||||||
|  |         ("documents", "1026_transition_to_celery"), | ||||||
|  |     ] | ||||||
|  |  | ||||||
|  |     operations = [ | ||||||
|  |         migrations.RemoveField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="attempted_task", | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="date_created", | ||||||
|  |             field=models.DateTimeField( | ||||||
|  |                 default=django.utils.timezone.now, | ||||||
|  |                 help_text="Datetime field when the task result was created in UTC", | ||||||
|  |                 null=True, | ||||||
|  |                 verbose_name="Created DateTime", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="date_done", | ||||||
|  |             field=models.DateTimeField( | ||||||
|  |                 default=None, | ||||||
|  |                 help_text="Datetime field when the task was completed in UTC", | ||||||
|  |                 null=True, | ||||||
|  |                 verbose_name="Completed DateTime", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="date_started", | ||||||
|  |             field=models.DateTimeField( | ||||||
|  |                 default=None, | ||||||
|  |                 help_text="Datetime field when the task was started in UTC", | ||||||
|  |                 null=True, | ||||||
|  |                 verbose_name="Started DateTime", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="result", | ||||||
|  |             field=models.TextField( | ||||||
|  |                 default=None, | ||||||
|  |                 help_text="The data returned by the task", | ||||||
|  |                 null=True, | ||||||
|  |                 verbose_name="Result Data", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="status", | ||||||
|  |             field=models.CharField( | ||||||
|  |                 choices=[ | ||||||
|  |                     ("FAILURE", "FAILURE"), | ||||||
|  |                     ("PENDING", "PENDING"), | ||||||
|  |                     ("RECEIVED", "RECEIVED"), | ||||||
|  |                     ("RETRY", "RETRY"), | ||||||
|  |                     ("REVOKED", "REVOKED"), | ||||||
|  |                     ("STARTED", "STARTED"), | ||||||
|  |                     ("SUCCESS", "SUCCESS"), | ||||||
|  |                 ], | ||||||
|  |                 default="PENDING", | ||||||
|  |                 help_text="Current state of the task being run", | ||||||
|  |                 max_length=30, | ||||||
|  |                 verbose_name="Task State", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="task_args", | ||||||
|  |             field=models.JSONField( | ||||||
|  |                 help_text="JSON representation of the positional arguments used with the task", | ||||||
|  |                 null=True, | ||||||
|  |                 verbose_name="Task Positional Arguments", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="task_file_name", | ||||||
|  |             field=models.CharField( | ||||||
|  |                 help_text="Name of the file which the Task was run for", | ||||||
|  |                 max_length=255, | ||||||
|  |                 null=True, | ||||||
|  |                 verbose_name="Task Name", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="task_kwargs", | ||||||
|  |             field=models.JSONField( | ||||||
|  |                 help_text="JSON representation of the named arguments used with the task", | ||||||
|  |                 null=True, | ||||||
|  |                 verbose_name="Task Named Arguments", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AddField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="task_name", | ||||||
|  |             field=models.CharField( | ||||||
|  |                 help_text="Name of the Task which was run", | ||||||
|  |                 max_length=255, | ||||||
|  |                 null=True, | ||||||
|  |                 verbose_name="Task Name", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AlterField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="acknowledged", | ||||||
|  |             field=models.BooleanField( | ||||||
|  |                 default=False, | ||||||
|  |                 help_text="If the task is acknowledged via the frontend or API", | ||||||
|  |                 verbose_name="Acknowledged", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |         migrations.AlterField( | ||||||
|  |             model_name="paperlesstask", | ||||||
|  |             name="task_id", | ||||||
|  |             field=models.CharField( | ||||||
|  |                 help_text="Celery ID for the Task that was run", | ||||||
|  |                 max_length=255, | ||||||
|  |                 unique=True, | ||||||
|  |                 verbose_name="Task ID", | ||||||
|  |             ), | ||||||
|  |         ), | ||||||
|  |     ] | ||||||
| @@ -7,14 +7,17 @@ from typing import Optional | |||||||
|  |  | ||||||
| import dateutil.parser | import dateutil.parser | ||||||
| import pathvalidate | import pathvalidate | ||||||
|  | from celery import states | ||||||
| from django.conf import settings | from django.conf import settings | ||||||
| from django.contrib.auth.models import User | from django.contrib.auth.models import User | ||||||
| from django.db import models | from django.db import models | ||||||
| from django.utils import timezone | from django.utils import timezone | ||||||
| from django.utils.translation import gettext_lazy as _ | from django.utils.translation import gettext_lazy as _ | ||||||
| from django_celery_results.models import TaskResult |  | ||||||
| from documents.parsers import get_default_file_extension | from documents.parsers import get_default_file_extension | ||||||
|  |  | ||||||
|  | ALL_STATES = sorted(states.ALL_STATES) | ||||||
|  | TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES)) | ||||||
|  |  | ||||||
|  |  | ||||||
| class MatchingModel(models.Model): | class MatchingModel(models.Model): | ||||||
|  |  | ||||||
| @@ -527,15 +530,79 @@ class UiSettings(models.Model): | |||||||
|  |  | ||||||
|  |  | ||||||
| class PaperlessTask(models.Model): | class PaperlessTask(models.Model): | ||||||
|     task_id = models.CharField(max_length=128) |     task_id = models.CharField( | ||||||
|     acknowledged = models.BooleanField(default=False) |         max_length=255, | ||||||
|  |         unique=True, | ||||||
|  |         verbose_name=_("Task ID"), | ||||||
|  |         help_text=_("Celery ID for the Task that was run"), | ||||||
|  |     ) | ||||||
|  |  | ||||||
|     attempted_task = models.OneToOneField( |     acknowledged = models.BooleanField( | ||||||
|         TaskResult, |         default=False, | ||||||
|         on_delete=models.CASCADE, |         verbose_name=_("Acknowledged"), | ||||||
|         related_name="attempted_task", |         help_text=_("If the task is acknowledged via the frontend or API"), | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  |     task_file_name = models.CharField( | ||||||
|         null=True, |         null=True, | ||||||
|         blank=True, |         max_length=255, | ||||||
|  |         verbose_name=_("Task Name"), | ||||||
|  |         help_text=_("Name of the file which the Task was run for"), | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  |     task_name = models.CharField( | ||||||
|  |         null=True, | ||||||
|  |         max_length=255, | ||||||
|  |         verbose_name=_("Task Name"), | ||||||
|  |         help_text=_("Name of the Task which was run"), | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  |     task_args = models.JSONField( | ||||||
|  |         null=True, | ||||||
|  |         verbose_name=_("Task Positional Arguments"), | ||||||
|  |         help_text=_( | ||||||
|  |             "JSON representation of the positional arguments used with the task", | ||||||
|  |         ), | ||||||
|  |     ) | ||||||
|  |     task_kwargs = models.JSONField( | ||||||
|  |         null=True, | ||||||
|  |         verbose_name=_("Task Named Arguments"), | ||||||
|  |         help_text=_( | ||||||
|  |             "JSON representation of the named arguments used with the task", | ||||||
|  |         ), | ||||||
|  |     ) | ||||||
|  |     status = models.CharField( | ||||||
|  |         max_length=30, | ||||||
|  |         default=states.PENDING, | ||||||
|  |         choices=TASK_STATE_CHOICES, | ||||||
|  |         verbose_name=_("Task State"), | ||||||
|  |         help_text=_("Current state of the task being run"), | ||||||
|  |     ) | ||||||
|  |     date_created = models.DateTimeField( | ||||||
|  |         null=True, | ||||||
|  |         default=timezone.now, | ||||||
|  |         verbose_name=_("Created DateTime"), | ||||||
|  |         help_text=_("Datetime field when the task result was created in UTC"), | ||||||
|  |     ) | ||||||
|  |     date_started = models.DateTimeField( | ||||||
|  |         null=True, | ||||||
|  |         default=None, | ||||||
|  |         verbose_name=_("Started DateTime"), | ||||||
|  |         help_text=_("Datetime field when the task was started in UTC"), | ||||||
|  |     ) | ||||||
|  |     date_done = models.DateTimeField( | ||||||
|  |         null=True, | ||||||
|  |         default=None, | ||||||
|  |         verbose_name=_("Completed DateTime"), | ||||||
|  |         help_text=_("Datetime field when the task was completed in UTC"), | ||||||
|  |     ) | ||||||
|  |     result = models.TextField( | ||||||
|  |         null=True, | ||||||
|  |         default=None, | ||||||
|  |         verbose_name=_("Result Data"), | ||||||
|  |         help_text=_( | ||||||
|  |             "The data returned by the task", | ||||||
|  |         ), | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,12 +1,6 @@ | |||||||
| import datetime | import datetime | ||||||
| import math | import math | ||||||
| import re | import re | ||||||
| from ast import literal_eval |  | ||||||
| from asyncio.log import logger |  | ||||||
| from pathlib import Path |  | ||||||
| from typing import Dict |  | ||||||
| from typing import Optional |  | ||||||
| from typing import Tuple |  | ||||||
|  |  | ||||||
| from celery import states | from celery import states | ||||||
|  |  | ||||||
| @@ -640,14 +634,13 @@ class TasksViewSerializer(serializers.ModelSerializer): | |||||||
|         fields = ( |         fields = ( | ||||||
|             "id", |             "id", | ||||||
|             "task_id", |             "task_id", | ||||||
|  |             "task_file_name", | ||||||
|             "date_created", |             "date_created", | ||||||
|             "date_done", |             "date_done", | ||||||
|             "type", |             "type", | ||||||
|             "status", |             "status", | ||||||
|             "result", |             "result", | ||||||
|             "acknowledged", |             "acknowledged", | ||||||
|             "task_name", |  | ||||||
|             "name", |  | ||||||
|             "related_document", |             "related_document", | ||||||
|         ) |         ) | ||||||
|  |  | ||||||
| @@ -657,108 +650,14 @@ class TasksViewSerializer(serializers.ModelSerializer): | |||||||
|         # just file tasks, for now |         # just file tasks, for now | ||||||
|         return "file" |         return "file" | ||||||
|  |  | ||||||
|     result = serializers.SerializerMethodField() |  | ||||||
|  |  | ||||||
|     def get_result(self, obj): |  | ||||||
|         result = "" |  | ||||||
|         if ( |  | ||||||
|             hasattr(obj, "attempted_task") |  | ||||||
|             and obj.attempted_task |  | ||||||
|             and obj.attempted_task.result |  | ||||||
|         ): |  | ||||||
|             try: |  | ||||||
|                 result: str = obj.attempted_task.result |  | ||||||
|                 if "exc_message" in result: |  | ||||||
|                     # This is a dict in this case |  | ||||||
|                     result: Dict = literal_eval(result) |  | ||||||
|                     # This is a list, grab the first item (most recent) |  | ||||||
|                     result = result["exc_message"][0] |  | ||||||
|             except Exception as e:  # pragma: no cover |  | ||||||
|                 # Extra security if something is malformed |  | ||||||
|                 logger.warn(f"Error getting task result: {e}", exc_info=True) |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     status = serializers.SerializerMethodField() |  | ||||||
|  |  | ||||||
|     def get_status(self, obj): |  | ||||||
|         result = "unknown" |  | ||||||
|         if hasattr(obj, "attempted_task") and obj.attempted_task: |  | ||||||
|             result = obj.attempted_task.status |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     date_created = serializers.SerializerMethodField() |  | ||||||
|  |  | ||||||
|     def get_date_created(self, obj): |  | ||||||
|         result = "" |  | ||||||
|         if hasattr(obj, "attempted_task") and obj.attempted_task: |  | ||||||
|             result = obj.attempted_task.date_created |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     date_done = serializers.SerializerMethodField() |  | ||||||
|  |  | ||||||
|     def get_date_done(self, obj): |  | ||||||
|         result = "" |  | ||||||
|         if hasattr(obj, "attempted_task") and obj.attempted_task: |  | ||||||
|             result = obj.attempted_task.date_done |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     task_id = serializers.SerializerMethodField() |  | ||||||
|  |  | ||||||
|     def get_task_id(self, obj): |  | ||||||
|         result = "" |  | ||||||
|         if hasattr(obj, "attempted_task") and obj.attempted_task: |  | ||||||
|             result = obj.attempted_task.task_id |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     task_name = serializers.SerializerMethodField() |  | ||||||
|  |  | ||||||
|     def get_task_name(self, obj): |  | ||||||
|         result = "" |  | ||||||
|         if hasattr(obj, "attempted_task") and obj.attempted_task: |  | ||||||
|             result = obj.attempted_task.task_name |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     name = serializers.SerializerMethodField() |  | ||||||
|  |  | ||||||
|     def get_name(self, obj): |  | ||||||
|         result = "" |  | ||||||
|         if hasattr(obj, "attempted_task") and obj.attempted_task: |  | ||||||
|             try: |  | ||||||
|                 task_kwargs: Optional[str] = obj.attempted_task.task_kwargs |  | ||||||
|                 # Try the override filename first (this is a webui created task?) |  | ||||||
|                 if task_kwargs is not None: |  | ||||||
|                     # It's a string, string of a dict.  Who knows why... |  | ||||||
|                     kwargs = literal_eval(literal_eval(task_kwargs)) |  | ||||||
|                     if "override_filename" in kwargs: |  | ||||||
|                         result = kwargs["override_filename"] |  | ||||||
|  |  | ||||||
|                 # Nothing was found, report the task first argument |  | ||||||
|                 if not len(result): |  | ||||||
|                     # There are always some arguments to the consume |  | ||||||
|                     task_args: Tuple = literal_eval( |  | ||||||
|                         literal_eval(obj.attempted_task.task_args), |  | ||||||
|                     ) |  | ||||||
|                     filepath = Path(task_args[0]) |  | ||||||
|                     result = filepath.name |  | ||||||
|             except Exception as e:  # pragma: no cover |  | ||||||
|                 # Extra security if something is malformed |  | ||||||
|                 logger.warning(f"Error getting file name from task: {e}", exc_info=True) |  | ||||||
|  |  | ||||||
|         return result |  | ||||||
|  |  | ||||||
|     related_document = serializers.SerializerMethodField() |     related_document = serializers.SerializerMethodField() | ||||||
|  |     related_doc_re = re.compile(r"New document id (\d+) created") | ||||||
|  |  | ||||||
|     def get_related_document(self, obj): |     def get_related_document(self, obj): | ||||||
|         result = "" |         result = None | ||||||
|         regexp = r"New document id (\d+) created" |         if obj.status is not None and obj.status == states.SUCCESS: | ||||||
|         if ( |  | ||||||
|             hasattr(obj, "attempted_task") |  | ||||||
|             and obj.attempted_task |  | ||||||
|             and obj.attempted_task.result |  | ||||||
|             and obj.attempted_task.status == states.SUCCESS |  | ||||||
|         ): |  | ||||||
|             try: |             try: | ||||||
|                 result = re.search(regexp, obj.attempted_task.result).group(1) |                 result = self.related_doc_re.search(obj.result).group(1) | ||||||
|             except Exception: |             except Exception: | ||||||
|                 pass |                 pass | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,7 +1,13 @@ | |||||||
| import logging | import logging | ||||||
| import os | import os | ||||||
| import shutil | import shutil | ||||||
|  | from ast import literal_eval | ||||||
|  | from pathlib import Path | ||||||
|  |  | ||||||
|  | from celery import states | ||||||
|  | from celery.signals import before_task_publish | ||||||
|  | from celery.signals import task_postrun | ||||||
|  | from celery.signals import task_prerun | ||||||
| from django.conf import settings | from django.conf import settings | ||||||
| from django.contrib.admin.models import ADDITION | from django.contrib.admin.models import ADDITION | ||||||
| from django.contrib.admin.models import LogEntry | from django.contrib.admin.models import LogEntry | ||||||
| @@ -13,7 +19,6 @@ from django.db.models import Q | |||||||
| from django.dispatch import receiver | from django.dispatch import receiver | ||||||
| from django.utils import termcolors | from django.utils import termcolors | ||||||
| from django.utils import timezone | from django.utils import timezone | ||||||
| from django_celery_results.models import TaskResult |  | ||||||
| from filelock import FileLock | from filelock import FileLock | ||||||
|  |  | ||||||
| from .. import matching | from .. import matching | ||||||
| @@ -502,19 +507,94 @@ def add_to_index(sender, document, **kwargs): | |||||||
|     index.add_or_update_document(document) |     index.add_or_update_document(document) | ||||||
|  |  | ||||||
|  |  | ||||||
| @receiver(models.signals.post_save, sender=TaskResult) | @before_task_publish.connect | ||||||
| def update_paperless_task(sender, instance: TaskResult, **kwargs): | def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs): | ||||||
|  |     """ | ||||||
|  |     Creates the PaperlessTask object in a pending state.  This is sent before | ||||||
|  |     the task reaches the broker, but | ||||||
|  |  | ||||||
|  |     https://docs.celeryq.dev/en/stable/userguide/signals.html#before-task-publish | ||||||
|  |  | ||||||
|  |     """ | ||||||
|  |     if "task" not in headers or headers["task"] != "documents.tasks.consume_file": | ||||||
|  |         # Assumption: this is only ever a v2 message | ||||||
|  |         return | ||||||
|  |  | ||||||
|     try: |     try: | ||||||
|         if instance.task_name == "documents.tasks.consume_file": |         task_file_name = "" | ||||||
|             paperless_task, _ = PaperlessTask.objects.get_or_create( |         if headers["kwargsrepr"] is not None: | ||||||
|                 task_id=instance.task_id, |             task_kwargs = literal_eval(headers["kwargsrepr"]) | ||||||
|  |             if "override_filename" in task_kwargs: | ||||||
|  |                 task_file_name = task_kwargs["override_filename"] | ||||||
|  |         else: | ||||||
|  |             task_kwargs = None | ||||||
|  |  | ||||||
|  |         task_args = literal_eval(headers["argsrepr"]) | ||||||
|  |  | ||||||
|  |         # Nothing was found, report the task first argument | ||||||
|  |         if not len(task_file_name): | ||||||
|  |             # There are always some arguments to the consume, first is always filename | ||||||
|  |             filepath = Path(task_args[0]) | ||||||
|  |             task_file_name = filepath.name | ||||||
|  |  | ||||||
|  |         PaperlessTask.objects.create( | ||||||
|  |             task_id=headers["id"], | ||||||
|  |             status=states.PENDING, | ||||||
|  |             task_file_name=task_file_name, | ||||||
|  |             task_name=headers["task"], | ||||||
|  |             task_args=task_args, | ||||||
|  |             task_kwargs=task_kwargs, | ||||||
|  |             result=None, | ||||||
|  |             date_created=timezone.now(), | ||||||
|  |             date_started=None, | ||||||
|  |             date_done=None, | ||||||
|         ) |         ) | ||||||
|             paperless_task.name = instance.task_name |     except Exception as e:  # pragma: no cover | ||||||
|             paperless_task.created = instance.date_created |  | ||||||
|             paperless_task.completed = instance.date_done |  | ||||||
|             paperless_task.attempted_task = instance |  | ||||||
|             paperless_task.save() |  | ||||||
|     except Exception as e: |  | ||||||
|         # Don't let an exception in the signal handlers prevent |         # Don't let an exception in the signal handlers prevent | ||||||
|         # a document from being consumed. |         # a document from being consumed. | ||||||
|         logger.error(f"Creating PaperlessTask failed: {e}") |         logger.error(f"Creating PaperlessTask failed: {e}") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @task_prerun.connect | ||||||
|  | def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs): | ||||||
|  |     """ | ||||||
|  |  | ||||||
|  |     Updates the PaperlessTask to be started.  Sent before the task begins execution | ||||||
|  |     on a worker. | ||||||
|  |  | ||||||
|  |     https://docs.celeryq.dev/en/stable/userguide/signals.html#task-prerun | ||||||
|  |     """ | ||||||
|  |     try: | ||||||
|  |         task_instance = PaperlessTask.objects.filter(task_id=task_id).first() | ||||||
|  |  | ||||||
|  |         if task_instance is not None: | ||||||
|  |             task_instance.status = states.STARTED | ||||||
|  |             task_instance.date_started = timezone.now() | ||||||
|  |             task_instance.save() | ||||||
|  |     except Exception as e:  # pragma: no cover | ||||||
|  |         # Don't let an exception in the signal handlers prevent | ||||||
|  |         # a document from being consumed. | ||||||
|  |         logger.error(f"Setting PaperlessTask started failed: {e}") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @task_postrun.connect | ||||||
|  | def task_postrun_handler( | ||||||
|  |     sender=None, task_id=None, task=None, retval=None, state=None, **kwargs | ||||||
|  | ): | ||||||
|  |     """ | ||||||
|  |     Updates the result of the PaperlessTask. | ||||||
|  |  | ||||||
|  |     https://docs.celeryq.dev/en/stable/userguide/signals.html#task-postrun | ||||||
|  |     """ | ||||||
|  |     try: | ||||||
|  |         task_instance = PaperlessTask.objects.filter(task_id=task_id).first() | ||||||
|  |  | ||||||
|  |         if task_instance is not None: | ||||||
|  |             task_instance.status = state | ||||||
|  |             task_instance.result = retval | ||||||
|  |             task_instance.date_done = timezone.now() | ||||||
|  |             task_instance.save() | ||||||
|  |     except Exception as e:  # pragma: no cover | ||||||
|  |         # Don't let an exception in the signal handlers prevent | ||||||
|  |         # a document from being consumed. | ||||||
|  |         logger.error(f"Updating PaperlessTask failed: {e}") | ||||||
|   | |||||||
| @@ -32,7 +32,6 @@ from documents.models import PaperlessTask | |||||||
| from documents.models import SavedView | from documents.models import SavedView | ||||||
| from documents.models import StoragePath | from documents.models import StoragePath | ||||||
| from documents.models import Tag | from documents.models import Tag | ||||||
| from django_celery_results.models import TaskResult |  | ||||||
| from documents.models import Comment | from documents.models import Comment | ||||||
| from documents.models import StoragePath | from documents.models import StoragePath | ||||||
| from documents.tests.utils import DirectoriesMixin | from documents.tests.utils import DirectoriesMixin | ||||||
| @@ -2756,19 +2755,16 @@ class TestTasks(APITestCase): | |||||||
|         THEN: |         THEN: | ||||||
|             - Attempting and pending tasks are serialized and provided |             - Attempting and pending tasks are serialized and provided | ||||||
|         """ |         """ | ||||||
|         result1 = TaskResult.objects.create( |  | ||||||
|             task_id=str(uuid.uuid4()), |  | ||||||
|             task_name="documents.tasks.some_great_task", |  | ||||||
|             status=celery.states.PENDING, |  | ||||||
|         ) |  | ||||||
|         PaperlessTask.objects.create(attempted_task=result1) |  | ||||||
|  |  | ||||||
|         result2 = TaskResult.objects.create( |         task1 = PaperlessTask.objects.create( | ||||||
|             task_id=str(uuid.uuid4()), |             task_id=str(uuid.uuid4()), | ||||||
|             task_name="documents.tasks.some_awesome_task", |             task_file_name="task_one.pdf", | ||||||
|             status=celery.states.STARTED, |         ) | ||||||
|  |  | ||||||
|  |         task2 = PaperlessTask.objects.create( | ||||||
|  |             task_id=str(uuid.uuid4()), | ||||||
|  |             task_file_name="task_two.pdf", | ||||||
|         ) |         ) | ||||||
|         PaperlessTask.objects.create(attempted_task=result2) |  | ||||||
|  |  | ||||||
|         response = self.client.get(self.ENDPOINT) |         response = self.client.get(self.ENDPOINT) | ||||||
|  |  | ||||||
| @@ -2777,13 +2773,18 @@ class TestTasks(APITestCase): | |||||||
|         returned_task1 = response.data[1] |         returned_task1 = response.data[1] | ||||||
|         returned_task2 = response.data[0] |         returned_task2 = response.data[0] | ||||||
|  |  | ||||||
|         self.assertEqual(returned_task1["task_id"], result1.task_id) |         from pprint import pprint | ||||||
|         self.assertEqual(returned_task1["status"], celery.states.PENDING) |  | ||||||
|         self.assertEqual(returned_task1["task_name"], result1.task_name) |  | ||||||
|  |  | ||||||
|         self.assertEqual(returned_task2["task_id"], result2.task_id) |         pprint(returned_task1) | ||||||
|         self.assertEqual(returned_task2["status"], celery.states.STARTED) |         pprint(returned_task2) | ||||||
|         self.assertEqual(returned_task2["task_name"], result2.task_name) |  | ||||||
|  |         self.assertEqual(returned_task1["task_id"], task1.task_id) | ||||||
|  |         self.assertEqual(returned_task1["status"], celery.states.PENDING) | ||||||
|  |         self.assertEqual(returned_task1["task_file_name"], task1.task_file_name) | ||||||
|  |  | ||||||
|  |         self.assertEqual(returned_task2["task_id"], task2.task_id) | ||||||
|  |         self.assertEqual(returned_task2["status"], celery.states.PENDING) | ||||||
|  |         self.assertEqual(returned_task2["task_file_name"], task2.task_file_name) | ||||||
|  |  | ||||||
|     def test_acknowledge_tasks(self): |     def test_acknowledge_tasks(self): | ||||||
|         """ |         """ | ||||||
| @@ -2794,12 +2795,10 @@ class TestTasks(APITestCase): | |||||||
|         THEN: |         THEN: | ||||||
|             - Task is marked as acknowledged |             - Task is marked as acknowledged | ||||||
|         """ |         """ | ||||||
|         result1 = TaskResult.objects.create( |         task = PaperlessTask.objects.create( | ||||||
|             task_id=str(uuid.uuid4()), |             task_id=str(uuid.uuid4()), | ||||||
|             task_name="documents.tasks.some_task", |             task_file_name="task_one.pdf", | ||||||
|             status=celery.states.PENDING, |  | ||||||
|         ) |         ) | ||||||
|         task = PaperlessTask.objects.create(attempted_task=result1) |  | ||||||
|  |  | ||||||
|         response = self.client.get(self.ENDPOINT) |         response = self.client.get(self.ENDPOINT) | ||||||
|         self.assertEqual(len(response.data), 1) |         self.assertEqual(len(response.data), 1) | ||||||
| @@ -2822,13 +2821,12 @@ class TestTasks(APITestCase): | |||||||
|         THEN: |         THEN: | ||||||
|             - The returned data includes the task result |             - The returned data includes the task result | ||||||
|         """ |         """ | ||||||
|         result1 = TaskResult.objects.create( |         task = PaperlessTask.objects.create( | ||||||
|             task_id=str(uuid.uuid4()), |             task_id=str(uuid.uuid4()), | ||||||
|             task_name="documents.tasks.some_task", |             task_file_name="task_one.pdf", | ||||||
|             status=celery.states.SUCCESS, |             status=celery.states.SUCCESS, | ||||||
|             result="Success. New document id 1 created", |             result="Success. New document id 1 created", | ||||||
|         ) |         ) | ||||||
|         _ = PaperlessTask.objects.create(attempted_task=result1) |  | ||||||
|  |  | ||||||
|         response = self.client.get(self.ENDPOINT) |         response = self.client.get(self.ENDPOINT) | ||||||
|  |  | ||||||
| @@ -2849,17 +2847,12 @@ class TestTasks(APITestCase): | |||||||
|         THEN: |         THEN: | ||||||
|             - The returned result is the exception info |             - The returned result is the exception info | ||||||
|         """ |         """ | ||||||
|         result1 = TaskResult.objects.create( |         task = PaperlessTask.objects.create( | ||||||
|             task_id=str(uuid.uuid4()), |             task_id=str(uuid.uuid4()), | ||||||
|             task_name="documents.tasks.some_task", |             task_file_name="task_one.pdf", | ||||||
|             status=celery.states.SUCCESS, |             status=celery.states.FAILURE, | ||||||
|             result={ |             result="test.pdf: Not consuming test.pdf: It is a duplicate.", | ||||||
|                 "exc_type": "ConsumerError", |  | ||||||
|                 "exc_message": ["test.pdf: Not consuming test.pdf: It is a duplicate."], |  | ||||||
|                 "exc_module": "documents.consumer", |  | ||||||
|             }, |  | ||||||
|         ) |         ) | ||||||
|         _ = PaperlessTask.objects.create(attempted_task=result1) |  | ||||||
|  |  | ||||||
|         response = self.client.get(self.ENDPOINT) |         response = self.client.get(self.ENDPOINT) | ||||||
|  |  | ||||||
| @@ -2883,14 +2876,22 @@ class TestTasks(APITestCase): | |||||||
|         THEN: |         THEN: | ||||||
|             - Returned data include the filename |             - Returned data include the filename | ||||||
|         """ |         """ | ||||||
|         result1 = TaskResult.objects.create( |         task = PaperlessTask.objects.create( | ||||||
|             task_id=str(uuid.uuid4()), |             task_id=str(uuid.uuid4()), | ||||||
|  |             task_file_name="test.pdf", | ||||||
|             task_name="documents.tasks.some_task", |             task_name="documents.tasks.some_task", | ||||||
|             status=celery.states.SUCCESS, |             status=celery.states.SUCCESS, | ||||||
|             task_args="\"('/tmp/paperless/paperless-upload-5iq7skzc',)\"", |             task_args=("/tmp/paperless/paperless-upload-5iq7skzc",), | ||||||
|             task_kwargs="\"{'override_filename': 'test.pdf', 'override_title': None, 'override_correspondent_id': None, 'override_document_type_id': None, 'override_tag_ids': None, 'task_id': '466e8fe7-7193-4698-9fff-72f0340e2082', 'override_created': None}\"", |             task_kwargs={ | ||||||
|  |                 "override_filename": "test.pdf", | ||||||
|  |                 "override_title": None, | ||||||
|  |                 "override_correspondent_id": None, | ||||||
|  |                 "override_document_type_id": None, | ||||||
|  |                 "override_tag_ids": None, | ||||||
|  |                 "task_id": "466e8fe7-7193-4698-9fff-72f0340e2082", | ||||||
|  |                 "override_created": None, | ||||||
|  |             }, | ||||||
|         ) |         ) | ||||||
|         _ = PaperlessTask.objects.create(attempted_task=result1) |  | ||||||
|  |  | ||||||
|         response = self.client.get(self.ENDPOINT) |         response = self.client.get(self.ENDPOINT) | ||||||
|  |  | ||||||
| @@ -2899,7 +2900,7 @@ class TestTasks(APITestCase): | |||||||
|  |  | ||||||
|         returned_data = response.data[0] |         returned_data = response.data[0] | ||||||
|  |  | ||||||
|         self.assertEqual(returned_data["name"], "test.pdf") |         self.assertEqual(returned_data["task_file_name"], "test.pdf") | ||||||
|  |  | ||||||
|     def test_task_name_consume_folder(self): |     def test_task_name_consume_folder(self): | ||||||
|         """ |         """ | ||||||
| @@ -2911,14 +2912,14 @@ class TestTasks(APITestCase): | |||||||
|         THEN: |         THEN: | ||||||
|             - Returned data include the filename |             - Returned data include the filename | ||||||
|         """ |         """ | ||||||
|         result1 = TaskResult.objects.create( |         task = PaperlessTask.objects.create( | ||||||
|             task_id=str(uuid.uuid4()), |             task_id=str(uuid.uuid4()), | ||||||
|  |             task_file_name="anothertest.pdf", | ||||||
|             task_name="documents.tasks.some_task", |             task_name="documents.tasks.some_task", | ||||||
|             status=celery.states.SUCCESS, |             status=celery.states.SUCCESS, | ||||||
|             task_args="\"('/consume/anothertest.pdf',)\"", |             task_args=("/consume/anothertest.pdf",), | ||||||
|             task_kwargs="\"{'override_tag_ids': None}\"", |             task_kwargs={"override_tag_ids": None}, | ||||||
|         ) |         ) | ||||||
|         _ = PaperlessTask.objects.create(attempted_task=result1) |  | ||||||
|  |  | ||||||
|         response = self.client.get(self.ENDPOINT) |         response = self.client.get(self.ENDPOINT) | ||||||
|  |  | ||||||
| @@ -2927,4 +2928,4 @@ class TestTasks(APITestCase): | |||||||
|  |  | ||||||
|         returned_data = response.data[0] |         returned_data = response.data[0] | ||||||
|  |  | ||||||
|         self.assertEqual(returned_data["name"], "anothertest.pdf") |         self.assertEqual(returned_data["task_file_name"], "anothertest.pdf") | ||||||
|   | |||||||
							
								
								
									
										126
									
								
								src/documents/tests/test_task_signals.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										126
									
								
								src/documents/tests/test_task_signals.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,126 @@ | |||||||
|  | import celery | ||||||
|  | from django.test import TestCase | ||||||
|  | from documents.models import PaperlessTask | ||||||
|  | from documents.signals.handlers import before_task_publish_handler | ||||||
|  | from documents.signals.handlers import task_postrun_handler | ||||||
|  | from documents.signals.handlers import task_prerun_handler | ||||||
|  | from documents.tests.utils import DirectoriesMixin | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class TestTaskSignalHandler(DirectoriesMixin, TestCase): | ||||||
|  |  | ||||||
|  |     HEADERS_CONSUME = { | ||||||
|  |         "lang": "py", | ||||||
|  |         "task": "documents.tasks.consume_file", | ||||||
|  |         "id": "52d31e24-9dcc-4c32-9e16-76007e9add5e", | ||||||
|  |         "shadow": None, | ||||||
|  |         "eta": None, | ||||||
|  |         "expires": None, | ||||||
|  |         "group": None, | ||||||
|  |         "group_index": None, | ||||||
|  |         "retries": 0, | ||||||
|  |         "timelimit": [None, None], | ||||||
|  |         "root_id": "52d31e24-9dcc-4c32-9e16-76007e9add5e", | ||||||
|  |         "parent_id": None, | ||||||
|  |         "argsrepr": "('/consume/hello-999.pdf',)", | ||||||
|  |         "kwargsrepr": "{'override_tag_ids': None}", | ||||||
|  |         "origin": "gen260@paperless-ngx-dev-webserver", | ||||||
|  |         "ignore_result": False, | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     HEADERS_WEB_UI = { | ||||||
|  |         "lang": "py", | ||||||
|  |         "task": "documents.tasks.consume_file", | ||||||
|  |         "id": "6e88a41c-e5f8-4631-9972-68c314512498", | ||||||
|  |         "shadow": None, | ||||||
|  |         "eta": None, | ||||||
|  |         "expires": None, | ||||||
|  |         "group": None, | ||||||
|  |         "group_index": None, | ||||||
|  |         "retries": 0, | ||||||
|  |         "timelimit": [None, None], | ||||||
|  |         "root_id": "6e88a41c-e5f8-4631-9972-68c314512498", | ||||||
|  |         "parent_id": None, | ||||||
|  |         "argsrepr": "('/tmp/paperless/paperless-upload-st9lmbvx',)", | ||||||
|  |         "kwargsrepr": "{'override_filename': 'statement.pdf', 'override_title': None, 'override_correspondent_id': None, 'override_document_type_id': None, 'override_tag_ids': None, 'task_id': 'f5622ca9-3707-4ed0-b418-9680b912572f', 'override_created': None}", | ||||||
|  |         "origin": "gen342@paperless-ngx-dev-webserver", | ||||||
|  |         "ignore_result": False, | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     def util_call_before_task_publish_handler(self, headers_to_use): | ||||||
|  |         self.assertEqual(PaperlessTask.objects.all().count(), 0) | ||||||
|  |  | ||||||
|  |         before_task_publish_handler(headers=headers_to_use) | ||||||
|  |  | ||||||
|  |         self.assertEqual(PaperlessTask.objects.all().count(), 1) | ||||||
|  |  | ||||||
|  |     def test_before_task_publish_handler_consume(self): | ||||||
|  |         """ | ||||||
|  |         GIVEN: | ||||||
|  |             - A celery task completed with an exception | ||||||
|  |         WHEN: | ||||||
|  |             - API call is made to get tasks | ||||||
|  |         THEN: | ||||||
|  |             - The returned result is the exception info | ||||||
|  |         """ | ||||||
|  |         self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME) | ||||||
|  |  | ||||||
|  |         task = PaperlessTask.objects.get() | ||||||
|  |         self.assertIsNotNone(task) | ||||||
|  |         self.assertEqual(self.HEADERS_CONSUME["id"], task.task_id) | ||||||
|  |         self.assertListEqual(["/consume/hello-999.pdf"], task.task_args) | ||||||
|  |         self.assertDictEqual({"override_tag_ids": None}, task.task_kwargs) | ||||||
|  |         self.assertEqual("hello-999.pdf", task.task_file_name) | ||||||
|  |         self.assertEqual("documents.tasks.consume_file", task.task_name) | ||||||
|  |         self.assertEqual(celery.states.PENDING, task.status) | ||||||
|  |  | ||||||
|  |     def test_before_task_publish_handler_webui(self): | ||||||
|  |  | ||||||
|  |         self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_WEB_UI) | ||||||
|  |  | ||||||
|  |         task = PaperlessTask.objects.get() | ||||||
|  |  | ||||||
|  |         self.assertIsNotNone(task) | ||||||
|  |  | ||||||
|  |         self.assertEqual(self.HEADERS_WEB_UI["id"], task.task_id) | ||||||
|  |         self.assertListEqual( | ||||||
|  |             ["/tmp/paperless/paperless-upload-st9lmbvx"], | ||||||
|  |             task.task_args, | ||||||
|  |         ) | ||||||
|  |         self.assertDictEqual( | ||||||
|  |             { | ||||||
|  |                 "override_filename": "statement.pdf", | ||||||
|  |                 "override_title": None, | ||||||
|  |                 "override_correspondent_id": None, | ||||||
|  |                 "override_document_type_id": None, | ||||||
|  |                 "override_tag_ids": None, | ||||||
|  |                 "task_id": "f5622ca9-3707-4ed0-b418-9680b912572f", | ||||||
|  |                 "override_created": None, | ||||||
|  |             }, | ||||||
|  |             task.task_kwargs, | ||||||
|  |         ) | ||||||
|  |         self.assertEqual("statement.pdf", task.task_file_name) | ||||||
|  |         self.assertEqual("documents.tasks.consume_file", task.task_name) | ||||||
|  |         self.assertEqual(celery.states.PENDING, task.status) | ||||||
|  |  | ||||||
|  |     def test_task_prerun_handler(self): | ||||||
|  |         self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME) | ||||||
|  |  | ||||||
|  |         task_prerun_handler(task_id=self.HEADERS_CONSUME["id"]) | ||||||
|  |  | ||||||
|  |         task = PaperlessTask.objects.get() | ||||||
|  |  | ||||||
|  |         self.assertEqual(celery.states.STARTED, task.status) | ||||||
|  |  | ||||||
|  |     def test_task_postrun_handler(self): | ||||||
|  |         self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME) | ||||||
|  |  | ||||||
|  |         task_postrun_handler( | ||||||
|  |             task_id=self.HEADERS_CONSUME["id"], | ||||||
|  |             retval="Success. New document id 1 created", | ||||||
|  |             state=celery.states.SUCCESS, | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  |         task = PaperlessTask.objects.get() | ||||||
|  |  | ||||||
|  |         self.assertEqual(celery.states.SUCCESS, task.status) | ||||||
| @@ -886,9 +886,8 @@ class TasksViewSet(ReadOnlyModelViewSet): | |||||||
|     queryset = ( |     queryset = ( | ||||||
|         PaperlessTask.objects.filter( |         PaperlessTask.objects.filter( | ||||||
|             acknowledged=False, |             acknowledged=False, | ||||||
|             attempted_task__isnull=False, |  | ||||||
|         ) |         ) | ||||||
|         .order_by("attempted_task__date_created") |         .order_by("date_created") | ||||||
|         .reverse() |         .reverse() | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Trenton H
					Trenton H