mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
Implements a fallback to AUTH=PLAIN in the event of a UnicodeEncodeError during a normal login
This commit is contained in:
parent
f9194bd28c
commit
1e9378b429
@ -3,6 +3,7 @@ import tempfile
|
|||||||
from datetime import date
|
from datetime import date
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from fnmatch import fnmatch
|
from fnmatch import fnmatch
|
||||||
|
from imaplib import IMAP4
|
||||||
|
|
||||||
import magic
|
import magic
|
||||||
import pathvalidate
|
import pathvalidate
|
||||||
@ -145,7 +146,7 @@ class MailAccountHandler(LoggingMixin):
|
|||||||
|
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(
|
raise NotImplementedError(
|
||||||
"Unknwown correspondent selector",
|
"Unknown correspondent selector",
|
||||||
) # pragma: nocover
|
) # pragma: nocover
|
||||||
|
|
||||||
def handle_mail_account(self, account):
|
def handle_mail_account(self, account):
|
||||||
@ -164,6 +165,26 @@ class MailAccountHandler(LoggingMixin):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
M.login(account.username, account.password)
|
M.login(account.username, account.password)
|
||||||
|
|
||||||
|
except UnicodeEncodeError:
|
||||||
|
try:
|
||||||
|
# rfc2595 section 6 - PLAIN SASL mechanism
|
||||||
|
client: IMAP4 = M.client
|
||||||
|
encoded = (
|
||||||
|
b"\0"
|
||||||
|
+ account.username.encode("utf8")
|
||||||
|
+ b"\0"
|
||||||
|
+ account.password.encode("utf8")
|
||||||
|
)
|
||||||
|
# Assumption is the server supports AUTH=PLAIN capability
|
||||||
|
# Could check the list with client.capability(), but then what?
|
||||||
|
# We're failing anyway then
|
||||||
|
client.authenticate("PLAIN", lambda x: encoded)
|
||||||
|
|
||||||
|
# Need to transition out of AUTH state to SELECTED
|
||||||
|
M.folder.set("INBOX")
|
||||||
|
except Exception:
|
||||||
|
raise MailError(f"Error while authenticating account {account}")
|
||||||
except Exception:
|
except Exception:
|
||||||
raise MailError(f"Error while authenticating account {account}")
|
raise MailError(f"Error while authenticating account {account}")
|
||||||
|
|
||||||
@ -184,7 +205,7 @@ class MailAccountHandler(LoggingMixin):
|
|||||||
|
|
||||||
return total_processed_files
|
return total_processed_files
|
||||||
|
|
||||||
def handle_mail_rule(self, M, rule):
|
def handle_mail_rule(self, M: MailBox, rule):
|
||||||
|
|
||||||
self.log("debug", f"Rule {rule}: Selecting folder {rule.folder}")
|
self.log("debug", f"Rule {rule}: Selecting folder {rule.folder}")
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ from documents.tests.utils import DirectoriesMixin
|
|||||||
from imap_tools import EmailAddress
|
from imap_tools import EmailAddress
|
||||||
from imap_tools import FolderInfo
|
from imap_tools import FolderInfo
|
||||||
from imap_tools import MailboxFolderSelectError
|
from imap_tools import MailboxFolderSelectError
|
||||||
|
from imap_tools import MailboxLoginError
|
||||||
from imap_tools import MailMessage
|
from imap_tools import MailMessage
|
||||||
from imap_tools import MailMessageFlags
|
from imap_tools import MailMessageFlags
|
||||||
from paperless_mail import tasks
|
from paperless_mail import tasks
|
||||||
@ -44,6 +45,14 @@ class BogusFolderManager:
|
|||||||
self.current_folder = new_folder
|
self.current_folder = new_folder
|
||||||
|
|
||||||
|
|
||||||
|
class BogusClient(object):
|
||||||
|
def authenticate(self, mechanism, authobject):
|
||||||
|
# authobject must be a callable object
|
||||||
|
auth_bytes = authobject(None)
|
||||||
|
if auth_bytes != b"\x00admin\x00w57\xc3\xa4\xc3\xb6\xc3\xbcw4b6huwb6nhu":
|
||||||
|
raise MailboxLoginError("BAD", "OK")
|
||||||
|
|
||||||
|
|
||||||
class BogusMailBox(ContextManager):
|
class BogusMailBox(ContextManager):
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
return self
|
return self
|
||||||
@ -55,10 +64,14 @@ class BogusMailBox(ContextManager):
|
|||||||
self.messages: List[MailMessage] = []
|
self.messages: List[MailMessage] = []
|
||||||
self.messages_spam: List[MailMessage] = []
|
self.messages_spam: List[MailMessage] = []
|
||||||
self.folder = BogusFolderManager()
|
self.folder = BogusFolderManager()
|
||||||
|
self.client = BogusClient()
|
||||||
|
|
||||||
def login(self, username, password):
|
def login(self, username, password):
|
||||||
if not (username == "admin" and password == "secret"):
|
# This will raise a UnicodeEncodeError if the password is not ASCII only
|
||||||
raise Exception()
|
password.encode("ascii")
|
||||||
|
# Otherwise, check for correct values
|
||||||
|
if username != "admin" or password not in {"secret"}:
|
||||||
|
raise MailboxLoginError("BAD", "OK")
|
||||||
|
|
||||||
def fetch(self, criteria, mark_seen, charset=""):
|
def fetch(self, criteria, mark_seen, charset=""):
|
||||||
msg = self.messages
|
msg = self.messages
|
||||||
@ -821,6 +834,66 @@ class TestMail(DirectoriesMixin, TestCase):
|
|||||||
self.assertEqual(len(self.bogus_mailbox.messages), 2)
|
self.assertEqual(len(self.bogus_mailbox.messages), 2)
|
||||||
self.assertEqual(self.async_task.call_count, 5)
|
self.assertEqual(self.async_task.call_count, 5)
|
||||||
|
|
||||||
|
def test_auth_plain_fallback(self):
|
||||||
|
"""
|
||||||
|
GIVEN:
|
||||||
|
- Mail account with password containing non-ASCII characters
|
||||||
|
THEN:
|
||||||
|
- Should still authenticate to the mail account
|
||||||
|
"""
|
||||||
|
account = MailAccount.objects.create(
|
||||||
|
name="test",
|
||||||
|
imap_server="",
|
||||||
|
username="admin",
|
||||||
|
# Note the non-ascii characters here
|
||||||
|
password="w57äöüw4b6huwb6nhu",
|
||||||
|
)
|
||||||
|
|
||||||
|
_ = MailRule.objects.create(
|
||||||
|
name="testrule",
|
||||||
|
account=account,
|
||||||
|
action=MailRule.AttachmentAction.MARK_READ,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(len(self.bogus_mailbox.messages), 3)
|
||||||
|
self.assertEqual(self.async_task.call_count, 0)
|
||||||
|
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
|
||||||
|
|
||||||
|
self.mail_account_handler.handle_mail_account(account)
|
||||||
|
|
||||||
|
self.assertEqual(self.async_task.call_count, 2)
|
||||||
|
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
|
||||||
|
self.assertEqual(len(self.bogus_mailbox.messages), 3)
|
||||||
|
|
||||||
|
def test_auth_plain_fallback_fails_still(self):
|
||||||
|
"""
|
||||||
|
GIVEN:
|
||||||
|
- Mail account with password containing non-ASCII characters
|
||||||
|
- Incorrect password alue
|
||||||
|
THEN:
|
||||||
|
- Should raise a MailError for the account
|
||||||
|
"""
|
||||||
|
account = MailAccount.objects.create(
|
||||||
|
name="test",
|
||||||
|
imap_server="",
|
||||||
|
username="admin",
|
||||||
|
# Note the non-ascii characters here
|
||||||
|
# Passes the check in login, not in authenticate
|
||||||
|
password="réception",
|
||||||
|
)
|
||||||
|
|
||||||
|
_ = MailRule.objects.create(
|
||||||
|
name="testrule",
|
||||||
|
account=account,
|
||||||
|
action=MailRule.AttachmentAction.MARK_READ,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
MailError,
|
||||||
|
self.mail_account_handler.handle_mail_account,
|
||||||
|
account,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestManagementCommand(TestCase):
|
class TestManagementCommand(TestCase):
|
||||||
@mock.patch(
|
@mock.patch(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user