From c4b53d3006e7ed1cecaef7cf5651e55db7eb4f65 Mon Sep 17 00:00:00 2001 From: shamoon <4887959+shamoon@users.noreply.github.com> Date: Sun, 1 Dec 2024 17:33:15 -0800 Subject: [PATCH] Try use celery task with retry for webhook --- src/documents/signals/handlers.py | 40 ++++++++++++++++++++++++--- src/documents/tests/test_workflows.py | 11 ++++---- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index 80b6fff72..bec0e4d89 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -3,7 +3,9 @@ import os import shutil import httpx +from celery import shared_task from celery import states +from celery.exceptions import MaxRetriesExceededError from celery.signals import before_task_publish from celery.signals import task_failure from celery.signals import task_postrun @@ -559,6 +561,32 @@ def run_workflows_updated(sender, document: Document, logging_group=None, **kwar ) +@shared_task( + retry_backoff=True, +) +def send_webhook(url, data, headers, files): + try: + httpx.post( + url, + data=data, + files=files, + headers=headers, + ).raise_for_status() + logger.info( + f"Webhook sent to {url}", + ) + except httpx.HTTPStatusError as e: + logger.error( + f"Failed sending webhook to {url}: {e}", + ) + try: + send_webhook.retry(exc=e) + except MaxRetriesExceededError: + logger.error( + f"Max retries exceeded for webhook to {url}", + ) + + def run_workflows( trigger_type: WorkflowTrigger.WorkflowTriggerType, document: Document | ConsumableDocument, @@ -997,12 +1025,16 @@ def run_workflows( files = { "file": (document.original_filename, f, document.mime_type), } - httpx.post( - action.webhook.url, + send_webhook.delay( + url=action.webhook.url, data=data, - files=files, headers=headers, - ).raise_for_status() + files=files, + ) + logger.debug( + f"Webhook to {action.webhook.url} queued", + extra={"group": logging_group}, + ) except Exception as e: logger.exception( f"Error occurred sending webhook: {e}", diff --git a/src/documents/tests/test_workflows.py b/src/documents/tests/test_workflows.py index d0fb8e54c..511b30a53 100644 --- a/src/documents/tests/test_workflows.py +++ b/src/documents/tests/test_workflows.py @@ -2293,7 +2293,7 @@ class TestWorkflows( EMAIL_ENABLED=True, PAPERLESS_URL="http://localhost:8000", ) - @mock.patch("httpx.post") + @mock.patch("documents.signals.handlers.send_webhook.delay") def test_workflow_webhook_action_body(self, mock_post): """ GIVEN: @@ -2342,9 +2342,10 @@ class TestWorkflows( run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc) mock_post.assert_called_once_with( - "http://paperless-ngx.com", + url="http://paperless-ngx.com", data=f"Test message: http://localhost:8000/documents/{doc.id}/", headers={}, + files=None, ) @override_settings( @@ -2352,7 +2353,7 @@ class TestWorkflows( EMAIL_ENABLED=True, PAPERLESS_URL="http://localhost:8000", ) - @mock.patch("httpx.post") + @mock.patch("documents.signals.handlers.send_webhook.delay") def test_workflow_webhook_action_w_files(self, mock_post): """ GIVEN: @@ -2404,10 +2405,10 @@ class TestWorkflows( run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc) mock_post.assert_called_once_with( - "http://paperless-ngx.com", + url="http://paperless-ngx.com", data=f"Test message: http://localhost:8000/documents/{doc.id}/", - files={"file": ("simple.pdf", mock.ANY, "application/pdf")}, headers={}, + files={"file": ("simple.pdf", mock.ANY, "application/pdf")}, ) @override_settings(