Add first draft implementation, test broken.

This commit is contained in:
Oliver Rümpelein 2022-05-25 19:22:50 +02:00
parent ce3f6837e9
commit 5a809d7e31
3 changed files with 56 additions and 1 deletions

View File

@ -62,6 +62,17 @@ class FlagMailAction(BaseMailAction):
M.flag(message_uids, [MailMessageFlags.FLAGGED], True) M.flag(message_uids, [MailMessageFlags.FLAGGED], True)
class TagMailAction(BaseMailAction):
def __init__(self, parameter):
self.keyword = parameter
def get_criteria(self):
return {"no_keyword": self.keyword}
def post_consume(self, M: MailBox, message_uids, parameter):
M.flag(message_uids, [self.keyword], True)
def get_rule_action(rule): def get_rule_action(rule):
if rule.action == MailRule.MailAction.FLAG: if rule.action == MailRule.MailAction.FLAG:
return FlagMailAction() return FlagMailAction()
@ -71,6 +82,8 @@ def get_rule_action(rule):
return MoveMailAction() return MoveMailAction()
elif rule.action == MailRule.MailAction.MARK_READ: elif rule.action == MailRule.MailAction.MARK_READ:
return MarkReadMailAction() return MarkReadMailAction()
elif rule.action == MailRule.MailAction.TAG:
return TagMailAction(rule.action_parameter)
else: else:
raise NotImplementedError("Unknown action.") # pragma: nocover raise NotImplementedError("Unknown action.") # pragma: nocover

View File

@ -65,6 +65,7 @@ class MailRule(models.Model):
MOVE = 2, _("Move to specified folder") MOVE = 2, _("Move to specified folder")
MARK_READ = 3, _("Mark as read, don't process read mails") MARK_READ = 3, _("Mark as read, don't process read mails")
FLAG = 4, _("Flag the mail, don't process flagged mails") FLAG = 4, _("Flag the mail, don't process flagged mails")
TAG = 5, _("Tag the mail with specified tag, don't process tagged mails")
class TitleSource(models.IntegerChoices): class TitleSource(models.IntegerChoices):
FROM_SUBJECT = 1, _("Use subject as title") FROM_SUBJECT = 1, _("Use subject as title")

View File

@ -4,7 +4,7 @@ import os
import random import random
import uuid import uuid
from collections import namedtuple from collections import namedtuple
from typing import ContextManager from typing import ContextManager, Callable
from typing import List from typing import List
from typing import Union from typing import Union
from unittest import mock from unittest import mock
@ -96,6 +96,18 @@ class BogusMailBox(ContextManager):
if "UNFLAGGED" in criteria: if "UNFLAGGED" in criteria:
msg = filter(lambda m: not m.flagged, msg) msg = filter(lambda m: not m.flagged, msg)
if "NO_KEYWORD" in criteria:
tag: str = criteria[criteria.index("NO_KEYWORD") + 1].strip('"')
print(f"selected tag is {tag}.")
for m in msg:
print(f"Message with id {m.uid} has tags {','.join(m.flags)}")
msg = filter(lambda m: tag not in m.flags, msg)
if "KEYWORD" in criteria:
tag = criteria[criteria.index("KEYWORD") + 1].strip('"')
print(f"selected tag is {tag}.")
msg = filter(lambda m: tag in m.flags, msg)
return list(msg) return list(msg)
def delete(self, uid_list): def delete(self, uid_list):
@ -130,6 +142,7 @@ def create_message(
from_: str = "noone@mail.com", from_: str = "noone@mail.com",
seen: bool = False, seen: bool = False,
flagged: bool = False, flagged: bool = False,
processed: bool = False,
) -> MailMessage: ) -> MailMessage:
email_msg = email.message.EmailMessage() email_msg = email.message.EmailMessage()
# TODO: This does NOT set the UID # TODO: This does NOT set the UID
@ -176,6 +189,10 @@ def create_message(
imap_msg.seen = seen imap_msg.seen = seen
imap_msg.flagged = flagged imap_msg.flagged = flagged
# Allows to test for both unset tags, and explicitly 'false' ones.
if processed:
imap_msg._raw_flag_data.append("+FLAGS (processed)".encode())
return imap_msg return imap_msg
@ -225,6 +242,7 @@ class TestMail(DirectoriesMixin, TestCase):
body="from my favorite electronic store", body="from my favorite electronic store",
seen=False, seen=False,
flagged=True, flagged=True,
processed=True
), ),
) )
self.bogus_mailbox.messages.append( self.bogus_mailbox.messages.append(
@ -571,6 +589,29 @@ class TestMail(DirectoriesMixin, TestCase):
self.assertEqual(len(self.bogus_mailbox.messages), 2) self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1) self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
def test_handle_mail_account_tag(self):
account = MailAccount.objects.create(
name="test",
imap_server="",
username="admin",
password="secret",
)
_ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.MailAction.TAG,
action_parameter="processed",
)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("NO_KEYWORD processed", False)), 2)
self.mail_account_handler.handle_mail_account(account)
self.assertEqual(self.async_task.call_count, 2)
self.assertEqual(len(self.bogus_mailbox.fetch("NO_KEYWORD processed", False)), 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
def test_error_login(self): def test_error_login(self):
account = MailAccount.objects.create( account = MailAccount.objects.create(
name="test", name="test",