mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-01-14 21:54:22 -06:00
Compare commits
3 Commits
feature-si
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
25169a48b6 | ||
|
|
45b7f9577c | ||
|
|
af1e7bc557 |
@@ -501,7 +501,7 @@ The `datetime` filter formats a datetime string or datetime object using Python'
|
|||||||
See the [strftime format code documentation](https://docs.python.org/3.13/library/datetime.html#strftime-and-strptime-format-codes)
|
See the [strftime format code documentation](https://docs.python.org/3.13/library/datetime.html#strftime-and-strptime-format-codes)
|
||||||
for the possible codes and their meanings.
|
for the possible codes and their meanings.
|
||||||
|
|
||||||
##### Date Localization {#date-localization}
|
##### Date Localization
|
||||||
|
|
||||||
The `localize_date` filter formats a date or datetime object into a localized string using Babel internationalization.
|
The `localize_date` filter formats a date or datetime object into a localized string using Babel internationalization.
|
||||||
This takes into account the provided locale for translation. Since this must be used on a date or datetime object,
|
This takes into account the provided locale for translation. Since this must be used on a date or datetime object,
|
||||||
@@ -851,8 +851,8 @@ followed by the even pages.
|
|||||||
|
|
||||||
It's important that the scan files get consumed in the correct order, and one at a time.
|
It's important that the scan files get consumed in the correct order, and one at a time.
|
||||||
You therefore need to make sure that Paperless is running while you upload the files into
|
You therefore need to make sure that Paperless is running while you upload the files into
|
||||||
the directory; and if you're using polling, make sure that
|
the directory; and if you're using [polling](configuration.md#polling), make sure that
|
||||||
`CONSUMER_POLLING_INTERVAL` is set to a value lower than it takes for the second scan to appear,
|
`CONSUMER_POLLING` is set to a value lower than it takes for the second scan to appear,
|
||||||
like 5-10 or even lower.
|
like 5-10 or even lower.
|
||||||
|
|
||||||
Another thing that might happen is that you start a double sided scan, but then forget
|
Another thing that might happen is that you start a double sided scan, but then forget
|
||||||
|
|||||||
@@ -1175,45 +1175,21 @@ don't exist yet.
|
|||||||
|
|
||||||
#### [`PAPERLESS_CONSUMER_IGNORE_PATTERNS=<json>`](#PAPERLESS_CONSUMER_IGNORE_PATTERNS) {#PAPERLESS_CONSUMER_IGNORE_PATTERNS}
|
#### [`PAPERLESS_CONSUMER_IGNORE_PATTERNS=<json>`](#PAPERLESS_CONSUMER_IGNORE_PATTERNS) {#PAPERLESS_CONSUMER_IGNORE_PATTERNS}
|
||||||
|
|
||||||
: Additional regex patterns for files to ignore in the consumption directory. Patterns are matched against filenames only (not full paths)
|
: By default, paperless ignores certain files and folders in the
|
||||||
using Python's `re.match()`, which anchors at the start of the filename.
|
consumption directory, such as system files created by the Mac OS
|
||||||
|
or hidden folders some tools use to store data.
|
||||||
|
|
||||||
See the [watchfiles documentation](https://watchfiles.helpmanual.io/api/filters/#watchfiles.BaseFilter.ignore_entity_patterns)
|
This can be adjusted by configuring a custom json array with
|
||||||
|
patterns to exclude.
|
||||||
|
|
||||||
This setting is for additional patterns beyond the built-in defaults. Common system files and directories are already ignored automatically.
|
For example, `.DS_STORE/*` will ignore any files found in a folder
|
||||||
The patterns will be compiled via Python's standard `re` module.
|
named `.DS_STORE`, including `.DS_STORE/bar.pdf` and `foo/.DS_STORE/bar.pdf`
|
||||||
|
|
||||||
Example custom patterns:
|
A pattern like `._*` will ignore anything starting with `._`, including:
|
||||||
|
`._foo.pdf` and `._bar/foo.pdf`
|
||||||
|
|
||||||
```json
|
Defaults to
|
||||||
["^temp_", "\\.bak$", "^~"]
|
`[".DS_Store", ".DS_STORE", "._*", ".stfolder/*", ".stversions/*", ".localized/*", "desktop.ini", "@eaDir/*", "Thumbs.db"]`.
|
||||||
```
|
|
||||||
|
|
||||||
This would ignore:
|
|
||||||
|
|
||||||
- Files starting with `temp_` (e.g., `temp_scan.pdf`)
|
|
||||||
- Files ending with `.bak` (e.g., `document.pdf.bak`)
|
|
||||||
- Files starting with `~` (e.g., `~$document.docx`)
|
|
||||||
|
|
||||||
Defaults to `[]` (empty list, uses only built-in defaults).
|
|
||||||
|
|
||||||
The default ignores are `[.DS_Store, .DS_STORE, ._*, desktop.ini, Thumbs.db]` and cannot be overridden.
|
|
||||||
|
|
||||||
#### [`PAPERLESS_CONSUMER_IGNORE_DIRS=<json>`](#PAPERLESS_CONSUMER_IGNORE_DIRS) {#PAPERLESS_CONSUMER_IGNORE_DIRS}
|
|
||||||
|
|
||||||
: Additional directory names to ignore in the consumption directory. Directories matching these names (and all their contents) will be skipped.
|
|
||||||
|
|
||||||
This setting is for additional directories beyond the built-in defaults. Matching is done by directory name only, not full path.
|
|
||||||
|
|
||||||
Example:
|
|
||||||
|
|
||||||
```json
|
|
||||||
["temp", "incoming", ".hidden"]
|
|
||||||
```
|
|
||||||
|
|
||||||
Defaults to `[]` (empty list, uses only built-in defaults).
|
|
||||||
|
|
||||||
The default ignores are `[.stfolder, .stversions, .localized, @eaDir, .Spotlight-V100, .Trashes, __MACOSX]` and cannot be overridden.
|
|
||||||
|
|
||||||
#### [`PAPERLESS_CONSUMER_BARCODE_SCANNER=<string>`](#PAPERLESS_CONSUMER_BARCODE_SCANNER) {#PAPERLESS_CONSUMER_BARCODE_SCANNER}
|
#### [`PAPERLESS_CONSUMER_BARCODE_SCANNER=<string>`](#PAPERLESS_CONSUMER_BARCODE_SCANNER) {#PAPERLESS_CONSUMER_BARCODE_SCANNER}
|
||||||
|
|
||||||
@@ -1312,24 +1288,48 @@ within your documents.
|
|||||||
|
|
||||||
Defaults to false.
|
Defaults to false.
|
||||||
|
|
||||||
#### [`PAPERLESS_CONSUMER_POLLING_INTERVAL=<num>`](#PAPERLESS_CONSUMER_POLLING_INTERVAL) {#PAPERLESS_CONSUMER_POLLING_INTERVAL}
|
### Polling {#polling}
|
||||||
|
|
||||||
: Configures how the consumer detects new files in the consumption directory.
|
#### [`PAPERLESS_CONSUMER_POLLING=<num>`](#PAPERLESS_CONSUMER_POLLING) {#PAPERLESS_CONSUMER_POLLING}
|
||||||
|
|
||||||
When set to `0` (default), paperless uses native filesystem notifications for efficient, immediate detection of new files.
|
: If paperless won't find documents added to your consume folder, it
|
||||||
|
might not be able to automatically detect filesystem changes. In
|
||||||
|
that case, specify a polling interval in seconds here, which will
|
||||||
|
then cause paperless to periodically check your consumption
|
||||||
|
directory for changes. This will also disable listening for file
|
||||||
|
system changes with `inotify`.
|
||||||
|
|
||||||
When set to a positive number, paperless polls the consumption directory at that interval in seconds. Use polling for network filesystems (NFS, SMB/CIFS) where native notifications may not work reliably.
|
Defaults to 0, which disables polling and uses filesystem
|
||||||
|
notifications.
|
||||||
|
|
||||||
Defaults to 0.
|
#### [`PAPERLESS_CONSUMER_POLLING_RETRY_COUNT=<num>`](#PAPERLESS_CONSUMER_POLLING_RETRY_COUNT) {#PAPERLESS_CONSUMER_POLLING_RETRY_COUNT}
|
||||||
|
|
||||||
#### [`PAPERLESS_CONSUMER_STABILITY_DELAY=<num>`](#PAPERLESS_CONSUMER_STABILITY_DELAY) {#PAPERLESS_CONSUMER_STABILITY_DELAY}
|
: If consumer polling is enabled, sets the maximum number of times
|
||||||
|
paperless will check for a file to remain unmodified. If a file's
|
||||||
|
modification time and size are identical for two consecutive checks, it
|
||||||
|
will be consumed.
|
||||||
|
|
||||||
: Sets the time in seconds that a file must remain unchanged (same size and modification time) before paperless will begin consuming it.
|
Defaults to 5.
|
||||||
|
|
||||||
Increase this value if you experience issues with files being consumed before they are fully written, particularly on slower network storage or
|
#### [`PAPERLESS_CONSUMER_POLLING_DELAY=<num>`](#PAPERLESS_CONSUMER_POLLING_DELAY) {#PAPERLESS_CONSUMER_POLLING_DELAY}
|
||||||
with certain scanner quirks.
|
|
||||||
|
|
||||||
Defaults to 5.0 seconds.
|
: If consumer polling is enabled, sets the delay in seconds between
|
||||||
|
each check (above) paperless will do while waiting for a file to
|
||||||
|
remain unmodified.
|
||||||
|
|
||||||
|
Defaults to 5.
|
||||||
|
|
||||||
|
### iNotify {#inotify}
|
||||||
|
|
||||||
|
#### [`PAPERLESS_CONSUMER_INOTIFY_DELAY=<num>`](#PAPERLESS_CONSUMER_INOTIFY_DELAY) {#PAPERLESS_CONSUMER_INOTIFY_DELAY}
|
||||||
|
|
||||||
|
: Sets the time in seconds the consumer will wait for additional
|
||||||
|
events from inotify before the consumer will consider a file ready
|
||||||
|
and begin consumption. Certain scanners or network setups may
|
||||||
|
generate multiple events for a single file, leading to multiple
|
||||||
|
consumers working on the same file. Configure this to prevent that.
|
||||||
|
|
||||||
|
Defaults to 0.5 seconds.
|
||||||
|
|
||||||
## Workflow webhooks
|
## Workflow webhooks
|
||||||
|
|
||||||
|
|||||||
@@ -1,19 +0,0 @@
|
|||||||
# v3 Migration Guide
|
|
||||||
|
|
||||||
## Consumer Settings Changes
|
|
||||||
|
|
||||||
The v3 consumer command uses a [different library](https://watchfiles.helpmanual.io/) to unify
|
|
||||||
the watching for new files in the consume directory. For the user, this removes several configuration options related to delays and retries
|
|
||||||
and replaces them with a single unified setting. It also adjusts how the consumer ignore filtering happens, replacing `fnmatch` with `regex` and
|
|
||||||
separating the directory ignore from the file ignore.
|
|
||||||
|
|
||||||
### Summary
|
|
||||||
|
|
||||||
| Old Setting | New Setting | Notes |
|
|
||||||
| ------------------------------ | ----------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------ |
|
|
||||||
| `CONSUMER_POLLING` | [`CONSUMER_POLLING_INTERVAL`](configuration.md#PAPERLESS_CONSUMER_POLLING_INTERVAL) | Renamed for clarity |
|
|
||||||
| `CONSUMER_INOTIFY_DELAY` | [`CONSUMER_STABILITY_DELAY`](configuration.md#PAPERLESS_CONSUMER_STABILITY_DELAY) | Unified for all modes |
|
|
||||||
| `CONSUMER_POLLING_DELAY` | _Removed_ | Use `CONSUMER_STABILITY_DELAY` |
|
|
||||||
| `CONSUMER_POLLING_RETRY_COUNT` | _Removed_ | Automatic with stability tracking |
|
|
||||||
| `CONSUMER_IGNORE_PATTERNS` | [`CONSUMER_IGNORE_PATTERNS`](configuration.md#PAPERLESS_CONSUMER_IGNORE_PATTERNS) | **Now regex, not fnmatch**; user patterns are added to (not replacing) default ones |
|
|
||||||
| _New_ | [`CONSUMER_IGNORE_DIRS`](configuration.md#PAPERLESS_CONSUMER_IGNORE_DIRS) | Additional directories to ignore; user entries are added to (not replacing) defaults |
|
|
||||||
@@ -124,7 +124,8 @@ account. The script essentially automatically performs the steps described in [D
|
|||||||
system notifications with `inotify`. When storing the consumption
|
system notifications with `inotify`. When storing the consumption
|
||||||
directory on such a file system, paperless will not pick up new
|
directory on such a file system, paperless will not pick up new
|
||||||
files with the default configuration. You will need to use
|
files with the default configuration. You will need to use
|
||||||
[`PAPERLESS_CONSUMER_POLLING_INTERVAL`](configuration.md#PAPERLESS_CONSUMER_POLLING_INTERVAL), which will disable inotify.
|
[`PAPERLESS_CONSUMER_POLLING`](configuration.md#PAPERLESS_CONSUMER_POLLING), which will disable inotify. See
|
||||||
|
[here](configuration.md#polling).
|
||||||
|
|
||||||
5. Run `docker compose pull`. This will pull the image from the GitHub container registry
|
5. Run `docker compose pull`. This will pull the image from the GitHub container registry
|
||||||
by default but you can change the image to pull from Docker Hub by changing the `image`
|
by default but you can change the image to pull from Docker Hub by changing the `image`
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ run:
|
|||||||
If you notice that the consumer will only pick up files in the
|
If you notice that the consumer will only pickup files in the
|
||||||
consumption directory at startup, but won't find any other files added
|
consumption directory at startup, but won't find any other files added
|
||||||
later, you will need to enable filesystem polling with the configuration
|
later, you will need to enable filesystem polling with the configuration
|
||||||
option [`PAPERLESS_CONSUMER_POLLING_INTERVAL`](configuration.md#PAPERLESS_CONSUMER_POLLING_INTERVAL).
|
option [`PAPERLESS_CONSUMER_POLLING`](configuration.md#PAPERLESS_CONSUMER_POLLING).
|
||||||
|
|
||||||
This will disable listening to filesystem changes with inotify and
|
This will disable listening to filesystem changes with inotify and
|
||||||
paperless will manually check the consumption directory for changes
|
paperless will manually check the consumption directory for changes
|
||||||
@@ -234,9 +234,47 @@ FileNotFoundError: [Errno 2] No such file or directory: '/tmp/ocrmypdf.io.yhk3zb
|
|||||||
|
|
||||||
This probably indicates paperless tried to consume the same file twice.
|
This probably indicates paperless tried to consume the same file twice.
|
||||||
This can happen for a number of reasons, depending on how documents are
|
This can happen for a number of reasons, depending on how documents are
|
||||||
placed into the consume folder, such as how a scanner may modify a file multiple times as it scans.
|
placed into the consume folder. If paperless is using inotify (the
|
||||||
Try adjusting the
|
default) to check for documents, try adjusting the
|
||||||
[file stability delay](configuration.md#PAPERLESS_CONSUMER_STABILITY_DELAY) to a larger value.
|
[inotify configuration](configuration.md#inotify). If polling is enabled, try adjusting the
|
||||||
|
[polling configuration](configuration.md#polling).
|
||||||
|
|
||||||
|
## Consumer fails waiting for file to remain unmodified.
|
||||||
|
|
||||||
|
You might find messages like these in your log files:
|
||||||
|
|
||||||
|
```
|
||||||
|
[ERROR] [paperless.management.consumer] Timeout while waiting on file /usr/src/paperless/src/../consume/SCN_0001.pdf to remain unmodified.
|
||||||
|
```
|
||||||
|
|
||||||
|
This indicates paperless timed out while waiting for the file to be
|
||||||
|
completely written to the consume folder. Adjusting
|
||||||
|
[polling configuration](configuration.md#polling) values should resolve the issue.
|
||||||
|
|
||||||
|
!!! note
|
||||||
|
|
||||||
|
The user will need to manually move the file out of the consume folder
|
||||||
|
and back in, for the initial failing file to be consumed.
|
||||||
|
|
||||||
|
## Consumer fails reporting "OS reports file as busy still".
|
||||||
|
|
||||||
|
You might find messages like these in your log files:
|
||||||
|
|
||||||
|
```
|
||||||
|
[WARNING] [paperless.management.consumer] Not consuming file /usr/src/paperless/src/../consume/SCN_0001.pdf: OS reports file as busy still
|
||||||
|
```
|
||||||
|
|
||||||
|
This indicates paperless was unable to open the file, as the OS reported
|
||||||
|
the file as still being in use. To prevent a crash, paperless did not
|
||||||
|
try to consume the file. If paperless is using inotify (the default) to
|
||||||
|
check for documents, try adjusting the
|
||||||
|
[inotify configuration](configuration.md#inotify). If polling is enabled, try adjusting the
|
||||||
|
[polling configuration](configuration.md#polling).
|
||||||
|
|
||||||
|
!!! note
|
||||||
|
|
||||||
|
The user will need to manually move the file out of the consume folder
|
||||||
|
and back in, for the initial failing file to be consumed.
|
||||||
|
|
||||||
## Log reports "Creating PaperlessTask failed".
|
## Log reports "Creating PaperlessTask failed".
|
||||||
|
|
||||||
|
|||||||
@@ -565,7 +565,7 @@ This allows for complex logic to be used to generate the title, including [logic
|
|||||||
and [filters](https://jinja.palletsprojects.com/en/3.1.x/templates/#id11).
|
and [filters](https://jinja.palletsprojects.com/en/3.1.x/templates/#id11).
|
||||||
The template is provided as a string.
|
The template is provided as a string.
|
||||||
|
|
||||||
Using Jinja2 Templates is also useful for [Date localization](advanced_usage.md#date-localization) in the title.
|
Using Jinja2 Templates is also useful for [Date localization](advanced_usage.md#Date-Localization) in the title.
|
||||||
|
|
||||||
The available inputs differ depending on the type of workflow trigger.
|
The available inputs differ depending on the type of workflow trigger.
|
||||||
This is because at the time of consumption (when the text is to be set), no automatic tags etc. have been
|
This is because at the time of consumption (when the text is to be set), no automatic tags etc. have been
|
||||||
|
|||||||
@@ -69,9 +69,8 @@ nav:
|
|||||||
- development.md
|
- development.md
|
||||||
- 'FAQs': faq.md
|
- 'FAQs': faq.md
|
||||||
- troubleshooting.md
|
- troubleshooting.md
|
||||||
- 'Migration to v3': migration.md
|
|
||||||
- changelog.md
|
- changelog.md
|
||||||
copyright: Copyright © 2016 - 2026 Daniel Quinn, Jonas Winkler, and the Paperless-ngx team
|
copyright: Copyright © 2016 - 2023 Daniel Quinn, Jonas Winkler, and the Paperless-ngx team
|
||||||
extra:
|
extra:
|
||||||
social:
|
social:
|
||||||
- icon: fontawesome/brands/github
|
- icon: fontawesome/brands/github
|
||||||
|
|||||||
@@ -55,10 +55,10 @@
|
|||||||
#PAPERLESS_TASK_WORKERS=1
|
#PAPERLESS_TASK_WORKERS=1
|
||||||
#PAPERLESS_THREADS_PER_WORKER=1
|
#PAPERLESS_THREADS_PER_WORKER=1
|
||||||
#PAPERLESS_TIME_ZONE=UTC
|
#PAPERLESS_TIME_ZONE=UTC
|
||||||
#PAPERLESS_CONSUMER_POLLING_INTERVAL=10
|
#PAPERLESS_CONSUMER_POLLING=10
|
||||||
#PAPERLESS_CONSUMER_DELETE_DUPLICATES=false
|
#PAPERLESS_CONSUMER_DELETE_DUPLICATES=false
|
||||||
#PAPERLESS_CONSUMER_RECURSIVE=false
|
#PAPERLESS_CONSUMER_RECURSIVE=false
|
||||||
#PAPERLESS_CONSUMER_IGNORE_PATTERNS=[] # Defaults are built in; add filename regexes, e.g. ["^\\.DS_Store$", "^desktop\\.ini$"]
|
#PAPERLESS_CONSUMER_IGNORE_PATTERNS=[".DS_STORE/*", "._*", ".stfolder/*", ".stversions/*", ".localized/*", "desktop.ini"]
|
||||||
#PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS=false
|
#PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS=false
|
||||||
#PAPERLESS_CONSUMER_ENABLE_BARCODES=false
|
#PAPERLESS_CONSUMER_ENABLE_BARCODES=false
|
||||||
#PAPERLESS_CONSUMER_BARCODE_STRING=PATCHT
|
#PAPERLESS_CONSUMER_BARCODE_STRING=PATCHT
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ dependencies = [
|
|||||||
# Only patch versions are guaranteed to not introduce breaking changes.
|
# Only patch versions are guaranteed to not introduce breaking changes.
|
||||||
"django~=5.2.5",
|
"django~=5.2.5",
|
||||||
"django-allauth[mfa,socialaccount]~=65.12.1",
|
"django-allauth[mfa,socialaccount]~=65.12.1",
|
||||||
"django-auditlog~=3.3.0",
|
"django-auditlog~=3.4.1",
|
||||||
"django-cachalot~=2.8.0",
|
"django-cachalot~=2.8.0",
|
||||||
"django-celery-results~=2.6.0",
|
"django-celery-results~=2.6.0",
|
||||||
"django-compression-middleware~=0.5.0",
|
"django-compression-middleware~=0.5.0",
|
||||||
@@ -47,9 +47,10 @@ dependencies = [
|
|||||||
"faiss-cpu>=1.10",
|
"faiss-cpu>=1.10",
|
||||||
"filelock~=3.20.0",
|
"filelock~=3.20.0",
|
||||||
"flower~=2.0.1",
|
"flower~=2.0.1",
|
||||||
"gotenberg-client~=0.12.0",
|
"gotenberg-client~=0.13.1",
|
||||||
"httpx-oauth~=0.16",
|
"httpx-oauth~=0.16",
|
||||||
"imap-tools~=1.11.0",
|
"imap-tools~=1.11.0",
|
||||||
|
"inotifyrecursive~=0.3",
|
||||||
"jinja2~=3.1.5",
|
"jinja2~=3.1.5",
|
||||||
"langdetect~=1.0.9",
|
"langdetect~=1.0.9",
|
||||||
"llama-index-core>=0.12.33.post1",
|
"llama-index-core>=0.12.33.post1",
|
||||||
@@ -59,7 +60,7 @@ dependencies = [
|
|||||||
"llama-index-llms-openai>=0.3.38",
|
"llama-index-llms-openai>=0.3.38",
|
||||||
"llama-index-vector-stores-faiss>=0.3",
|
"llama-index-vector-stores-faiss>=0.3",
|
||||||
"nltk~=3.9.1",
|
"nltk~=3.9.1",
|
||||||
"ocrmypdf~=16.12.0",
|
"ocrmypdf~=16.13.0",
|
||||||
"openai>=1.76",
|
"openai>=1.76",
|
||||||
"pathvalidate~=3.3.1",
|
"pathvalidate~=3.3.1",
|
||||||
"pdf2image~=1.17.0",
|
"pdf2image~=1.17.0",
|
||||||
@@ -77,7 +78,7 @@ dependencies = [
|
|||||||
"setproctitle~=1.3.4",
|
"setproctitle~=1.3.4",
|
||||||
"tika-client~=0.10.0",
|
"tika-client~=0.10.0",
|
||||||
"tqdm~=4.67.1",
|
"tqdm~=4.67.1",
|
||||||
"watchfiles>=1.1.1",
|
"watchdog~=6.0",
|
||||||
"whitenoise~=6.9",
|
"whitenoise~=6.9",
|
||||||
"whoosh-reloaded>=2.7.5",
|
"whoosh-reloaded>=2.7.5",
|
||||||
"zxing-cpp~=2.3.0",
|
"zxing-cpp~=2.3.0",
|
||||||
@@ -90,7 +91,7 @@ optional-dependencies.postgres = [
|
|||||||
"psycopg[c,pool]==3.2.12",
|
"psycopg[c,pool]==3.2.12",
|
||||||
# Direct dependency for proper resolution of the pre-built wheels
|
# Direct dependency for proper resolution of the pre-built wheels
|
||||||
"psycopg-c==3.2.12",
|
"psycopg-c==3.2.12",
|
||||||
"psycopg-pool==3.2.7",
|
"psycopg-pool==3.3",
|
||||||
]
|
]
|
||||||
optional-dependencies.webserver = [
|
optional-dependencies.webserver = [
|
||||||
"granian[uvloop]~=2.5.1",
|
"granian[uvloop]~=2.5.1",
|
||||||
@@ -125,7 +126,7 @@ testing = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
lint = [
|
lint = [
|
||||||
"pre-commit~=4.4.0",
|
"pre-commit~=4.5.1",
|
||||||
"pre-commit-uv~=4.2.0",
|
"pre-commit-uv~=4.2.0",
|
||||||
"ruff~=0.14.0",
|
"ruff~=0.14.0",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -1,343 +1,135 @@
|
|||||||
"""
|
|
||||||
Document consumer management command.
|
|
||||||
|
|
||||||
Watches a consumption directory for new documents and queues them for processing.
|
|
||||||
Uses watchfiles for efficient file system monitoring with support for both
|
|
||||||
native OS notifications and polling fallback.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from dataclasses import dataclass
|
import os
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from fnmatch import filter
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from pathlib import PurePath
|
||||||
from threading import Event
|
from threading import Event
|
||||||
from time import monotonic
|
from time import monotonic
|
||||||
from typing import TYPE_CHECKING
|
from time import sleep
|
||||||
from typing import Final
|
from typing import Final
|
||||||
|
|
||||||
from django import db
|
from django import db
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
from django.core.management.base import CommandError
|
from django.core.management.base import CommandError
|
||||||
from watchfiles import Change
|
from watchdog.events import FileSystemEventHandler
|
||||||
from watchfiles import DefaultFilter
|
from watchdog.observers.polling import PollingObserver
|
||||||
from watchfiles import watch
|
|
||||||
|
|
||||||
from documents.data_models import ConsumableDocument
|
from documents.data_models import ConsumableDocument
|
||||||
from documents.data_models import DocumentMetadataOverrides
|
from documents.data_models import DocumentMetadataOverrides
|
||||||
from documents.data_models import DocumentSource
|
from documents.data_models import DocumentSource
|
||||||
from documents.models import Tag
|
from documents.models import Tag
|
||||||
from documents.parsers import get_supported_file_extensions
|
from documents.parsers import is_file_ext_supported
|
||||||
from documents.tasks import consume_file
|
from documents.tasks import consume_file
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
try:
|
||||||
from collections.abc import Iterator
|
from inotifyrecursive import INotify
|
||||||
|
from inotifyrecursive import flags
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
INotify = flags = None
|
||||||
|
|
||||||
logger = logging.getLogger("paperless.management.consumer")
|
logger = logging.getLogger("paperless.management.consumer")
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
def _tags_from_path(filepath: Path) -> list[int]:
|
||||||
class TrackedFile:
|
|
||||||
"""Represents a file being tracked for stability."""
|
|
||||||
|
|
||||||
path: Path
|
|
||||||
last_event_time: float
|
|
||||||
last_mtime: float | None = None
|
|
||||||
last_size: int | None = None
|
|
||||||
|
|
||||||
def update_stats(self) -> bool:
|
|
||||||
"""
|
"""
|
||||||
Update file stats. Returns True if file exists and stats were updated.
|
Walk up the directory tree from filepath to CONSUMPTION_DIR
|
||||||
"""
|
|
||||||
try:
|
|
||||||
stat = self.path.stat()
|
|
||||||
self.last_mtime = stat.st_mtime
|
|
||||||
self.last_size = stat.st_size
|
|
||||||
return True
|
|
||||||
except OSError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def is_unchanged(self) -> bool:
|
|
||||||
"""
|
|
||||||
Check if file stats match the previously recorded values.
|
|
||||||
Returns False if file doesn't exist or stats changed.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
stat = self.path.stat()
|
|
||||||
return stat.st_mtime == self.last_mtime and stat.st_size == self.last_size
|
|
||||||
except OSError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
class FileStabilityTracker:
|
|
||||||
"""
|
|
||||||
Tracks file events and determines when files are stable for consumption.
|
|
||||||
|
|
||||||
A file is considered stable when:
|
|
||||||
1. No new events have been received for it within the stability delay
|
|
||||||
2. Its size and modification time haven't changed
|
|
||||||
3. It still exists as a regular file
|
|
||||||
|
|
||||||
This handles various edge cases:
|
|
||||||
- Network copies that write in chunks
|
|
||||||
- Scanners that open/close files multiple times
|
|
||||||
- Temporary files that get renamed
|
|
||||||
- Files that are deleted before becoming stable
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, stability_delay: float = 1.0) -> None:
|
|
||||||
"""
|
|
||||||
Initialize the tracker.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
stability_delay: Time in seconds a file must remain unchanged
|
|
||||||
before being considered stable.
|
|
||||||
"""
|
|
||||||
self.stability_delay = stability_delay
|
|
||||||
self._tracked: dict[Path, TrackedFile] = {}
|
|
||||||
|
|
||||||
def track(self, path: Path, change: Change) -> None:
|
|
||||||
"""
|
|
||||||
Register a file event.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
path: The file path that changed.
|
|
||||||
change: The type of change (added, modified, deleted).
|
|
||||||
"""
|
|
||||||
path = path.resolve()
|
|
||||||
|
|
||||||
match change:
|
|
||||||
case Change.deleted:
|
|
||||||
self._tracked.pop(path, None)
|
|
||||||
logger.debug(f"Stopped tracking deleted file: {path}")
|
|
||||||
case Change.added | Change.modified:
|
|
||||||
current_time = monotonic()
|
|
||||||
if path in self._tracked:
|
|
||||||
tracked = self._tracked[path]
|
|
||||||
tracked.last_event_time = current_time
|
|
||||||
tracked.update_stats()
|
|
||||||
logger.debug(f"Updated tracking for: {path}")
|
|
||||||
else:
|
|
||||||
tracked = TrackedFile(path=path, last_event_time=current_time)
|
|
||||||
if tracked.update_stats():
|
|
||||||
self._tracked[path] = tracked
|
|
||||||
logger.debug(f"Started tracking: {path}")
|
|
||||||
else:
|
|
||||||
logger.debug(f"Could not stat file, not tracking: {path}")
|
|
||||||
|
|
||||||
def get_stable_files(self) -> Iterator[Path]:
|
|
||||||
"""
|
|
||||||
Yield files that have been stable for the configured delay.
|
|
||||||
|
|
||||||
Files are removed from tracking once yielded or determined to be invalid.
|
|
||||||
"""
|
|
||||||
current_time = monotonic()
|
|
||||||
to_remove: list[Path] = []
|
|
||||||
to_yield: list[Path] = []
|
|
||||||
|
|
||||||
for path, tracked in self._tracked.items():
|
|
||||||
time_since_event = current_time - tracked.last_event_time
|
|
||||||
|
|
||||||
if time_since_event < self.stability_delay:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# File has waited long enough, verify it's unchanged
|
|
||||||
if not tracked.is_unchanged():
|
|
||||||
# Stats changed or file gone - update and wait again
|
|
||||||
if tracked.update_stats():
|
|
||||||
tracked.last_event_time = current_time
|
|
||||||
logger.debug(f"File changed during stability check: {path}")
|
|
||||||
else:
|
|
||||||
# File no longer exists, remove from tracking
|
|
||||||
to_remove.append(path)
|
|
||||||
logger.debug(f"File disappeared during stability check: {path}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
# File is stable, we can return it
|
|
||||||
to_yield.append(path)
|
|
||||||
logger.info(f"File is stable: {path}")
|
|
||||||
|
|
||||||
# Remove files that are no longer valid
|
|
||||||
for path in to_remove:
|
|
||||||
self._tracked.pop(path, None)
|
|
||||||
|
|
||||||
# Remove and yield stable files
|
|
||||||
for path in to_yield:
|
|
||||||
self._tracked.pop(path, None)
|
|
||||||
yield path
|
|
||||||
|
|
||||||
def has_pending_files(self) -> bool:
|
|
||||||
"""Check if there are files waiting for stability check."""
|
|
||||||
return len(self._tracked) > 0
|
|
||||||
|
|
||||||
@property
|
|
||||||
def pending_count(self) -> int:
|
|
||||||
"""Number of files being tracked."""
|
|
||||||
return len(self._tracked)
|
|
||||||
|
|
||||||
|
|
||||||
class ConsumerFilter(DefaultFilter):
|
|
||||||
"""
|
|
||||||
Filter for watchfiles that accepts only supported document types
|
|
||||||
and ignores system files/directories.
|
|
||||||
|
|
||||||
Extends DefaultFilter leveraging its built-in filtering:
|
|
||||||
- `ignore_dirs`: Directory names to ignore (and all their contents)
|
|
||||||
- `ignore_entity_patterns`: Regex patterns matched against filename/dirname only
|
|
||||||
|
|
||||||
We add custom logic for file extension filtering (only accept supported
|
|
||||||
document types), which the library doesn't provide.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Regex patterns for files to always ignore (matched against filename only)
|
|
||||||
# These are passed to DefaultFilter.ignore_entity_patterns
|
|
||||||
DEFAULT_IGNORE_PATTERNS: Final[tuple[str, ...]] = (
|
|
||||||
r"^\.DS_Store$",
|
|
||||||
r"^\.DS_STORE$",
|
|
||||||
r"^\._.*",
|
|
||||||
r"^desktop\.ini$",
|
|
||||||
r"^Thumbs\.db$",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Directories to always ignore (passed to DefaultFilter.ignore_dirs)
|
|
||||||
# These are matched by directory name, not full path
|
|
||||||
DEFAULT_IGNORE_DIRS: Final[tuple[str, ...]] = (
|
|
||||||
".stfolder", # Syncthing
|
|
||||||
".stversions", # Syncthing
|
|
||||||
".localized", # macOS
|
|
||||||
"@eaDir", # Synology NAS
|
|
||||||
".Spotlight-V100", # macOS
|
|
||||||
".Trashes", # macOS
|
|
||||||
"__MACOSX", # macOS archive artifacts
|
|
||||||
)
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
supported_extensions: frozenset[str] | None = None,
|
|
||||||
ignore_patterns: list[str] | None = None,
|
|
||||||
ignore_dirs: list[str] | None = None,
|
|
||||||
) -> None:
|
|
||||||
"""
|
|
||||||
Initialize the consumer filter.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
supported_extensions: Set of file extensions to accept (e.g., {".pdf", ".png"}).
|
|
||||||
If None, uses get_supported_file_extensions().
|
|
||||||
ignore_patterns: Additional regex patterns to ignore (matched against filename).
|
|
||||||
ignore_dirs: Additional directory names to ignore (merged with defaults).
|
|
||||||
"""
|
|
||||||
# Get supported extensions
|
|
||||||
if supported_extensions is None:
|
|
||||||
supported_extensions = frozenset(get_supported_file_extensions())
|
|
||||||
self._supported_extensions = supported_extensions
|
|
||||||
|
|
||||||
# Combine default and user patterns
|
|
||||||
all_patterns: list[str] = list(self.DEFAULT_IGNORE_PATTERNS)
|
|
||||||
if ignore_patterns:
|
|
||||||
all_patterns.extend(ignore_patterns)
|
|
||||||
|
|
||||||
# Combine default and user ignore_dirs
|
|
||||||
all_ignore_dirs: list[str] = list(self.DEFAULT_IGNORE_DIRS)
|
|
||||||
if ignore_dirs:
|
|
||||||
all_ignore_dirs.extend(ignore_dirs)
|
|
||||||
|
|
||||||
# Let DefaultFilter handle all the pattern and directory filtering
|
|
||||||
super().__init__(
|
|
||||||
ignore_dirs=tuple(all_ignore_dirs),
|
|
||||||
ignore_entity_patterns=tuple(all_patterns),
|
|
||||||
ignore_paths=(),
|
|
||||||
)
|
|
||||||
|
|
||||||
def __call__(self, change: Change, path: str) -> bool:
|
|
||||||
"""
|
|
||||||
Filter function for watchfiles.
|
|
||||||
|
|
||||||
Returns True if the path should be watched, False to ignore.
|
|
||||||
|
|
||||||
The parent DefaultFilter handles:
|
|
||||||
- Hidden files/directories (starting with .)
|
|
||||||
- Directories in ignore_dirs
|
|
||||||
- Files/directories matching ignore_entity_patterns
|
|
||||||
|
|
||||||
We additionally filter files by extension.
|
|
||||||
"""
|
|
||||||
# Let parent filter handle directory ignoring and pattern matching
|
|
||||||
if not super().__call__(change, path):
|
|
||||||
return False
|
|
||||||
|
|
||||||
path_obj = Path(path)
|
|
||||||
|
|
||||||
# For directories, parent filter already handled everything
|
|
||||||
if path_obj.is_dir():
|
|
||||||
return True
|
|
||||||
|
|
||||||
# For files, check extension
|
|
||||||
return self._has_supported_extension(path_obj)
|
|
||||||
|
|
||||||
def _has_supported_extension(self, path: Path) -> bool:
|
|
||||||
"""Check if the file has a supported extension."""
|
|
||||||
suffix = path.suffix.lower()
|
|
||||||
return suffix in self._supported_extensions
|
|
||||||
|
|
||||||
|
|
||||||
def _tags_from_path(filepath: Path, consumption_dir: Path) -> list[int]:
|
|
||||||
"""
|
|
||||||
Walk up the directory tree from filepath to consumption_dir
|
|
||||||
and get or create Tag IDs for every directory.
|
and get or create Tag IDs for every directory.
|
||||||
|
|
||||||
Returns list of Tag primary keys.
|
Returns set of Tag models
|
||||||
"""
|
"""
|
||||||
db.close_old_connections()
|
db.close_old_connections()
|
||||||
tag_ids: set[int] = set()
|
tag_ids = set()
|
||||||
path_parts = filepath.relative_to(consumption_dir).parent.parts
|
path_parts = filepath.relative_to(settings.CONSUMPTION_DIR).parent.parts
|
||||||
|
|
||||||
for part in path_parts:
|
for part in path_parts:
|
||||||
tag, _ = Tag.objects.get_or_create(
|
tag_ids.add(
|
||||||
name__iexact=part,
|
Tag.objects.get_or_create(name__iexact=part, defaults={"name": part})[0].pk,
|
||||||
defaults={"name": part},
|
|
||||||
)
|
)
|
||||||
tag_ids.add(tag.pk)
|
|
||||||
|
|
||||||
return list(tag_ids)
|
return list(tag_ids)
|
||||||
|
|
||||||
|
|
||||||
def _consume_file(
|
def _is_ignored(filepath: Path) -> bool:
|
||||||
filepath: Path,
|
|
||||||
consumption_dir: Path,
|
|
||||||
*,
|
|
||||||
subdirs_as_tags: bool,
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Queue a file for consumption.
|
Checks if the given file should be ignored, based on configured
|
||||||
|
patterns.
|
||||||
|
|
||||||
Args:
|
Returns True if the file is ignored, False otherwise
|
||||||
filepath: Path to the file to consume.
|
|
||||||
consumption_dir: Base consumption directory.
|
|
||||||
subdirs_as_tags: Whether to create tags from subdirectory names.
|
|
||||||
"""
|
"""
|
||||||
# Verify file still exists and is accessible
|
# Trim out the consume directory, leaving only filename and it's
|
||||||
|
# path relative to the consume directory
|
||||||
|
filepath_relative = PurePath(filepath).relative_to(settings.CONSUMPTION_DIR)
|
||||||
|
|
||||||
|
# March through the components of the path, including directories and the filename
|
||||||
|
# looking for anything matching
|
||||||
|
# foo/bar/baz/file.pdf -> (foo, bar, baz, file.pdf)
|
||||||
|
parts = []
|
||||||
|
for part in filepath_relative.parts:
|
||||||
|
# If the part is not the name (ie, it's a dir)
|
||||||
|
# Need to append the trailing slash or fnmatch doesn't match
|
||||||
|
# fnmatch("dir", "dir/*") == False
|
||||||
|
# fnmatch("dir/", "dir/*") == True
|
||||||
|
if part != filepath_relative.name:
|
||||||
|
part = part + "/"
|
||||||
|
parts.append(part)
|
||||||
|
|
||||||
|
for pattern in settings.CONSUMER_IGNORE_PATTERNS:
|
||||||
|
if len(filter(parts, pattern)):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _consume(filepath: Path) -> None:
|
||||||
|
# Check permissions early
|
||||||
try:
|
try:
|
||||||
|
filepath.stat()
|
||||||
|
except (PermissionError, OSError):
|
||||||
|
logger.warning(f"Not consuming file {filepath}: Permission denied.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if filepath.is_dir() or _is_ignored(filepath):
|
||||||
|
return
|
||||||
|
|
||||||
if not filepath.is_file():
|
if not filepath.is_file():
|
||||||
logger.debug(f"Not consuming {filepath}: not a file or doesn't exist")
|
logger.debug(f"Not consuming file {filepath}: File has moved.")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if not is_file_ext_supported(filepath.suffix):
|
||||||
|
logger.warning(f"Not consuming file {filepath}: Unknown file extension.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Total wait time: up to 500ms
|
||||||
|
os_error_retry_count: Final[int] = 50
|
||||||
|
os_error_retry_wait: Final[float] = 0.01
|
||||||
|
|
||||||
|
read_try_count = 0
|
||||||
|
file_open_ok = False
|
||||||
|
os_error_str = None
|
||||||
|
|
||||||
|
while (read_try_count < os_error_retry_count) and not file_open_ok:
|
||||||
|
try:
|
||||||
|
with filepath.open("rb"):
|
||||||
|
file_open_ok = True
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
logger.warning(f"Not consuming {filepath}: {e}")
|
read_try_count += 1
|
||||||
|
os_error_str = str(e)
|
||||||
|
sleep(os_error_retry_wait)
|
||||||
|
|
||||||
|
if read_try_count >= os_error_retry_count:
|
||||||
|
logger.warning(f"Not consuming file {filepath}: OS reports {os_error_str}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Get tags from path if configured
|
tag_ids = None
|
||||||
tag_ids: list[int] | None = None
|
|
||||||
if subdirs_as_tags:
|
|
||||||
try:
|
try:
|
||||||
tag_ids = _tags_from_path(filepath, consumption_dir)
|
if settings.CONSUMER_SUBDIRS_AS_TAGS:
|
||||||
|
tag_ids = _tags_from_path(filepath)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(f"Error creating tags from path for {filepath}")
|
logger.exception("Error creating tags from path")
|
||||||
|
|
||||||
# Queue for consumption
|
|
||||||
try:
|
try:
|
||||||
logger.info(f"Adding {filepath} to the task queue")
|
logger.info(f"Adding {filepath} to the task queue.")
|
||||||
consume_file.delay(
|
consume_file.delay(
|
||||||
ConsumableDocument(
|
ConsumableDocument(
|
||||||
source=DocumentSource.ConsumeFolder,
|
source=DocumentSource.ConsumeFolder,
|
||||||
@@ -346,209 +138,228 @@ def _consume_file(
|
|||||||
DocumentMetadataOverrides(tag_ids=tag_ids),
|
DocumentMetadataOverrides(tag_ids=tag_ids),
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(f"Error while queuing document {filepath}")
|
# Catch all so that the consumer won't crash.
|
||||||
|
# This is also what the test case is listening for to check for
|
||||||
|
# errors.
|
||||||
|
logger.exception("Error while consuming document")
|
||||||
|
|
||||||
|
|
||||||
|
def _consume_wait_unmodified(file: Path) -> None:
|
||||||
|
"""
|
||||||
|
Waits for the given file to appear unmodified based on file size
|
||||||
|
and modification time. Will wait a configured number of seconds
|
||||||
|
and retry a configured number of times before either consuming or
|
||||||
|
giving up
|
||||||
|
"""
|
||||||
|
if _is_ignored(file):
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug(f"Waiting for file {file} to remain unmodified")
|
||||||
|
mtime = -1
|
||||||
|
size = -1
|
||||||
|
current_try = 0
|
||||||
|
while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
|
||||||
|
try:
|
||||||
|
stat_data = file.stat()
|
||||||
|
new_mtime = stat_data.st_mtime
|
||||||
|
new_size = stat_data.st_size
|
||||||
|
except FileNotFoundError:
|
||||||
|
logger.debug(
|
||||||
|
f"File {file} moved while waiting for it to remain unmodified.",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if new_mtime == mtime and new_size == size:
|
||||||
|
_consume(file)
|
||||||
|
return
|
||||||
|
mtime = new_mtime
|
||||||
|
size = new_size
|
||||||
|
sleep(settings.CONSUMER_POLLING_DELAY)
|
||||||
|
current_try += 1
|
||||||
|
|
||||||
|
logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
|
||||||
|
|
||||||
|
|
||||||
|
class Handler(FileSystemEventHandler):
|
||||||
|
def __init__(self, pool: ThreadPoolExecutor) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self._pool = pool
|
||||||
|
|
||||||
|
def on_created(self, event):
|
||||||
|
self._pool.submit(_consume_wait_unmodified, Path(event.src_path))
|
||||||
|
|
||||||
|
def on_moved(self, event):
|
||||||
|
self._pool.submit(_consume_wait_unmodified, Path(event.dest_path))
|
||||||
|
|
||||||
|
|
||||||
class Command(BaseCommand):
|
class Command(BaseCommand):
|
||||||
"""
|
"""
|
||||||
Watch a consumption directory and queue new documents for processing.
|
On every iteration of an infinite loop, consume what we can from the
|
||||||
|
consumption directory.
|
||||||
Uses watchfiles for efficient file system monitoring. Supports both
|
|
||||||
native OS notifications (inotify on Linux, FSEvents on macOS) and
|
|
||||||
polling for network filesystems.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
help = "Watch the consumption directory for new documents"
|
# This is here primarily for the tests and is irrelevant in production.
|
||||||
|
stop_flag = Event()
|
||||||
# For testing - allows tests to stop the consumer
|
# Also only for testing, configures in one place the timeout used before checking
|
||||||
stop_flag: Event = Event()
|
# the stop flag
|
||||||
|
|
||||||
# Testing timeout in seconds
|
|
||||||
testing_timeout_s: Final[float] = 0.5
|
testing_timeout_s: Final[float] = 0.5
|
||||||
|
testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
|
||||||
|
|
||||||
def add_arguments(self, parser) -> None:
|
def add_arguments(self, parser):
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"directory",
|
"directory",
|
||||||
default=None,
|
default=settings.CONSUMPTION_DIR,
|
||||||
nargs="?",
|
nargs="?",
|
||||||
help="The consumption directory (defaults to CONSUMPTION_DIR setting)",
|
help="The consumption directory.",
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--oneshot",
|
|
||||||
action="store_true",
|
|
||||||
help="Process existing files and exit without watching",
|
|
||||||
)
|
)
|
||||||
|
parser.add_argument("--oneshot", action="store_true", help="Run only once.")
|
||||||
|
|
||||||
|
# Only use during unit testing, will configure a timeout
|
||||||
|
# Leaving it unset or false and the consumer will exit when it
|
||||||
|
# receives SIGINT
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--testing",
|
"--testing",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help="Enable testing mode with shorter timeouts",
|
help="Flag used only for unit testing",
|
||||||
default=False,
|
default=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
def handle(self, *args, **options) -> None:
|
def handle(self, *args, **options):
|
||||||
# Resolve consumption directory
|
directory = options["directory"]
|
||||||
directory = options.get("directory")
|
recursive = settings.CONSUMER_RECURSIVE
|
||||||
|
|
||||||
if not directory:
|
if not directory:
|
||||||
directory = getattr(settings, "CONSUMPTION_DIR", None)
|
raise CommandError("CONSUMPTION_DIR does not appear to be set.")
|
||||||
if not directory:
|
|
||||||
raise CommandError("CONSUMPTION_DIR is not configured")
|
|
||||||
|
|
||||||
directory = Path(directory).resolve()
|
directory = Path(directory).resolve()
|
||||||
|
|
||||||
if not directory.exists():
|
|
||||||
raise CommandError(f"Consumption directory does not exist: {directory}")
|
|
||||||
|
|
||||||
if not directory.is_dir():
|
if not directory.is_dir():
|
||||||
raise CommandError(f"Consumption path is not a directory: {directory}")
|
raise CommandError(f"Consumption directory {directory} does not exist")
|
||||||
|
|
||||||
# Ensure scratch directory exists
|
# Consumer will need this
|
||||||
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
|
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
# Get settings
|
if recursive:
|
||||||
recursive: bool = settings.CONSUMER_RECURSIVE
|
for dirpath, _, filenames in os.walk(directory):
|
||||||
subdirs_as_tags: bool = settings.CONSUMER_SUBDIRS_AS_TAGS
|
for filename in filenames:
|
||||||
polling_interval: float = settings.CONSUMER_POLLING_INTERVAL
|
filepath = Path(dirpath) / filename
|
||||||
stability_delay: float = settings.CONSUMER_STABILITY_DELAY
|
_consume(filepath)
|
||||||
ignore_patterns: list[str] = settings.CONSUMER_IGNORE_PATTERNS
|
else:
|
||||||
ignore_dirs: list[str] = settings.CONSUMER_IGNORE_DIRS
|
for filepath in directory.iterdir():
|
||||||
is_testing: bool = options.get("testing", False)
|
_consume(filepath)
|
||||||
is_oneshot: bool = options.get("oneshot", False)
|
|
||||||
|
|
||||||
# Create filter
|
if options["oneshot"]:
|
||||||
consumer_filter = ConsumerFilter(
|
|
||||||
ignore_patterns=ignore_patterns,
|
|
||||||
ignore_dirs=ignore_dirs,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Process existing files
|
|
||||||
self._process_existing_files(
|
|
||||||
directory=directory,
|
|
||||||
recursive=recursive,
|
|
||||||
subdirs_as_tags=subdirs_as_tags,
|
|
||||||
consumer_filter=consumer_filter,
|
|
||||||
)
|
|
||||||
|
|
||||||
if is_oneshot:
|
|
||||||
logger.info("Oneshot mode: processed existing files, exiting")
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# Start watching
|
if settings.CONSUMER_POLLING == 0 and INotify:
|
||||||
self._watch_directory(
|
self.handle_inotify(directory, recursive, is_testing=options["testing"])
|
||||||
directory=directory,
|
|
||||||
recursive=recursive,
|
|
||||||
subdirs_as_tags=subdirs_as_tags,
|
|
||||||
consumer_filter=consumer_filter,
|
|
||||||
polling_interval=polling_interval,
|
|
||||||
stability_delay=stability_delay,
|
|
||||||
is_testing=is_testing,
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.debug("Consumer exiting")
|
|
||||||
|
|
||||||
def _process_existing_files(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
directory: Path,
|
|
||||||
recursive: bool,
|
|
||||||
subdirs_as_tags: bool,
|
|
||||||
consumer_filter: ConsumerFilter,
|
|
||||||
) -> None:
|
|
||||||
"""Process any existing files in the consumption directory."""
|
|
||||||
logger.info(f"Processing existing files in {directory}")
|
|
||||||
|
|
||||||
glob_pattern = "**/*" if recursive else "*"
|
|
||||||
|
|
||||||
for filepath in directory.glob(glob_pattern):
|
|
||||||
# Use filter to check if file should be processed
|
|
||||||
if not filepath.is_file():
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not consumer_filter(Change.added, str(filepath)):
|
|
||||||
continue
|
|
||||||
|
|
||||||
_consume_file(
|
|
||||||
filepath=filepath,
|
|
||||||
consumption_dir=directory,
|
|
||||||
subdirs_as_tags=subdirs_as_tags,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _watch_directory(
|
|
||||||
self,
|
|
||||||
*,
|
|
||||||
directory: Path,
|
|
||||||
recursive: bool,
|
|
||||||
subdirs_as_tags: bool,
|
|
||||||
consumer_filter: ConsumerFilter,
|
|
||||||
polling_interval: float,
|
|
||||||
stability_delay: float,
|
|
||||||
is_testing: bool,
|
|
||||||
) -> None:
|
|
||||||
"""Watch directory for changes and process stable files."""
|
|
||||||
use_polling = polling_interval > 0
|
|
||||||
poll_delay_ms = int(polling_interval * 1000) if use_polling else 0
|
|
||||||
|
|
||||||
if use_polling:
|
|
||||||
logger.info(
|
|
||||||
f"Watching {directory} using polling (interval: {polling_interval}s)",
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
logger.info(f"Watching {directory} using native file system events")
|
if INotify is None and settings.CONSUMER_POLLING == 0: # pragma: no cover
|
||||||
|
logger.warning("Using polling as INotify import failed")
|
||||||
|
self.handle_polling(directory, recursive, is_testing=options["testing"])
|
||||||
|
|
||||||
# Create stability tracker
|
logger.debug("Consumer exiting.")
|
||||||
tracker = FileStabilityTracker(stability_delay=stability_delay)
|
|
||||||
|
|
||||||
# Calculate timeouts
|
def handle_polling(self, directory, recursive, *, is_testing: bool):
|
||||||
stability_timeout_ms = int(stability_delay * 1000)
|
logger.info(f"Polling directory for changes: {directory}")
|
||||||
testing_timeout_ms = int(self.testing_timeout_s * 1000)
|
|
||||||
|
|
||||||
# Start with no timeout (wait indefinitely for first event)
|
timeout = None
|
||||||
# unless in testing mode
|
if is_testing:
|
||||||
timeout_ms = testing_timeout_ms if is_testing else 0
|
timeout = self.testing_timeout_s
|
||||||
|
logger.debug(f"Configuring timeout to {timeout}s")
|
||||||
|
|
||||||
self.stop_flag.clear()
|
polling_interval = settings.CONSUMER_POLLING
|
||||||
|
if polling_interval == 0: # pragma: no cover
|
||||||
|
# Only happens if INotify failed to import
|
||||||
|
logger.warning("Using polling of 10s, consider setting this")
|
||||||
|
polling_interval = 10
|
||||||
|
|
||||||
while not self.stop_flag.is_set():
|
with ThreadPoolExecutor(max_workers=4) as pool:
|
||||||
|
observer = PollingObserver(timeout=polling_interval)
|
||||||
|
observer.schedule(Handler(pool), directory, recursive=recursive)
|
||||||
|
observer.start()
|
||||||
try:
|
try:
|
||||||
for changes in watch(
|
while observer.is_alive():
|
||||||
directory,
|
observer.join(timeout)
|
||||||
watch_filter=consumer_filter,
|
if self.stop_flag.is_set():
|
||||||
rust_timeout=timeout_ms,
|
observer.stop()
|
||||||
yield_on_timeout=True,
|
except KeyboardInterrupt:
|
||||||
force_polling=use_polling,
|
observer.stop()
|
||||||
poll_delay_ms=poll_delay_ms,
|
observer.join()
|
||||||
recursive=recursive,
|
|
||||||
stop_event=self.stop_flag,
|
def handle_inotify(self, directory, recursive, *, is_testing: bool):
|
||||||
):
|
logger.info(f"Using inotify to watch directory for changes: {directory}")
|
||||||
# Process each change
|
|
||||||
for change_type, path in changes:
|
timeout_ms = None
|
||||||
path = Path(path).resolve()
|
if is_testing:
|
||||||
if not path.is_file():
|
timeout_ms = self.testing_timeout_ms
|
||||||
|
logger.debug(f"Configuring timeout to {timeout_ms}ms")
|
||||||
|
|
||||||
|
inotify = INotify()
|
||||||
|
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
|
||||||
|
if recursive:
|
||||||
|
inotify.add_watch_recursive(directory, inotify_flags)
|
||||||
|
else:
|
||||||
|
inotify.add_watch(directory, inotify_flags)
|
||||||
|
|
||||||
|
inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
|
||||||
|
inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
|
||||||
|
|
||||||
|
finished = False
|
||||||
|
|
||||||
|
notified_files = {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
while not finished:
|
||||||
|
try:
|
||||||
|
for event in inotify.read(timeout=timeout_ms):
|
||||||
|
path = inotify.get_path(event.wd) if recursive else directory
|
||||||
|
filepath = Path(path) / event.name
|
||||||
|
if flags.MODIFY in flags.from_mask(event.mask):
|
||||||
|
notified_files.pop(filepath, None)
|
||||||
|
else:
|
||||||
|
notified_files[filepath] = monotonic()
|
||||||
|
|
||||||
|
# Check the files against the timeout
|
||||||
|
still_waiting = {}
|
||||||
|
# last_event_time is time of the last inotify event for this file
|
||||||
|
for filepath, last_event_time in notified_files.items():
|
||||||
|
# Current time - last time over the configured timeout
|
||||||
|
waited_long_enough = (
|
||||||
|
monotonic() - last_event_time
|
||||||
|
) > inotify_debounce_secs
|
||||||
|
|
||||||
|
# Also make sure the file exists still, some scanners might write a
|
||||||
|
# temporary file first
|
||||||
|
try:
|
||||||
|
file_still_exists = filepath.exists() and filepath.is_file()
|
||||||
|
except (PermissionError, OSError): # pragma: no cover
|
||||||
|
# If we can't check, let it fail in the _consume function
|
||||||
|
file_still_exists = True
|
||||||
continue
|
continue
|
||||||
logger.debug(f"Event: {change_type.name} for {path}")
|
|
||||||
tracker.track(path, change_type)
|
|
||||||
|
|
||||||
# Check for stable files
|
if waited_long_enough and file_still_exists:
|
||||||
for stable_path in tracker.get_stable_files():
|
_consume(filepath)
|
||||||
_consume_file(
|
elif file_still_exists:
|
||||||
filepath=stable_path,
|
still_waiting[filepath] = last_event_time
|
||||||
consumption_dir=directory,
|
|
||||||
subdirs_as_tags=subdirs_as_tags,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Exit watch loop to reconfigure timeout
|
# These files are still waiting to hit the timeout
|
||||||
break
|
notified_files = still_waiting
|
||||||
|
|
||||||
# Determine next timeout
|
# If files are waiting, need to exit read() to check them
|
||||||
if tracker.has_pending_files():
|
# Otherwise, go back to infinite sleep time, but only if not testing
|
||||||
# Check pending files at stability interval
|
if len(notified_files) > 0:
|
||||||
timeout_ms = stability_timeout_ms
|
timeout_ms = inotify_debounce_ms
|
||||||
elif is_testing:
|
elif is_testing:
|
||||||
# In testing, use short timeout to check stop flag
|
timeout_ms = self.testing_timeout_ms
|
||||||
timeout_ms = testing_timeout_ms
|
else:
|
||||||
else: # pragma: nocover
|
timeout_ms = None
|
||||||
# No pending files, wait indefinitely
|
|
||||||
timeout_ms = 0
|
|
||||||
|
|
||||||
except KeyboardInterrupt: # pragma: nocover
|
if self.stop_flag.is_set():
|
||||||
logger.info("Received interrupt, stopping consumer")
|
logger.debug("Finishing because event is set")
|
||||||
self.stop_flag.set()
|
finished = True
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("Received SIGINT, stopping inotify")
|
||||||
|
finished = True
|
||||||
|
finally:
|
||||||
|
inotify.close()
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -1044,30 +1044,29 @@ IGNORABLE_FILES: Final[list[str]] = [
|
|||||||
"Thumbs.db",
|
"Thumbs.db",
|
||||||
]
|
]
|
||||||
|
|
||||||
CONSUMER_POLLING_INTERVAL = float(os.getenv("PAPERLESS_CONSUMER_POLLING_INTERVAL", 0))
|
CONSUMER_POLLING = int(os.getenv("PAPERLESS_CONSUMER_POLLING", 0))
|
||||||
|
|
||||||
CONSUMER_STABILITY_DELAY = float(os.getenv("PAPERLESS_CONSUMER_STABILITY_DELAY", 5))
|
CONSUMER_POLLING_DELAY = int(os.getenv("PAPERLESS_CONSUMER_POLLING_DELAY", 5))
|
||||||
|
|
||||||
|
CONSUMER_POLLING_RETRY_COUNT = int(
|
||||||
|
os.getenv("PAPERLESS_CONSUMER_POLLING_RETRY_COUNT", 5),
|
||||||
|
)
|
||||||
|
|
||||||
|
CONSUMER_INOTIFY_DELAY: Final[float] = __get_float(
|
||||||
|
"PAPERLESS_CONSUMER_INOTIFY_DELAY",
|
||||||
|
0.5,
|
||||||
|
)
|
||||||
|
|
||||||
CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
|
CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
|
||||||
|
|
||||||
CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
|
CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
|
||||||
|
|
||||||
# Ignore regex patterns, matched against filename only
|
# Ignore glob patterns, relative to PAPERLESS_CONSUMPTION_DIR
|
||||||
CONSUMER_IGNORE_PATTERNS = list(
|
CONSUMER_IGNORE_PATTERNS = list(
|
||||||
json.loads(
|
json.loads(
|
||||||
os.getenv(
|
os.getenv(
|
||||||
"PAPERLESS_CONSUMER_IGNORE_PATTERNS",
|
"PAPERLESS_CONSUMER_IGNORE_PATTERNS",
|
||||||
json.dumps([]),
|
json.dumps(IGNORABLE_FILES),
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Directories to always ignore. These are matched by directory name, not full path
|
|
||||||
CONSUMER_IGNORE_DIRS = list(
|
|
||||||
json.loads(
|
|
||||||
os.getenv(
|
|
||||||
"PAPERLESS_CONSUMER_IGNORE_DIRS",
|
|
||||||
json.dumps([]),
|
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -11,14 +11,12 @@ from paperless_ai.chat import stream_chat_with_documents
|
|||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def patch_embed_model():
|
def patch_embed_model():
|
||||||
from llama_index.core import settings as llama_settings
|
from llama_index.core import settings as llama_settings
|
||||||
|
from llama_index.core.embeddings.utils import MockEmbedding
|
||||||
|
|
||||||
mock_embed_model = MagicMock()
|
mock_embed_model = MockEmbedding(embed_dim=8)
|
||||||
mock_embed_model._get_text_embedding_batch.return_value = [
|
llama_settings.Settings.embed_model = mock_embed_model
|
||||||
[0.1] * 1536,
|
|
||||||
] # 1 vector per input
|
|
||||||
llama_settings.Settings._embed_model = mock_embed_model
|
|
||||||
yield
|
yield
|
||||||
llama_settings.Settings._embed_model = None
|
llama_settings.Settings.embed_model = None
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user