diff --git a/src/paperless_mail/tests/test_mail.py b/src/paperless_mail/tests/test_mail.py index 6d78f5306..5d7e2a3b6 100644 --- a/src/paperless_mail/tests/test_mail.py +++ b/src/paperless_mail/tests/test_mail.py @@ -1,7 +1,12 @@ +import dataclasses +import email.contentmanager import os +import random import uuid from collections import namedtuple from typing import ContextManager +from typing import List +from typing import Union from unittest import mock from django.core.management import call_command @@ -11,6 +16,7 @@ from documents.models import Correspondent from documents.tests.utils import DirectoriesMixin from imap_tools import EmailAddress from imap_tools import MailboxFolderSelectError +from imap_tools import MailMessage from imap_tools import MailMessageFlags from paperless_mail import tasks from paperless_mail.mail import MailAccountHandler @@ -19,6 +25,15 @@ from paperless_mail.models import MailAccount from paperless_mail.models import MailRule +@dataclasses.dataclass +class _AttachmentDef(object): + filename: str = "a_file.pdf" + maintype: str = "application/pdf" + subtype: str = "pdf" + disposition: str = "attachment" + content: bytes = b"a PDF document" + + class BogusFolderManager: current_folder = "INBOX" @@ -36,8 +51,8 @@ class BogusMailBox(ContextManager): pass def __init__(self): - self.messages = [] - self.messages_spam = [] + self.messages: List[MailMessage] = [] + self.messages_spam: List[MailMessage] = [] def login(self, username, password): if not (username == "admin" and password == "secret"): @@ -59,7 +74,7 @@ class BogusMailBox(ContextManager): if "BODY" in criteria: body = criteria[criteria.index("BODY") + 1].strip('"') - msg = filter(lambda m: body in m.body, msg) + msg = filter(lambda m: body in m.text, msg) if "FROM" in criteria: from_ = criteria[criteria.index("FROM") + 1].strip('"') @@ -84,7 +99,7 @@ class BogusMailBox(ContextManager): def move(self, uid_list, folder): if folder == "spam": - self.messages_spam.append( + self.messages_spam += list( filter(lambda m: m.uid in uid_list, self.messages), ) self.messages = list(filter(lambda m: m.uid not in uid_list, self.messages)) @@ -92,40 +107,63 @@ class BogusMailBox(ContextManager): raise Exception() +_used_uids = set() + + def create_message( - num_attachments=1, - body="", - subject="the suject", - from_="noone@mail.com", - seen=False, - flagged=False, -): - message = namedtuple("MailMessage", []) + attachments: Union[int, List[_AttachmentDef]] = 1, + body: str = "", + subject: str = "the suject", + from_: str = "noone@mail.com", + seen: bool = False, + flagged: bool = False, +) -> MailMessage: + email_msg = email.message.EmailMessage() + # TODO: This does NOT set the UID + email_msg["Message-ID"] = str(uuid.uuid4()) + email_msg["Subject"] = subject + email_msg["From"] = from_ + email_msg.set_content(body) - message.uid = uuid.uuid4() - message.subject = subject - message.attachments = [] - message.from_ = from_ - message.body = body - for i in range(num_attachments): - message.attachments.append(create_attachment(filename=f"file_{i}.pdf")) + # Either add some default number of attachments + # or the provided attachments + if isinstance(attachments, int): + for i in range(attachments): + attachment = _AttachmentDef(filename=f"file_{i}.pdf") + email_msg.add_attachment( + attachment.content, + maintype=attachment.maintype, + subtype=attachment.subtype, + disposition=attachment.disposition, + filename=attachment.filename, + ) + else: + for attachment in attachments: + email_msg.add_attachment( + attachment.content, + maintype=attachment.maintype, + subtype=attachment.subtype, + disposition=attachment.disposition, + filename=attachment.filename, + ) - message.seen = seen - message.flagged = flagged + # Convert the EmailMessage to an imap_tools MailMessage + imap_msg = MailMessage.from_bytes(email_msg.as_bytes()) - return message + # TODO: Unsure how to add a uid to the actual EmailMessage. This hacks it in, + # based on how imap_tools uses regex to extract it. + # This should be a large enough pool + uid = random.randint(1, 10000) + while uid in _used_uids: + uid = random.randint(1, 10000) + _used_uids.add(uid) + imap_msg._raw_uid_data = f"UID {uid}".encode() -def create_attachment( - filename="the_file.pdf", - content_disposition="attachment", - payload=b"a PDF document", -): - attachment = namedtuple("Attachment", []) - attachment.filename = filename - attachment.content_disposition = content_disposition - attachment.payload = payload - return attachment + imap_msg.seen = seen + imap_msg.flagged = flagged + + return imap_msg def fake_magic_from_buffer(buffer, mime=False): @@ -260,7 +298,7 @@ class TestMail(DirectoriesMixin, TestCase): message = create_message( subject="the message title", from_="Myself", - num_attachments=2, + attachments=2, ) account = MailAccount() @@ -297,14 +335,15 @@ class TestMail(DirectoriesMixin, TestCase): self.assertEqual(result, 0) def test_handle_unknown_mime_type(self): - message = create_message() - message.attachments = [ - create_attachment(filename="f1.pdf"), - create_attachment( - filename="f2.json", - payload=b"{'much': 'payload.', 'so': 'json', 'wow': true}", - ), - ] + message = create_message( + attachments=[ + _AttachmentDef(filename="f1.pdf"), + _AttachmentDef( + filename="f2.json", + content=b"{'much': 'payload.', 'so': 'json', 'wow': true}", + ), + ], + ) account = MailAccount() rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account) @@ -319,11 +358,15 @@ class TestMail(DirectoriesMixin, TestCase): self.assertEqual(kwargs["override_filename"], "f1.pdf") def test_handle_disposition(self): - message = create_message() - message.attachments = [ - create_attachment(filename="f1.pdf", content_disposition="inline"), - create_attachment(filename="f2.pdf", content_disposition="attachment"), - ] + message = create_message( + attachments=[ + _AttachmentDef( + filename="f1.pdf", + disposition="inline", + ), + _AttachmentDef(filename="f2.pdf"), + ], + ) account = MailAccount() rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account) @@ -337,11 +380,15 @@ class TestMail(DirectoriesMixin, TestCase): self.assertEqual(kwargs["override_filename"], "f2.pdf") def test_handle_inline_files(self): - message = create_message() - message.attachments = [ - create_attachment(filename="f1.pdf", content_disposition="inline"), - create_attachment(filename="f2.pdf", content_disposition="attachment"), - ] + message = create_message( + attachments=[ + _AttachmentDef( + filename="f1.pdf", + disposition="inline", + ), + _AttachmentDef(filename="f2.pdf"), + ], + ) account = MailAccount() rule = MailRule( @@ -356,13 +403,14 @@ class TestMail(DirectoriesMixin, TestCase): self.assertEqual(self.async_task.call_count, 2) def test_filename_filter(self): - message = create_message() - message.attachments = [ - create_attachment(filename="f1.pdf"), - create_attachment(filename="f2.pdf"), - create_attachment(filename="f3.pdf"), - create_attachment(filename="f2.png"), - ] + message = create_message( + attachments=[ + _AttachmentDef(filename="f1.pdf"), + _AttachmentDef(filename="f2.pdf"), + _AttachmentDef(filename="f3.pdf"), + _AttachmentDef(filename="f2.png"), + ], + ) tests = [ ("*.pdf", ["f1.pdf", "f2.pdf", "f3.pdf"]), @@ -398,7 +446,7 @@ class TestMail(DirectoriesMixin, TestCase): password="secret", ) - rule = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule", account=account, action=MailRule.ACTION_MARK_READ, @@ -421,7 +469,7 @@ class TestMail(DirectoriesMixin, TestCase): password="secret", ) - rule = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule", account=account, action=MailRule.ACTION_DELETE, @@ -442,7 +490,7 @@ class TestMail(DirectoriesMixin, TestCase): password="secret", ) - rule = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule", account=account, action=MailRule.ACTION_FLAG, @@ -465,7 +513,7 @@ class TestMail(DirectoriesMixin, TestCase): password="secret", ) - rule = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule", account=account, action=MailRule.ACTION_MOVE, @@ -476,7 +524,9 @@ class TestMail(DirectoriesMixin, TestCase): self.assertEqual(self.async_task.call_count, 0) self.assertEqual(len(self.bogus_mailbox.messages), 3) self.assertEqual(len(self.bogus_mailbox.messages_spam), 0) + self.mail_account_handler.handle_mail_account(account) + self.assertEqual(self.async_task.call_count, 1) self.assertEqual(len(self.bogus_mailbox.messages), 2) self.assertEqual(len(self.bogus_mailbox.messages_spam), 1) @@ -496,7 +546,7 @@ class TestMail(DirectoriesMixin, TestCase): ) def test_error_skip_account(self): - account_faulty = MailAccount.objects.create( + _ = MailAccount.objects.create( name="test", imap_server="", username="admin", @@ -509,7 +559,7 @@ class TestMail(DirectoriesMixin, TestCase): username="admin", password="secret", ) - rule = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule", account=account, action=MailRule.ACTION_MOVE, @@ -530,7 +580,7 @@ class TestMail(DirectoriesMixin, TestCase): username="admin", password="secret", ) - rule = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule", account=account, action=MailRule.ACTION_MOVE, @@ -539,7 +589,7 @@ class TestMail(DirectoriesMixin, TestCase): order=1, folder="uuuhhhh", ) - rule2 = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule2", account=account, action=MailRule.ACTION_MOVE, @@ -569,7 +619,7 @@ class TestMail(DirectoriesMixin, TestCase): username="admin", password="secret", ) - rule = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule", account=account, action=MailRule.ACTION_MOVE, @@ -593,7 +643,7 @@ class TestMail(DirectoriesMixin, TestCase): username="admin", password="secret", ) - rule = MailRule.objects.create( + _ = MailRule.objects.create( name="testrule", filter_from="amazon@amazon.de", account=account,