mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-05-01 11:19:32 -05:00
Try use celery task with retry for webhook
This commit is contained in:
parent
3aac710a26
commit
c4b53d3006
@ -3,7 +3,9 @@ import os
|
|||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
from celery import shared_task
|
||||||
from celery import states
|
from celery import states
|
||||||
|
from celery.exceptions import MaxRetriesExceededError
|
||||||
from celery.signals import before_task_publish
|
from celery.signals import before_task_publish
|
||||||
from celery.signals import task_failure
|
from celery.signals import task_failure
|
||||||
from celery.signals import task_postrun
|
from celery.signals import task_postrun
|
||||||
@ -559,6 +561,32 @@ def run_workflows_updated(sender, document: Document, logging_group=None, **kwar
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task(
|
||||||
|
retry_backoff=True,
|
||||||
|
)
|
||||||
|
def send_webhook(url, data, headers, files):
|
||||||
|
try:
|
||||||
|
httpx.post(
|
||||||
|
url,
|
||||||
|
data=data,
|
||||||
|
files=files,
|
||||||
|
headers=headers,
|
||||||
|
).raise_for_status()
|
||||||
|
logger.info(
|
||||||
|
f"Webhook sent to {url}",
|
||||||
|
)
|
||||||
|
except httpx.HTTPStatusError as e:
|
||||||
|
logger.error(
|
||||||
|
f"Failed sending webhook to {url}: {e}",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
send_webhook.retry(exc=e)
|
||||||
|
except MaxRetriesExceededError:
|
||||||
|
logger.error(
|
||||||
|
f"Max retries exceeded for webhook to {url}",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def run_workflows(
|
def run_workflows(
|
||||||
trigger_type: WorkflowTrigger.WorkflowTriggerType,
|
trigger_type: WorkflowTrigger.WorkflowTriggerType,
|
||||||
document: Document | ConsumableDocument,
|
document: Document | ConsumableDocument,
|
||||||
@ -997,12 +1025,16 @@ def run_workflows(
|
|||||||
files = {
|
files = {
|
||||||
"file": (document.original_filename, f, document.mime_type),
|
"file": (document.original_filename, f, document.mime_type),
|
||||||
}
|
}
|
||||||
httpx.post(
|
send_webhook.delay(
|
||||||
action.webhook.url,
|
url=action.webhook.url,
|
||||||
data=data,
|
data=data,
|
||||||
files=files,
|
|
||||||
headers=headers,
|
headers=headers,
|
||||||
).raise_for_status()
|
files=files,
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
f"Webhook to {action.webhook.url} queued",
|
||||||
|
extra={"group": logging_group},
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
f"Error occurred sending webhook: {e}",
|
f"Error occurred sending webhook: {e}",
|
||||||
|
@ -2293,7 +2293,7 @@ class TestWorkflows(
|
|||||||
EMAIL_ENABLED=True,
|
EMAIL_ENABLED=True,
|
||||||
PAPERLESS_URL="http://localhost:8000",
|
PAPERLESS_URL="http://localhost:8000",
|
||||||
)
|
)
|
||||||
@mock.patch("httpx.post")
|
@mock.patch("documents.signals.handlers.send_webhook.delay")
|
||||||
def test_workflow_webhook_action_body(self, mock_post):
|
def test_workflow_webhook_action_body(self, mock_post):
|
||||||
"""
|
"""
|
||||||
GIVEN:
|
GIVEN:
|
||||||
@ -2342,9 +2342,10 @@ class TestWorkflows(
|
|||||||
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
|
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
|
||||||
|
|
||||||
mock_post.assert_called_once_with(
|
mock_post.assert_called_once_with(
|
||||||
"http://paperless-ngx.com",
|
url="http://paperless-ngx.com",
|
||||||
data=f"Test message: http://localhost:8000/documents/{doc.id}/",
|
data=f"Test message: http://localhost:8000/documents/{doc.id}/",
|
||||||
headers={},
|
headers={},
|
||||||
|
files=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
@override_settings(
|
@override_settings(
|
||||||
@ -2352,7 +2353,7 @@ class TestWorkflows(
|
|||||||
EMAIL_ENABLED=True,
|
EMAIL_ENABLED=True,
|
||||||
PAPERLESS_URL="http://localhost:8000",
|
PAPERLESS_URL="http://localhost:8000",
|
||||||
)
|
)
|
||||||
@mock.patch("httpx.post")
|
@mock.patch("documents.signals.handlers.send_webhook.delay")
|
||||||
def test_workflow_webhook_action_w_files(self, mock_post):
|
def test_workflow_webhook_action_w_files(self, mock_post):
|
||||||
"""
|
"""
|
||||||
GIVEN:
|
GIVEN:
|
||||||
@ -2404,10 +2405,10 @@ class TestWorkflows(
|
|||||||
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
|
run_workflows(WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, doc)
|
||||||
|
|
||||||
mock_post.assert_called_once_with(
|
mock_post.assert_called_once_with(
|
||||||
"http://paperless-ngx.com",
|
url="http://paperless-ngx.com",
|
||||||
data=f"Test message: http://localhost:8000/documents/{doc.id}/",
|
data=f"Test message: http://localhost:8000/documents/{doc.id}/",
|
||||||
files={"file": ("simple.pdf", mock.ANY, "application/pdf")},
|
|
||||||
headers={},
|
headers={},
|
||||||
|
files={"file": ("simple.pdf", mock.ANY, "application/pdf")},
|
||||||
)
|
)
|
||||||
|
|
||||||
@override_settings(
|
@override_settings(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user