Mirror of https://github.com/paperless-ngx/paperless-ngx.git
Synced 2026-01-14 21:54:22 -06:00

Compare commits: 3 commits, feature-si...dependabot
| Author | SHA1 | Date |
| --- | --- | --- |
|  | 25169a48b6 |  |
|  | 45b7f9577c |  |
|  | af1e7bc557 |  |
@@ -501,7 +501,7 @@ The `datetime` filter formats a datetime string or datetime object using Python'
See the [strftime format code documentation](https://docs.python.org/3.13/library/datetime.html#strftime-and-strptime-format-codes)
for the possible codes and their meanings.

##### Date Localization {#date-localization}
##### Date Localization

The `localize_date` filter formats a date or datetime object into a localized string using Babel internationalization.
This takes into account the provided locale for translation. Since this must be used on a date or datetime object,
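As a rough sketch of what Babel-backed localization does (calling Babel directly here; the filter's exact signature may differ):

```python
from datetime import date

from babel.dates import format_date

# Render the same date for two locales; localize_date performs a
# similar Babel conversion from inside a template.
d = date(2026, 1, 14)
print(format_date(d, format="long", locale="en_US"))  # January 14, 2026
print(format_date(d, format="long", locale="de_DE"))  # 14. Januar 2026
```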
@@ -851,8 +851,8 @@ followed by the even pages.

It's important that the scan files get consumed in the correct order, and one at a time.
You therefore need to make sure that Paperless is running while you upload the files into
the directory; and if you're using polling, make sure that
`CONSUMER_POLLING_INTERVAL` is set to a value lower than the time it takes for the second scan to appear,
the directory; and if you're using [polling](configuration.md#polling), make sure that
`CONSUMER_POLLING` is set to a value lower than the time it takes for the second scan to appear,
like 5-10 or even lower.

Another thing that might happen is that you start a double sided scan, but then forget
@@ -1175,45 +1175,21 @@ don't exist yet.

#### [`PAPERLESS_CONSUMER_IGNORE_PATTERNS=<json>`](#PAPERLESS_CONSUMER_IGNORE_PATTERNS) {#PAPERLESS_CONSUMER_IGNORE_PATTERNS}

: Additional regex patterns for files to ignore in the consumption directory. Patterns are matched against filenames only (not full paths)
using Python's `re.match()`, which anchors at the start of the filename.
: By default, paperless ignores certain files and folders in the
consumption directory, such as system files created by the Mac OS
or hidden folders some tools use to store data.

See the [watchfiles documentation](https://watchfiles.helpmanual.io/api/filters/#watchfiles.BaseFilter.ignore_entity_patterns) for details on pattern matching.
This can be adjusted by configuring a custom json array with
patterns to exclude.

This setting is for additional patterns beyond the built-in defaults. Common system files and directories are already ignored automatically.
The patterns will be compiled via Python's standard `re` module.
For example, `.DS_STORE/*` will ignore any files found in a folder
named `.DS_STORE`, including `.DS_STORE/bar.pdf` and `foo/.DS_STORE/bar.pdf`

Example custom patterns:
A pattern like `._*` will ignore anything starting with `._`, including:
`._foo.pdf` and `._bar/foo.pdf`

```json
["^temp_", "\\.bak$", "^~"]
```

This would ignore:

- Files starting with `temp_` (e.g., `temp_scan.pdf`)
- Files ending with `.bak` (e.g., `document.pdf.bak`)
- Files starting with `~` (e.g., `~$document.docx`)

Defaults to `[]` (empty list, uses only built-in defaults).

The default ignores are `[.DS_Store, .DS_STORE, ._*, desktop.ini, Thumbs.db]` and cannot be overridden.

#### [`PAPERLESS_CONSUMER_IGNORE_DIRS=<json>`](#PAPERLESS_CONSUMER_IGNORE_DIRS) {#PAPERLESS_CONSUMER_IGNORE_DIRS}

: Additional directory names to ignore in the consumption directory. Directories matching these names (and all their contents) will be skipped.

This setting is for additional directories beyond the built-in defaults. Matching is done by directory name only, not full path.

Example:

```json
["temp", "incoming", ".hidden"]
```

Defaults to `[]` (empty list, uses only built-in defaults).

The default ignores are `[.stfolder, .stversions, .localized, @eaDir, .Spotlight-V100, .Trashes, __MACOSX]` and cannot be overridden.
Defaults to
`[".DS_Store", ".DS_STORE", "._*", ".stfolder/*", ".stversions/*", ".localized/*", "desktop.ini", "@eaDir/*", "Thumbs.db"]`.

#### [`PAPERLESS_CONSUMER_BARCODE_SCANNER=<string>`](#PAPERLESS_CONSUMER_BARCODE_SCANNER) {#PAPERLESS_CONSUMER_BARCODE_SCANNER}
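A quick illustration of the `re.match()` anchoring behavior described above (plain Python, independent of paperless):

```python
import re

# re.match() only matches at the start of the string...
print(bool(re.match(r"temp_", "temp_scan.pdf")))      # True
print(bool(re.match(r"temp_", "my_temp_scan.pdf")))   # False: not at the start
# ...so only an end anchor ($) needs to be written explicitly.
print(bool(re.search(r"temp_", "my_temp_scan.pdf")))  # True: search matches anywhere
```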
@@ -1312,24 +1288,48 @@ within your documents.

Defaults to false.

#### [`PAPERLESS_CONSUMER_POLLING_INTERVAL=<num>`](#PAPERLESS_CONSUMER_POLLING_INTERVAL) {#PAPERLESS_CONSUMER_POLLING_INTERVAL}
### Polling {#polling}

: Configures how the consumer detects new files in the consumption directory.
#### [`PAPERLESS_CONSUMER_POLLING=<num>`](#PAPERLESS_CONSUMER_POLLING) {#PAPERLESS_CONSUMER_POLLING}

When set to `0` (default), paperless uses native filesystem notifications for efficient, immediate detection of new files.
: If paperless won't find documents added to your consume folder, it
might not be able to automatically detect filesystem changes. In
that case, specify a polling interval in seconds here, which will
then cause paperless to periodically check your consumption
directory for changes. This will also disable listening for file
system changes with `inotify`.

When set to a positive number, paperless polls the consumption directory at that interval in seconds. Use polling for network filesystems (NFS, SMB/CIFS) where native notifications may not work reliably.
Defaults to 0, which disables polling and uses filesystem
notifications.

Defaults to 0.
#### [`PAPERLESS_CONSUMER_POLLING_RETRY_COUNT=<num>`](#PAPERLESS_CONSUMER_POLLING_RETRY_COUNT) {#PAPERLESS_CONSUMER_POLLING_RETRY_COUNT}

#### [`PAPERLESS_CONSUMER_STABILITY_DELAY=<num>`](#PAPERLESS_CONSUMER_STABILITY_DELAY) {#PAPERLESS_CONSUMER_STABILITY_DELAY}
: If consumer polling is enabled, sets the maximum number of times
paperless will check for a file to remain unmodified. If a file's
modification time and size are identical for two consecutive checks, it
will be consumed.

: Sets the time in seconds that a file must remain unchanged (same size and modification time) before paperless will begin consuming it.
Defaults to 5.

Increase this value if you experience issues with files being consumed before they are fully written, particularly on slower network storage or
with certain scanner quirks.
#### [`PAPERLESS_CONSUMER_POLLING_DELAY=<num>`](#PAPERLESS_CONSUMER_POLLING_DELAY) {#PAPERLESS_CONSUMER_POLLING_DELAY}

Defaults to 5.0 seconds.
: If consumer polling is enabled, sets the delay in seconds between
each check (above) paperless will do while waiting for a file to
remain unmodified.

Defaults to 5.

### iNotify {#inotify}

#### [`PAPERLESS_CONSUMER_INOTIFY_DELAY=<num>`](#PAPERLESS_CONSUMER_INOTIFY_DELAY) {#PAPERLESS_CONSUMER_INOTIFY_DELAY}

: Sets the time in seconds the consumer will wait for additional
events from inotify before the consumer will consider a file ready
and begin consumption. Certain scanners or network setups may
generate multiple events for a single file, leading to multiple
consumers working on the same file. Configure this to prevent that.

Defaults to 0.5 seconds.

## Workflow webhooks
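The stability check these settings describe boils down to comparing size and mtime across consecutive checks. A minimal sketch of that idea (hypothetical helper, not paperless's actual code):

```python
import time
from pathlib import Path

def wait_until_stable(path: Path, delay: float = 5.0, retries: int = 5) -> bool:
    """Return True once size and mtime stop changing between two checks."""
    last = None
    for _ in range(retries):
        stat = path.stat()
        current = (stat.st_size, stat.st_mtime)
        if current == last:
            return True  # unchanged for one full delay: treat as fully written
        last = current
        time.sleep(delay)
    return False  # still being written after all retries
```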
@@ -1,19 +0,0 @@
# v3 Migration Guide

## Consumer Settings Changes

The v3 consumer command uses a [different library](https://watchfiles.helpmanual.io/) to unify
the watching for new files in the consume directory. For the user, this removes several configuration options related to delays and retries
and replaces them with a single unified setting. It also adjusts how the consumer ignore filtering happens, replacing `fnmatch` with regular
expressions and separating the directory ignore from the file ignore.

### Summary

| Old Setting | New Setting | Notes |
| --- | --- | --- |
| `CONSUMER_POLLING` | [`CONSUMER_POLLING_INTERVAL`](configuration.md#PAPERLESS_CONSUMER_POLLING_INTERVAL) | Renamed for clarity |
| `CONSUMER_INOTIFY_DELAY` | [`CONSUMER_STABILITY_DELAY`](configuration.md#PAPERLESS_CONSUMER_STABILITY_DELAY) | Unified for all modes |
| `CONSUMER_POLLING_DELAY` | _Removed_ | Use `CONSUMER_STABILITY_DELAY` |
| `CONSUMER_POLLING_RETRY_COUNT` | _Removed_ | Automatic with stability tracking |
| `CONSUMER_IGNORE_PATTERNS` | [`CONSUMER_IGNORE_PATTERNS`](configuration.md#PAPERLESS_CONSUMER_IGNORE_PATTERNS) | **Now regex, not fnmatch**; user patterns are added to (not replacing) default ones |
| _New_ | [`CONSUMER_IGNORE_DIRS`](configuration.md#PAPERLESS_CONSUMER_IGNORE_DIRS) | Additional directories to ignore; user entries are added to (not replacing) defaults |
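Since patterns moved from `fnmatch` globs to regular expressions, old glob entries need rewriting. A small sketch of the difference (plain Python):

```python
import re
from fnmatch import fnmatch

name = "._scan.pdf"
# Old glob semantics: `*` is a wildcard, `.` and `_` are literal.
print(fnmatch(name, "._*"))           # True
# As a regex the same string means ". then zero-or-more underscores";
# escape the dot and anchor instead:
print(bool(re.match(r"^\._", name)))  # True
```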
@@ -124,7 +124,8 @@ account. The script essentially automatically performs the steps described in [D
   system notifications with `inotify`. When storing the consumption
   directory on such a file system, paperless will not pick up new
   files with the default configuration. You will need to use
   [`PAPERLESS_CONSUMER_POLLING_INTERVAL`](configuration.md#PAPERLESS_CONSUMER_POLLING_INTERVAL), which will disable inotify.
   [`PAPERLESS_CONSUMER_POLLING`](configuration.md#PAPERLESS_CONSUMER_POLLING), which will disable inotify. See
   [here](configuration.md#polling).

5. Run `docker compose pull`. This will pull the image from the GitHub container registry
   by default but you can change the image to pull from Docker Hub by changing the `image`
@@ -46,7 +46,7 @@ run:
If you notice that the consumer will only pick up files in the
consumption directory at startup, but won't find any other files added
later, you will need to enable filesystem polling with the configuration
option [`PAPERLESS_CONSUMER_POLLING_INTERVAL`](configuration.md#PAPERLESS_CONSUMER_POLLING_INTERVAL).
option [`PAPERLESS_CONSUMER_POLLING`](configuration.md#PAPERLESS_CONSUMER_POLLING).

This will disable listening to filesystem changes with inotify and
paperless will manually check the consumption directory for changes
@@ -234,9 +234,47 @@ FileNotFoundError: [Errno 2] No such file or directory: '/tmp/ocrmypdf.io.yhk3zb

This probably indicates paperless tried to consume the same file twice.
This can happen for a number of reasons, depending on how documents are
placed into the consume folder, such as how a scanner may modify a file multiple times as it scans.
Try adjusting the
[file stability delay](configuration.md#PAPERLESS_CONSUMER_STABILITY_DELAY) to a larger value.
placed into the consume folder. If paperless is using inotify (the
default) to check for documents, try adjusting the
[inotify configuration](configuration.md#inotify). If polling is enabled, try adjusting the
[polling configuration](configuration.md#polling).

## Consumer fails waiting for file to remain unmodified.

You might find messages like these in your log files:

```
[ERROR] [paperless.management.consumer] Timeout while waiting on file /usr/src/paperless/src/../consume/SCN_0001.pdf to remain unmodified.
```

This indicates paperless timed out while waiting for the file to be
completely written to the consume folder. Adjusting
[polling configuration](configuration.md#polling) values should resolve the issue.

!!! note

    The user will need to manually move the file out of the consume folder
    and back in, for the initial failing file to be consumed.

## Consumer fails reporting "OS reports file as busy still".

You might find messages like these in your log files:

```
[WARNING] [paperless.management.consumer] Not consuming file /usr/src/paperless/src/../consume/SCN_0001.pdf: OS reports file as busy still
```

This indicates paperless was unable to open the file, as the OS reported
the file as still being in use. To prevent a crash, paperless did not
try to consume the file. If paperless is using inotify (the default) to
check for documents, try adjusting the
[inotify configuration](configuration.md#inotify). If polling is enabled, try adjusting the
[polling configuration](configuration.md#polling).

!!! note

    The user will need to manually move the file out of the consume folder
    and back in, for the initial failing file to be consumed.

## Log reports "Creating PaperlessTask failed".
@@ -565,7 +565,7 @@ This allows for complex logic to be used to generate the title, including [logic
and [filters](https://jinja.palletsprojects.com/en/3.1.x/templates/#id11).
The template is provided as a string.

Using Jinja2 Templates is also useful for [Date localization](advanced_usage.md#date-localization) in the title.
Using Jinja2 Templates is also useful for [Date localization](advanced_usage.md#Date-Localization) in the title.

The available inputs differ depending on the type of workflow trigger.
This is because at the time of consumption (when the text is to be set), no automatic tags etc. have been
@@ -69,9 +69,8 @@ nav:
    - development.md
    - 'FAQs': faq.md
    - troubleshooting.md
    - 'Migration to v3': migration.md
    - changelog.md
copyright: Copyright © 2016 - 2026 Daniel Quinn, Jonas Winkler, and the Paperless-ngx team
copyright: Copyright © 2016 - 2023 Daniel Quinn, Jonas Winkler, and the Paperless-ngx team
extra:
  social:
    - icon: fontawesome/brands/github
@@ -55,10 +55,10 @@
#PAPERLESS_TASK_WORKERS=1
#PAPERLESS_THREADS_PER_WORKER=1
#PAPERLESS_TIME_ZONE=UTC
#PAPERLESS_CONSUMER_POLLING_INTERVAL=10
#PAPERLESS_CONSUMER_POLLING=10
#PAPERLESS_CONSUMER_DELETE_DUPLICATES=false
#PAPERLESS_CONSUMER_RECURSIVE=false
#PAPERLESS_CONSUMER_IGNORE_PATTERNS=[] # Defaults are built in; add filename regexes, e.g. ["^\\.DS_Store$", "^desktop\\.ini$"]
#PAPERLESS_CONSUMER_IGNORE_PATTERNS=[".DS_STORE/*", "._*", ".stfolder/*", ".stversions/*", ".localized/*", "desktop.ini"]
#PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS=false
#PAPERLESS_CONSUMER_ENABLE_BARCODES=false
#PAPERLESS_CONSUMER_BARCODE_STRING=PATCHT
@@ -28,7 +28,7 @@ dependencies = [
    # Only patch versions are guaranteed to not introduce breaking changes.
    "django~=5.2.5",
    "django-allauth[mfa,socialaccount]~=65.12.1",
    "django-auditlog~=3.3.0",
    "django-auditlog~=3.4.1",
    "django-cachalot~=2.8.0",
    "django-celery-results~=2.6.0",
    "django-compression-middleware~=0.5.0",
@@ -47,9 +47,10 @@ dependencies = [
    "faiss-cpu>=1.10",
    "filelock~=3.20.0",
    "flower~=2.0.1",
    "gotenberg-client~=0.12.0",
    "gotenberg-client~=0.13.1",
    "httpx-oauth~=0.16",
    "imap-tools~=1.11.0",
    "inotifyrecursive~=0.3",
    "jinja2~=3.1.5",
    "langdetect~=1.0.9",
    "llama-index-core>=0.12.33.post1",
@@ -59,7 +60,7 @@ dependencies = [
    "llama-index-llms-openai>=0.3.38",
    "llama-index-vector-stores-faiss>=0.3",
    "nltk~=3.9.1",
    "ocrmypdf~=16.12.0",
    "ocrmypdf~=16.13.0",
    "openai>=1.76",
    "pathvalidate~=3.3.1",
    "pdf2image~=1.17.0",
@@ -77,7 +78,7 @@ dependencies = [
    "setproctitle~=1.3.4",
    "tika-client~=0.10.0",
    "tqdm~=4.67.1",
    "watchfiles>=1.1.1",
    "watchdog~=6.0",
    "whitenoise~=6.9",
    "whoosh-reloaded>=2.7.5",
    "zxing-cpp~=2.3.0",
@@ -90,7 +91,7 @@ optional-dependencies.postgres = [
    "psycopg[c,pool]==3.2.12",
    # Direct dependency for proper resolution of the pre-built wheels
    "psycopg-c==3.2.12",
    "psycopg-pool==3.2.7",
    "psycopg-pool==3.3",
]
optional-dependencies.webserver = [
    "granian[uvloop]~=2.5.1",
@@ -125,7 +126,7 @@ testing = [
]

lint = [
    "pre-commit~=4.4.0",
    "pre-commit~=4.5.1",
    "pre-commit-uv~=4.2.0",
    "ruff~=0.14.0",
]
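A note on the `~=` pins above: PEP 440's compatible-release operator allows patch updates only, which is why each version bump edits the pin itself. A small check (using the `packaging` library):

```python
from packaging.specifiers import SpecifierSet

spec = SpecifierSet("~=3.3.0")  # equivalent to >=3.3.0,<3.4.0
print("3.3.5" in spec)  # True: patch updates stay within the pin
print("3.4.1" in spec)  # False: a minor bump needs a new pin
```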
@@ -1,343 +1,135 @@
"""
Document consumer management command.

Watches a consumption directory for new documents and queues them for processing.
Uses watchfiles for efficient file system monitoring with support for both
native OS notifications and polling fallback.
"""

from __future__ import annotations

import logging
from dataclasses import dataclass
import os
from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter
from pathlib import Path
from pathlib import PurePath
from threading import Event
from time import monotonic
from typing import TYPE_CHECKING
from time import sleep
from typing import Final

from django import db
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from watchfiles import Change
from watchfiles import DefaultFilter
from watchfiles import watch
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver

from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
from documents.data_models import DocumentSource
from documents.models import Tag
from documents.parsers import get_supported_file_extensions
from documents.parsers import is_file_ext_supported
from documents.tasks import consume_file

if TYPE_CHECKING:
    from collections.abc import Iterator

try:
    from inotifyrecursive import INotify
    from inotifyrecursive import flags
except ImportError:  # pragma: no cover
    INotify = flags = None

logger = logging.getLogger("paperless.management.consumer")


@dataclass
class TrackedFile:
    """Represents a file being tracked for stability."""

    path: Path
    last_event_time: float
    last_mtime: float | None = None
    last_size: int | None = None

    def update_stats(self) -> bool:
        """
        Update file stats. Returns True if file exists and stats were updated.
        """
        try:
            stat = self.path.stat()
            self.last_mtime = stat.st_mtime
            self.last_size = stat.st_size
            return True
        except OSError:
            return False

    def is_unchanged(self) -> bool:
        """
        Check if file stats match the previously recorded values.
        Returns False if file doesn't exist or stats changed.
        """
        try:
            stat = self.path.stat()
            return stat.st_mtime == self.last_mtime and stat.st_size == self.last_size
        except OSError:
            return False
class FileStabilityTracker:
def _tags_from_path(filepath: Path) -> list[int]:
    """
    Tracks file events and determines when files are stable for consumption.

    A file is considered stable when:
    1. No new events have been received for it within the stability delay
    2. Its size and modification time haven't changed
    3. It still exists as a regular file

    This handles various edge cases:
    - Network copies that write in chunks
    - Scanners that open/close files multiple times
    - Temporary files that get renamed
    - Files that are deleted before becoming stable
    """

    def __init__(self, stability_delay: float = 1.0) -> None:
        """
        Initialize the tracker.

        Args:
            stability_delay: Time in seconds a file must remain unchanged
                before being considered stable.
        """
        self.stability_delay = stability_delay
        self._tracked: dict[Path, TrackedFile] = {}

    def track(self, path: Path, change: Change) -> None:
        """
        Register a file event.

        Args:
            path: The file path that changed.
            change: The type of change (added, modified, deleted).
        """
        path = path.resolve()

        match change:
            case Change.deleted:
                self._tracked.pop(path, None)
                logger.debug(f"Stopped tracking deleted file: {path}")
            case Change.added | Change.modified:
                current_time = monotonic()
                if path in self._tracked:
                    tracked = self._tracked[path]
                    tracked.last_event_time = current_time
                    tracked.update_stats()
                    logger.debug(f"Updated tracking for: {path}")
                else:
                    tracked = TrackedFile(path=path, last_event_time=current_time)
                    if tracked.update_stats():
                        self._tracked[path] = tracked
                        logger.debug(f"Started tracking: {path}")
                    else:
                        logger.debug(f"Could not stat file, not tracking: {path}")

    def get_stable_files(self) -> Iterator[Path]:
        """
        Yield files that have been stable for the configured delay.

        Files are removed from tracking once yielded or determined to be invalid.
        """
        current_time = monotonic()
        to_remove: list[Path] = []
        to_yield: list[Path] = []

        for path, tracked in self._tracked.items():
            time_since_event = current_time - tracked.last_event_time

            if time_since_event < self.stability_delay:
                continue

            # File has waited long enough, verify it's unchanged
            if not tracked.is_unchanged():
                # Stats changed or file gone - update and wait again
                if tracked.update_stats():
                    tracked.last_event_time = current_time
                    logger.debug(f"File changed during stability check: {path}")
                else:
                    # File no longer exists, remove from tracking
                    to_remove.append(path)
                    logger.debug(f"File disappeared during stability check: {path}")
                continue

            # File is stable, we can return it
            to_yield.append(path)
            logger.info(f"File is stable: {path}")

        # Remove files that are no longer valid
        for path in to_remove:
            self._tracked.pop(path, None)

        # Remove and yield stable files
        for path in to_yield:
            self._tracked.pop(path, None)
            yield path

    def has_pending_files(self) -> bool:
        """Check if there are files waiting for stability check."""
        return len(self._tracked) > 0

    @property
    def pending_count(self) -> int:
        """Number of files being tracked."""
        return len(self._tracked)
class ConsumerFilter(DefaultFilter):
    """
    Filter for watchfiles that accepts only supported document types
    and ignores system files/directories.

    Extends DefaultFilter leveraging its built-in filtering:
    - `ignore_dirs`: Directory names to ignore (and all their contents)
    - `ignore_entity_patterns`: Regex patterns matched against filename/dirname only

    We add custom logic for file extension filtering (only accept supported
    document types), which the library doesn't provide.
    """

    # Regex patterns for files to always ignore (matched against filename only)
    # These are passed to DefaultFilter.ignore_entity_patterns
    DEFAULT_IGNORE_PATTERNS: Final[tuple[str, ...]] = (
        r"^\.DS_Store$",
        r"^\.DS_STORE$",
        r"^\._.*",
        r"^desktop\.ini$",
        r"^Thumbs\.db$",
    )

    # Directories to always ignore (passed to DefaultFilter.ignore_dirs)
    # These are matched by directory name, not full path
    DEFAULT_IGNORE_DIRS: Final[tuple[str, ...]] = (
        ".stfolder",  # Syncthing
        ".stversions",  # Syncthing
        ".localized",  # macOS
        "@eaDir",  # Synology NAS
        ".Spotlight-V100",  # macOS
        ".Trashes",  # macOS
        "__MACOSX",  # macOS archive artifacts
    )

    def __init__(
        self,
        *,
        supported_extensions: frozenset[str] | None = None,
        ignore_patterns: list[str] | None = None,
        ignore_dirs: list[str] | None = None,
    ) -> None:
        """
        Initialize the consumer filter.

        Args:
            supported_extensions: Set of file extensions to accept (e.g., {".pdf", ".png"}).
                If None, uses get_supported_file_extensions().
            ignore_patterns: Additional regex patterns to ignore (matched against filename).
            ignore_dirs: Additional directory names to ignore (merged with defaults).
        """
        # Get supported extensions
        if supported_extensions is None:
            supported_extensions = frozenset(get_supported_file_extensions())
        self._supported_extensions = supported_extensions

        # Combine default and user patterns
        all_patterns: list[str] = list(self.DEFAULT_IGNORE_PATTERNS)
        if ignore_patterns:
            all_patterns.extend(ignore_patterns)

        # Combine default and user ignore_dirs
        all_ignore_dirs: list[str] = list(self.DEFAULT_IGNORE_DIRS)
        if ignore_dirs:
            all_ignore_dirs.extend(ignore_dirs)

        # Let DefaultFilter handle all the pattern and directory filtering
        super().__init__(
            ignore_dirs=tuple(all_ignore_dirs),
            ignore_entity_patterns=tuple(all_patterns),
            ignore_paths=(),
        )

    def __call__(self, change: Change, path: str) -> bool:
        """
        Filter function for watchfiles.

        Returns True if the path should be watched, False to ignore.

        The parent DefaultFilter handles:
        - Hidden files/directories (starting with .)
        - Directories in ignore_dirs
        - Files/directories matching ignore_entity_patterns

        We additionally filter files by extension.
        """
        # Let parent filter handle directory ignoring and pattern matching
        if not super().__call__(change, path):
            return False

        path_obj = Path(path)

        # For directories, parent filter already handled everything
        if path_obj.is_dir():
            return True

        # For files, check extension
        return self._has_supported_extension(path_obj)

    def _has_supported_extension(self, path: Path) -> bool:
        """Check if the file has a supported extension."""
        suffix = path.suffix.lower()
        return suffix in self._supported_extensions
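For orientation, here is a tiny stand-alone sketch of the watchfiles API this filter plugs into (illustrative paths, not paperless's wiring):

```python
from watchfiles import Change, DefaultFilter, watch

# Ignore a Syncthing folder by name and AppleDouble files by regex,
# mirroring the defaults above.
f = DefaultFilter(ignore_dirs=(".stfolder",), ignore_entity_patterns=(r"^\._",))

for changes in watch("/tmp/consume", watch_filter=f):  # hypothetical directory
    for change, path in changes:
        if change in (Change.added, Change.modified):
            print("candidate:", path)
```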
def _tags_from_path(filepath: Path, consumption_dir: Path) -> list[int]:
    """
    Walk up the directory tree from filepath to consumption_dir
    Walk up the directory tree from filepath to CONSUMPTION_DIR
    and get or create Tag IDs for every directory.

    Returns list of Tag primary keys.
    Returns set of Tag models
    """
    db.close_old_connections()
    tag_ids: set[int] = set()
    path_parts = filepath.relative_to(consumption_dir).parent.parts

    tag_ids = set()
    path_parts = filepath.relative_to(settings.CONSUMPTION_DIR).parent.parts
    for part in path_parts:
        tag, _ = Tag.objects.get_or_create(
            name__iexact=part,
            defaults={"name": part},
        tag_ids.add(
            Tag.objects.get_or_create(name__iexact=part, defaults={"name": part})[0].pk,
        )
        tag_ids.add(tag.pk)

    return list(tag_ids)


def _consume_file(
    filepath: Path,
    consumption_dir: Path,
    *,
    subdirs_as_tags: bool,
) -> None:
def _is_ignored(filepath: Path) -> bool:
    """
    Queue a file for consumption.
    Checks if the given file should be ignored, based on configured
    patterns.

    Args:
        filepath: Path to the file to consume.
        consumption_dir: Base consumption directory.
        subdirs_as_tags: Whether to create tags from subdirectory names.
    Returns True if the file is ignored, False otherwise
    """
    # Verify file still exists and is accessible
    # Trim out the consume directory, leaving only filename and its
    # path relative to the consume directory
    filepath_relative = PurePath(filepath).relative_to(settings.CONSUMPTION_DIR)

    # March through the components of the path, including directories and the filename
    # looking for anything matching
    # foo/bar/baz/file.pdf -> (foo, bar, baz, file.pdf)
    parts = []
    for part in filepath_relative.parts:
        # If the part is not the name (ie, it's a dir)
        # Need to append the trailing slash or fnmatch doesn't match
        # fnmatch("dir", "dir/*") == False
        # fnmatch("dir/", "dir/*") == True
        if part != filepath_relative.name:
            part = part + "/"
        parts.append(part)

    for pattern in settings.CONSUMER_IGNORE_PATTERNS:
        if len(filter(parts, pattern)):
            return True

    return False
def _consume(filepath: Path) -> None:
    # Check permissions early
    try:
        if not filepath.is_file():
            logger.debug(f"Not consuming {filepath}: not a file or doesn't exist")
            return
    except OSError as e:
        logger.warning(f"Not consuming {filepath}: {e}")
        filepath.stat()
    except (PermissionError, OSError):
        logger.warning(f"Not consuming file {filepath}: Permission denied.")
        return

    # Get tags from path if configured
    tag_ids: list[int] | None = None
    if subdirs_as_tags:
        try:
            tag_ids = _tags_from_path(filepath, consumption_dir)
        except Exception:
            logger.exception(f"Error creating tags from path for {filepath}")
    if filepath.is_dir() or _is_ignored(filepath):
        return

    # Queue for consumption
    if not filepath.is_file():
        logger.debug(f"Not consuming file {filepath}: File has moved.")
        return

    if not is_file_ext_supported(filepath.suffix):
        logger.warning(f"Not consuming file {filepath}: Unknown file extension.")
        return

    # Total wait time: up to 500ms
    os_error_retry_count: Final[int] = 50
    os_error_retry_wait: Final[float] = 0.01

    read_try_count = 0
    file_open_ok = False
    os_error_str = None

    while (read_try_count < os_error_retry_count) and not file_open_ok:
        try:
            with filepath.open("rb"):
                file_open_ok = True
        except OSError as e:
            read_try_count += 1
            os_error_str = str(e)
            sleep(os_error_retry_wait)

    if read_try_count >= os_error_retry_count:
        logger.warning(f"Not consuming file {filepath}: OS reports {os_error_str}")
        return

    tag_ids = None
    try:
        logger.info(f"Adding {filepath} to the task queue")
        if settings.CONSUMER_SUBDIRS_AS_TAGS:
            tag_ids = _tags_from_path(filepath)
    except Exception:
        logger.exception("Error creating tags from path")

    try:
        logger.info(f"Adding {filepath} to the task queue.")
        consume_file.delay(
            ConsumableDocument(
                source=DocumentSource.ConsumeFolder,
@@ -346,209 +138,228 @@ def _consume_file(
            DocumentMetadataOverrides(tag_ids=tag_ids),
        )
    except Exception:
        logger.exception(f"Error while queuing document {filepath}")
        # Catch all so that the consumer won't crash.
        # This is also what the test case is listening for to check for
        # errors.
        logger.exception("Error while consuming document")
def _consume_wait_unmodified(file: Path) -> None:
    """
    Waits for the given file to appear unmodified based on file size
    and modification time. Will wait a configured number of seconds
    and retry a configured number of times before either consuming or
    giving up
    """
    if _is_ignored(file):
        return

    logger.debug(f"Waiting for file {file} to remain unmodified")
    mtime = -1
    size = -1
    current_try = 0
    while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
        try:
            stat_data = file.stat()
            new_mtime = stat_data.st_mtime
            new_size = stat_data.st_size
        except FileNotFoundError:
            logger.debug(
                f"File {file} moved while waiting for it to remain unmodified.",
            )
            return
        if new_mtime == mtime and new_size == size:
            _consume(file)
            return
        mtime = new_mtime
        size = new_size
        sleep(settings.CONSUMER_POLLING_DELAY)
        current_try += 1

    logger.error(f"Timeout while waiting on file {file} to remain unmodified.")


class Handler(FileSystemEventHandler):
    def __init__(self, pool: ThreadPoolExecutor) -> None:
        super().__init__()
        self._pool = pool

    def on_created(self, event):
        self._pool.submit(_consume_wait_unmodified, Path(event.src_path))

    def on_moved(self, event):
        self._pool.submit(_consume_wait_unmodified, Path(event.dest_path))
class Command(BaseCommand):
    """
    Watch a consumption directory and queue new documents for processing.

    Uses watchfiles for efficient file system monitoring. Supports both
    native OS notifications (inotify on Linux, FSEvents on macOS) and
    polling for network filesystems.
    On every iteration of an infinite loop, consume what we can from the
    consumption directory.
    """

    help = "Watch the consumption directory for new documents"

    # For testing - allows tests to stop the consumer
    stop_flag: Event = Event()

    # Testing timeout in seconds
    # This is here primarily for the tests and is irrelevant in production.
    stop_flag = Event()
    # Also only for testing, configures in one place the timeout used before checking
    # the stop flag
    testing_timeout_s: Final[float] = 0.5
    testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0

    def add_arguments(self, parser) -> None:
    def add_arguments(self, parser):
        parser.add_argument(
            "directory",
            default=None,
            default=settings.CONSUMPTION_DIR,
            nargs="?",
            help="The consumption directory (defaults to CONSUMPTION_DIR setting)",
        )
        parser.add_argument(
            "--oneshot",
            action="store_true",
            help="Process existing files and exit without watching",
            help="The consumption directory.",
        )
        parser.add_argument("--oneshot", action="store_true", help="Run only once.")

        # Only use during unit testing, will configure a timeout
        # Leaving it unset or false and the consumer will exit when it
        # receives SIGINT
        parser.add_argument(
            "--testing",
            action="store_true",
            help="Enable testing mode with shorter timeouts",
            help="Flag used only for unit testing",
            default=False,
        )

    def handle(self, *args, **options) -> None:
        # Resolve consumption directory
        directory = options.get("directory")
    def handle(self, *args, **options):
        directory = options["directory"]
        recursive = settings.CONSUMER_RECURSIVE

        if not directory:
            directory = getattr(settings, "CONSUMPTION_DIR", None)
            if not directory:
                raise CommandError("CONSUMPTION_DIR is not configured")
            raise CommandError("CONSUMPTION_DIR does not appear to be set.")

        directory = Path(directory).resolve()

        if not directory.exists():
            raise CommandError(f"Consumption directory does not exist: {directory}")

        if not directory.is_dir():
            raise CommandError(f"Consumption path is not a directory: {directory}")
            raise CommandError(f"Consumption directory {directory} does not exist")

        # Ensure scratch directory exists
        # Consumer will need this
        settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)

        # Get settings
        recursive: bool = settings.CONSUMER_RECURSIVE
        subdirs_as_tags: bool = settings.CONSUMER_SUBDIRS_AS_TAGS
        polling_interval: float = settings.CONSUMER_POLLING_INTERVAL
        stability_delay: float = settings.CONSUMER_STABILITY_DELAY
        ignore_patterns: list[str] = settings.CONSUMER_IGNORE_PATTERNS
        ignore_dirs: list[str] = settings.CONSUMER_IGNORE_DIRS
        is_testing: bool = options.get("testing", False)
        is_oneshot: bool = options.get("oneshot", False)
        if recursive:
            for dirpath, _, filenames in os.walk(directory):
                for filename in filenames:
                    filepath = Path(dirpath) / filename
                    _consume(filepath)
        else:
            for filepath in directory.iterdir():
                _consume(filepath)

        # Create filter
        consumer_filter = ConsumerFilter(
            ignore_patterns=ignore_patterns,
            ignore_dirs=ignore_dirs,
        )

        # Process existing files
        self._process_existing_files(
            directory=directory,
            recursive=recursive,
            subdirs_as_tags=subdirs_as_tags,
            consumer_filter=consumer_filter,
        )

        if is_oneshot:
            logger.info("Oneshot mode: processed existing files, exiting")
        if options["oneshot"]:
            return

        # Start watching
        self._watch_directory(
            directory=directory,
            recursive=recursive,
            subdirs_as_tags=subdirs_as_tags,
            consumer_filter=consumer_filter,
            polling_interval=polling_interval,
            stability_delay=stability_delay,
            is_testing=is_testing,
        )

        logger.debug("Consumer exiting")
    def _process_existing_files(
        self,
        *,
        directory: Path,
        recursive: bool,
        subdirs_as_tags: bool,
        consumer_filter: ConsumerFilter,
    ) -> None:
        """Process any existing files in the consumption directory."""
        logger.info(f"Processing existing files in {directory}")

        glob_pattern = "**/*" if recursive else "*"

        for filepath in directory.glob(glob_pattern):
            # Use filter to check if file should be processed
            if not filepath.is_file():
                continue

            if not consumer_filter(Change.added, str(filepath)):
                continue

            _consume_file(
                filepath=filepath,
                consumption_dir=directory,
                subdirs_as_tags=subdirs_as_tags,
            )
    def _watch_directory(
        self,
        *,
        directory: Path,
        recursive: bool,
        subdirs_as_tags: bool,
        consumer_filter: ConsumerFilter,
        polling_interval: float,
        stability_delay: float,
        is_testing: bool,
    ) -> None:
        """Watch directory for changes and process stable files."""
        use_polling = polling_interval > 0
        poll_delay_ms = int(polling_interval * 1000) if use_polling else 0

        if use_polling:
            logger.info(
                f"Watching {directory} using polling (interval: {polling_interval}s)",
            )
        if settings.CONSUMER_POLLING == 0 and INotify:
            self.handle_inotify(directory, recursive, is_testing=options["testing"])
        else:
            logger.info(f"Watching {directory} using native file system events")
            if INotify is None and settings.CONSUMER_POLLING == 0:  # pragma: no cover
                logger.warning("Using polling as INotify import failed")
            self.handle_polling(directory, recursive, is_testing=options["testing"])

        # Create stability tracker
        tracker = FileStabilityTracker(stability_delay=stability_delay)
        logger.debug("Consumer exiting.")

        # Calculate timeouts
        stability_timeout_ms = int(stability_delay * 1000)
        testing_timeout_ms = int(self.testing_timeout_s * 1000)
    def handle_polling(self, directory, recursive, *, is_testing: bool):
        logger.info(f"Polling directory for changes: {directory}")

        # Start with no timeout (wait indefinitely for first event)
        # unless in testing mode
        timeout_ms = testing_timeout_ms if is_testing else 0
        timeout = None
        if is_testing:
            timeout = self.testing_timeout_s
            logger.debug(f"Configuring timeout to {timeout}s")

        self.stop_flag.clear()
        polling_interval = settings.CONSUMER_POLLING
        if polling_interval == 0:  # pragma: no cover
            # Only happens if INotify failed to import
            logger.warning("Using polling of 10s, consider setting this")
            polling_interval = 10

        while not self.stop_flag.is_set():
        with ThreadPoolExecutor(max_workers=4) as pool:
            observer = PollingObserver(timeout=polling_interval)
            observer.schedule(Handler(pool), directory, recursive=recursive)
            observer.start()
            try:
                for changes in watch(
                    directory,
                    watch_filter=consumer_filter,
                    rust_timeout=timeout_ms,
                    yield_on_timeout=True,
                    force_polling=use_polling,
                    poll_delay_ms=poll_delay_ms,
                    recursive=recursive,
                    stop_event=self.stop_flag,
                ):
                    # Process each change
                    for change_type, path in changes:
                        path = Path(path).resolve()
                        if not path.is_file():
                while observer.is_alive():
                    observer.join(timeout)
                    if self.stop_flag.is_set():
                        observer.stop()
            except KeyboardInterrupt:
                observer.stop()
                observer.join()

    def handle_inotify(self, directory, recursive, *, is_testing: bool):
        logger.info(f"Using inotify to watch directory for changes: {directory}")

        timeout_ms = None
        if is_testing:
            timeout_ms = self.testing_timeout_ms
            logger.debug(f"Configuring timeout to {timeout_ms}ms")

        inotify = INotify()
        inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
        if recursive:
            inotify.add_watch_recursive(directory, inotify_flags)
        else:
            inotify.add_watch(directory, inotify_flags)

        inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
        inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000

        finished = False

        notified_files = {}

        try:
            while not finished:
                try:
                    for event in inotify.read(timeout=timeout_ms):
                        path = inotify.get_path(event.wd) if recursive else directory
                        filepath = Path(path) / event.name
                        if flags.MODIFY in flags.from_mask(event.mask):
                            notified_files.pop(filepath, None)
                        else:
                            notified_files[filepath] = monotonic()

                    # Check the files against the timeout
                    still_waiting = {}
                    # last_event_time is time of the last inotify event for this file
                    for filepath, last_event_time in notified_files.items():
                        # Current time - last time over the configured timeout
                        waited_long_enough = (
                            monotonic() - last_event_time
                        ) > inotify_debounce_secs

                        # Also make sure the file exists still, some scanners might write a
                        # temporary file first
                        try:
                            file_still_exists = filepath.exists() and filepath.is_file()
                        except (PermissionError, OSError):  # pragma: no cover
                            # If we can't check, let it fail in the _consume function
                            file_still_exists = True
                            continue
                        logger.debug(f"Event: {change_type.name} for {path}")
                        tracker.track(path, change_type)

                    # Check for stable files
                    for stable_path in tracker.get_stable_files():
                        _consume_file(
                            filepath=stable_path,
                            consumption_dir=directory,
                            subdirs_as_tags=subdirs_as_tags,
                        )
                        if waited_long_enough and file_still_exists:
                            _consume(filepath)
                        elif file_still_exists:
                            still_waiting[filepath] = last_event_time

                    # Exit watch loop to reconfigure timeout
                    break
                    # These files are still waiting to hit the timeout
                    notified_files = still_waiting

                    # Determine next timeout
                    if tracker.has_pending_files():
                        # Check pending files at stability interval
                        timeout_ms = stability_timeout_ms
                    elif is_testing:
                        # In testing, use short timeout to check stop flag
                        timeout_ms = testing_timeout_ms
                    else:  # pragma: nocover
                        # No pending files, wait indefinitely
                        timeout_ms = 0
                    # If files are waiting, need to exit read() to check them
                    # Otherwise, go back to infinite sleep time, but only if not testing
                    if len(notified_files) > 0:
                        timeout_ms = inotify_debounce_ms
                    elif is_testing:
                        timeout_ms = self.testing_timeout_ms
                    else:
                        timeout_ms = None

                except KeyboardInterrupt:  # pragma: nocover
                    logger.info("Received interrupt, stopping consumer")
                    self.stop_flag.set()
                if self.stop_flag.is_set():
                    logger.debug("Finishing because event is set")
                    finished = True

        except KeyboardInterrupt:
            logger.info("Received SIGINT, stopping inotify")
            finished = True
        finally:
            inotify.close()
(File diff suppressed because it is too large.)
@@ -1044,30 +1044,29 @@ IGNORABLE_FILES: Final[list[str]] = [
    "Thumbs.db",
]

CONSUMER_POLLING_INTERVAL = float(os.getenv("PAPERLESS_CONSUMER_POLLING_INTERVAL", 0))
CONSUMER_POLLING = int(os.getenv("PAPERLESS_CONSUMER_POLLING", 0))

CONSUMER_STABILITY_DELAY = float(os.getenv("PAPERLESS_CONSUMER_STABILITY_DELAY", 5))
CONSUMER_POLLING_DELAY = int(os.getenv("PAPERLESS_CONSUMER_POLLING_DELAY", 5))

CONSUMER_POLLING_RETRY_COUNT = int(
    os.getenv("PAPERLESS_CONSUMER_POLLING_RETRY_COUNT", 5),
)

CONSUMER_INOTIFY_DELAY: Final[float] = __get_float(
    "PAPERLESS_CONSUMER_INOTIFY_DELAY",
    0.5,
)

CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")

CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")

# Ignore regex patterns, matched against filename only
# Ignore glob patterns, relative to PAPERLESS_CONSUMPTION_DIR
CONSUMER_IGNORE_PATTERNS = list(
    json.loads(
        os.getenv(
            "PAPERLESS_CONSUMER_IGNORE_PATTERNS",
            json.dumps([]),
        ),
    ),
)

# Directories to always ignore. These are matched by directory name, not full path
CONSUMER_IGNORE_DIRS = list(
    json.loads(
        os.getenv(
            "PAPERLESS_CONSUMER_IGNORE_DIRS",
            json.dumps([]),
            json.dumps(IGNORABLE_FILES),
        ),
    ),
)
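The JSON-array parsing above follows a simple pattern; a stand-alone sketch (hypothetical shell value, not a paperless default):

```python
import json
import os

# Simulate the environment variable being set to a JSON array:
os.environ["PAPERLESS_CONSUMER_IGNORE_DIRS"] = '["temp", ".hidden"]'

ignore_dirs = list(json.loads(os.getenv("PAPERLESS_CONSUMER_IGNORE_DIRS", "[]")))
print(ignore_dirs)  # ['temp', '.hidden']
```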
@@ -11,14 +11,12 @@ from paperless_ai.chat import stream_chat_with_documents
@pytest.fixture(autouse=True)
def patch_embed_model():
    from llama_index.core import settings as llama_settings
    from llama_index.core.embeddings.utils import MockEmbedding

    mock_embed_model = MagicMock()
    mock_embed_model._get_text_embedding_batch.return_value = [
        [0.1] * 1536,
    ]  # 1 vector per input
    llama_settings.Settings._embed_model = mock_embed_model
    mock_embed_model = MockEmbedding(embed_dim=8)
    llama_settings.Settings.embed_model = mock_embed_model
    yield
    llama_settings.Settings._embed_model = None
    llama_settings.Settings.embed_model = None


@pytest.fixture(autouse=True)
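For context, `MockEmbedding` returns deterministic fixed-size vectors, which is why it can replace the hand-rolled `MagicMock` above. A tiny sketch (assuming the import path used in the diff):

```python
from llama_index.core.embeddings.utils import MockEmbedding

emb = MockEmbedding(embed_dim=8)
vec = emb.get_text_embedding("hello")  # deterministic stub vector
print(len(vec))  # 8
```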