Mirror of https://github.com/paperless-ngx/paperless-ngx.git (synced 2025-11-03 03:16:10 -06:00)

Compare commits: 5e7ee924ff...feature-si (1 commit)

Commit 1af6bf70b9
@@ -1,6 +1,5 @@
 import logging
 import os
-from concurrent.futures import ThreadPoolExecutor
 from fnmatch import filter
 from pathlib import Path
 from pathlib import PurePath
@@ -13,8 +12,9 @@ from django import db
 from django.conf import settings
 from django.core.management.base import BaseCommand
 from django.core.management.base import CommandError
-from watchdog.events import FileSystemEventHandler
-from watchdog.observers.polling import PollingObserver
+from watchfiles import Change
+from watchfiles import DefaultFilter
+from watchfiles import watch
 
 from documents.data_models import ConsumableDocument
 from documents.data_models import DocumentMetadataOverrides
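The import swap above is the whole dependency change: watchdog's observer and handler classes go away, and the rewritten loop only needs `watch`, `Change`, and `DefaultFilter` from watchfiles. A minimal sketch of how those three names fit together (the directory and ignore pattern here are illustrative, not taken from the commit):

```python
from pathlib import Path

from watchfiles import Change
from watchfiles import DefaultFilter
from watchfiles import watch

# Illustrative ignore pattern; the commit uses a paperless-specific one
ignore_tmp = DefaultFilter(ignore_entity_patterns={r"\.tmp\d+$"})

watch_dir = Path("/tmp/consume")  # illustrative path; must exist before watching

# watch() blocks and yields one set of (Change, path) tuples per batch of events
for changes in watch(watch_dir, watch_filter=ignore_tmp):
    for change, path in changes:
        if change in (Change.added, Change.modified):
            print(f"{path} changed ({change.name})")
```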
@@ -141,53 +141,6 @@ def _consume(filepath: str) -> None:
         logger.exception("Error while consuming document")
 
 
-def _consume_wait_unmodified(file: str) -> None:
-    """
-    Waits for the given file to appear unmodified based on file size
-    and modification time.  Will wait a configured number of seconds
-    and retry a configured number of times before either consuming or
-    giving up
-    """
-    if _is_ignored(file):
-        return
-
-    logger.debug(f"Waiting for file {file} to remain unmodified")
-    mtime = -1
-    size = -1
-    current_try = 0
-    while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
-        try:
-            stat_data = os.stat(file)
-            new_mtime = stat_data.st_mtime
-            new_size = stat_data.st_size
-        except FileNotFoundError:
-            logger.debug(
-                f"File {file} moved while waiting for it to remain unmodified.",
-            )
-            return
-        if new_mtime == mtime and new_size == size:
-            _consume(file)
-            return
-        mtime = new_mtime
-        size = new_size
-        sleep(settings.CONSUMER_POLLING_DELAY)
-        current_try += 1
-
-    logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
-
-
-class Handler(FileSystemEventHandler):
-    def __init__(self, pool: ThreadPoolExecutor) -> None:
-        super().__init__()
-        self._pool = pool
-
-    def on_created(self, event):
-        self._pool.submit(_consume_wait_unmodified, event.src_path)
-
-    def on_moved(self, event):
-        self._pool.submit(_consume_wait_unmodified, event.dest_path)
-
-
 class Command(BaseCommand):
     """
     On every iteration of an infinite loop, consume what we can from the
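The deleted `_consume_wait_unmodified` helper stabilized files by re-running `os.stat` in a sleep/retry loop, and the watchdog `Handler` pushed every filesystem event into a thread pool that ran it. The rewrite (last hunk below) replaces both with a single dictionary of last-event times that is checked against a debounce window. A standalone sketch of that debounce idea, with illustrative names and values rather than the command's settings:

```python
from pathlib import Path
from time import monotonic

DEBOUNCE_SECS = 0.5  # illustrative value, not from the commit

# path -> time of the most recent change event
last_event: dict[Path, float] = {}


def note_event(path: Path) -> None:
    """Record that a change event was just seen for this path."""
    last_event[path] = monotonic()


def pop_ready() -> list[Path]:
    """Return paths that have stayed quiet for DEBOUNCE_SECS and still exist."""
    ready: list[Path] = []
    still_waiting: dict[Path, float] = {}
    for path, seen_at in last_event.items():
        if not path.is_file():
            continue  # vanished, e.g. a scanner's temporary file
        if monotonic() - seen_at > DEBOUNCE_SECS:
            ready.append(path)
        else:
            still_waiting[path] = seen_at
    last_event.clear()
    last_event.update(still_waiting)
    return ready
```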
@@ -199,7 +152,7 @@ class Command(BaseCommand):
     # Also only for testing, configures in one place the timeout used before checking
     # the stop flag
     testing_timeout_s: Final[float] = 0.5
-    testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
+    testing_timeout_ms: Final[int] = int(testing_timeout_s * 1000)
 
     def add_arguments(self, parser):
         parser.add_argument(
@@ -221,139 +174,121 @@ class Command(BaseCommand):
         )
 
     def handle(self, *args, **options):
-        directory = options["directory"]
-        recursive = settings.CONSUMER_RECURSIVE
+        directory: Final[Path] = Path(options["directory"]).resolve()
+        is_recursive: Final[bool] = settings.CONSUMER_RECURSIVE
+        is_oneshot: Final[bool] = options["oneshot"]
+        is_testing: Final[bool] = options["testing"]
 
         if not directory:
             raise CommandError("CONSUMPTION_DIR does not appear to be set.")
 
-        directory = os.path.abspath(directory)
-
-        if not os.path.isdir(directory):
+        if not directory.exists():
             raise CommandError(f"Consumption directory {directory} does not exist")
 
+        if not directory.is_dir():
+            raise CommandError(f"Consumption directory {directory} is not a directory")
+
         # Consumer will need this
         settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
 
-        if recursive:
-            for dirpath, _, filenames in os.walk(directory):
-                for filename in filenames:
-                    filepath = os.path.join(dirpath, filename)
-                    _consume(filepath)
-        else:
-            for entry in os.scandir(directory):
-                _consume(entry.path)
+        # Check for existing files at startup
+        glob_str = "**/*" if is_recursive else "*"
 
-        if options["oneshot"]:
+        for filepath in directory.glob(glob_str):
+            _consume(filepath)
+
+        if is_oneshot:
+            logger.info("One shot consume requested, exiting")
             return
 
-        if settings.CONSUMER_POLLING == 0 and INotify:
-            self.handle_inotify(directory, recursive, options["testing"])
+        use_polling: Final[bool] = settings.CONSUMER_POLLING != 0
+        poll_delay_ms: Final[int] = int(settings.CONSUMER_POLLING * 1000)
+
+        if use_polling:
+            logger.info(
+                f"Polling {directory} for changes every {settings.CONSUMER_POLLING}s ",
+            )
         else:
-            if INotify is None and settings.CONSUMER_POLLING == 0:  # pragma: no cover
-                logger.warning("Using polling as INotify import failed")
-            self.handle_polling(directory, recursive, options["testing"])
+            logger.info(f"Using inotify to watch {directory} for changes")
 
-        logger.debug("Consumer exiting.")
-
-    def handle_polling(self, directory, recursive, is_testing: bool):
-        logger.info(f"Polling directory for changes: {directory}")
-
-        timeout = None
-        if is_testing:
-            timeout = self.testing_timeout_s
-            logger.debug(f"Configuring timeout to {timeout}s")
-
-        polling_interval = settings.CONSUMER_POLLING
-        if polling_interval == 0:  # pragma: no cover
-            # Only happens if INotify failed to import
-            logger.warning("Using polling of 10s, consider setting this")
-            polling_interval = 10
-
-        with ThreadPoolExecutor(max_workers=4) as pool:
-            observer = PollingObserver(timeout=polling_interval)
-            observer.schedule(Handler(pool), directory, recursive=recursive)
-            observer.start()
-            try:
-                while observer.is_alive():
-                    observer.join(timeout)
-                    if self.stop_flag.is_set():
-                        observer.stop()
-            except KeyboardInterrupt:
-                observer.stop()
-            observer.join()
-
-    def handle_inotify(self, directory, recursive, is_testing: bool):
-        logger.info(f"Using inotify to watch directory for changes: {directory}")
-
-        timeout_ms = None
-        if is_testing:
-            timeout_ms = self.testing_timeout_ms
-            logger.debug(f"Configuring timeout to {timeout_ms}ms")
-
-        inotify = INotify()
-        inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
-        if recursive:
-            descriptor = inotify.add_watch_recursive(directory, inotify_flags)
-        else:
-            descriptor = inotify.add_watch(directory, inotify_flags)
+        read_timeout_ms = 0
+        if options["testing"]:
+            read_timeout_ms = self.testing_timeout_ms
+            logger.debug(f"Configuring initial timeout to {read_timeout_ms}ms")
 
         inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
-        inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
+        inotify_debounce_ms: Final[int] = int(inotify_debounce_secs * 1000)
 
-        finished = False
+        filter = DefaultFilter(ignore_entity_patterns={r"__paperless_write_test_\d+__"})
 
-        notified_files = {}
-
-        while not finished:
+        notified_files: dict[Path, float] = {}
+        while not self.stop_flag.is_set():
             try:
-                for event in inotify.read(timeout=timeout_ms):
-                    path = inotify.get_path(event.wd) if recursive else directory
-                    filepath = os.path.join(path, event.name)
-                    if flags.MODIFY in flags.from_mask(event.mask):
-                        notified_files.pop(filepath, None)
-                    else:
-                        notified_files[filepath] = monotonic()
+                for changes in watch(
+                    directory,
+                    watch_filter=filter,
+                    rust_timeout=read_timeout_ms,
+                    yield_on_timeout=True,
+                    force_polling=use_polling,
+                    poll_delay_ms=poll_delay_ms,
+                    recursive=is_recursive,
+                    stop_event=self.stop_flag,
+                ):
+                    for change_type, path in changes:
+                        path = Path(path).resolve()
+                        logger.info(f"Got {change_type.name} for {path}")
 
-                # Check the files against the timeout
-                still_waiting = {}
-                # last_event_time is time of the last inotify event for this file
-                for filepath, last_event_time in notified_files.items():
-                    # Current time - last time over the configured timeout
-                    waited_long_enough = (
-                        monotonic() - last_event_time
-                    ) > inotify_debounce_secs
+                        match change_type:
+                            case Change.added | Change.modified:
+                                logger.info(
+                                    f"New event time for {path} at {monotonic()}",
+                                )
+                                notified_files[path] = monotonic()
+                            case Change.deleted:
+                                notified_files.pop(path, None)
 
-                    # Also make sure the file exists still, some scanners might write a
-                    # temporary file first
-                    file_still_exists = os.path.exists(filepath) and os.path.isfile(
-                        filepath,
-                    )
+                    logger.info("Checking for files that are ready")
 
-                    if waited_long_enough and file_still_exists:
-                        _consume(filepath)
-                    elif file_still_exists:
-                        still_waiting[filepath] = last_event_time
+                    # Check the files against the timeout
+                    still_waiting = {}
+                    # last_event_time is time of the last inotify event for this file
+                    for filepath, last_event_time in notified_files.items():
+                        # Current time - last time over the configured timeout
+                        waited_long_enough = (
+                            monotonic() - last_event_time
+                        ) > inotify_debounce_secs
 
-                # These files are still waiting to hit the timeout
-                notified_files = still_waiting
+                        # Also make sure the file exists still, some scanners might write a
+                        # temporary file first
+                        file_still_exists = filepath.exists() and filepath.is_file()
+
+                        logger.info(
+                            f"{filepath} - {waited_long_enough} - {file_still_exists}",
+                        )
+
+                        if waited_long_enough and file_still_exists:
+                            logger.info(f"Consuming {filepath}")
+                            _consume(filepath)
+                        elif file_still_exists:
+                            still_waiting[filepath] = last_event_time
+
+                        # These files are still waiting to hit the timeout
+                        notified_files = still_waiting
+
+                    # Always exit the watch loop to reconfigure the timeout
+                    break
 
-                # If files are waiting, need to exit read() to check them
-                # Otherwise, go back to infinite sleep time, but only if not testing
                 if len(notified_files) > 0:
-                    timeout_ms = inotify_debounce_ms
+                    logger.info("Using inotify_debounce_ms")
+                    read_timeout_ms = inotify_debounce_ms
                 elif is_testing:
-                    timeout_ms = self.testing_timeout_ms
+                    logger.info("Using testing_timeout_ms")
+                    read_timeout_ms = self.testing_timeout_ms
                 else:
-                    timeout_ms = None
-
-                if self.stop_flag.is_set():
-                    logger.debug("Finishing because event is set")
-                    finished = True
-
+                    logger.info("No files in waiting, configuring indefinite timeout")
+                    read_timeout_ms = 0
+                logger.info(f"Configuring timeout to {read_timeout_ms}ms")
             except KeyboardInterrupt:
-                logger.info("Received SIGINT, stopping inotify")
-                finished = True
+                self.stop_flag.set()
 
-        inotify.rm_watch(descriptor)
-        inotify.close()
+        logger.debug("Consumer exiting.")
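The rewritten `handle()` leans on three `watchfiles.watch()` parameters working together: `rust_timeout` bounds how long the watcher waits for events, `yield_on_timeout=True` makes it yield an empty batch when that timeout expires (so pending files still get re-checked even when nothing changes), and `stop_event` lets the same `threading.Event` that ends the outer loop unblock the watcher. A condensed, self-contained sketch of that pattern, with illustrative paths and timeouts rather than the command's settings:

```python
import threading
from pathlib import Path
from time import monotonic

from watchfiles import Change
from watchfiles import DefaultFilter
from watchfiles import watch

stop_flag = threading.Event()
watch_dir = Path("/tmp/consume")   # illustrative; must already exist
debounce_secs = 0.5                # illustrative debounce window
pending: dict[Path, float] = {}    # path -> time of last change event

read_timeout_ms = 0  # 0 tells watchfiles to wait indefinitely for the first event

while not stop_flag.is_set():
    for changes in watch(
        watch_dir,
        watch_filter=DefaultFilter(),
        rust_timeout=read_timeout_ms,
        yield_on_timeout=True,      # yield an empty batch when the timeout expires
        stop_event=stop_flag,
    ):
        # Record or forget paths based on the change type
        for change, raw_path in changes:
            path = Path(raw_path)
            if change in (Change.added, Change.modified):
                pending[path] = monotonic()
            elif change == Change.deleted:
                pending.pop(path, None)

        # Consume anything that has stayed quiet past the debounce window
        still_waiting: dict[Path, float] = {}
        for path, seen_at in pending.items():
            if monotonic() - seen_at > debounce_secs and path.is_file():
                print(f"would consume {path}")
            elif path.is_file():
                still_waiting[path] = seen_at
        pending = still_waiting

        # Leave the watch generator so the timeout can be reconfigured below
        break

    # Short timeout while files are pending, otherwise block until the next event
    read_timeout_ms = int(debounce_secs * 1000) if pending else 0
```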