Switches task serialization over to pickle format

This commit is contained in:
Trenton H 2022-11-22 09:59:59 -08:00
parent b0625cdced
commit 97d6503fef
6 changed files with 109 additions and 69 deletions

View File

@ -0,0 +1,21 @@
# Generated by Django 4.1.3 on 2022-11-22 17:50
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("documents", "1027_remove_paperlesstask_attempted_task_and_more"),
]
operations = [
migrations.RemoveField(
model_name="paperlesstask",
name="task_args",
),
migrations.RemoveField(
model_name="paperlesstask",
name="task_kwargs",
),
]

View File

@ -560,20 +560,6 @@ class PaperlessTask(models.Model):
help_text=_("Name of the Task which was run"),
)
task_args = models.JSONField(
null=True,
verbose_name=_("Task Positional Arguments"),
help_text=_(
"JSON representation of the positional arguments used with the task",
),
)
task_kwargs = models.JSONField(
null=True,
verbose_name=_("Task Named Arguments"),
help_text=_(
"JSON representation of the named arguments used with the task",
),
)
status = models.CharField(
max_length=30,
default=states.PENDING,

View File

@ -1,7 +1,6 @@
import logging
import os
import shutil
from ast import literal_eval
from pathlib import Path
from celery import states
@ -521,25 +520,24 @@ def add_to_index(sender, document, **kwargs):
def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
"""
Creates the PaperlessTask object in a pending state. This is sent before
the task reaches the broker, but
the task reaches the broker, but before it begins executing on a worker.
https://docs.celeryq.dev/en/stable/userguide/signals.html#before-task-publish
https://docs.celeryq.dev/en/stable/internals/protocol.html#version-2
"""
if "task" not in headers or headers["task"] != "documents.tasks.consume_file":
# Assumption: this is only ever a v2 message
return
try:
task_file_name = ""
if headers["kwargsrepr"] is not None:
task_kwargs = literal_eval(headers["kwargsrepr"])
if "override_filename" in task_kwargs:
task_file_name = task_kwargs["override_filename"]
else:
task_kwargs = None
task_args = body[0]
task_kwargs = body[1]
task_args = literal_eval(headers["argsrepr"])
task_file_name = ""
if "override_filename" in task_kwargs:
task_file_name = task_kwargs["override_filename"]
# Nothing was found, report the task first argument
if not len(task_file_name):
@ -552,8 +550,6 @@ def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
status=states.PENDING,
task_file_name=task_file_name,
task_name=headers["task"],
task_args=task_args,
task_kwargs=task_kwargs,
result=None,
date_created=timezone.now(),
date_started=None,
@ -562,7 +558,7 @@ def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Creating PaperlessTask failed: {e}")
logger.error(f"Creating PaperlessTask failed: {e}", exc_info=True)
@task_prerun.connect
@ -584,7 +580,7 @@ def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Setting PaperlessTask started failed: {e}")
logger.error(f"Setting PaperlessTask started failed: {e}", exc_info=True)
@task_postrun.connect
@ -607,4 +603,4 @@ def task_postrun_handler(
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Updating PaperlessTask failed: {e}")
logger.error(f"Updating PaperlessTask failed: {e}", exc_info=True)

View File

@ -3043,16 +3043,6 @@ class TestTasks(APITestCase):
task_file_name="test.pdf",
task_name="documents.tasks.some_task",
status=celery.states.SUCCESS,
task_args=("/tmp/paperless/paperless-upload-5iq7skzc",),
task_kwargs={
"override_filename": "test.pdf",
"override_title": None,
"override_correspondent_id": None,
"override_document_type_id": None,
"override_tag_ids": None,
"task_id": "466e8fe7-7193-4698-9fff-72f0340e2082",
"override_created": None,
},
)
response = self.client.get(self.ENDPOINT)
@ -3079,8 +3069,6 @@ class TestTasks(APITestCase):
task_file_name="anothertest.pdf",
task_name="documents.tasks.some_task",
status=celery.states.SUCCESS,
task_args=("/consume/anothertest.pdf",),
task_kwargs={"override_tag_ids": None},
)
response = self.client.get(self.ENDPOINT)

View File

@ -28,6 +28,14 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
"ignore_result": False,
}
BODY_CONSUME = (
# args
("/consume/hello-999.pdf",),
# kwargs
{"override_tag_ids": None},
{"callbacks": None, "errbacks": None, "chain": None, "chord": None},
)
HEADERS_WEB_UI = {
"lang": "py",
"task": "documents.tasks.consume_file",
@ -47,64 +55,90 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
"ignore_result": False,
}
def util_call_before_task_publish_handler(self, headers_to_use):
BODY_WEB_UI = (
# args
("/tmp/paperless/paperless-upload-st9lmbvx",),
# kwargs
{
"override_filename": "statement.pdf",
"override_title": None,
"override_correspondent_id": None,
"override_document_type_id": None,
"override_tag_ids": None,
"task_id": "f5622ca9-3707-4ed0-b418-9680b912572f",
"override_created": None,
},
{"callbacks": None, "errbacks": None, "chain": None, "chord": None},
)
def util_call_before_task_publish_handler(self, headers_to_use, body_to_use):
"""
Simple utility to call the pre-run handle and ensure it created a single task
instance
"""
self.assertEqual(PaperlessTask.objects.all().count(), 0)
before_task_publish_handler(headers=headers_to_use)
before_task_publish_handler(headers=headers_to_use, body=body_to_use)
self.assertEqual(PaperlessTask.objects.all().count(), 1)
def test_before_task_publish_handler_consume(self):
"""
GIVEN:
- A celery task completed with an exception
- A celery task is started via the consume folder
WHEN:
- API call is made to get tasks
- Task before publish handler is called
THEN:
- The returned result is the exception info
- The task is created and marked as pending
"""
self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
self.util_call_before_task_publish_handler(
headers_to_use=self.HEADERS_CONSUME,
body_to_use=self.BODY_CONSUME,
)
task = PaperlessTask.objects.get()
self.assertIsNotNone(task)
self.assertEqual(self.HEADERS_CONSUME["id"], task.task_id)
self.assertListEqual(["/consume/hello-999.pdf"], task.task_args)
self.assertDictEqual({"override_tag_ids": None}, task.task_kwargs)
self.assertEqual("hello-999.pdf", task.task_file_name)
self.assertEqual("documents.tasks.consume_file", task.task_name)
self.assertEqual(celery.states.PENDING, task.status)
def test_before_task_publish_handler_webui(self):
self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_WEB_UI)
"""
GIVEN:
- A celery task is started via the web ui
WHEN:
- Task before publish handler is called
THEN:
- The task is created and marked as pending
"""
self.util_call_before_task_publish_handler(
headers_to_use=self.HEADERS_WEB_UI,
body_to_use=self.BODY_WEB_UI,
)
task = PaperlessTask.objects.get()
self.assertIsNotNone(task)
self.assertEqual(self.HEADERS_WEB_UI["id"], task.task_id)
self.assertListEqual(
["/tmp/paperless/paperless-upload-st9lmbvx"],
task.task_args,
)
self.assertDictEqual(
{
"override_filename": "statement.pdf",
"override_title": None,
"override_correspondent_id": None,
"override_document_type_id": None,
"override_tag_ids": None,
"task_id": "f5622ca9-3707-4ed0-b418-9680b912572f",
"override_created": None,
},
task.task_kwargs,
)
self.assertEqual("statement.pdf", task.task_file_name)
self.assertEqual("documents.tasks.consume_file", task.task_name)
self.assertEqual(celery.states.PENDING, task.status)
def test_task_prerun_handler(self):
self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
"""
GIVEN:
- A celery task is started via the consume folder
WHEN:
- Task starts execution
THEN:
- The task is marked as started
"""
self.util_call_before_task_publish_handler(
headers_to_use=self.HEADERS_CONSUME,
body_to_use=self.BODY_CONSUME,
)
task_prerun_handler(task_id=self.HEADERS_CONSUME["id"])
@ -113,7 +147,18 @@ class TestTaskSignalHandler(DirectoriesMixin, TestCase):
self.assertEqual(celery.states.STARTED, task.status)
def test_task_postrun_handler(self):
self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME)
"""
GIVEN:
- A celery task is started via the consume folder
WHEN:
- Task finished execution
THEN:
- The task is marked as started
"""
self.util_call_before_task_publish_handler(
headers_to_use=self.HEADERS_CONSUME,
body_to_use=self.BODY_CONSUME,
)
task_postrun_handler(
task_id=self.HEADERS_CONSUME["id"],

View File

@ -526,6 +526,10 @@ CELERY_RESULT_EXTENDED = True
CELERY_RESULT_BACKEND = "django-db"
CELERY_CACHE_BACKEND = "default"
# This allows types to stay types through a .delay
CELERY_TASK_SERIALIZER = "pickle"
CELERY_ACCEPT_CONTENT = ["application/x-python-serialize"]
CELERY_BEAT_SCHEDULE = {
# Every ten minutes
"Check all e-mail accounts": {