Merge pull request #393 from stumpylog/fix-imap-tools-bug

Fix imap tools bug
This commit is contained in:
shamoon 2022-03-20 13:37:17 -07:00 committed by GitHub
commit f74e15840d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 138 additions and 86 deletions

View File

@ -16,6 +16,7 @@ from imap_tools import AND
from imap_tools import MailBox from imap_tools import MailBox
from imap_tools import MailboxFolderSelectError from imap_tools import MailboxFolderSelectError
from imap_tools import MailBoxUnencrypted from imap_tools import MailBoxUnencrypted
from imap_tools import MailMessage
from imap_tools import MailMessageFlags from imap_tools import MailMessageFlags
from paperless_mail.models import MailAccount from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule from paperless_mail.models import MailRule
@ -122,7 +123,7 @@ class MailAccountHandler(LoggingMixin):
"Unknown title selector.", "Unknown title selector.",
) # pragma: nocover ) # pragma: nocover
def get_correspondent(self, message, rule): def get_correspondent(self, message: MailMessage, rule):
c_from = rule.assign_correspondent_from c_from = rule.assign_correspondent_from
if c_from == MailRule.CORRESPONDENT_FROM_NOTHING: if c_from == MailRule.CORRESPONDENT_FROM_NOTHING:
@ -132,12 +133,9 @@ class MailAccountHandler(LoggingMixin):
return self._correspondent_from_name(message.from_) return self._correspondent_from_name(message.from_)
elif c_from == MailRule.CORRESPONDENT_FROM_NAME: elif c_from == MailRule.CORRESPONDENT_FROM_NAME:
if ( from_values = message.from_values
message.from_values if from_values is not None and len(from_values.name) > 0:
and "name" in message.from_values return self._correspondent_from_name(from_values.name)
and message.from_values["name"]
):
return self._correspondent_from_name(message.from_values["name"])
else: else:
return self._correspondent_from_name(message.from_) return self._correspondent_from_name(message.from_)

View File

@ -1,7 +1,12 @@
import dataclasses
import email.contentmanager
import os import os
import random
import uuid import uuid
from collections import namedtuple from collections import namedtuple
from typing import ContextManager from typing import ContextManager
from typing import List
from typing import Union
from unittest import mock from unittest import mock
from django.core.management import call_command from django.core.management import call_command
@ -9,7 +14,9 @@ from django.db import DatabaseError
from django.test import TestCase from django.test import TestCase
from documents.models import Correspondent from documents.models import Correspondent
from documents.tests.utils import DirectoriesMixin from documents.tests.utils import DirectoriesMixin
from imap_tools import EmailAddress
from imap_tools import MailboxFolderSelectError from imap_tools import MailboxFolderSelectError
from imap_tools import MailMessage
from imap_tools import MailMessageFlags from imap_tools import MailMessageFlags
from paperless_mail import tasks from paperless_mail import tasks
from paperless_mail.mail import MailAccountHandler from paperless_mail.mail import MailAccountHandler
@ -18,8 +25,16 @@ from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule from paperless_mail.models import MailRule
class BogusFolderManager: @dataclasses.dataclass
class _AttachmentDef(object):
filename: str = "a_file.pdf"
maintype: str = "application/pdf"
subtype: str = "pdf"
disposition: str = "attachment"
content: bytes = b"a PDF document"
class BogusFolderManager:
current_folder = "INBOX" current_folder = "INBOX"
def set(self, new_folder): def set(self, new_folder):
@ -36,8 +51,8 @@ class BogusMailBox(ContextManager):
pass pass
def __init__(self): def __init__(self):
self.messages = [] self.messages: List[MailMessage] = []
self.messages_spam = [] self.messages_spam: List[MailMessage] = []
def login(self, username, password): def login(self, username, password):
if not (username == "admin" and password == "secret"): if not (username == "admin" and password == "secret"):
@ -59,7 +74,7 @@ class BogusMailBox(ContextManager):
if "BODY" in criteria: if "BODY" in criteria:
body = criteria[criteria.index("BODY") + 1].strip('"') body = criteria[criteria.index("BODY") + 1].strip('"')
msg = filter(lambda m: body in m.body, msg) msg = filter(lambda m: body in m.text, msg)
if "FROM" in criteria: if "FROM" in criteria:
from_ = criteria[criteria.index("FROM") + 1].strip('"') from_ = criteria[criteria.index("FROM") + 1].strip('"')
@ -84,7 +99,7 @@ class BogusMailBox(ContextManager):
def move(self, uid_list, folder): def move(self, uid_list, folder):
if folder == "spam": if folder == "spam":
self.messages_spam.append( self.messages_spam += list(
filter(lambda m: m.uid in uid_list, self.messages), filter(lambda m: m.uid in uid_list, self.messages),
) )
self.messages = list(filter(lambda m: m.uid not in uid_list, self.messages)) self.messages = list(filter(lambda m: m.uid not in uid_list, self.messages))
@ -92,44 +107,66 @@ class BogusMailBox(ContextManager):
raise Exception() raise Exception()
_used_uids = set()
def create_message( def create_message(
num_attachments=1, attachments: Union[int, List[_AttachmentDef]] = 1,
body="", body: str = "",
subject="the suject", subject: str = "the suject",
from_="noone@mail.com", from_: str = "noone@mail.com",
seen=False, seen: bool = False,
flagged=False, flagged: bool = False,
): ) -> MailMessage:
message = namedtuple("MailMessage", []) email_msg = email.message.EmailMessage()
# TODO: This does NOT set the UID
email_msg["Message-ID"] = str(uuid.uuid4())
email_msg["Subject"] = subject
email_msg["From"] = from_
email_msg.set_content(body)
message.uid = uuid.uuid4() # Either add some default number of attachments
message.subject = subject # or the provided attachments
message.attachments = [] if isinstance(attachments, int):
message.from_ = from_ for i in range(attachments):
message.body = body attachment = _AttachmentDef(filename=f"file_{i}.pdf")
for i in range(num_attachments): email_msg.add_attachment(
message.attachments.append(create_attachment(filename=f"file_{i}.pdf")) attachment.content,
maintype=attachment.maintype,
subtype=attachment.subtype,
disposition=attachment.disposition,
filename=attachment.filename,
)
else:
for attachment in attachments:
email_msg.add_attachment(
attachment.content,
maintype=attachment.maintype,
subtype=attachment.subtype,
disposition=attachment.disposition,
filename=attachment.filename,
)
message.seen = seen # Convert the EmailMessage to an imap_tools MailMessage
message.flagged = flagged imap_msg = MailMessage.from_bytes(email_msg.as_bytes())
return message # TODO: Unsure how to add a uid to the actual EmailMessage. This hacks it in,
# based on how imap_tools uses regex to extract it.
# This should be a large enough pool
uid = random.randint(1, 10000)
while uid in _used_uids:
uid = random.randint(1, 10000)
_used_uids.add(uid)
imap_msg._raw_uid_data = f"UID {uid}".encode()
def create_attachment( imap_msg.seen = seen
filename="the_file.pdf", imap_msg.flagged = flagged
content_disposition="attachment",
payload=b"a PDF document", return imap_msg
):
attachment = namedtuple("Attachment", [])
attachment.filename = filename
attachment.content_disposition = content_disposition
attachment.payload = payload
return attachment
def fake_magic_from_buffer(buffer, mime=False): def fake_magic_from_buffer(buffer, mime=False):
if mime: if mime:
if "PDF" in str(buffer): if "PDF" in str(buffer):
return "application/pdf" return "application/pdf"
@ -188,11 +225,19 @@ class TestMail(DirectoriesMixin, TestCase):
def test_get_correspondent(self): def test_get_correspondent(self):
message = namedtuple("MailMessage", []) message = namedtuple("MailMessage", [])
message.from_ = "someone@somewhere.com" message.from_ = "someone@somewhere.com"
message.from_values = {"name": "Someone!", "email": "someone@somewhere.com"} message.from_values = EmailAddress(
"Someone!",
"someone@somewhere.com",
"Someone! <someone@somewhere.com>",
)
message2 = namedtuple("MailMessage", []) message2 = namedtuple("MailMessage", [])
message2.from_ = "me@localhost.com" message2.from_ = "me@localhost.com"
message2.from_values = {"name": "", "email": "fake@localhost.com"} message2.from_values = EmailAddress(
"",
"fake@localhost.com",
"",
)
me_localhost = Correspondent.objects.create(name=message2.from_) me_localhost = Correspondent.objects.create(name=message2.from_)
someone_else = Correspondent.objects.create(name="someone else") someone_else = Correspondent.objects.create(name="someone else")
@ -253,7 +298,7 @@ class TestMail(DirectoriesMixin, TestCase):
message = create_message( message = create_message(
subject="the message title", subject="the message title",
from_="Myself", from_="Myself",
num_attachments=2, attachments=2,
) )
account = MailAccount() account = MailAccount()
@ -290,14 +335,15 @@ class TestMail(DirectoriesMixin, TestCase):
self.assertEqual(result, 0) self.assertEqual(result, 0)
def test_handle_unknown_mime_type(self): def test_handle_unknown_mime_type(self):
message = create_message() message = create_message(
message.attachments = [ attachments=[
create_attachment(filename="f1.pdf"), _AttachmentDef(filename="f1.pdf"),
create_attachment( _AttachmentDef(
filename="f2.json", filename="f2.json",
payload=b"{'much': 'payload.', 'so': 'json', 'wow': true}", content=b"{'much': 'payload.', 'so': 'json', 'wow': true}",
), ),
] ],
)
account = MailAccount() account = MailAccount()
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account) rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
@ -312,11 +358,15 @@ class TestMail(DirectoriesMixin, TestCase):
self.assertEqual(kwargs["override_filename"], "f1.pdf") self.assertEqual(kwargs["override_filename"], "f1.pdf")
def test_handle_disposition(self): def test_handle_disposition(self):
message = create_message() message = create_message(
message.attachments = [ attachments=[
create_attachment(filename="f1.pdf", content_disposition="inline"), _AttachmentDef(
create_attachment(filename="f2.pdf", content_disposition="attachment"), filename="f1.pdf",
] disposition="inline",
),
_AttachmentDef(filename="f2.pdf"),
],
)
account = MailAccount() account = MailAccount()
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account) rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
@ -330,11 +380,15 @@ class TestMail(DirectoriesMixin, TestCase):
self.assertEqual(kwargs["override_filename"], "f2.pdf") self.assertEqual(kwargs["override_filename"], "f2.pdf")
def test_handle_inline_files(self): def test_handle_inline_files(self):
message = create_message() message = create_message(
message.attachments = [ attachments=[
create_attachment(filename="f1.pdf", content_disposition="inline"), _AttachmentDef(
create_attachment(filename="f2.pdf", content_disposition="attachment"), filename="f1.pdf",
] disposition="inline",
),
_AttachmentDef(filename="f2.pdf"),
],
)
account = MailAccount() account = MailAccount()
rule = MailRule( rule = MailRule(
@ -349,13 +403,14 @@ class TestMail(DirectoriesMixin, TestCase):
self.assertEqual(self.async_task.call_count, 2) self.assertEqual(self.async_task.call_count, 2)
def test_filename_filter(self): def test_filename_filter(self):
message = create_message() message = create_message(
message.attachments = [ attachments=[
create_attachment(filename="f1.pdf"), _AttachmentDef(filename="f1.pdf"),
create_attachment(filename="f2.pdf"), _AttachmentDef(filename="f2.pdf"),
create_attachment(filename="f3.pdf"), _AttachmentDef(filename="f3.pdf"),
create_attachment(filename="f2.png"), _AttachmentDef(filename="f2.png"),
] ],
)
tests = [ tests = [
("*.pdf", ["f1.pdf", "f2.pdf", "f3.pdf"]), ("*.pdf", ["f1.pdf", "f2.pdf", "f3.pdf"]),
@ -391,7 +446,7 @@ class TestMail(DirectoriesMixin, TestCase):
password="secret", password="secret",
) )
rule = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule", name="testrule",
account=account, account=account,
action=MailRule.ACTION_MARK_READ, action=MailRule.ACTION_MARK_READ,
@ -414,7 +469,7 @@ class TestMail(DirectoriesMixin, TestCase):
password="secret", password="secret",
) )
rule = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule", name="testrule",
account=account, account=account,
action=MailRule.ACTION_DELETE, action=MailRule.ACTION_DELETE,
@ -435,7 +490,7 @@ class TestMail(DirectoriesMixin, TestCase):
password="secret", password="secret",
) )
rule = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule", name="testrule",
account=account, account=account,
action=MailRule.ACTION_FLAG, action=MailRule.ACTION_FLAG,
@ -458,7 +513,7 @@ class TestMail(DirectoriesMixin, TestCase):
password="secret", password="secret",
) )
rule = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule", name="testrule",
account=account, account=account,
action=MailRule.ACTION_MOVE, action=MailRule.ACTION_MOVE,
@ -469,7 +524,9 @@ class TestMail(DirectoriesMixin, TestCase):
self.assertEqual(self.async_task.call_count, 0) self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3) self.assertEqual(len(self.bogus_mailbox.messages), 3)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 0) self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)
self.mail_account_handler.handle_mail_account(account) self.mail_account_handler.handle_mail_account(account)
self.assertEqual(self.async_task.call_count, 1) self.assertEqual(self.async_task.call_count, 1)
self.assertEqual(len(self.bogus_mailbox.messages), 2) self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1) self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
@ -482,15 +539,14 @@ class TestMail(DirectoriesMixin, TestCase):
password="wrong", password="wrong",
) )
try: with self.assertRaises(MailError) as context:
self.mail_account_handler.handle_mail_account(account) self.mail_account_handler.handle_mail_account(account)
except MailError as e: self.assertTrue(
self.assertTrue(str(e).startswith("Error while authenticating account")) str(context).startswith("Error while authenticating account"),
else: )
self.fail("Should raise exception")
def test_error_skip_account(self): def test_error_skip_account(self):
account_faulty = MailAccount.objects.create( _ = MailAccount.objects.create(
name="test", name="test",
imap_server="", imap_server="",
username="admin", username="admin",
@ -503,7 +559,7 @@ class TestMail(DirectoriesMixin, TestCase):
username="admin", username="admin",
password="secret", password="secret",
) )
rule = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule", name="testrule",
account=account, account=account,
action=MailRule.ACTION_MOVE, action=MailRule.ACTION_MOVE,
@ -524,7 +580,7 @@ class TestMail(DirectoriesMixin, TestCase):
username="admin", username="admin",
password="secret", password="secret",
) )
rule = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule", name="testrule",
account=account, account=account,
action=MailRule.ACTION_MOVE, action=MailRule.ACTION_MOVE,
@ -533,7 +589,7 @@ class TestMail(DirectoriesMixin, TestCase):
order=1, order=1,
folder="uuuhhhh", folder="uuuhhhh",
) )
rule2 = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule2", name="testrule2",
account=account, account=account,
action=MailRule.ACTION_MOVE, action=MailRule.ACTION_MOVE,
@ -563,7 +619,7 @@ class TestMail(DirectoriesMixin, TestCase):
username="admin", username="admin",
password="secret", password="secret",
) )
rule = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule", name="testrule",
account=account, account=account,
action=MailRule.ACTION_MOVE, action=MailRule.ACTION_MOVE,
@ -587,7 +643,7 @@ class TestMail(DirectoriesMixin, TestCase):
username="admin", username="admin",
password="secret", password="secret",
) )
rule = MailRule.objects.create( _ = MailRule.objects.create(
name="testrule", name="testrule",
filter_from="amazon@amazon.de", filter_from="amazon@amazon.de",
account=account, account=account,
@ -676,7 +732,6 @@ class TestManagementCommand(TestCase):
"paperless_mail.management.commands.mail_fetcher.tasks.process_mail_accounts", "paperless_mail.management.commands.mail_fetcher.tasks.process_mail_accounts",
) )
def test_mail_fetcher(self, m): def test_mail_fetcher(self, m):
call_command("mail_fetcher") call_command("mail_fetcher")
m.assert_called_once() m.assert_called_once()
@ -711,7 +766,6 @@ class TestTasks(TestCase):
@mock.patch("paperless_mail.tasks.MailAccountHandler.handle_mail_account") @mock.patch("paperless_mail.tasks.MailAccountHandler.handle_mail_account")
def test_single_accounts(self, m): def test_single_accounts(self, m):
MailAccount.objects.create( MailAccount.objects.create(
name="A", name="A",
imap_server="A", imap_server="A",