Messing with scheduled trigger type

This commit is contained in:
shamoon
2024-10-23 10:17:19 -07:00
parent b3b0e95d2d
commit ae9bf6d286
13 changed files with 306 additions and 4 deletions

View File

@@ -409,6 +409,7 @@ def document_matches_workflow(
elif (
trigger_type == WorkflowTrigger.WorkflowTriggerType.DOCUMENT_ADDED
or trigger_type == WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED
or trigger_type == WorkflowTrigger.WorkflowTriggerType.SCHEDULED
):
trigger_matched, reason = existing_document_matches_workflow(
document,

View File

@@ -0,0 +1,66 @@
# Generated by Django 5.1.1 on 2024-10-23 20:54
import django.db.models.deletion
from django.db import migrations
from django.db import models
class Migration(migrations.Migration):
dependencies = [
("documents", "1055_alter_storagepath_path"),
]
operations = [
migrations.AddField(
model_name="workflowtrigger",
name="schedule_delay",
field=models.CharField(
blank=True,
help_text="The delay before the scheduled trigger is activated.",
max_length=256,
null=True,
verbose_name="schedule delay",
),
),
migrations.AddField(
model_name="workflowtrigger",
name="schedule_delay_custom_field",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="documents.customfield",
verbose_name="schedule delay custom field",
),
),
migrations.AddField(
model_name="workflowtrigger",
name="schedule_delay_field",
field=models.CharField(
choices=[
("added", "Added"),
("created", "Created"),
("modified", "Modified"),
("custom_field", "Custom Field"),
],
default="added",
help_text="The field to use for the delay.",
max_length=20,
verbose_name="schedule delay field",
),
),
migrations.AlterField(
model_name="workflowtrigger",
name="type",
field=models.PositiveIntegerField(
choices=[
(1, "Consumption Started"),
(2, "Document Added"),
(3, "Document Updated"),
(4, "Scheduled"),
],
default=1,
verbose_name="Workflow Trigger Type",
),
),
]

View File

@@ -1016,12 +1016,19 @@ class WorkflowTrigger(models.Model):
CONSUMPTION = 1, _("Consumption Started")
DOCUMENT_ADDED = 2, _("Document Added")
DOCUMENT_UPDATED = 3, _("Document Updated")
SCHEDULED = 4, _("Scheduled")
class DocumentSourceChoices(models.IntegerChoices):
CONSUME_FOLDER = DocumentSource.ConsumeFolder.value, _("Consume Folder")
API_UPLOAD = DocumentSource.ApiUpload.value, _("Api Upload")
MAIL_FETCH = DocumentSource.MailFetch.value, _("Mail Fetch")
class ScheduleDelayField(models.TextChoices):
ADDED = "added", _("Added")
CREATED = "created", _("Created")
MODIFIED = "modified", _("Modified")
CUSTOM_FIELD = "custom_field", _("Custom Field")
type = models.PositiveIntegerField(
_("Workflow Trigger Type"),
choices=WorkflowTriggerType.choices,
@@ -1098,6 +1105,34 @@ class WorkflowTrigger(models.Model):
verbose_name=_("has this correspondent"),
)
schedule_delay = models.CharField(
_("schedule delay"),
max_length=256,
null=True,
blank=True,
help_text=_(
"The delay before the scheduled trigger is activated.",
),
)
schedule_delay_field = models.CharField(
_("schedule delay field"),
max_length=20,
choices=ScheduleDelayField.choices,
default=ScheduleDelayField.ADDED,
help_text=_(
"The field to use for the delay.",
),
)
schedule_delay_custom_field = models.ForeignKey(
CustomField,
null=True,
blank=True,
on_delete=models.SET_NULL,
verbose_name=_("schedule delay custom field"),
)
class Meta:
verbose_name = _("workflow trigger")
verbose_name_plural = _("workflow triggers")

View File

@@ -21,6 +21,7 @@ from django.utils.crypto import get_random_string
from django.utils.text import slugify
from django.utils.translation import gettext as _
from drf_writable_nested.serializers import NestedUpdateMixin
from duration_parser import parse_timedelta
from guardian.core import ObjectPermissionChecker
from guardian.shortcuts import get_users_with_perms
from guardian.utils import get_group_obj_perms_model
@@ -1737,6 +1738,10 @@ class WorkflowTriggerSerializer(serializers.ModelSerializer):
label="Trigger Type",
)
schedule_delay = serializers.CharField(
required=False,
)
class Meta:
model = WorkflowTrigger
fields = [
@@ -1752,8 +1757,21 @@ class WorkflowTriggerSerializer(serializers.ModelSerializer):
"filter_has_tags",
"filter_has_correspondent",
"filter_has_document_type",
"schedule_delay",
"schedule_delay_field",
"schedule_delay_custom_field",
]
def validate_schedule_delay(self, value):
if value is not None:
try:
parse_timedelta(value)
except Exception as e:
raise serializers.ValidationError(
f"Invalid schedule delay format: {e}",
)
return value
def validate(self, attrs):
# Empty strings treated as None to avoid unexpected behavior
if (
@@ -1779,6 +1797,11 @@ class WorkflowTriggerSerializer(serializers.ModelSerializer):
"File name, path or mail rule filter are required",
)
if attrs["type"] == WorkflowTrigger.WorkflowTriggerType.SCHEDULED and (
"schedule_delay" not in attrs or attrs["schedule_delay"] is None
):
raise serializers.ValidationError("Schedule delay is required")
return attrs

View File

@@ -14,6 +14,7 @@ from django.db import models
from django.db import transaction
from django.db.models.signals import post_save
from django.utils import timezone
from duration_parser import parse_timedelta
from filelock import FileLock
from whoosh.writing import AsyncWriter
@@ -31,10 +32,13 @@ from documents.double_sided import CollatePlugin
from documents.file_handling import create_source_path_directory
from documents.file_handling import generate_unique_filename
from documents.models import Correspondent
from documents.models import CustomFieldInstance
from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
from documents.models import Workflow
from documents.models import WorkflowTrigger
from documents.parsers import DocumentParser
from documents.parsers import get_parser_class_for_mime_type
from documents.plugins.base import ConsumeTaskPlugin
@@ -44,6 +48,7 @@ from documents.plugins.helpers import ProgressStatusOptions
from documents.sanity_checker import SanityCheckFailedException
from documents.signals import document_updated
from documents.signals.handlers import cleanup_document_deletion
from documents.signals.handlers import run_workflows
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import LogEntry
@@ -319,3 +324,65 @@ def empty_trash(doc_ids=None):
cleanup_document_deletion,
sender=Document,
)
@shared_task
def check_scheduled_workflows():
scheduled_workflows: list[Workflow] = Workflow.objects.filter(
triggers__type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
enabled=True,
).prefetch_related("triggers")
if scheduled_workflows.count() > 0:
logger.debug(f"Checking {len(scheduled_workflows)} scheduled workflows")
for workflow in scheduled_workflows:
schedule_triggers = workflow.triggers.filter(
type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
)
trigger: WorkflowTrigger
for trigger in schedule_triggers:
documents = Document.objects.none()
delay_td = parse_timedelta(trigger.schedule_delay)
logger.debug(
f"Checking trigger {trigger} with delay {delay_td} against field: {trigger.schedule_delay_field}",
)
if (
trigger.schedule_delay_field
== WorkflowTrigger.ScheduleDelayField.ADDED
):
documents = Document.objects.filter(
added__lt=timezone.now() - delay_td,
)
elif (
trigger.schedule_delay_field
== WorkflowTrigger.ScheduleDelayField.CREATED
):
documents = Document.objects.filter(
created__lt=timezone.now() - delay_td,
)
elif (
trigger.schedule_delay_field
== WorkflowTrigger.ScheduleDelayField.MODIFIED
):
documents = Document.objects.filter(
modified__lt=timezone.now() - delay_td,
)
elif (
trigger.schedule_delay_field
== WorkflowTrigger.ScheduleDelayField.CUSTOM_FIELD
):
cf_instances = CustomFieldInstance.objects.filter(
field=trigger.schedule_delay_custom_field,
value_date__lt=timezone.now() - delay_td,
)
documents = Document.objects.filter(
id__in=cf_instances.values_list("document", flat=True),
)
if documents.count() > 0:
logger.debug(
f"Found {documents.count()} documents for trigger {trigger}",
)
for document in documents:
run_workflows(
WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
document,
)

View File

@@ -1370,7 +1370,7 @@ class TestWorkflows(DirectoriesMixin, FileSystemAssertsMixin, APITestCase):
doc = Document.objects.create(
title="test",
)
self.assertRaises(Exception, document_matches_workflow, doc, w, 4)
self.assertRaises(Exception, document_matches_workflow, doc, w, 99)
def test_removal_action_document_updated_workflow(self):
"""

View File

@@ -216,6 +216,17 @@ def _parse_beat_schedule() -> dict:
"expires": 23.0 * 60.0 * 60.0,
},
},
{
"name": "Check and run scheduled workflows",
"env_key": "PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON",
# Default every 5 minutes
"env_default": "*/5 * * * *",
"task": "documents.tasks.check_scheduled_workflows",
"options": {
# 1 minute before default schedule sends again
"expires": 4.0 * 60.0,
},
},
]
for task in tasks:
# Either get the environment setting or use the default

View File

@@ -157,6 +157,7 @@ class TestCeleryScheduleParsing(TestCase):
INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
SANITY_EXPIRE_TIME = ((7.0 * 24.0) - 1.0) * 60.0 * 60.0
EMPTY_TRASH_EXPIRE_TIME = 23.0 * 60.0 * 60.0
RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME = 4.0 * 60.0
def test_schedule_configuration_default(self):
"""
@@ -196,6 +197,11 @@ class TestCeleryScheduleParsing(TestCase):
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="*/5"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
},
schedule,
)
@@ -243,6 +249,11 @@ class TestCeleryScheduleParsing(TestCase):
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="*/5"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
},
schedule,
)
@@ -282,6 +293,11 @@ class TestCeleryScheduleParsing(TestCase):
"schedule": crontab(minute=0, hour="1"),
"options": {"expires": self.EMPTY_TRASH_EXPIRE_TIME},
},
"Check and run scheduled workflows": {
"task": "documents.tasks.check_scheduled_workflows",
"schedule": crontab(minute="*/5"),
"options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME},
},
},
schedule,
)
@@ -303,6 +319,7 @@ class TestCeleryScheduleParsing(TestCase):
"PAPERLESS_SANITY_TASK_CRON": "disable",
"PAPERLESS_INDEX_TASK_CRON": "disable",
"PAPERLESS_EMPTY_TRASH_TASK_CRON": "disable",
"PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON": "disable",
},
):
schedule = _parse_beat_schedule()