mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
Improves the mail handling testing to use more of the imap_tools library types instead of internal types
This commit is contained in:
parent
4f287b5ecd
commit
9544e6c757
@ -1,7 +1,12 @@
|
||||
import dataclasses
|
||||
import email.contentmanager
|
||||
import os
|
||||
import random
|
||||
import uuid
|
||||
from collections import namedtuple
|
||||
from typing import ContextManager
|
||||
from typing import List
|
||||
from typing import Union
|
||||
from unittest import mock
|
||||
|
||||
from django.core.management import call_command
|
||||
@ -11,6 +16,7 @@ from documents.models import Correspondent
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from imap_tools import EmailAddress
|
||||
from imap_tools import MailboxFolderSelectError
|
||||
from imap_tools import MailMessage
|
||||
from imap_tools import MailMessageFlags
|
||||
from paperless_mail import tasks
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
@ -19,6 +25,15 @@ from paperless_mail.models import MailAccount
|
||||
from paperless_mail.models import MailRule
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class _AttachmentDef(object):
|
||||
filename: str = "a_file.pdf"
|
||||
maintype: str = "application/pdf"
|
||||
subtype: str = "pdf"
|
||||
disposition: str = "attachment"
|
||||
content: bytes = b"a PDF document"
|
||||
|
||||
|
||||
class BogusFolderManager:
|
||||
current_folder = "INBOX"
|
||||
|
||||
@ -36,8 +51,8 @@ class BogusMailBox(ContextManager):
|
||||
pass
|
||||
|
||||
def __init__(self):
|
||||
self.messages = []
|
||||
self.messages_spam = []
|
||||
self.messages: List[MailMessage] = []
|
||||
self.messages_spam: List[MailMessage] = []
|
||||
|
||||
def login(self, username, password):
|
||||
if not (username == "admin" and password == "secret"):
|
||||
@ -59,7 +74,7 @@ class BogusMailBox(ContextManager):
|
||||
|
||||
if "BODY" in criteria:
|
||||
body = criteria[criteria.index("BODY") + 1].strip('"')
|
||||
msg = filter(lambda m: body in m.body, msg)
|
||||
msg = filter(lambda m: body in m.text, msg)
|
||||
|
||||
if "FROM" in criteria:
|
||||
from_ = criteria[criteria.index("FROM") + 1].strip('"')
|
||||
@ -84,7 +99,7 @@ class BogusMailBox(ContextManager):
|
||||
|
||||
def move(self, uid_list, folder):
|
||||
if folder == "spam":
|
||||
self.messages_spam.append(
|
||||
self.messages_spam += list(
|
||||
filter(lambda m: m.uid in uid_list, self.messages),
|
||||
)
|
||||
self.messages = list(filter(lambda m: m.uid not in uid_list, self.messages))
|
||||
@ -92,40 +107,63 @@ class BogusMailBox(ContextManager):
|
||||
raise Exception()
|
||||
|
||||
|
||||
_used_uids = set()
|
||||
|
||||
|
||||
def create_message(
|
||||
num_attachments=1,
|
||||
body="",
|
||||
subject="the suject",
|
||||
from_="noone@mail.com",
|
||||
seen=False,
|
||||
flagged=False,
|
||||
):
|
||||
message = namedtuple("MailMessage", [])
|
||||
attachments: Union[int, List[_AttachmentDef]] = 1,
|
||||
body: str = "",
|
||||
subject: str = "the suject",
|
||||
from_: str = "noone@mail.com",
|
||||
seen: bool = False,
|
||||
flagged: bool = False,
|
||||
) -> MailMessage:
|
||||
email_msg = email.message.EmailMessage()
|
||||
# TODO: This does NOT set the UID
|
||||
email_msg["Message-ID"] = str(uuid.uuid4())
|
||||
email_msg["Subject"] = subject
|
||||
email_msg["From"] = from_
|
||||
email_msg.set_content(body)
|
||||
|
||||
message.uid = uuid.uuid4()
|
||||
message.subject = subject
|
||||
message.attachments = []
|
||||
message.from_ = from_
|
||||
message.body = body
|
||||
for i in range(num_attachments):
|
||||
message.attachments.append(create_attachment(filename=f"file_{i}.pdf"))
|
||||
# Either add some default number of attachments
|
||||
# or the provided attachments
|
||||
if isinstance(attachments, int):
|
||||
for i in range(attachments):
|
||||
attachment = _AttachmentDef(filename=f"file_{i}.pdf")
|
||||
email_msg.add_attachment(
|
||||
attachment.content,
|
||||
maintype=attachment.maintype,
|
||||
subtype=attachment.subtype,
|
||||
disposition=attachment.disposition,
|
||||
filename=attachment.filename,
|
||||
)
|
||||
else:
|
||||
for attachment in attachments:
|
||||
email_msg.add_attachment(
|
||||
attachment.content,
|
||||
maintype=attachment.maintype,
|
||||
subtype=attachment.subtype,
|
||||
disposition=attachment.disposition,
|
||||
filename=attachment.filename,
|
||||
)
|
||||
|
||||
message.seen = seen
|
||||
message.flagged = flagged
|
||||
# Convert the EmailMessage to an imap_tools MailMessage
|
||||
imap_msg = MailMessage.from_bytes(email_msg.as_bytes())
|
||||
|
||||
return message
|
||||
# TODO: Unsure how to add a uid to the actual EmailMessage. This hacks it in,
|
||||
# based on how imap_tools uses regex to extract it.
|
||||
# This should be a large enough pool
|
||||
uid = random.randint(1, 10000)
|
||||
while uid in _used_uids:
|
||||
uid = random.randint(1, 10000)
|
||||
_used_uids.add(uid)
|
||||
|
||||
imap_msg._raw_uid_data = f"UID {uid}".encode()
|
||||
|
||||
def create_attachment(
|
||||
filename="the_file.pdf",
|
||||
content_disposition="attachment",
|
||||
payload=b"a PDF document",
|
||||
):
|
||||
attachment = namedtuple("Attachment", [])
|
||||
attachment.filename = filename
|
||||
attachment.content_disposition = content_disposition
|
||||
attachment.payload = payload
|
||||
return attachment
|
||||
imap_msg.seen = seen
|
||||
imap_msg.flagged = flagged
|
||||
|
||||
return imap_msg
|
||||
|
||||
|
||||
def fake_magic_from_buffer(buffer, mime=False):
|
||||
@ -260,7 +298,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
message = create_message(
|
||||
subject="the message title",
|
||||
from_="Myself",
|
||||
num_attachments=2,
|
||||
attachments=2,
|
||||
)
|
||||
|
||||
account = MailAccount()
|
||||
@ -297,14 +335,15 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(result, 0)
|
||||
|
||||
def test_handle_unknown_mime_type(self):
|
||||
message = create_message()
|
||||
message.attachments = [
|
||||
create_attachment(filename="f1.pdf"),
|
||||
create_attachment(
|
||||
filename="f2.json",
|
||||
payload=b"{'much': 'payload.', 'so': 'json', 'wow': true}",
|
||||
),
|
||||
]
|
||||
message = create_message(
|
||||
attachments=[
|
||||
_AttachmentDef(filename="f1.pdf"),
|
||||
_AttachmentDef(
|
||||
filename="f2.json",
|
||||
content=b"{'much': 'payload.', 'so': 'json', 'wow': true}",
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
account = MailAccount()
|
||||
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
|
||||
@ -319,11 +358,15 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(kwargs["override_filename"], "f1.pdf")
|
||||
|
||||
def test_handle_disposition(self):
|
||||
message = create_message()
|
||||
message.attachments = [
|
||||
create_attachment(filename="f1.pdf", content_disposition="inline"),
|
||||
create_attachment(filename="f2.pdf", content_disposition="attachment"),
|
||||
]
|
||||
message = create_message(
|
||||
attachments=[
|
||||
_AttachmentDef(
|
||||
filename="f1.pdf",
|
||||
disposition="inline",
|
||||
),
|
||||
_AttachmentDef(filename="f2.pdf"),
|
||||
],
|
||||
)
|
||||
|
||||
account = MailAccount()
|
||||
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
|
||||
@ -337,11 +380,15 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(kwargs["override_filename"], "f2.pdf")
|
||||
|
||||
def test_handle_inline_files(self):
|
||||
message = create_message()
|
||||
message.attachments = [
|
||||
create_attachment(filename="f1.pdf", content_disposition="inline"),
|
||||
create_attachment(filename="f2.pdf", content_disposition="attachment"),
|
||||
]
|
||||
message = create_message(
|
||||
attachments=[
|
||||
_AttachmentDef(
|
||||
filename="f1.pdf",
|
||||
disposition="inline",
|
||||
),
|
||||
_AttachmentDef(filename="f2.pdf"),
|
||||
],
|
||||
)
|
||||
|
||||
account = MailAccount()
|
||||
rule = MailRule(
|
||||
@ -356,13 +403,14 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(self.async_task.call_count, 2)
|
||||
|
||||
def test_filename_filter(self):
|
||||
message = create_message()
|
||||
message.attachments = [
|
||||
create_attachment(filename="f1.pdf"),
|
||||
create_attachment(filename="f2.pdf"),
|
||||
create_attachment(filename="f3.pdf"),
|
||||
create_attachment(filename="f2.png"),
|
||||
]
|
||||
message = create_message(
|
||||
attachments=[
|
||||
_AttachmentDef(filename="f1.pdf"),
|
||||
_AttachmentDef(filename="f2.pdf"),
|
||||
_AttachmentDef(filename="f3.pdf"),
|
||||
_AttachmentDef(filename="f2.png"),
|
||||
],
|
||||
)
|
||||
|
||||
tests = [
|
||||
("*.pdf", ["f1.pdf", "f2.pdf", "f3.pdf"]),
|
||||
@ -398,7 +446,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
password="secret",
|
||||
)
|
||||
|
||||
rule = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=account,
|
||||
action=MailRule.ACTION_MARK_READ,
|
||||
@ -421,7 +469,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
password="secret",
|
||||
)
|
||||
|
||||
rule = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=account,
|
||||
action=MailRule.ACTION_DELETE,
|
||||
@ -442,7 +490,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
password="secret",
|
||||
)
|
||||
|
||||
rule = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=account,
|
||||
action=MailRule.ACTION_FLAG,
|
||||
@ -465,7 +513,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
password="secret",
|
||||
)
|
||||
|
||||
rule = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=account,
|
||||
action=MailRule.ACTION_MOVE,
|
||||
@ -476,7 +524,9 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(self.async_task.call_count, 0)
|
||||
self.assertEqual(len(self.bogus_mailbox.messages), 3)
|
||||
self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)
|
||||
|
||||
self.mail_account_handler.handle_mail_account(account)
|
||||
|
||||
self.assertEqual(self.async_task.call_count, 1)
|
||||
self.assertEqual(len(self.bogus_mailbox.messages), 2)
|
||||
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
|
||||
@ -496,7 +546,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
)
|
||||
|
||||
def test_error_skip_account(self):
|
||||
account_faulty = MailAccount.objects.create(
|
||||
_ = MailAccount.objects.create(
|
||||
name="test",
|
||||
imap_server="",
|
||||
username="admin",
|
||||
@ -509,7 +559,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
username="admin",
|
||||
password="secret",
|
||||
)
|
||||
rule = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=account,
|
||||
action=MailRule.ACTION_MOVE,
|
||||
@ -530,7 +580,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
username="admin",
|
||||
password="secret",
|
||||
)
|
||||
rule = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=account,
|
||||
action=MailRule.ACTION_MOVE,
|
||||
@ -539,7 +589,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
order=1,
|
||||
folder="uuuhhhh",
|
||||
)
|
||||
rule2 = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule2",
|
||||
account=account,
|
||||
action=MailRule.ACTION_MOVE,
|
||||
@ -569,7 +619,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
username="admin",
|
||||
password="secret",
|
||||
)
|
||||
rule = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=account,
|
||||
action=MailRule.ACTION_MOVE,
|
||||
@ -593,7 +643,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
username="admin",
|
||||
password="secret",
|
||||
)
|
||||
rule = MailRule.objects.create(
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
filter_from="amazon@amazon.de",
|
||||
account=account,
|
||||
|
Loading…
x
Reference in New Issue
Block a user