import os
import re
from io import BytesIO
from io import StringIO
import requests
from bleach import clean
from bleach import linkify
from django.conf import settings
from documents.parsers import DocumentParser
from documents.parsers import make_thumbnail_from_pdf
from documents.parsers import ParseError
from imap_tools import MailMessage
from tika import parser
class MailDocumentParser(DocumentParser):
"""
This parser sends documents to a local tika server
"""
logging_name = "paperless.parsing.mail"
_parsed = None
def get_parsed(self, document_path) -> MailMessage:
if not self._parsed:
try:
with open(document_path, "rb") as eml:
self._parsed = MailMessage.from_bytes(eml.read())
except Exception as err:
raise ParseError(
f"Could not parse {document_path}: {err}",
)
return self._parsed
def get_thumbnail(self, document_path, mime_type, file_name=None):
if not self.archive_path:
self.archive_path = self.generate_pdf(document_path)
return make_thumbnail_from_pdf(
self.archive_path,
self.tempdir,
self.logging_group,
)
def extract_metadata(self, document_path, mime_type):
result = []
try:
mail = self.get_parsed(document_path)
except ParseError as e:
self.log(
"warning",
f"Error while fetching document metadata for " f"{document_path}: {e}",
)
return result
for key, value in mail.headers.items():
value = ", ".join(i for i in value)
result.append(
{
"namespace": "",
"prefix": "header",
"key": key,
"value": value,
},
)
result.append(
{
"namespace": "",
"prefix": "",
"key": "attachments",
"value": ", ".join(
f"{attachment.filename}({(attachment.size / 1024):.2f} KiB)"
for attachment in mail.attachments
),
},
)
result.append(
{
"namespace": "",
"prefix": "",
"key": "date",
"value": mail.date.strftime("%Y-%m-%d %H:%M:%S %Z"),
},
)
result.sort(key=lambda item: (item["prefix"], item["key"]))
return result
def parse(self, document_path, mime_type, file_name=None):
def strip_content(text: str):
text = re.sub("\t", " ", text)
text = re.sub(" +", " ", text)
text = re.sub("(\n *)+", "\n", text)
return text.strip()
mail = self.get_parsed(document_path)
self.text = f"{strip_content(mail.text)}\n\n"
self.text += f"Subject: {mail.subject}\n\n"
self.text += f"From: {mail.from_values.full}\n\n"
self.text += f"To: {', '.join(address.full for address in mail.to_values)}\n\n"
if len(mail.cc_values) >= 1:
self.text += (
f"CC: {', '.join(address.full for address in mail.cc_values)}\n\n"
)
if len(mail.bcc_values) >= 1:
self.text += (
f"BCC: {', '.join(address.full for address in mail.bcc_values)}\n\n"
)
if len(mail.attachments) >= 1:
att = ", ".join(f"{a.filename} ({a.size})" for a in mail.attachments)
self.text += f"Attachments: {att}\n\n"
if mail.html != "":
self.text += "HTML content: " + strip_content(self.tika_parse(mail.html))
self.date = mail.date
self.archive_path = self.generate_pdf(document_path)
def tika_parse(self, input):
self.log("info", "Sending content to Tika server")
tika_server = settings.PAPERLESS_TIKA_ENDPOINT
try:
parsed = parser.from_buffer(input, tika_server)
except Exception as err:
raise ParseError(
f"Could not parse content with tika server at " f"{tika_server}: {err}",
)
if parsed["content"]:
return parsed["content"]
else:
return ""
def generate_pdf(self, document_path):
def clean_html(text: str):
if isinstance(text, list):
text = "\n".join([str(e) for e in text])
if type(text) != str:
text = str(text)
text = text.replace("&", "&")
text = text.replace("<", "<")
text = text.replace(">", ">")
text = text.replace(" ", " ")
text = text.replace("'", "'")
text = text.replace('"', """)
text = clean(text)
text = linkify(text, parse_email=True)
text = text.replace("\n", "
")
return text
def clean_html_script(text: str):
text = text.replace("