mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-08-16 00:36:22 +00:00
Merge remote-tracking branch 'paperless/dev' into feature-consume-eml
This commit is contained in:
70
src/paperless_mail/tests/test_live_mail.py
Normal file
70
src/paperless_mail/tests/test_live_mail.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from django.test import TestCase
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
from paperless_mail.mail import MailError
|
||||
from paperless_mail.models import MailAccount
|
||||
from paperless_mail.models import MailRule
|
||||
|
||||
# Only run if the environment is setup
|
||||
# And the environment is not empty (forks, I think)
|
||||
@pytest.mark.skipif(
|
||||
"PAPERLESS_MAIL_TEST_HOST" not in os.environ
|
||||
or not len(os.environ["PAPERLESS_MAIL_TEST_HOST"]),
|
||||
reason="Live server testing not enabled",
|
||||
)
|
||||
class TestMailLiveServer(TestCase):
|
||||
def setUp(self) -> None:
|
||||
|
||||
self.mail_account_handler = MailAccountHandler()
|
||||
self.account = MailAccount.objects.create(
|
||||
name="test",
|
||||
imap_server=os.environ["PAPERLESS_MAIL_TEST_HOST"],
|
||||
username=os.environ["PAPERLESS_MAIL_TEST_USER"],
|
||||
password=os.environ["PAPERLESS_MAIL_TEST_PASSWD"],
|
||||
imap_port=993,
|
||||
)
|
||||
|
||||
return super().setUp()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.account.delete()
|
||||
return super().tearDown()
|
||||
|
||||
def test_process_non_gmail_server_flag(self):
|
||||
|
||||
try:
|
||||
rule1 = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=self.account,
|
||||
action=MailRule.MailAction.FLAG,
|
||||
)
|
||||
|
||||
self.mail_account_handler.handle_mail_account(self.account)
|
||||
|
||||
rule1.delete()
|
||||
|
||||
except MailError as e:
|
||||
self.fail(f"Failure: {e}")
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
def test_process_non_gmail_server_tag(self):
|
||||
|
||||
try:
|
||||
|
||||
rule2 = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=self.account,
|
||||
action=MailRule.MailAction.TAG,
|
||||
)
|
||||
|
||||
self.mail_account_handler.handle_mail_account(self.account)
|
||||
|
||||
rule2.delete()
|
||||
|
||||
except MailError as e:
|
||||
self.fail(f"Failure: {e}")
|
||||
except Exception as e:
|
||||
pass
|
@@ -20,6 +20,7 @@ from imap_tools import MailboxFolderSelectError
|
||||
from imap_tools import MailboxLoginError
|
||||
from imap_tools import MailMessage
|
||||
from imap_tools import MailMessageFlags
|
||||
from imap_tools import NOT
|
||||
from paperless_mail import tasks
|
||||
from paperless_mail.mail import MailAccountHandler
|
||||
from paperless_mail.mail import MailError
|
||||
@@ -46,31 +47,66 @@ class BogusFolderManager:
|
||||
|
||||
|
||||
class BogusClient:
|
||||
def authenticate(self, mechanism, authobject):
|
||||
# authobject must be a callable object
|
||||
auth_bytes = authobject(None)
|
||||
if auth_bytes != b"\x00admin\x00w57\xc3\xa4\xc3\xb6\xc3\xbcw4b6huwb6nhu":
|
||||
raise MailboxLoginError("BAD", "OK")
|
||||
def __init__(self, messages):
|
||||
self.messages: List[MailMessage] = messages
|
||||
self.capabilities: List[str] = []
|
||||
|
||||
|
||||
class BogusMailBox(ContextManager):
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
pass
|
||||
|
||||
def authenticate(self, mechanism, authobject):
|
||||
# authobject must be a callable object
|
||||
auth_bytes = authobject(None)
|
||||
if auth_bytes != b"\x00admin\x00w57\xc3\xa4\xc3\xb6\xc3\xbcw4b6huwb6nhu":
|
||||
raise MailboxLoginError("BAD", "OK")
|
||||
|
||||
def uid(self, command, *args):
|
||||
if command == "STORE":
|
||||
for message in self.messages:
|
||||
if message.uid == args[0]:
|
||||
flag = args[2]
|
||||
if flag == "processed":
|
||||
message._raw_flag_data.append(f"+FLAGS (processed)".encode())
|
||||
MailMessage.flags.fget.cache_clear()
|
||||
|
||||
|
||||
class BogusMailBox(ContextManager):
|
||||
|
||||
# Common values so tests don't need to remember an accepted login
|
||||
USERNAME: str = "admin"
|
||||
ASCII_PASSWORD: str = "secret"
|
||||
# Note the non-ascii characters here
|
||||
UTF_PASSWORD: str = "w57äöüw4b6huwb6nhu"
|
||||
|
||||
def __init__(self):
|
||||
self.messages: List[MailMessage] = []
|
||||
self.messages_spam: List[MailMessage] = []
|
||||
self.folder = BogusFolderManager()
|
||||
self.client = BogusClient()
|
||||
self.client = BogusClient(self.messages)
|
||||
self._host = ""
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
pass
|
||||
|
||||
def updateClient(self):
|
||||
self.client = BogusClient(self.messages)
|
||||
|
||||
def login(self, username, password):
|
||||
# This will raise a UnicodeEncodeError if the password is not ASCII only
|
||||
password.encode("ascii")
|
||||
# Otherwise, check for correct values
|
||||
if username != "admin" or password not in {"secret"}:
|
||||
if username != self.USERNAME or password != self.ASCII_PASSWORD:
|
||||
raise MailboxLoginError("BAD", "OK")
|
||||
|
||||
def login_utf8(self, username, password):
|
||||
# Expected to only be called with the UTF-8 password
|
||||
if username != self.USERNAME or password != self.UTF_PASSWORD:
|
||||
raise MailboxLoginError("BAD", "OK")
|
||||
|
||||
def fetch(self, criteria, mark_seen, charset=""):
|
||||
@@ -100,6 +136,9 @@ class BogusMailBox(ContextManager):
|
||||
tag = criteria[criteria.index("UNKEYWORD") + 1].strip("'")
|
||||
msg = filter(lambda m: "processed" not in m.flags, msg)
|
||||
|
||||
if "(X-GM-LABELS" in criteria: # ['NOT', '(X-GM-LABELS', '"processed"']
|
||||
msg = filter(lambda m: "processed" not in m.flags, msg)
|
||||
|
||||
return list(msg)
|
||||
|
||||
def delete(self, uid_list):
|
||||
@@ -209,7 +248,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
m.return_value = self.bogus_mailbox
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
patcher = mock.patch("paperless_mail.mail.async_task")
|
||||
patcher = mock.patch("paperless_mail.mail.consume_file.delay")
|
||||
self.async_task = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
@@ -247,6 +286,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
seen=False,
|
||||
),
|
||||
)
|
||||
self.bogus_mailbox.updateClient()
|
||||
|
||||
def test_get_correspondent(self):
|
||||
message = namedtuple("MailMessage", [])
|
||||
@@ -607,6 +647,33 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
|
||||
self.assertEqual(len(self.bogus_mailbox.messages), 3)
|
||||
|
||||
def test_handle_mail_account_tag_gmail(self):
|
||||
self.bogus_mailbox._host = "imap.gmail.com"
|
||||
self.bogus_mailbox.client.capabilities = ["X-GM-EXT-1"]
|
||||
|
||||
account = MailAccount.objects.create(
|
||||
name="test",
|
||||
imap_server="",
|
||||
username="admin",
|
||||
password="secret",
|
||||
)
|
||||
|
||||
_ = MailRule.objects.create(
|
||||
name="testrule",
|
||||
account=account,
|
||||
action=MailRule.MailAction.TAG,
|
||||
action_parameter="processed",
|
||||
)
|
||||
|
||||
self.assertEqual(len(self.bogus_mailbox.messages), 3)
|
||||
self.assertEqual(self.async_task.call_count, 0)
|
||||
criteria = NOT(gmail_label="processed")
|
||||
self.assertEqual(len(self.bogus_mailbox.fetch(criteria, False)), 2)
|
||||
self.mail_account_handler.handle_mail_account(account)
|
||||
self.assertEqual(self.async_task.call_count, 2)
|
||||
self.assertEqual(len(self.bogus_mailbox.fetch(criteria, False)), 0)
|
||||
self.assertEqual(len(self.bogus_mailbox.messages), 3)
|
||||
|
||||
def test_error_login(self):
|
||||
account = MailAccount.objects.create(
|
||||
name="test",
|
||||
@@ -878,9 +945,9 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
account = MailAccount.objects.create(
|
||||
name="test",
|
||||
imap_server="",
|
||||
username="admin",
|
||||
username=BogusMailBox.USERNAME,
|
||||
# Note the non-ascii characters here
|
||||
password="w57äöüw4b6huwb6nhu",
|
||||
password=BogusMailBox.UTF_PASSWORD,
|
||||
)
|
||||
|
||||
_ = MailRule.objects.create(
|
||||
@@ -910,7 +977,7 @@ class TestMail(DirectoriesMixin, TestCase):
|
||||
account = MailAccount.objects.create(
|
||||
name="test",
|
||||
imap_server="",
|
||||
username="admin",
|
||||
username=BogusMailBox.USERNAME,
|
||||
# Note the non-ascii characters here
|
||||
# Passes the check in login, not in authenticate
|
||||
password="réception",
|
||||
@@ -965,20 +1032,3 @@ class TestTasks(TestCase):
|
||||
m.side_effect = lambda account: 0
|
||||
result = tasks.process_mail_accounts()
|
||||
self.assertIn("No new", result)
|
||||
|
||||
@mock.patch("paperless_mail.tasks.MailAccountHandler.handle_mail_account")
|
||||
def test_single_accounts(self, m):
|
||||
MailAccount.objects.create(
|
||||
name="A",
|
||||
imap_server="A",
|
||||
username="A",
|
||||
password="A",
|
||||
)
|
||||
|
||||
tasks.process_mail_account("A")
|
||||
|
||||
m.assert_called_once()
|
||||
m.reset_mock()
|
||||
|
||||
tasks.process_mail_account("B")
|
||||
m.assert_not_called()
|
||||
|
Reference in New Issue
Block a user