diff --git a/src/paperless_mail/mail.py b/src/paperless_mail/mail.py index 04caca63c..ca09bb683 100644 --- a/src/paperless_mail/mail.py +++ b/src/paperless_mail/mail.py @@ -9,6 +9,7 @@ from typing import Dict import magic import pathvalidate +from celery import chord from django.conf import settings from django.db import DatabaseError from documents.loggers import LoggingMixin @@ -25,6 +26,7 @@ from imap_tools import NOT from imap_tools.mailbox import MailBoxTls from paperless_mail.models import MailAccount from paperless_mail.models import MailRule +from paperless_mail.tasks import apply_mail_action # Apple Mail sets multiple IMAP KEYWORD and the general "\Flagged" FLAG # imaplib => conn.fetch(b"", "FLAGS") @@ -57,34 +59,34 @@ class BaseMailAction: def get_criteria(self) -> Dict: return {} - def post_consume(self, M, message_uids, parameter): + def post_consume(self, M, message_uid, parameter): pass # pragma: nocover class DeleteMailAction(BaseMailAction): - def post_consume(self, M, message_uids, parameter): - M.delete(message_uids) + def post_consume(self, M, message_uid, parameter): + M.delete(message_uid) class MarkReadMailAction(BaseMailAction): def get_criteria(self): return {"seen": False} - def post_consume(self, M, message_uids, parameter): - M.flag(message_uids, [MailMessageFlags.SEEN], True) + def post_consume(self, M, message_uid, parameter): + M.flag(message_uid, [MailMessageFlags.SEEN], True) class MoveMailAction(BaseMailAction): - def post_consume(self, M, message_uids, parameter): - M.move(message_uids, parameter) + def post_consume(self, M, message_uid, parameter): + M.move(message_uid, parameter) class FlagMailAction(BaseMailAction): def get_criteria(self): return {"flagged": False} - def post_consume(self, M, message_uids, parameter): - M.flag(message_uids, [MailMessageFlags.FLAGGED], True) + def post_consume(self, M, message_uid, parameter): + M.flag(message_uid, [MailMessageFlags.FLAGGED], True) class TagMailAction(BaseMailAction): @@ -113,9 +115,9 @@ class TagMailAction(BaseMailAction): return {"no_keyword": self.keyword, "gmail_label": self.keyword} - def post_consume(self, M: MailBox, message_uids, parameter): + def post_consume(self, M: MailBox, message_uid, parameter): if re.search(r"gmail\.com$|googlemail\.com$", M._host): - for uid in message_uids: + for uid in message_uid: M.client.uid("STORE", uid, "X-GM-LABELS", self.keyword) # AppleMail @@ -123,21 +125,21 @@ class TagMailAction(BaseMailAction): # Remove all existing $MailFlagBits M.flag( - message_uids, + message_uid, set(itertools.chain(*APPLE_MAIL_TAG_COLORS.values())), False, ) # Set new $MailFlagBits - M.flag(message_uids, APPLE_MAIL_TAG_COLORS.get(self.color), True) + M.flag(message_uid, APPLE_MAIL_TAG_COLORS.get(self.color), True) # Set the general \Flagged # This defaults to the "red" flag in AppleMail and # "stars" in Thunderbird or GMail - M.flag(message_uids, [MailMessageFlags.FLAGGED], True) + M.flag(message_uid, [MailMessageFlags.FLAGGED], True) elif self.keyword: - M.flag(message_uids, [self.keyword], True) + M.flag(message_uid, [self.keyword], True) else: raise MailError("No keyword specified.") @@ -372,16 +374,12 @@ class MailAccountHandler(LoggingMixin): f"Rule {rule}: Error while fetching folder {rule.folder}", ) from err - post_consume_messages = [] - mails_processed = 0 total_processed_files = 0 for message in messages: try: - processed_files = self.handle_message(message, rule) - if processed_files > 0: - post_consume_messages.append(message.uid) + processed_files = self.handle_message(M, message, rule) total_processed_files += processed_files mails_processed += 1 @@ -394,27 +392,9 @@ class MailAccountHandler(LoggingMixin): self.log("debug", f"Rule {rule}: Processed {mails_processed} matching mail(s)") - self.log( - "debug", - f"Rule {rule}: Running mail actions on " - f"{len(post_consume_messages)} mails", - ) - - try: - get_rule_action(rule).post_consume( - M, - post_consume_messages, - rule.action_parameter, - ) - - except Exception as e: - raise MailError( - f"Rule {rule}: Error while processing post-consume actions: " f"{e}", - ) from e - return total_processed_files - def handle_message(self, message, rule: MailRule) -> int: + def handle_message(self, M: MailBox, message, rule: MailRule) -> int: processed_elements = 0 # Skip Message handling when only attachments are to be processed but @@ -441,6 +421,7 @@ class MailAccountHandler(LoggingMixin): or rule.consumption_scope == MailRule.ConsumptionScope.EVERYTHING ): processed_elements += self.process_eml( + M, message, rule, correspondent, @@ -453,6 +434,7 @@ class MailAccountHandler(LoggingMixin): or rule.consumption_scope == MailRule.ConsumptionScope.EVERYTHING ): processed_elements += self.process_attachments( + M, message, rule, correspondent, @@ -464,6 +446,7 @@ class MailAccountHandler(LoggingMixin): def process_attachments( self, + M: MailBox, message: MailMessage, rule: MailRule, correspondent, @@ -471,6 +454,9 @@ class MailAccountHandler(LoggingMixin): doc_type, ): processed_attachments = 0 + + consume_tasks = list() + for att in message.attachments: if ( @@ -518,7 +504,7 @@ class MailAccountHandler(LoggingMixin): f"{message.subject} from {message.from_}", ) - consume_file.delay( + consume_task = consume_file.s( path=temp_filename, override_filename=pathvalidate.sanitize_filename( att.filename, @@ -531,6 +517,8 @@ class MailAccountHandler(LoggingMixin): override_tag_ids=tag_ids, ) + consume_tasks.append(consume_task) + processed_attachments += 1 else: self.log( @@ -540,10 +528,21 @@ class MailAccountHandler(LoggingMixin): f"since guessed mime type {mime_type} is not supported " f"by paperless", ) + + mail_action_task = apply_mail_action.s( + M=M, + action=get_rule_action(rule), + message_uid=message.uid, + parameter=rule.action_parameter, + ) + + chord(header=consume_tasks, body=mail_action_task).delay() + return processed_attachments def process_eml( self, + M: MailBox, message: MailMessage, rule: MailRule, correspondent, @@ -584,7 +583,7 @@ class MailAccountHandler(LoggingMixin): f"{message.subject} from {message.from_}", ) - consume_file.delay( + consume_task = consume_file.s( path=temp_filename, override_filename=pathvalidate.sanitize_filename( message.subject + ".eml", @@ -594,5 +593,15 @@ class MailAccountHandler(LoggingMixin): override_document_type_id=doc_type.id if doc_type else None, override_tag_ids=tag_ids, ) + + mail_action_task = apply_mail_action.s( + M=M, + action=get_rule_action(rule), + message_uid=message.uid, + parameter=rule.action_parameter, + ) + + (consume_task | mail_action_task).delay() + processed_elements = 1 return processed_elements diff --git a/src/paperless_mail/tasks.py b/src/paperless_mail/tasks.py index 5c92233de..9716f13a8 100644 --- a/src/paperless_mail/tasks.py +++ b/src/paperless_mail/tasks.py @@ -1,6 +1,8 @@ import logging from celery import shared_task +from imap_tools import MailBox +from paperless_mail.mail import BaseMailAction from paperless_mail.mail import MailAccountHandler from paperless_mail.mail import MailError from paperless_mail.models import MailAccount @@ -8,12 +10,23 @@ from paperless_mail.models import MailAccount logger = logging.getLogger("paperless.mail.tasks") +@shared_task +def apply_mail_action( + result: str, + M: MailBox, + action: BaseMailAction, + message_uid: str, + parameter: str, +): + action.post_consume(M, message_uid, parameter) + + @shared_task def process_mail_accounts(): total_new_documents = 0 for account in MailAccount.objects.all(): try: - total_new_documents += MailAccountHandler().handle_mail_account(account) + total_new_documents += MailAccountHandler().handl2e_mail_account(account) except MailError: logger.exception(f"Error while processing mail account {account}")