mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	adjust mail workflow, execute mail actions only after consumption is successful
This commit is contained in:
		| @@ -9,6 +9,7 @@ from typing import Dict | ||||
|  | ||||
| import magic | ||||
| import pathvalidate | ||||
| from celery import chord | ||||
| from django.conf import settings | ||||
| from django.db import DatabaseError | ||||
| from documents.loggers import LoggingMixin | ||||
| @@ -25,6 +26,7 @@ from imap_tools import NOT | ||||
| from imap_tools.mailbox import MailBoxTls | ||||
| from paperless_mail.models import MailAccount | ||||
| from paperless_mail.models import MailRule | ||||
| from paperless_mail.tasks import apply_mail_action | ||||
|  | ||||
| # Apple Mail sets multiple IMAP KEYWORD and the general "\Flagged" FLAG | ||||
| # imaplib => conn.fetch(b"<message_id>", "FLAGS") | ||||
| @@ -57,34 +59,34 @@ class BaseMailAction: | ||||
|     def get_criteria(self) -> Dict: | ||||
|         return {} | ||||
|  | ||||
|     def post_consume(self, M, message_uids, parameter): | ||||
|     def post_consume(self, M, message_uid, parameter): | ||||
|         pass  # pragma: nocover | ||||
|  | ||||
|  | ||||
| class DeleteMailAction(BaseMailAction): | ||||
|     def post_consume(self, M, message_uids, parameter): | ||||
|         M.delete(message_uids) | ||||
|     def post_consume(self, M, message_uid, parameter): | ||||
|         M.delete(message_uid) | ||||
|  | ||||
|  | ||||
| class MarkReadMailAction(BaseMailAction): | ||||
|     def get_criteria(self): | ||||
|         return {"seen": False} | ||||
|  | ||||
|     def post_consume(self, M, message_uids, parameter): | ||||
|         M.flag(message_uids, [MailMessageFlags.SEEN], True) | ||||
|     def post_consume(self, M, message_uid, parameter): | ||||
|         M.flag(message_uid, [MailMessageFlags.SEEN], True) | ||||
|  | ||||
|  | ||||
| class MoveMailAction(BaseMailAction): | ||||
|     def post_consume(self, M, message_uids, parameter): | ||||
|         M.move(message_uids, parameter) | ||||
|     def post_consume(self, M, message_uid, parameter): | ||||
|         M.move(message_uid, parameter) | ||||
|  | ||||
|  | ||||
| class FlagMailAction(BaseMailAction): | ||||
|     def get_criteria(self): | ||||
|         return {"flagged": False} | ||||
|  | ||||
|     def post_consume(self, M, message_uids, parameter): | ||||
|         M.flag(message_uids, [MailMessageFlags.FLAGGED], True) | ||||
|     def post_consume(self, M, message_uid, parameter): | ||||
|         M.flag(message_uid, [MailMessageFlags.FLAGGED], True) | ||||
|  | ||||
|  | ||||
| class TagMailAction(BaseMailAction): | ||||
| @@ -113,9 +115,9 @@ class TagMailAction(BaseMailAction): | ||||
|  | ||||
|         return {"no_keyword": self.keyword, "gmail_label": self.keyword} | ||||
|  | ||||
|     def post_consume(self, M: MailBox, message_uids, parameter): | ||||
|     def post_consume(self, M: MailBox, message_uid, parameter): | ||||
|         if re.search(r"gmail\.com$|googlemail\.com$", M._host): | ||||
|             for uid in message_uids: | ||||
|             for uid in message_uid: | ||||
|                 M.client.uid("STORE", uid, "X-GM-LABELS", self.keyword) | ||||
|  | ||||
|         # AppleMail | ||||
| @@ -123,21 +125,21 @@ class TagMailAction(BaseMailAction): | ||||
|  | ||||
|             # Remove all existing $MailFlagBits | ||||
|             M.flag( | ||||
|                 message_uids, | ||||
|                 message_uid, | ||||
|                 set(itertools.chain(*APPLE_MAIL_TAG_COLORS.values())), | ||||
|                 False, | ||||
|             ) | ||||
|  | ||||
|             # Set new $MailFlagBits | ||||
|             M.flag(message_uids, APPLE_MAIL_TAG_COLORS.get(self.color), True) | ||||
|             M.flag(message_uid, APPLE_MAIL_TAG_COLORS.get(self.color), True) | ||||
|  | ||||
|             # Set the general \Flagged | ||||
|             # This defaults to the "red" flag in AppleMail and | ||||
|             # "stars" in Thunderbird or GMail | ||||
|             M.flag(message_uids, [MailMessageFlags.FLAGGED], True) | ||||
|             M.flag(message_uid, [MailMessageFlags.FLAGGED], True) | ||||
|  | ||||
|         elif self.keyword: | ||||
|             M.flag(message_uids, [self.keyword], True) | ||||
|             M.flag(message_uid, [self.keyword], True) | ||||
|  | ||||
|         else: | ||||
|             raise MailError("No keyword specified.") | ||||
| @@ -372,16 +374,12 @@ class MailAccountHandler(LoggingMixin): | ||||
|                 f"Rule {rule}: Error while fetching folder {rule.folder}", | ||||
|             ) from err | ||||
|  | ||||
|         post_consume_messages = [] | ||||
|  | ||||
|         mails_processed = 0 | ||||
|         total_processed_files = 0 | ||||
|  | ||||
|         for message in messages: | ||||
|             try: | ||||
|                 processed_files = self.handle_message(message, rule) | ||||
|                 if processed_files > 0: | ||||
|                     post_consume_messages.append(message.uid) | ||||
|                 processed_files = self.handle_message(M, message, rule) | ||||
|  | ||||
|                 total_processed_files += processed_files | ||||
|                 mails_processed += 1 | ||||
| @@ -394,27 +392,9 @@ class MailAccountHandler(LoggingMixin): | ||||
|  | ||||
|         self.log("debug", f"Rule {rule}: Processed {mails_processed} matching mail(s)") | ||||
|  | ||||
|         self.log( | ||||
|             "debug", | ||||
|             f"Rule {rule}: Running mail actions on " | ||||
|             f"{len(post_consume_messages)} mails", | ||||
|         ) | ||||
|  | ||||
|         try: | ||||
|             get_rule_action(rule).post_consume( | ||||
|                 M, | ||||
|                 post_consume_messages, | ||||
|                 rule.action_parameter, | ||||
|             ) | ||||
|  | ||||
|         except Exception as e: | ||||
|             raise MailError( | ||||
|                 f"Rule {rule}: Error while processing post-consume actions: " f"{e}", | ||||
|             ) from e | ||||
|  | ||||
|         return total_processed_files | ||||
|  | ||||
|     def handle_message(self, message, rule: MailRule) -> int: | ||||
|     def handle_message(self, M: MailBox, message, rule: MailRule) -> int: | ||||
|         processed_elements = 0 | ||||
|  | ||||
|         # Skip Message handling when only attachments are to be processed but | ||||
| @@ -441,6 +421,7 @@ class MailAccountHandler(LoggingMixin): | ||||
|             or rule.consumption_scope == MailRule.ConsumptionScope.EVERYTHING | ||||
|         ): | ||||
|             processed_elements += self.process_eml( | ||||
|                 M, | ||||
|                 message, | ||||
|                 rule, | ||||
|                 correspondent, | ||||
| @@ -453,6 +434,7 @@ class MailAccountHandler(LoggingMixin): | ||||
|             or rule.consumption_scope == MailRule.ConsumptionScope.EVERYTHING | ||||
|         ): | ||||
|             processed_elements += self.process_attachments( | ||||
|                 M, | ||||
|                 message, | ||||
|                 rule, | ||||
|                 correspondent, | ||||
| @@ -464,6 +446,7 @@ class MailAccountHandler(LoggingMixin): | ||||
|  | ||||
|     def process_attachments( | ||||
|         self, | ||||
|         M: MailBox, | ||||
|         message: MailMessage, | ||||
|         rule: MailRule, | ||||
|         correspondent, | ||||
| @@ -471,6 +454,9 @@ class MailAccountHandler(LoggingMixin): | ||||
|         doc_type, | ||||
|     ): | ||||
|         processed_attachments = 0 | ||||
|  | ||||
|         consume_tasks = list() | ||||
|  | ||||
|         for att in message.attachments: | ||||
|  | ||||
|             if ( | ||||
| @@ -518,7 +504,7 @@ class MailAccountHandler(LoggingMixin): | ||||
|                     f"{message.subject} from {message.from_}", | ||||
|                 ) | ||||
|  | ||||
|                 consume_file.delay( | ||||
|                 consume_task = consume_file.s( | ||||
|                     path=temp_filename, | ||||
|                     override_filename=pathvalidate.sanitize_filename( | ||||
|                         att.filename, | ||||
| @@ -531,6 +517,8 @@ class MailAccountHandler(LoggingMixin): | ||||
|                     override_tag_ids=tag_ids, | ||||
|                 ) | ||||
|  | ||||
|                 consume_tasks.append(consume_task) | ||||
|  | ||||
|                 processed_attachments += 1 | ||||
|             else: | ||||
|                 self.log( | ||||
| @@ -540,10 +528,21 @@ class MailAccountHandler(LoggingMixin): | ||||
|                     f"since guessed mime type {mime_type} is not supported " | ||||
|                     f"by paperless", | ||||
|                 ) | ||||
|  | ||||
|         mail_action_task = apply_mail_action.s( | ||||
|             M=M, | ||||
|             action=get_rule_action(rule), | ||||
|             message_uid=message.uid, | ||||
|             parameter=rule.action_parameter, | ||||
|         ) | ||||
|  | ||||
|         chord(header=consume_tasks, body=mail_action_task).delay() | ||||
|  | ||||
|         return processed_attachments | ||||
|  | ||||
|     def process_eml( | ||||
|         self, | ||||
|         M: MailBox, | ||||
|         message: MailMessage, | ||||
|         rule: MailRule, | ||||
|         correspondent, | ||||
| @@ -584,7 +583,7 @@ class MailAccountHandler(LoggingMixin): | ||||
|             f"{message.subject} from {message.from_}", | ||||
|         ) | ||||
|  | ||||
|         consume_file.delay( | ||||
|         consume_task = consume_file.s( | ||||
|             path=temp_filename, | ||||
|             override_filename=pathvalidate.sanitize_filename( | ||||
|                 message.subject + ".eml", | ||||
| @@ -594,5 +593,15 @@ class MailAccountHandler(LoggingMixin): | ||||
|             override_document_type_id=doc_type.id if doc_type else None, | ||||
|             override_tag_ids=tag_ids, | ||||
|         ) | ||||
|  | ||||
|         mail_action_task = apply_mail_action.s( | ||||
|             M=M, | ||||
|             action=get_rule_action(rule), | ||||
|             message_uid=message.uid, | ||||
|             parameter=rule.action_parameter, | ||||
|         ) | ||||
|  | ||||
|         (consume_task | mail_action_task).delay() | ||||
|  | ||||
|         processed_elements = 1 | ||||
|         return processed_elements | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| import logging | ||||
|  | ||||
| from celery import shared_task | ||||
| from imap_tools import MailBox | ||||
| from paperless_mail.mail import BaseMailAction | ||||
| from paperless_mail.mail import MailAccountHandler | ||||
| from paperless_mail.mail import MailError | ||||
| from paperless_mail.models import MailAccount | ||||
| @@ -8,12 +10,23 @@ from paperless_mail.models import MailAccount | ||||
| logger = logging.getLogger("paperless.mail.tasks") | ||||
|  | ||||
|  | ||||
| @shared_task | ||||
| def apply_mail_action( | ||||
|     result: str, | ||||
|     M: MailBox, | ||||
|     action: BaseMailAction, | ||||
|     message_uid: str, | ||||
|     parameter: str, | ||||
| ): | ||||
|     action.post_consume(M, message_uid, parameter) | ||||
|  | ||||
|  | ||||
| @shared_task | ||||
| def process_mail_accounts(): | ||||
|     total_new_documents = 0 | ||||
|     for account in MailAccount.objects.all(): | ||||
|         try: | ||||
|             total_new_documents += MailAccountHandler().handle_mail_account(account) | ||||
|             total_new_documents += MailAccountHandler().handl2e_mail_account(account) | ||||
|         except MailError: | ||||
|             logger.exception(f"Error while processing mail account {account}") | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jonas
					Jonas