Trenton H c8a62715ec Feature: Allow setting backend configuration settings via the UI (#5126)
* Saving some start on this

* At least partially working for the tesseract parser

* Problems with migration testing need to figure out

* Work around that error

* Fixes max m_pixels

* Moving the settings to main paperless application

* Starting some consumer options

* More fixes and work

* Fixes these last tests

* Fix max_length on OcrSettings.mode field

* Fix all fields on Common & Ocr settings serializers

* Umbrellla config view

* Revert "Umbrellla config view"

This reverts commit fbaf9f4be30f89afeb509099180158a3406416a5.

* Updates to use a single configuration object for all settings

* Squashed commit of the following:

commit 8a0a49dd5766094f60462fbfbe62e9921fbd2373
Author: shamoon <4887959+shamoon@users.noreply.github.com>
Date:   Tue Dec 19 23:02:47 2023 -0800

    Fix formatting

commit 66b2d90c507b8afd9507813ff555e46198ea33b9
Author: shamoon <4887959+shamoon@users.noreply.github.com>
Date:   Tue Dec 19 22:36:35 2023 -0800

    Refactor frontend data models

commit 5723bd8dd823ee855625e250df39393e26709d48
Author: Adam Bogdał <adam@bogdal.pl>
Date:   Wed Dec 20 01:17:43 2023 +0100

    Fix: speed up admin panel for installs with a large number of documents (#5052)

commit 9b08ce176199bf9011a6634bb88f616846150d2b
Author: shamoon <4887959+shamoon@users.noreply.github.com>
Date:   Tue Dec 19 15:18:51 2023 -0800

    Update PULL_REQUEST_TEMPLATE.md

commit a6248bec2d793b7690feed95fcaf5eb34a75bfb6
Author: shamoon <4887959+shamoon@users.noreply.github.com>
Date:   Tue Dec 19 15:02:05 2023 -0800

    Chore: Update Angular to v17 (#4980)

commit b1f6f52486d5ba5c04af99b41315eb6428fd1fa8
Author: shamoon <4887959+shamoon@users.noreply.github.com>
Date:   Tue Dec 19 13:53:56 2023 -0800

    Fix: Dont allow null custom_fields property via API (#5063)

commit 638d9970fd468d8c02c91d19bd28f8b0796bdcb1
Author: shamoon <4887959+shamoon@users.noreply.github.com>
Date:   Tue Dec 19 13:43:50 2023 -0800

    Enhancement: symmetric document links (#4907)

commit 5e8de4c1da6eb4eb8f738b20962595c7536b30ec
Author: shamoon <4887959+shamoon@users.noreply.github.com>
Date:   Tue Dec 19 12:45:04 2023 -0800

    Enhancement: shared icon & shared by me filter (#4859)

commit 088bad90306025d3f6b139cbd0ad264a1cbecfe5
Author: Trenton H <797416+stumpylog@users.noreply.github.com>
Date:   Tue Dec 19 12:04:03 2023 -0800

    Bulk updates all the backend libraries (#5061)

* Saving some work on frontend config

* Very basic but dynamically-generated config form

* Saving work on slightly less ugly frontend config

* JSON validation for user_args field

* Fully dynamic config form

* Adds in some additional validators for a nicer error message

* Cleaning up the testing and coverage more

* Reverts unintentional change

* Adds documentation about the settings and the precedence

* Couple more commenting and style fixes

---------

Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com>
2023-12-29 15:42:56 -08:00

358 lines
12 KiB
Python

import logging
import os
from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter
from pathlib import Path
from pathlib import PurePath
from threading import Event
from time import monotonic
from time import sleep
from typing import Final
from django import db
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
from documents.data_models import DocumentSource
from documents.models import Tag
from documents.parsers import is_file_ext_supported
from documents.tasks import consume_file
# inotify support is optional: when the import fails, both names are set to
# None so the rest of the module can test for availability.
try:
    from inotifyrecursive import INotify
    from inotifyrecursive import flags
except ImportError: # pragma: no cover
    INotify = flags = None

# Logger shared by every function and class in this module
logger = logging.getLogger("paperless.management.consumer")
def _tags_from_path(filepath) -> list[int]:
    """
    Derive tags from the directories between CONSUMPTION_DIR and the file.

    Each directory name on the way from the consumption directory down to
    the file is looked up case-insensitively as a Tag; a Tag with that name
    is created when none exists.

    Returns a list of the primary keys of those Tags, without duplicates
    """
    # Drop stale database connections before touching the ORM
    db.close_old_connections()

    containing_dir = Path(filepath).relative_to(settings.CONSUMPTION_DIR).parent
    unique_ids = {
        Tag.objects.get_or_create(
            name__iexact=dir_name,
            defaults={"name": dir_name},
        )[0].pk
        for dir_name in containing_dir.parts
    }
    return list(unique_ids)
def _is_ignored(filepath: str) -> bool:
    """
    Checks if the given file should be ignored, based on the configured
    CONSUMER_IGNORE_PATTERNS.

    The path is made absolute and normalized, then reduced to its components
    relative to CONSUMPTION_DIR. Every component is tested against every
    pattern; a single match is enough to ignore the file.

    Returns True if the file is ignored, False otherwise

    Raises ValueError (from PurePath.relative_to) if the path is not located
    inside CONSUMPTION_DIR
    """
    normalized = os.path.abspath(os.path.normpath(filepath))

    # Trim out the consume directory, leaving only the filename and its
    # path relative to the consume directory
    # foo/bar/baz/file.pdf -> (foo, bar, baz, file.pdf)
    components = PurePath(normalized).relative_to(settings.CONSUMPTION_DIR).parts
    if not components:
        # The path is the consume directory itself, nothing to match against
        return False

    # Everything except the final component is a directory and needs a
    # trailing slash, otherwise fnmatch doesn't match directory patterns:
    #   fnmatch("dir", "dir/*") == False
    #   fnmatch("dir/", "dir/*") == True
    # Deciding by position (rather than by comparing against the file name)
    # keeps this correct when a directory shares its name with the file.
    candidates = [f"{directory}/" for directory in components[:-1]]
    candidates.append(components[-1])

    return any(
        filter(candidates, pattern) for pattern in settings.CONSUMER_IGNORE_PATTERNS
    )
def _consume(filepath: str) -> None:
    """
    Queues a single file for consumption, if it is eligible.

    Nothing is queued for directories, ignored paths, paths which are no
    longer files, files with an unsupported extension or files which cannot
    be opened for reading. Errors raised while creating tags from the path
    or while queueing the task are logged instead of propagated.
    """
    if os.path.isdir(filepath) or _is_ignored(filepath):
        return

    if not os.path.isfile(filepath):
        logger.debug(f"Not consuming file {filepath}: File has moved.")
        return

    _, extension = os.path.splitext(filepath)
    if not is_file_ext_supported(extension):
        logger.warning(f"Not consuming file {filepath}: Unknown file extension.")
        return

    # Probe that the file can be opened: 50 attempts, 10ms apart,
    # so at most 500ms of waiting in total
    max_open_attempts: Final[int] = 50
    pause_between_attempts: Final[float] = 0.01

    failed_attempts = 0
    opened = False
    last_error = None
    while not opened and failed_attempts < max_open_attempts:
        try:
            with open(filepath, "rb"):
                opened = True
        except OSError as err:
            failed_attempts += 1
            last_error = str(err)
            sleep(pause_between_attempts)

    if failed_attempts >= max_open_attempts:
        logger.warning(f"Not consuming file {filepath}: OS reports {last_error}")
        return

    # Tagging is best effort, a failure here still lets the file be queued
    tag_ids = None
    try:
        if settings.CONSUMER_SUBDIRS_AS_TAGS:
            tag_ids = _tags_from_path(filepath)
    except Exception:
        logger.exception("Error creating tags from path")

    try:
        logger.info(f"Adding {filepath} to the task queue.")
        document = ConsumableDocument(
            source=DocumentSource.ConsumeFolder,
            original_file=filepath,
        )
        overrides = DocumentMetadataOverrides(tag_ids=tag_ids)
        consume_file.delay(document, overrides)
    except Exception:
        # Catch all so that the consumer won't crash.
        # This is also what the test case is listening for to check for
        # errors.
        logger.exception("Error while consuming document")
def _consume_wait_unmodified(file: str) -> None:
    """
    Consumes the given file once two consecutive checks report the same
    modification time and size.

    Between checks, sleeps for CONSUMER_POLLING_DELAY seconds. After
    CONSUMER_POLLING_RETRY_COUNT checks without a stable result, an error is
    logged and the file is not consumed. Ignored files and files which
    vanish while waiting are skipped.
    """
    if _is_ignored(file):
        return

    logger.debug(f"Waiting for file {file} to remain unmodified")

    # (mtime, size) seen on the previous check, seeded with values
    # that never match the first real check
    last_seen = (-1, -1)
    attempt = 0
    while attempt < settings.CONSUMER_POLLING_RETRY_COUNT:
        try:
            file_stat = os.stat(file)
        except FileNotFoundError:
            logger.debug(
                f"File {file} moved while waiting for it to remain unmodified.",
            )
            return

        now_seen = (file_stat.st_mtime, file_stat.st_size)
        if now_seen == last_seen:
            _consume(file)
            return

        last_seen = now_seen
        sleep(settings.CONSUMER_POLLING_DELAY)
        attempt += 1

    logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
class Handler(FileSystemEventHandler):
    """
    Filesystem event handler which submits created and moved files to a
    thread pool, where each waits to become unmodified before consumption.
    """

    def __init__(self, pool: ThreadPoolExecutor) -> None:
        super().__init__()
        self._pool = pool

    def _submit_for_consumption(self, path) -> None:
        """
        Schedules the wait-then-consume routine for the path on the pool
        """
        self._pool.submit(_consume_wait_unmodified, path)

    def on_created(self, event):
        # A new file appeared, its location is the event source
        self._submit_for_consumption(event.src_path)

    def on_moved(self, event):
        # A file was moved, only its destination is of interest
        self._submit_for_consumption(event.dest_path)
class Command(BaseCommand):
    """
    Consumes the files already present in the consumption directory, then
    keeps watching the directory for new files until stopped.

    Watching uses inotify when it could be imported and CONSUMER_POLLING is
    0, otherwise a polling observer is used.
    """

    # This is here primarily for the tests and is irrelevant in production.
    stop_flag = Event()
    # Also only for testing, configures in one place the timeout used before checking
    # the stop flag
    testing_timeout_s: Final[float] = 0.5
    testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0

    def add_arguments(self, parser):
        """
        Registers the optional positional directory plus the --oneshot and
        --testing flags
        """
        parser.add_argument(
            "directory",
            default=settings.CONSUMPTION_DIR,
            nargs="?",
            help="The consumption directory.",
        )
        parser.add_argument("--oneshot", action="store_true", help="Run only once.")

        # Only use during unit testing, will configure a timeout
        # Leaving it unset or false and the consumer will exit when it
        # receives SIGINT
        parser.add_argument(
            "--testing",
            action="store_true",
            help="Flag used only for unit testing",
            default=False,
        )

    def handle(self, *args, **options):
        """
        Consumes every file currently in the directory, then (unless
        --oneshot was given) blocks watching the directory for changes.

        Raises CommandError if no directory is configured or the directory
        does not exist
        """
        directory = options["directory"]
        recursive = settings.CONSUMER_RECURSIVE

        if not directory:
            raise CommandError("CONSUMPTION_DIR does not appear to be set.")

        directory = os.path.abspath(directory)

        if not os.path.isdir(directory):
            raise CommandError(f"Consumption directory {directory} does not exist")

        # First pass over whatever is already sitting in the directory
        if recursive:
            for dirpath, _, filenames in os.walk(directory):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    _consume(filepath)
        else:
            for entry in os.scandir(directory):
                _consume(entry.path)

        if options["oneshot"]:
            return

        if settings.CONSUMER_POLLING == 0 and INotify:
            self.handle_inotify(directory, recursive, options["testing"])
        else:
            if INotify is None and settings.CONSUMER_POLLING == 0:  # pragma: no cover
                logger.warning("Using polling as INotify import failed")
            self.handle_polling(directory, recursive, options["testing"])

        logger.debug("Consumer exiting.")

    def handle_polling(self, directory, recursive, is_testing: bool):
        """
        Watches the directory with a polling observer, handing new files to
        a small thread pool. Runs until the observer dies, the stop flag is
        set or a KeyboardInterrupt is received.
        """
        logger.info(f"Polling directory for changes: {directory}")

        # Without a timeout, join() blocks until the observer stops. While
        # testing, wake up regularly to look at the stop flag.
        timeout = None
        if is_testing:
            timeout = self.testing_timeout_s
            logger.debug(f"Configuring timeout to {timeout}s")

        polling_interval = settings.CONSUMER_POLLING
        if polling_interval == 0:  # pragma: no cover
            # Only happens if INotify failed to import
            logger.warning("Using polling of 10s, consider setting this")
            polling_interval = 10

        with ThreadPoolExecutor(max_workers=4) as pool:
            observer = PollingObserver(timeout=polling_interval)
            observer.schedule(Handler(pool), directory, recursive=recursive)
            observer.start()
            try:
                while observer.is_alive():
                    observer.join(timeout)
                    if self.stop_flag.is_set():
                        observer.stop()
            except KeyboardInterrupt:
                observer.stop()
            observer.join()

    def handle_inotify(self, directory, recursive, is_testing: bool):
        """
        Watches the directory with inotify. A file is consumed once no new
        event has been seen for it for CONSUMER_INOTIFY_DELAY seconds. Runs
        until the stop flag is set or a KeyboardInterrupt is received.
        """
        logger.info(f"Using inotify to watch directory for changes: {directory}")

        # Timeout handed to inotify.read(), in milliseconds.
        # None blocks until an event arrives.
        timeout_ms = None
        if is_testing:
            timeout_ms = self.testing_timeout_ms
            logger.debug(f"Configuring timeout to {timeout_ms}ms")

        inotify = INotify()
        inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
        if recursive:
            descriptor = inotify.add_watch_recursive(directory, inotify_flags)
        else:
            descriptor = inotify.add_watch(directory, inotify_flags)

        # The setting is in seconds, as it is compared against monotonic()
        # differences. The read timeout however is in milliseconds, so keep
        # a converted copy and never hand the seconds value to read().
        inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
        inotify_debounce_ms: Final[float] = inotify_debounce_secs * 1000

        finished = False
        # Maps a file path to the monotonic() time of its last relevant event
        notified_files = {}

        try:
            while not finished:
                try:
                    for event in inotify.read(timeout=timeout_ms):
                        path = inotify.get_path(event.wd) if recursive else directory
                        filepath = os.path.join(path, event.name)
                        if flags.MODIFY in flags.from_mask(event.mask):
                            # A modification takes the file off the waiting
                            # list (presumably it is still being written);
                            # a later non-MODIFY event adds it back
                            notified_files.pop(filepath, None)
                        else:
                            notified_files[filepath] = monotonic()

                    # Check the files against the timeout
                    still_waiting = {}
                    for filepath, last_event_time in notified_files.items():
                        # Current time - last time over the configured timeout
                        waited_long_enough = (
                            monotonic() - last_event_time
                        ) > inotify_debounce_secs

                        # Also make sure the file exists still, some scanners
                        # might write a temporary file first
                        file_still_exists = os.path.isfile(filepath)

                        if waited_long_enough and file_still_exists:
                            _consume(filepath)
                        elif file_still_exists:
                            still_waiting[filepath] = last_event_time

                    # These files are still waiting to hit the timeout
                    notified_files = still_waiting

                    # If files are waiting, need to exit read() to check them
                    # Otherwise, go back to infinite sleep time, but only if
                    # not testing
                    if notified_files:
                        timeout_ms = inotify_debounce_ms
                    elif is_testing:
                        timeout_ms = self.testing_timeout_ms
                    else:
                        timeout_ms = None

                    if self.stop_flag.is_set():
                        logger.debug("Finishing because event is set")
                        finished = True

                except KeyboardInterrupt:
                    logger.info("Received SIGINT, stopping inotify")
                    finished = True
        finally:
            # Release the watch and the inotify descriptor even if the loop
            # was left through an unexpected exception
            inotify.rm_watch(descriptor)
            inotify.close()