Switches task serialization over to pickle format

This commit is contained in:
Trenton H
2022-11-22 09:59:59 -08:00
parent 478b85dd1b
commit 5030a5212e
6 changed files with 109 additions and 69 deletions

View File

@@ -1,7 +1,6 @@
import logging
import os
import shutil
from ast import literal_eval
from pathlib import Path
from celery import states
@@ -521,25 +520,24 @@ def add_to_index(sender, document, **kwargs):
def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
"""
Creates the PaperlessTask object in a pending state. This is sent before
the task reaches the broker, but
the task reaches the broker, but before it begins executing on a worker.
https://docs.celeryq.dev/en/stable/userguide/signals.html#before-task-publish
https://docs.celeryq.dev/en/stable/internals/protocol.html#version-2
"""
if "task" not in headers or headers["task"] != "documents.tasks.consume_file":
# Assumption: this is only ever a v2 message
return
try:
task_file_name = ""
if headers["kwargsrepr"] is not None:
task_kwargs = literal_eval(headers["kwargsrepr"])
if "override_filename" in task_kwargs:
task_file_name = task_kwargs["override_filename"]
else:
task_kwargs = None
task_args = body[0]
task_kwargs = body[1]
task_args = literal_eval(headers["argsrepr"])
task_file_name = ""
if "override_filename" in task_kwargs:
task_file_name = task_kwargs["override_filename"]
# Nothing was found, report the task first argument
if not len(task_file_name):
@@ -552,8 +550,6 @@ def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
status=states.PENDING,
task_file_name=task_file_name,
task_name=headers["task"],
task_args=task_args,
task_kwargs=task_kwargs,
result=None,
date_created=timezone.now(),
date_started=None,
@@ -562,7 +558,7 @@ def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs):
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Creating PaperlessTask failed: {e}")
logger.error(f"Creating PaperlessTask failed: {e}", exc_info=True)
@task_prerun.connect
@@ -584,7 +580,7 @@ def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Setting PaperlessTask started failed: {e}")
logger.error(f"Setting PaperlessTask started failed: {e}", exc_info=True)
@task_postrun.connect
@@ -607,4 +603,4 @@ def task_postrun_handler(
except Exception as e: # pragma: no cover
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Updating PaperlessTask failed: {e}")
logger.error(f"Updating PaperlessTask failed: {e}", exc_info=True)