From a37177703c476fd7424e72ca6eaaf0ca67befa16 Mon Sep 17 00:00:00 2001 From: Jonas Winkler <17569239+jonaswinkler@users.noreply.github.com> Date: Tue, 21 Feb 2023 13:50:34 +0100 Subject: [PATCH] mark mails as processed internally, don't process processed mails again --- src/paperless_mail/mail.py | 89 ++++++++++++++----- .../migrations/0018_processedmail.py | 57 ++++++++++++ src/paperless_mail/models.py | 37 ++++++++ 3 files changed, 162 insertions(+), 21 deletions(-) create mode 100644 src/paperless_mail/migrations/0018_processedmail.py diff --git a/src/paperless_mail/mail.py b/src/paperless_mail/mail.py index 316803fb3..b0cb7b750 100644 --- a/src/paperless_mail/mail.py +++ b/src/paperless_mail/mail.py @@ -2,6 +2,7 @@ import itertools import os import re import tempfile +import traceback from datetime import date from datetime import timedelta from fnmatch import fnmatch @@ -11,6 +12,7 @@ import magic import pathvalidate from celery import chord from celery import shared_task +from celery.canvas import Signature from django.conf import settings from django.db import DatabaseError from documents.loggers import LoggingMixin @@ -27,6 +29,7 @@ from imap_tools import NOT from imap_tools.mailbox import MailBoxTls from paperless_mail.models import MailAccount from paperless_mail.models import MailRule +from paperless_mail.models import ProcessedMail # Apple Mail sets multiple IMAP KEYWORD and the general "\Flagged" FLAG # imaplib => conn.fetch(b"", "FLAGS") @@ -154,16 +157,62 @@ def apply_mail_action( rule = MailRule.objects.get(pk=rule_id) account = MailAccount.objects.get(pk=rule.account.pk) - action = get_rule_action(rule) + try: - with get_mailbox( - server=account.imap_server, - port=account.imap_port, - security=account.imap_security, - ) as M: - M.login(username=account.username, password=account.password) - M.folder.set(rule.folder) - action.post_consume(M, message_uid, rule.action_parameter) + action = get_rule_action(rule) + + with get_mailbox( + server=account.imap_server, + port=account.imap_port, + security=account.imap_security, + ) as M: + M.login(username=account.username, password=account.password) + M.folder.set(rule.folder) + action.post_consume(M, message_uid, rule.action_parameter) + + ProcessedMail.objects.create( + rule=rule, + folder=rule.folder, + uid=message_uid, + status="SUCCESS", + ) + + except Exception as e: + ProcessedMail.objects.create( + rule=rule, + folder=rule.folder, + uid=message_uid, + status="FAILED", + error=traceback.format_exc(), + ) + raise e + + +@shared_task +def error_callback(request, exc, tb, rule_id: int, message_uid: str): + rule = MailRule.objects.get(pk=rule_id) + + ProcessedMail.objects.create( + rule=rule, + folder=rule.folder, + uid=message_uid, + status="FAILED", + error=traceback.format_exc(), + ) + + +def queue_consumption_tasks( + consume_tasks: list[Signature], + rule: MailRule, + message_uid: str, +): + mail_action_task = apply_mail_action.s( + rule_id=rule.pk, + message_uid=message_uid, + ) + chord(header=consume_tasks, body=mail_action_task).on_error( + error_callback.s(rule_id=rule.pk, message_uid=message_uid), + ).delay() def get_rule_action(rule) -> BaseMailAction: @@ -399,6 +448,14 @@ class MailAccountHandler(LoggingMixin): total_processed_files = 0 for message in messages: + if ProcessedMail.objects.filter( + rule=rule, + uid=message.uid, + folder=rule.folder, + ).exists(): + self.log("debug", f"Skipping mail {message}, already processed.") + continue + try: processed_files = self.handle_message(message, rule) @@ -548,12 +605,7 @@ class MailAccountHandler(LoggingMixin): f"by paperless", ) - mail_action_task = apply_mail_action.s( - rule_id=rule.pk, - message_uid=message.uid, - ) - - chord(header=consume_tasks, body=mail_action_task).delay() + queue_consumption_tasks(consume_tasks, rule, message.uid) return processed_attachments @@ -611,12 +663,7 @@ class MailAccountHandler(LoggingMixin): override_owner_id=rule.owner.id if rule.owner else None, ) - mail_action_task = apply_mail_action.s( - rule_id=rule.pk, - message_uid=message.uid, - ) - - (consume_task | mail_action_task).delay() + queue_consumption_tasks([consume_task], rule, message.uid) processed_elements = 1 return processed_elements diff --git a/src/paperless_mail/migrations/0018_processedmail.py b/src/paperless_mail/migrations/0018_processedmail.py new file mode 100644 index 000000000..ff15ffb77 --- /dev/null +++ b/src/paperless_mail/migrations/0018_processedmail.py @@ -0,0 +1,57 @@ +# Generated by Django 4.1.5 on 2023-02-21 12:48 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ("paperless_mail", "0017_mailaccount_owner_mailrule_owner"), + ] + + operations = [ + migrations.CreateModel( + name="ProcessedMail", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("folder", models.CharField(max_length=256, verbose_name="folder")), + ("uid", models.CharField(max_length=256, verbose_name="folder")), + ("status", models.CharField(max_length=256, verbose_name="status")), + ( + "error", + models.TextField(blank=True, null=True, verbose_name="error"), + ), + ( + "owner", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to=settings.AUTH_USER_MODEL, + verbose_name="owner", + ), + ), + ( + "rule", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to="paperless_mail.mailrule", + ), + ), + ], + options={ + "abstract": False, + }, + ), + ] diff --git a/src/paperless_mail/models.py b/src/paperless_mail/models.py index f8a30c3be..c702181ef 100644 --- a/src/paperless_mail/models.py +++ b/src/paperless_mail/models.py @@ -214,3 +214,40 @@ class MailRule(document_models.ModelWithOwner): def __str__(self): return f"{self.account.name}.{self.name}" + + +class ProcessedMail(document_models.ModelWithOwner): + + rule = models.ForeignKey( + MailRule, + null=False, + blank=False, + on_delete=models.CASCADE, + ) + + folder = models.CharField( + _("folder"), + null=False, + blank=False, + max_length=256, + ) + + uid = models.CharField( + _("folder"), + null=False, + blank=False, + max_length=256, + ) + + status = models.CharField( + _("status"), + null=False, + blank=False, + max_length=256, + ) + + error = models.TextField( + _("error"), + null=True, + blank=True, + )