Mirror of https://github.com/paperless-ngx/paperless-ngx.git (synced 2025-10-30 03:56:23 -05:00)

Compare commits: 1 commit, 4210addb46 ... feature-si

Commit 1af6bf70b9:
```diff
@@ -1,6 +1,5 @@
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter
from pathlib import Path
from pathlib import PurePath
@@ -13,8 +12,9 @@ from django import db
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from watchfiles import Change
from watchfiles import DefaultFilter
from watchfiles import watch

from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
```
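This hunk swaps the watchdog and inotify-based imports for the watchfiles package. As context for the rest of the diff, here is a minimal sketch of how `watch`, `Change`, and `DefaultFilter` fit together; the directory path is a placeholder, not something taken from this commit:

```python
from watchfiles import Change, DefaultFilter, watch

# Placeholder directory for illustration only.
WATCH_DIR = "/tmp/consume"

# watch() blocks and yields a set of (Change, path) tuples per batch of events;
# DefaultFilter drops common noise such as .git directories and editor swap files.
for changes in watch(WATCH_DIR, watch_filter=DefaultFilter(), recursive=True):
    for change_type, changed_path in changes:
        if change_type in (Change.added, Change.modified):
            print(f"{change_type.name}: {changed_path}")
```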
```diff
@@ -141,53 +141,6 @@ def _consume(filepath: str) -> None:
        logger.exception("Error while consuming document")


def _consume_wait_unmodified(file: str) -> None:
    """
    Waits for the given file to appear unmodified based on file size
    and modification time.  Will wait a configured number of seconds
    and retry a configured number of times before either consuming or
    giving up
    """
    if _is_ignored(file):
        return

    logger.debug(f"Waiting for file {file} to remain unmodified")
    mtime = -1
    size = -1
    current_try = 0
    while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
        try:
            stat_data = os.stat(file)
            new_mtime = stat_data.st_mtime
            new_size = stat_data.st_size
        except FileNotFoundError:
            logger.debug(
                f"File {file} moved while waiting for it to remain unmodified.",
            )
            return
        if new_mtime == mtime and new_size == size:
            _consume(file)
            return
        mtime = new_mtime
        size = new_size
        sleep(settings.CONSUMER_POLLING_DELAY)
        current_try += 1

    logger.error(f"Timeout while waiting on file {file} to remain unmodified.")


class Handler(FileSystemEventHandler):
    def __init__(self, pool: ThreadPoolExecutor) -> None:
        super().__init__()
        self._pool = pool

    def on_created(self, event):
        self._pool.submit(_consume_wait_unmodified, event.src_path)

    def on_moved(self, event):
        self._pool.submit(_consume_wait_unmodified, event.dest_path)


class Command(BaseCommand):
    """
    On every iteration of an infinite loop, consume what we can from the
```
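The deleted `_consume_wait_unmodified` helper repeatedly called `os.stat()` and only consumed a file once its size and mtime stopped changing. The code added further down replaces that with a debounce keyed on event timestamps; roughly the pattern sketched here outside the management command (the names and the three-second delay are illustrative, the real delay comes from `CONSUMER_INOTIFY_DELAY`):

```python
from pathlib import Path
from time import monotonic

DEBOUNCE_SECONDS = 3.0  # illustrative; paperless reads this from settings

# Last time a change event was seen for each path.
notified_files: dict[Path, float] = {}


def record_event(path: Path) -> None:
    """Remember when a path was last touched by a watch event."""
    notified_files[path] = monotonic()


def pop_ready_files() -> list[Path]:
    """Return paths that stayed quiet for DEBOUNCE_SECONDS and still exist."""
    ready: list[Path] = []
    still_waiting: dict[Path, float] = {}
    for path, last_event in notified_files.items():
        if not path.is_file():
            continue  # e.g. a scanner's temporary file that was renamed away
        if monotonic() - last_event > DEBOUNCE_SECONDS:
            ready.append(path)
        else:
            still_waiting[path] = last_event
    notified_files.clear()
    notified_files.update(still_waiting)
    return ready
```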
```diff
@@ -199,7 +152,7 @@ class Command(BaseCommand):
    # Also only for testing, configures in one place the timeout used before checking
    # the stop flag
    testing_timeout_s: Final[float] = 0.5
    testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
    testing_timeout_ms: Final[int] = int(testing_timeout_s * 1000)

    def add_arguments(self, parser):
        parser.add_argument(
```
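The test-only timeout is now stored as an integer number of milliseconds rather than a float, presumably because it is later handed to `watch()` as `rust_timeout`, which takes whole milliseconds (with 0 meaning no timeout). A trivial check of the conversion:

```python
testing_timeout_s = 0.5
testing_timeout_ms = int(testing_timeout_s * 1000)
assert testing_timeout_ms == 500  # 0.5 s -> 500 ms, as an int
```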
```diff
@@ -221,139 +174,121 @@ class Command(BaseCommand):
        )

    def handle(self, *args, **options):
        directory = options["directory"]
        recursive = settings.CONSUMER_RECURSIVE
        directory: Final[Path] = Path(options["directory"]).resolve()
        is_recursive: Final[bool] = settings.CONSUMER_RECURSIVE
        is_oneshot: Final[bool] = options["oneshot"]
        is_testing: Final[bool] = options["testing"]

        if not directory:
            raise CommandError("CONSUMPTION_DIR does not appear to be set.")

        directory = os.path.abspath(directory)

        if not os.path.isdir(directory):
        if not directory.exists():
            raise CommandError(f"Consumption directory {directory} does not exist")

        if not directory.is_dir():
            raise CommandError(f"Consumption directory {directory} is not a directory")

        # Consumer will need this
        settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)

        if recursive:
            for dirpath, _, filenames in os.walk(directory):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    _consume(filepath)
        else:
            for entry in os.scandir(directory):
                _consume(entry.path)
        # Check for existing files at startup
        glob_str = "**/*" if is_recursive else "*"

        if options["oneshot"]:
        for filepath in directory.glob(glob_str):
            _consume(filepath)

        if is_oneshot:
            logger.info("One shot consume requested, exiting")
            return

```
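The startup sweep of files that already sit in the consumption directory moves from `os.walk`/`os.scandir` to a single `Path.glob` call whose pattern depends on the recursive setting. A standalone sketch of that idea (the helper name and example path are made up for illustration):

```python
from pathlib import Path


def existing_files(directory: Path, recursive: bool) -> list[Path]:
    """List files already present before watching starts."""
    # "**/*" walks the whole tree, "*" only the top level. glob() also yields
    # directories, so a consume step still has to skip anything that is not a file.
    pattern = "**/*" if recursive else "*"
    return [path for path in directory.glob(pattern) if path.is_file()]


# Example usage (assumes the directory exists):
# for path in existing_files(Path("/tmp/consume"), recursive=True):
#     print(path)
```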
```diff
        if settings.CONSUMER_POLLING == 0 and INotify:
            self.handle_inotify(directory, recursive, options["testing"])
        use_polling: Final[bool] = settings.CONSUMER_POLLING != 0
        poll_delay_ms: Final[int] = int(settings.CONSUMER_POLLING * 1000)

        if use_polling:
            logger.info(
                f"Polling {directory} for changes every {settings.CONSUMER_POLLING}s ",
            )
        else:
            if INotify is None and settings.CONSUMER_POLLING == 0:  # pragma: no cover
                logger.warning("Using polling as INotify import failed")
            self.handle_polling(directory, recursive, options["testing"])
            logger.info(f"Using inotify to watch {directory} for changes")

        logger.debug("Consumer exiting.")

```
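Instead of dispatching to separate `handle_polling`/`handle_inotify` methods, the new code derives two values from `CONSUMER_POLLING` and feeds them straight into a single `watch()` call: a non-zero setting forces stat-based polling and supplies the poll interval in milliseconds. A sketch of that mapping, with a placeholder setting value and directory:

```python
from watchfiles import watch

consumer_polling_s = 5.0  # stand-in for settings.CONSUMER_POLLING; 0 would mean "use inotify"

use_polling = consumer_polling_s != 0
poll_delay_ms = int(consumer_polling_s * 1000)

# force_polling=True makes watchfiles poll the filesystem instead of using native
# notifications; poll_delay_ms is only relevant while polling is active.
for changes in watch(
    "/tmp/consume",  # placeholder directory
    force_polling=use_polling,
    poll_delay_ms=poll_delay_ms,
):
    print(changes)
```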
```diff
    def handle_polling(self, directory, recursive, is_testing: bool):
        logger.info(f"Polling directory for changes: {directory}")

        timeout = None
        if is_testing:
            timeout = self.testing_timeout_s
            logger.debug(f"Configuring timeout to {timeout}s")

        polling_interval = settings.CONSUMER_POLLING
        if polling_interval == 0:  # pragma: no cover
            # Only happens if INotify failed to import
            logger.warning("Using polling of 10s, consider setting this")
            polling_interval = 10

        with ThreadPoolExecutor(max_workers=4) as pool:
            observer = PollingObserver(timeout=polling_interval)
            observer.schedule(Handler(pool), directory, recursive=recursive)
            observer.start()
            try:
                while observer.is_alive():
                    observer.join(timeout)
                    if self.stop_flag.is_set():
                        observer.stop()
            except KeyboardInterrupt:
                observer.stop()
            observer.join()

    def handle_inotify(self, directory, recursive, is_testing: bool):
        logger.info(f"Using inotify to watch directory for changes: {directory}")

        timeout_ms = None
        if is_testing:
            timeout_ms = self.testing_timeout_ms
            logger.debug(f"Configuring timeout to {timeout_ms}ms")

        inotify = INotify()
        inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
        if recursive:
            descriptor = inotify.add_watch_recursive(directory, inotify_flags)
        else:
            descriptor = inotify.add_watch(directory, inotify_flags)
        read_timeout_ms = 0
        if options["testing"]:
            read_timeout_ms = self.testing_timeout_ms
            logger.debug(f"Configuring initial timeout to {read_timeout_ms}ms")

        inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
        inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
        inotify_debounce_ms: Final[int] = int(inotify_debounce_secs * 1000)

        finished = False
        filter = DefaultFilter(ignore_entity_patterns={r"__paperless_write_test_\d+__"})

        notified_files = {}

```
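`DefaultFilter(ignore_entity_patterns=...)` takes regular expressions that are matched against the final path component, so anything matching the `__paperless_write_test_\d+__` pattern used in the diff never enters the debounce bookkeeping. A small illustration (the example paths are invented):

```python
from watchfiles import Change, DefaultFilter

write_test_filter = DefaultFilter(
    ignore_entity_patterns={r"__paperless_write_test_\d+__"},
)

# A filter instance is a callable (change, path) -> bool, True meaning "keep the event".
print(write_test_filter(Change.added, "/tmp/consume/__paperless_write_test_1234__"))  # False
print(write_test_filter(Change.added, "/tmp/consume/scan_0001.pdf"))                  # True
```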
```diff
        while not finished:
        notified_files: dict[Path, float] = {}
        while not self.stop_flag.is_set():
            try:
                for event in inotify.read(timeout=timeout_ms):
                    path = inotify.get_path(event.wd) if recursive else directory
                    filepath = os.path.join(path, event.name)
                    if flags.MODIFY in flags.from_mask(event.mask):
                        notified_files.pop(filepath, None)
                    else:
                        notified_files[filepath] = monotonic()
                for changes in watch(
                    directory,
                    watch_filter=filter,
                    rust_timeout=read_timeout_ms,
                    yield_on_timeout=True,
                    force_polling=use_polling,
                    poll_delay_ms=poll_delay_ms,
                    recursive=is_recursive,
                    stop_event=self.stop_flag,
                ):
                    for change_type, path in changes:
                        path = Path(path).resolve()
                        logger.info(f"Got {change_type.name} for {path}")

                # Check the files against the timeout
                still_waiting = {}
                # last_event_time is time of the last inotify event for this file
                for filepath, last_event_time in notified_files.items():
                    # Current time - last time over the configured timeout
                    waited_long_enough = (
                        monotonic() - last_event_time
                    ) > inotify_debounce_secs
                        match change_type:
                            case Change.added | Change.modified:
                                logger.info(
                                    f"New event time for {path} at {monotonic()}",
                                )
                                notified_files[path] = monotonic()
                            case Change.deleted:
                                notified_files.pop(path, None)

                    # Also make sure the file exists still, some scanners might write a
                    # temporary file first
                    file_still_exists = os.path.exists(filepath) and os.path.isfile(
                        filepath,
                    )
                    logger.info("Checking for files that are ready")

                    if waited_long_enough and file_still_exists:
                        _consume(filepath)
                    elif file_still_exists:
                        still_waiting[filepath] = last_event_time
                    # Check the files against the timeout
                    still_waiting = {}
                    # last_event_time is time of the last inotify event for this file
                    for filepath, last_event_time in notified_files.items():
                        # Current time - last time over the configured timeout
                        waited_long_enough = (
                            monotonic() - last_event_time
                        ) > inotify_debounce_secs

                # These files are still waiting to hit the timeout
                notified_files = still_waiting
                        # Also make sure the file exists still, some scanners might write a
                        # temporary file first
                        file_still_exists = filepath.exists() and filepath.is_file()

                        logger.info(
                            f"{filepath} - {waited_long_enough} - {file_still_exists}",
                        )

                        if waited_long_enough and file_still_exists:
                            logger.info(f"Consuming {filepath}")
                            _consume(filepath)
                        elif file_still_exists:
                            still_waiting[filepath] = last_event_time

                        # These files are still waiting to hit the timeout
                        notified_files = still_waiting

                    # Always exit the watch loop to reconfigure the timeout
                    break

                # If files are waiting, need to exit read() to check them
                # Otherwise, go back to infinite sleep time, but only if not testing
                if len(notified_files) > 0:
                    timeout_ms = inotify_debounce_ms
                    logger.info("Using inotify_debounce_ms")
                    read_timeout_ms = inotify_debounce_ms
                elif is_testing:
                    timeout_ms = self.testing_timeout_ms
                    logger.info("Using testing_timeout_ms")
                    read_timeout_ms = self.testing_timeout_ms
                else:
                    timeout_ms = None

                if self.stop_flag.is_set():
                    logger.debug("Finishing because event is set")
                    finished = True

                    logger.info("No files in waiting, configuring indefinite timeout")
                    read_timeout_ms = 0
                logger.info(f"Configuring timeout to {read_timeout_ms}ms")
            except KeyboardInterrupt:
                logger.info("Received SIGINT, stopping inotify")
                finished = True
                self.stop_flag.set()

        inotify.rm_watch(descriptor)
        inotify.close()
        logger.debug("Consumer exiting.")
```
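The tail of the new loop recomputes the `rust_timeout` for the next `watch()` call: a finite debounce timeout while files are pending, the short testing timeout under tests, and 0 (block until the next change) otherwise. The same selection written as a small pure function, with illustrative names and assertions covering the three branches:

```python
def next_read_timeout_ms(
    pending_files: int,
    debounce_ms: int,
    is_testing: bool,
    testing_timeout_ms: int,
) -> int:
    """Pick the timeout for the next watch() call (illustrative helper).

    0 means "wait indefinitely for the next change"; a positive value makes the
    loop wake up so debounced files can be re-checked (or, in tests, so the stop
    flag is polled regularly).
    """
    if pending_files > 0:
        return debounce_ms
    if is_testing:
        return testing_timeout_ms
    return 0


assert next_read_timeout_ms(2, 3000, False, 500) == 3000  # files still debouncing
assert next_read_timeout_ms(0, 3000, True, 500) == 500    # testing: poll the stop flag
assert next_read_timeout_ms(0, 3000, False, 500) == 0     # idle: block until a change
```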