diff --git a/src/paperless_mail/tests.py b/src/paperless_mail/tests.py deleted file mode 100644 index 7ce503c2d..000000000 --- a/src/paperless_mail/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/src/paperless_mail/tests/test_mail.py b/src/paperless_mail/tests/test_mail.py index 9ad826b06..20cf17ec7 100644 --- a/src/paperless_mail/tests/test_mail.py +++ b/src/paperless_mail/tests/test_mail.py @@ -1,15 +1,138 @@ +import uuid from collections import namedtuple +from typing import ContextManager from unittest import mock from django.test import TestCase +from imap_tools import MailMessageFlags, MailboxFolderSelectError from documents.models import Correspondent -from paperless_mail.mail import get_correspondent, get_title, handle_message -from paperless_mail.models import MailRule +from paperless_mail.mail import get_correspondent, get_title, handle_message, handle_mail_account, MailError +from paperless_mail.models import MailRule, MailAccount + + +class BogusFolderManager: + + current_folder = "INBOX" + + def set(self, new_folder): + if new_folder not in ["INBOX", "spam"]: + raise MailboxFolderSelectError(None, "uhm") + self.current_folder = new_folder + + +class BogusMailBox(ContextManager): + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def __init__(self): + self.messages = [] + self.messages_spam = [] + + def login(self, username, password): + if not (username == 'admin' and password == 'secret'): + raise Exception() + + folder = BogusFolderManager() + + def fetch(self, criteria, mark_seen): + msg = self.messages + + criteria = str(criteria).strip('()').split(" ") + + if 'UNSEEN' in criteria: + msg = filter(lambda m: not m.seen, msg) + + if 'SUBJECT' in criteria: + subject = criteria[criteria.index('SUBJECT') + 1].strip('"') + msg = filter(lambda m: subject in m.subject, msg) + + if 'BODY' in criteria: + body = criteria[criteria.index('BODY') + 1].strip('"') + msg = filter(lambda m: body in m.body, msg) + + if 'FROM' in criteria: + from_ = criteria[criteria.index('FROM') + 1].strip('"') + msg = filter(lambda m: from_ in m.from_, msg) + + if 'UNFLAGGED' in criteria: + msg = filter(lambda m: not m.flagged, msg) + + return list(msg) + + def seen(self, uid_list, seen_val): + for message in self.messages: + if message.uid in uid_list: + message.seen = seen_val + + def delete(self, uid_list): + self.messages = list(filter(lambda m: m.uid not in uid_list, self.messages)) + + def flag(self, uid_list, flag_set, value): + for message in self.messages: + if message.uid in uid_list: + for flag in flag_set: + if flag == MailMessageFlags.FLAGGED: + message.flagged = value + + def move(self, uid_list, folder): + if folder == "spam": + self.messages_spam.append( + filter(lambda m: m.uid in uid_list, self.messages) + ) + self.messages = list( + filter(lambda m: m.uid not in uid_list, self.messages) + ) + else: + raise Exception() + + +def create_message(num_attachments=1, body="", subject="the suject", from_="noone@mail.com", seen=False, flagged=False): + message = namedtuple('MailMessage', []) + + message.uid = uuid.uuid4() + message.subject = subject + message.attachments = [] + message.from_ = from_ + message.body = body + for i in range(num_attachments): + attachment = namedtuple('Attachment', []) + attachment.filename = 'some_file.pdf' + attachment.content_type = 'application/pdf' + attachment.payload = b'content of the attachment' + message.attachments.append(attachment) + + message.seen = seen + message.flagged = flagged + + return message class TestMail(TestCase): + def setUp(self): + patcher = mock.patch('paperless_mail.mail.MailBox') + m = patcher.start() + self.bogus_mailbox = BogusMailBox() + m.return_value = self.bogus_mailbox + self.addCleanup(patcher.stop) + + patcher = mock.patch('paperless_mail.mail.async_task') + self.async_task = patcher.start() + self.addCleanup(patcher.stop) + + self.reset_bogus_mailbox() + + def reset_bogus_mailbox(self): + self.bogus_mailbox.messages = [] + self.bogus_mailbox.messages_spam = [] + self.bogus_mailbox.messages.append(create_message(subject="Invoice 1", from_="amazon@amazon.de", body="cables", seen=True, flagged=False)) + self.bogus_mailbox.messages.append(create_message(subject="Invoice 2", body="from my favorite electronic store", seen=False, flagged=True)) + self.bogus_mailbox.messages.append(create_message(subject="Claim your $10M price now!", from_="amazon@amazon-some-indian-site.org", seen=False)) + def test_get_correspondent(self): message = namedtuple('MailMessage', []) message.from_ = "someone@somewhere.com" @@ -56,8 +179,7 @@ class TestMail(TestCase): rule = MailRule(assign_title_from=MailRule.TITLE_FROM_SUBJECT) self.assertEqual(get_title(message, att, rule), "the message title") - @mock.patch("paperless_mail.mail.async_task") - def test_handle_message(self, m): + def test_handle_message(self): message = namedtuple('MailMessage', []) message.subject = "the message title" @@ -84,10 +206,10 @@ class TestMail(TestCase): self.assertEqual(result, 2) - self.assertEqual(len(m.call_args_list), 2) + self.assertEqual(len(self.async_task.call_args_list), 2) - args1, kwargs1 = m.call_args_list[0] - args2, kwargs2 = m.call_args_list[1] + args1, kwargs1 = self.async_task.call_args_list[0] + args2, kwargs2 = self.async_task.call_args_list[1] self.assertEqual(kwargs1['override_title'], "test1") self.assertEqual(kwargs1['override_filename'], "test1.pdf") @@ -106,3 +228,125 @@ class TestMail(TestCase): self.assertFalse(m.called) self.assertEqual(result, 0) + + def test_handle_mail_account_mark_read(self): + + account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret") + + rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_MARK_READ) + + self.assertEqual(self.async_task.call_count, 0) + self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2) + handle_mail_account(account) + self.assertEqual(self.async_task.call_count, 2) + self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0) + + def test_handle_mail_account_delete(self): + + account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret") + + rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_DELETE, filter_subject="Invoice") + + self.assertEqual(self.async_task.call_count, 0) + self.assertEqual(len(self.bogus_mailbox.messages), 3) + handle_mail_account(account) + self.assertEqual(self.async_task.call_count, 2) + self.assertEqual(len(self.bogus_mailbox.messages), 1) + + def test_handle_mail_account_flag(self): + account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret") + + rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_FLAG, filter_subject="Invoice") + + self.assertEqual(self.async_task.call_count, 0) + self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2) + handle_mail_account(account) + self.assertEqual(self.async_task.call_count, 1) + self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 1) + + def test_handle_mail_account_move(self): + account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret") + + rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_MOVE, action_parameter="spam", filter_subject="Claim") + + self.assertEqual(self.async_task.call_count, 0) + self.assertEqual(len(self.bogus_mailbox.messages), 3) + self.assertEqual(len(self.bogus_mailbox.messages_spam), 0) + handle_mail_account(account) + self.assertEqual(self.async_task.call_count, 1) + self.assertEqual(len(self.bogus_mailbox.messages), 2) + self.assertEqual(len(self.bogus_mailbox.messages_spam), 1) + + def test_errors(self): + account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="wrong") + + try: + handle_mail_account(account) + except MailError as e: + self.assertTrue(str(e).startswith("Error while authenticating account")) + else: + self.fail("Should raise exception") + + account = MailAccount.objects.create(name="test2", imap_server="", username="admin", password="secret") + rule = MailRule.objects.create(name="testrule", account=account, folder="uuuh") + + try: + handle_mail_account(account) + except MailError as e: + self.assertTrue("uuuh does not exist" in str(e)) + else: + self.fail("Should raise exception") + + account = MailAccount.objects.create(name="test3", imap_server="", username="admin", password="secret") + + rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_MOVE, action_parameter="doesnotexist", filter_subject="Claim") + + try: + handle_mail_account(account) + except MailError as e: + self.assertTrue("Error while processing post-consume actions" in str(e)) + else: + self.fail("Should raise exception") + + def test_filters(self): + + account = MailAccount.objects.create(name="test3", imap_server="", username="admin", password="secret") + rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_DELETE, filter_subject="Claim") + + self.assertEqual(self.async_task.call_count, 0) + + self.assertEqual(len(self.bogus_mailbox.messages), 3) + handle_mail_account(account) + self.assertEqual(len(self.bogus_mailbox.messages), 2) + self.assertEqual(self.async_task.call_count, 1) + + self.reset_bogus_mailbox() + + rule.filter_subject = None + rule.filter_body = "electronic" + rule.save() + self.assertEqual(len(self.bogus_mailbox.messages), 3) + handle_mail_account(account) + self.assertEqual(len(self.bogus_mailbox.messages), 2) + self.assertEqual(self.async_task.call_count, 2) + + self.reset_bogus_mailbox() + + rule.filter_from = "amazon" + rule.filter_body = None + rule.save() + self.assertEqual(len(self.bogus_mailbox.messages), 3) + handle_mail_account(account) + self.assertEqual(len(self.bogus_mailbox.messages), 1) + self.assertEqual(self.async_task.call_count, 4) + + self.reset_bogus_mailbox() + + rule.filter_from = "amazon" + rule.filter_body = "cables" + rule.filter_subject = "Invoice" + rule.save() + self.assertEqual(len(self.bogus_mailbox.messages), 3) + handle_mail_account(account) + self.assertEqual(len(self.bogus_mailbox.messages), 2) + self.assertEqual(self.async_task.call_count, 5)