mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-15 10:13:15 -05:00
Make test work.
This commit is contained in:
parent
104a684514
commit
c90129957e
@ -1,5 +1,6 @@
|
|||||||
import dataclasses
|
import dataclasses
|
||||||
import email.contentmanager
|
import email.contentmanager
|
||||||
|
import imaplib
|
||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
import uuid
|
import uuid
|
||||||
@ -96,6 +97,11 @@ class BogusMailBox(ContextManager):
|
|||||||
if "UNFLAGGED" in criteria:
|
if "UNFLAGGED" in criteria:
|
||||||
msg = filter(lambda m: not m.flagged, msg)
|
msg = filter(lambda m: not m.flagged, msg)
|
||||||
|
|
||||||
|
if "UNKEYWORD" in criteria:
|
||||||
|
tag = criteria[criteria.index("UNKEYWORD") + 1].strip("'")
|
||||||
|
# For custom-flags, we need to access the `_raw_flags` field, as calls to `.flags` get cached, thus breaking the test.
|
||||||
|
msg = filter(lambda m: "processed" not in parse_raw_tags_omit_cache(m), msg)
|
||||||
|
|
||||||
return list(msg)
|
return list(msg)
|
||||||
|
|
||||||
def delete(self, uid_list):
|
def delete(self, uid_list):
|
||||||
@ -109,6 +115,8 @@ class BogusMailBox(ContextManager):
|
|||||||
message.flagged = value
|
message.flagged = value
|
||||||
if flag == MailMessageFlags.SEEN:
|
if flag == MailMessageFlags.SEEN:
|
||||||
message.seen = value
|
message.seen = value
|
||||||
|
if flag == "processed":
|
||||||
|
message._raw_flag_data.append(f"+FLAGS (processed)".encode())
|
||||||
|
|
||||||
def move(self, uid_list, folder):
|
def move(self, uid_list, folder):
|
||||||
if folder == "spam":
|
if folder == "spam":
|
||||||
@ -130,6 +138,7 @@ def create_message(
|
|||||||
from_: str = "noone@mail.com",
|
from_: str = "noone@mail.com",
|
||||||
seen: bool = False,
|
seen: bool = False,
|
||||||
flagged: bool = False,
|
flagged: bool = False,
|
||||||
|
processed: bool = False,
|
||||||
) -> MailMessage:
|
) -> MailMessage:
|
||||||
email_msg = email.message.EmailMessage()
|
email_msg = email.message.EmailMessage()
|
||||||
# TODO: This does NOT set the UID
|
# TODO: This does NOT set the UID
|
||||||
@ -175,6 +184,8 @@ def create_message(
|
|||||||
|
|
||||||
imap_msg.seen = seen
|
imap_msg.seen = seen
|
||||||
imap_msg.flagged = flagged
|
imap_msg.flagged = flagged
|
||||||
|
if processed:
|
||||||
|
imap_msg._raw_flag_data.append(f"+FLAGS (processed)".encode())
|
||||||
|
|
||||||
return imap_msg
|
return imap_msg
|
||||||
|
|
||||||
@ -189,6 +200,13 @@ def fake_magic_from_buffer(buffer, mime=False):
|
|||||||
return "Some verbose file description"
|
return "Some verbose file description"
|
||||||
|
|
||||||
|
|
||||||
|
def parse_raw_tags_omit_cache(m: MailMessage) -> List[str]:
|
||||||
|
raw_result: list[bytes] = []
|
||||||
|
for rf in m._raw_flag_data:
|
||||||
|
raw_result.extend(imaplib.ParseFlags(rf))
|
||||||
|
return [f.decode().strip() for f in raw_result]
|
||||||
|
|
||||||
|
|
||||||
@mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer)
|
@mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer)
|
||||||
class TestMail(DirectoriesMixin, TestCase):
|
class TestMail(DirectoriesMixin, TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -217,6 +235,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
|||||||
body="cables",
|
body="cables",
|
||||||
seen=True,
|
seen=True,
|
||||||
flagged=False,
|
flagged=False,
|
||||||
|
processed=False,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
self.bogus_mailbox.messages.append(
|
self.bogus_mailbox.messages.append(
|
||||||
@ -225,6 +244,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
|||||||
body="from my favorite electronic store",
|
body="from my favorite electronic store",
|
||||||
seen=False,
|
seen=False,
|
||||||
flagged=True,
|
flagged=True,
|
||||||
|
processed=True,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
self.bogus_mailbox.messages.append(
|
self.bogus_mailbox.messages.append(
|
||||||
@ -571,6 +591,29 @@ class TestMail(DirectoriesMixin, TestCase):
|
|||||||
self.assertEqual(len(self.bogus_mailbox.messages), 2)
|
self.assertEqual(len(self.bogus_mailbox.messages), 2)
|
||||||
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
|
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
|
||||||
|
|
||||||
|
def test_handle_mail_account_tag(self):
|
||||||
|
account = MailAccount.objects.create(
|
||||||
|
name="test",
|
||||||
|
imap_server="",
|
||||||
|
username="admin",
|
||||||
|
password="secret",
|
||||||
|
)
|
||||||
|
|
||||||
|
_ = MailRule.objects.create(
|
||||||
|
name="testrule",
|
||||||
|
account=account,
|
||||||
|
action=MailRule.MailAction.TAG,
|
||||||
|
action_parameter="processed",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(len(self.bogus_mailbox.messages), 3)
|
||||||
|
self.assertEqual(self.async_task.call_count, 0)
|
||||||
|
self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 2)
|
||||||
|
self.mail_account_handler.handle_mail_account(account)
|
||||||
|
self.assertEqual(self.async_task.call_count, 2)
|
||||||
|
self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
|
||||||
|
self.assertEqual(len(self.bogus_mailbox.messages), 3)
|
||||||
|
|
||||||
def test_error_login(self):
|
def test_error_login(self):
|
||||||
account = MailAccount.objects.create(
|
account = MailAccount.objects.create(
|
||||||
name="test",
|
name="test",
|
||||||
|
Loading…
x
Reference in New Issue
Block a user