mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-01-12 21:44:21 -06:00
Really working on this new consumer now
This commit is contained in:
@@ -1,135 +1,362 @@
|
||||
"""
|
||||
Document consumer management command.
|
||||
|
||||
Watches a consumption directory for new documents and queues them for processing.
|
||||
Uses watchfiles for efficient file system monitoring with support for both
|
||||
native OS notifications and polling fallback.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from fnmatch import filter
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from pathlib import PurePath
|
||||
from threading import Event
|
||||
from time import monotonic
|
||||
from time import sleep
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Final
|
||||
|
||||
from django import db
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.core.management.base import CommandError
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.observers.polling import PollingObserver
|
||||
from watchfiles import Change
|
||||
from watchfiles import DefaultFilter
|
||||
from watchfiles import watch
|
||||
|
||||
from documents.data_models import ConsumableDocument
|
||||
from documents.data_models import DocumentMetadataOverrides
|
||||
from documents.data_models import DocumentSource
|
||||
from documents.models import Tag
|
||||
from documents.parsers import is_file_ext_supported
|
||||
from documents.parsers import get_supported_file_extensions
|
||||
from documents.tasks import consume_file
|
||||
|
||||
try:
|
||||
from inotifyrecursive import INotify
|
||||
from inotifyrecursive import flags
|
||||
except ImportError: # pragma: no cover
|
||||
INotify = flags = None
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterator
|
||||
|
||||
|
||||
logger = logging.getLogger("paperless.management.consumer")
|
||||
|
||||
|
||||
def _tags_from_path(filepath: Path) -> list[int]:
|
||||
@dataclass
|
||||
class TrackedFile:
|
||||
"""Represents a file being tracked for stability."""
|
||||
|
||||
path: Path
|
||||
last_event_time: float
|
||||
last_mtime: float | None = None
|
||||
last_size: int | None = None
|
||||
|
||||
def update_stats(self) -> bool:
|
||||
"""
|
||||
Update file stats. Returns True if file exists and stats were updated.
|
||||
"""
|
||||
try:
|
||||
stat = self.path.stat()
|
||||
self.last_mtime = stat.st_mtime
|
||||
self.last_size = stat.st_size
|
||||
return True
|
||||
except (FileNotFoundError, PermissionError, OSError):
|
||||
return False
|
||||
|
||||
def is_unchanged(self) -> bool:
|
||||
"""
|
||||
Check if file stats match the previously recorded values.
|
||||
Returns False if file doesn't exist or stats changed.
|
||||
"""
|
||||
try:
|
||||
stat = self.path.stat()
|
||||
return stat.st_mtime == self.last_mtime and stat.st_size == self.last_size
|
||||
except (FileNotFoundError, PermissionError, OSError):
|
||||
return False
|
||||
|
||||
|
||||
class FileStabilityTracker:
|
||||
"""
|
||||
Walk up the directory tree from filepath to CONSUMPTION_DIR
|
||||
Tracks file events and determines when files are stable for consumption.
|
||||
|
||||
A file is considered stable when:
|
||||
1. No new events have been received for it within the stability delay
|
||||
2. Its size and modification time haven't changed
|
||||
3. It still exists as a regular file
|
||||
|
||||
This handles various edge cases:
|
||||
- Network copies that write in chunks
|
||||
- Scanners that open/close files multiple times
|
||||
- Temporary files that get renamed
|
||||
- Files that are deleted before becoming stable
|
||||
"""
|
||||
|
||||
def __init__(self, stability_delay: float = 1.0) -> None:
|
||||
"""
|
||||
Initialize the tracker.
|
||||
|
||||
Args:
|
||||
stability_delay: Time in seconds a file must remain unchanged
|
||||
before being considered stable.
|
||||
"""
|
||||
self.stability_delay = stability_delay
|
||||
self._tracked: dict[Path, TrackedFile] = {}
|
||||
|
||||
def track(self, path: Path, change: Change) -> None:
|
||||
"""
|
||||
Register a file event.
|
||||
|
||||
Args:
|
||||
path: The file path that changed.
|
||||
change: The type of change (added, modified, deleted).
|
||||
"""
|
||||
path = path.resolve()
|
||||
|
||||
match change:
|
||||
case Change.deleted:
|
||||
self._tracked.pop(path, None)
|
||||
logger.debug(f"Stopped tracking deleted file: {path}")
|
||||
case Change.added | Change.modified:
|
||||
current_time = monotonic()
|
||||
if path in self._tracked:
|
||||
tracked = self._tracked[path]
|
||||
tracked.last_event_time = current_time
|
||||
tracked.update_stats()
|
||||
logger.debug(f"Updated tracking for: {path}")
|
||||
else:
|
||||
tracked = TrackedFile(path=path, last_event_time=current_time)
|
||||
if tracked.update_stats():
|
||||
self._tracked[path] = tracked
|
||||
logger.debug(f"Started tracking: {path}")
|
||||
else:
|
||||
logger.debug(f"Could not stat file, not tracking: {path}")
|
||||
|
||||
def get_stable_files(self) -> Iterator[Path]:
|
||||
"""
|
||||
Yield files that have been stable for the configured delay.
|
||||
|
||||
Files are removed from tracking once yielded or determined to be invalid.
|
||||
"""
|
||||
current_time = monotonic()
|
||||
to_remove: list[Path] = []
|
||||
to_yield: list[Path] = []
|
||||
|
||||
for path, tracked in list(self._tracked.items()):
|
||||
time_since_event = current_time - tracked.last_event_time
|
||||
|
||||
if time_since_event < self.stability_delay:
|
||||
continue
|
||||
|
||||
# File has waited long enough, verify it's unchanged
|
||||
if not tracked.is_unchanged():
|
||||
# Stats changed or file gone - update and wait again
|
||||
if tracked.update_stats():
|
||||
tracked.last_event_time = current_time
|
||||
logger.debug(f"File changed during stability check: {path}")
|
||||
else:
|
||||
# File no longer exists, remove from tracking
|
||||
to_remove.append(path)
|
||||
logger.debug(f"File disappeared during stability check: {path}")
|
||||
continue
|
||||
|
||||
# File is stable - verify it's a regular file
|
||||
try:
|
||||
if path.is_file():
|
||||
to_yield.append(path)
|
||||
logger.info(f"File is stable: {path}")
|
||||
else:
|
||||
# Not a regular file (directory, symlink, etc.)
|
||||
to_remove.append(path)
|
||||
logger.debug(f"Path is not a regular file: {path}")
|
||||
except (PermissionError, OSError) as e:
|
||||
logger.warning(f"Cannot access {path}: {e}")
|
||||
to_remove.append(path)
|
||||
|
||||
# Remove files that are no longer valid
|
||||
for path in to_remove:
|
||||
self._tracked.pop(path, None)
|
||||
|
||||
# Remove and yield stable files
|
||||
for path in to_yield:
|
||||
self._tracked.pop(path, None)
|
||||
yield path
|
||||
|
||||
def has_pending_files(self) -> bool:
|
||||
"""Check if there are files waiting for stability check."""
|
||||
return len(self._tracked) > 0
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Clear all tracked files."""
|
||||
self._tracked.clear()
|
||||
|
||||
@property
|
||||
def pending_count(self) -> int:
|
||||
"""Number of files being tracked."""
|
||||
return len(self._tracked)
|
||||
|
||||
|
||||
class ConsumerFilter(DefaultFilter):
|
||||
"""
|
||||
Custom filter for the document consumer.
|
||||
|
||||
Filters files based on:
|
||||
- Supported file extensions
|
||||
- User-configured ignore patterns (regex)
|
||||
- Default ignore patterns for common system files
|
||||
"""
|
||||
|
||||
# Default regex patterns to ignore (matched against filename only)
|
||||
DEFAULT_IGNORE_PATTERNS: Final[frozenset[str]] = frozenset(
|
||||
{
|
||||
r"^\.DS_Store$",
|
||||
r"^\.DS_STORE$",
|
||||
r"^\._.*",
|
||||
r"^desktop\.ini$",
|
||||
r"^Thumbs\.db$",
|
||||
},
|
||||
)
|
||||
|
||||
# Directories to always ignore (matched by name via DefaultFilter)
|
||||
DEFAULT_IGNORE_DIRS: Final[tuple[str, ...]] = (
|
||||
".stfolder",
|
||||
".stversions",
|
||||
".localized",
|
||||
"@eaDir",
|
||||
".Spotlight-V100",
|
||||
".Trashes",
|
||||
"__MACOSX",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
supported_extensions: frozenset[str] | None = None,
|
||||
ignore_patterns: list[str] | None = None,
|
||||
consumption_dir: Path | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the consumer filter.
|
||||
|
||||
Args:
|
||||
supported_extensions: Set of supported file extensions (e.g., {".pdf", ".png"}).
|
||||
If None, uses get_supported_file_extensions().
|
||||
ignore_patterns: Additional regex patterns to ignore (matched against filename).
|
||||
consumption_dir: Base consumption directory (unused, kept for API compatibility).
|
||||
"""
|
||||
# Combine default and user patterns
|
||||
all_patterns = set(self.DEFAULT_IGNORE_PATTERNS)
|
||||
if ignore_patterns:
|
||||
all_patterns.update(ignore_patterns)
|
||||
|
||||
# Compile all patterns
|
||||
self._ignore_regexes: list[re.Pattern[str]] = [
|
||||
re.compile(pattern) for pattern in all_patterns
|
||||
]
|
||||
|
||||
# Get supported extensions
|
||||
if supported_extensions is None:
|
||||
supported_extensions = frozenset(get_supported_file_extensions())
|
||||
self._supported_extensions = supported_extensions
|
||||
|
||||
# Call parent with directory ignore list
|
||||
# DefaultFilter.ignore_dirs matches directory names, not full paths
|
||||
super().__init__(
|
||||
ignore_dirs=self.DEFAULT_IGNORE_DIRS,
|
||||
ignore_entity_patterns=None,
|
||||
ignore_paths=None,
|
||||
)
|
||||
|
||||
def __call__(self, change: Change, path: str) -> bool:
|
||||
"""
|
||||
Filter function for watchfiles.
|
||||
|
||||
Returns True if the path should be watched, False to ignore.
|
||||
"""
|
||||
# Let parent filter handle directory ignoring and basic checks
|
||||
if not super().__call__(change, path):
|
||||
return False
|
||||
|
||||
path_obj = Path(path)
|
||||
|
||||
# For directories, parent filter already handled ignore_dirs
|
||||
if path_obj.is_dir():
|
||||
return True
|
||||
|
||||
# For files, check extension
|
||||
if not self._has_supported_extension(path_obj):
|
||||
return False
|
||||
|
||||
# Check filename against ignore patterns
|
||||
return not self._matches_ignore_pattern(path_obj.name)
|
||||
|
||||
def _has_supported_extension(self, path: Path) -> bool:
|
||||
"""Check if the file has a supported extension."""
|
||||
suffix = path.suffix.lower()
|
||||
return suffix in self._supported_extensions
|
||||
|
||||
def _matches_ignore_pattern(self, filename: str) -> bool:
|
||||
"""Check if the filename matches any ignore pattern."""
|
||||
for regex in self._ignore_regexes:
|
||||
if regex.search(filename):
|
||||
logger.debug(
|
||||
f"Filename {filename} matched ignore pattern {regex.pattern}",
|
||||
)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _tags_from_path(filepath: Path, consumption_dir: Path) -> list[int]:
|
||||
"""
|
||||
Walk up the directory tree from filepath to consumption_dir
|
||||
and get or create Tag IDs for every directory.
|
||||
|
||||
Returns set of Tag models
|
||||
Returns list of Tag primary keys.
|
||||
"""
|
||||
db.close_old_connections()
|
||||
tag_ids = set()
|
||||
path_parts = filepath.relative_to(settings.CONSUMPTION_DIR).parent.parts
|
||||
tag_ids: set[int] = set()
|
||||
path_parts = filepath.relative_to(consumption_dir).parent.parts
|
||||
|
||||
for part in path_parts:
|
||||
tag_ids.add(
|
||||
Tag.objects.get_or_create(name__iexact=part, defaults={"name": part})[0].pk,
|
||||
tag, _ = Tag.objects.get_or_create(
|
||||
name__iexact=part,
|
||||
defaults={"name": part},
|
||||
)
|
||||
tag_ids.add(tag.pk)
|
||||
|
||||
return list(tag_ids)
|
||||
|
||||
|
||||
def _is_ignored(filepath: Path) -> bool:
|
||||
def _consume_file(
|
||||
filepath: Path,
|
||||
consumption_dir: Path,
|
||||
*,
|
||||
subdirs_as_tags: bool,
|
||||
) -> None:
|
||||
"""
|
||||
Checks if the given file should be ignored, based on configured
|
||||
patterns.
|
||||
Queue a file for consumption.
|
||||
|
||||
Returns True if the file is ignored, False otherwise
|
||||
Args:
|
||||
filepath: Path to the file to consume.
|
||||
consumption_dir: Base consumption directory.
|
||||
subdirs_as_tags: Whether to create tags from subdirectory names.
|
||||
"""
|
||||
# Trim out the consume directory, leaving only filename and it's
|
||||
# path relative to the consume directory
|
||||
filepath_relative = PurePath(filepath).relative_to(settings.CONSUMPTION_DIR)
|
||||
|
||||
# March through the components of the path, including directories and the filename
|
||||
# looking for anything matching
|
||||
# foo/bar/baz/file.pdf -> (foo, bar, baz, file.pdf)
|
||||
parts = []
|
||||
for part in filepath_relative.parts:
|
||||
# If the part is not the name (ie, it's a dir)
|
||||
# Need to append the trailing slash or fnmatch doesn't match
|
||||
# fnmatch("dir", "dir/*") == False
|
||||
# fnmatch("dir/", "dir/*") == True
|
||||
if part != filepath_relative.name:
|
||||
part = part + "/"
|
||||
parts.append(part)
|
||||
|
||||
for pattern in settings.CONSUMER_IGNORE_PATTERNS:
|
||||
if len(filter(parts, pattern)):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _consume(filepath: Path) -> None:
|
||||
# Check permissions early
|
||||
# Verify file still exists and is accessible
|
||||
try:
|
||||
filepath.stat()
|
||||
except (PermissionError, OSError):
|
||||
logger.warning(f"Not consuming file {filepath}: Permission denied.")
|
||||
if not filepath.is_file():
|
||||
logger.debug(f"Not consuming {filepath}: not a file or doesn't exist")
|
||||
return
|
||||
except (PermissionError, OSError) as e:
|
||||
logger.warning(f"Not consuming {filepath}: {e}")
|
||||
return
|
||||
|
||||
if filepath.is_dir() or _is_ignored(filepath):
|
||||
return
|
||||
|
||||
if not filepath.is_file():
|
||||
logger.debug(f"Not consuming file {filepath}: File has moved.")
|
||||
return
|
||||
|
||||
if not is_file_ext_supported(filepath.suffix):
|
||||
logger.warning(f"Not consuming file {filepath}: Unknown file extension.")
|
||||
return
|
||||
|
||||
# Total wait time: up to 500ms
|
||||
os_error_retry_count: Final[int] = 50
|
||||
os_error_retry_wait: Final[float] = 0.01
|
||||
|
||||
read_try_count = 0
|
||||
file_open_ok = False
|
||||
os_error_str = None
|
||||
|
||||
while (read_try_count < os_error_retry_count) and not file_open_ok:
|
||||
# Get tags from path if configured
|
||||
tag_ids: list[int] | None = None
|
||||
if subdirs_as_tags:
|
||||
try:
|
||||
with filepath.open("rb"):
|
||||
file_open_ok = True
|
||||
except OSError as e:
|
||||
read_try_count += 1
|
||||
os_error_str = str(e)
|
||||
sleep(os_error_retry_wait)
|
||||
tag_ids = _tags_from_path(filepath, consumption_dir)
|
||||
except Exception:
|
||||
logger.exception(f"Error creating tags from path for {filepath}")
|
||||
|
||||
if read_try_count >= os_error_retry_count:
|
||||
logger.warning(f"Not consuming file {filepath}: OS reports {os_error_str}")
|
||||
return
|
||||
|
||||
tag_ids = None
|
||||
# Queue for consumption
|
||||
try:
|
||||
if settings.CONSUMER_SUBDIRS_AS_TAGS:
|
||||
tag_ids = _tags_from_path(filepath)
|
||||
except Exception:
|
||||
logger.exception("Error creating tags from path")
|
||||
|
||||
try:
|
||||
logger.info(f"Adding {filepath} to the task queue.")
|
||||
logger.info(f"Adding {filepath} to the task queue")
|
||||
consume_file.delay(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ConsumeFolder,
|
||||
@@ -138,228 +365,206 @@ def _consume(filepath: Path) -> None:
|
||||
DocumentMetadataOverrides(tag_ids=tag_ids),
|
||||
)
|
||||
except Exception:
|
||||
# Catch all so that the consumer won't crash.
|
||||
# This is also what the test case is listening for to check for
|
||||
# errors.
|
||||
logger.exception("Error while consuming document")
|
||||
|
||||
|
||||
def _consume_wait_unmodified(file: Path) -> None:
|
||||
"""
|
||||
Waits for the given file to appear unmodified based on file size
|
||||
and modification time. Will wait a configured number of seconds
|
||||
and retry a configured number of times before either consuming or
|
||||
giving up
|
||||
"""
|
||||
if _is_ignored(file):
|
||||
return
|
||||
|
||||
logger.debug(f"Waiting for file {file} to remain unmodified")
|
||||
mtime = -1
|
||||
size = -1
|
||||
current_try = 0
|
||||
while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
|
||||
try:
|
||||
stat_data = file.stat()
|
||||
new_mtime = stat_data.st_mtime
|
||||
new_size = stat_data.st_size
|
||||
except FileNotFoundError:
|
||||
logger.debug(
|
||||
f"File {file} moved while waiting for it to remain unmodified.",
|
||||
)
|
||||
return
|
||||
if new_mtime == mtime and new_size == size:
|
||||
_consume(file)
|
||||
return
|
||||
mtime = new_mtime
|
||||
size = new_size
|
||||
sleep(settings.CONSUMER_POLLING_DELAY)
|
||||
current_try += 1
|
||||
|
||||
logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
|
||||
|
||||
|
||||
class Handler(FileSystemEventHandler):
|
||||
def __init__(self, pool: ThreadPoolExecutor) -> None:
|
||||
super().__init__()
|
||||
self._pool = pool
|
||||
|
||||
def on_created(self, event):
|
||||
self._pool.submit(_consume_wait_unmodified, Path(event.src_path))
|
||||
|
||||
def on_moved(self, event):
|
||||
self._pool.submit(_consume_wait_unmodified, Path(event.dest_path))
|
||||
logger.exception(f"Error while queuing document {filepath}")
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
"""
|
||||
On every iteration of an infinite loop, consume what we can from the
|
||||
consumption directory.
|
||||
Watch a consumption directory and queue new documents for processing.
|
||||
|
||||
Uses watchfiles for efficient file system monitoring. Supports both
|
||||
native OS notifications (inotify on Linux, FSEvents on macOS) and
|
||||
polling for network filesystems.
|
||||
"""
|
||||
|
||||
# This is here primarily for the tests and is irrelevant in production.
|
||||
stop_flag = Event()
|
||||
# Also only for testing, configures in one place the timeout used before checking
|
||||
# the stop flag
|
||||
testing_timeout_s: Final[float] = 0.5
|
||||
testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
|
||||
help = "Watch the consumption directory for new documents"
|
||||
|
||||
def add_arguments(self, parser):
|
||||
# For testing - allows tests to stop the consumer
|
||||
stop_flag: Event = Event()
|
||||
|
||||
# Testing timeout in seconds
|
||||
testing_timeout_s: Final[float] = 0.5
|
||||
|
||||
def add_arguments(self, parser) -> None:
|
||||
parser.add_argument(
|
||||
"directory",
|
||||
default=settings.CONSUMPTION_DIR,
|
||||
default=None,
|
||||
nargs="?",
|
||||
help="The consumption directory.",
|
||||
help="The consumption directory (defaults to CONSUMPTION_DIR setting)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--oneshot",
|
||||
action="store_true",
|
||||
help="Process existing files and exit without watching",
|
||||
)
|
||||
parser.add_argument("--oneshot", action="store_true", help="Run only once.")
|
||||
|
||||
# Only use during unit testing, will configure a timeout
|
||||
# Leaving it unset or false and the consumer will exit when it
|
||||
# receives SIGINT
|
||||
parser.add_argument(
|
||||
"--testing",
|
||||
action="store_true",
|
||||
help="Flag used only for unit testing",
|
||||
help="Enable testing mode with shorter timeouts",
|
||||
default=False,
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
directory = options["directory"]
|
||||
recursive = settings.CONSUMER_RECURSIVE
|
||||
|
||||
def handle(self, *args, **options) -> None:
|
||||
# Resolve consumption directory
|
||||
directory = options.get("directory")
|
||||
if not directory:
|
||||
raise CommandError("CONSUMPTION_DIR does not appear to be set.")
|
||||
directory = getattr(settings, "CONSUMPTION_DIR", None)
|
||||
if not directory:
|
||||
raise CommandError("CONSUMPTION_DIR is not configured")
|
||||
|
||||
directory = Path(directory).resolve()
|
||||
|
||||
if not directory.is_dir():
|
||||
raise CommandError(f"Consumption directory {directory} does not exist")
|
||||
if not directory.exists():
|
||||
raise CommandError(f"Consumption directory does not exist: {directory}")
|
||||
|
||||
# Consumer will need this
|
||||
if not directory.is_dir():
|
||||
raise CommandError(f"Consumption path is not a directory: {directory}")
|
||||
|
||||
# Ensure scratch directory exists
|
||||
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if recursive:
|
||||
for dirpath, _, filenames in os.walk(directory):
|
||||
for filename in filenames:
|
||||
filepath = Path(dirpath) / filename
|
||||
_consume(filepath)
|
||||
else:
|
||||
for filepath in directory.iterdir():
|
||||
_consume(filepath)
|
||||
# Get settings
|
||||
recursive: bool = getattr(settings, "CONSUMER_RECURSIVE", False)
|
||||
subdirs_as_tags: bool = getattr(settings, "CONSUMER_SUBDIRS_AS_TAGS", False)
|
||||
polling_interval: float = getattr(settings, "CONSUMER_POLLING_INTERVAL", 0)
|
||||
stability_delay: float = getattr(settings, "CONSUMER_STABILITY_DELAY", 1.0)
|
||||
ignore_patterns: list[str] = getattr(settings, "CONSUMER_IGNORE_PATTERNS", [])
|
||||
is_testing: bool = options.get("testing", False)
|
||||
is_oneshot: bool = options.get("oneshot", False)
|
||||
|
||||
if options["oneshot"]:
|
||||
# Create filter
|
||||
consumer_filter = ConsumerFilter(
|
||||
ignore_patterns=ignore_patterns,
|
||||
consumption_dir=directory,
|
||||
)
|
||||
|
||||
# Process existing files
|
||||
self._process_existing_files(
|
||||
directory=directory,
|
||||
recursive=recursive,
|
||||
subdirs_as_tags=subdirs_as_tags,
|
||||
consumer_filter=consumer_filter,
|
||||
)
|
||||
|
||||
if is_oneshot:
|
||||
logger.info("Oneshot mode: processed existing files, exiting")
|
||||
return
|
||||
|
||||
if settings.CONSUMER_POLLING == 0 and INotify:
|
||||
self.handle_inotify(directory, recursive, is_testing=options["testing"])
|
||||
# Start watching
|
||||
self._watch_directory(
|
||||
directory=directory,
|
||||
recursive=recursive,
|
||||
subdirs_as_tags=subdirs_as_tags,
|
||||
consumer_filter=consumer_filter,
|
||||
polling_interval=polling_interval,
|
||||
stability_delay=stability_delay,
|
||||
is_testing=is_testing,
|
||||
)
|
||||
|
||||
logger.debug("Consumer exiting")
|
||||
|
||||
def _process_existing_files(
|
||||
self,
|
||||
*,
|
||||
directory: Path,
|
||||
recursive: bool,
|
||||
subdirs_as_tags: bool,
|
||||
consumer_filter: ConsumerFilter,
|
||||
) -> None:
|
||||
"""Process any existing files in the consumption directory."""
|
||||
logger.info(f"Processing existing files in {directory}")
|
||||
|
||||
glob_pattern = "**/*" if recursive else "*"
|
||||
|
||||
for filepath in directory.glob(glob_pattern):
|
||||
# Use filter to check if file should be processed
|
||||
if not filepath.is_file():
|
||||
continue
|
||||
|
||||
if not consumer_filter(Change.added, str(filepath)):
|
||||
continue
|
||||
|
||||
_consume_file(
|
||||
filepath=filepath,
|
||||
consumption_dir=directory,
|
||||
subdirs_as_tags=subdirs_as_tags,
|
||||
)
|
||||
|
||||
def _watch_directory(
|
||||
self,
|
||||
*,
|
||||
directory: Path,
|
||||
recursive: bool,
|
||||
subdirs_as_tags: bool,
|
||||
consumer_filter: ConsumerFilter,
|
||||
polling_interval: float,
|
||||
stability_delay: float,
|
||||
is_testing: bool,
|
||||
) -> None:
|
||||
"""Watch directory for changes and process stable files."""
|
||||
use_polling = polling_interval > 0
|
||||
poll_delay_ms = int(polling_interval * 1000) if use_polling else 0
|
||||
|
||||
if use_polling:
|
||||
logger.info(
|
||||
f"Watching {directory} using polling (interval: {polling_interval}s)",
|
||||
)
|
||||
else:
|
||||
if INotify is None and settings.CONSUMER_POLLING == 0: # pragma: no cover
|
||||
logger.warning("Using polling as INotify import failed")
|
||||
self.handle_polling(directory, recursive, is_testing=options["testing"])
|
||||
logger.info(f"Watching {directory} using native file system events")
|
||||
|
||||
logger.debug("Consumer exiting.")
|
||||
# Create stability tracker
|
||||
tracker = FileStabilityTracker(stability_delay=stability_delay)
|
||||
|
||||
def handle_polling(self, directory, recursive, *, is_testing: bool):
|
||||
logger.info(f"Polling directory for changes: {directory}")
|
||||
# Calculate timeouts
|
||||
stability_timeout_ms = int(stability_delay * 1000)
|
||||
testing_timeout_ms = int(self.testing_timeout_s * 1000)
|
||||
|
||||
timeout = None
|
||||
if is_testing:
|
||||
timeout = self.testing_timeout_s
|
||||
logger.debug(f"Configuring timeout to {timeout}s")
|
||||
# Start with no timeout (wait indefinitely for first event)
|
||||
# unless in testing mode
|
||||
timeout_ms = testing_timeout_ms if is_testing else 0
|
||||
|
||||
polling_interval = settings.CONSUMER_POLLING
|
||||
if polling_interval == 0: # pragma: no cover
|
||||
# Only happens if INotify failed to import
|
||||
logger.warning("Using polling of 10s, consider setting this")
|
||||
polling_interval = 10
|
||||
self.stop_flag.clear()
|
||||
|
||||
with ThreadPoolExecutor(max_workers=4) as pool:
|
||||
observer = PollingObserver(timeout=polling_interval)
|
||||
observer.schedule(Handler(pool), directory, recursive=recursive)
|
||||
observer.start()
|
||||
while not self.stop_flag.is_set():
|
||||
try:
|
||||
while observer.is_alive():
|
||||
observer.join(timeout)
|
||||
if self.stop_flag.is_set():
|
||||
observer.stop()
|
||||
for changes in watch(
|
||||
directory,
|
||||
watch_filter=consumer_filter,
|
||||
rust_timeout=timeout_ms,
|
||||
yield_on_timeout=True,
|
||||
force_polling=use_polling,
|
||||
poll_delay_ms=poll_delay_ms,
|
||||
recursive=recursive,
|
||||
stop_event=self.stop_flag,
|
||||
):
|
||||
# Process each change
|
||||
for change_type, path in changes:
|
||||
path = Path(path).resolve()
|
||||
logger.debug(f"Event: {change_type.name} for {path}")
|
||||
tracker.track(path, change_type)
|
||||
|
||||
# Check for stable files
|
||||
for stable_path in tracker.get_stable_files():
|
||||
_consume_file(
|
||||
filepath=stable_path,
|
||||
consumption_dir=directory,
|
||||
subdirs_as_tags=subdirs_as_tags,
|
||||
)
|
||||
|
||||
# Exit watch loop to reconfigure timeout
|
||||
break
|
||||
|
||||
# Determine next timeout
|
||||
if tracker.has_pending_files():
|
||||
# Check pending files at stability interval
|
||||
timeout_ms = stability_timeout_ms
|
||||
elif is_testing:
|
||||
# In testing, use short timeout to check stop flag
|
||||
timeout_ms = testing_timeout_ms
|
||||
else:
|
||||
# No pending files, wait indefinitely
|
||||
timeout_ms = 0
|
||||
|
||||
except KeyboardInterrupt:
|
||||
observer.stop()
|
||||
observer.join()
|
||||
|
||||
def handle_inotify(self, directory, recursive, *, is_testing: bool):
|
||||
logger.info(f"Using inotify to watch directory for changes: {directory}")
|
||||
|
||||
timeout_ms = None
|
||||
if is_testing:
|
||||
timeout_ms = self.testing_timeout_ms
|
||||
logger.debug(f"Configuring timeout to {timeout_ms}ms")
|
||||
|
||||
inotify = INotify()
|
||||
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
|
||||
if recursive:
|
||||
inotify.add_watch_recursive(directory, inotify_flags)
|
||||
else:
|
||||
inotify.add_watch(directory, inotify_flags)
|
||||
|
||||
inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
|
||||
inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
|
||||
|
||||
finished = False
|
||||
|
||||
notified_files = {}
|
||||
|
||||
try:
|
||||
while not finished:
|
||||
try:
|
||||
for event in inotify.read(timeout=timeout_ms):
|
||||
path = inotify.get_path(event.wd) if recursive else directory
|
||||
filepath = Path(path) / event.name
|
||||
if flags.MODIFY in flags.from_mask(event.mask):
|
||||
notified_files.pop(filepath, None)
|
||||
else:
|
||||
notified_files[filepath] = monotonic()
|
||||
|
||||
# Check the files against the timeout
|
||||
still_waiting = {}
|
||||
# last_event_time is time of the last inotify event for this file
|
||||
for filepath, last_event_time in notified_files.items():
|
||||
# Current time - last time over the configured timeout
|
||||
waited_long_enough = (
|
||||
monotonic() - last_event_time
|
||||
) > inotify_debounce_secs
|
||||
|
||||
# Also make sure the file exists still, some scanners might write a
|
||||
# temporary file first
|
||||
try:
|
||||
file_still_exists = filepath.exists() and filepath.is_file()
|
||||
except (PermissionError, OSError): # pragma: no cover
|
||||
# If we can't check, let it fail in the _consume function
|
||||
file_still_exists = True
|
||||
continue
|
||||
|
||||
if waited_long_enough and file_still_exists:
|
||||
_consume(filepath)
|
||||
elif file_still_exists:
|
||||
still_waiting[filepath] = last_event_time
|
||||
|
||||
# These files are still waiting to hit the timeout
|
||||
notified_files = still_waiting
|
||||
|
||||
# If files are waiting, need to exit read() to check them
|
||||
# Otherwise, go back to infinite sleep time, but only if not testing
|
||||
if len(notified_files) > 0:
|
||||
timeout_ms = inotify_debounce_ms
|
||||
elif is_testing:
|
||||
timeout_ms = self.testing_timeout_ms
|
||||
else:
|
||||
timeout_ms = None
|
||||
|
||||
if self.stop_flag.is_set():
|
||||
logger.debug("Finishing because event is set")
|
||||
finished = True
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Received SIGINT, stopping inotify")
|
||||
finished = True
|
||||
finally:
|
||||
inotify.close()
|
||||
logger.info("Received interrupt, stopping consumer")
|
||||
self.stop_flag.set()
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user