mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			b9e34bd793
			...
			feature-si
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 1af6bf70b9 | 
| @@ -1,6 +1,5 @@ | |||||||
| import logging | import logging | ||||||
| import os | import os | ||||||
| from concurrent.futures import ThreadPoolExecutor |  | ||||||
| from fnmatch import filter | from fnmatch import filter | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from pathlib import PurePath | from pathlib import PurePath | ||||||
| @@ -13,8 +12,9 @@ from django import db | |||||||
| from django.conf import settings | from django.conf import settings | ||||||
| from django.core.management.base import BaseCommand | from django.core.management.base import BaseCommand | ||||||
| from django.core.management.base import CommandError | from django.core.management.base import CommandError | ||||||
| from watchdog.events import FileSystemEventHandler | from watchfiles import Change | ||||||
| from watchdog.observers.polling import PollingObserver | from watchfiles import DefaultFilter | ||||||
|  | from watchfiles import watch | ||||||
|  |  | ||||||
| from documents.data_models import ConsumableDocument | from documents.data_models import ConsumableDocument | ||||||
| from documents.data_models import DocumentMetadataOverrides | from documents.data_models import DocumentMetadataOverrides | ||||||
| @@ -141,53 +141,6 @@ def _consume(filepath: str) -> None: | |||||||
|         logger.exception("Error while consuming document") |         logger.exception("Error while consuming document") | ||||||
|  |  | ||||||
|  |  | ||||||
| def _consume_wait_unmodified(file: str) -> None: |  | ||||||
|     """ |  | ||||||
|     Waits for the given file to appear unmodified based on file size |  | ||||||
|     and modification time.  Will wait a configured number of seconds |  | ||||||
|     and retry a configured number of times before either consuming or |  | ||||||
|     giving up |  | ||||||
|     """ |  | ||||||
|     if _is_ignored(file): |  | ||||||
|         return |  | ||||||
|  |  | ||||||
|     logger.debug(f"Waiting for file {file} to remain unmodified") |  | ||||||
|     mtime = -1 |  | ||||||
|     size = -1 |  | ||||||
|     current_try = 0 |  | ||||||
|     while current_try < settings.CONSUMER_POLLING_RETRY_COUNT: |  | ||||||
|         try: |  | ||||||
|             stat_data = os.stat(file) |  | ||||||
|             new_mtime = stat_data.st_mtime |  | ||||||
|             new_size = stat_data.st_size |  | ||||||
|         except FileNotFoundError: |  | ||||||
|             logger.debug( |  | ||||||
|                 f"File {file} moved while waiting for it to remain unmodified.", |  | ||||||
|             ) |  | ||||||
|             return |  | ||||||
|         if new_mtime == mtime and new_size == size: |  | ||||||
|             _consume(file) |  | ||||||
|             return |  | ||||||
|         mtime = new_mtime |  | ||||||
|         size = new_size |  | ||||||
|         sleep(settings.CONSUMER_POLLING_DELAY) |  | ||||||
|         current_try += 1 |  | ||||||
|  |  | ||||||
|     logger.error(f"Timeout while waiting on file {file} to remain unmodified.") |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class Handler(FileSystemEventHandler): |  | ||||||
|     def __init__(self, pool: ThreadPoolExecutor) -> None: |  | ||||||
|         super().__init__() |  | ||||||
|         self._pool = pool |  | ||||||
|  |  | ||||||
|     def on_created(self, event): |  | ||||||
|         self._pool.submit(_consume_wait_unmodified, event.src_path) |  | ||||||
|  |  | ||||||
|     def on_moved(self, event): |  | ||||||
|         self._pool.submit(_consume_wait_unmodified, event.dest_path) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class Command(BaseCommand): | class Command(BaseCommand): | ||||||
|     """ |     """ | ||||||
|     On every iteration of an infinite loop, consume what we can from the |     On every iteration of an infinite loop, consume what we can from the | ||||||
| @@ -199,7 +152,7 @@ class Command(BaseCommand): | |||||||
|     # Also only for testing, configures in one place the timeout used before checking |     # Also only for testing, configures in one place the timeout used before checking | ||||||
|     # the stop flag |     # the stop flag | ||||||
|     testing_timeout_s: Final[float] = 0.5 |     testing_timeout_s: Final[float] = 0.5 | ||||||
|     testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0 |     testing_timeout_ms: Final[int] = int(testing_timeout_s * 1000) | ||||||
|  |  | ||||||
|     def add_arguments(self, parser): |     def add_arguments(self, parser): | ||||||
|         parser.add_argument( |         parser.add_argument( | ||||||
| @@ -221,139 +174,121 @@ class Command(BaseCommand): | |||||||
|         ) |         ) | ||||||
|  |  | ||||||
|     def handle(self, *args, **options): |     def handle(self, *args, **options): | ||||||
|         directory = options["directory"] |         directory: Final[Path] = Path(options["directory"]).resolve() | ||||||
|         recursive = settings.CONSUMER_RECURSIVE |         is_recursive: Final[bool] = settings.CONSUMER_RECURSIVE | ||||||
|  |         is_oneshot: Final[bool] = options["oneshot"] | ||||||
|  |         is_testing: Final[bool] = options["testing"] | ||||||
|  |  | ||||||
|         if not directory: |         if not directory: | ||||||
|             raise CommandError("CONSUMPTION_DIR does not appear to be set.") |             raise CommandError("CONSUMPTION_DIR does not appear to be set.") | ||||||
|  |  | ||||||
|         directory = os.path.abspath(directory) |         if not directory.exists(): | ||||||
|  |  | ||||||
|         if not os.path.isdir(directory): |  | ||||||
|             raise CommandError(f"Consumption directory {directory} does not exist") |             raise CommandError(f"Consumption directory {directory} does not exist") | ||||||
|  |  | ||||||
|  |         if not directory.is_dir(): | ||||||
|  |             raise CommandError(f"Consumption directory {directory} is not a directory") | ||||||
|  |  | ||||||
|         # Consumer will need this |         # Consumer will need this | ||||||
|         settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True) |         settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True) | ||||||
|  |  | ||||||
|         if recursive: |         # Check for existing files at startup | ||||||
|             for dirpath, _, filenames in os.walk(directory): |         glob_str = "**/*" if is_recursive else "*" | ||||||
|                 for filename in filenames: |  | ||||||
|                     filepath = os.path.join(dirpath, filename) |  | ||||||
|                     _consume(filepath) |  | ||||||
|         else: |  | ||||||
|             for entry in os.scandir(directory): |  | ||||||
|                 _consume(entry.path) |  | ||||||
|  |  | ||||||
|         if options["oneshot"]: |         for filepath in directory.glob(glob_str): | ||||||
|  |             _consume(filepath) | ||||||
|  |  | ||||||
|  |         if is_oneshot: | ||||||
|  |             logger.info("One shot consume requested, exiting") | ||||||
|             return |             return | ||||||
|  |  | ||||||
|         if settings.CONSUMER_POLLING == 0 and INotify: |         use_polling: Final[bool] = settings.CONSUMER_POLLING != 0 | ||||||
|             self.handle_inotify(directory, recursive, options["testing"]) |         poll_delay_ms: Final[int] = int(settings.CONSUMER_POLLING * 1000) | ||||||
|  |  | ||||||
|  |         if use_polling: | ||||||
|  |             logger.info( | ||||||
|  |                 f"Polling {directory} for changes every {settings.CONSUMER_POLLING}s ", | ||||||
|  |             ) | ||||||
|         else: |         else: | ||||||
|             if INotify is None and settings.CONSUMER_POLLING == 0:  # pragma: no cover |             logger.info(f"Using inotify to watch {directory} for changes") | ||||||
|                 logger.warning("Using polling as INotify import failed") |  | ||||||
|             self.handle_polling(directory, recursive, options["testing"]) |  | ||||||
|  |  | ||||||
|         logger.debug("Consumer exiting.") |         read_timeout_ms = 0 | ||||||
|  |         if options["testing"]: | ||||||
|     def handle_polling(self, directory, recursive, is_testing: bool): |             read_timeout_ms = self.testing_timeout_ms | ||||||
|         logger.info(f"Polling directory for changes: {directory}") |             logger.debug(f"Configuring initial timeout to {read_timeout_ms}ms") | ||||||
|  |  | ||||||
|         timeout = None |  | ||||||
|         if is_testing: |  | ||||||
|             timeout = self.testing_timeout_s |  | ||||||
|             logger.debug(f"Configuring timeout to {timeout}s") |  | ||||||
|  |  | ||||||
|         polling_interval = settings.CONSUMER_POLLING |  | ||||||
|         if polling_interval == 0:  # pragma: no cover |  | ||||||
|             # Only happens if INotify failed to import |  | ||||||
|             logger.warning("Using polling of 10s, consider setting this") |  | ||||||
|             polling_interval = 10 |  | ||||||
|  |  | ||||||
|         with ThreadPoolExecutor(max_workers=4) as pool: |  | ||||||
|             observer = PollingObserver(timeout=polling_interval) |  | ||||||
|             observer.schedule(Handler(pool), directory, recursive=recursive) |  | ||||||
|             observer.start() |  | ||||||
|             try: |  | ||||||
|                 while observer.is_alive(): |  | ||||||
|                     observer.join(timeout) |  | ||||||
|                     if self.stop_flag.is_set(): |  | ||||||
|                         observer.stop() |  | ||||||
|             except KeyboardInterrupt: |  | ||||||
|                 observer.stop() |  | ||||||
|             observer.join() |  | ||||||
|  |  | ||||||
|     def handle_inotify(self, directory, recursive, is_testing: bool): |  | ||||||
|         logger.info(f"Using inotify to watch directory for changes: {directory}") |  | ||||||
|  |  | ||||||
|         timeout_ms = None |  | ||||||
|         if is_testing: |  | ||||||
|             timeout_ms = self.testing_timeout_ms |  | ||||||
|             logger.debug(f"Configuring timeout to {timeout_ms}ms") |  | ||||||
|  |  | ||||||
|         inotify = INotify() |  | ||||||
|         inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY |  | ||||||
|         if recursive: |  | ||||||
|             descriptor = inotify.add_watch_recursive(directory, inotify_flags) |  | ||||||
|         else: |  | ||||||
|             descriptor = inotify.add_watch(directory, inotify_flags) |  | ||||||
|  |  | ||||||
|         inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY |         inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY | ||||||
|         inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000 |         inotify_debounce_ms: Final[int] = int(inotify_debounce_secs * 1000) | ||||||
|  |  | ||||||
|         finished = False |         filter = DefaultFilter(ignore_entity_patterns={r"__paperless_write_test_\d+__"}) | ||||||
|  |  | ||||||
|         notified_files = {} |         notified_files: dict[Path, float] = {} | ||||||
|  |         while not self.stop_flag.is_set(): | ||||||
|         while not finished: |  | ||||||
|             try: |             try: | ||||||
|                 for event in inotify.read(timeout=timeout_ms): |                 for changes in watch( | ||||||
|                     path = inotify.get_path(event.wd) if recursive else directory |                     directory, | ||||||
|                     filepath = os.path.join(path, event.name) |                     watch_filter=filter, | ||||||
|                     if flags.MODIFY in flags.from_mask(event.mask): |                     rust_timeout=read_timeout_ms, | ||||||
|                         notified_files.pop(filepath, None) |                     yield_on_timeout=True, | ||||||
|                     else: |                     force_polling=use_polling, | ||||||
|                         notified_files[filepath] = monotonic() |                     poll_delay_ms=poll_delay_ms, | ||||||
|  |                     recursive=is_recursive, | ||||||
|  |                     stop_event=self.stop_flag, | ||||||
|  |                 ): | ||||||
|  |                     for change_type, path in changes: | ||||||
|  |                         path = Path(path).resolve() | ||||||
|  |                         logger.info(f"Got {change_type.name} for {path}") | ||||||
|  |  | ||||||
|                 # Check the files against the timeout |                         match change_type: | ||||||
|                 still_waiting = {} |                             case Change.added | Change.modified: | ||||||
|                 # last_event_time is time of the last inotify event for this file |                                 logger.info( | ||||||
|                 for filepath, last_event_time in notified_files.items(): |                                     f"New event time for {path} at {monotonic()}", | ||||||
|                     # Current time - last time over the configured timeout |                                 ) | ||||||
|                     waited_long_enough = ( |                                 notified_files[path] = monotonic() | ||||||
|                         monotonic() - last_event_time |                             case Change.deleted: | ||||||
|                     ) > inotify_debounce_secs |                                 notified_files.pop(path, None) | ||||||
|  |  | ||||||
|                     # Also make sure the file exists still, some scanners might write a |                     logger.info("Checking for files that are ready") | ||||||
|                     # temporary file first |  | ||||||
|                     file_still_exists = os.path.exists(filepath) and os.path.isfile( |  | ||||||
|                         filepath, |  | ||||||
|                     ) |  | ||||||
|  |  | ||||||
|                     if waited_long_enough and file_still_exists: |                     # Check the files against the timeout | ||||||
|                         _consume(filepath) |                     still_waiting = {} | ||||||
|                     elif file_still_exists: |                     # last_event_time is time of the last inotify event for this file | ||||||
|                         still_waiting[filepath] = last_event_time |                     for filepath, last_event_time in notified_files.items(): | ||||||
|  |                         # Current time - last time over the configured timeout | ||||||
|  |                         waited_long_enough = ( | ||||||
|  |                             monotonic() - last_event_time | ||||||
|  |                         ) > inotify_debounce_secs | ||||||
|  |  | ||||||
|                 # These files are still waiting to hit the timeout |                         # Also make sure the file exists still, some scanners might write a | ||||||
|                 notified_files = still_waiting |                         # temporary file first | ||||||
|  |                         file_still_exists = filepath.exists() and filepath.is_file() | ||||||
|  |  | ||||||
|  |                         logger.info( | ||||||
|  |                             f"{filepath} - {waited_long_enough} - {file_still_exists}", | ||||||
|  |                         ) | ||||||
|  |  | ||||||
|  |                         if waited_long_enough and file_still_exists: | ||||||
|  |                             logger.info(f"Consuming {filepath}") | ||||||
|  |                             _consume(filepath) | ||||||
|  |                         elif file_still_exists: | ||||||
|  |                             still_waiting[filepath] = last_event_time | ||||||
|  |  | ||||||
|  |                         # These files are still waiting to hit the timeout | ||||||
|  |                         notified_files = still_waiting | ||||||
|  |  | ||||||
|  |                     # Always exit the watch loop to reconfigure the timeout | ||||||
|  |                     break | ||||||
|  |  | ||||||
|                 # If files are waiting, need to exit read() to check them |  | ||||||
|                 # Otherwise, go back to infinite sleep time, but only if not testing |  | ||||||
|                 if len(notified_files) > 0: |                 if len(notified_files) > 0: | ||||||
|                     timeout_ms = inotify_debounce_ms |                     logger.info("Using inotify_debounce_ms") | ||||||
|  |                     read_timeout_ms = inotify_debounce_ms | ||||||
|                 elif is_testing: |                 elif is_testing: | ||||||
|                     timeout_ms = self.testing_timeout_ms |                     logger.info("Using testing_timeout_ms") | ||||||
|  |                     read_timeout_ms = self.testing_timeout_ms | ||||||
|                 else: |                 else: | ||||||
|                     timeout_ms = None |                     logger.info("No files in waiting, configuring indefinite timeout") | ||||||
|  |                     read_timeout_ms = 0 | ||||||
|                 if self.stop_flag.is_set(): |                 logger.info(f"Configuring timeout to {read_timeout_ms}ms") | ||||||
|                     logger.debug("Finishing because event is set") |  | ||||||
|                     finished = True |  | ||||||
|  |  | ||||||
|             except KeyboardInterrupt: |             except KeyboardInterrupt: | ||||||
|                 logger.info("Received SIGINT, stopping inotify") |                 self.stop_flag.set() | ||||||
|                 finished = True |  | ||||||
|  |  | ||||||
|         inotify.rm_watch(descriptor) |         logger.debug("Consumer exiting.") | ||||||
|         inotify.close() |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user