mark mails as processed internally, don't process processed mails again

This commit is contained in:
Jonas Winkler
2023-02-21 13:50:34 +01:00
parent 7e1c1da424
commit a37177703c
3 changed files with 162 additions and 21 deletions

View File

@@ -2,6 +2,7 @@ import itertools
import os
import re
import tempfile
import traceback
from datetime import date
from datetime import timedelta
from fnmatch import fnmatch
@@ -11,6 +12,7 @@ import magic
import pathvalidate
from celery import chord
from celery import shared_task
from celery.canvas import Signature
from django.conf import settings
from django.db import DatabaseError
from documents.loggers import LoggingMixin
@@ -27,6 +29,7 @@ from imap_tools import NOT
from imap_tools.mailbox import MailBoxTls
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.models import ProcessedMail
# Apple Mail sets multiple IMAP KEYWORD and the general "\Flagged" FLAG
# imaplib => conn.fetch(b"<message_id>", "FLAGS")
@@ -154,16 +157,62 @@ def apply_mail_action(
rule = MailRule.objects.get(pk=rule_id)
account = MailAccount.objects.get(pk=rule.account.pk)
action = get_rule_action(rule)
try:
with get_mailbox(
server=account.imap_server,
port=account.imap_port,
security=account.imap_security,
) as M:
M.login(username=account.username, password=account.password)
M.folder.set(rule.folder)
action.post_consume(M, message_uid, rule.action_parameter)
action = get_rule_action(rule)
with get_mailbox(
server=account.imap_server,
port=account.imap_port,
security=account.imap_security,
) as M:
M.login(username=account.username, password=account.password)
M.folder.set(rule.folder)
action.post_consume(M, message_uid, rule.action_parameter)
ProcessedMail.objects.create(
rule=rule,
folder=rule.folder,
uid=message_uid,
status="SUCCESS",
)
except Exception as e:
ProcessedMail.objects.create(
rule=rule,
folder=rule.folder,
uid=message_uid,
status="FAILED",
error=traceback.format_exc(),
)
raise e
@shared_task
def error_callback(request, exc, tb, rule_id: int, message_uid: str):
rule = MailRule.objects.get(pk=rule_id)
ProcessedMail.objects.create(
rule=rule,
folder=rule.folder,
uid=message_uid,
status="FAILED",
error=traceback.format_exc(),
)
def queue_consumption_tasks(
consume_tasks: list[Signature],
rule: MailRule,
message_uid: str,
):
mail_action_task = apply_mail_action.s(
rule_id=rule.pk,
message_uid=message_uid,
)
chord(header=consume_tasks, body=mail_action_task).on_error(
error_callback.s(rule_id=rule.pk, message_uid=message_uid),
).delay()
def get_rule_action(rule) -> BaseMailAction:
@@ -399,6 +448,14 @@ class MailAccountHandler(LoggingMixin):
total_processed_files = 0
for message in messages:
if ProcessedMail.objects.filter(
rule=rule,
uid=message.uid,
folder=rule.folder,
).exists():
self.log("debug", f"Skipping mail {message}, already processed.")
continue
try:
processed_files = self.handle_message(message, rule)
@@ -548,12 +605,7 @@ class MailAccountHandler(LoggingMixin):
f"by paperless",
)
mail_action_task = apply_mail_action.s(
rule_id=rule.pk,
message_uid=message.uid,
)
chord(header=consume_tasks, body=mail_action_task).delay()
queue_consumption_tasks(consume_tasks, rule, message.uid)
return processed_attachments
@@ -611,12 +663,7 @@ class MailAccountHandler(LoggingMixin):
override_owner_id=rule.owner.id if rule.owner else None,
)
mail_action_task = apply_mail_action.s(
rule_id=rule.pk,
message_uid=message.uid,
)
(consume_task | mail_action_task).delay()
queue_consumption_tasks([consume_task], rule, message.uid)
processed_elements = 1
return processed_elements