From 8dca459573d6c425443010dd8c97b0e22370e5cb Mon Sep 17 00:00:00 2001 From: Jonas Winkler Date: Mon, 16 Nov 2020 18:26:54 +0100 Subject: [PATCH 1/7] first version of the new consumer. --- docs/changelog.rst | 19 +- src/documents/consumer.py | 244 +++++++++++------- src/documents/forms.py | 23 +- .../management/commands/document_consumer.py | 14 +- src/documents/tasks.py | 25 ++ src/paperless/settings.py | 23 +- src/paperless_tesseract/parsers.py | 6 +- 7 files changed, 231 insertions(+), 123 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index cac7bb5e4..31bcdceed 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -38,6 +38,19 @@ paperless-ng 0.9.0 multi user solution, however, it allows more than one user to access the website and set some basic permissions / renew passwords. +* **Modified:** Changes to the consumer: + + * Now uses the excellent watchdog library that should make sure files are + discovered no matter what the platform is. + * The consumer now uses a task scheduler to run consumption processes in parallel. + This means that consuming many documents should be much faster on systems with + many cores. + * Concurrency is controlled with the new settings ``PAPERLESS_TASK_WORKERS`` + and ``PAPERLESS_THREADS_PER_WORKER``. See TODO for details on concurrency. + * The consumer no longer blocks the database for extended periods of time. + * An issue with tesseract running multiple threads per page and slowing down + the consumer was fixed. + * **Modified [breaking]:** REST Api changes: * New filters added, other filters removed (case sensitive filters, slug filters) @@ -64,8 +77,8 @@ paperless-ng 0.9.0 * Rework of the code of the tesseract parser. This is now a lot cleaner. * Rework of the filename handling code. It was a mess. * Fixed some issues with the document exporter not exporting all documents when encountering duplicate filenames. - * Consumer rework: now uses the excellent watchdog library, lots of code removed. - * Added a task scheduler that takes care of checking mail, training the classifier and maintaining the document search index. + * Added a task scheduler that takes care of checking mail, training the classifier, maintaining the document search index + and consuming documents. * Updated dependencies. Now uses Pipenv all around. * Updated Dockerfile and docker-compose. Now uses ``supervisord`` to run everything paperless-related in a single container. @@ -77,6 +90,8 @@ paperless-ng 0.9.0 * ``PAPERLESS_DEBUG`` defaults to ``false``. * The presence of ``PAPERLESS_DBHOST`` now determines whether to use PostgreSQL or sqlite. + * ``PAPERLESS_OCR_THREADS`` is gone and replaced with ``PAPERLESS_TASK_WORKERS`` and + ``PAPERLESS_THREADS_PER_WORKER``. See TODO for details. * Many more small changes here and there. The usual stuff. 
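For reference, the defaults for the two new concurrency settings are derived
from the CPU count. A minimal standalone sketch of the scheme used by the
``default_task_workers()`` and ``default_threads_per_worker()`` helpers that
the patches below add to ``settings.py`` (the function name here is made up
for illustration only):

.. code:: python

    import math
    import multiprocessing


    def default_concurrency():
        # Workers scale with the square root of the core count, which favors
        # threads per worker on smaller systems; the product of workers and
        # threads per worker never exceeds cpu_count().
        try:
            cores = multiprocessing.cpu_count()
        except NotImplementedError:
            cores = 1
        workers = max(math.floor(math.sqrt(cores)), 1)
        threads_per_worker = max(math.floor(cores / workers), 1)
        return workers, threads_per_worker


    # e.g. 4 cores -> (2, 2); 8 cores -> (2, 4); 16 cores -> (4, 4)

The square-root split means a 16-core machine gets 4 workers with 4 threads
each, rather than 16 single-threaded workers.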
diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 6754ebf26..7f0fd7d21 100755 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -12,7 +12,7 @@ from django.utils import timezone from paperless.db import GnuPG from .classifier import DocumentClassifier, IncompatibleClassifierVersionError from .file_handling import generate_filename, create_source_path_directory -from .models import Document, FileInfo +from .models import Document, FileInfo, Correspondent, DocumentType, Tag from .parsers import ParseError, get_parser_class from .signals import ( document_consumption_finished, @@ -25,130 +25,196 @@ class ConsumerError(Exception): class Consumer: - """ - Loop over every file found in CONSUMPTION_DIR and: - 1. Convert it to a greyscale pnm - 2. Use tesseract on the pnm - 3. Store the document in the MEDIA_ROOT with optional encryption - 4. Store the OCR'd text in the database - 5. Delete the document and image(s) - """ - def __init__(self, consume=settings.CONSUMPTION_DIR, - scratch=settings.SCRATCH_DIR): + def __init__(self): self.logger = logging.getLogger(__name__) self.logging_group = None - self.consume = consume - self.scratch = scratch - - self.classifier = DocumentClassifier() - - os.makedirs(self.scratch, exist_ok=True) - self.storage_type = Document.STORAGE_TYPE_UNENCRYPTED if settings.PASSPHRASE: self.storage_type = Document.STORAGE_TYPE_GPG - if not self.consume: + @staticmethod + def pre_check_file_exists(filename): + if not os.path.isfile(filename): + raise ConsumerError("Cannot consume {}: It is not a file".format( + filename)) + + @staticmethod + def pre_check_consumption_dir(): + if not settings.CONSUMPTION_DIR: raise ConsumerError( "The CONSUMPTION_DIR settings variable does not appear to be " - "set." + "set.") + + if not os.path.isdir(settings.CONSUMPTION_DIR): + raise ConsumerError( + "Consumption directory {} does not exist".format( + settings.CONSUMPTION_DIR)) + + @staticmethod + def pre_check_regex(filename): + if not re.match(FileInfo.REGEXES["title"], filename): + raise ConsumerError( + "Filename {} does not seem to be safe to " + "consume".format(filename)) + + @staticmethod + def pre_check_duplicate(filename): + with open(filename, "rb") as f: + checksum = hashlib.md5(f.read()).hexdigest() + if Document.objects.filter(checksum=checksum).exists(): + if settings.CONSUMER_DELETE_DUPLICATES: + os.unlink(filename) + raise ConsumerError( + "Not consuming {}: It is a duplicate.".format(filename) ) - if not os.path.exists(self.consume): - raise ConsumerError( - "Consumption directory {} does not exist".format(self.consume)) + @staticmethod + def pre_check_scratch_fir(): + os.makedirs(settings.SCRATCH_DIR, exist_ok=True) def log(self, level, message): getattr(self.logger, level)(message, extra={ "group": self.logging_group }) - @transaction.atomic - def try_consume_file(self, file): + def try_consume_file(self, + filename, + original_filename=None, + force_title=None, + force_correspondent_id=None, + force_document_type_id=None, + force_tag_ids=None): """ - Return True if file was consumed + Return the document object if it was successfully created. """ + # this is for grouping logging entries for this particular file + # together. + self.logging_group = uuid.uuid4() - if not re.match(FileInfo.REGEXES["title"], file): - return False + # Make sure that preconditions for consuming the file are met. 
-        doc = file
+        self.pre_check_file_exists(filename)
+        self.pre_check_consumption_dir()
+        self.pre_check_scratch_fir()
+        self.pre_check_regex(filename)
+        self.pre_check_duplicate(filename)
 
-        if self._is_duplicate(doc):
-            self.log(
-                "warning",
-                "Skipping {} as it appears to be a duplicate".format(doc)
-            )
-            if settings.CONSUMER_DELETE_DUPLICATES:
-                self._cleanup_doc(doc)
-            return False
+        self.log("info", "Consuming {}".format(filename))
 
-        self.log("info", "Consuming {}".format(doc))
+        # Determine the parser class.
 
-        parser_class = get_parser_class(doc)
+        parser_class = get_parser_class(original_filename or filename)
         if not parser_class:
-            self.log(
-                "error", "No parsers could be found for {}".format(doc))
-            return False
+            raise ConsumerError("No parsers available for {}".format(filename))
         else:
-            self.log("info", "Parser: {}".format(parser_class.__name__))
+            self.log("debug", "Parser: {}".format(parser_class.__name__))
+
+        # Notify all listeners that we're going to do some work.
 
         document_consumption_started.send(
             sender=self.__class__,
-            filename=doc,
+            filename=filename,
             logging_group=self.logging_group
         )
 
-        document_parser = parser_class(doc, self.logging_group)
+        # This doesn't parse the document yet, but gives us a parser.
+
+        document_parser = parser_class(filename, self.logging_group)
+
+        # However, this has already created working directories which we have
+        # to clean up.
+
+        # Parse the document. This may take some time.
 
         try:
-            self.log("info", "Generating thumbnail for {}...".format(doc))
+            self.log("debug", "Generating thumbnail for {}...".format(filename))
             thumbnail = document_parser.get_optimised_thumbnail()
+            self.log("debug", "Parsing {}...".format(filename))
             text = document_parser.get_text()
             date = document_parser.get_date()
-            document = self._store(
-                text,
-                doc,
-                thumbnail,
-                date
-            )
         except ParseError as e:
-            self.log("fatal", "PARSE FAILURE for {}: {}".format(doc, e))
             document_parser.cleanup()
-            return False
-        else:
-            document_parser.cleanup()
-            self._cleanup_doc(doc)
+            raise ConsumerError(e)
 
-            self.log(
-                "info",
-                "Document {} consumption finished".format(document)
-            )
+        # Prepare the document classifier.
+        # TODO: I don't really like to do this here, but this way we avoid
+        # reloading the classifier multiple times, since there are multiple
+        # post-consume hooks that all require the classifier.
+
+        try:
+            classifier = DocumentClassifier()
+            classifier.reload()
+        except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
+            logging.getLogger(__name__).warning(
+                "Cannot classify documents: {}.".format(e))
             classifier = None
 
-        try:
-            self.classifier.reload()
-            classifier = self.classifier
-        except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
-            logging.getLogger(__name__).warning("Cannot classify documents: {}.".format(e))
+        # Now that everything is done, we can start to store the document
+        # in the system. This will be a transaction and reasonably fast.
+        try:
+            with transaction.atomic():
 
-        document_consumption_finished.send(
-            sender=self.__class__,
-            document=document,
-            logging_group=self.logging_group,
-            classifier=classifier
-        )
-        return True
+                # Store the document.
+                document = self._store(
+                    text=text,
+                    doc=filename,
+                    thumbnail=thumbnail,
+                    date=date,
+                    original_filename=original_filename,
+                    force_title=force_title,
+                    force_correspondent_id=force_correspondent_id,
+                    force_document_type_id=force_document_type_id,
+                    force_tag_ids=force_tag_ids
+                )
 
-    def _store(self, text, doc, thumbnail, date):
+                # If we get here, it was successful. Proceed with post-consume
+                # hooks. If they fail, nothing will get changed.
 
-        file_info = FileInfo.from_path(doc)
+                document_consumption_finished.send(
+                    sender=self.__class__,
+                    document=document,
+                    logging_group=self.logging_group,
+                    classifier=classifier
+                )
+
+                # After everything is in the database, copy the files into
+                # place. If this fails, we'll also roll back the transaction.
+
+                create_source_path_directory(document.source_path)
+                self._write(document, filename, document.source_path)
+                self._write(document, thumbnail, document.thumbnail_path)
+
+                # Delete the file only if it was successfully consumed
+                self.log("debug", "Deleting document {}".format(filename))
+                os.unlink(filename)
+        except Exception as e:
+            raise ConsumerError(e)
+        finally:
+            document_parser.cleanup()
+
+        self.log(
+            "info",
+            "Document {} consumption finished".format(document)
+        )
+
+        return document
+
+    def _store(self, text, doc, thumbnail, date,
+               original_filename=None,
+               force_title=None,
+               force_correspondent_id=None,
+               force_document_type_id=None,
+               force_tag_ids=None):
+
+        # If someone gave us the original filename, use it instead of doc.
+
+        file_info = FileInfo.from_path(original_filename or doc)
 
         stats = os.stat(doc)
 
@@ -175,13 +241,21 @@ class Consumer:
         self.log("debug", "Tagging with {}".format(tag_names))
         document.tags.add(*relevant_tags)
 
+        if force_title:
+            document.title = force_title
+
+        if force_correspondent_id:
+            document.correspondent = Correspondent.objects.get(pk=force_correspondent_id)
+
+        if force_document_type_id:
+            document.document_type = DocumentType.objects.get(pk=force_document_type_id)
+
+        if force_tag_ids:
+            for tag_id in force_tag_ids:
+                document.tags.add(Tag.objects.get(pk=tag_id))
+
         document.filename = generate_filename(document)
 
-        create_source_path_directory(document.source_path)
-
-        self._write(document, doc, document.source_path)
-        self._write(document, thumbnail, document.thumbnail_path)
-
         # We need to save the document twice, since we need the PK of the
         # document in order to create its filename above.
document.save() @@ -196,13 +270,3 @@ class Consumer: return self.log("debug", "Encrypting") write_file.write(GnuPG.encrypted(read_file)) - - def _cleanup_doc(self, doc): - self.log("debug", "Deleting document {}".format(doc)) - os.unlink(doc) - - @staticmethod - def _is_duplicate(doc): - with open(doc, "rb") as f: - checksum = hashlib.md5(f.read()).hexdigest() - return Document.objects.filter(checksum=checksum).exists() diff --git a/src/documents/forms.py b/src/documents/forms.py index a1e42dfea..ce79867a1 100644 --- a/src/documents/forms.py +++ b/src/documents/forms.py @@ -1,9 +1,11 @@ import os +import tempfile from datetime import datetime from time import mktime from django import forms from django.conf import settings +from django_q.tasks import async_task from pathvalidate import validate_filename, ValidationError @@ -18,15 +20,6 @@ class UploadForm(forms.Form): raise forms.ValidationError("That filename is suspicious.") return self.cleaned_data.get("document") - def get_filename(self, i=None): - return os.path.join( - settings.CONSUMPTION_DIR, - "{}_{}".format( - str(i), - self.cleaned_data.get("document").name - ) if i else self.cleaned_data.get("document").name - ) - def save(self): """ Since the consumer already does a lot of work, it's easier just to save @@ -35,15 +28,13 @@ class UploadForm(forms.Form): """ document = self.cleaned_data.get("document").read() + original_filename = self.cleaned_data.get("document").name t = int(mktime(datetime.now().timetuple())) - file_name = self.get_filename() - i = 0 - while os.path.exists(file_name): - i += 1 - file_name = self.get_filename(i) + with tempfile.NamedTemporaryFile(prefix="paperless-upload-", suffix=".pdf", dir=settings.SCRATCH_DIR, delete=False) as f: - with open(file_name, "wb") as f: f.write(document) - os.utime(file_name, times=(t, t)) + os.utime(f.name, times=(t, t)) + + async_task("documents.tasks.consume_file", f.name, original_filename, task_name=os.path.basename(original_filename)) diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index bb317a192..769d71af2 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -3,11 +3,10 @@ import os from django.conf import settings from django.core.management.base import BaseCommand +from django_q.tasks import async_task from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer -from documents.consumer import Consumer - try: from inotify_simple import INotify, flags except ImportError: @@ -16,13 +15,10 @@ except ImportError: class Handler(FileSystemEventHandler): - def __init__(self, consumer): - self.consumer = consumer - def _consume(self, file): if os.path.isfile(file): try: - self.consumer.try_consume_file(file) + async_task("documents.tasks.consume_file", file, task_name=os.path.basename(file)) except Exception as e: # Catch all so that the consumer won't crash. 
logging.getLogger(__name__).error("Error while consuming document: {}".format(e))
 
@@ -49,8 +45,6 @@ class Command(BaseCommand):
         self.mail_fetcher = None
         self.first_iteration = True
 
-        self.consumer = Consumer()
-
         BaseCommand.__init__(self, *args, **kwargs)
 
     def add_arguments(self, parser):
@@ -78,11 +72,11 @@ class Command(BaseCommand):
         # Consume all files as this is not done initially by the watchdog
         for entry in os.scandir(directory):
             if entry.is_file():
-                self.consumer.try_consume_file(entry.path)
+                async_task("documents.tasks.consume_file", entry.path, task_name=os.path.basename(entry.path))
 
         # Start the watchdog. Woof!
         observer = Observer()
-        event_handler = Handler(self.consumer)
+        event_handler = Handler()
         observer.schedule(event_handler, directory, recursive=True)
         observer.start()
         try:
diff --git a/src/documents/tasks.py b/src/documents/tasks.py
index 9a3a0d7b8..b803704c4 100644
--- a/src/documents/tasks.py
+++ b/src/documents/tasks.py
@@ -6,6 +6,7 @@ from whoosh.writing import AsyncWriter
 from documents import index
 from documents.classifier import DocumentClassifier, \
     IncompatibleClassifierVersionError
+from documents.consumer import Consumer, ConsumerError
 from documents.mail import MailFetcher
 from documents.models import Document
 
@@ -54,3 +55,27 @@ def train_classifier():
         logging.getLogger(__name__).error(
             "Classifier error: " + str(e)
         )
+
+
+def consume_file(file,
+                 original_filename=None,
+                 force_title=None,
+                 force_correspondent_id=None,
+                 force_document_type_id=None,
+                 force_tag_ids=None):
+
+    document = Consumer().try_consume_file(
+        file,
+        original_filename=original_filename,
+        force_title=force_title,
+        force_correspondent_id=force_correspondent_id,
+        force_document_type_id=force_document_type_id,
+        force_tag_ids=force_tag_ids)
+
+    if document:
+        return "Success. New document id {} created".format(
+            document.pk
+        )
+    else:
+        raise ConsumerError("Unknown error: Returned document was null, but "
+                            "no error message was given.")
diff --git a/src/paperless/settings.py b/src/paperless/settings.py
index dda85e039..7712844d0 100644
--- a/src/paperless/settings.py
+++ b/src/paperless/settings.py
@@ -1,4 +1,5 @@
 import json
+import math
 import multiprocessing
 import os
 import re
@@ -262,6 +263,26 @@ LOGGING = {
 # Task queue                                                                  #
 ###############################################################################
 
+
+# Sensible defaults for multitasking:
+# use a fair balance between worker processes and threads per worker so that
+# both consuming many documents in parallel and consuming large documents are
+# reasonably fast.
+# Favors threads per worker on smaller systems and never exceeds cpu_count()
+# in total.
+
+def default_task_workers():
+    try:
+        return max(
+            math.floor(math.sqrt(multiprocessing.cpu_count())),
+            1
+        )
+    except NotImplementedError:
+        return 1
+
+
+TASK_WORKERS = int(os.getenv("PAPERLESS_TASK_WORKERS", default_task_workers()))
+
 Q_CLUSTER = {
     'name': 'paperless',
     'catch_up': False,
     'redis': os.getenv("PAPERLESS_REDIS", "redis://localhost:6379")
 }
 
@@ -278,8 +299,6 @@ CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES
 # documents. It should be a 3-letter language code consistent with ISO 639.
 OCR_LANGUAGE = os.getenv("PAPERLESS_OCR_LANGUAGE", "eng")
 
-# The amount of threads to use for OCR
-OCR_THREADS = int(os.getenv("PAPERLESS_OCR_THREADS", multiprocessing.cpu_count()))
 
 # OCR all documents?
OCR_ALWAYS = __get_boolean("PAPERLESS_OCR_ALWAYS", "false")
 
diff --git a/src/paperless_tesseract/parsers.py b/src/paperless_tesseract/parsers.py
index 4018e853a..d07f9e4b3 100644
--- a/src/paperless_tesseract/parsers.py
+++ b/src/paperless_tesseract/parsers.py
@@ -2,7 +2,7 @@ import itertools
 import os
 import re
 import subprocess
-from multiprocessing.pool import Pool
+from multiprocessing.pool import ThreadPool
 
 import langdetect
 import pdftotext
@@ -151,7 +151,7 @@ class RasterisedDocumentParser(DocumentParser):
         self.log("info", "Running unpaper on {} pages...".format(len(pnms)))
 
         # Run unpaper in parallel on converted images
-        with Pool(processes=settings.OCR_THREADS) as pool:
+        with ThreadPool(processes=settings.THREADS_PER_WORKER) as pool:
             pnms = pool.map(run_unpaper, pnms)
 
         return sorted(filter(lambda __: os.path.isfile(__), pnms))
@@ -166,7 +166,7 @@ class RasterisedDocumentParser(DocumentParser):
     def _ocr(self, imgs, lang):
         self.log("info", "Performing OCR on {} page(s) with language {}".format(len(imgs), lang))
 
-        with Pool(processes=settings.OCR_THREADS) as pool:
+        with ThreadPool(processes=settings.THREADS_PER_WORKER) as pool:
             r = pool.map(image_to_string, itertools.product(imgs, [lang]))
             return r

From f6a926c9b122d3f5eefa2b88afe612cc532f39f7 Mon Sep 17 00:00:00 2001
From: Jonas Winkler
Date: Mon, 16 Nov 2020 18:37:12 +0100
Subject: [PATCH 2/7] added config options to conf example

---
 paperless.conf.example | 29 +++++++++++++++++++++++------
 1 file changed, 23 insertions(+), 6 deletions(-)

diff --git a/paperless.conf.example b/paperless.conf.example
index 1c62256ab..f825daa6b 100644
--- a/paperless.conf.example
+++ b/paperless.conf.example
@@ -143,6 +143,29 @@ PAPERLESS_EMAIL_SECRET=""
 #### Software Tweaks ####
 ###############################################################################
 
+# Paperless does multiple things in the background: maintain the search index,
+# maintain the automatic matching algorithm, check emails, consume documents,
+# etc. This variable specifies how many things it will do in parallel.
+#PAPERLESS_TASK_WORKERS=1
+
+# Furthermore, paperless uses multiple threads when consuming documents to
+# speed up OCR. This variable specifies how many pages paperless will process
+# in parallel on a single document.
+#PAPERLESS_THREADS_PER_WORKER=1
+
+# Ensure that the product
+#   PAPERLESS_TASK_WORKERS * PAPERLESS_THREADS_PER_WORKER
+# does not exceed your CPU core count or else paperless will be extremely slow.
+# If you want paperless to process many documents in parallel, choose a high
+# worker count. If you want paperless to process very large documents faster,
+# use a higher threads-per-worker count.
+# The default is a balance between the two, based on your CPU core count, with
+# a slight preference for threads per worker, while using as many cores as
+# possible.
+# If you only specify PAPERLESS_TASK_WORKERS, paperless will adjust
+# PAPERLESS_THREADS_PER_WORKER automatically.
+
+
 # When the consumer detects a duplicate document, it will not touch the
 # original document. This default behavior can be changed here.
 #PAPERLESS_CONSUMER_DELETE_DUPLICATES="false"
@@ -186,12 +209,6 @@ PAPERLESS_EMAIL_SECRET=""
 #
 
-# By default, Paperless will attempt to use all available CPU cores to process
-# a document, but if you would like to limit that, you can set this value to
-# an integer:
-#PAPERLESS_OCR_THREADS=1
-
-
 # Customize the default language that tesseract will attempt to use when
 # parsing documents. The default language is used whenever
 # - No language could be detected on a document

From 31c4167535e3966c5d40b02322ac2e10a14962c7 Mon Sep 17 00:00:00 2001
From: Jonas Winkler
Date: Mon, 16 Nov 2020 18:52:13 +0100
Subject: [PATCH 3/7] added option for polling

---
 paperless.conf.example | 6 ++++++
 .../management/commands/document_consumer.py | 8 +++++++-
 src/paperless/settings.py | 16 ++++++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/paperless.conf.example b/paperless.conf.example
index f825daa6b..c64385cbb 100644
--- a/paperless.conf.example
+++ b/paperless.conf.example
@@ -165,6 +165,12 @@ PAPERLESS_EMAIL_SECRET=""
 # If you only specify PAPERLESS_TASK_WORKERS, paperless will adjust
 # PAPERLESS_THREADS_PER_WORKER automatically.
 
+# If paperless does not find documents added to your consume folder, it might
+# not be able to detect filesystem changes automatically. In that case,
+# specify a polling interval in seconds below, which will then cause paperless
+# to periodically check your consumption directory for changes.
+#PAPERLESS_CONSUMER_POLLING=10
+
 
 # When the consumer detects a duplicate document, it will not touch the
 # original document. This default behavior can be changed here.
diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py
index 769d71af2..d991b722a 100644
--- a/src/documents/management/commands/document_consumer.py
+++ b/src/documents/management/commands/document_consumer.py
@@ -6,6 +6,7 @@ from django.core.management.base import BaseCommand
 from django_q.tasks import async_task
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
+from watchdog.observers.polling import PollingObserver
 
 try:
     from inotify_simple import INotify, flags
@@ -75,7 +76,12 @@ class Command(BaseCommand):
             async_task("documents.tasks.consume_file", entry.path, task_name=os.path.basename(entry.path))
 
         # Start the watchdog. Woof!
-        observer = Observer()
+        if settings.CONSUMER_POLLING > 0:
+            logging.getLogger(__name__).info('Using polling instead of file'
+                                             'system notifications.')
+            observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
+        else:
+            observer = Observer()
         event_handler = Handler()
         observer.schedule(event_handler, directory, recursive=True)
         observer.start()
diff --git a/src/paperless/settings.py b/src/paperless/settings.py
index 7712844d0..18acf401a 100644
--- a/src/paperless/settings.py
+++ b/src/paperless/settings.py
@@ -286,13 +286,29 @@ TASK_WORKERS = int(os.getenv("PAPERLESS_TASK_WORKERS", default_task_workers()))
 Q_CLUSTER = {
     'name': 'paperless',
     'catch_up': False,
+    'workers': TASK_WORKERS,
     'redis': os.getenv("PAPERLESS_REDIS", "redis://localhost:6379")
 }
 
+
+def default_threads_per_worker():
+    try:
+        return max(
+            math.floor(multiprocessing.cpu_count() / TASK_WORKERS),
+            1
+        )
+    except NotImplementedError:
+        return 1
+
+
+THREADS_PER_WORKER = int(os.getenv("PAPERLESS_THREADS_PER_WORKER", default_threads_per_worker()))
+
 ###############################################################################
 # Paperless Specific Settings                                                 #
 ###############################################################################
 
+CONSUMER_POLLING = int(os.getenv("PAPERLESS_CONSUMER_POLLING", 0))
+
 CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
 
 # The default language that tesseract will attempt to use when parsing

From 084f0b6a0f43bdd0b5e54f05e4e57d3ed90755de Mon Sep 17 00:00:00 2001
From: Jonas Winkler
Date: Mon, 16 Nov 2020 19:11:18 +0100
Subject: [PATCH 4/7] added some documentation

---
 docs/troubleshooting.rst | 33 +++++++++++++++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)

diff --git a/docs/troubleshooting.rst b/docs/troubleshooting.rst
index fc652689e..9e1c42f4a 100644
--- a/docs/troubleshooting.rst
+++ b/docs/troubleshooting.rst
@@ -2,9 +2,38 @@
 Troubleshooting
 ***************
 
-.. warning::
+No files are added by the consumer
+##################################
+
+Check for the following issues:
+
+* Ensure that the directory you're putting your documents in is the folder
+  paperless is watching. With docker, this is configured in the
+  ``docker-compose.yml`` file. Without docker, look at the ``CONSUMPTION_DIR``
+  setting. Don't adjust this setting if you're using docker.
+* Ensure that redis is up and running. Paperless does its task processing
+  asynchronously, and for documents to arrive at the task processor, it needs
+  redis to run.
+* Ensure that the task processor is running. Docker does this automatically.
+  You can invoke the task processor manually by executing
+
+  .. code:: shell-session
+
+    $ python3 manage.py qcluster
+
+* Look at the output of paperless and inspect it for any errors.
+* Go to the admin interface, and check if there are failed tasks. If so, the
+  tasks will contain an error message.
+
+
+Consumer fails to pick up any new files
+#######################################
+
+If you notice that the consumer only picks up files in the consumption
+directory at startup, but does not find any files added later, check the
+configuration file and enable filesystem polling with the setting
+``PAPERLESS_CONSUMER_POLLING``, as shown in the example below.
 
-   This section is not updated to paperless-ng yet.
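+For example, to verify that polling fixes the issue, you can run the consumer
+manually with polling enabled (the interval below is just an illustration):
+
+.. code:: shell-session
+
+    $ PAPERLESS_CONSUMER_POLLING=10 python3 manage.py document_consumer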
Consumer warns ``OCR for XX failed`` #################################### From 70d8e8bc56b11146e764f1b3e78246d8081c9985 Mon Sep 17 00:00:00 2001 From: Jonas Winkler Date: Mon, 16 Nov 2020 23:16:37 +0100 Subject: [PATCH 5/7] added more testing --- src/documents/consumer.py | 6 +- .../management/commands/document_consumer.py | 3 - src/documents/tests/test_consumer.py | 261 +++++++++++++++++- src/paperless/settings.py | 1 + src/setup.cfg | 1 - 5 files changed, 264 insertions(+), 8 deletions(-) diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 7f0fd7d21..6239e7d2a 100755 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -72,8 +72,10 @@ class Consumer: ) @staticmethod - def pre_check_scratch_fir(): + def pre_check_directories(): os.makedirs(settings.SCRATCH_DIR, exist_ok=True) + os.makedirs(settings.THUMBNAIL_DIR, exist_ok=True) + os.makedirs(settings.ORIGINALS_DIR, exist_ok=True) def log(self, level, message): getattr(self.logger, level)(message, extra={ @@ -100,7 +102,7 @@ class Consumer: self.pre_check_file_exists(filename) self.pre_check_consumption_dir() - self.pre_check_scratch_fir() + self.pre_check_directories() self.pre_check_regex(filename) self.pre_check_duplicate(filename) diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index d991b722a..ec48daa96 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -61,9 +61,6 @@ class Command(BaseCommand): self.verbosity = options["verbosity"] directory = options["directory"] - for d in (settings.ORIGINALS_DIR, settings.THUMBNAIL_DIR): - os.makedirs(d, exist_ok=True) - logging.getLogger(__name__).info( "Starting document consumer at {}".format( directory diff --git a/src/documents/tests/test_consumer.py b/src/documents/tests/test_consumer.py index 4d5360c7b..1661eef5b 100644 --- a/src/documents/tests/test_consumer.py +++ b/src/documents/tests/test_consumer.py @@ -1,8 +1,17 @@ +import os import re +import shutil +import tempfile +from unittest import mock +from unittest.mock import MagicMock -from django.test import TestCase +from django.conf import settings +from django.db import DatabaseError +from django.test import TestCase, override_settings -from ..models import FileInfo, Tag +from ..consumer import Consumer, ConsumerError +from ..models import FileInfo, Tag, Correspondent, DocumentType, Document +from ..parsers import DocumentParser, ParseError class TestAttributes(TestCase): @@ -394,3 +403,251 @@ class TestFieldPermutations(TestCase): self.assertEqual(info.created.year, 2019) self.assertEqual(info.created.month, 9) self.assertEqual(info.created.day, 8) + + +class DummyParser(DocumentParser): + + def get_thumbnail(self): + # not important during tests + raise NotImplementedError() + + def __init__(self, path, logging_group, scratch_dir): + super(DummyParser, self).__init__(path, logging_group) + _, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=scratch_dir) + + def get_optimised_thumbnail(self): + return self.fake_thumb + + def get_text(self): + return "The Text" + + +class FaultyParser(DocumentParser): + + def get_thumbnail(self): + # not important during tests + raise NotImplementedError() + + def __init__(self, path, logging_group, scratch_dir): + super(FaultyParser, self).__init__(path, logging_group) + _, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=scratch_dir) + + def get_optimised_thumbnail(self): + return self.fake_thumb + + 
def get_text(self): + raise ParseError("Does not compute.") + + +class TestConsumer(TestCase): + + def make_dummy_parser(self, path, logging_group): + return DummyParser(path, logging_group, self.scratch_dir) + + def make_faulty_parser(self, path, logging_group): + return FaultyParser(path, logging_group, self.scratch_dir) + + def setUp(self): + self.scratch_dir = tempfile.mkdtemp() + self.media_dir = tempfile.mkdtemp() + + override_settings( + SCRATCH_DIR=self.scratch_dir, + MEDIA_ROOT=self.media_dir, + ORIGINALS_DIR=os.path.join(self.media_dir, "documents", "originals"), + THUMBNAIL_DIR=os.path.join(self.media_dir, "documents", "thumbnails") + ).enable() + + patcher = mock.patch("documents.parsers.document_consumer_declaration.send") + m = patcher.start() + m.return_value = [(None, { + "parser": self.make_dummy_parser, + "test": lambda _: True, + "weight": 0 + })] + + self.addCleanup(patcher.stop) + + self.consumer = Consumer() + + def tearDown(self): + shutil.rmtree(self.scratch_dir, ignore_errors=True) + shutil.rmtree(self.media_dir, ignore_errors=True) + + def get_test_file(self): + fd, f = tempfile.mkstemp(suffix=".pdf", dir=self.scratch_dir) + return f + + def testNormalOperation(self): + + filename = self.get_test_file() + document = self.consumer.try_consume_file(filename) + + self.assertEqual(document.content, "The Text") + self.assertEqual(document.title, os.path.splitext(os.path.basename(filename))[0]) + self.assertIsNone(document.correspondent) + self.assertIsNone(document.document_type) + self.assertEqual(document.filename, "0000001.pdf") + + self.assertTrue(os.path.isfile( + document.source_path + )) + + self.assertTrue(os.path.isfile( + document.thumbnail_path + )) + + self.assertFalse(os.path.isfile(filename)) + + def testOverrideFilename(self): + filename = self.get_test_file() + overrideFilename = "My Bank - Statement for November.pdf" + + document = self.consumer.try_consume_file(filename, original_filename=overrideFilename) + + self.assertEqual(document.correspondent.name, "My Bank") + self.assertEqual(document.title, "Statement for November") + + def testOverrideTitle(self): + + document = self.consumer.try_consume_file(self.get_test_file(), force_title="Override Title") + self.assertEqual(document.title, "Override Title") + + def testOverrideCorrespondent(self): + c = Correspondent.objects.create(name="test") + + document = self.consumer.try_consume_file(self.get_test_file(), force_correspondent_id=c.pk) + self.assertEqual(document.correspondent.id, c.id) + + def testOverrideDocumentType(self): + dt = DocumentType.objects.create(name="test") + + document = self.consumer.try_consume_file(self.get_test_file(), force_document_type_id=dt.pk) + self.assertEqual(document.document_type.id, dt.id) + + def testOverrideTags(self): + t1 = Tag.objects.create(name="t1") + t2 = Tag.objects.create(name="t2") + t3 = Tag.objects.create(name="t3") + document = self.consumer.try_consume_file(self.get_test_file(), force_tag_ids=[t1.id, t3.id]) + + self.assertIn(t1, document.tags.all()) + self.assertNotIn(t2, document.tags.all()) + self.assertIn(t3, document.tags.all()) + + def testNotAFile(self): + try: + self.consumer.try_consume_file("non-existing-file") + except ConsumerError as e: + self.assertTrue(str(e).endswith('It is not a file')) + return + + self.fail("Should throw exception") + + @override_settings(CONSUMPTION_DIR=None) + def testConsumptionDirUnset(self): + try: + self.consumer.try_consume_file(self.get_test_file()) + except ConsumerError as e: + self.assertEqual(str(e), 
"The CONSUMPTION_DIR settings variable does not appear to be set.") + return + + self.fail("Should throw exception") + + @override_settings(CONSUMPTION_DIR="asd") + def testNoConsumptionDir(self): + try: + self.consumer.try_consume_file(self.get_test_file()) + except ConsumerError as e: + self.assertEqual(str(e), "Consumption directory asd does not exist") + return + + self.fail("Should throw exception") + + def testDuplicates(self): + self.consumer.try_consume_file(self.get_test_file()) + + try: + self.consumer.try_consume_file(self.get_test_file()) + except ConsumerError as e: + self.assertTrue(str(e).endswith("It is a duplicate.")) + return + + self.fail("Should throw exception") + + @mock.patch("documents.parsers.document_consumer_declaration.send") + def testNoParsers(self, m): + m.return_value = [] + + try: + self.consumer.try_consume_file(self.get_test_file()) + except ConsumerError as e: + self.assertTrue(str(e).startswith("No parsers abvailable")) + return + + self.fail("Should throw exception") + + @mock.patch("documents.parsers.document_consumer_declaration.send") + def testFaultyParser(self, m): + m.return_value = [(None, { + "parser": self.make_faulty_parser, + "test": lambda _: True, + "weight": 0 + })] + + try: + self.consumer.try_consume_file(self.get_test_file()) + except ConsumerError as e: + self.assertEqual(str(e), "Does not compute.") + return + + self.fail("Should throw exception.") + + @mock.patch("documents.consumer.Consumer._write") + def testPostSaveError(self, m): + filename = self.get_test_file() + m.side_effect = OSError("NO.") + try: + self.consumer.try_consume_file(filename) + except ConsumerError as e: + self.assertEqual(str(e), "NO.") + else: + self.fail("Should raise exception") + + # file not deleted + self.assertTrue(os.path.isfile(filename)) + + # Database empty + self.assertEqual(len(Document.objects.all()), 0) + + @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{title}") + def testFilenameHandling(self): + filename = self.get_test_file() + + document = self.consumer.try_consume_file(filename, original_filename="Bank - Test.pdf", force_title="new docs") + + print(document.source_path) + print("===") + + self.assertEqual(document.title, "new docs") + self.assertEqual(document.correspondent.name, "Bank") + self.assertEqual(document.filename, "bank/new-docs-0000001.pdf") + + @mock.patch("documents.consumer.DocumentClassifier") + def testClassifyDocument(self, m): + correspondent = Correspondent.objects.create(name="test") + dtype = DocumentType.objects.create(name="test") + t1 = Tag.objects.create(name="t1") + t2 = Tag.objects.create(name="t2") + + m.return_value = MagicMock() + m.return_value.predict_correspondent.return_value = correspondent.pk + m.return_value.predict_document_type.return_value = dtype.pk + m.return_value.predict_tags.return_value = [t1.pk] + + document = self.consumer.try_consume_file(self.get_test_file()) + + self.assertEqual(document.correspondent, correspondent) + self.assertEqual(document.document_type, dtype) + self.assertIn(t1, document.tags.all()) + self.assertNotIn(t2, document.tags.all()) diff --git a/src/paperless/settings.py b/src/paperless/settings.py index 18acf401a..0f9d9d7e9 100644 --- a/src/paperless/settings.py +++ b/src/paperless/settings.py @@ -359,5 +359,6 @@ FILENAME_PARSE_TRANSFORMS = [] for t in json.loads(os.getenv("PAPERLESS_FILENAME_PARSE_TRANSFORMS", "[]")): FILENAME_PARSE_TRANSFORMS.append((re.compile(t["pattern"]), t["repl"])) +# TODO: this should not have a prefix. 
# Specify the filename format for out files
PAPERLESS_FILENAME_FORMAT = os.getenv("PAPERLESS_FILENAME_FORMAT")
diff --git a/src/setup.cfg b/src/setup.cfg
index 33bef4f4e..05b89eb51 100644
--- a/src/setup.cfg
+++ b/src/setup.cfg
@@ -6,7 +6,6 @@ ignore = E501
 DJANGO_SETTINGS_MODULE=paperless.settings
 addopts = --pythonwarnings=all
 env =
-    PAPERLESS_PASSPHRASE=THISISNOTASECRET
     PAPERLESS_SECRET=paperless
     PAPERLESS_EMAIL_SECRET=paperless

From d2e22e3f27709d6f17c7b48c65665c5dde5f99c9 Mon Sep 17 00:00:00 2001
From: Jonas Winkler
Date: Mon, 16 Nov 2020 23:53:12 +0100
Subject: [PATCH 6/7] Changed the way parsers are discovered.

This also prepares for upcoming changes regarding content types and file
types: parsers should declare what they support, and actual file extensions
should not be hardcoded everywhere.
---
 src/documents/parsers.py | 17 +++++++-------
 src/documents/tests/test_parsers.py | 8 +++----
 src/paperless_tesseract/apps.py | 6 ++---
 src/paperless_tesseract/signals.py | 23 ++++++++-----------
 src/paperless_tesseract/tests/test_signals.py | 10 ++++----
 src/paperless_text/apps.py | 6 ++---
 src/paperless_text/signals.py | 23 ++++++++-----------
 7 files changed, 42 insertions(+), 51 deletions(-)

diff --git a/src/documents/parsers.py b/src/documents/parsers.py
index c33c1bbd4..600e4fc93 100644
--- a/src/documents/parsers.py
+++ b/src/documents/parsers.py
@@ -41,15 +41,16 @@ def get_parser_class(doc):
     Determine the appropriate parser class based on the file
     """
 
-    parsers = []
-    for response in document_consumer_declaration.send(None):
-        parsers.append(response[1])
-
     options = []
-    for parser in parsers:
-        result = parser(doc)
-        if result:
-            options.append(result)
+
+    # His last command was: COME! And they came. All of them. Even the parsers.
+
+    for response in document_consumer_declaration.send(None):
+        parser_declaration = response[1]
+        parser_test = parser_declaration["test"]
+
+        if parser_test(doc):
+            options.append(parser_declaration)
 
     if not options:
         return None
diff --git a/src/documents/tests/test_parsers.py b/src/documents/tests/test_parsers.py
index f49d6ca4d..5896f3ba3 100644
--- a/src/documents/tests/test_parsers.py
+++ b/src/documents/tests/test_parsers.py
@@ -14,7 +14,7 @@ class TestParserDiscovery(TestCase):
             pass
 
         m.return_value = (
-            (None, lambda _: {"weight": 0, "parser": DummyParser}),
+            (None, {"weight": 0, "parser": DummyParser, "test": lambda _: True}),
         )
 
         self.assertEqual(
@@ -32,8 +32,8 @@ class TestParserDiscovery(TestCase):
             pass
 
         m.return_value = (
-            (None, lambda _: {"weight": 0, "parser": DummyParser1}),
-            (None, lambda _: {"weight": 1, "parser": DummyParser2}),
+            (None, {"weight": 0, "parser": DummyParser1, "test": lambda _: True}),
+            (None, {"weight": 1, "parser": DummyParser2, "test": lambda _: True}),
         )
 
         self.assertEqual(
@@ -43,7 +43,7 @@ class TestParserDiscovery(TestCase):
 
     @mock.patch("documents.parsers.document_consumer_declaration.send")
     def test__get_parser_class_0_parsers(self, m, *args):
-        m.return_value = ((None, lambda _: None),)
+        m.return_value = []
         with TemporaryDirectory() as tmpdir:
             self.assertIsNone(
                 get_parser_class("doc.pdf")
             )
diff --git a/src/paperless_tesseract/apps.py b/src/paperless_tesseract/apps.py
index bdb430bea..67b90f006 100644
--- a/src/paperless_tesseract/apps.py
+++ b/src/paperless_tesseract/apps.py
@@ -1,5 +1,7 @@
 from django.apps import AppConfig
 
+from paperless_tesseract.signals import tesseract_consumer_declaration
+
 
 class PaperlessTesseractConfig(AppConfig):
 
@@ -9,8 +11,6 @@
 
     def ready(self):
 
        from
documents.signals import document_consumer_declaration - from .signals import ConsumerDeclaration - - document_consumer_declaration.connect(ConsumerDeclaration.handle) + document_consumer_declaration.connect(tesseract_consumer_declaration) AppConfig.ready(self) diff --git a/src/paperless_tesseract/signals.py b/src/paperless_tesseract/signals.py index 237f15c52..3fc6c2a11 100644 --- a/src/paperless_tesseract/signals.py +++ b/src/paperless_tesseract/signals.py @@ -3,21 +3,16 @@ import re from .parsers import RasterisedDocumentParser -class ConsumerDeclaration: +def tesseract_consumer_declaration(sender, **kwargs): + return { + "parser": RasterisedDocumentParser, + "weight": 0, + "test": tesseract_consumer_test + } - MATCHING_FILES = re.compile(r"^.*\.(pdf|jpe?g|gif|png|tiff?|pnm|bmp)$") - @classmethod - def handle(cls, sender, **kwargs): - return cls.test +MATCHING_FILES = re.compile(r"^.*\.(pdf|jpe?g|gif|png|tiff?|pnm|bmp)$") - @classmethod - def test(cls, doc): - if cls.MATCHING_FILES.match(doc.lower()): - return { - "parser": RasterisedDocumentParser, - "weight": 0 - } - - return None +def tesseract_consumer_test(doc): + return MATCHING_FILES.match(doc.lower()) diff --git a/src/paperless_tesseract/tests/test_signals.py b/src/paperless_tesseract/tests/test_signals.py index b5ff4da59..354557732 100644 --- a/src/paperless_tesseract/tests/test_signals.py +++ b/src/paperless_tesseract/tests/test_signals.py @@ -1,6 +1,6 @@ from django.test import TestCase -from ..signals import ConsumerDeclaration +from paperless_tesseract.signals import tesseract_consumer_test class SignalsTestCase(TestCase): @@ -20,7 +20,7 @@ class SignalsTestCase(TestCase): for prefix in prefixes: for suffix in suffixes: name = "{}.{}".format(prefix, suffix) - self.assertTrue(ConsumerDeclaration.test(name)) + self.assertTrue(tesseract_consumer_test(name)) def test_test_handles_various_file_names_false(self): @@ -30,7 +30,7 @@ class SignalsTestCase(TestCase): for prefix in prefixes: for suffix in suffixes: name = "{}.{}".format(prefix, suffix) - self.assertFalse(ConsumerDeclaration.test(name)) + self.assertFalse(tesseract_consumer_test(name)) - self.assertFalse(ConsumerDeclaration.test("")) - self.assertFalse(ConsumerDeclaration.test("doc")) + self.assertFalse(tesseract_consumer_test("")) + self.assertFalse(tesseract_consumer_test("doc")) diff --git a/src/paperless_text/apps.py b/src/paperless_text/apps.py index 389167368..1acc361aa 100644 --- a/src/paperless_text/apps.py +++ b/src/paperless_text/apps.py @@ -1,5 +1,7 @@ from django.apps import AppConfig +from paperless_text.signals import text_consumer_declaration + class PaperlessTextConfig(AppConfig): @@ -9,8 +11,6 @@ class PaperlessTextConfig(AppConfig): from documents.signals import document_consumer_declaration - from .signals import ConsumerDeclaration - - document_consumer_declaration.connect(ConsumerDeclaration.handle) + document_consumer_declaration.connect(text_consumer_declaration) AppConfig.ready(self) diff --git a/src/paperless_text/signals.py b/src/paperless_text/signals.py index ae5a005e1..784bfd45d 100644 --- a/src/paperless_text/signals.py +++ b/src/paperless_text/signals.py @@ -3,21 +3,16 @@ import re from .parsers import TextDocumentParser -class ConsumerDeclaration: +def text_consumer_declaration(sender, **kwargs): + return { + "parser": TextDocumentParser, + "weight": 10, + "test": text_consumer_test + } - MATCHING_FILES = re.compile(r"^.*\.(te?xt|md|csv)$") - @classmethod - def handle(cls, sender, **kwargs): - return cls.test +MATCHING_FILES = 
re.compile(r"^.*\.(te?xt|md|csv)$") - @classmethod - def test(cls, doc): - if cls.MATCHING_FILES.match(doc.lower()): - return { - "parser": TextDocumentParser, - "weight": 10 - } - - return None +def text_consumer_test(doc): + return MATCHING_FILES.match(doc.lower()) From a06cb8039e8a637cff375c6aaa8c139fe9e1ab65 Mon Sep 17 00:00:00 2001 From: Jonas Winkler Date: Tue, 17 Nov 2020 00:05:06 +0100 Subject: [PATCH 7/7] updated test config --- src/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/setup.cfg b/src/setup.cfg index 33bef4f4e..55b128141 100644 --- a/src/setup.cfg +++ b/src/setup.cfg @@ -15,4 +15,4 @@ env = source = ./ omit = - */tests + */tests/*