mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
updated mail: now uses mime type detection
This commit is contained in:
parent
3d5b66c2b7
commit
09acb134b7
@ -2,6 +2,7 @@ import os
|
||||
import tempfile
|
||||
from datetime import timedelta, date
|
||||
|
||||
import magic
|
||||
from django.conf import settings
|
||||
from django.utils.text import slugify
|
||||
from django_q.tasks import async_task
|
||||
@ -248,9 +249,21 @@ class MailAccountHandler(LoggingMixin):
|
||||
|
||||
for att in message.attachments:
|
||||
|
||||
if not att.content_disposition == "attachment":
|
||||
self.log(
|
||||
'debug',
|
||||
f"Rule {rule.account}.{rule}: "
|
||||
f"Skipping attachment {att.filename} "
|
||||
f"with content disposition inline")
|
||||
continue
|
||||
|
||||
title = get_title(message, att, rule)
|
||||
|
||||
if is_mime_type_supported(att.content_type):
|
||||
# don't trust the content type of the attachment. Could be
|
||||
# generic application/octet-stream.
|
||||
mime_type = magic.from_buffer(att.payload, mime=True)
|
||||
|
||||
if is_mime_type_supported(mime_type):
|
||||
|
||||
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
|
||||
_, temp_filename = tempfile.mkstemp(prefix="paperless-mail-", dir=settings.SCRATCH_DIR)
|
||||
@ -275,5 +288,12 @@ class MailAccountHandler(LoggingMixin):
|
||||
)
|
||||
|
||||
processed_attachments += 1
|
||||
else:
|
||||
self.log(
|
||||
'debug',
|
||||
f"Rule {rule.account}.{rule}: "
|
||||
f"Skipping attachment {att.filename} "
|
||||
f"since guessed mime type {mime_type} is not supported "
|
||||
f"by paperless")
|
||||
|
||||
return processed_attachments
|
||||
|
@ -99,11 +99,7 @@ def create_message(num_attachments=1, body="", subject="the suject", from_="noon
|
||||
message.from_ = from_
|
||||
message.body = body
|
||||
for i in range(num_attachments):
|
||||
attachment = namedtuple('Attachment', [])
|
||||
attachment.filename = 'some_file.pdf'
|
||||
attachment.content_type = 'application/pdf'
|
||||
attachment.payload = b'content of the attachment'
|
||||
message.attachments.append(attachment)
|
||||
message.attachments.append(create_attachment(filename=f"file_{i}.pdf"))
|
||||
|
||||
message.seen = seen
|
||||
message.flagged = flagged
|
||||
@ -111,6 +107,26 @@ def create_message(num_attachments=1, body="", subject="the suject", from_="noon
|
||||
return message
|
||||
|
||||
|
||||
def create_attachment(filename="the_file.pdf", content_disposition="attachment", payload=b"a PDF document"):
|
||||
attachment = namedtuple('Attachment', [])
|
||||
attachment.filename = filename
|
||||
attachment.content_disposition = content_disposition
|
||||
attachment.payload = payload
|
||||
return attachment
|
||||
|
||||
|
||||
def fake_magic_from_buffer(buffer, mime=False):
|
||||
|
||||
if mime:
|
||||
if 'PDF' in str(buffer):
|
||||
return 'application/pdf'
|
||||
else:
|
||||
return 'unknown/type'
|
||||
else:
|
||||
return 'Some verbose file description'
|
||||
|
||||
|
||||
@mock.patch('paperless_mail.mail.magic.from_buffer', fake_magic_from_buffer)
|
||||
class TestMail(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -182,26 +198,7 @@ class TestMail(TestCase):
|
||||
self.assertEqual(get_title(message, att, rule), "the message title")
|
||||
|
||||
def test_handle_message(self):
|
||||
message = namedtuple('MailMessage', [])
|
||||
message.subject = "the message title"
|
||||
message.from_ = "Myself"
|
||||
|
||||
att = namedtuple('Attachment', [])
|
||||
att.filename = "test1.pdf"
|
||||
att.content_type = 'application/pdf'
|
||||
att.payload = b"attachment contents"
|
||||
|
||||
att2 = namedtuple('Attachment', [])
|
||||
att2.filename = "test2.pdf"
|
||||
att2.content_type = 'application/pdf'
|
||||
att2.payload = b"attachment contents"
|
||||
|
||||
att3 = namedtuple('Attachment', [])
|
||||
att3.filename = "test3.pdf"
|
||||
att3.content_type = 'application/invalid'
|
||||
att3.payload = b"attachment contents"
|
||||
|
||||
message.attachments = [att, att2, att3]
|
||||
message = create_message(subject="the message title", from_="Myself", num_attachments=2)
|
||||
|
||||
account = MailAccount()
|
||||
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
|
||||
@ -215,14 +212,13 @@ class TestMail(TestCase):
|
||||
args1, kwargs1 = self.async_task.call_args_list[0]
|
||||
args2, kwargs2 = self.async_task.call_args_list[1]
|
||||
|
||||
self.assertEqual(kwargs1['override_title'], "test1")
|
||||
self.assertEqual(kwargs1['override_filename'], "test1.pdf")
|
||||
self.assertEqual(kwargs1['override_title'], "file_0")
|
||||
self.assertEqual(kwargs1['override_filename'], "file_0.pdf")
|
||||
|
||||
self.assertEqual(kwargs2['override_title'], "test2")
|
||||
self.assertEqual(kwargs2['override_filename'], "test2.pdf")
|
||||
self.assertEqual(kwargs2['override_title'], "file_1")
|
||||
self.assertEqual(kwargs2['override_filename'], "file_1.pdf")
|
||||
|
||||
@mock.patch("paperless_mail.mail.async_task")
|
||||
def test_handle_empty_message(self, m):
|
||||
def test_handle_empty_message(self):
|
||||
message = namedtuple('MailMessage', [])
|
||||
|
||||
message.attachments = []
|
||||
@ -230,9 +226,45 @@ class TestMail(TestCase):
|
||||
|
||||
result = self.mail_account_handler.handle_message(message, rule)
|
||||
|
||||
self.assertFalse(m.called)
|
||||
self.assertFalse(self.async_task.called)
|
||||
self.assertEqual(result, 0)
|
||||
|
||||
def test_handle_unknown_mime_type(self):
|
||||
message = create_message()
|
||||
message.attachments = [
|
||||
create_attachment(filename="f1.pdf"),
|
||||
create_attachment(filename="f2.json", payload=b"{'much': 'payload.', 'so': 'json', 'wow': true}")
|
||||
]
|
||||
|
||||
account = MailAccount()
|
||||
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
|
||||
|
||||
result = self.mail_account_handler.handle_message(message, rule)
|
||||
|
||||
self.assertEqual(result, 1)
|
||||
self.assertEqual(self.async_task.call_count, 1)
|
||||
|
||||
args, kwargs = self.async_task.call_args
|
||||
self.assertEqual(kwargs['override_filename'], "f1.pdf")
|
||||
|
||||
def test_handle_disposition(self):
|
||||
message = create_message()
|
||||
message.attachments = [
|
||||
create_attachment(filename="f1.pdf", content_disposition='inline'),
|
||||
create_attachment(filename="f2.pdf", content_disposition='attachment')
|
||||
]
|
||||
|
||||
account = MailAccount()
|
||||
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
|
||||
|
||||
result = self.mail_account_handler.handle_message(message, rule)
|
||||
|
||||
self.assertEqual(result, 1)
|
||||
self.assertEqual(self.async_task.call_count, 1)
|
||||
|
||||
args, kwargs = self.async_task.call_args
|
||||
self.assertEqual(kwargs['override_filename'], "f2.pdf")
|
||||
|
||||
def test_handle_mail_account_mark_read(self):
|
||||
|
||||
account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret")
|
||||
|
Loading…
x
Reference in New Issue
Block a user