Experiment with a simpler and combined consumer loop

This commit is contained in:
Trenton H 2025-01-30 10:55:23 -08:00
parent fce7b03324
commit 1af6bf70b9

View File

@ -1,6 +1,5 @@
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter
from pathlib import Path
from pathlib import PurePath
@ -13,8 +12,9 @@ from django import db
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from watchfiles import Change
from watchfiles import DefaultFilter
from watchfiles import watch
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
@ -141,53 +141,6 @@ def _consume(filepath: str) -> None:
logger.exception("Error while consuming document")
def _consume_wait_unmodified(file: str) -> None:
"""
Waits for the given file to appear unmodified based on file size
and modification time. Will wait a configured number of seconds
and retry a configured number of times before either consuming or
giving up
"""
if _is_ignored(file):
return
logger.debug(f"Waiting for file {file} to remain unmodified")
mtime = -1
size = -1
current_try = 0
while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
try:
stat_data = os.stat(file)
new_mtime = stat_data.st_mtime
new_size = stat_data.st_size
except FileNotFoundError:
logger.debug(
f"File {file} moved while waiting for it to remain unmodified.",
)
return
if new_mtime == mtime and new_size == size:
_consume(file)
return
mtime = new_mtime
size = new_size
sleep(settings.CONSUMER_POLLING_DELAY)
current_try += 1
logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
class Handler(FileSystemEventHandler):
def __init__(self, pool: ThreadPoolExecutor) -> None:
super().__init__()
self._pool = pool
def on_created(self, event):
self._pool.submit(_consume_wait_unmodified, event.src_path)
def on_moved(self, event):
self._pool.submit(_consume_wait_unmodified, event.dest_path)
class Command(BaseCommand):
"""
On every iteration of an infinite loop, consume what we can from the
@ -199,7 +152,7 @@ class Command(BaseCommand):
# Also only for testing, configures in one place the timeout used before checking
# the stop flag
testing_timeout_s: Final[float] = 0.5
testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
testing_timeout_ms: Final[int] = int(testing_timeout_s * 1000)
def add_arguments(self, parser):
parser.add_argument(
@ -221,139 +174,121 @@ class Command(BaseCommand):
)
def handle(self, *args, **options):
directory = options["directory"]
recursive = settings.CONSUMER_RECURSIVE
directory: Final[Path] = Path(options["directory"]).resolve()
is_recursive: Final[bool] = settings.CONSUMER_RECURSIVE
is_oneshot: Final[bool] = options["oneshot"]
is_testing: Final[bool] = options["testing"]
if not directory:
raise CommandError("CONSUMPTION_DIR does not appear to be set.")
directory = os.path.abspath(directory)
if not os.path.isdir(directory):
if not directory.exists():
raise CommandError(f"Consumption directory {directory} does not exist")
if not directory.is_dir():
raise CommandError(f"Consumption directory {directory} is not a directory")
# Consumer will need this
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
if recursive:
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
_consume(filepath)
else:
for entry in os.scandir(directory):
_consume(entry.path)
# Check for existing files at startup
glob_str = "**/*" if is_recursive else "*"
if options["oneshot"]:
for filepath in directory.glob(glob_str):
_consume(filepath)
if is_oneshot:
logger.info("One shot consume requested, exiting")
return
if settings.CONSUMER_POLLING == 0 and INotify:
self.handle_inotify(directory, recursive, options["testing"])
use_polling: Final[bool] = settings.CONSUMER_POLLING != 0
poll_delay_ms: Final[int] = int(settings.CONSUMER_POLLING * 1000)
if use_polling:
logger.info(
f"Polling {directory} for changes every {settings.CONSUMER_POLLING}s ",
)
else:
if INotify is None and settings.CONSUMER_POLLING == 0: # pragma: no cover
logger.warning("Using polling as INotify import failed")
self.handle_polling(directory, recursive, options["testing"])
logger.info(f"Using inotify to watch {directory} for changes")
logger.debug("Consumer exiting.")
def handle_polling(self, directory, recursive, is_testing: bool):
logger.info(f"Polling directory for changes: {directory}")
timeout = None
if is_testing:
timeout = self.testing_timeout_s
logger.debug(f"Configuring timeout to {timeout}s")
polling_interval = settings.CONSUMER_POLLING
if polling_interval == 0: # pragma: no cover
# Only happens if INotify failed to import
logger.warning("Using polling of 10s, consider setting this")
polling_interval = 10
with ThreadPoolExecutor(max_workers=4) as pool:
observer = PollingObserver(timeout=polling_interval)
observer.schedule(Handler(pool), directory, recursive=recursive)
observer.start()
try:
while observer.is_alive():
observer.join(timeout)
if self.stop_flag.is_set():
observer.stop()
except KeyboardInterrupt:
observer.stop()
observer.join()
def handle_inotify(self, directory, recursive, is_testing: bool):
logger.info(f"Using inotify to watch directory for changes: {directory}")
timeout_ms = None
if is_testing:
timeout_ms = self.testing_timeout_ms
logger.debug(f"Configuring timeout to {timeout_ms}ms")
inotify = INotify()
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
if recursive:
descriptor = inotify.add_watch_recursive(directory, inotify_flags)
else:
descriptor = inotify.add_watch(directory, inotify_flags)
read_timeout_ms = 0
if options["testing"]:
read_timeout_ms = self.testing_timeout_ms
logger.debug(f"Configuring initial timeout to {read_timeout_ms}ms")
inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
inotify_debounce_ms: Final[int] = int(inotify_debounce_secs * 1000)
finished = False
filter = DefaultFilter(ignore_entity_patterns={r"__paperless_write_test_\d+__"})
notified_files = {}
while not finished:
notified_files: dict[Path, float] = {}
while not self.stop_flag.is_set():
try:
for event in inotify.read(timeout=timeout_ms):
path = inotify.get_path(event.wd) if recursive else directory
filepath = os.path.join(path, event.name)
if flags.MODIFY in flags.from_mask(event.mask):
notified_files.pop(filepath, None)
else:
notified_files[filepath] = monotonic()
for changes in watch(
directory,
watch_filter=filter,
rust_timeout=read_timeout_ms,
yield_on_timeout=True,
force_polling=use_polling,
poll_delay_ms=poll_delay_ms,
recursive=is_recursive,
stop_event=self.stop_flag,
):
for change_type, path in changes:
path = Path(path).resolve()
logger.info(f"Got {change_type.name} for {path}")
# Check the files against the timeout
still_waiting = {}
# last_event_time is time of the last inotify event for this file
for filepath, last_event_time in notified_files.items():
# Current time - last time over the configured timeout
waited_long_enough = (
monotonic() - last_event_time
) > inotify_debounce_secs
match change_type:
case Change.added | Change.modified:
logger.info(
f"New event time for {path} at {monotonic()}",
)
notified_files[path] = monotonic()
case Change.deleted:
notified_files.pop(path, None)
# Also make sure the file exists still, some scanners might write a
# temporary file first
file_still_exists = os.path.exists(filepath) and os.path.isfile(
filepath,
)
logger.info("Checking for files that are ready")
if waited_long_enough and file_still_exists:
_consume(filepath)
elif file_still_exists:
still_waiting[filepath] = last_event_time
# Check the files against the timeout
still_waiting = {}
# last_event_time is time of the last inotify event for this file
for filepath, last_event_time in notified_files.items():
# Current time - last time over the configured timeout
waited_long_enough = (
monotonic() - last_event_time
) > inotify_debounce_secs
# These files are still waiting to hit the timeout
notified_files = still_waiting
# Also make sure the file exists still, some scanners might write a
# temporary file first
file_still_exists = filepath.exists() and filepath.is_file()
logger.info(
f"{filepath} - {waited_long_enough} - {file_still_exists}",
)
if waited_long_enough and file_still_exists:
logger.info(f"Consuming {filepath}")
_consume(filepath)
elif file_still_exists:
still_waiting[filepath] = last_event_time
# These files are still waiting to hit the timeout
notified_files = still_waiting
# Always exit the watch loop to reconfigure the timeout
break
# If files are waiting, need to exit read() to check them
# Otherwise, go back to infinite sleep time, but only if not testing
if len(notified_files) > 0:
timeout_ms = inotify_debounce_ms
logger.info("Using inotify_debounce_ms")
read_timeout_ms = inotify_debounce_ms
elif is_testing:
timeout_ms = self.testing_timeout_ms
logger.info("Using testing_timeout_ms")
read_timeout_ms = self.testing_timeout_ms
else:
timeout_ms = None
if self.stop_flag.is_set():
logger.debug("Finishing because event is set")
finished = True
logger.info("No files in waiting, configuring indefinite timeout")
read_timeout_ms = 0
logger.info(f"Configuring timeout to {read_timeout_ms}ms")
except KeyboardInterrupt:
logger.info("Received SIGINT, stopping inotify")
finished = True
self.stop_flag.set()
inotify.rm_watch(descriptor)
inotify.close()
logger.debug("Consumer exiting.")