the new mail features is now fully tested.

This commit is contained in:
Jonas Winkler 2020-11-17 22:22:30 +01:00
parent c5b2bd6b48
commit de1643f0ea
2 changed files with 251 additions and 10 deletions

View File

@ -1,3 +0,0 @@
from django.test import TestCase
# Create your tests here.

View File

@ -1,15 +1,138 @@
import uuid
from collections import namedtuple from collections import namedtuple
from typing import ContextManager
from unittest import mock from unittest import mock
from django.test import TestCase from django.test import TestCase
from imap_tools import MailMessageFlags, MailboxFolderSelectError
from documents.models import Correspondent from documents.models import Correspondent
from paperless_mail.mail import get_correspondent, get_title, handle_message from paperless_mail.mail import get_correspondent, get_title, handle_message, handle_mail_account, MailError
from paperless_mail.models import MailRule from paperless_mail.models import MailRule, MailAccount
class BogusFolderManager:
current_folder = "INBOX"
def set(self, new_folder):
if new_folder not in ["INBOX", "spam"]:
raise MailboxFolderSelectError(None, "uhm")
self.current_folder = new_folder
class BogusMailBox(ContextManager):
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def __init__(self):
self.messages = []
self.messages_spam = []
def login(self, username, password):
if not (username == 'admin' and password == 'secret'):
raise Exception()
folder = BogusFolderManager()
def fetch(self, criteria, mark_seen):
msg = self.messages
criteria = str(criteria).strip('()').split(" ")
if 'UNSEEN' in criteria:
msg = filter(lambda m: not m.seen, msg)
if 'SUBJECT' in criteria:
subject = criteria[criteria.index('SUBJECT') + 1].strip('"')
msg = filter(lambda m: subject in m.subject, msg)
if 'BODY' in criteria:
body = criteria[criteria.index('BODY') + 1].strip('"')
msg = filter(lambda m: body in m.body, msg)
if 'FROM' in criteria:
from_ = criteria[criteria.index('FROM') + 1].strip('"')
msg = filter(lambda m: from_ in m.from_, msg)
if 'UNFLAGGED' in criteria:
msg = filter(lambda m: not m.flagged, msg)
return list(msg)
def seen(self, uid_list, seen_val):
for message in self.messages:
if message.uid in uid_list:
message.seen = seen_val
def delete(self, uid_list):
self.messages = list(filter(lambda m: m.uid not in uid_list, self.messages))
def flag(self, uid_list, flag_set, value):
for message in self.messages:
if message.uid in uid_list:
for flag in flag_set:
if flag == MailMessageFlags.FLAGGED:
message.flagged = value
def move(self, uid_list, folder):
if folder == "spam":
self.messages_spam.append(
filter(lambda m: m.uid in uid_list, self.messages)
)
self.messages = list(
filter(lambda m: m.uid not in uid_list, self.messages)
)
else:
raise Exception()
def create_message(num_attachments=1, body="", subject="the suject", from_="noone@mail.com", seen=False, flagged=False):
message = namedtuple('MailMessage', [])
message.uid = uuid.uuid4()
message.subject = subject
message.attachments = []
message.from_ = from_
message.body = body
for i in range(num_attachments):
attachment = namedtuple('Attachment', [])
attachment.filename = 'some_file.pdf'
attachment.content_type = 'application/pdf'
attachment.payload = b'content of the attachment'
message.attachments.append(attachment)
message.seen = seen
message.flagged = flagged
return message
class TestMail(TestCase): class TestMail(TestCase):
def setUp(self):
patcher = mock.patch('paperless_mail.mail.MailBox')
m = patcher.start()
self.bogus_mailbox = BogusMailBox()
m.return_value = self.bogus_mailbox
self.addCleanup(patcher.stop)
patcher = mock.patch('paperless_mail.mail.async_task')
self.async_task = patcher.start()
self.addCleanup(patcher.stop)
self.reset_bogus_mailbox()
def reset_bogus_mailbox(self):
self.bogus_mailbox.messages = []
self.bogus_mailbox.messages_spam = []
self.bogus_mailbox.messages.append(create_message(subject="Invoice 1", from_="amazon@amazon.de", body="cables", seen=True, flagged=False))
self.bogus_mailbox.messages.append(create_message(subject="Invoice 2", body="from my favorite electronic store", seen=False, flagged=True))
self.bogus_mailbox.messages.append(create_message(subject="Claim your $10M price now!", from_="amazon@amazon-some-indian-site.org", seen=False))
def test_get_correspondent(self): def test_get_correspondent(self):
message = namedtuple('MailMessage', []) message = namedtuple('MailMessage', [])
message.from_ = "someone@somewhere.com" message.from_ = "someone@somewhere.com"
@ -56,8 +179,7 @@ class TestMail(TestCase):
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_SUBJECT) rule = MailRule(assign_title_from=MailRule.TITLE_FROM_SUBJECT)
self.assertEqual(get_title(message, att, rule), "the message title") self.assertEqual(get_title(message, att, rule), "the message title")
@mock.patch("paperless_mail.mail.async_task") def test_handle_message(self):
def test_handle_message(self, m):
message = namedtuple('MailMessage', []) message = namedtuple('MailMessage', [])
message.subject = "the message title" message.subject = "the message title"
@ -84,10 +206,10 @@ class TestMail(TestCase):
self.assertEqual(result, 2) self.assertEqual(result, 2)
self.assertEqual(len(m.call_args_list), 2) self.assertEqual(len(self.async_task.call_args_list), 2)
args1, kwargs1 = m.call_args_list[0] args1, kwargs1 = self.async_task.call_args_list[0]
args2, kwargs2 = m.call_args_list[1] args2, kwargs2 = self.async_task.call_args_list[1]
self.assertEqual(kwargs1['override_title'], "test1") self.assertEqual(kwargs1['override_title'], "test1")
self.assertEqual(kwargs1['override_filename'], "test1.pdf") self.assertEqual(kwargs1['override_filename'], "test1.pdf")
@ -106,3 +228,125 @@ class TestMail(TestCase):
self.assertFalse(m.called) self.assertFalse(m.called)
self.assertEqual(result, 0) self.assertEqual(result, 0)
def test_handle_mail_account_mark_read(self):
account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret")
rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_MARK_READ)
self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
handle_mail_account(account)
self.assertEqual(self.async_task.call_count, 2)
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
def test_handle_mail_account_delete(self):
account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret")
rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_DELETE, filter_subject="Invoice")
self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
handle_mail_account(account)
self.assertEqual(self.async_task.call_count, 2)
self.assertEqual(len(self.bogus_mailbox.messages), 1)
def test_handle_mail_account_flag(self):
account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret")
rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_FLAG, filter_subject="Invoice")
self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2)
handle_mail_account(account)
self.assertEqual(self.async_task.call_count, 1)
self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 1)
def test_handle_mail_account_move(self):
account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret")
rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_MOVE, action_parameter="spam", filter_subject="Claim")
self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)
handle_mail_account(account)
self.assertEqual(self.async_task.call_count, 1)
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
def test_errors(self):
account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="wrong")
try:
handle_mail_account(account)
except MailError as e:
self.assertTrue(str(e).startswith("Error while authenticating account"))
else:
self.fail("Should raise exception")
account = MailAccount.objects.create(name="test2", imap_server="", username="admin", password="secret")
rule = MailRule.objects.create(name="testrule", account=account, folder="uuuh")
try:
handle_mail_account(account)
except MailError as e:
self.assertTrue("uuuh does not exist" in str(e))
else:
self.fail("Should raise exception")
account = MailAccount.objects.create(name="test3", imap_server="", username="admin", password="secret")
rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_MOVE, action_parameter="doesnotexist", filter_subject="Claim")
try:
handle_mail_account(account)
except MailError as e:
self.assertTrue("Error while processing post-consume actions" in str(e))
else:
self.fail("Should raise exception")
def test_filters(self):
account = MailAccount.objects.create(name="test3", imap_server="", username="admin", password="secret")
rule = MailRule.objects.create(name="testrule", account=account, action=MailRule.ACTION_DELETE, filter_subject="Claim")
self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
handle_mail_account(account)
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(self.async_task.call_count, 1)
self.reset_bogus_mailbox()
rule.filter_subject = None
rule.filter_body = "electronic"
rule.save()
self.assertEqual(len(self.bogus_mailbox.messages), 3)
handle_mail_account(account)
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(self.async_task.call_count, 2)
self.reset_bogus_mailbox()
rule.filter_from = "amazon"
rule.filter_body = None
rule.save()
self.assertEqual(len(self.bogus_mailbox.messages), 3)
handle_mail_account(account)
self.assertEqual(len(self.bogus_mailbox.messages), 1)
self.assertEqual(self.async_task.call_count, 4)
self.reset_bogus_mailbox()
rule.filter_from = "amazon"
rule.filter_body = "cables"
rule.filter_subject = "Invoice"
rule.save()
self.assertEqual(len(self.bogus_mailbox.messages), 3)
handle_mail_account(account)
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(self.async_task.call_count, 5)