Adds better handling for files with invalid utf8 content

This commit is contained in:
Trenton H 2023-05-12 14:21:32 -07:00
parent 350c20d6ab
commit 111960c530
6 changed files with 47 additions and 16 deletions

View File

@ -7,6 +7,7 @@ import shutil
import subprocess import subprocess
import tempfile import tempfile
from functools import lru_cache from functools import lru_cache
from pathlib import Path
from typing import Iterator from typing import Iterator
from typing import Match from typing import Match
from typing import Optional from typing import Optional
@ -319,6 +320,18 @@ class DocumentParser(LoggingMixin):
if self.progress_callback: if self.progress_callback:
self.progress_callback(current_progress, max_progress) self.progress_callback(current_progress, max_progress)
def read_file_handle_unicode_errors(self, filepath: Path) -> str:
    """
    Read a text file as UTF-8, tolerating invalid byte sequences.

    The file is first decoded strictly.  If that fails, a warning is logged
    and the raw bytes are decoded again with ``errors="ignore"``, which
    drops the undecodable bytes and keeps the rest of the text.

    :param filepath: Location of the file to read.  A plain ``str`` (or any
        other path-like object) is accepted as well and is converted to a
        ``Path`` before use.
    :return: The decoded file content, minus any invalid bytes.
    :raises OSError: If the file cannot be opened or read.
    """
    # Callers are not guaranteed to hand over a Path instance; normalise
    # here so a string path does not fail with AttributeError below.
    filepath = Path(filepath)
    try:
        text = filepath.read_text(encoding="utf-8")
    except UnicodeDecodeError as e:
        self.log("warning", f"Unicode error during text reading, continuing: {e}")
        # Second pass over the raw bytes, silently dropping what cannot be
        # decoded instead of aborting the whole parse.
        text = filepath.read_bytes().decode("utf-8", errors="ignore")
    return text
def extract_metadata(self, document_path, mime_type): def extract_metadata(self, document_path, mime_type):
return [] return []

View File

@ -122,8 +122,7 @@ class RasterisedDocumentParser(DocumentParser):
and os.path.isfile(sidecar_file) and os.path.isfile(sidecar_file)
and settings.OCR_MODE != "redo" and settings.OCR_MODE != "redo"
): ):
with open(sidecar_file) as f: text = self.read_file_handle_unicode_errors(sidecar_file)
text = f.read()
if "[OCR skipped on page" not in text: if "[OCR skipped on page" not in text:
# This happens when there's already text in the input file. # This happens when there's already text in the input file.
@ -155,7 +154,7 @@ class RasterisedDocumentParser(DocumentParser):
tmp.name, tmp.name,
], ],
) )
text = tmp.read() text = self.read_file_handle_unicode_errors(Path(tmp.name))
return post_process_text(text) return post_process_text(text)

View File

@ -2,6 +2,7 @@ import os
import shutil import shutil
import tempfile import tempfile
import uuid import uuid
from pathlib import Path
from typing import ContextManager from typing import ContextManager
from unittest import mock from unittest import mock
@ -39,7 +40,7 @@ class FakeImageFile(ContextManager):
class TestParser(DirectoriesMixin, FileSystemAssertsMixin, TestCase): class TestParser(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
SAMPLE_FILES = os.path.join(os.path.dirname(__file__), "samples") SAMPLE_FILES = Path(__file__).resolve().parent / "samples"
def assertContainsStrings(self, content, strings): def assertContainsStrings(self, content, strings):
# Asserts that all strings appear in content, in the given order. # Asserts that all strings appear in content, in the given order.
@ -77,7 +78,7 @@ class TestParser(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
parser = RasterisedDocumentParser(uuid.uuid4()) parser = RasterisedDocumentParser(uuid.uuid4())
text = parser.extract_text( text = parser.extract_text(
None, None,
os.path.join(self.SAMPLE_FILES, "simple-digital.pdf"), self.SAMPLE_FILES / "simple-digital.pdf",
) )
self.assertContainsStrings(text.strip(), ["This is a test document."]) self.assertContainsStrings(text.strip(), ["This is a test document."])

View File

@ -16,11 +16,7 @@ class TextDocumentParser(DocumentParser):
logging_name = "paperless.parsing.text" logging_name = "paperless.parsing.text"
def get_thumbnail(self, document_path, mime_type, file_name=None): def get_thumbnail(self, document_path, mime_type, file_name=None):
def read_text(): text = self.read_file_handle_unicode_errors(document_path)
with open(document_path) as src:
lines = [line.strip() for line in src.readlines()]
text = "\n".join(lines[:50])
return text
img = Image.new("RGB", (500, 700), color="white") img = Image.new("RGB", (500, 700), color="white")
draw = ImageDraw.Draw(img) draw = ImageDraw.Draw(img)
@ -29,7 +25,7 @@ class TextDocumentParser(DocumentParser):
size=20, size=20,
layout_engine=ImageFont.Layout.BASIC, layout_engine=ImageFont.Layout.BASIC,
) )
draw.text((5, 5), read_text(), font=font, fill="black") draw.text((5, 5), text, font=font, fill="black")
out_path = os.path.join(self.tempdir, "thumb.webp") out_path = os.path.join(self.tempdir, "thumb.webp")
img.save(out_path, format="WEBP") img.save(out_path, format="WEBP")
@ -37,5 +33,4 @@ class TextDocumentParser(DocumentParser):
return out_path return out_path
def parse(self, document_path, mime_type, file_name=None): def parse(self, document_path, mime_type, file_name=None):
with open(document_path) as f: self.text = self.read_file_handle_unicode_errors(document_path)
self.text = f.read()

View File

@ -0,0 +1 @@
Pantothensäure

View File

@ -1,4 +1,4 @@
import os from pathlib import Path
from django.test import TestCase from django.test import TestCase
@ -8,12 +8,14 @@ from paperless_text.parsers import TextDocumentParser
class TestTextParser(DirectoriesMixin, FileSystemAssertsMixin, TestCase): class TestTextParser(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
SAMPLE_DIR = Path(__file__).resolve().parent / "samples"
def test_thumbnail(self): def test_thumbnail(self):
parser = TextDocumentParser(None) parser = TextDocumentParser(None)
# just make sure that it does not crash # just make sure that it does not crash
f = parser.get_thumbnail( f = parser.get_thumbnail(
os.path.join(os.path.dirname(__file__), "samples", "test.txt"), self.SAMPLE_DIR / "test.txt",
"text/plain", "text/plain",
) )
self.assertIsFile(f) self.assertIsFile(f)
@ -22,9 +24,29 @@ class TestTextParser(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
parser = TextDocumentParser(None) parser = TextDocumentParser(None)
parser.parse( parser.parse(
os.path.join(os.path.dirname(__file__), "samples", "test.txt"), self.SAMPLE_DIR / "test.txt",
"text/plain", "text/plain",
) )
self.assertEqual(parser.get_text(), "This is a test file.\n") self.assertEqual(parser.get_text(), "This is a test file.\n")
self.assertIsNone(parser.get_archive_path()) self.assertIsNone(parser.get_archive_path())
def test_parse_invalid_bytes(self):
    """
    GIVEN:
        - A text sample whose content is not valid UTF-8
    WHEN:
        - The text parser parses that sample
    THEN:
        - Parsing completes
        - The invalid bytes are missing from the extracted text
        - No archive file is produced
    """
    sample_path = self.SAMPLE_DIR / "decode_error.txt"
    text_parser = TextDocumentParser(None)

    text_parser.parse(sample_path, "text/plain")

    extracted = text_parser.get_text()
    self.assertEqual(extracted, "Pantothensure\n")
    self.assertIsNone(text_parser.get_archive_path())