diff --git a/src/paperless_mail/tests/test_mail.py b/src/paperless_mail/tests/test_mail.py index 0c85013ae..67293c6f5 100644 --- a/src/paperless_mail/tests/test_mail.py +++ b/src/paperless_mail/tests/test_mail.py @@ -46,12 +46,30 @@ class BogusFolderManager: class BogusClient: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def __init__(self, messages): + self.messages: List[MailMessage] = messages + def authenticate(self, mechanism, authobject): # authobject must be a callable object auth_bytes = authobject(None) if auth_bytes != b"\x00admin\x00w57\xc3\xa4\xc3\xb6\xc3\xbcw4b6huwb6nhu": raise MailboxLoginError("BAD", "OK") + def uid(self, command, *args): + if command == "STORE": + for message in self.messages: + if message.uid == args[0]: + flag = args[2] + if flag == "processed": + message._raw_flag_data.append(f"+FLAGS (processed)".encode()) + MailMessage.flags.fget.cache_clear() + class BogusMailBox(ContextManager): def __enter__(self): @@ -64,9 +82,12 @@ class BogusMailBox(ContextManager): self.messages: List[MailMessage] = [] self.messages_spam: List[MailMessage] = [] self.folder = BogusFolderManager() - self.client = BogusClient() + self.client = BogusClient(self.messages) self._host = "" + def updateClient(self): + self.client = BogusClient(self.messages) + def login(self, username, password): # This will raise a UnicodeEncodeError if the password is not ASCII only password.encode("ascii") @@ -248,6 +269,7 @@ class TestMail(DirectoriesMixin, TestCase): seen=False, ), ) + self.bogus_mailbox.updateClient() def test_get_correspondent(self): message = namedtuple("MailMessage", []) @@ -608,6 +630,31 @@ class TestMail(DirectoriesMixin, TestCase): self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0) self.assertEqual(len(self.bogus_mailbox.messages), 3) + def test_handle_mail_mail_account_tag_gmail(self): + self.bogus_mailbox._host = "imap.gmail.com" + + account = MailAccount.objects.create( + name="test", + imap_server="", + username="admin", + password="secret", + ) + + _ = MailRule.objects.create( + name="testrule", + account=account, + action=MailRule.MailAction.TAG, + action_parameter="processed", + ) + + self.assertEqual(len(self.bogus_mailbox.messages), 3) + self.assertEqual(self.async_task.call_count, 0) + self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 2) + self.mail_account_handler.handle_mail_account(account) + self.assertEqual(self.async_task.call_count, 2) + self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0) + self.assertEqual(len(self.bogus_mailbox.messages), 3) + def test_error_login(self): account = MailAccount.objects.create( name="test",