Chore: refactor workflows code (#11563)

shamoon
2025-12-11 12:13:10 -08:00
committed by GitHub
parent c845cf0a19
commit 66d363bdc5
8 changed files with 890 additions and 707 deletions


@@ -1,14 +1,10 @@
from __future__ import annotations
import ipaddress
import logging
import shutil
import socket
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse
import httpx
from celery import shared_task
from celery import states
from celery.signals import before_task_publish
@@ -27,20 +23,15 @@ from django.db.models import Q
from django.dispatch import receiver
from django.utils import timezone
from filelock import FileLock
from guardian.shortcuts import remove_perm
from documents import matching
from documents.caching import clear_document_caches
from documents.file_handling import create_source_path_directory
from documents.file_handling import delete_empty_directories
from documents.file_handling import generate_unique_filename
from documents.mail import EmailAttachment
from documents.mail import send_email
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import PaperlessTask
from documents.models import SavedView
@@ -51,8 +42,14 @@ from documents.models import WorkflowAction
from documents.models import WorkflowRun
from documents.models import WorkflowTrigger
from documents.permissions import get_objects_for_user_owner_aware
from documents.permissions import set_permissions_for_object
from documents.templating.workflows import parse_w_workflow_placeholders
from documents.workflows.actions import build_workflow_action_context
from documents.workflows.actions import execute_email_action
from documents.workflows.actions import execute_webhook_action
from documents.workflows.mutations import apply_assignment_to_document
from documents.workflows.mutations import apply_assignment_to_overrides
from documents.workflows.mutations import apply_removal_to_document
from documents.workflows.mutations import apply_removal_to_overrides
from documents.workflows.utils import get_workflows_for_trigger
if TYPE_CHECKING:
from documents.classifier import DocumentClassifier
@@ -673,92 +670,6 @@ def run_workflows_updated(sender, document: Document, logging_group=None, **kwar
)
def _is_public_ip(ip: str) -> bool:
try:
obj = ipaddress.ip_address(ip)
return not (
obj.is_private
or obj.is_loopback
or obj.is_link_local
or obj.is_multicast
or obj.is_unspecified
)
except ValueError: # pragma: no cover
return False
def _resolve_first_ip(host: str) -> str | None:
try:
info = socket.getaddrinfo(host, None)
return info[0][4][0] if info else None
except Exception: # pragma: no cover
return None
@shared_task(
retry_backoff=True,
autoretry_for=(httpx.HTTPStatusError,),
max_retries=3,
throws=(httpx.HTTPError,),
)
def send_webhook(
url: str,
data: str | dict,
headers: dict,
files: dict,
*,
as_json: bool = False,
):
p = urlparse(url)
if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname:
logger.warning("Webhook blocked: invalid scheme/hostname")
raise ValueError("Invalid URL scheme or hostname.")
port = p.port or (443 if p.scheme == "https" else 80)
if (
len(settings.WEBHOOKS_ALLOWED_PORTS) > 0
and port not in settings.WEBHOOKS_ALLOWED_PORTS
):
logger.warning("Webhook blocked: port not permitted")
raise ValueError("Destination port not permitted.")
ip = _resolve_first_ip(p.hostname)
if not ip or (
not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS
):
logger.warning("Webhook blocked: destination not allowed")
raise ValueError("Destination host is not allowed.")
try:
post_args = {
"url": url,
"headers": {
k: v for k, v in (headers or {}).items() if k.lower() != "host"
},
"files": files or None,
"timeout": 5.0,
"follow_redirects": False,
}
if as_json:
post_args["json"] = data
elif isinstance(data, dict):
post_args["data"] = data
else:
post_args["content"] = data
httpx.post(
**post_args,
).raise_for_status()
logger.info(
f"Webhook sent to {url}",
)
except Exception as e:
logger.error(
f"Failed attempt sending webhook to {url}: {e}",
)
raise e
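For illustration, a minimal sketch of how the guard above treats a few destinations when the task body runs; the setting values and URLs here are hypothetical, not taken from this diff:
# Hypothetical settings: WEBHOOKS_ALLOWED_SCHEMES = ["http", "https"],
# WEBHOOKS_ALLOWED_PORTS = [] (any port), WEBHOOKS_ALLOW_INTERNAL_REQUESTS = False.
# Calling the task function synchronously (bypassing Celery) behaves like this:
send_webhook("https://example.com/hook", data="{}", headers={}, files={})
# -> allowed scheme, public host: the payload is POSTed with a 5 s timeout
send_webhook("ftp://example.com/hook", data="{}", headers={}, files={})
# -> raises ValueError("Invalid URL scheme or hostname.")
send_webhook("http://127.0.0.1/hook", data="{}", headers={}, files={})
# -> raises ValueError("Destination host is not allowed."), 127.0.0.1 is loopback, not public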
def run_workflows(
trigger_type: WorkflowTrigger.WorkflowTriggerType,
document: Document | ConsumableDocument,
@@ -767,572 +678,16 @@ def run_workflows(
overrides: DocumentMetadataOverrides | None = None,
original_file: Path | None = None,
) -> tuple[DocumentMetadataOverrides, str] | None:
"""Run workflows which match a Document (or ConsumableDocument) for a specific trigger type or a single workflow if given.
Assignment or removal actions are either applied directly to the document or an overrides object. If an overrides
object is provided, the function returns the object with the applied changes or None if no actions were applied and a string
of messages for each action. If no overrides object is provided, the changes are applied directly to the document and the
function returns None.
"""
"""
Execute workflows matching a document for the given trigger. When `overrides` is provided
(consumption flow), actions mutate that object and the function returns `(overrides, messages)`.
Otherwise actions mutate the document directly and the function returns None.
Attachments for email/webhook actions use `original_file` when given, otherwise fall back to
`document.source_path` (Document) or `document.original_file` (ConsumableDocument).
Passing `workflow_to_run` skips the workflow query (currently only used by scheduled runs).
"""
def assignment_action():
if action.assign_tags.exists():
tag_ids_to_add: set[int] = set()
for tag in action.assign_tags.all():
tag_ids_to_add.add(tag.pk)
tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks())
if not use_overrides:
doc_tag_ids[:] = list(set(doc_tag_ids) | tag_ids_to_add)
else:
if overrides.tag_ids is None:
overrides.tag_ids = []
overrides.tag_ids = list(set(overrides.tag_ids) | tag_ids_to_add)
if action.assign_correspondent:
if not use_overrides:
document.correspondent = action.assign_correspondent
else:
overrides.correspondent_id = action.assign_correspondent.pk
if action.assign_document_type:
if not use_overrides:
document.document_type = action.assign_document_type
else:
overrides.document_type_id = action.assign_document_type.pk
if action.assign_storage_path:
if not use_overrides:
document.storage_path = action.assign_storage_path
else:
overrides.storage_path_id = action.assign_storage_path.pk
if action.assign_owner:
if not use_overrides:
document.owner = action.assign_owner
else:
overrides.owner_id = action.assign_owner.pk
if action.assign_title:
if not use_overrides:
try:
document.title = parse_w_workflow_placeholders(
action.assign_title,
document.correspondent.name if document.correspondent else "",
document.document_type.name if document.document_type else "",
document.owner.username if document.owner else "",
timezone.localtime(document.added),
document.original_filename or "",
document.filename or "",
document.created,
)
except Exception:
logger.exception(
f"Error occurred parsing title assignment '{action.assign_title}', falling back to original",
extra={"group": logging_group},
)
else:
overrides.title = action.assign_title
if any(
[
action.assign_view_users.exists(),
action.assign_view_groups.exists(),
action.assign_change_users.exists(),
action.assign_change_groups.exists(),
],
):
permissions = {
"view": {
"users": action.assign_view_users.values_list("id", flat=True),
"groups": action.assign_view_groups.values_list("id", flat=True),
},
"change": {
"users": action.assign_change_users.values_list("id", flat=True),
"groups": action.assign_change_groups.values_list("id", flat=True),
},
}
if not use_overrides:
set_permissions_for_object(
permissions=permissions,
object=document,
merge=True,
)
else:
overrides.view_users = list(
set(
(overrides.view_users or [])
+ list(permissions["view"]["users"]),
),
)
overrides.view_groups = list(
set(
(overrides.view_groups or [])
+ list(permissions["view"]["groups"]),
),
)
overrides.change_users = list(
set(
(overrides.change_users or [])
+ list(permissions["change"]["users"]),
),
)
overrides.change_groups = list(
set(
(overrides.change_groups or [])
+ list(permissions["change"]["groups"]),
),
)
if action.assign_custom_fields.exists():
if not use_overrides:
for field in action.assign_custom_fields.all():
value_field_name = CustomFieldInstance.get_value_field_name(
data_type=field.data_type,
)
args = {
value_field_name: action.assign_custom_fields_values.get(
str(field.pk),
None,
),
}
# for some reason update_or_create doesn't work here
instance = CustomFieldInstance.objects.filter(
field=field,
document=document,
).first()
if instance and args[value_field_name] is not None:
setattr(instance, value_field_name, args[value_field_name])
instance.save()
elif not instance:
CustomFieldInstance.objects.create(
**args,
field=field,
document=document,
)
else:
if overrides.custom_fields is None:
overrides.custom_fields = {}
overrides.custom_fields.update(
{
field.pk: action.assign_custom_fields_values.get(
str(field.pk),
None,
)
for field in action.assign_custom_fields.all()
},
)
def removal_action():
if action.remove_all_tags:
if not use_overrides:
doc_tag_ids.clear()
else:
overrides.tag_ids = None
else:
tag_ids_to_remove: set[int] = set()
for tag in action.remove_tags.all():
tag_ids_to_remove.add(tag.pk)
tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks())
if not use_overrides:
doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove]
elif overrides.tag_ids:
overrides.tag_ids = [
t for t in overrides.tag_ids if t not in tag_ids_to_remove
]
if not use_overrides and (
action.remove_all_correspondents
or (
document.correspondent
and action.remove_correspondents.filter(
pk=document.correspondent.pk,
).exists()
)
):
document.correspondent = None
elif use_overrides and (
action.remove_all_correspondents
or (
overrides.correspondent_id
and action.remove_correspondents.filter(
pk=overrides.correspondent_id,
).exists()
)
):
overrides.correspondent_id = None
if not use_overrides and (
action.remove_all_document_types
or (
document.document_type
and action.remove_document_types.filter(
pk=document.document_type.pk,
).exists()
)
):
document.document_type = None
elif use_overrides and (
action.remove_all_document_types
or (
overrides.document_type_id
and action.remove_document_types.filter(
pk=overrides.document_type_id,
).exists()
)
):
overrides.document_type_id = None
if not use_overrides and (
action.remove_all_storage_paths
or (
document.storage_path
and action.remove_storage_paths.filter(
pk=document.storage_path.pk,
).exists()
)
):
document.storage_path = None
elif use_overrides and (
action.remove_all_storage_paths
or (
overrides.storage_path_id
and action.remove_storage_paths.filter(
pk=overrides.storage_path_id,
).exists()
)
):
overrides.storage_path_id = None
if not use_overrides and (
action.remove_all_owners
or (
document.owner
and action.remove_owners.filter(pk=document.owner.pk).exists()
)
):
document.owner = None
elif use_overrides and (
action.remove_all_owners
or (
overrides.owner_id
and action.remove_owners.filter(pk=overrides.owner_id).exists()
)
):
overrides.owner_id = None
if action.remove_all_permissions:
if not use_overrides:
permissions = {
"view": {"users": [], "groups": []},
"change": {"users": [], "groups": []},
}
set_permissions_for_object(
permissions=permissions,
object=document,
merge=False,
)
else:
overrides.view_users = None
overrides.view_groups = None
overrides.change_users = None
overrides.change_groups = None
elif any(
[
action.remove_view_users.exists(),
action.remove_view_groups.exists(),
action.remove_change_users.exists(),
action.remove_change_groups.exists(),
],
):
if not use_overrides:
for user in action.remove_view_users.all():
remove_perm("view_document", user, document)
for user in action.remove_change_users.all():
remove_perm("change_document", user, document)
for group in action.remove_view_groups.all():
remove_perm("view_document", group, document)
for group in action.remove_change_groups.all():
remove_perm("change_document", group, document)
else:
if overrides.view_users:
for user in action.remove_view_users.filter(
pk__in=overrides.view_users,
):
overrides.view_users.remove(user.pk)
if overrides.change_users:
for user in action.remove_change_users.filter(
pk__in=overrides.change_users,
):
overrides.change_users.remove(user.pk)
if overrides.view_groups:
for group in action.remove_view_groups.filter(
pk__in=overrides.view_groups,
):
overrides.view_groups.remove(group.pk)
if overrides.change_groups:
for group in action.remove_change_groups.filter(
pk__in=overrides.change_groups,
):
overrides.change_groups.remove(group.pk)
if action.remove_all_custom_fields:
if not use_overrides:
CustomFieldInstance.objects.filter(document=document).hard_delete()
else:
overrides.custom_fields = None
elif action.remove_custom_fields.exists():
if not use_overrides:
CustomFieldInstance.objects.filter(
field__in=action.remove_custom_fields.all(),
document=document,
).hard_delete()
elif overrides.custom_fields:
for field in action.remove_custom_fields.filter(
pk__in=overrides.custom_fields.keys(),
):
overrides.custom_fields.pop(field.pk, None)
def email_action():
if not settings.EMAIL_ENABLED:
logger.error(
"Email backend has not been configured, cannot send email notifications",
extra={"group": logging_group},
)
return
if not use_overrides:
title = document.title
doc_url = (
f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/"
)
correspondent = (
document.correspondent.name if document.correspondent else ""
)
document_type = (
document.document_type.name if document.document_type else ""
)
owner_username = document.owner.username if document.owner else ""
filename = document.original_filename or ""
current_filename = document.filename or ""
added = timezone.localtime(document.added)
created = document.created
else:
title = overrides.title if overrides.title else str(document.original_file)
doc_url = ""
correspondent = (
Correspondent.objects.filter(pk=overrides.correspondent_id).first()
if overrides.correspondent_id
else ""
)
document_type = (
DocumentType.objects.filter(pk=overrides.document_type_id).first().name
if overrides.document_type_id
else ""
)
owner_username = (
User.objects.filter(pk=overrides.owner_id).first().username
if overrides.owner_id
else ""
)
filename = document.original_file if document.original_file else ""
current_filename = filename
added = timezone.localtime(timezone.now())
created = overrides.created
subject = (
parse_w_workflow_placeholders(
action.email.subject,
correspondent,
document_type,
owner_username,
added,
filename,
current_filename,
created,
title,
doc_url,
)
if action.email.subject
else ""
)
body = (
parse_w_workflow_placeholders(
action.email.body,
correspondent,
document_type,
owner_username,
added,
filename,
current_filename,
created,
title,
doc_url,
)
if action.email.body
else ""
)
try:
attachments: list[EmailAttachment] = []
if action.email.include_document:
attachment: EmailAttachment | None = None
if trigger_type in [
WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
] and isinstance(document, Document):
friendly_name = (
Path(current_filename).name
if current_filename
else document.source_path.name
)
attachment = EmailAttachment(
path=document.source_path,
mime_type=document.mime_type,
friendly_name=friendly_name,
)
elif original_file:
friendly_name = (
Path(current_filename).name
if current_filename
else original_file.name
)
attachment = EmailAttachment(
path=original_file,
mime_type=document.mime_type,
friendly_name=friendly_name,
)
if attachment:
attachments = [attachment]
n_messages = send_email(
subject=subject,
body=body,
to=action.email.to.split(","),
attachments=attachments,
)
logger.debug(
f"Sent {n_messages} notification email(s) to {action.email.to}",
extra={"group": logging_group},
)
except Exception as e:
logger.exception(
f"Error occurred sending notification email: {e}",
extra={"group": logging_group},
)
def webhook_action():
if not use_overrides:
title = document.title
doc_url = (
f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/"
)
correspondent = (
document.correspondent.name if document.correspondent else ""
)
document_type = (
document.document_type.name if document.document_type else ""
)
owner_username = document.owner.username if document.owner else ""
filename = document.original_filename or ""
current_filename = document.filename or ""
added = timezone.localtime(document.added)
created = document.created
else:
title = overrides.title if overrides.title else str(document.original_file)
doc_url = ""
correspondent = (
Correspondent.objects.filter(pk=overrides.correspondent_id).first()
if overrides.correspondent_id
else ""
)
document_type = (
DocumentType.objects.filter(pk=overrides.document_type_id).first().name
if overrides.document_type_id
else ""
)
owner_username = (
User.objects.filter(pk=overrides.owner_id).first().username
if overrides.owner_id
else ""
)
filename = document.original_file if document.original_file else ""
current_filename = filename
added = timezone.localtime(timezone.now())
created = overrides.created
try:
data = {}
if action.webhook.use_params:
if action.webhook.params:
try:
for key, value in action.webhook.params.items():
data[key] = parse_w_workflow_placeholders(
value,
correspondent,
document_type,
owner_username,
added,
filename,
current_filename,
created,
title,
doc_url,
)
except Exception as e:
logger.error(
f"Error occurred parsing webhook params: {e}",
extra={"group": logging_group},
)
elif action.webhook.body:
data = parse_w_workflow_placeholders(
action.webhook.body,
correspondent,
document_type,
owner_username,
added,
filename,
current_filename,
created,
title,
doc_url,
)
headers = {}
if action.webhook.headers:
try:
headers = {
str(k): str(v) for k, v in action.webhook.headers.items()
}
except Exception as e:
logger.error(
f"Error occurred parsing webhook headers: {e}",
extra={"group": logging_group},
)
files = None
if action.webhook.include_document:
with original_file.open("rb") as f:
files = {
"file": (
filename,
f.read(),
document.mime_type,
),
}
send_webhook.delay(
url=action.webhook.url,
data=data,
headers=headers,
files=files,
as_json=action.webhook.as_json,
)
logger.debug(
f"Webhook to {action.webhook.url} queued",
extra={"group": logging_group},
)
except Exception as e:
logger.exception(
f"Error occurred sending webhook: {e}",
extra={"group": logging_group},
)
use_overrides = overrides is not None
if original_file is None:
@@ -1341,30 +696,7 @@ def run_workflows(
)
messages = []
workflows = (
(
Workflow.objects.filter(enabled=True, triggers__type=trigger_type)
.prefetch_related(
"actions",
"actions__assign_view_users",
"actions__assign_view_groups",
"actions__assign_change_users",
"actions__assign_change_groups",
"actions__assign_custom_fields",
"actions__remove_tags",
"actions__remove_correspondents",
"actions__remove_document_types",
"actions__remove_storage_paths",
"actions__remove_custom_fields",
"actions__remove_owners",
"triggers",
)
.order_by("order")
.distinct()
)
if workflow_to_run is None
else [workflow_to_run]
)
workflows = get_workflows_for_trigger(trigger_type, workflow_to_run)
for workflow in workflows:
if not use_overrides:
@@ -1384,13 +716,39 @@ def run_workflows(
messages.append(message)
if action.type == WorkflowAction.WorkflowActionType.ASSIGNMENT:
assignment_action()
if use_overrides and overrides:
apply_assignment_to_overrides(action, overrides)
else:
apply_assignment_to_document(
action,
document,
doc_tag_ids,
logging_group,
)
elif action.type == WorkflowAction.WorkflowActionType.REMOVAL:
removal_action()
if use_overrides and overrides:
apply_removal_to_overrides(action, overrides)
else:
apply_removal_to_document(action, document, doc_tag_ids)
elif action.type == WorkflowAction.WorkflowActionType.EMAIL:
email_action()
context = build_workflow_action_context(document, overrides)
execute_email_action(
action,
document,
context,
logging_group,
original_file,
trigger_type,
)
elif action.type == WorkflowAction.WorkflowActionType.WEBHOOK:
webhook_action()
context = build_workflow_action_context(document, overrides)
execute_webhook_action(
action,
document,
context,
logging_group,
original_file,
)
if not use_overrides:
# limit title to 128 characters