Mirror of https://github.com/paperless-ngx/paperless-ngx.git (synced 2025-07-28 18:24:38 -05:00)

reworked PDF parser that uses OCRmyPDF and produces archive files.
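
For orientation, here is a minimal sketch of the approach the reworked parser takes, assuming the ocrmypdf and pdftotext Python packages used in the diff below; the file paths and the language value are placeholders:

import ocrmypdf
import pdftotext

source = "input.pdf"      # placeholder path
archive = "archive.pdf"   # placeholder path

# OCRmyPDF adds a text layer and writes a searchable PDF/A archive copy.
# skip_text=True mirrors the default OCR_MODE of "skip": pages that already
# contain text are left untouched.
ocrmypdf.ocr(source, archive,
             language="eng",
             output_type="pdfa",
             skip_text=True,
             progress_bar=False)

# The text layer is then read back from the archived file with pdftotext.
with open(archive, "rb") as f:
    text = "\n".join(pdftotext.PDF(f))

The real parser builds these same arguments from the PAPERLESS_OCR_* settings introduced in the settings hunk below.
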
@@ -107,23 +107,6 @@ def run_convert(input_file,
        raise ParseError("Convert failed at {}".format(args))


def run_unpaper(pnm, logging_group=None):
    pnm_out = pnm.replace(".pnm", ".unpaper.pnm")

    command_args = (settings.UNPAPER_BINARY, "--overwrite", "--quiet", pnm,
                    pnm_out)

    logger.debug(f"Execute: {' '.join(command_args)}",
                 extra={'group': logging_group})

    if not subprocess.Popen(command_args,
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL).wait() == 0:
        raise ParseError(f"Unpaper failed at {command_args}")

    return pnm_out


class ParseError(Exception):
    pass

@@ -338,9 +338,13 @@ OCR_PAGES = int(os.getenv('PAPERLESS_OCR_PAGES', 0))
# documents. It should be a 3-letter language code consistent with ISO 639.
OCR_LANGUAGE = os.getenv("PAPERLESS_OCR_LANGUAGE", "eng")

# OCRmyPDF --output-type options are available.
# TODO: validate this setting.
OCR_OUTPUT_TYPE = os.getenv("PAPERLESS_OCR_OUTPUT_TYPE", "pdfa")

# OCR all documents?
OCR_ALWAYS = __get_boolean("PAPERLESS_OCR_ALWAYS", "false")
# skip. redo, force
# TODO: validate this.
OCR_MODE = os.getenv("PAPERLESS_OCR_MODE", "skip")

# GNUPG needs a home directory for some reason
GNUPG_HOME = os.getenv("HOME", "/tmp")

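As a rough illustration of how the new OCR_OUTPUT_TYPE and OCR_MODE settings are consumed (the mapping mirrors the parser hunk further down; the environment values shown are only examples):

import os

# Example values, e.g. from paperless.conf or a docker-compose environment.
os.environ.setdefault("PAPERLESS_OCR_OUTPUT_TYPE", "pdfa")
os.environ.setdefault("PAPERLESS_OCR_MODE", "skip")

OCR_OUTPUT_TYPE = os.getenv("PAPERLESS_OCR_OUTPUT_TYPE", "pdfa")
OCR_MODE = os.getenv("PAPERLESS_OCR_MODE", "skip")

# OCR_MODE selects exactly one OCRmyPDF behaviour:
#   skip  -> skip_text=True  (only OCR pages without a text layer)
#   redo  -> redo_ocr=True   (discard and redo an existing OCR text layer)
#   force -> force_ocr=True  (rasterise and OCR every page)
ocr_args = {"output_type": OCR_OUTPUT_TYPE}
if OCR_MODE == "skip":
    ocr_args["skip_text"] = True
elif OCR_MODE == "redo":
    ocr_args["redo_ocr"] = True
elif OCR_MODE == "force":
    ocr_args["force_ocr"] = True
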
@@ -1,23 +1,14 @@
import itertools
import os
import re
import subprocess
from multiprocessing.pool import ThreadPool

import langdetect
import ocrmypdf
import pdftotext
import pyocr
from PIL import Image
from django.conf import settings
from pyocr import PyocrException
from ocrmypdf import InputFileError

from documents.parsers import DocumentParser, ParseError, run_unpaper, \
    run_convert
from .languages import ISO639


class OCRError(Exception):
    pass
from documents.parsers import DocumentParser, ParseError, run_convert


class RasterisedDocumentParser(DocumentParser):

@@ -29,6 +20,7 @@ class RasterisedDocumentParser(DocumentParser):
    def __init__(self, path, logging_group):
        super().__init__(path, logging_group)
        self._text = None
        self._archive_path = None

    def get_thumbnail(self):
        """

@@ -74,113 +66,67 @@ class RasterisedDocumentParser(DocumentParser):
        return out_path

    def _is_ocred(self):

        # Extract text from PDF using pdftotext
        text = get_text_from_pdf(self.document_path)

        # We assume, that a PDF with at least 50 characters contains text
        # (so no OCR required)
        return len(text) > 50

    def get_text(self):

        if self._text is not None:
        if self._text:
            return self._text

        if not settings.OCR_ALWAYS and self._is_ocred():
            self.log("debug", "Skipping OCR, using Text from PDF")
            self._text = get_text_from_pdf(self.document_path)
            return self._text
        archive_path = os.path.join(self.tempdir, "archive.pdf")

        images = self._get_greyscale()
        ocr_args = {
            'input_file': self.document_path,
            'output_file': archive_path,
            'use_threads': True,
            'jobs': settings.THREADS_PER_WORKER,
            'language': settings.OCR_LANGUAGE,
            'output_type': settings.OCR_OUTPUT_TYPE,
            'progress_bar': False,
            'clean': True
        }

        if not images:
            raise ParseError("Empty document, nothing to do.")
        if settings.OCR_PAGES > 0:
            ocr_args['pages'] = f"1-{settings.OCR_PAGES}"

        if settings.OCR_MODE == 'skip':
            ocr_args['skip_text'] = True
        elif settings.OCR_MODE == 'redo':
            ocr_args['redo_ocr'] = True
        elif settings.OCR_MODE == 'force':
            ocr_args['force_ocr'] = True

        try:
            ocrmypdf.ocr(**ocr_args)
            # success! announce that we have an archive document
            self._archive_path = archive_path
            self._text = get_text_from_pdf(self._archive_path)

            sample_page_index = int(len(images) / 2)
            self.log(
                "debug",
                f"Attempting language detection on page "
                f"{sample_page_index + 1} of {len(images)}...")
        except InputFileError as e:
            # This happens with some PDFs when used with the redo_ocr option.
            # This is not the end of the world, we'll just use what we already
            # have in the document.
            self._text = get_text_from_pdf(self.document_path)
            # Also, no archived file.
            if not self._text:
                # However, if we don't have anything, fail:
                raise ParseError(e)

            sample_page_text = self._ocr([images[sample_page_index]],
                                         settings.OCR_LANGUAGE)[0]
            guessed_language = self._guess_language(sample_page_text)

            if not guessed_language or guessed_language not in ISO639:
                self.log("warning", "Language detection failed.")
                ocr_pages = self._complete_ocr_default_language(
                    images, sample_page_index, sample_page_text)

            elif ISO639[guessed_language] == settings.OCR_LANGUAGE:
                self.log(
                    "debug",
                    f"Detected language: {guessed_language} "
                    f"(default language)")
                ocr_pages = self._complete_ocr_default_language(
                    images, sample_page_index, sample_page_text)

            elif not ISO639[guessed_language] in pyocr.get_available_tools()[0].get_available_languages():  # NOQA: E501
                self.log(
                    "warning",
                    f"Detected language {guessed_language} is not available "
                    f"on this system.")
                ocr_pages = self._complete_ocr_default_language(
                    images, sample_page_index, sample_page_text)

            else:
                self.log("debug", f"Detected language: {guessed_language}")
                ocr_pages = self._ocr(images, ISO639[guessed_language])

            self.log("debug", "OCR completed.")
            self._text = strip_excess_whitespace(" ".join(ocr_pages))
            return self._text

        except OCRError as e:
        except Exception as e:
            # Anything else is probably serious.
            raise ParseError(e)

    def _get_greyscale(self):
        """
        Greyscale images are easier for Tesseract to OCR
        """
        if not self._text:
            # This may happen for files that don't have any text.
            self.log(
                'warning',
                f"Document {self.document_path} does not have any text."
                f"This is probably an error or you tried to add an image "
                f"without text.")
            return ""

        # Convert PDF to multiple PNMs
        input_file = self.document_path
        return self._text

        if settings.OCR_PAGES == 1:
            input_file += "[0]"
        elif settings.OCR_PAGES > 1:
            input_file += f"[0-{settings.OCR_PAGES - 1}]"

        self.log(
            "debug",
            f"Converting document {input_file} into greyscale images")

        output_files = os.path.join(self.tempdir, "convert-%04d.pnm")

        run_convert(density=settings.CONVERT_DENSITY,
                    depth="8",
                    type="grayscale",
                    input_file=input_file,
                    output_file=output_files,
                    logging_group=self.logging_group)

        # Get a list of converted images
        pnms = []
        for f in os.listdir(self.tempdir):
            if f.endswith(".pnm"):
                pnms.append(os.path.join(self.tempdir, f))

        self.log("debug", f"Running unpaper on {len(pnms)} pages...")

        # Run unpaper in parallel on converted images
        with ThreadPool(processes=settings.THREADS_PER_WORKER) as pool:
            pnms = pool.map(run_unpaper, pnms)

        return sorted(filter(lambda __: os.path.isfile(__), pnms))
    def get_archive_path(self):
        return self._archive_path

    def _guess_language(self, text):
        try:

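The _is_ocred helper near the top of this hunk is a simple text-layer heuristic; a standalone sketch of the same check, assuming pdftotext and using the 50-character threshold from the code above (the function name has_text_layer and the path are made up for this sketch):

import pdftotext

def has_text_layer(pdf_path, min_chars=50):
    # A PDF that already yields more than `min_chars` characters of text
    # is assumed to be OCRed (or digitally born), so no OCR is required.
    with open(pdf_path, "rb") as f:
        try:
            pages = pdftotext.PDF(f)
        except pdftotext.Error:
            return False
    return len("\n".join(pages)) > min_chars

# has_text_layer("scan.pdf")  # placeholder usage
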
@@ -190,30 +136,11 @@ class RasterisedDocumentParser(DocumentParser):
            self.log('warning', f"Language detection failed with: {e}")
            return None

    def _ocr(self, imgs, lang):
        self.log(
            "debug",
            f"Performing OCR on {len(imgs)} page(s) with language {lang}")
        with ThreadPool(processes=settings.THREADS_PER_WORKER) as pool:
            r = pool.map(image_to_string, itertools.product(imgs, [lang]))
            return r

    def _complete_ocr_default_language(self,
                                       images,
                                       sample_page_index,
                                       sample_page):
        images_copy = list(images)
        del images_copy[sample_page_index]
        if images_copy:
            self.log('debug', "Continuing ocr with default language.")
            ocr_pages = self._ocr(images_copy, settings.OCR_LANGUAGE)
            ocr_pages.insert(sample_page_index, sample_page)
            return ocr_pages
        else:
            return [sample_page]


def strip_excess_whitespace(text):
    if not text:
        return None

    collapsed_spaces = re.sub(r"([^\S\r\n]+)", " ", text)
    no_leading_whitespace = re.sub(
        r"([\n\r]+)([^\S\n\r]+)", '\\1', collapsed_spaces)

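To make the whitespace regexes above concrete, a small worked example (the sample string is invented; the two substitutions are the ones shown in the hunk above):

import re

sample = "Total   amount:\n     1.234,56 EUR   \nThanks"

# Collapse runs of horizontal whitespace into single spaces.
collapsed_spaces = re.sub(r"([^\S\r\n]+)", " ", sample)
# -> "Total amount:\n 1.234,56 EUR \nThanks"

# Drop horizontal whitespace that immediately follows a line break.
no_leading_whitespace = re.sub(
    r"([\n\r]+)([^\S\n\r]+)", '\\1', collapsed_spaces)
# -> "Total amount:\n1.234,56 EUR \nThanks"
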
@@ -222,29 +149,14 @@ def strip_excess_whitespace(text):
    return no_trailing_whitespace


def image_to_string(args):
    img, lang = args
    ocr = pyocr.get_available_tools()[0]
    with Image.open(img) as f:
        if ocr.can_detect_orientation():
            try:
                orientation = ocr.detect_orientation(f, lang=lang)
                f = f.rotate(orientation["angle"], expand=1)
            except Exception:
                # Rotation not possible, ignore
                pass
        try:
            return ocr.image_to_string(f, lang=lang)
        except PyocrException as e:
            raise OCRError(e)


def get_text_from_pdf(pdf_file):

    with open(pdf_file, "rb") as f:
        try:
            pdf = pdftotext.PDF(f)
        except pdftotext.Error:
            return ""
            return None

    return "\n".join(pdf)
    text = "\n".join(pdf)

    return strip_excess_whitespace(text)