Merge remote-tracking branch 'paperless/dev' into feature-consume-eml

This commit is contained in:
phail
2022-04-29 23:52:56 +02:00
98 changed files with 5103 additions and 2041 deletions

View File

@@ -22,6 +22,8 @@ from documents.models import Tag
from documents.sanity_checker import SanityCheckFailedException
from pdf2image import convert_from_path
from pikepdf import Pdf
from PIL import Image
from PIL import ImageSequence
from pyzbar import pyzbar
from whoosh.writing import AsyncWriter
@@ -93,9 +95,41 @@ def barcode_reader(image) -> List[str]:
return barcodes
def convert_from_tiff_to_pdf(filepath: str) -> str:
"""
converts a given TIFF image file to pdf into a temp. directory.
Returns the new pdf file.
"""
file_name = os.path.splitext(os.path.basename(filepath))[0]
file_extension = os.path.splitext(os.path.basename(filepath))[1].lower()
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
# use old file name with pdf extension
if file_extension == ".tif" or file_extension == ".tiff":
newpath = os.path.join(tempdir, file_name + ".pdf")
else:
logger.warning(f"Cannot convert from {str(file_extension)} to pdf.")
return None
with Image.open(filepath) as image:
images = []
for i, page in enumerate(ImageSequence.Iterator(image)):
page = page.convert("RGB")
images.append(page)
try:
if len(images) == 1:
images[0].save(newpath)
else:
images[0].save(newpath, save_all=True, append_images=images[1:])
except OSError as e:
logger.warning(
f"Could not save the file as pdf. Error: {str(e)}",
)
return None
return newpath
def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
"""
Scan the provided file for page separating barcodes
Scan the provided pdf file for page separating barcodes
Returns a list of pagenumbers, which separate the file
"""
separator_page_numbers = []
@@ -112,7 +146,7 @@ def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]:
"""
Separate the provided file on the pages_to_split_on.
Separate the provided pdf file on the pages_to_split_on.
The pages which are defined by page_numbers will be removed.
Returns a list of (temporary) filepaths to consume.
These will need to be deleted later.
@@ -195,42 +229,70 @@ def consume_file(
if settings.CONSUMER_ENABLE_BARCODES:
separators = []
document_list = []
separators = scan_file_for_separating_barcodes(path)
if separators:
logger.debug(f"Pages with separators found in: {str(path)}")
document_list = separate_pages(path, separators)
if document_list:
for n, document in enumerate(document_list):
# save to consumption dir
# rename it to the original filename with number prefix
if override_filename:
newname = f"{str(n)}_" + override_filename
else:
newname = None
save_to_dir(document, newname=newname)
# if we got here, the document was successfully split
# and can safely be deleted
logger.debug("Deleting file {}".format(path))
os.unlink(path)
# notify the sender, otherwise the progress bar
# in the UI stays stuck
payload = {
"filename": override_filename,
"task_id": task_id,
"current_progress": 100,
"max_progress": 100,
"status": "SUCCESS",
"message": "finished",
}
try:
async_to_sync(get_channel_layer().group_send)(
"status_updates",
{"type": "status_update", "data": payload},
converted_tiff = None
if settings.CONSUMER_BARCODE_TIFF_SUPPORT:
supported_extensions = [".pdf", ".tiff", ".tif"]
else:
supported_extensions = [".pdf"]
file_extension = os.path.splitext(os.path.basename(path))[1].lower()
if file_extension not in supported_extensions:
# if not supported, skip this routine
logger.warning(
f"Unsupported file format for barcode reader: {str(file_extension)}",
)
else:
if file_extension in {".tif", ".tiff"}:
file_to_process = convert_from_tiff_to_pdf(path)
else:
file_to_process = path
separators = scan_file_for_separating_barcodes(file_to_process)
if separators:
logger.debug(
f"Pages with separators found in: {str(path)}",
)
except OSError as e:
logger.warning("OSError. It could be, the broker cannot be reached.")
logger.warning(str(e))
return "File successfully split"
document_list = separate_pages(file_to_process, separators)
if document_list:
for n, document in enumerate(document_list):
# save to consumption dir
# rename it to the original filename with number prefix
if override_filename:
newname = f"{str(n)}_" + override_filename
else:
newname = None
save_to_dir(document, newname=newname)
# if we got here, the document was successfully split
# and can safely be deleted
if converted_tiff:
logger.debug("Deleting file {}".format(file_to_process))
os.unlink(file_to_process)
logger.debug("Deleting file {}".format(path))
os.unlink(path)
# notify the sender, otherwise the progress bar
# in the UI stays stuck
payload = {
"filename": override_filename,
"task_id": task_id,
"current_progress": 100,
"max_progress": 100,
"status": "SUCCESS",
"message": "finished",
}
try:
async_to_sync(get_channel_layer().group_send)(
"status_updates",
{"type": "status_update", "data": payload},
)
except OSError as e:
logger.warning(
"OSError. It could be, the broker cannot be reached.",
)
logger.warning(str(e))
# consuming stops here, since the original document with
# the barcodes has been split and will be consumed separately
return "File successfully split"
# continue with consumption if no barcode was found
document = Consumer().try_consume_file(

Binary file not shown.

View File

@@ -204,6 +204,29 @@ class TestTasks(DirectoriesMixin, TestCase):
img = Image.open(test_file)
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
def test_convert_from_tiff_to_pdf(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"simple.tiff",
)
dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff")
shutil.copy(test_file, dst)
target_file = tasks.convert_from_tiff_to_pdf(dst)
file_extension = os.path.splitext(os.path.basename(target_file))[1]
self.assertTrue(os.path.isfile(target_file))
self.assertEqual(file_extension, ".pdf")
def test_convert_error_from_pdf_to_pdf(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"simple.pdf",
)
dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf")
shutil.copy(test_file, dst)
self.assertIsNone(tasks.convert_from_tiff_to_pdf(dst))
def test_scan_file_for_separating_barcodes(self):
test_file = os.path.join(
os.path.dirname(__file__),
@@ -400,11 +423,64 @@ class TestTasks(DirectoriesMixin, TestCase):
"barcodes",
"patch-code-t-middle.pdf",
)
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pd")
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf")
shutil.copy(test_file, dst)
self.assertEqual(tasks.consume_file(dst), "File successfully split")
@override_settings(
CONSUMER_ENABLE_BARCODES=True,
CONSUMER_BARCODE_TIFF_SUPPORT=True,
)
def test_consume_barcode_tiff_file(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t-middle.tiff",
)
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff")
shutil.copy(test_file, dst)
self.assertEqual(tasks.consume_file(dst), "File successfully split")
@override_settings(
CONSUMER_ENABLE_BARCODES=True,
CONSUMER_BARCODE_TIFF_SUPPORT=True,
)
@mock.patch("documents.consumer.Consumer.try_consume_file")
def test_consume_barcode_unsupported_jpg_file(self, m):
"""
This test assumes barcode and TIFF support are enabled and
the user uploads an unsupported image file (e.g. jpg)
The function shouldn't try to scan for separating barcodes
and continue archiving the file as is.
"""
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"simple.jpg",
)
dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg")
shutil.copy(test_file, dst)
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
self.assertIn("Success", tasks.consume_file(dst))
self.assertEqual(
cm.output,
[
"WARNING:paperless.tasks:Unsupported file format for barcode reader: .jpg",
],
)
m.assert_called_once()
args, kwargs = m.call_args
self.assertIsNone(kwargs["override_filename"])
self.assertIsNone(kwargs["override_title"])
self.assertIsNone(kwargs["override_correspondent_id"])
self.assertIsNone(kwargs["override_document_type_id"])
self.assertIsNone(kwargs["override_tag_ids"])
@mock.patch("documents.tasks.sanity_checker.check_sanity")
def test_sanity_check_success(self, m):
m.return_value = SanityCheckMessages()

View File

@@ -3,7 +3,7 @@ msgstr ""
"Project-Id-Version: paperless-ngx\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2022-03-02 11:20-0800\n"
"PO-Revision-Date: 2022-03-14 23:41\n"
"PO-Revision-Date: 2022-04-12 15:26\n"
"Last-Translator: \n"
"Language-Team: Polish\n"
"Language: pl_PL\n"
@@ -60,7 +60,7 @@ msgstr "algorytm dopasowania"
#: documents/models.py:48
msgid "is insensitive"
msgstr "bez rozróżniania wielkości liter"
msgstr "bez rozróżniania wielkości znaków"
#: documents/models.py:61 documents/models.py:104
msgid "correspondent"
@@ -240,7 +240,7 @@ msgstr "użytkownik"
#: documents/models.py:317
msgid "show on dashboard"
msgstr "pokaż na pulpicie"
msgstr "pokaż na stronie głównej"
#: documents/models.py:320
msgid "show in sidebar"
@@ -638,7 +638,7 @@ msgstr "konto"
#: paperless_mail/models.py:119
msgid "folder"
msgstr "katalog"
msgstr "folder"
#: paperless_mail/models.py:122
msgid "Subfolders must be separated by dots."

View File

@@ -251,7 +251,8 @@ if _paperless_url:
if _allowed_hosts:
ALLOWED_HOSTS.append(_paperless_uri.hostname)
else:
ALLOWED_HOSTS = [_paperless_uri.hostname]
# always allow localhost. Necessary e.g. for healthcheck in docker.
ALLOWED_HOSTS = [_paperless_uri.hostname] + ["localhost"]
# The secret key has a default that should be fine so long as you're hosting
# Paperless on a closed network. However, if you're putting this anywhere
@@ -502,6 +503,10 @@ CONSUMER_ENABLE_BARCODES = __get_boolean(
"PAPERLESS_CONSUMER_ENABLE_BARCODES",
)
CONSUMER_BARCODE_TIFF_SUPPORT = __get_boolean(
"PAPERLESS_CONSUMER_BARCODE_TIFF_SUPPORT",
)
CONSUMER_BARCODE_STRING = os.getenv("PAPERLESS_CONSUMER_BARCODE_STRING", "PATCHT")
OPTIMIZE_THUMBNAILS = __get_boolean("PAPERLESS_OPTIMIZE_THUMBNAILS", "true")

View File

@@ -1 +1 @@
__version__ = (1, 6, 0)
__version__ = (1, 7, 0)

View File

@@ -83,7 +83,7 @@ class MailRuleAdmin(admin.ModelAdmin):
),
"fields": (
"assign_title_from",
"assign_tag",
"assign_tags",
"assign_document_type",
"assign_correspondent_from",
"assign_correspondent",

View File

@@ -3,6 +3,7 @@ import tempfile
from datetime import date
from datetime import timedelta
from fnmatch import fnmatch
from imaplib import IMAP4
import magic
import pathvalidate
@@ -145,7 +146,7 @@ class MailAccountHandler(LoggingMixin):
else:
raise NotImplementedError(
"Unknwown correspondent selector",
"Unknown correspondent selector",
) # pragma: nocover
def handle_mail_account(self, account):
@@ -161,8 +162,34 @@ class MailAccountHandler(LoggingMixin):
account.imap_port,
account.imap_security,
) as M:
try:
M.login(account.username, account.password)
except UnicodeEncodeError:
self.log("debug", "Falling back to AUTH=PLAIN")
try:
# rfc2595 section 6 - PLAIN SASL mechanism
client: IMAP4 = M.client
encoded = (
b"\0"
+ account.username.encode("utf8")
+ b"\0"
+ account.password.encode("utf8")
)
# Assumption is the server supports AUTH=PLAIN capability
# Could check the list with client.capability(), but then what?
# We're failing anyway then
client.authenticate("PLAIN", lambda x: encoded)
# Need to transition out of AUTH state to SELECTED
M.folder.set("INBOX")
except Exception:
self.log(
"error",
"Unable to authenticate with mail server using AUTH=PLAIN",
)
raise MailError(f"Error while authenticating account {account}")
except Exception as e:
self.log(
"error",
@@ -199,13 +226,28 @@ class MailAccountHandler(LoggingMixin):
return total_processed_files
def handle_mail_rule(self, M, rule: MailRule):
def handle_mail_rule(self, M: MailBox, rule: MailRule):
self.log("debug", f"Rule {rule}: Selecting folder {rule.folder}")
try:
M.folder.set(rule.folder)
except MailboxFolderSelectError:
self.log(
"error",
f"Unable to access folder {rule.folder}, attempting folder listing",
)
try:
for folder_info in M.folder.list():
self.log("info", f"Located folder: {folder_info.name}")
except Exception as e:
self.log(
"error",
"Exception during folder listing, unable to provide list folders: "
+ str(e),
)
raise MailError(
f"Rule {rule}: Folder {rule.folder} "
f"does not exist in account {rule.account}",
@@ -284,7 +326,7 @@ class MailAccountHandler(LoggingMixin):
)
correspondent = self.get_correspondent(message, rule)
tag = rule.assign_tag
tag_ids = [tag.id for tag in rule.assign_tags.all()]
doc_type = rule.assign_document_type
processed_attachments = 0
@@ -317,7 +359,7 @@ class MailAccountHandler(LoggingMixin):
override_title=message.subject,
override_correspondent_id=correspondent.id if correspondent else None,
override_document_type_id=doc_type.id if doc_type else None,
override_tag_ids=[tag.id] if tag else None,
override_tag_ids=tag_ids,
task_name=message.subject[:100],
)
processed_attachments += 1
@@ -384,7 +426,7 @@ class MailAccountHandler(LoggingMixin):
if correspondent
else None,
override_document_type_id=doc_type.id if doc_type else None,
override_tag_ids=[tag.id] if tag else None,
override_tag_ids=tag_ids,
task_name=att.filename[:100],
)

View File

@@ -0,0 +1,23 @@
# Generated by Django 3.2.12 on 2022-03-11 15:00
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0008_auto_20210516_0940"),
]
operations = [
migrations.AddField(
model_name="mailrule",
name="assign_tags",
field=models.ManyToManyField(
blank=True,
related_name="mail_rules_multi",
to="documents.Tag",
verbose_name="assign this tag",
),
),
]

View File

@@ -0,0 +1,40 @@
# Generated by Django 3.2.12 on 2022-03-11 15:02
from django.db import migrations
def migrate_tag_to_tags(apps, schema_editor):
# Manual data migration, see
# https://docs.djangoproject.com/en/3.2/topics/migrations/#data-migrations
#
# Copy the assign_tag property to the new assign_tags set if it exists.
MailRule = apps.get_model("paperless_mail", "MailRule")
for mail_rule in MailRule.objects.all():
if mail_rule.assign_tag:
mail_rule.assign_tags.add(mail_rule.assign_tag)
mail_rule.save()
def migrate_tags_to_tag(apps, schema_editor):
# Manual data migration, see
# https://docs.djangoproject.com/en/3.2/topics/migrations/#data-migrations
#
# Copy the unique value in the assign_tags set to the old assign_tag property.
# Do nothing if the tag is not unique.
MailRule = apps.get_model("paperless_mail", "MailRule")
for mail_rule in MailRule.objects.all():
tags = mail_rule.assign_tags.all()
if len(tags) == 1:
mail_rule.assign_tag = tags[0]
mail_rule.save()
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0009_mailrule_assign_tags"),
]
operations = [
migrations.RunPython(migrate_tag_to_tags, migrate_tags_to_tag),
]

View File

@@ -0,0 +1,17 @@
# Generated by Django 3.2.12 on 2022-03-11 15:18
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0010_auto_20220311_1602"),
]
operations = [
migrations.RemoveField(
model_name="mailrule",
name="assign_tag",
),
]

View File

@@ -0,0 +1,20 @@
# Generated by Django 3.2.12 on 2022-03-11 16:21
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0011_remove_mailrule_assign_tag"),
]
operations = [
migrations.AlterField(
model_name="mailrule",
name="assign_tags",
field=models.ManyToManyField(
blank=True, to="documents.Tag", verbose_name="assign this tag"
),
),
]

View File

@@ -0,0 +1,13 @@
# Generated by Django 4.0.4 on 2022-04-12 08:51
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0009_alter_mailrule_action_alter_mailrule_folder"),
("paperless_mail", "0012_alter_mailrule_assign_tags"),
]
operations = []

View File

@@ -0,0 +1,27 @@
# Generated by Django 4.0.4 on 2022-04-18 22:57
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0013_merge_20220412_1051"),
]
operations = [
migrations.AlterField(
model_name="mailrule",
name="action",
field=models.PositiveIntegerField(
choices=[
(1, "Delete"),
(2, "Move to specified folder"),
(3, "Mark as read, don't process read mails"),
(4, "Flag the mail, don't process flagged mails"),
],
default=3,
verbose_name="action",
),
),
]

View File

@@ -183,11 +183,9 @@ class MailRule(models.Model):
default=TitleSource.FROM_SUBJECT,
)
assign_tag = models.ForeignKey(
assign_tags = models.ManyToManyField(
document_models.Tag,
null=True,
blank=True,
on_delete=models.SET_NULL,
verbose_name=_("assign this tag"),
)

View File

@@ -15,7 +15,9 @@ from django.test import TestCase
from documents.models import Correspondent
from documents.tests.utils import DirectoriesMixin
from imap_tools import EmailAddress
from imap_tools import FolderInfo
from imap_tools import MailboxFolderSelectError
from imap_tools import MailboxLoginError
from imap_tools import MailMessage
from imap_tools import MailMessageFlags
from paperless_mail import tasks
@@ -43,6 +45,14 @@ class BogusFolderManager:
self.current_folder = new_folder
class BogusClient(object):
def authenticate(self, mechanism, authobject):
# authobject must be a callable object
auth_bytes = authobject(None)
if auth_bytes != b"\x00admin\x00w57\xc3\xa4\xc3\xb6\xc3\xbcw4b6huwb6nhu":
raise MailboxLoginError("BAD", "OK")
class BogusMailBox(ContextManager):
def __enter__(self):
return self
@@ -53,12 +63,15 @@ class BogusMailBox(ContextManager):
def __init__(self):
self.messages: List[MailMessage] = []
self.messages_spam: List[MailMessage] = []
self.folder = BogusFolderManager()
self.client = BogusClient()
def login(self, username, password):
if not (username == "admin" and password == "secret"):
raise Exception()
folder = BogusFolderManager()
# This will raise a UnicodeEncodeError if the password is not ASCII only
password.encode("ascii")
# Otherwise, check for correct values
if username != "admin" or password not in {"secret"}:
raise MailboxLoginError("BAD", "OK")
def fetch(self, criteria, mark_seen, charset=""):
msg = self.messages
@@ -228,7 +241,6 @@ class TestMail(DirectoriesMixin, TestCase):
message.from_values = EmailAddress(
"Someone!",
"someone@somewhere.com",
"Someone! <someone@somewhere.com>",
)
message2 = namedtuple("MailMessage", [])
@@ -236,7 +248,6 @@ class TestMail(DirectoriesMixin, TestCase):
message2.from_values = EmailAddress(
"",
"fake@localhost.com",
"",
)
me_localhost = Correspondent.objects.create(name=message2.from_)
@@ -308,10 +319,12 @@ class TestMail(DirectoriesMixin, TestCase):
)
account = MailAccount()
account.save()
rule = MailRule(
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
account=account,
)
rule.save()
result = self.mail_account_handler.handle_message(message, rule)
@@ -355,10 +368,12 @@ class TestMail(DirectoriesMixin, TestCase):
)
account = MailAccount()
account.save()
rule = MailRule(
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
account=account,
)
rule.save()
result = self.mail_account_handler.handle_message(message, rule)
@@ -381,10 +396,12 @@ class TestMail(DirectoriesMixin, TestCase):
)
account = MailAccount()
account.save()
rule = MailRule(
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
account=account,
)
rule.save()
result = self.mail_account_handler.handle_message(message, rule)
@@ -406,11 +423,13 @@ class TestMail(DirectoriesMixin, TestCase):
)
account = MailAccount()
account.save()
rule = MailRule(
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
account=account,
attachment_type=MailRule.AttachmentProcessing.EVERYTHING,
)
rule.save()
result = self.mail_account_handler.handle_message(message, rule)
@@ -440,12 +459,15 @@ class TestMail(DirectoriesMixin, TestCase):
for (pattern, matches) in tests:
matches.sort()
self.async_task.reset_mock()
account = MailAccount()
account = MailAccount(name=str(uuid.uuid4()))
account.save()
rule = MailRule(
name=str(uuid.uuid4()),
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
account=account,
filter_attachment_filename=pattern,
)
rule.save()
result = self.mail_account_handler.handle_message(message, rule)
@@ -621,6 +643,72 @@ class TestMail(DirectoriesMixin, TestCase):
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
def test_error_folder_set(self):
"""
GIVEN:
- Mail rule with non-existent folder
THEN:
- Should call list to output all folders in the account
- Should not process any messages
"""
account = MailAccount.objects.create(
name="test2",
imap_server="",
username="admin",
password="secret",
)
_ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.MailAction.MOVE,
action_parameter="spam",
filter_subject="Claim",
order=1,
folder="uuuhhhh", # Invalid folder name
)
self.bogus_mailbox.folder.list = mock.Mock(
return_value=[FolderInfo("SomeFoldername", "|", ())],
)
self.mail_account_handler.handle_mail_account(account)
self.bogus_mailbox.folder.list.assert_called_once()
self.assertEqual(self.async_task.call_count, 0)
def test_error_folder_set_error_listing(self):
"""
GIVEN:
- Mail rule with non-existent folder
- Mail account folder listing raises exception
THEN:
- Should not process any messages
"""
account = MailAccount.objects.create(
name="test2",
imap_server="",
username="admin",
password="secret",
)
_ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.MailAction.MOVE,
action_parameter="spam",
filter_subject="Claim",
order=1,
folder="uuuhhhh", # Invalid folder name
)
self.bogus_mailbox.folder.list = mock.Mock(
side_effect=MailboxFolderSelectError(None, "uhm"),
)
self.mail_account_handler.handle_mail_account(account)
self.bogus_mailbox.folder.list.assert_called_once()
self.assertEqual(self.async_task.call_count, 0)
@mock.patch("paperless_mail.mail.MailAccountHandler.get_correspondent")
def test_error_skip_mail(self, m):
def get_correspondent_fake(message, rule):
@@ -744,6 +832,66 @@ class TestMail(DirectoriesMixin, TestCase):
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(self.async_task.call_count, 5)
def test_auth_plain_fallback(self):
"""
GIVEN:
- Mail account with password containing non-ASCII characters
THEN:
- Should still authenticate to the mail account
"""
account = MailAccount.objects.create(
name="test",
imap_server="",
username="admin",
# Note the non-ascii characters here
password="w57äöüw4b6huwb6nhu",
)
_ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.MailAction.MARK_READ,
)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
self.mail_account_handler.handle_mail_account(account)
self.assertEqual(self.async_task.call_count, 2)
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
def test_auth_plain_fallback_fails_still(self):
"""
GIVEN:
- Mail account with password containing non-ASCII characters
- Incorrect password value
THEN:
- Should raise a MailError for the account
"""
account = MailAccount.objects.create(
name="test",
imap_server="",
username="admin",
# Note the non-ascii characters here
# Passes the check in login, not in authenticate
password="réception",
)
_ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.MailAction.MARK_READ,
)
self.assertRaises(
MailError,
self.mail_account_handler.handle_mail_account,
account,
)
class TestManagementCommand(TestCase):
@mock.patch(