mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-24 03:26:11 -05:00 
			
		
		
		
	updated mail: now uses mime type detection
This commit is contained in:
		| @@ -2,6 +2,7 @@ import os | ||||
| import tempfile | ||||
| from datetime import timedelta, date | ||||
|  | ||||
| import magic | ||||
| from django.conf import settings | ||||
| from django.utils.text import slugify | ||||
| from django_q.tasks import async_task | ||||
| @@ -248,9 +249,21 @@ class MailAccountHandler(LoggingMixin): | ||||
|  | ||||
|         for att in message.attachments: | ||||
|  | ||||
|             if not att.content_disposition == "attachment": | ||||
|                 self.log( | ||||
|                     'debug', | ||||
|                     f"Rule {rule.account}.{rule}: " | ||||
|                     f"Skipping attachment {att.filename} " | ||||
|                     f"with content disposition inline") | ||||
|                 continue | ||||
|  | ||||
|             title = get_title(message, att, rule) | ||||
|  | ||||
|             if is_mime_type_supported(att.content_type): | ||||
|             # don't trust the content type of the attachment. Could be | ||||
|             # generic application/octet-stream. | ||||
|             mime_type = magic.from_buffer(att.payload, mime=True) | ||||
|  | ||||
|             if is_mime_type_supported(mime_type): | ||||
|  | ||||
|                 os.makedirs(settings.SCRATCH_DIR, exist_ok=True) | ||||
|                 _, temp_filename = tempfile.mkstemp(prefix="paperless-mail-", dir=settings.SCRATCH_DIR) | ||||
| @@ -275,5 +288,12 @@ class MailAccountHandler(LoggingMixin): | ||||
|                 ) | ||||
|  | ||||
|                 processed_attachments += 1 | ||||
|             else: | ||||
|                 self.log( | ||||
|                     'debug', | ||||
|                     f"Rule {rule.account}.{rule}: " | ||||
|                     f"Skipping attachment {att.filename} " | ||||
|                     f"since guessed mime type {mime_type} is not supported " | ||||
|                     f"by paperless") | ||||
|  | ||||
|         return processed_attachments | ||||
|   | ||||
| @@ -99,11 +99,7 @@ def create_message(num_attachments=1, body="", subject="the suject", from_="noon | ||||
|     message.from_ = from_ | ||||
|     message.body = body | ||||
|     for i in range(num_attachments): | ||||
|         attachment = namedtuple('Attachment', []) | ||||
|         attachment.filename = 'some_file.pdf' | ||||
|         attachment.content_type = 'application/pdf' | ||||
|         attachment.payload = b'content of the attachment' | ||||
|         message.attachments.append(attachment) | ||||
|         message.attachments.append(create_attachment(filename=f"file_{i}.pdf")) | ||||
|  | ||||
|     message.seen = seen | ||||
|     message.flagged = flagged | ||||
| @@ -111,6 +107,26 @@ def create_message(num_attachments=1, body="", subject="the suject", from_="noon | ||||
|     return message | ||||
|  | ||||
|  | ||||
| def create_attachment(filename="the_file.pdf", content_disposition="attachment", payload=b"a PDF document"): | ||||
|     attachment = namedtuple('Attachment', []) | ||||
|     attachment.filename = filename | ||||
|     attachment.content_disposition = content_disposition | ||||
|     attachment.payload = payload | ||||
|     return attachment | ||||
|  | ||||
|  | ||||
| def fake_magic_from_buffer(buffer, mime=False): | ||||
|  | ||||
|     if mime: | ||||
|         if 'PDF' in str(buffer): | ||||
|             return 'application/pdf' | ||||
|         else: | ||||
|             return 'unknown/type' | ||||
|     else: | ||||
|         return 'Some verbose file description' | ||||
|  | ||||
|  | ||||
| @mock.patch('paperless_mail.mail.magic.from_buffer', fake_magic_from_buffer) | ||||
| class TestMail(TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
| @@ -182,26 +198,7 @@ class TestMail(TestCase): | ||||
|         self.assertEqual(get_title(message, att, rule), "the message title") | ||||
|  | ||||
|     def test_handle_message(self): | ||||
|         message = namedtuple('MailMessage', []) | ||||
|         message.subject = "the message title" | ||||
|         message.from_ = "Myself" | ||||
|  | ||||
|         att = namedtuple('Attachment', []) | ||||
|         att.filename = "test1.pdf" | ||||
|         att.content_type = 'application/pdf' | ||||
|         att.payload = b"attachment contents" | ||||
|  | ||||
|         att2 = namedtuple('Attachment', []) | ||||
|         att2.filename = "test2.pdf" | ||||
|         att2.content_type = 'application/pdf' | ||||
|         att2.payload = b"attachment contents" | ||||
|  | ||||
|         att3 = namedtuple('Attachment', []) | ||||
|         att3.filename = "test3.pdf" | ||||
|         att3.content_type = 'application/invalid' | ||||
|         att3.payload = b"attachment contents" | ||||
|  | ||||
|         message.attachments = [att, att2, att3] | ||||
|         message = create_message(subject="the message title", from_="Myself", num_attachments=2) | ||||
|  | ||||
|         account = MailAccount() | ||||
|         rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account) | ||||
| @@ -215,14 +212,13 @@ class TestMail(TestCase): | ||||
|         args1, kwargs1 = self.async_task.call_args_list[0] | ||||
|         args2, kwargs2 = self.async_task.call_args_list[1] | ||||
|  | ||||
|         self.assertEqual(kwargs1['override_title'], "test1") | ||||
|         self.assertEqual(kwargs1['override_filename'], "test1.pdf") | ||||
|         self.assertEqual(kwargs1['override_title'], "file_0") | ||||
|         self.assertEqual(kwargs1['override_filename'], "file_0.pdf") | ||||
|  | ||||
|         self.assertEqual(kwargs2['override_title'], "test2") | ||||
|         self.assertEqual(kwargs2['override_filename'], "test2.pdf") | ||||
|         self.assertEqual(kwargs2['override_title'], "file_1") | ||||
|         self.assertEqual(kwargs2['override_filename'], "file_1.pdf") | ||||
|  | ||||
|     @mock.patch("paperless_mail.mail.async_task") | ||||
|     def test_handle_empty_message(self, m): | ||||
|     def test_handle_empty_message(self): | ||||
|         message = namedtuple('MailMessage', []) | ||||
|  | ||||
|         message.attachments = [] | ||||
| @@ -230,9 +226,45 @@ class TestMail(TestCase): | ||||
|  | ||||
|         result = self.mail_account_handler.handle_message(message, rule) | ||||
|  | ||||
|         self.assertFalse(m.called) | ||||
|         self.assertFalse(self.async_task.called) | ||||
|         self.assertEqual(result, 0) | ||||
|  | ||||
|     def test_handle_unknown_mime_type(self): | ||||
|         message = create_message() | ||||
|         message.attachments = [ | ||||
|             create_attachment(filename="f1.pdf"), | ||||
|             create_attachment(filename="f2.json", payload=b"{'much': 'payload.', 'so': 'json', 'wow': true}") | ||||
|         ] | ||||
|  | ||||
|         account = MailAccount() | ||||
|         rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account) | ||||
|  | ||||
|         result = self.mail_account_handler.handle_message(message, rule) | ||||
|  | ||||
|         self.assertEqual(result, 1) | ||||
|         self.assertEqual(self.async_task.call_count, 1) | ||||
|  | ||||
|         args, kwargs = self.async_task.call_args | ||||
|         self.assertEqual(kwargs['override_filename'], "f1.pdf") | ||||
|  | ||||
|     def test_handle_disposition(self): | ||||
|         message = create_message() | ||||
|         message.attachments = [ | ||||
|             create_attachment(filename="f1.pdf", content_disposition='inline'), | ||||
|             create_attachment(filename="f2.pdf", content_disposition='attachment') | ||||
|         ] | ||||
|  | ||||
|         account = MailAccount() | ||||
|         rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account) | ||||
|  | ||||
|         result = self.mail_account_handler.handle_message(message, rule) | ||||
|  | ||||
|         self.assertEqual(result, 1) | ||||
|         self.assertEqual(self.async_task.call_count, 1) | ||||
|  | ||||
|         args, kwargs = self.async_task.call_args | ||||
|         self.assertEqual(kwargs['override_filename'], "f2.pdf") | ||||
|  | ||||
|     def test_handle_mail_account_mark_read(self): | ||||
|  | ||||
|         account = MailAccount.objects.create(name="test", imap_server="", username="admin", password="secret") | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jonas Winkler
					Jonas Winkler