diff --git a/docs/changelog.rst b/docs/changelog.rst
index cac7bb5e4..31bcdceed 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -38,6 +38,19 @@ paperless-ng 0.9.0
   multi user solution, however, it allows more than one user to access the
   website and set some basic permissions / renew passwords.
 
+* **Modified:** Changes to the consumer:
+
+  * Now uses the excellent watchdog library, which should reliably detect new
+    files regardless of platform.
+  * The consumer now uses a task scheduler to run consumption processes in parallel.
+    This means that consuming many documents should be much faster on systems with
+    many cores.
+  * Concurrency is controlled with the new settings ``PAPERLESS_TASK_WORKERS``
+    and ``PAPERLESS_THREADS_PER_WORKER``. See TODO for details on concurrency.
+  * The consumer no longer blocks the database for extended periods of time.
+  * An issue with tesseract running multiple threads per page and slowing down
+    the consumer was fixed.
+
 * **Modified [breaking]:** REST Api changes:
 
   * New filters added, other filters removed (case sensitive filters, slug filters)
@@ -64,8 +77,8 @@ paperless-ng 0.9.0
   * Rework of the code of the tesseract parser. This is now a lot cleaner.
   * Rework of the filename handling code. It was a mess.
   * Fixed some issues with the document exporter not exporting all documents when encountering duplicate filenames.
-  * Consumer rework: now uses the excellent watchdog library, lots of code removed.
-  * Added a task scheduler that takes care of checking mail, training the classifier and maintaining the document search index.
+  * Added a task scheduler that takes care of checking mail, training the classifier, maintaining the document search index
+    and consuming documents.
   * Updated dependencies. Now uses Pipenv all around.
   * Updated Dockerfile and docker-compose. Now uses ``supervisord`` to run everything
     paperless-related in a single container.
@@ -77,6 +90,8 @@ paperless-ng 0.9.0
   * ``PAPERLESS_DEBUG`` defaults to ``false``.
   * The presence of ``PAPERLESS_DBHOST`` now determines whether to use PostgreSQL or
     sqlite.
+  * ``PAPERLESS_OCR_THREADS`` is gone and replaced with ``PAPERLESS_TASK_WORKERS`` and
+    ``PAPERLESS_THREADS_PER_WORKER``. See TODO for details.
 
 * Many more small changes here and there. The usual stuff.
 
diff --git a/src/documents/consumer.py b/src/documents/consumer.py
index 6754ebf26..7f0fd7d21 100755
--- a/src/documents/consumer.py
+++ b/src/documents/consumer.py
@@ -12,7 +12,7 @@ from django.utils import timezone
 from paperless.db import GnuPG
 from .classifier import DocumentClassifier, IncompatibleClassifierVersionError
 from .file_handling import generate_filename, create_source_path_directory
-from .models import Document, FileInfo
+from .models import Document, FileInfo, Correspondent, DocumentType, Tag
 from .parsers import ParseError, get_parser_class
 from .signals import (
     document_consumption_finished,
@@ -25,130 +25,196 @@ class ConsumerError(Exception):
 
 
 class Consumer:
 
-    """
-    Loop over every file found in CONSUMPTION_DIR and:
-      1. Convert it to a greyscale pnm
-      2. Use tesseract on the pnm
-      3. Store the document in the MEDIA_ROOT with optional encryption
-      4. Store the OCR'd text in the database
-      5. Delete the document and image(s)
-    """
-
-    def __init__(self, consume=settings.CONSUMPTION_DIR,
-                 scratch=settings.SCRATCH_DIR):
+    def __init__(self):
         self.logger = logging.getLogger(__name__)
         self.logging_group = None
-        self.consume = consume
-        self.scratch = scratch
-
-        self.classifier = DocumentClassifier()
-
-        os.makedirs(self.scratch, exist_ok=True)
-
         self.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
         if settings.PASSPHRASE:
             self.storage_type = Document.STORAGE_TYPE_GPG
 
-        if not self.consume:
+    @staticmethod
+    def pre_check_file_exists(filename):
+        if not os.path.isfile(filename):
+            raise ConsumerError("Cannot consume {}: It is not a file".format(
+                filename))
+
+    @staticmethod
+    def pre_check_consumption_dir():
+        if not settings.CONSUMPTION_DIR:
             raise ConsumerError(
                 "The CONSUMPTION_DIR settings variable does not appear to be "
-                "set."
+                "set.")
+
+        if not os.path.isdir(settings.CONSUMPTION_DIR):
+            raise ConsumerError(
+                "Consumption directory {} does not exist".format(
+                    settings.CONSUMPTION_DIR))
+
+    @staticmethod
+    def pre_check_regex(filename):
+        if not re.match(FileInfo.REGEXES["title"], filename):
+            raise ConsumerError(
+                "Filename {} does not seem to be safe to "
+                "consume".format(filename))
+
+    @staticmethod
+    def pre_check_duplicate(filename):
+        with open(filename, "rb") as f:
+            checksum = hashlib.md5(f.read()).hexdigest()
+        if Document.objects.filter(checksum=checksum).exists():
+            if settings.CONSUMER_DELETE_DUPLICATES:
+                os.unlink(filename)
+            raise ConsumerError(
+                "Not consuming {}: It is a duplicate.".format(filename)
             )
 
-        if not os.path.exists(self.consume):
-            raise ConsumerError(
-                "Consumption directory {} does not exist".format(self.consume))
+    @staticmethod
+    def pre_check_scratch_dir():
+        os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
 
     def log(self, level, message):
         getattr(self.logger, level)(message, extra={
             "group": self.logging_group
         })
 
-    @transaction.atomic
-    def try_consume_file(self, file):
+    def try_consume_file(self,
+                         filename,
+                         original_filename=None,
+                         force_title=None,
+                         force_correspondent_id=None,
+                         force_document_type_id=None,
+                         force_tag_ids=None):
         """
-        Return True if file was consumed
+        Return the document object if it was successfully created.
         """
 
+        # this is for grouping logging entries for this particular file
+        # together.
+
         self.logging_group = uuid.uuid4()
 
-        if not re.match(FileInfo.REGEXES["title"], file):
-            return False
+        # Make sure that preconditions for consuming the file are met.
 
-        doc = file
+        self.pre_check_file_exists(filename)
+        self.pre_check_consumption_dir()
+        self.pre_check_scratch_dir()
+        self.pre_check_regex(filename)
+        self.pre_check_duplicate(filename)
 
-        if self._is_duplicate(doc):
-            self.log(
-                "warning",
-                "Skipping {} as it appears to be a duplicate".format(doc)
-            )
-            if settings.CONSUMER_DELETE_DUPLICATES:
-                self._cleanup_doc(doc)
-            return False
+        self.log("info", "Consuming {}".format(filename))
 
-        self.log("info", "Consuming {}".format(doc))
+        # Determine the parser class.
 
-        parser_class = get_parser_class(doc)
+        parser_class = get_parser_class(original_filename or filename)
 
         if not parser_class:
-            self.log(
-                "error", "No parsers could be found for {}".format(doc))
-            return False
+            raise ConsumerError("No parsers available for {}".format(filename))
         else:
-            self.log("info", "Parser: {}".format(parser_class.__name__))
+            self.log("debug", "Parser: {}".format(parser_class.__name__))
+
+        # Notify all listeners that we're going to do some work.
+
         document_consumption_started.send(
             sender=self.__class__,
-            filename=doc,
+            filename=filename,
             logging_group=self.logging_group
         )
 
-        document_parser = parser_class(doc, self.logging_group)
+        # This doesn't parse the document yet, but gives us a parser.
+
+        document_parser = parser_class(filename, self.logging_group)
+
+        # However, this already created working directories which we have to
+        # clean up.
+
+        # Parse the document. This may take some time.
 
         try:
-            self.log("info", "Generating thumbnail for {}...".format(doc))
+            self.log("debug", "Generating thumbnail for {}...".format(filename))
             thumbnail = document_parser.get_optimised_thumbnail()
+            self.log("debug", "Parsing {}...".format(filename))
             text = document_parser.get_text()
             date = document_parser.get_date()
-            document = self._store(
-                text,
-                doc,
-                thumbnail,
-                date
-            )
         except ParseError as e:
-            self.log("fatal", "PARSE FAILURE for {}: {}".format(doc, e))
             document_parser.cleanup()
-            return False
-        else:
-            document_parser.cleanup()
-            self._cleanup_doc(doc)
+            raise ConsumerError(e)
 
-            self.log(
-                "info",
-                "Document {} consumption finished".format(document)
-            )
+        # Prepare the document classifier.
+        # TODO: I don't really like to do this here, but this way we avoid
+        # reloading the classifier multiple times, since there are multiple
+        # post-consume hooks that all require the classifier.
 
+        try:
+            classifier = DocumentClassifier()
+            classifier.reload()
+        except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
+            logging.getLogger(__name__).warning(
+                "Cannot classify documents: {}.".format(e))
             classifier = None
 
-            try:
-                self.classifier.reload()
-                classifier = self.classifier
-            except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
-                logging.getLogger(__name__).warning("Cannot classify documents: {}.".format(e))
+        # now that everything is done, we can start to store the document
+        # in the system. This will be a transaction and reasonably fast.
+        try:
+            with transaction.atomic():
 
-            document_consumption_finished.send(
-                sender=self.__class__,
-                document=document,
-                logging_group=self.logging_group,
-                classifier=classifier
-            )
-            return True
+                # store the document.
+                document = self._store(
+                    text=text,
+                    doc=filename,
+                    thumbnail=thumbnail,
+                    date=date,
+                    original_filename=original_filename,
+                    force_title=force_title,
+                    force_correspondent_id=force_correspondent_id,
+                    force_document_type_id=force_document_type_id,
+                    force_tag_ids=force_tag_ids
+                )
 
-    def _store(self, text, doc, thumbnail, date):
+                # If we get here, it was successful. Proceed with post-consume
+                # hooks. If they fail, nothing will get changed.
 
-        file_info = FileInfo.from_path(doc)
+                document_consumption_finished.send(
+                    sender=self.__class__,
+                    document=document,
+                    logging_group=self.logging_group,
+                    classifier=classifier
+                )
+
+                # After everything is in the database, copy the files into
+                # place. If this fails, we'll also rollback the transaction.
+
+                create_source_path_directory(document.source_path)
+                self._write(document, filename, document.source_path)
+                self._write(document, thumbnail, document.thumbnail_path)
+
+                # Delete the file only if it was successfully consumed
+                self.log("debug", "Deleting document {}".format(filename))
+                os.unlink(filename)
+        except Exception as e:
+            raise ConsumerError(e)
+        finally:
+            document_parser.cleanup()
+
+        self.log(
+            "info",
+            "Document {} consumption finished".format(document)
+        )
+
+        return document
+
+    def _store(self, text, doc, thumbnail, date,
+               original_filename=None,
+               force_title=None,
+               force_correspondent_id=None,
+               force_document_type_id=None,
+               force_tag_ids=None):
+
+        # If someone gave us the original filename, use it instead of doc.
+
+        file_info = FileInfo.from_path(original_filename or doc)
 
         stats = os.stat(doc)
 
@@ -175,13 +241,21 @@ class Consumer:
             self.log("debug", "Tagging with {}".format(tag_names))
             document.tags.add(*relevant_tags)
 
+        if force_title:
+            document.title = force_title
+
+        if force_correspondent_id:
+            document.correspondent = Correspondent.objects.get(pk=force_correspondent_id)
+
+        if force_document_type_id:
+            document.document_type = DocumentType.objects.get(pk=force_document_type_id)
+
+        if force_tag_ids:
+            for tag_id in force_tag_ids:
+                document.tags.add(Tag.objects.get(pk=tag_id))
+
         document.filename = generate_filename(document)
 
-        create_source_path_directory(document.source_path)
-
-        self._write(document, doc, document.source_path)
-        self._write(document, thumbnail, document.thumbnail_path)
-
         # We need to save the document twice, since we need the PK of the
         # document in order to create its filename above.
         document.save()
@@ -196,13 +270,3 @@ class Consumer:
                     return
                 self.log("debug", "Encrypting")
                 write_file.write(GnuPG.encrypted(read_file))
-
-    def _cleanup_doc(self, doc):
-        self.log("debug", "Deleting document {}".format(doc))
-        os.unlink(doc)
-
-    @staticmethod
-    def _is_duplicate(doc):
-        with open(doc, "rb") as f:
-            checksum = hashlib.md5(f.read()).hexdigest()
-        return Document.objects.filter(checksum=checksum).exists()
diff --git a/src/documents/forms.py b/src/documents/forms.py
index a1e42dfea..ce79867a1 100644
--- a/src/documents/forms.py
+++ b/src/documents/forms.py
@@ -1,9 +1,11 @@
 import os
+import tempfile
 from datetime import datetime
 from time import mktime
 
 from django import forms
 from django.conf import settings
+from django_q.tasks import async_task
 from pathvalidate import validate_filename, ValidationError
 
 
@@ -18,15 +20,6 @@ class UploadForm(forms.Form):
             raise forms.ValidationError("That filename is suspicious.")
         return self.cleaned_data.get("document")
 
-    def get_filename(self, i=None):
-        return os.path.join(
-            settings.CONSUMPTION_DIR,
-            "{}_{}".format(
-                str(i),
-                self.cleaned_data.get("document").name
-            ) if i else self.cleaned_data.get("document").name
-        )
-
     def save(self):
         """
         Since the consumer already does a lot of work, it's easier just to save
@@ -35,15 +28,13 @@
         """
 
         document = self.cleaned_data.get("document").read()
+        original_filename = self.cleaned_data.get("document").name
 
         t = int(mktime(datetime.now().timetuple()))
 
-        file_name = self.get_filename()
-        i = 0
-        while os.path.exists(file_name):
-            i += 1
-            file_name = self.get_filename(i)
+        with tempfile.NamedTemporaryFile(prefix="paperless-upload-", suffix=".pdf", dir=settings.SCRATCH_DIR, delete=False) as f:
 
-        with open(file_name, "wb") as f:
             f.write(document)
-            os.utime(file_name, times=(t, t))
+            os.utime(f.name, times=(t, t))
+
async_task("documents.tasks.consume_file", f.name, original_filename, task_name=os.path.basename(original_filename)) diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index bb317a192..769d71af2 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -3,11 +3,10 @@ import os from django.conf import settings from django.core.management.base import BaseCommand +from django_q.tasks import async_task from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer -from documents.consumer import Consumer - try: from inotify_simple import INotify, flags except ImportError: @@ -16,13 +15,10 @@ except ImportError: class Handler(FileSystemEventHandler): - def __init__(self, consumer): - self.consumer = consumer - def _consume(self, file): if os.path.isfile(file): try: - self.consumer.try_consume_file(file) + async_task("documents.tasks.consume_file", file, task_name=os.path.basename(file)) except Exception as e: # Catch all so that the consumer won't crash. logging.getLogger(__name__).error("Error while consuming document: {}".format(e)) @@ -49,8 +45,6 @@ class Command(BaseCommand): self.mail_fetcher = None self.first_iteration = True - self.consumer = Consumer() - BaseCommand.__init__(self, *args, **kwargs) def add_arguments(self, parser): @@ -78,11 +72,11 @@ class Command(BaseCommand): # Consume all files as this is not done initially by the watchdog for entry in os.scandir(directory): if entry.is_file(): - self.consumer.try_consume_file(entry.path) + async_task("documents.tasks.consume_file", entry.path, task_name=os.path.basename(entry.path)) # Start the watchdog. Woof! observer = Observer() - event_handler = Handler(self.consumer) + event_handler = Handler() observer.schedule(event_handler, directory, recursive=True) observer.start() try: diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 9a3a0d7b8..b803704c4 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -6,6 +6,7 @@ from whoosh.writing import AsyncWriter from documents import index from documents.classifier import DocumentClassifier, \ IncompatibleClassifierVersionError +from documents.consumer import Consumer, ConsumerError from documents.mail import MailFetcher from documents.models import Document @@ -54,3 +55,27 @@ def train_classifier(): logging.getLogger(__name__).error( "Classifier error: " + str(e) ) + + +def consume_file(file, + original_filename=None, + force_title=None, + force_correspondent_id=None, + force_document_type_id=None, + force_tag_ids=None): + + document = Consumer().try_consume_file( + file, + original_filename=original_filename, + force_title=force_title, + force_correspondent_id=force_correspondent_id, + force_document_type_id=force_document_type_id, + force_tag_ids=force_tag_ids) + + if document: + return "Success. 
+            document.pk
+        )
+    else:
+        raise ConsumerError("Unknown error: Returned document was null, but "
+                            "no error message was given.")
diff --git a/src/paperless/settings.py b/src/paperless/settings.py
index dda85e039..7712844d0 100644
--- a/src/paperless/settings.py
+++ b/src/paperless/settings.py
@@ -1,4 +1,5 @@
 import json
+import math
 import multiprocessing
 import os
 import re
@@ -262,6 +263,26 @@ LOGGING = {
 # Task queue                                                                  #
 ###############################################################################
 
+
+# Sensible defaults for multitasking:
+# use a fair balance between worker processes and threads per worker so that
+# both consuming many documents in parallel and consuming large documents is
+# reasonably fast.
+# Favors threads per worker on smaller systems and never exceeds cpu_count()
+# in total.
+
+def default_task_workers():
+    try:
+        return max(
+            math.floor(math.sqrt(multiprocessing.cpu_count())),
+            1
+        )
+    except NotImplementedError:
+        return 1
+
+
+TASK_WORKERS = int(os.getenv("PAPERLESS_TASK_WORKERS", default_task_workers()))
+
 Q_CLUSTER = {
     'name': 'paperless',
     'catch_up': False,
@@ -278,8 +299,6 @@ CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES
 # documents. It should be a 3-letter language code consistent with ISO 639.
 OCR_LANGUAGE = os.getenv("PAPERLESS_OCR_LANGUAGE", "eng")
 
-# The amount of threads to use for OCR
-OCR_THREADS = int(os.getenv("PAPERLESS_OCR_THREADS", multiprocessing.cpu_count()))
 
 # OCR all documents?
 OCR_ALWAYS = __get_boolean("PAPERLESS_OCR_ALWAYS", "false")
diff --git a/src/paperless_tesseract/parsers.py b/src/paperless_tesseract/parsers.py
index 4018e853a..d07f9e4b3 100644
--- a/src/paperless_tesseract/parsers.py
+++ b/src/paperless_tesseract/parsers.py
@@ -2,7 +2,7 @@ import itertools
 import os
 import re
 import subprocess
-from multiprocessing.pool import Pool
+from multiprocessing.pool import ThreadPool
 
 import langdetect
 import pdftotext
@@ -151,7 +151,7 @@ class RasterisedDocumentParser(DocumentParser):
         self.log("info", "Running unpaper on {} pages...".format(len(pnms)))
 
         # Run unpaper in parallel on converted images
-        with Pool(processes=settings.OCR_THREADS) as pool:
+        with ThreadPool(processes=settings.THREADS_PER_WORKER) as pool:
             pnms = pool.map(run_unpaper, pnms)
 
         return sorted(filter(lambda __: os.path.isfile(__), pnms))
@@ -166,7 +166,7 @@ class RasterisedDocumentParser(DocumentParser):
     def _ocr(self, imgs, lang):
         self.log("info", "Performing OCR on {} page(s) with language {}".format(len(imgs), lang))
 
-        with Pool(processes=settings.OCR_THREADS) as pool:
+        with ThreadPool(processes=settings.THREADS_PER_WORKER) as pool:
             r = pool.map(image_to_string, itertools.product(imgs, [lang]))
 
         return r
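
Note on the concurrency settings introduced above: the diff adds ``PAPERLESS_TASK_WORKERS`` (via ``default_task_workers()``) to ``src/paperless/settings.py``, while ``paperless_tesseract/parsers.py`` now reads ``settings.THREADS_PER_WORKER``, whose definition is not part of any hunk shown here. The sketch below is only an assumption of how the two values could fit together so that workers times threads per worker stays within ``cpu_count()``; the ``THREADS_PER_WORKER`` default shown is illustrative, not taken from this diff.

    # Sketch, not part of the diff: a possible derivation of THREADS_PER_WORKER.
    import math
    import multiprocessing
    import os

    def default_task_workers():
        # Same default as in the diff: roughly sqrt(cpu_count()), at least 1.
        try:
            return max(math.floor(math.sqrt(multiprocessing.cpu_count())), 1)
        except NotImplementedError:
            return 1

    TASK_WORKERS = int(os.getenv("PAPERLESS_TASK_WORKERS", default_task_workers()))

    # Assumed default: split the available cores evenly among the workers,
    # so TASK_WORKERS * THREADS_PER_WORKER never exceeds cpu_count().
    THREADS_PER_WORKER = int(os.getenv(
        "PAPERLESS_THREADS_PER_WORKER",
        max(multiprocessing.cpu_count() // TASK_WORKERS, 1)
    ))

With this assumed split, a 16-core machine gets 4 workers with 4 threads each, while a dual-core machine gets a single worker with 2 threads.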
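
For reference, document consumption is now queued through django-q instead of being executed inline. The call below mirrors the ``async_task`` invocation used in ``forms.py`` and exercises the metadata overrides that ``documents.tasks.consume_file`` accepts in this diff; the file path and primary keys are made-up values for illustration only.

    # Usage sketch: enqueue a file for consumption with metadata overrides.
    import os

    from django_q.tasks import async_task

    path = "/tmp/paperless-upload-example.pdf"  # hypothetical scratch file

    async_task(
        "documents.tasks.consume_file",
        path,
        original_filename="invoice.pdf",   # used for parser selection and filename parsing
        force_title="ACME invoice",        # overrides the automatically detected title
        force_correspondent_id=3,          # hypothetical Correspondent pk
        force_tag_ids=[1, 5],              # hypothetical Tag pks
        task_name=os.path.basename(path),  # label shown in the django-q task list
    )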