first version of the new consumer.

Jonas Winkler 2020-11-16 18:26:54 +01:00
parent 011bc9916e
commit 8dca459573
7 changed files with 231 additions and 123 deletions

View File

@ -38,6 +38,19 @@ paperless-ng 0.9.0
multi user solution, however, it allows more than one user to access the website
and set some basic permissions / renew passwords.
* **Modified:** Changes to the consumer:
* Now uses the excellent watchdog library that should make sure files are
discovered no matter what the platform is.
* The consumer now uses a task scheduler to run consumption processes in parallel.
This means that consuming many documents should be much faster on systems with
many cores.
* Concurrency is controlled with the new settings ``PAPERLESS_TASK_WORKERS``
and ``PAPERLESS_THREADS_PER_WORKER``. See TODO for details on concurrency.
* The consumer no longer blocks the database for extended periods of time.
* An issue with tesseract running multiple threads per page and slowing down
the consumer was fixed.
* **Modified [breaking]:** REST Api changes:
* New filters added, other filters removed (case sensitive filters, slug filters)
@ -64,8 +77,8 @@ paperless-ng 0.9.0
* Rework of the code of the tesseract parser. This is now a lot cleaner.
* Rework of the filename handling code. It was a mess.
* Fixed some issues with the document exporter not exporting all documents when encountering duplicate filenames.
* Added a task scheduler that takes care of checking mail, training the classifier, maintaining the document search index
and consuming documents.
* Updated dependencies. Now uses Pipenv all around.
* Updated Dockerfile and docker-compose. Now uses ``supervisord`` to run everything paperless-related in a single container.
@ -77,6 +90,8 @@ paperless-ng 0.9.0
* ``PAPERLESS_DEBUG`` defaults to ``false``.
* The presence of ``PAPERLESS_DBHOST`` now determines whether to use PostgreSQL or
sqlite.
* ``PAPERLESS_OCR_THREADS`` is gone and replaced with ``PAPERLESS_TASK_WORKERS`` and
``PAPERLESS_THREADS_PER_WORKER``. See TODO for details.
* Many more small changes here and there. The usual stuff.

View File

@ -12,7 +12,7 @@ from django.utils import timezone
from paperless.db import GnuPG
from .classifier import DocumentClassifier, IncompatibleClassifierVersionError
from .file_handling import generate_filename, create_source_path_directory
from .models import Document, FileInfo, Correspondent, DocumentType, Tag
from .parsers import ParseError, get_parser_class
from .signals import (
    document_consumption_finished,
@ -25,130 +25,196 @@ class ConsumerError(Exception):
class Consumer:
"""
Loop over every file found in CONSUMPTION_DIR and:
1. Convert it to a greyscale pnm
2. Use tesseract on the pnm
3. Store the document in the MEDIA_ROOT with optional encryption
4. Store the OCR'd text in the database
5. Delete the document and image(s)
"""
def __init__(self, consume=settings.CONSUMPTION_DIR, def __init__(self):
scratch=settings.SCRATCH_DIR):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.logging_group = None self.logging_group = None
self.consume = consume
self.scratch = scratch
self.classifier = DocumentClassifier()
os.makedirs(self.scratch, exist_ok=True)
self.storage_type = Document.STORAGE_TYPE_UNENCRYPTED self.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
if settings.PASSPHRASE: if settings.PASSPHRASE:
self.storage_type = Document.STORAGE_TYPE_GPG self.storage_type = Document.STORAGE_TYPE_GPG

    @staticmethod
    def pre_check_file_exists(filename):
        if not os.path.isfile(filename):
            raise ConsumerError("Cannot consume {}: It is not a file".format(
                filename))

    @staticmethod
    def pre_check_consumption_dir():
        if not settings.CONSUMPTION_DIR:
            raise ConsumerError(
                "The CONSUMPTION_DIR settings variable does not appear to be "
                "set.")

        if not os.path.isdir(settings.CONSUMPTION_DIR):
            raise ConsumerError(
                "Consumption directory {} does not exist".format(
                    settings.CONSUMPTION_DIR))

    @staticmethod
    def pre_check_regex(filename):
        if not re.match(FileInfo.REGEXES["title"], filename):
            raise ConsumerError(
                "Filename {} does not seem to be safe to "
                "consume".format(filename))

    @staticmethod
    def pre_check_duplicate(filename):
        with open(filename, "rb") as f:
            checksum = hashlib.md5(f.read()).hexdigest()
        if Document.objects.filter(checksum=checksum).exists():
            if settings.CONSUMER_DELETE_DUPLICATES:
                os.unlink(filename)
            raise ConsumerError(
                "Not consuming {}: It is a duplicate.".format(filename)
            )

    @staticmethod
    def pre_check_scratch_dir():
        os.makedirs(settings.SCRATCH_DIR, exist_ok=True)

    def log(self, level, message):
        getattr(self.logger, level)(message, extra={
            "group": self.logging_group
        })

    def try_consume_file(self,
                         filename,
                         original_filename=None,
                         force_title=None,
                         force_correspondent_id=None,
                         force_document_type_id=None,
                         force_tag_ids=None):
        """
        Return the document object if it was successfully created.
        """

        # this is for grouping logging entries for this particular file
        # together.
        self.logging_group = uuid.uuid4()

        # Make sure that preconditions for consuming the file are met.
        self.pre_check_file_exists(filename)
        self.pre_check_consumption_dir()
        self.pre_check_scratch_dir()
        self.pre_check_regex(filename)
        self.pre_check_duplicate(filename)

        self.log("info", "Consuming {}".format(filename))

        # Determine the parser class.
        parser_class = get_parser_class(original_filename or filename)
        if not parser_class:
            raise ConsumerError("No parsers available for {}".format(filename))
        else:
            self.log("debug", "Parser: {}".format(parser_class.__name__))

        # Notify all listeners that we're going to do some work.
        document_consumption_started.send(
            sender=self.__class__,
            filename=filename,
            logging_group=self.logging_group
        )

        # This doesn't parse the document yet, but gives us a parser.
        document_parser = parser_class(filename, self.logging_group)

        # However, this already created working directories which we have to
        # clean up.

        # Parse the document. This may take some time.
        try:
            self.log("debug", "Generating thumbnail for {}...".format(filename))
            thumbnail = document_parser.get_optimised_thumbnail()
            self.log("debug", "Parsing {}...".format(filename))
            text = document_parser.get_text()
            date = document_parser.get_date()
        except ParseError as e:
            document_parser.cleanup()
            raise ConsumerError(e)

        # Prepare the document classifier.
        # TODO: I don't really like to do this here, but this way we avoid
        # reloading the classifier multiple times, since there are multiple
        # post-consume hooks that all require the classifier.
        try:
            classifier = DocumentClassifier()
            classifier.reload()
        except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
            logging.getLogger(__name__).warning(
                "Cannot classify documents: {}.".format(e))
            classifier = None

        # now that everything is done, we can start to store the document
        # in the system. This will be a transaction and reasonably fast.
        try:
            with transaction.atomic():

                # store the document.
                document = self._store(
                    text=text,
                    doc=filename,
                    thumbnail=thumbnail,
                    date=date,
                    original_filename=original_filename,
                    force_title=force_title,
                    force_correspondent_id=force_correspondent_id,
                    force_document_type_id=force_document_type_id,
                    force_tag_ids=force_tag_ids
                )

                # If we get here, it was successful. Proceed with post-consume
                # hooks. If they fail, nothing will get changed.
                document_consumption_finished.send(
                    sender=self.__class__,
                    document=document,
                    logging_group=self.logging_group,
                    classifier=classifier
                )

                # After everything is in the database, copy the files into
                # place. If this fails, we'll also rollback the transaction.
                create_source_path_directory(document.source_path)
                self._write(document, filename, document.source_path)
                self._write(document, thumbnail, document.thumbnail_path)

                # Delete the file only if it was successfully consumed
                self.log("debug", "Deleting document {}".format(filename))
                os.unlink(filename)
        except Exception as e:
            raise ConsumerError(e)
        finally:
            document_parser.cleanup()

        self.log(
            "info",
            "Document {} consumption finished".format(document)
        )

        return document

    def _store(self, text, doc, thumbnail, date,
               original_filename=None,
               force_title=None,
               force_correspondent_id=None,
               force_document_type_id=None,
               force_tag_ids=None):

        # If someone gave us the original filename, use it instead of doc.
        file_info = FileInfo.from_path(original_filename or doc)

        stats = os.stat(doc)
@ -175,13 +241,21 @@ class Consumer:
self.log("debug", "Tagging with {}".format(tag_names)) self.log("debug", "Tagging with {}".format(tag_names))
document.tags.add(*relevant_tags) document.tags.add(*relevant_tags)
if force_title:
document.title = force_title
if force_correspondent_id:
document.correspondent = Correspondent.objects.get(pk=force_correspondent_id)
if force_document_type_id:
document.document_type = DocumentType.objects.get(pk=force_document_type_id)
if force_tag_ids:
for tag_id in force_tag_ids:
document.tags.add(Tag.objects.get(pk=tag_id))
document.filename = generate_filename(document) document.filename = generate_filename(document)
create_source_path_directory(document.source_path)
self._write(document, doc, document.source_path)
self._write(document, thumbnail, document.thumbnail_path)
# We need to save the document twice, since we need the PK of the # We need to save the document twice, since we need the PK of the
# document in order to create its filename above. # document in order to create its filename above.
document.save() document.save()
@ -196,13 +270,3 @@ class Consumer:
                    return
                self.log("debug", "Encrypting")
                write_file.write(GnuPG.encrypted(read_file))

    def _cleanup_doc(self, doc):
        self.log("debug", "Deleting document {}".format(doc))
        os.unlink(doc)

    @staticmethod
    def _is_duplicate(doc):
        with open(doc, "rb") as f:
            checksum = hashlib.md5(f.read()).hexdigest()
        return Document.objects.filter(checksum=checksum).exists()
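
For orientation, a minimal sketch of how a caller drives the reworked consumer, mirroring what the new consume_file task further below does; the wrapper name and the printout are made up:

from documents.consumer import Consumer, ConsumerError

def consume_one(path, original_name=None):
    # Illustrative wrapper only: the real entry point is documents.tasks.consume_file.
    try:
        document = Consumer().try_consume_file(
            path,
            original_filename=original_name,
        )
    except ConsumerError as e:
        # Failed pre-checks (missing file, duplicate, unsafe filename) and
        # parse/storage errors all surface as ConsumerError.
        print("Could not consume {}: {}".format(path, e))
        return None
    return document.pk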

View File

@ -1,9 +1,11 @@
import os
import tempfile
from datetime import datetime
from time import mktime
from django import forms
from django.conf import settings
from django_q.tasks import async_task
from pathvalidate import validate_filename, ValidationError
@ -18,15 +20,6 @@ class UploadForm(forms.Form):
            raise forms.ValidationError("That filename is suspicious.")
        return self.cleaned_data.get("document")

    def get_filename(self, i=None):
        return os.path.join(
            settings.CONSUMPTION_DIR,
            "{}_{}".format(
                str(i),
                self.cleaned_data.get("document").name
            ) if i else self.cleaned_data.get("document").name
        )

    def save(self):
        """
        Since the consumer already does a lot of work, it's easier just to save
@ -35,15 +28,13 @@ class UploadForm(forms.Form):
""" """
document = self.cleaned_data.get("document").read() document = self.cleaned_data.get("document").read()
original_filename = self.cleaned_data.get("document").name
t = int(mktime(datetime.now().timetuple())) t = int(mktime(datetime.now().timetuple()))
file_name = self.get_filename() with tempfile.NamedTemporaryFile(prefix="paperless-upload-", suffix=".pdf", dir=settings.SCRATCH_DIR, delete=False) as f:
i = 0
while os.path.exists(file_name):
i += 1
file_name = self.get_filename(i)
with open(file_name, "wb") as f:
f.write(document) f.write(document)
os.utime(file_name, times=(t, t)) os.utime(f.name, times=(t, t))
async_task("documents.tasks.consume_file", f.name, original_filename, task_name=os.path.basename(original_filename))

View File

@ -3,11 +3,10 @@ import os
from django.conf import settings
from django.core.management.base import BaseCommand
from django_q.tasks import async_task
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

try:
    from inotify_simple import INotify, flags
except ImportError:
@ -16,13 +15,10 @@ except ImportError:
class Handler(FileSystemEventHandler):

    def _consume(self, file):
        if os.path.isfile(file):
            try:
                async_task("documents.tasks.consume_file", file, task_name=os.path.basename(file))
            except Exception as e:
                # Catch all so that the consumer won't crash.
                logging.getLogger(__name__).error("Error while consuming document: {}".format(e))
@ -49,8 +45,6 @@ class Command(BaseCommand):
        self.mail_fetcher = None
        self.first_iteration = True

        BaseCommand.__init__(self, *args, **kwargs)

    def add_arguments(self, parser):
@ -78,11 +72,11 @@ class Command(BaseCommand):
        # Consume all files as this is not done initially by the watchdog
        for entry in os.scandir(directory):
            if entry.is_file():
                async_task("documents.tasks.consume_file", entry.path, task_name=os.path.basename(entry.path))

        # Start the watchdog. Woof!
        observer = Observer()
        event_handler = Handler()
        observer.schedule(event_handler, directory, recursive=True)
        observer.start()
        try:
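
The hunk above only shows the dispatch helper and the initial directory scan. For context, a rough sketch of how a watchdog handler typically wires filesystem events to such a helper; the hook usage below follows the watchdog API but is an illustration, not the committed Handler code:

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class SketchHandler(FileSystemEventHandler):
    # Hypothetical illustration of the dispatch pattern, not the committed code.
    def on_created(self, event):
        self._consume(event.src_path)

    def on_moved(self, event):
        self._consume(event.dest_path)

    def _consume(self, file):
        print("would enqueue", file)  # the real handler calls async_task(...)

observer = Observer()
observer.schedule(SketchHandler(), "/path/to/consume", recursive=True)  # made-up path
observer.start()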

View File

@ -6,6 +6,7 @@ from whoosh.writing import AsyncWriter
from documents import index
from documents.classifier import DocumentClassifier, \
    IncompatibleClassifierVersionError
from documents.consumer import Consumer, ConsumerError
from documents.mail import MailFetcher
from documents.models import Document
@ -54,3 +55,27 @@ def train_classifier():
        logging.getLogger(__name__).error(
            "Classifier error: " + str(e)
        )


def consume_file(file,
                 original_filename=None,
                 force_title=None,
                 force_correspondent_id=None,
                 force_document_type_id=None,
                 force_tag_ids=None):

    document = Consumer().try_consume_file(
        file,
        original_filename=original_filename,
        force_title=force_title,
        force_correspondent_id=force_correspondent_id,
        force_document_type_id=force_document_type_id,
        force_tag_ids=force_tag_ids)

    if document:
        return "Success. New document id {} created".format(
            document.pk
        )
    else:
        raise ConsumerError("Unknown error: Returned document was null, but "
                            "no error message was given.")

View File

@ -1,4 +1,5 @@
import json
import math
import multiprocessing
import os
import re
@ -262,6 +263,26 @@ LOGGING = {
# Task queue                                                                  #
###############################################################################
# Sensible defaults for multitasking:
# use a fair balance between worker processes and threads per worker so that
# both consuming many documents in parallel and consuming large documents is
# reasonably fast.
# Favors threads per worker on smaller systems and never exceeds cpu_count()
# in total.
def default_task_workers():
    try:
        return max(
            math.floor(math.sqrt(multiprocessing.cpu_count())),
            1
        )
    except NotImplementedError:
        return 1

TASK_WORKERS = int(os.getenv("PAPERLESS_TASK_WORKERS", default_task_workers()))
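
# Not shown in this hunk: PAPERLESS_THREADS_PER_WORKER (referenced as
# settings.THREADS_PER_WORKER by the tesseract parser below) plausibly gets a
# companion default that splits the remaining cores among the workers, e.g.:
def default_threads_per_worker():
    # Sketch of an assumed default, keeping workers * threads <= cpu_count().
    try:
        return max(multiprocessing.cpu_count() // TASK_WORKERS, 1)
    except NotImplementedError:
        return 1

THREADS_PER_WORKER = int(os.getenv("PAPERLESS_THREADS_PER_WORKER",
                                   default_threads_per_worker()))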

Q_CLUSTER = {
    'name': 'paperless',
    'catch_up': False,
@ -278,8 +299,6 @@ CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES
# documents. It should be a 3-letter language code consistent with ISO 639.
OCR_LANGUAGE = os.getenv("PAPERLESS_OCR_LANGUAGE", "eng")
# The amount of threads to use for OCR
OCR_THREADS = int(os.getenv("PAPERLESS_OCR_THREADS", multiprocessing.cpu_count()))
# OCR all documents?
OCR_ALWAYS = __get_boolean("PAPERLESS_OCR_ALWAYS", "false")

View File

@ -2,7 +2,7 @@ import itertools
import os
import re
import subprocess
from multiprocessing.pool import ThreadPool
import langdetect
import pdftotext
@ -151,7 +151,7 @@ class RasterisedDocumentParser(DocumentParser):
self.log("info", "Running unpaper on {} pages...".format(len(pnms))) self.log("info", "Running unpaper on {} pages...".format(len(pnms)))
# Run unpaper in parallel on converted images # Run unpaper in parallel on converted images
with Pool(processes=settings.OCR_THREADS) as pool: with ThreadPool(processes=settings.THREADS_PER_WORKER) as pool:
pnms = pool.map(run_unpaper, pnms) pnms = pool.map(run_unpaper, pnms)
return sorted(filter(lambda __: os.path.isfile(__), pnms)) return sorted(filter(lambda __: os.path.isfile(__), pnms))
@ -166,7 +166,7 @@ class RasterisedDocumentParser(DocumentParser):
    def _ocr(self, imgs, lang):
        self.log("info", "Performing OCR on {} page(s) with language {}".format(len(imgs), lang))

        with ThreadPool(processes=settings.THREADS_PER_WORKER) as pool:
            r = pool.map(image_to_string, itertools.product(imgs, [lang]))

        return r