mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-07-28 18:24:38 -05:00
Enhancement: support negative offset in scheduled workflows (#9746)
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
import datetime
|
||||
import hashlib
|
||||
import logging
|
||||
import shutil
|
||||
import uuid
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
@@ -357,7 +357,7 @@ def empty_trash(doc_ids=None):
|
||||
if doc_ids is not None
|
||||
else Document.deleted_objects.filter(
|
||||
deleted_at__lt=timezone.localtime(timezone.now())
|
||||
- timedelta(
|
||||
- datetime.timedelta(
|
||||
days=settings.EMPTY_TRASH_DELAY,
|
||||
),
|
||||
)
|
||||
@@ -397,6 +397,7 @@ def check_scheduled_workflows():
|
||||
)
|
||||
if scheduled_workflows.count() > 0:
|
||||
logger.debug(f"Checking {len(scheduled_workflows)} scheduled workflows")
|
||||
now = timezone.now()
|
||||
for workflow in scheduled_workflows:
|
||||
schedule_triggers = workflow.triggers.filter(
|
||||
type=WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
|
||||
@@ -404,31 +405,60 @@ def check_scheduled_workflows():
|
||||
trigger: WorkflowTrigger
|
||||
for trigger in schedule_triggers:
|
||||
documents = Document.objects.none()
|
||||
offset_td = timedelta(days=trigger.schedule_offset_days)
|
||||
offset_td = datetime.timedelta(days=-trigger.schedule_offset_days)
|
||||
threshold = now - offset_td
|
||||
logger.debug(
|
||||
f"Checking trigger {trigger} with offset {offset_td} against field: {trigger.schedule_date_field}",
|
||||
f"Trigger {trigger.id}: checking if (date + {offset_td}) <= now ({now})",
|
||||
)
|
||||
|
||||
match trigger.schedule_date_field:
|
||||
case WorkflowTrigger.ScheduleDateField.ADDED:
|
||||
documents = Document.objects.filter(
|
||||
added__lt=timezone.now() - offset_td,
|
||||
)
|
||||
documents = Document.objects.filter(added__lte=threshold)
|
||||
|
||||
case WorkflowTrigger.ScheduleDateField.CREATED:
|
||||
documents = Document.objects.filter(
|
||||
created__lt=timezone.now() - offset_td,
|
||||
)
|
||||
documents = Document.objects.filter(created__lte=threshold)
|
||||
|
||||
case WorkflowTrigger.ScheduleDateField.MODIFIED:
|
||||
documents = Document.objects.filter(
|
||||
modified__lt=timezone.now() - offset_td,
|
||||
)
|
||||
documents = Document.objects.filter(modified__lte=threshold)
|
||||
|
||||
case WorkflowTrigger.ScheduleDateField.CUSTOM_FIELD:
|
||||
cf_instances = CustomFieldInstance.objects.filter(
|
||||
field=trigger.schedule_date_custom_field,
|
||||
value_date__lt=timezone.now() - offset_td,
|
||||
)
|
||||
documents = Document.objects.filter(
|
||||
id__in=cf_instances.values_list("document", flat=True),
|
||||
# cap earliest date to avoid massive scans
|
||||
earliest_date = now - datetime.timedelta(days=365)
|
||||
if offset_td.days < -365:
|
||||
logger.warning(
|
||||
f"Trigger {trigger.id} has large negative offset ({offset_td.days}), "
|
||||
f"limiting earliest scan date to {earliest_date}",
|
||||
)
|
||||
|
||||
cf_filter_kwargs = {
|
||||
"field": trigger.schedule_date_custom_field,
|
||||
"value_date__isnull": False,
|
||||
"value_date__lte": threshold,
|
||||
"value_date__gte": earliest_date,
|
||||
}
|
||||
|
||||
recent_cf_instances = CustomFieldInstance.objects.filter(
|
||||
**cf_filter_kwargs,
|
||||
)
|
||||
|
||||
matched_ids = [
|
||||
cfi.document_id
|
||||
for cfi in recent_cf_instances
|
||||
if cfi.value_date
|
||||
and (
|
||||
timezone.make_aware(
|
||||
datetime.datetime.combine(
|
||||
cfi.value_date,
|
||||
datetime.time.min,
|
||||
),
|
||||
)
|
||||
+ offset_td
|
||||
<= now
|
||||
)
|
||||
]
|
||||
|
||||
documents = Document.objects.filter(id__in=matched_ids)
|
||||
|
||||
if documents.count() > 0:
|
||||
logger.debug(
|
||||
f"Found {documents.count()} documents for trigger {trigger}",
|
||||
@@ -440,18 +470,18 @@ def check_scheduled_workflows():
|
||||
workflow=workflow,
|
||||
).order_by("-run_at")
|
||||
if not trigger.schedule_is_recurring and workflow_runs.exists():
|
||||
# schedule is non-recurring and the workflow has already been run
|
||||
logger.debug(
|
||||
f"Skipping document {document} for non-recurring workflow {workflow} as it has already been run",
|
||||
)
|
||||
continue
|
||||
elif (
|
||||
|
||||
if (
|
||||
trigger.schedule_is_recurring
|
||||
and workflow_runs.exists()
|
||||
and (
|
||||
workflow_runs.last().run_at
|
||||
> timezone.now()
|
||||
- timedelta(
|
||||
> now
|
||||
- datetime.timedelta(
|
||||
days=trigger.schedule_recurring_interval_days,
|
||||
)
|
||||
)
|
||||
|
Reference in New Issue
Block a user