Rewrites the email parsing to be more clear and concise.

Adds testing to use httpx mocked responses to stand in as a server even offline
This commit is contained in:
Trenton H
2023-06-01 14:50:08 -07:00
parent 36f09c4974
commit 1b3492a01f
9 changed files with 701 additions and 823 deletions

View File

@@ -1,24 +1,39 @@
import datetime
import os
from pathlib import Path
from unittest import mock
import httpx
from django.test import TestCase
from documents.parsers import ParseError
from documents.tests.utils import FileSystemAssertsMixin
from paperless_mail.parsers import MailDocumentParser
from paperless_tika.tests.utils import HttpxMockMixin
class TestParser(FileSystemAssertsMixin, TestCase):
SAMPLE_FILES = os.path.join(os.path.dirname(__file__), "samples")
class BaseMailParserTestCase(TestCase):
"""
Basic setup for the below test cases
"""
SAMPLE_DIR = Path(__file__).parent / "samples"
def setUp(self) -> None:
super().setUp()
self.parser = MailDocumentParser(logging_group=None)
def tearDown(self) -> None:
super().tearDown()
self.parser.cleanup()
def test_get_parsed_missing_file(self):
class TestEmailFileParsing(FileSystemAssertsMixin, BaseMailParserTestCase):
"""
Tests around reading a file and parsing it into a
MailMessage
"""
def test_parse_error_missing_file(self):
"""
GIVEN:
- Fresh parser
@@ -28,13 +43,17 @@ class TestParser(FileSystemAssertsMixin, TestCase):
- An Exception is thrown
"""
# Check if exception is raised when parsing fails.
test_file = self.SAMPLE_DIR / "doesntexist.eml"
self.assertIsNotFile(test_file)
self.assertRaises(
ParseError,
self.parser.get_parsed,
os.path.join(self.SAMPLE_FILES, "na"),
self.parser.parse,
test_file,
"messages/rfc822",
)
def test_get_parsed_broken_file(self):
def test_parse_error_invalid_email(self):
"""
GIVEN:
- Fresh parser
@@ -46,11 +65,12 @@ class TestParser(FileSystemAssertsMixin, TestCase):
# Check if exception is raised when the mail is faulty.
self.assertRaises(
ParseError,
self.parser.get_parsed,
os.path.join(self.SAMPLE_FILES, "broken.eml"),
self.parser.parse,
self.SAMPLE_DIR / "broken.eml",
"messages/rfc822",
)
def test_get_parsed_simple_text_mail(self):
def test_parse_simple_text_email_file(self):
"""
GIVEN:
- Fresh parser
@@ -60,8 +80,8 @@ class TestParser(FileSystemAssertsMixin, TestCase):
- The content of the mail should be available in the parse result.
"""
# Parse Test file and check relevant content
parsed1 = self.parser.get_parsed(
os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
parsed1 = self.parser.parse_file_to_message(
self.SAMPLE_DIR / "simple_text.eml",
)
self.assertEqual(parsed1.date.year, 2022)
@@ -76,58 +96,11 @@ class TestParser(FileSystemAssertsMixin, TestCase):
self.assertEqual(parsed1.text, "This is just a simple Text Mail.\n")
self.assertEqual(parsed1.to, ("some@one.de",))
def test_get_parsed_reparse(self):
"""
GIVEN:
- An E-Mail was parsed
WHEN:
- Another .eml file should be parsed
THEN:
- The parser should not retry to parse and return the old results
"""
# Parse Test file and check relevant content
parsed1 = self.parser.get_parsed(
os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
)
# Check if same parsed object as before is returned, even if another file is given.
parsed2 = self.parser.get_parsed(
os.path.join(os.path.join(self.SAMPLE_FILES, "html.eml")),
)
self.assertEqual(parsed1, parsed2)
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
@mock.patch("paperless_mail.parsers.make_thumbnail_from_pdf")
def test_get_thumbnail(
self,
mock_make_thumbnail_from_pdf: mock.MagicMock,
mock_generate_pdf: mock.MagicMock,
):
"""
GIVEN:
- An E-Mail was parsed
WHEN:
- The Thumbnail is requested
THEN:
- The parser should call the functions which generate the thumbnail
"""
mocked_return = "Passing the return value through.."
mock_make_thumbnail_from_pdf.return_value = mocked_return
mock_generate_pdf.return_value = "Mocked return value.."
thumb = self.parser.get_thumbnail(
os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
"message/rfc822",
)
self.assertEqual(
self.parser.archive_path,
mock_make_thumbnail_from_pdf.call_args_list[0].args[0],
)
self.assertEqual(
self.parser.tempdir,
mock_make_thumbnail_from_pdf.call_args_list[0].args[1],
)
self.assertEqual(mocked_return, thumb)
class TestEmailMetadataExtraction(BaseMailParserTestCase):
"""
Tests extraction of metadata from an email
"""
def test_extract_metadata_fail(self):
"""
@@ -157,7 +130,7 @@ class TestParser(FileSystemAssertsMixin, TestCase):
"""
# Validate Metadata parsing returns the expected results
metadata = self.parser.extract_metadata(
os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
self.SAMPLE_DIR / "simple_text.eml",
"message/rfc822",
)
@@ -287,90 +260,53 @@ class TestParser(FileSystemAssertsMixin, TestCase):
metadata,
)
def test_parse_na(self):
class TestEmailThumbnailGenerate(BaseMailParserTestCase):
"""
Tests the correct generation of an thumbnail for an email
"""
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
@mock.patch("paperless_mail.parsers.make_thumbnail_from_pdf")
def test_get_thumbnail(
self,
mock_make_thumbnail_from_pdf: mock.MagicMock,
mock_generate_pdf: mock.MagicMock,
):
"""
GIVEN:
- Fresh start
- An E-Mail was parsed
WHEN:
- parsing is attempted with nonexistent file
- The Thumbnail is requested
THEN:
- Exception is thrown
- The parser should call the functions which generate the thumbnail
"""
# Check if exception is raised when parsing fails.
self.assertRaises(
ParseError,
self.parser.parse,
os.path.join(self.SAMPLE_FILES, "na"),
mocked_return = "Passing the return value through.."
mock_make_thumbnail_from_pdf.return_value = mocked_return
mock_generate_pdf.return_value = "Mocked return value.."
test_file = self.SAMPLE_DIR / "simple_text.eml"
thumb = self.parser.get_thumbnail(
test_file,
"message/rfc822",
)
@mock.patch("paperless_mail.parsers.MailDocumentParser.tika_parse")
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
def test_parse_html_eml(self, n, mock_tika_parse: mock.MagicMock):
"""
GIVEN:
- Fresh start
WHEN:
- parsing is done with html mail
THEN:
- Tika is called, parsed information from non html parts is available
"""
# Validate parsing returns the expected results
text_expected = "Subject: HTML Message\n\nFrom: Name <someone@example.de>\n\nTo: someone@example.de\n\nAttachments: IntM6gnXFm00FEV5.png (6.89 KiB), 600+kbfile.txt (600.24 KiB)\n\nHTML content: tika return\n\nSome Text and an embedded image."
mock_tika_parse.return_value = "tika return"
self.parser.parse(os.path.join(self.SAMPLE_FILES, "html.eml"), "message/rfc822")
self.assertEqual(text_expected, self.parser.text)
self.assertEqual(
datetime.datetime(
2022,
10,
15,
11,
23,
19,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
self.parser.date,
mock_generate_pdf.assert_called_once_with(
test_file,
)
mock_make_thumbnail_from_pdf.assert_called_once_with(
"Mocked return value..",
self.parser.tempdir,
None,
)
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
def test_parse_simple_eml(self, m: mock.MagicMock):
"""
GIVEN:
- Fresh start
WHEN:
- parsing is done with non html mail
THEN:
- parsed information is available
"""
# Validate parsing returns the expected results
self.assertEqual(mocked_return, thumb)
self.parser.parse(
os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
"message/rfc822",
)
text_expected = "Subject: Simple Text Mail\n\nFrom: Some One <mail@someserver.de>\n\nTo: some@one.de\n\nCC: asdasd@æsdasd.de, asdadasdasdasda.asdasd@æsdasd.de\n\nBCC: fdf@fvf.de\n\n\n\nThis is just a simple Text Mail."
self.assertEqual(text_expected, self.parser.text)
self.assertEqual(
datetime.datetime(
2022,
10,
12,
21,
40,
43,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
self.parser.date,
)
# Just check if tried to generate archive, the unittest for generate_pdf() goes deeper.
m.assert_called()
@mock.patch("paperless_mail.parsers.parser.from_buffer")
def test_tika_parse_unsuccessful(self, mock_from_buffer: mock.MagicMock):
class TestTikaHtmlParse(HttpxMockMixin, BaseMailParserTestCase):
def test_tika_parse_unsuccessful(self):
"""
GIVEN:
- Fresh start
@@ -380,12 +316,13 @@ class TestParser(FileSystemAssertsMixin, TestCase):
- the parser should return an empty string
"""
# Check unsuccessful parsing
mock_from_buffer.return_value = {"content": None}
parsed = self.parser.tika_parse(None)
self.httpx_mock.add_response(
json={"Content-Type": "text/html", "X-TIKA:Parsed-By": []},
)
parsed = self.parser.tika_parse("None")
self.assertEqual("", parsed)
@mock.patch("paperless_mail.parsers.parser.from_buffer")
def test_tika_parse(self, mock_from_buffer: mock.MagicMock):
def test_tika_parse(self):
"""
GIVEN:
- Fresh start
@@ -397,14 +334,18 @@ class TestParser(FileSystemAssertsMixin, TestCase):
html = '<html><head><meta http-equiv="content-type" content="text/html; charset=UTF-8"></head><body><p>Some Text</p></body></html>'
expected_text = "Some Text"
# Check successful parsing
mock_from_buffer.return_value = {"content": expected_text}
self.httpx_mock.add_response(
json={
"Content-Type": "text/html",
"X-TIKA:Parsed-By": [],
"X-TIKA:content": expected_text,
},
)
parsed = self.parser.tika_parse(html)
self.assertEqual(expected_text, parsed.strip())
mock_from_buffer.assert_called_with(html, self.parser.tika_server)
self.assertIn(self.parser.tika_server, str(self.httpx_mock.get_request().url))
@mock.patch("paperless_mail.parsers.parser.from_buffer")
def test_tika_parse_exception(self, mock_from_buffer: mock.MagicMock):
def test_tika_parse_exception(self):
"""
GIVEN:
- Fresh start
@@ -415,11 +356,8 @@ class TestParser(FileSystemAssertsMixin, TestCase):
"""
html = '<html><head><meta http-equiv="content-type" content="text/html; charset=UTF-8"></head><body><p>Some Text</p></body></html>'
# Check ParseError
def my_side_effect():
raise Exception("Test")
self.httpx_mock.add_response(status_code=httpx.codes.INTERNAL_SERVER_ERROR)
mock_from_buffer.side_effect = my_side_effect
self.assertRaises(ParseError, self.parser.tika_parse, html)
def test_tika_parse_unreachable(self):
@@ -437,258 +375,285 @@ class TestParser(FileSystemAssertsMixin, TestCase):
self.parser.tika_server = ""
self.assertRaises(ParseError, self.parser.tika_parse, html)
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_mail")
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_html")
def test_generate_pdf_parse_error(self, m: mock.MagicMock, n: mock.MagicMock):
class TestParser(FileSystemAssertsMixin, HttpxMockMixin, BaseMailParserTestCase):
def test_parse_no_file(self):
"""
GIVEN:
- Fresh start
WHEN:
- pdf generation is requested but gotenberg can not be reached
- parsing is attempted with nonexistent file
THEN:
- Exception is thrown
"""
# Check if exception is raised when parsing fails.
self.assertRaises(
ParseError,
self.parser.parse,
self.SAMPLE_DIR / "na.eml",
"message/rfc822",
)
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
def test_parse_eml_simple(self, mock_generate_pdf: mock.MagicMock):
"""
GIVEN:
- Fresh start
WHEN:
- parsing is done with non html mail
THEN:
- parsed information is available
"""
# Validate parsing returns the expected results
self.parser.parse(
self.SAMPLE_DIR / "simple_text.eml",
"message/rfc822",
)
text_expected = (
"Subject: Simple Text Mail\n\n"
"From: Some One <mail@someserver.de>\n\n"
"To: some@one.de\n\n"
"CC: asdasd@æsdasd.de, asdadasdasdasda.asdasd@æsdasd.de\n\n"
"BCC: fdf@fvf.de\n\n"
"\n\nThis is just a simple Text Mail."
)
self.assertEqual(text_expected, self.parser.text)
self.assertEqual(
datetime.datetime(
2022,
10,
12,
21,
40,
43,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
self.parser.date,
)
# Just check if tried to generate archive, the unittest for generate_pdf() goes deeper.
mock_generate_pdf.assert_called()
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
def test_parse_eml_html(self, mock_generate_pdf: mock.MagicMock):
"""
GIVEN:
- Fresh start
WHEN:
- parsing is done with html mail
THEN:
- Tika is called, parsed information from non html parts is available
"""
# Validate parsing returns the expected results
text_expected = (
"Subject: HTML Message\n\n"
"From: Name <someone@example.de>\n\n"
"To: someone@example.de\n\n"
"Attachments: IntM6gnXFm00FEV5.png (6.89 KiB), 600+kbfile.txt (600.24 KiB)\n\n"
"HTML content: tika return\n\n"
"Some Text and an embedded image."
)
self.httpx_mock.add_response(
json={
"Content-Type": "text/html",
"X-TIKA:Parsed-By": [],
"X-TIKA:content": "tika return",
},
)
self.parser.parse(self.SAMPLE_DIR / "html.eml", "message/rfc822")
mock_generate_pdf.assert_called_once()
self.assertEqual(text_expected, self.parser.text)
self.assertEqual(
datetime.datetime(
2022,
10,
15,
11,
23,
19,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
),
self.parser.date,
)
def test_generate_pdf_parse_error(self):
"""
GIVEN:
- Fresh start
WHEN:
- pdf generation is requested but gotenberg fails
THEN:
- a ParseError Exception is thrown
"""
m.return_value = b""
n.return_value = b""
self.httpx_mock.add_response(status_code=httpx.codes.INTERNAL_SERVER_ERROR)
# Check if exception is raised when the pdf can not be created.
self.parser.gotenberg_server = ""
self.assertRaises(
ParseError,
self.parser.generate_pdf,
os.path.join(self.SAMPLE_FILES, "html.eml"),
self.parser.parse,
self.SAMPLE_DIR / "simple_text.eml",
"message/rfc822",
)
def test_generate_pdf_exception(self):
def test_generate_pdf_simple_email(self):
"""
GIVEN:
- Fresh start
- Simple text email with no HTML content
WHEN:
- pdf generation is requested but parsing throws an exception
- Email is parsed
THEN:
- a ParseError Exception is thrown
- Gotenberg is called to generate a PDF from HTML
- Archive file is generated
"""
# Check if exception is raised when the mail can not be parsed.
self.assertRaises(
ParseError,
self.parser.generate_pdf,
os.path.join(self.SAMPLE_FILES, "broken.eml"),
self.httpx_mock.add_response(
url="http://localhost:3000/forms/chromium/convert/html",
method="POST",
content=(self.SAMPLE_DIR / "simple_text.eml.pdf").read_bytes(),
)
@mock.patch("paperless_mail.parsers.requests.post")
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_mail")
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_html")
def test_generate_pdf(
self,
mock_generate_pdf_from_html: mock.MagicMock,
mock_generate_pdf_from_mail: mock.MagicMock,
mock_post: mock.MagicMock,
):
self.parser.parse(self.SAMPLE_DIR / "simple_text.eml", "message/rfc822")
self.assertIsNotNone(self.parser.archive_path)
def test_generate_pdf_html_email(self):
"""
GIVEN:
- Fresh start
- email with HTML content
WHEN:
- pdf generation is requested
- Email is parsed
THEN:
- gotenberg is called and the resulting file is returned
- Gotenberg is called to generate a PDF from HTML
- Gotenberg is used to merge the two PDFs
- Archive file is generated
"""
mock_generate_pdf_from_mail.return_value = b"Mail Return"
mock_generate_pdf_from_html.return_value = b"HTML Return"
self.httpx_mock.add_response(
url="http://localhost:9998/tika/text",
method="PUT",
json={
"Content-Type": "text/html",
"X-TIKA:Parsed-By": [],
"X-TIKA:content": "This is some Tika HTML text",
},
)
self.httpx_mock.add_response(
url="http://localhost:3000/forms/chromium/convert/html",
method="POST",
content=(self.SAMPLE_DIR / "html.eml.pdf").read_bytes(),
)
self.httpx_mock.add_response(
url="http://localhost:3000/forms/pdfengines/merge",
method="POST",
content=b"Pretend merged PDF content",
)
self.parser.parse(self.SAMPLE_DIR / "html.eml", "message/rfc822")
mock_response = mock.MagicMock()
mock_response.content = b"Content"
mock_post.return_value = mock_response
pdf_path = self.parser.generate_pdf(os.path.join(self.SAMPLE_FILES, "html.eml"))
self.assertIsFile(pdf_path)
self.assertIsNotNone(self.parser.archive_path)
mock_generate_pdf_from_mail.assert_called_once_with(
self.parser.get_parsed(None),
def test_generate_pdf_html_email_html_to_pdf_failure(self):
"""
GIVEN:
- email with HTML content
WHEN:
- Email is parsed
- Conversion of email HTML content to PDF fails
THEN:
- ParseError is raised
"""
self.httpx_mock.add_response(
url="http://localhost:9998/tika/text",
method="PUT",
json={
"Content-Type": "text/html",
"X-TIKA:Parsed-By": [],
"X-TIKA:content": "This is some Tika HTML text",
},
)
mock_generate_pdf_from_html.assert_called_once_with(
self.parser.get_parsed(None).html,
self.parser.get_parsed(None).attachments,
self.httpx_mock.add_response(
url="http://localhost:3000/forms/chromium/convert/html",
method="POST",
content=(self.SAMPLE_DIR / "html.eml.pdf").read_bytes(),
)
self.assertEqual(
self.parser.gotenberg_server + "/forms/pdfengines/merge",
mock_post.call_args.args[0],
)
self.assertEqual({}, mock_post.call_args.kwargs["headers"])
self.assertEqual(
b"Mail Return",
mock_post.call_args.kwargs["files"]["1_mail.pdf"][1].read(),
)
self.assertEqual(
b"HTML Return",
mock_post.call_args.kwargs["files"]["2_html.pdf"][1].read(),
self.httpx_mock.add_response(
url="http://localhost:3000/forms/chromium/convert/html",
method="POST",
status_code=httpx.codes.INTERNAL_SERVER_ERROR,
)
with self.assertRaises(ParseError):
self.parser.parse(self.SAMPLE_DIR / "html.eml", "message/rfc822")
mock_response.raise_for_status.assert_called_once()
with open(pdf_path, "rb") as file:
self.assertEqual(b"Content", file.read())
def test_generate_pdf_html_email_merge_failure(self):
"""
GIVEN:
- email with HTML content
WHEN:
- Email is parsed
- Merging of PDFs fails
THEN:
- ParseError is raised
"""
self.httpx_mock.add_response(
url="http://localhost:9998/tika/text",
method="PUT",
json={
"Content-Type": "text/html",
"X-TIKA:Parsed-By": [],
"X-TIKA:content": "This is some Tika HTML text",
},
)
self.httpx_mock.add_response(
url="http://localhost:3000/forms/chromium/convert/html",
method="POST",
content=(self.SAMPLE_DIR / "html.eml.pdf").read_bytes(),
)
self.httpx_mock.add_response(
url="http://localhost:3000/forms/pdfengines/merge",
method="POST",
status_code=httpx.codes.INTERNAL_SERVER_ERROR,
)
with self.assertRaises(ParseError):
self.parser.parse(self.SAMPLE_DIR / "html.eml", "message/rfc822")
def test_mail_to_html(self):
"""
GIVEN:
- Fresh start
- Email message with HTML content
WHEN:
- conversion from eml to html is requested
- Email is parsed
THEN:
- html should be returned
- Resulting HTML is as expected
"""
mail = self.parser.get_parsed(os.path.join(self.SAMPLE_FILES, "html.eml"))
html_handle = self.parser.mail_to_html(mail)
html_received = html_handle.read()
mail = self.parser.parse_file_to_message(self.SAMPLE_DIR / "html.eml")
html_file = self.parser.mail_to_html(mail)
expected_html_file = self.SAMPLE_DIR / "html.eml.html"
with open(
os.path.join(self.SAMPLE_FILES, "html.eml.html"),
) as html_expected_handle:
html_expected = html_expected_handle.read()
self.assertHTMLEqual(expected_html_file.read_text(), html_file.read_text())
self.assertHTMLEqual(html_expected, html_received)
@mock.patch("paperless_mail.parsers.requests.post")
@mock.patch("paperless_mail.parsers.MailDocumentParser.mail_to_html")
def test_generate_pdf_from_mail(
self,
mock_mail_to_html: mock.MagicMock,
mock_post: mock.MagicMock,
):
"""
GIVEN:
- Fresh start
- Email message with HTML content
WHEN:
- conversion of PDF from .eml is requested
- Email is parsed
THEN:
- gotenberg should be called with valid intermediary html files, the resulting pdf is returned
- Gotenberg is used to convert HTML to PDF
"""
mock_response = mock.MagicMock()
mock_response.content = b"Content"
mock_post.return_value = mock_response
mock_mail_to_html.return_value = "Testresponse"
self.httpx_mock.add_response(content=b"Content")
mail = self.parser.get_parsed(os.path.join(self.SAMPLE_FILES, "html.eml"))
mail = self.parser.parse_file_to_message(self.SAMPLE_DIR / "html.eml")
retval = self.parser.generate_pdf_from_mail(mail)
self.assertEqual(b"Content", retval)
self.assertEqual(b"Content", retval.read_bytes())
request = self.httpx_mock.get_request()
mock_mail_to_html.assert_called_once_with(mail)
self.assertEqual(
str(request.url),
self.parser.gotenberg_server + "/forms/chromium/convert/html",
mock_post.call_args.args[0],
)
self.assertDictEqual({}, mock_post.call_args.kwargs["headers"])
self.assertDictEqual(
{
"marginTop": "0.1",
"marginBottom": "0.1",
"marginLeft": "0.1",
"marginRight": "0.1",
"paperWidth": "8.27",
"paperHeight": "11.7",
"scale": "1.0",
"pdfFormat": "PDF/A-2b",
},
mock_post.call_args.kwargs["data"],
)
self.assertEqual(
"Testresponse",
mock_post.call_args.kwargs["files"]["html"][1],
)
self.assertEqual(
"output.css",
mock_post.call_args.kwargs["files"]["css"][0],
)
mock_response.raise_for_status.assert_called_once()
def test_transform_inline_html(self):
"""
GIVEN:
- Fresh start
WHEN:
- transforming of html content from an email with an inline image attachment is requested
THEN:
- html is returned and sanitized
"""
class MailAttachmentMock:
def __init__(self, payload, content_id):
self.payload = payload
self.content_id = content_id
result = None
with open(os.path.join(self.SAMPLE_FILES, "sample.html")) as html_file:
with open(os.path.join(self.SAMPLE_FILES, "sample.png"), "rb") as png_file:
html = html_file.read()
png = png_file.read()
attachments = [
MailAttachmentMock(png, "part1.pNdUSz0s.D3NqVtPg@example.de"),
]
result = self.parser.transform_inline_html(html, attachments)
resulting_html = result[-1][1].read()
self.assertTrue(result[-1][0] == "index.html")
self.assertIn(result[0][0], resulting_html)
self.assertNotIn("<script", resulting_html.lower())
@mock.patch("paperless_mail.parsers.requests.post")
def test_generate_pdf_from_html(self, mock_post: mock.MagicMock):
"""
GIVEN:
- Fresh start
WHEN:
- generating pdf from html with inline attachments is attempted
THEN:
- gotenberg is called with the correct parameters and the resulting pdf is returned
"""
class MailAttachmentMock:
def __init__(self, payload, content_id):
self.payload = payload
self.content_id = content_id
mock_response = mock.MagicMock()
mock_response.content = b"Content"
mock_post.return_value = mock_response
result = None
with open(os.path.join(self.SAMPLE_FILES, "sample.html")) as html_file:
with open(os.path.join(self.SAMPLE_FILES, "sample.png"), "rb") as png_file:
html = html_file.read()
png = png_file.read()
attachments = [
MailAttachmentMock(png, "part1.pNdUSz0s.D3NqVtPg@example.de"),
]
result = self.parser.generate_pdf_from_html(html, attachments)
self.assertEqual(
self.parser.gotenberg_server + "/forms/chromium/convert/html",
mock_post.call_args.args[0],
)
self.assertDictEqual({}, mock_post.call_args.kwargs["headers"])
self.assertDictEqual(
{
"marginTop": "0.1",
"marginBottom": "0.1",
"marginLeft": "0.1",
"marginRight": "0.1",
"paperWidth": "8.27",
"paperHeight": "11.7",
"scale": "1.0",
},
mock_post.call_args.kwargs["data"],
)
# read to assert it is a file like object.
mock_post.call_args.kwargs["files"]["cidpart1pNdUSz0sD3NqVtPgexamplede"][
1
].read()
mock_post.call_args.kwargs["files"]["index.html"][1].read()
mock_response.raise_for_status.assert_called_once()
self.assertEqual(b"Content", result)

View File

@@ -1,29 +1,80 @@
import os
import time
from unittest import mock
from urllib.error import HTTPError
from urllib.request import urlopen
import httpx
import pytest
from django.test import TestCase
from imagehash import average_hash
from pdfminer.high_level import extract_text
from PIL import Image
from documents.parsers import run_convert
from documents.tests.utils import FileSystemAssertsMixin
from paperless_mail.parsers import MailDocumentParser
from paperless_mail.tests.test_parsers import BaseMailParserTestCase
class TestParserLive(FileSystemAssertsMixin, TestCase):
SAMPLE_FILES = os.path.join(os.path.dirname(__file__), "samples")
class MailAttachmentMock:
def __init__(self, payload, content_id):
self.payload = payload
self.content_id = content_id
self.content_type = "image/png"
def setUp(self) -> None:
self.parser = MailDocumentParser(logging_group=None)
def tearDown(self) -> None:
self.parser.cleanup()
@pytest.mark.skipif(
"PAPERLESS_CI_TEST" not in os.environ,
reason="No Gotenberg/Tika servers to test with",
)
class TestUrlCanary(TestCase):
"""
Verify certain URLs are still available so testing is valid still
"""
def test_online_image_exception_on_not_available(self):
"""
GIVEN:
- Fresh start
WHEN:
- nonexistent image is requested
THEN:
- An exception shall be thrown
"""
"""
A public image is used in the html sample file. We have no control
whether this image stays online forever, so here we check if we can detect if is not
available anymore.
"""
with self.assertRaises(httpx.HTTPStatusError) as cm:
resp = httpx.get(
"https://upload.wikimedia.org/wikipedia/en/f/f7/nonexistent.png",
)
resp.raise_for_status()
self.assertEqual(cm.exception.response.status_code, httpx.codes.NOT_FOUND)
def test_is_online_image_still_available(self):
"""
GIVEN:
- Fresh start
WHEN:
- A public image used in the html sample file is requested
THEN:
- No exception shall be thrown
"""
"""
A public image is used in the html sample file. We have no control
whether this image stays online forever, so here we check if it is still there
"""
# Now check the URL used in samples/sample.html
resp = httpx.get("https://upload.wikimedia.org/wikipedia/en/f/f7/RickRoll.png")
resp.raise_for_status()
@pytest.mark.skipif(
"PAPERLESS_CI_TEST" not in os.environ,
reason="No Gotenberg/Tika servers to test with",
)
class TestParserLive(FileSystemAssertsMixin, BaseMailParserTestCase):
@staticmethod
def imagehash(file, hash_size=18):
return f"{average_hash(Image.open(file), hash_size)}"
@@ -54,13 +105,18 @@ class TestParserLive(FileSystemAssertsMixin, TestCase):
result = method_or_callable(*args)
succeeded = True
except Exception as e:
except httpx.HTTPError as e:
raise
# Retry on HTTP errors
print(f"{e} during try #{retry_count}", flush=True)
retry_count = retry_count + 1
time.sleep(retry_time)
retry_time = retry_time * 2.0
except Exception:
# Not on other error
raise
self.assertTrue(
succeeded,
@@ -79,17 +135,14 @@ class TestParserLive(FileSystemAssertsMixin, TestCase):
THEN:
- The returned thumbnail image file is as expected
"""
mock_generate_pdf.return_value = os.path.join(
self.SAMPLE_FILES,
"simple_text.eml.pdf",
)
mock_generate_pdf.return_value = self.SAMPLE_DIR / "simple_text.eml.pdf"
thumb = self.parser.get_thumbnail(
os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
self.SAMPLE_DIR / "simple_text.eml",
"message/rfc822",
)
self.assertIsFile(thumb)
expected = os.path.join(self.SAMPLE_FILES, "simple_text.eml.pdf.webp")
expected = self.SAMPLE_DIR / "simple_text.eml.pdf.webp"
self.assertEqual(
self.imagehash(thumb),
@@ -97,10 +150,6 @@ class TestParserLive(FileSystemAssertsMixin, TestCase):
f"Created Thumbnail {thumb} differs from expected file {expected}",
)
@pytest.mark.skipif(
"TIKA_LIVE" not in os.environ,
reason="No tika server",
)
def test_tika_parse_successful(self):
"""
GIVEN:
@@ -117,27 +166,6 @@ class TestParserLive(FileSystemAssertsMixin, TestCase):
parsed = self.parser.tika_parse(html)
self.assertEqual(expected_text, parsed.strip())
@pytest.mark.skipif(
"TIKA_LIVE" not in os.environ,
reason="No tika server",
)
def test_tika_parse_unsuccessful(self):
"""
GIVEN:
- Fresh start
WHEN:
- tika parsing fails
THEN:
- the parser should return an empty string
"""
# Check unsuccessful parsing
parsed = self.parser.tika_parse(None)
self.assertEqual("", parsed)
@pytest.mark.skipif(
"GOTENBERG_LIVE" not in os.environ,
reason="No gotenberg server",
)
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_mail")
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_html")
def test_generate_pdf_gotenberg_merging(
@@ -153,15 +181,16 @@ class TestParserLive(FileSystemAssertsMixin, TestCase):
THEN:
- gotenberg is called to merge files and the resulting file is returned
"""
with open(os.path.join(self.SAMPLE_FILES, "first.pdf"), "rb") as first:
mock_generate_pdf_from_mail.return_value = first.read()
mock_generate_pdf_from_mail.return_value = self.SAMPLE_DIR / "first.pdf"
mock_generate_pdf_from_html.return_value = self.SAMPLE_DIR / "second.pdf"
with open(os.path.join(self.SAMPLE_FILES, "second.pdf"), "rb") as second:
mock_generate_pdf_from_html.return_value = second.read()
msg = self.parser.parse_file_to_message(
self.SAMPLE_DIR / "html.eml",
)
pdf_path = self.util_call_with_backoff(
self.parser.generate_pdf,
[os.path.join(self.SAMPLE_FILES, "html.eml")],
[msg],
)
self.assertIsFile(pdf_path)
@@ -169,38 +198,9 @@ class TestParserLive(FileSystemAssertsMixin, TestCase):
expected = (
"first\tPDF\tto\tbe\tmerged.\n\n\x0csecond\tPDF\tto\tbe\tmerged.\n\n\x0c"
)
self.assertEqual(expected, extracted)
@pytest.mark.skipif(
"GOTENBERG_LIVE" not in os.environ,
reason="No gotenberg server",
)
def test_generate_pdf_from_mail_no_convert(self):
"""
GIVEN:
- Fresh start
WHEN:
- pdf generation from simple eml file is requested
THEN:
- gotenberg is called and the resulting file is returned and contains the expected text.
"""
mail = self.parser.get_parsed(os.path.join(self.SAMPLE_FILES, "html.eml"))
pdf_path = os.path.join(self.parser.tempdir, "html.eml.pdf")
with open(pdf_path, "wb") as file:
file.write(
self.util_call_with_backoff(self.parser.generate_pdf_from_mail, [mail]),
)
extracted = extract_text(pdf_path)
expected = extract_text(os.path.join(self.SAMPLE_FILES, "html.eml.pdf"))
self.assertEqual(expected, extracted)
@pytest.mark.skipif(
"GOTENBERG_LIVE" not in os.environ,
reason="No gotenberg server",
)
def test_generate_pdf_from_mail(self):
"""
GIVEN:
@@ -210,193 +210,32 @@ class TestParserLive(FileSystemAssertsMixin, TestCase):
THEN:
- gotenberg is called and the resulting file is returned and look as expected.
"""
mail = self.parser.get_parsed(os.path.join(self.SAMPLE_FILES, "html.eml"))
pdf_path = os.path.join(self.parser.tempdir, "html.eml.pdf")
with open(pdf_path, "wb") as file:
file.write(
self.util_call_with_backoff(self.parser.generate_pdf_from_mail, [mail]),
)
converted = os.path.join(
self.parser.tempdir,
"html.eml.pdf.webp",
self.util_call_with_backoff(
self.parser.parse,
[self.SAMPLE_DIR / "html.eml", "message/rfc822"],
)
run_convert(
density=300,
scale="500x5000>",
alpha="remove",
strip=True,
trim=False,
auto_orient=True,
input_file=f"{pdf_path}", # Do net define an index to convert all pages.
output_file=converted,
logging_group=None,
# Check the archive PDF
archive_path = self.parser.get_archive_path()
archive_text = extract_text(archive_path)
expected_archive_text = extract_text(self.SAMPLE_DIR / "html.eml.pdf")
# Archive includes the HTML content, so use in
self.assertIn(expected_archive_text, archive_text)
# Check the thumbnail
generated_thumbnail = self.parser.get_thumbnail(
self.SAMPLE_DIR / "html.eml",
"message/rfc822",
)
self.assertIsFile(converted)
thumb_hash = self.imagehash(converted)
generated_thumbnail_hash = self.imagehash(generated_thumbnail)
# The created pdf is not reproducible. But the converted image should always look the same.
expected_hash = self.imagehash(
os.path.join(self.SAMPLE_FILES, "html.eml.pdf.webp"),
)
self.assertEqual(
thumb_hash,
expected_hash,
f"PDF looks different. Check if {converted} looks weird.",
)
@pytest.mark.skipif(
"GOTENBERG_LIVE" not in os.environ,
reason="No gotenberg server",
)
def test_generate_pdf_from_html_no_convert(self):
"""
GIVEN:
- Fresh start
WHEN:
- pdf generation from html eml file is requested
THEN:
- gotenberg is called and the resulting file is returned and contains the expected text.
"""
class MailAttachmentMock:
def __init__(self, payload, content_id):
self.payload = payload
self.content_id = content_id
result = None
with open(os.path.join(self.SAMPLE_FILES, "sample.html")) as html_file:
with open(os.path.join(self.SAMPLE_FILES, "sample.png"), "rb") as png_file:
html = html_file.read()
png = png_file.read()
attachments = [
MailAttachmentMock(png, "part1.pNdUSz0s.D3NqVtPg@example.de"),
]
result = self.util_call_with_backoff(
self.parser.generate_pdf_from_html,
[html, attachments],
)
pdf_path = os.path.join(self.parser.tempdir, "sample.html.pdf")
with open(pdf_path, "wb") as file:
file.write(result)
extracted = extract_text(pdf_path)
expected = extract_text(os.path.join(self.SAMPLE_FILES, "sample.html.pdf"))
self.assertEqual(expected, extracted)
@pytest.mark.skipif(
"GOTENBERG_LIVE" not in os.environ,
reason="No gotenberg server",
)
def test_generate_pdf_from_html(self):
"""
GIVEN:
- Fresh start
WHEN:
- pdf generation from html eml file is requested
THEN:
- gotenberg is called and the resulting file is returned and look as expected.
"""
class MailAttachmentMock:
def __init__(self, payload, content_id):
self.payload = payload
self.content_id = content_id
result = None
with open(os.path.join(self.SAMPLE_FILES, "sample.html")) as html_file:
with open(os.path.join(self.SAMPLE_FILES, "sample.png"), "rb") as png_file:
html = html_file.read()
png = png_file.read()
attachments = [
MailAttachmentMock(png, "part1.pNdUSz0s.D3NqVtPg@example.de"),
]
result = self.util_call_with_backoff(
self.parser.generate_pdf_from_html,
[html, attachments],
)
pdf_path = os.path.join(self.parser.tempdir, "sample.html.pdf")
with open(pdf_path, "wb") as file:
file.write(result)
converted = os.path.join(self.parser.tempdir, "sample.html.pdf.webp")
run_convert(
density=300,
scale="500x5000>",
alpha="remove",
strip=True,
trim=False,
auto_orient=True,
input_file=f"{pdf_path}", # Do net define an index to convert all pages.
output_file=converted,
logging_group=None,
)
self.assertIsFile(converted)
thumb_hash = self.imagehash(converted)
# The created pdf is not reproducible. But the converted image should always look the same.
expected_hash = self.imagehash(
os.path.join(self.SAMPLE_FILES, "sample.html.pdf.webp"),
)
expected_hash = self.imagehash(self.SAMPLE_DIR / "html.eml.pdf.webp")
self.assertEqual(
thumb_hash,
generated_thumbnail_hash,
expected_hash,
f"PDF looks different. Check if {converted} looks weird. "
f"If Rick Astley is shown, Gotenberg loads from web which is bad for Mail content.",
f"PDF looks different. Check if {generated_thumbnail} looks weird.",
)
@pytest.mark.skipif(
"GOTENBERG_LIVE" not in os.environ,
reason="No gotenberg server",
)
def test_online_image_exception_on_not_available(self):
"""
GIVEN:
- Fresh start
WHEN:
- nonexistent image is requested
THEN:
- An exception shall be thrown
"""
"""
A public image is used in the html sample file. We have no control
whether this image stays online forever, so here we check if we can detect if is not
available anymore.
"""
# Start by Testing if nonexistent URL really throws an Exception
self.assertRaises(
HTTPError,
urlopen,
"https://upload.wikimedia.org/wikipedia/en/f/f7/nonexistent.png",
)
@pytest.mark.skipif(
"GOTENBERG_LIVE" not in os.environ,
reason="No gotenberg server",
)
def test_is_online_image_still_available(self):
"""
GIVEN:
- Fresh start
WHEN:
- A public image used in the html sample file is requested
THEN:
- No exception shall be thrown
"""
"""
A public image is used in the html sample file. We have no control
whether this image stays online forever, so here we check if it is still there
"""
# Now check the URL used in samples/sample.html
urlopen("https://upload.wikimedia.org/wikipedia/en/f/f7/RickRoll.png")