From 1af6bf70b92f97c556765550fefdede0e39e41f1 Mon Sep 17 00:00:00 2001
From: Trenton H <797416+stumpylog@users.noreply.github.com>
Date: Thu, 30 Jan 2025 10:55:23 -0800
Subject: [PATCH] Experiment with a simpler and combined consumer loop

---
 .../management/commands/document_consumer.py | 251 +++++++-----------
 1 file changed, 93 insertions(+), 158 deletions(-)

diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py
index 6b2706733..74f94cfdb 100644
--- a/src/documents/management/commands/document_consumer.py
+++ b/src/documents/management/commands/document_consumer.py
@@ -1,6 +1,5 @@
 import logging
 import os
-from concurrent.futures import ThreadPoolExecutor
 from fnmatch import filter
 from pathlib import Path
 from pathlib import PurePath
@@ -13,8 +12,9 @@ from django import db
 from django.conf import settings
 from django.core.management.base import BaseCommand
 from django.core.management.base import CommandError
-from watchdog.events import FileSystemEventHandler
-from watchdog.observers.polling import PollingObserver
+from watchfiles import Change
+from watchfiles import DefaultFilter
+from watchfiles import watch
 
 from documents.data_models import ConsumableDocument
 from documents.data_models import DocumentMetadataOverrides
@@ -141,53 +141,6 @@ def _consume(filepath: str) -> None:
         logger.exception("Error while consuming document")
 
 
-def _consume_wait_unmodified(file: str) -> None:
-    """
-    Waits for the given file to appear unmodified based on file size
-    and modification time. Will wait a configured number of seconds
-    and retry a configured number of times before either consuming or
-    giving up
-    """
-    if _is_ignored(file):
-        return
-
-    logger.debug(f"Waiting for file {file} to remain unmodified")
-    mtime = -1
-    size = -1
-    current_try = 0
-    while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
-        try:
-            stat_data = os.stat(file)
-            new_mtime = stat_data.st_mtime
-            new_size = stat_data.st_size
-        except FileNotFoundError:
-            logger.debug(
-                f"File {file} moved while waiting for it to remain unmodified.",
-            )
-            return
-        if new_mtime == mtime and new_size == size:
-            _consume(file)
-            return
-        mtime = new_mtime
-        size = new_size
-        sleep(settings.CONSUMER_POLLING_DELAY)
-        current_try += 1
-
-    logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
-
-
-class Handler(FileSystemEventHandler):
-    def __init__(self, pool: ThreadPoolExecutor) -> None:
-        super().__init__()
-        self._pool = pool
-
-    def on_created(self, event):
-        self._pool.submit(_consume_wait_unmodified, event.src_path)
-
-    def on_moved(self, event):
-        self._pool.submit(_consume_wait_unmodified, event.dest_path)
-
-
 class Command(BaseCommand):
     """
     On every iteration of an infinite loop, consume what we can from the
@@ -199,7 +152,7 @@ class Command(BaseCommand):
     # Also only for testing, configures in one place the timeout used before checking
     # the stop flag
     testing_timeout_s: Final[float] = 0.5
-    testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
+    testing_timeout_ms: Final[int] = int(testing_timeout_s * 1000)
 
     def add_arguments(self, parser):
         parser.add_argument(
@@ -221,139 +174,121 @@ class Command(BaseCommand):
         )
 
     def handle(self, *args, **options):
-        directory = options["directory"]
-        recursive = settings.CONSUMER_RECURSIVE
+        directory: Final[Path] = Path(options["directory"]).resolve()
+        is_recursive: Final[bool] = settings.CONSUMER_RECURSIVE
+        is_oneshot: Final[bool] = options["oneshot"]
+        is_testing: Final[bool] = options["testing"]
 
         if not directory:
             raise CommandError("CONSUMPTION_DIR does not appear to be set.")
 
-        directory = os.path.abspath(directory)
-
-        if not os.path.isdir(directory):
+        if not directory.exists():
             raise CommandError(f"Consumption directory {directory} does not exist")
 
+        if not directory.is_dir():
+            raise CommandError(f"Consumption directory {directory} is not a directory")
+
+        # Consumer will need this
         settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
 
-        if recursive:
-            for dirpath, _, filenames in os.walk(directory):
-                for filename in filenames:
-                    filepath = os.path.join(dirpath, filename)
-                    _consume(filepath)
-        else:
-            for entry in os.scandir(directory):
-                _consume(entry.path)
+        # Check for existing files at startup
+        glob_str = "**/*" if is_recursive else "*"
 
-        if options["oneshot"]:
+        for filepath in directory.glob(glob_str):
+            _consume(filepath)
+
+        if is_oneshot:
+            logger.info("One shot consume requested, exiting")
             return
 
-        if settings.CONSUMER_POLLING == 0 and INotify:
-            self.handle_inotify(directory, recursive, options["testing"])
+        use_polling: Final[bool] = settings.CONSUMER_POLLING != 0
+        poll_delay_ms: Final[int] = int(settings.CONSUMER_POLLING * 1000)
+
+        if use_polling:
+            logger.info(
+                f"Polling {directory} for changes every {settings.CONSUMER_POLLING}s ",
+            )
         else:
-            if INotify is None and settings.CONSUMER_POLLING == 0:  # pragma: no cover
-                logger.warning("Using polling as INotify import failed")
-            self.handle_polling(directory, recursive, options["testing"])
+            logger.info(f"Using inotify to watch {directory} for changes")
 
-        logger.debug("Consumer exiting.")
-
-    def handle_polling(self, directory, recursive, is_testing: bool):
-        logger.info(f"Polling directory for changes: {directory}")
-
-        timeout = None
-        if is_testing:
-            timeout = self.testing_timeout_s
-            logger.debug(f"Configuring timeout to {timeout}s")
-
-        polling_interval = settings.CONSUMER_POLLING
-        if polling_interval == 0:  # pragma: no cover
-            # Only happens if INotify failed to import
-            logger.warning("Using polling of 10s, consider setting this")
-            polling_interval = 10
-
-        with ThreadPoolExecutor(max_workers=4) as pool:
-            observer = PollingObserver(timeout=polling_interval)
-            observer.schedule(Handler(pool), directory, recursive=recursive)
-            observer.start()
-            try:
-                while observer.is_alive():
-                    observer.join(timeout)
-                    if self.stop_flag.is_set():
-                        observer.stop()
-            except KeyboardInterrupt:
-                observer.stop()
-            observer.join()
-
-    def handle_inotify(self, directory, recursive, is_testing: bool):
-        logger.info(f"Using inotify to watch directory for changes: {directory}")
-
-        timeout_ms = None
-        if is_testing:
-            timeout_ms = self.testing_timeout_ms
-            logger.debug(f"Configuring timeout to {timeout_ms}ms")
-
-        inotify = INotify()
-        inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
-        if recursive:
-            descriptor = inotify.add_watch_recursive(directory, inotify_flags)
-        else:
-            descriptor = inotify.add_watch(directory, inotify_flags)
+        read_timeout_ms = 0
+        if options["testing"]:
+            read_timeout_ms = self.testing_timeout_ms
+            logger.debug(f"Configuring initial timeout to {read_timeout_ms}ms")
 
         inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
-        inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
+        inotify_debounce_ms: Final[int] = int(inotify_debounce_secs * 1000)
 
-        finished = False
+        filter = DefaultFilter(ignore_entity_patterns={r"__paperless_write_test_\d+__"})
 
-        notified_files = {}
-
-        while not finished:
+        notified_files: dict[Path, float] = {}
+        while not self.stop_flag.is_set():
             try:
-                for event in inotify.read(timeout=timeout_ms):
-                    path = inotify.get_path(event.wd) if recursive else directory
-                    filepath = os.path.join(path, event.name)
-                    if flags.MODIFY in flags.from_mask(event.mask):
-                        notified_files.pop(filepath, None)
-                    else:
-                        notified_files[filepath] = monotonic()
+                for changes in watch(
+                    directory,
+                    watch_filter=filter,
+                    rust_timeout=read_timeout_ms,
+                    yield_on_timeout=True,
+                    force_polling=use_polling,
+                    poll_delay_ms=poll_delay_ms,
+                    recursive=is_recursive,
+                    stop_event=self.stop_flag,
+                ):
+                    for change_type, path in changes:
+                        path = Path(path).resolve()
+                        logger.info(f"Got {change_type.name} for {path}")
 
-                # Check the files against the timeout
-                still_waiting = {}
-                # last_event_time is time of the last inotify event for this file
-                for filepath, last_event_time in notified_files.items():
-                    # Current time - last time over the configured timeout
-                    waited_long_enough = (
-                        monotonic() - last_event_time
-                    ) > inotify_debounce_secs
+                        match change_type:
+                            case Change.added | Change.modified:
+                                logger.info(
+                                    f"New event time for {path} at {monotonic()}",
+                                )
+                                notified_files[path] = monotonic()
+                            case Change.deleted:
+                                notified_files.pop(path, None)
 
-                    # Also make sure the file exists still, some scanners might write a
-                    # temporary file first
-                    file_still_exists = os.path.exists(filepath) and os.path.isfile(
-                        filepath,
-                    )
+                    logger.info("Checking for files that are ready")
 
-                    if waited_long_enough and file_still_exists:
-                        _consume(filepath)
-                    elif file_still_exists:
-                        still_waiting[filepath] = last_event_time
+                    # Check the files against the timeout
+                    still_waiting = {}
+                    # last_event_time is time of the last inotify event for this file
+                    for filepath, last_event_time in notified_files.items():
+                        # Current time - last time over the configured timeout
+                        waited_long_enough = (
+                            monotonic() - last_event_time
+                        ) > inotify_debounce_secs
 
-                # These files are still waiting to hit the timeout
-                notified_files = still_waiting
+                        # Also make sure the file exists still, some scanners might write a
+                        # temporary file first
+                        file_still_exists = filepath.exists() and filepath.is_file()
+
+                        logger.info(
+                            f"{filepath} - {waited_long_enough} - {file_still_exists}",
+                        )
+
+                        if waited_long_enough and file_still_exists:
+                            logger.info(f"Consuming {filepath}")
+                            _consume(filepath)
+                        elif file_still_exists:
+                            still_waiting[filepath] = last_event_time
+
+                    # These files are still waiting to hit the timeout
+                    notified_files = still_waiting
+
+                    # Always exit the watch loop to reconfigure the timeout
+                    break
 
-                # If files are waiting, need to exit read() to check them
-                # Otherwise, go back to infinite sleep time, but only if not testing
                 if len(notified_files) > 0:
-                    timeout_ms = inotify_debounce_ms
+                    logger.info("Using inotify_debounce_ms")
+                    read_timeout_ms = inotify_debounce_ms
                 elif is_testing:
-                    timeout_ms = self.testing_timeout_ms
+                    logger.info("Using testing_timeout_ms")
+                    read_timeout_ms = self.testing_timeout_ms
                 else:
-                    timeout_ms = None
-
-                if self.stop_flag.is_set():
-                    logger.debug("Finishing because event is set")
-                    finished = True
-
+                    logger.info("No files in waiting, configuring indefinite timeout")
+                    read_timeout_ms = 0
+                logger.info(f"Configuring timeout to {read_timeout_ms}ms")
             except KeyboardInterrupt:
-                logger.info("Received SIGINT, stopping inotify")
-                finished = True
+                self.stop_flag.set()
 
-        inotify.rm_watch(descriptor)
-        inotify.close()
+        logger.debug("Consumer exiting.")
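
Note (not part of the patch): for readers new to watchfiles, below is a minimal standalone sketch of the debounce loop the new handle() builds around watchfiles.watch(): every added/modified event restarts a per-file timer, and a file is consumed only after it has stayed quiet for the configured delay. WATCH_DIR, DEBOUNCE_SECS, and consume() are hypothetical stand-ins for CONSUMPTION_DIR, CONSUMER_INOTIFY_DELAY, and _consume().

    # Sketch of the watchfiles debounce pattern; names below are placeholders,
    # not the real Paperless settings or helpers.
    from pathlib import Path
    from time import monotonic

    from watchfiles import Change, watch

    WATCH_DIR = Path("inbox")       # hypothetical directory to watch
    DEBOUNCE_SECS = 3.0             # quiet period a file must survive before consumption
    DEBOUNCE_MS = int(DEBOUNCE_SECS * 1000)


    def consume(path: Path) -> None:  # placeholder for the real _consume()
        print(f"consuming {path}")


    pending: dict[Path, float] = {}  # path -> time of its most recent event
    read_timeout_ms = 0              # rust_timeout=0 means wait indefinitely

    while True:
        # yield_on_timeout=True makes watch() yield an empty set when the timeout
        # expires, so pending files are re-checked even when no new events arrive.
        for changes in watch(
            WATCH_DIR,
            rust_timeout=read_timeout_ms,
            yield_on_timeout=True,
        ):
            for change, raw_path in changes:
                path = Path(raw_path).resolve()
                if change in (Change.added, Change.modified):
                    pending[path] = monotonic()  # (re)start this file's quiet period
                elif change == Change.deleted:
                    pending.pop(path, None)

            still_waiting: dict[Path, float] = {}
            for path, last_event in pending.items():
                if monotonic() - last_event > DEBOUNCE_SECS and path.is_file():
                    consume(path)                # settled long enough: hand it off
                elif path.is_file():
                    still_waiting[path] = last_event
            pending = still_waiting

            break  # leave watch() so the timeout below can be reconfigured

        # Finite timeout while files are still settling, indefinite wait otherwise.
        read_timeout_ms = DEBOUNCE_MS if pending else 0

The same rust_timeout switching is what the patch does with read_timeout_ms: a short timeout only while files are waiting to settle, an indefinite wait otherwise.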