Compare commits

..

4 Commits

Author SHA1 Message Date
shamoon
1dd3a62bc2 Fixhancement: show sequential + id version labels, fix padding (#12196) 2026-02-27 16:22:29 -08:00
Jan Kleine
0bc032a67d Development: improve test portability (#12187)
* Fix: improve test portability

* Make settings always consistent

* Make a few more tests deterministic wrt settings

* Dont pollute settings for this one

* Fix timezone issue with mail parser

* Update test_parser.py

* Uh, I guess OCR gives variants for this

---------

Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com>
2026-02-27 23:24:11 +00:00
GitHub Actions
8531078a54 Auto translate strings 2026-02-27 22:39:20 +00:00
Trenton H
5988d5896b Breaking: Refactor advanced database settings to allow more user configuration (#12165) 2026-02-27 14:37:26 -08:00
33 changed files with 1786 additions and 1142 deletions

View File

@@ -39,3 +39,6 @@ max_line_length = off
[Dockerfile*]
indent_style = space
[*.toml]
indent_style = space

View File

@@ -62,6 +62,10 @@ copies you created in the steps above.
## Updating Paperless {#updating}
!!! warning
Please review the [migration instructions](migration-v3.md) before upgrading Paperless-ngx to v3.0, it includes some breaking changes that require manual intervention before upgrading.
### Docker Route {#docker-updating}
If a new release of paperless-ngx is available, upgrading depends on how

View File

@@ -51,137 +51,172 @@ matcher.
### Database
By default, Paperless uses **SQLite** with a database stored at `data/db.sqlite3`.
To switch to **PostgreSQL** or **MariaDB**, set [`PAPERLESS_DBHOST`](#PAPERLESS_DBHOST) and optionally configure other
database-related environment variables.
For multi-user or higher-throughput deployments, **PostgreSQL** (recommended) or
**MariaDB** can be used instead by setting [`PAPERLESS_DBENGINE`](#PAPERLESS_DBENGINE)
and the relevant connection variables.
#### [`PAPERLESS_DBENGINE=<engine>`](#PAPERLESS_DBENGINE) {#PAPERLESS_DBENGINE}
: Specifies the database engine to use. Accepted values are `sqlite`, `postgresql`,
and `mariadb`.
Defaults to `sqlite` if not set.
PostgreSQL and MariaDB both require [`PAPERLESS_DBHOST`](#PAPERLESS_DBHOST) to be
set. SQLite does not use any other connection variables; the database file is always
located at `<PAPERLESS_DATA_DIR>/db.sqlite3`.
!!! warning
Using MariaDB comes with some caveats.
See [MySQL Caveats](advanced_usage.md#mysql-caveats).
#### [`PAPERLESS_DBHOST=<hostname>`](#PAPERLESS_DBHOST) {#PAPERLESS_DBHOST}
: If unset, Paperless uses **SQLite** by default.
Set `PAPERLESS_DBHOST` to switch to PostgreSQL or MariaDB instead.
#### [`PAPERLESS_DBENGINE=<engine_name>`](#PAPERLESS_DBENGINE) {#PAPERLESS_DBENGINE}
: Optional. Specifies the database engine to use when connecting to a remote database.
Available options are `postgresql` and `mariadb`.
Defaults to `postgresql` if `PAPERLESS_DBHOST` is set.
!!! warning
Using MariaDB comes with some caveats. See [MySQL Caveats](advanced_usage.md#mysql-caveats).
: Hostname of the PostgreSQL or MariaDB database server. Required when
`PAPERLESS_DBENGINE` is `postgresql` or `mariadb`.
#### [`PAPERLESS_DBPORT=<port>`](#PAPERLESS_DBPORT) {#PAPERLESS_DBPORT}
: Port to use when connecting to PostgreSQL or MariaDB.
Default is `5432` for PostgreSQL and `3306` for MariaDB.
Defaults to `5432` for PostgreSQL and `3306` for MariaDB.
#### [`PAPERLESS_DBNAME=<name>`](#PAPERLESS_DBNAME) {#PAPERLESS_DBNAME}
: Name of the database to connect to when using PostgreSQL or MariaDB.
: Name of the PostgreSQL or MariaDB database to connect to.
Defaults to "paperless".
Defaults to `paperless`.
#### [`PAPERLESS_DBUSER=<name>`](#PAPERLESS_DBUSER) {#PAPERLESS_DBUSER}
#### [`PAPERLESS_DBUSER=<user>`](#PAPERLESS_DBUSER) {#PAPERLESS_DBUSER}
: Username for authenticating with the PostgreSQL or MariaDB database.
Defaults to "paperless".
Defaults to `paperless`.
#### [`PAPERLESS_DBPASS=<password>`](#PAPERLESS_DBPASS) {#PAPERLESS_DBPASS}
: Password for the PostgreSQL or MariaDB database user.
Defaults to "paperless".
Defaults to `paperless`.
#### [`PAPERLESS_DBSSLMODE=<mode>`](#PAPERLESS_DBSSLMODE) {#PAPERLESS_DBSSLMODE}
#### [`PAPERLESS_DB_OPTIONS=<options>`](#PAPERLESS_DB_OPTIONS) {#PAPERLESS_DB_OPTIONS}
: SSL mode to use when connecting to PostgreSQL or MariaDB.
: Advanced database connection options as a semicolon-delimited key-value string.
Keys and values are separated by `=`. Dot-notation produces nested option
dictionaries; for example, `pool.max_size=20` sets
`OPTIONS["pool"]["max_size"] = 20`.
See [the official documentation about
sslmode for PostgreSQL](https://www.postgresql.org/docs/current/libpq-ssl.html).
Options specified here are merged over the engine defaults. Unrecognised keys
are passed through to the underlying database driver without validation, so a
typo will be silently ignored rather than producing an error.
See [the official documentation about
sslmode for MySQL and MariaDB](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode).
Refer to your database driver's documentation for the full set of accepted keys:
*Note*: SSL mode values differ between PostgreSQL and MariaDB.
- PostgreSQL: [libpq connection parameters](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS)
- MariaDB: [MariaDB Connector/Python](https://mariadb.com/kb/en/mariadb-connector-python/)
- SQLite: [SQLite PRAGMA statements](https://www.sqlite.org/pragma.html)
Default is `prefer` for PostgreSQL and `PREFERRED` for MariaDB.
!!! note "PostgreSQL connection pooling"
#### [`PAPERLESS_DBSSLROOTCERT=<ca-path>`](#PAPERLESS_DBSSLROOTCERT) {#PAPERLESS_DBSSLROOTCERT}
Pool size is controlled via `pool.min_size` and `pool.max_size`. When
configuring pooling, ensure your PostgreSQL `max_connections` is large enough
to handle all pool connections across all workers:
`(web_workers + celery_workers) * pool.max_size + safety_margin`.
: Path to the SSL root certificate used to verify the database server.
**Examples:**
See [the official documentation about
sslmode for PostgreSQL](https://www.postgresql.org/docs/current/libpq-ssl.html).
Changes the location of `root.crt`.
```bash title="PostgreSQL: require SSL, set a custom CA certificate, and limit the pool size"
PAPERLESS_DB_OPTIONS="sslmode=require;sslrootcert=/certs/ca.pem;pool.max_size=5"
```
See [the official documentation about
sslmode for MySQL and MariaDB](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-ca).
```bash title="MariaDB: require SSL with a custom CA certificate"
PAPERLESS_DB_OPTIONS="ssl_mode=REQUIRED;ssl.ca=/certs/ca.pem"
```
Defaults to unset, using the standard location in the home directory.
```bash title="SQLite: set a busy timeout of 30 seconds"
# PostgreSQL: set a connection timeout
PAPERLESS_DB_OPTIONS="connect_timeout=10"
```
#### [`PAPERLESS_DBSSLCERT=<client-cert-path>`](#PAPERLESS_DBSSLCERT) {#PAPERLESS_DBSSLCERT}
#### ~~[`PAPERLESS_DBSSLMODE`](#PAPERLESS_DBSSLMODE)~~ {#PAPERLESS_DBSSLMODE}
: Path to the client SSL certificate used when connecting securely.
!!! failure "Removed in v3"
See [the official documentation about
sslmode for PostgreSQL](https://www.postgresql.org/docs/current/libpq-ssl.html).
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
See [the official documentation about
sslmode for MySQL and MariaDB](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-cert).
```bash title="PostgreSQL"
PAPERLESS_DB_OPTIONS="sslmode=require"
```
Changes the location of `postgresql.crt`.
```bash title="MariaDB"
PAPERLESS_DB_OPTIONS="ssl_mode=REQUIRED"
```
Defaults to unset, using the standard location in the home directory.
#### ~~[`PAPERLESS_DBSSLROOTCERT`](#PAPERLESS_DBSSLROOTCERT)~~ {#PAPERLESS_DBSSLROOTCERT}
#### [`PAPERLESS_DBSSLKEY=<client-cert-key>`](#PAPERLESS_DBSSLKEY) {#PAPERLESS_DBSSLKEY}
!!! failure "Removed in v3"
: Path to the client SSL private key used when connecting securely.
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
See [the official documentation about
sslmode for PostgreSQL](https://www.postgresql.org/docs/current/libpq-ssl.html).
```bash title="PostgreSQL"
PAPERLESS_DB_OPTIONS="sslrootcert=/path/to/ca.pem"
```
See [the official documentation about
sslmode for MySQL and MariaDB](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-key).
```bash title="MariaDB"
PAPERLESS_DB_OPTIONS="ssl.ca=/path/to/ca.pem"
```
Changes the location of `postgresql.key`.
#### ~~[`PAPERLESS_DBSSLCERT`](#PAPERLESS_DBSSLCERT)~~ {#PAPERLESS_DBSSLCERT}
Defaults to unset, using the standard location in the home directory.
!!! failure "Removed in v3"
#### [`PAPERLESS_DB_TIMEOUT=<int>`](#PAPERLESS_DB_TIMEOUT) {#PAPERLESS_DB_TIMEOUT}
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
: Sets how long a database connection should wait before timing out.
```bash title="PostgreSQL"
PAPERLESS_DB_OPTIONS="sslcert=/path/to/client.crt"
```
For SQLite, this sets how long to wait if the database is locked.
For PostgreSQL or MariaDB, this sets the connection timeout.
```bash title="MariaDB"
PAPERLESS_DB_OPTIONS="ssl.cert=/path/to/client.crt"
```
Defaults to unset, which uses Djangos built-in defaults.
#### ~~[`PAPERLESS_DBSSLKEY`](#PAPERLESS_DBSSLKEY)~~ {#PAPERLESS_DBSSLKEY}
#### [`PAPERLESS_DB_POOLSIZE=<int>`](#PAPERLESS_DB_POOLSIZE) {#PAPERLESS_DB_POOLSIZE}
!!! failure "Removed in v3"
: Defines the maximum number of database connections to keep in the pool.
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
Only applies to PostgreSQL. This setting is ignored for other database engines.
```bash title="PostgreSQL"
PAPERLESS_DB_OPTIONS="sslkey=/path/to/client.key"
```
The value must be greater than or equal to 1 to be used.
Defaults to unset, which disables connection pooling.
```bash title="MariaDB"
PAPERLESS_DB_OPTIONS="ssl.key=/path/to/client.key"
```
!!! note
#### ~~[`PAPERLESS_DB_TIMEOUT`](#PAPERLESS_DB_TIMEOUT)~~ {#PAPERLESS_DB_TIMEOUT}
A pool of 8-10 connections per worker is typically sufficient.
If you encounter error messages such as `couldn't get a connection`
or database connection timeouts, you probably need to increase the pool size.
!!! failure "Removed in v3"
!!! warning
Make sure your PostgreSQL `max_connections` setting is large enough to handle the connection pools:
`(NB_PAPERLESS_WORKERS + NB_CELERY_WORKERS) × POOL_SIZE + SAFETY_MARGIN`. For example, with
4 Paperless workers and 2 Celery workers, and a pool size of 8:``(4 + 2) × 8 + 10 = 58`,
so `max_connections = 60` (or even more) is appropriate.
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
This assumes only Paperless-ngx connects to your PostgreSQL instance. If you have other applications,
you should increase `max_connections` accordingly.
```bash title="SQLite"
PAPERLESS_DB_OPTIONS="timeout=30"
```
```bash title="PostgreSQL or MariaDB"
PAPERLESS_DB_OPTIONS="connect_timeout=30"
```
#### ~~[`PAPERLESS_DB_POOLSIZE`](#PAPERLESS_DB_POOLSIZE)~~ {#PAPERLESS_DB_POOLSIZE}
!!! failure "Removed in v3"
Use [`PAPERLESS_DB_OPTIONS`](#PAPERLESS_DB_OPTIONS) instead.
```bash
PAPERLESS_DB_OPTIONS="pool.max_size=10"
```
#### [`PAPERLESS_DB_READ_CACHE_ENABLED=<bool>`](#PAPERLESS_DB_READ_CACHE_ENABLED) {#PAPERLESS_DB_READ_CACHE_ENABLED}

View File

@@ -48,3 +48,58 @@ The `CONSUMER_BARCODE_SCANNER` setting has been removed. zxing-cpp is now the on
reliability.
- The `libzbar0` / `libzbar-dev` system packages are no longer required and can be removed from any custom Docker
images or host installations.
## Database Engine
`PAPERLESS_DBENGINE` is now required to use PostgreSQL or MariaDB. Previously, the
engine was inferred from the presence of `PAPERLESS_DBHOST`, with `PAPERLESS_DBENGINE`
only needed to select MariaDB over PostgreSQL.
SQLite users require no changes, though they may explicitly set their engine if desired.
#### Action Required
PostgreSQL and MariaDB users must add `PAPERLESS_DBENGINE` to their environment:
```yaml
# v2 (PostgreSQL inferred from PAPERLESS_DBHOST)
PAPERLESS_DBHOST: postgres
# v3 (engine must be explicit)
PAPERLESS_DBENGINE: postgresql
PAPERLESS_DBHOST: postgres
```
See [`PAPERLESS_DBENGINE`](configuration.md#PAPERLESS_DBENGINE) for accepted values.
## Database Advanced Options
The individual SSL, timeout, and pooling variables have been removed in favor of a
single [`PAPERLESS_DB_OPTIONS`](configuration.md#PAPERLESS_DB_OPTIONS) string. This
consolidates a growing set of engine-specific variables into one place, and allows
any option supported by the underlying database driver to be set without requiring a
dedicated environment variable for each.
The removed variables and their replacements are:
| Removed Variable | Replacement in `PAPERLESS_DB_OPTIONS` |
| ------------------------- | ---------------------------------------------------------------------------- |
| `PAPERLESS_DBSSLMODE` | `sslmode=<value>` (PostgreSQL) or `ssl_mode=<value>` (MariaDB) |
| `PAPERLESS_DBSSLROOTCERT` | `sslrootcert=<path>` (PostgreSQL) or `ssl.ca=<path>` (MariaDB) |
| `PAPERLESS_DBSSLCERT` | `sslcert=<path>` (PostgreSQL) or `ssl.cert=<path>` (MariaDB) |
| `PAPERLESS_DBSSLKEY` | `sslkey=<path>` (PostgreSQL) or `ssl.key=<path>` (MariaDB) |
| `PAPERLESS_DB_POOLSIZE` | `pool.max_size=<value>` (PostgreSQL only) |
| `PAPERLESS_DB_TIMEOUT` | `timeout=<value>` (SQLite) or `connect_timeout=<value>` (PostgreSQL/MariaDB) |
The deprecated variables will continue to function for now but will be removed in a
future release. A deprecation warning is logged at startup for each deprecated variable
that is still set.
#### Action Required
Users with any of the deprecated variables set should migrate to `PAPERLESS_DB_OPTIONS`.
Multiple options are combined in a single value:
```bash
PAPERLESS_DB_OPTIONS="sslmode=require;sslrootcert=/certs/ca.pem;pool.max_size=10"
```

View File

@@ -504,8 +504,7 @@ installation. Keep these points in mind:
- Read the [changelog](changelog.md) and
take note of breaking changes.
- Decide whether to stay on SQLite or migrate to PostgreSQL.
See [documentation](#sqlite_to_psql) for details on moving data
from SQLite to PostgreSQL. Both work fine with
Both work fine with
Paperless. However, if you already have a database server running
for other services, you might as well use it for Paperless as well.
- The task scheduler of Paperless, which is used to execute periodic

View File

@@ -57,7 +57,7 @@
}
</div>
@for (version of versions; track version.id) {
<div class="dropdown-item border-top px-0">
<div class="dropdown-item border-top px-0" [class.pe-3]="versions.length === 1">
<div class="d-flex align-items-center w-100 py-2 version-item">
<div class="btn btn-link link-underline link-underline-opacity-0 d-flex align-items-center small text-start p-0 version-link"
(click)="selectVersion(version.id)"
@@ -88,7 +88,7 @@
@if (version.version_label) {
{{ version.version_label }}
} @else {
<span i18n>Version</span>&nbsp;#{{ version.id }}
<ng-container i18n>Version</ng-container>&nbsp;{{ versions.length - $index }}&nbsp;<span class="text-muted small">(#{{ version.id }})</span>
}
</span>
}

View File

@@ -1,97 +1,17 @@
"""Management command to check the document archive for issues."""
from django.core.management.base import BaseCommand
from __future__ import annotations
import logging
from typing import Any
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from documents.management.commands.base import PaperlessCommand
from documents.models import Document
from documents.sanity_checker import SanityCheckMessages
from documents.management.commands.mixins import ProgressBarMixin
from documents.sanity_checker import check_sanity
_LEVEL_STYLE: dict[int, tuple[str, str]] = {
logging.ERROR: ("bold red", "ERROR"),
logging.WARNING: ("yellow", "WARN"),
logging.INFO: ("dim", "INFO"),
}
class Command(PaperlessCommand):
class Command(ProgressBarMixin, BaseCommand):
help = "This command checks your document archive for issues."
def _render_results(self, messages: SanityCheckMessages) -> None:
"""Render sanity check results as a Rich table."""
console = self.console
def add_arguments(self, parser):
self.add_argument_progress_bar_mixin(parser)
if len(messages) == 0:
console.print(
Panel(
"[green]No issues detected.[/green]",
title="Sanity Check",
border_style="green",
),
)
return
def handle(self, *args, **options):
self.handle_progress_bar_mixin(**options)
messages = check_sanity(progress=self.use_progress_bar, scheduled=False)
# Build a lookup for document titles
doc_pks = [pk for pk in messages.document_pks() if pk is not None]
titles: dict[int, str] = {}
if doc_pks:
titles = dict(
Document.global_objects.filter(pk__in=doc_pks)
.only("pk", "title")
.values_list("pk", "title"),
)
table = Table(
title="Sanity Check Results",
show_lines=True,
title_style="bold",
)
table.add_column("Level", width=7, no_wrap=True)
table.add_column("Document", min_width=20)
table.add_column("Issue", ratio=1)
for doc_pk, doc_messages in messages.iter_messages():
if doc_pk is not None:
title = titles.get(doc_pk, "Unknown")
doc_label = f"#{doc_pk} {title}"
else:
doc_label = "(global)"
for msg in doc_messages:
style, label = _LEVEL_STYLE.get(
msg["level"],
("dim", "INFO"),
)
table.add_row(
Text(label, style=style),
doc_label,
msg["message"],
)
console.print(table)
# Summary line
parts: list[str] = []
if messages.has_error:
parts.append("[bold red]errors[/bold red]")
if messages.has_warning:
parts.append("[yellow]warnings[/yellow]")
summary = " and ".join(parts) if parts else "infos"
console.print(f"\nFound {len(messages)} document(s) with {summary}.")
def handle(self, *args: Any, **options: Any) -> None:
messages = check_sanity(
scheduled=False,
iter_wrapper=lambda docs: self.track(
docs,
description="Checking documents...",
),
)
self._render_results(messages)
messages.log_messages()

View File

@@ -1,141 +1,80 @@
"""
Sanity checker for the Paperless-ngx document archive.
Verifies that all documents have valid files, correct checksums,
and consistent metadata. Reports orphaned files in the media directory.
Progress display is the caller's responsibility -- pass an ``iter_wrapper``
to wrap the document queryset (e.g., with a progress bar). The default
is an identity function that adds no overhead.
"""
from __future__ import annotations
import hashlib
import logging
import uuid
from collections import defaultdict
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Final
from typing import TypedDict
from typing import TypeVar
from celery import states
from django.conf import settings
from django.utils import timezone
from tqdm import tqdm
from documents.models import Document
from documents.models import PaperlessTask
from paperless.config import GeneralConfig
logger = logging.getLogger("paperless.sanity_checker")
_T = TypeVar("_T")
IterWrapper = Callable[[Iterable[_T]], Iterable[_T]]
class MessageEntry(TypedDict):
"""A single sanity check message with its severity level."""
level: int
message: str
def _identity(iterable: Iterable[_T]) -> Iterable[_T]:
"""Pass through an iterable unchanged (default iter_wrapper)."""
return iterable
class SanityCheckMessages:
"""Collects sanity check messages grouped by document primary key.
Messages are categorized as error, warning, or info. ``None`` is used
as the key for messages not associated with a specific document
(e.g., orphaned files).
"""
def __init__(self) -> None:
self._messages: dict[int | None, list[MessageEntry]] = defaultdict(list)
self.has_error: bool = False
self.has_warning: bool = False
self._messages: dict[int, list[dict]] = defaultdict(list)
self.has_error = False
self.has_warning = False
# -- Recording ----------------------------------------------------------
def error(self, doc_pk: int | None, message: str) -> None:
def error(self, doc_pk, message) -> None:
self._messages[doc_pk].append({"level": logging.ERROR, "message": message})
self.has_error = True
def warning(self, doc_pk: int | None, message: str) -> None:
def warning(self, doc_pk, message) -> None:
self._messages[doc_pk].append({"level": logging.WARNING, "message": message})
self.has_warning = True
def info(self, doc_pk: int | None, message: str) -> None:
def info(self, doc_pk, message) -> None:
self._messages[doc_pk].append({"level": logging.INFO, "message": message})
# -- Iteration / query --------------------------------------------------
def document_pks(self) -> list[int | None]:
"""Return all document PKs (including None for global messages)."""
return list(self._messages.keys())
def iter_messages(self) -> Iterator[tuple[int | None, list[MessageEntry]]]:
"""Iterate over (doc_pk, messages) pairs."""
yield from self._messages.items()
def __len__(self) -> int:
return len(self._messages)
def __getitem__(self, item: int | None) -> list[MessageEntry]:
return self._messages[item]
# -- Logging output (used by Celery task path) --------------------------
def log_messages(self) -> None:
"""Write all messages to the ``paperless.sanity_checker`` logger.
logger = logging.getLogger("paperless.sanity_checker")
This is the output path for headless / Celery execution.
Management commands use Rich rendering instead.
"""
if len(self._messages) == 0:
logger.info("Sanity checker detected no issues.")
return
else:
# Query once
all_docs = Document.global_objects.all()
doc_pks = [pk for pk in self._messages if pk is not None]
titles: dict[int, str] = {}
if doc_pks:
titles = dict(
Document.global_objects.filter(pk__in=doc_pks)
.only("pk", "title")
.values_list("pk", "title"),
)
for doc_pk in self._messages:
if doc_pk is not None:
doc = all_docs.get(pk=doc_pk)
logger.info(
f"Detected following issue(s) with document #{doc.pk},"
f" titled {doc.title}",
)
for msg in self._messages[doc_pk]:
logger.log(msg["level"], msg["message"])
for doc_pk, entries in self._messages.items():
if doc_pk is not None:
title = titles.get(doc_pk, "Unknown")
logger.info(
"Detected following issue(s) with document #%s, titled %s",
doc_pk,
title,
)
for msg in entries:
logger.log(msg["level"], msg["message"])
def __len__(self):
return len(self._messages)
def __getitem__(self, item):
return self._messages[item]
class SanityCheckFailedException(Exception):
pass
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def check_sanity(*, progress=False, scheduled=True) -> SanityCheckMessages:
paperless_task = PaperlessTask.objects.create(
task_id=uuid.uuid4(),
type=PaperlessTask.TaskType.SCHEDULED_TASK
if scheduled
else PaperlessTask.TaskType.MANUAL_TASK,
task_name=PaperlessTask.TaskName.CHECK_SANITY,
status=states.STARTED,
date_created=timezone.now(),
date_started=timezone.now(),
)
messages = SanityCheckMessages()
def _build_present_files() -> set[Path]:
"""Collect all files in MEDIA_ROOT, excluding directories and ignorable files."""
present_files = {
x.resolve()
for x in Path(settings.MEDIA_ROOT).glob("**/*")
@@ -143,167 +82,95 @@ def _build_present_files() -> set[Path]:
}
lockfile = Path(settings.MEDIA_LOCK).resolve()
present_files.discard(lockfile)
if lockfile in present_files:
present_files.remove(lockfile)
general_config = GeneralConfig()
app_logo = general_config.app_logo or settings.APP_LOGO
if app_logo:
logo_file = Path(settings.MEDIA_ROOT / Path(app_logo.lstrip("/"))).resolve()
present_files.discard(logo_file)
if logo_file in present_files:
present_files.remove(logo_file)
return present_files
def _check_thumbnail(
doc: Document,
messages: SanityCheckMessages,
present_files: set[Path],
) -> None:
"""Verify the thumbnail exists and is readable."""
thumbnail_path: Final[Path] = Path(doc.thumbnail_path).resolve()
if not thumbnail_path.exists() or not thumbnail_path.is_file():
messages.error(doc.pk, "Thumbnail of document does not exist.")
return
present_files.discard(thumbnail_path)
try:
_ = thumbnail_path.read_bytes()
except OSError as e:
messages.error(doc.pk, f"Cannot read thumbnail file of document: {e}")
def _check_original(
doc: Document,
messages: SanityCheckMessages,
present_files: set[Path],
) -> None:
"""Verify the original file exists, is readable, and has matching checksum."""
source_path: Final[Path] = Path(doc.source_path).resolve()
if not source_path.exists() or not source_path.is_file():
messages.error(doc.pk, "Original of document does not exist.")
return
present_files.discard(source_path)
try:
checksum = hashlib.md5(source_path.read_bytes()).hexdigest()
except OSError as e:
messages.error(doc.pk, f"Cannot read original file of document: {e}")
else:
if checksum != doc.checksum:
messages.error(
doc.pk,
f"Checksum mismatch. Stored: {doc.checksum}, actual: {checksum}.",
)
def _check_archive(
doc: Document,
messages: SanityCheckMessages,
present_files: set[Path],
) -> None:
"""Verify archive file consistency: checksum/filename pairing and file integrity."""
if doc.archive_checksum is not None and doc.archive_filename is None:
messages.error(
doc.pk,
"Document has an archive file checksum, but no archive filename.",
)
elif doc.archive_checksum is None and doc.archive_filename is not None:
messages.error(
doc.pk,
"Document has an archive file, but its checksum is missing.",
)
elif doc.has_archive_version:
if TYPE_CHECKING:
assert isinstance(doc.archive_path, Path)
archive_path: Final[Path] = Path(doc.archive_path).resolve()
if not archive_path.exists() or not archive_path.is_file():
messages.error(doc.pk, "Archived version of document does not exist.")
return
present_files.discard(archive_path)
try:
checksum = hashlib.md5(archive_path.read_bytes()).hexdigest()
except OSError as e:
messages.error(
doc.pk,
f"Cannot read archive file of document : {e}",
)
for doc in tqdm(Document.global_objects.all(), disable=not progress):
# Check sanity of the thumbnail
thumbnail_path: Final[Path] = Path(doc.thumbnail_path).resolve()
if not thumbnail_path.exists() or not thumbnail_path.is_file():
messages.error(doc.pk, "Thumbnail of document does not exist.")
else:
if checksum != doc.archive_checksum:
messages.error(
doc.pk,
"Checksum mismatch of archived document. "
f"Stored: {doc.archive_checksum}, actual: {checksum}.",
)
if thumbnail_path in present_files:
present_files.remove(thumbnail_path)
try:
_ = thumbnail_path.read_bytes()
except OSError as e:
messages.error(doc.pk, f"Cannot read thumbnail file of document: {e}")
# Check sanity of the original file
# TODO: extract method
source_path: Final[Path] = Path(doc.source_path).resolve()
if not source_path.exists() or not source_path.is_file():
messages.error(doc.pk, "Original of document does not exist.")
else:
if source_path in present_files:
present_files.remove(source_path)
try:
checksum = hashlib.md5(source_path.read_bytes()).hexdigest()
except OSError as e:
messages.error(doc.pk, f"Cannot read original file of document: {e}")
else:
if checksum != doc.checksum:
messages.error(
doc.pk,
"Checksum mismatch. "
f"Stored: {doc.checksum}, actual: {checksum}.",
)
def _check_content(doc: Document, messages: SanityCheckMessages) -> None:
"""Flag documents with no OCR content."""
if not doc.content:
messages.info(doc.pk, "Document contains no OCR data")
# Check sanity of the archive file.
if doc.archive_checksum is not None and doc.archive_filename is None:
messages.error(
doc.pk,
"Document has an archive file checksum, but no archive filename.",
)
elif doc.archive_checksum is None and doc.archive_filename is not None:
messages.error(
doc.pk,
"Document has an archive file, but its checksum is missing.",
)
elif doc.has_archive_version:
archive_path: Final[Path] = Path(doc.archive_path).resolve()
if not archive_path.exists() or not archive_path.is_file():
messages.error(doc.pk, "Archived version of document does not exist.")
else:
if archive_path in present_files:
present_files.remove(archive_path)
try:
checksum = hashlib.md5(archive_path.read_bytes()).hexdigest()
except OSError as e:
messages.error(
doc.pk,
f"Cannot read archive file of document : {e}",
)
else:
if checksum != doc.archive_checksum:
messages.error(
doc.pk,
"Checksum mismatch of archived document. "
f"Stored: {doc.archive_checksum}, "
f"actual: {checksum}.",
)
def _check_document(
doc: Document,
messages: SanityCheckMessages,
present_files: set[Path],
) -> None:
"""Run all checks for a single document."""
_check_thumbnail(doc, messages, present_files)
_check_original(doc, messages, present_files)
_check_archive(doc, messages, present_files)
_check_content(doc, messages)
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def check_sanity(
*,
scheduled: bool = True,
iter_wrapper: IterWrapper[Document] = _identity,
) -> SanityCheckMessages:
"""Run a full sanity check on the document archive.
Args:
scheduled: Whether this is a scheduled (automatic) or manual check.
Controls the task type recorded in the database.
iter_wrapper: A callable that wraps the document iterable, e.g.,
for progress bar display. Defaults to identity (no wrapping).
Returns:
A SanityCheckMessages instance containing all detected issues.
"""
paperless_task = PaperlessTask.objects.create(
task_id=uuid.uuid4(),
type=(
PaperlessTask.TaskType.SCHEDULED_TASK
if scheduled
else PaperlessTask.TaskType.MANUAL_TASK
),
task_name=PaperlessTask.TaskName.CHECK_SANITY,
status=states.STARTED,
date_created=timezone.now(),
date_started=timezone.now(),
)
messages = SanityCheckMessages()
present_files = _build_present_files()
documents = Document.global_objects.all()
for doc in iter_wrapper(documents):
_check_document(doc, messages, present_files)
# other document checks
if not doc.content:
messages.info(doc.pk, "Document contains no OCR data")
for extra_file in present_files:
messages.warning(None, f"Orphaned file in media dir: {extra_file}")
paperless_task.status = states.SUCCESS if not messages.has_error else states.FAILURE
# result is concatenated messages
paperless_task.result = f"{len(messages)} issues found."
if messages.has_error:
paperless_task.result += " Check logs for details."
paperless_task.date_done = timezone.now()
paperless_task.save(update_fields=["status", "result", "date_done"])
return messages

View File

@@ -1,96 +1,10 @@
import shutil
import zoneinfo
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
import filelock
import pytest
from django.contrib.auth import get_user_model
from pytest_django.fixtures import SettingsWrapper
from rest_framework.test import APIClient
from documents.tests.factories import DocumentFactory
if TYPE_CHECKING:
from documents.models import Document
@dataclass(frozen=True, slots=True)
class PaperlessDirs:
"""Standard Paperless-ngx directory layout for tests."""
media: Path
originals: Path
archive: Path
thumbnails: Path
@pytest.fixture(scope="session")
def samples_dir() -> Path:
"""Path to the shared test sample documents."""
return Path(__file__).parent / "samples" / "documents"
@pytest.fixture()
def paperless_dirs(tmp_path: Path) -> PaperlessDirs:
"""Create and return the directory structure for testing."""
media = tmp_path / "media"
dirs = PaperlessDirs(
media=media,
originals=media / "documents" / "originals",
archive=media / "documents" / "archive",
thumbnails=media / "documents" / "thumbnails",
)
for d in (dirs.originals, dirs.archive, dirs.thumbnails):
d.mkdir(parents=True)
return dirs
@pytest.fixture()
def _media_settings(paperless_dirs: PaperlessDirs, settings) -> None:
"""Configure Django settings to point at temp directories."""
settings.MEDIA_ROOT = paperless_dirs.media
settings.ORIGINALS_DIR = paperless_dirs.originals
settings.ARCHIVE_DIR = paperless_dirs.archive
settings.THUMBNAIL_DIR = paperless_dirs.thumbnails
settings.MEDIA_LOCK = paperless_dirs.media / "media.lock"
settings.IGNORABLE_FILES = {".DS_Store", "Thumbs.db", "desktop.ini"}
settings.APP_LOGO = ""
@pytest.fixture()
def sample_doc(
paperless_dirs: PaperlessDirs,
_media_settings: None,
samples_dir: Path,
) -> "Document":
"""Create a document with valid files and matching checksums."""
with filelock.FileLock(paperless_dirs.media / "media.lock"):
shutil.copy(
samples_dir / "originals" / "0000001.pdf",
paperless_dirs.originals / "0000001.pdf",
)
shutil.copy(
samples_dir / "archive" / "0000001.pdf",
paperless_dirs.archive / "0000001.pdf",
)
shutil.copy(
samples_dir / "thumbnails" / "0000001.webp",
paperless_dirs.thumbnails / "0000001.webp",
)
return DocumentFactory(
title="test",
checksum="42995833e01aea9b3edee44bbfdd7ce1",
archive_checksum="62acb0bcbfbcaa62ca6ad3668e4e404b",
content="test content",
pk=1,
filename="0000001.pdf",
mime_type="application/pdf",
archive_filename="0000001.pdf",
)
@pytest.fixture()
def settings_timezone(settings: SettingsWrapper) -> zoneinfo.ZoneInfo:

View File

@@ -1,173 +0,0 @@
"""Tests for the document_sanity_checker management command.
Verifies Rich rendering (table, panel, summary) and end-to-end CLI behavior.
"""
from __future__ import annotations
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from django.core.management import call_command
from rich.console import Console
from documents.management.commands.document_sanity_checker import Command
from documents.sanity_checker import SanityCheckMessages
from documents.tests.factories import DocumentFactory
if TYPE_CHECKING:
from documents.models import Document
from documents.tests.conftest import PaperlessDirs
def _render_to_string(messages: SanityCheckMessages) -> str:
"""Render command output to a plain string for assertion."""
buf = StringIO()
cmd = Command()
cmd.console = Console(file=buf, width=120, no_color=True)
cmd._render_results(messages)
return buf.getvalue()
# ---------------------------------------------------------------------------
# Rich rendering
# ---------------------------------------------------------------------------
class TestRenderResultsNoIssues:
"""No DB access needed -- renders an empty SanityCheckMessages."""
def test_shows_panel(self) -> None:
output = _render_to_string(SanityCheckMessages())
assert "No issues detected" in output
assert "Sanity Check" in output
@pytest.mark.django_db
class TestRenderResultsWithIssues:
def test_error_row(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.error(sample_doc.pk, "Original missing")
output = _render_to_string(msgs)
assert "Sanity Check Results" in output
assert "ERROR" in output
assert "Original missing" in output
assert f"#{sample_doc.pk}" in output
assert sample_doc.title in output
def test_warning_row(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.warning(sample_doc.pk, "Suspicious file")
output = _render_to_string(msgs)
assert "WARN" in output
assert "Suspicious file" in output
def test_info_row(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.info(sample_doc.pk, "No OCR data")
output = _render_to_string(msgs)
assert "INFO" in output
assert "No OCR data" in output
@pytest.mark.usefixtures("_media_settings")
def test_global_message(self) -> None:
msgs = SanityCheckMessages()
msgs.warning(None, "Orphaned file: /tmp/stray.pdf")
output = _render_to_string(msgs)
assert "(global)" in output
assert "Orphaned file" in output
def test_multiple_messages_same_doc(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.error(sample_doc.pk, "Thumbnail missing")
msgs.error(sample_doc.pk, "Checksum mismatch")
output = _render_to_string(msgs)
assert "Thumbnail missing" in output
assert "Checksum mismatch" in output
@pytest.mark.usefixtures("_media_settings")
def test_unknown_doc_pk(self) -> None:
msgs = SanityCheckMessages()
msgs.error(99999, "Ghost document")
output = _render_to_string(msgs)
assert "#99999" in output
assert "Unknown" in output
@pytest.mark.django_db
class TestRenderResultsSummary:
def test_errors_only(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.error(sample_doc.pk, "broken")
output = _render_to_string(msgs)
assert "errors" in output
assert "Found 1 document(s)" in output
def test_warnings_only(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.warning(sample_doc.pk, "odd")
output = _render_to_string(msgs)
assert "warnings" in output
def test_errors_and_warnings(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.error(sample_doc.pk, "broken")
msgs.warning(None, "orphan")
output = _render_to_string(msgs)
assert "errors" in output
assert "warnings" in output
assert "Found 2 document(s)" in output
def test_infos_only(self, sample_doc: Document) -> None:
msgs = SanityCheckMessages()
msgs.info(sample_doc.pk, "no OCR")
output = _render_to_string(msgs)
assert "infos" in output
# ---------------------------------------------------------------------------
# End-to-end command execution
# ---------------------------------------------------------------------------
@pytest.mark.django_db
@pytest.mark.management
class TestDocumentSanityCheckerCommand:
def test_no_issues(self, sample_doc: Document) -> None:
out = StringIO()
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
assert "No issues detected" in out.getvalue()
@pytest.mark.usefixtures("_media_settings")
def test_no_issues_empty_archive(self) -> None:
out = StringIO()
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
assert "No issues detected" in out.getvalue()
def test_missing_original(self, sample_doc: Document) -> None:
Path(sample_doc.source_path).unlink()
out = StringIO()
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
output = out.getvalue()
assert "ERROR" in output
assert "Original of document does not exist" in output
@pytest.mark.usefixtures("_media_settings")
def test_checksum_mismatch(self, paperless_dirs: PaperlessDirs) -> None:
"""Lightweight document with zero-byte files triggers checksum mismatch."""
doc = DocumentFactory(
title="test",
content="test",
filename="test.pdf",
checksum="abc",
)
Path(doc.source_path).touch()
Path(doc.thumbnail_path).touch()
out = StringIO()
call_command("document_sanity_checker", "--no-progress-bar", stdout=out)
output = out.getvalue()
assert "ERROR" in output
assert "Checksum mismatch. Stored: abc, actual:" in output

View File

@@ -21,6 +21,16 @@ class TestApiUiSettings(DirectoriesMixin, APITestCase):
self.test_user.save()
self.client.force_authenticate(user=self.test_user)
@override_settings(
APP_TITLE=None,
APP_LOGO=None,
AUDIT_LOG_ENABLED=True,
EMPTY_TRASH_DELAY=30,
ENABLE_UPDATE_CHECK="default",
EMAIL_ENABLED=False,
GMAIL_OAUTH_ENABLED=False,
OUTLOOK_OAUTH_ENABLED=False,
)
def test_api_get_ui_settings(self) -> None:
response = self.client.get(self.ENDPOINT, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)

View File

@@ -919,6 +919,7 @@ class TestTagBarcode(DirectoriesMixin, SampleDirMixin, GetReaderPluginMixin, Tes
@override_settings(
CONSUMER_ENABLE_TAG_BARCODE=True,
CONSUMER_TAG_BARCODE_MAPPING={"ASN(.*)": "\\g<1>"},
CONSUMER_ENABLE_ASN_BARCODE=False,
)
def test_scan_file_for_many_custom_tags(self) -> None:
"""

View File

@@ -329,14 +329,14 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
FILENAME_FORMAT="{added_year}-{added_month}-{added_day}",
)
def test_added_year_month_day(self) -> None:
d1 = timezone.make_aware(datetime.datetime(232, 1, 9, 1, 1, 1))
d1 = timezone.make_aware(datetime.datetime(1232, 1, 9, 1, 1, 1))
doc1 = Document.objects.create(
title="doc1",
mime_type="application/pdf",
added=d1,
)
self.assertEqual(generate_filename(doc1), Path("232-01-09.pdf"))
self.assertEqual(generate_filename(doc1), Path("1232-01-09.pdf"))
doc1.added = timezone.make_aware(datetime.datetime(2020, 11, 16, 1, 1, 1))

View File

@@ -134,7 +134,6 @@ class TestRenamer(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
self.assertIsFile(doc2.archive_path)
@pytest.mark.management
class TestCreateClassifier(TestCase):
@mock.patch(
"documents.management.commands.document_create_classifier.train_classifier",
@@ -145,6 +144,32 @@ class TestCreateClassifier(TestCase):
m.assert_called_once()
@pytest.mark.management
class TestSanityChecker(DirectoriesMixin, TestCase):
def test_no_issues(self) -> None:
with self.assertLogs() as capture:
call_command("document_sanity_checker")
self.assertEqual(len(capture.output), 1)
self.assertIn("Sanity checker detected no issues.", capture.output[0])
def test_errors(self) -> None:
doc = Document.objects.create(
title="test",
content="test",
filename="test.pdf",
checksum="abc",
)
Path(doc.source_path).touch()
Path(doc.thumbnail_path).touch()
with self.assertLogs() as capture:
call_command("document_sanity_checker")
self.assertEqual(len(capture.output), 2)
self.assertIn("Checksum mismatch. Stored: abc, actual:", capture.output[1])
@pytest.mark.management
class TestConvertMariaDBUUID(TestCase):
@mock.patch("django.db.connection.schema_editor")

View File

@@ -140,7 +140,7 @@ class TestFuzzyMatchCommand(TestCase):
mime_type="application/pdf",
filename="final_test.pdf",
)
stdout, _ = self.call_command("--no-progress-bar")
stdout, _ = self.call_command("--no-progress-bar", "--processes", "1")
lines = [x.strip() for x in stdout.splitlines() if x.strip()]
self.assertEqual(len(lines), 3)
for line in lines:
@@ -183,7 +183,12 @@ class TestFuzzyMatchCommand(TestCase):
self.assertEqual(Document.objects.count(), 3)
stdout, _ = self.call_command("--delete", "--no-progress-bar")
stdout, _ = self.call_command(
"--delete",
"--no-progress-bar",
"--processes",
"1",
)
self.assertIn(
"The command is configured to delete documents. Use with caution",

View File

@@ -1,295 +1,192 @@
"""Tests for the sanity checker module.
Tests exercise ``check_sanity`` as a whole, verifying document validation,
orphan detection, task recording, and the iter_wrapper contract.
"""
from __future__ import annotations
import logging
import shutil
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
import filelock
from django.conf import settings
from django.test import TestCase
from django.test import override_settings
from documents.models import Document
from documents.models import PaperlessTask
from documents.sanity_checker import check_sanity
if TYPE_CHECKING:
from collections.abc import Iterable
from documents.tests.conftest import PaperlessDirs
from documents.tests.utils import DirectoriesMixin
@pytest.mark.django_db
class TestCheckSanityNoDocuments:
"""Sanity checks against an empty archive."""
class TestSanityCheck(DirectoriesMixin, TestCase):
def make_test_data(self):
with filelock.FileLock(settings.MEDIA_LOCK):
# just make sure that the lockfile is present.
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000001.pdf"
),
Path(self.dirs.originals_dir) / "0000001.pdf",
)
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "archive"
/ "0000001.pdf"
),
Path(self.dirs.archive_dir) / "0000001.pdf",
)
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "thumbnails"
/ "0000001.webp"
),
Path(self.dirs.thumbnail_dir) / "0000001.webp",
)
@pytest.mark.usefixtures("_media_settings")
def test_no_documents(self) -> None:
return Document.objects.create(
title="test",
checksum="42995833e01aea9b3edee44bbfdd7ce1",
archive_checksum="62acb0bcbfbcaa62ca6ad3668e4e404b",
content="test",
pk=1,
filename="0000001.pdf",
mime_type="application/pdf",
archive_filename="0000001.pdf",
)
def assertSanityError(self, doc: Document, messageRegex) -> None:
messages = check_sanity()
assert not messages.has_error
assert not messages.has_warning
assert len(messages) == 0
@pytest.mark.usefixtures("_media_settings")
def test_no_issues_logs_clean(self, caplog: pytest.LogCaptureFixture) -> None:
messages = check_sanity()
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
self.assertTrue(messages.has_error)
with self.assertLogs() as capture:
messages.log_messages()
assert "Sanity checker detected no issues." in caplog.text
@pytest.mark.django_db
class TestCheckSanityHealthyDocument:
def test_no_errors(self, sample_doc: Document) -> None:
messages = check_sanity()
assert not messages.has_error
assert not messages.has_warning
assert len(messages) == 0
@pytest.mark.django_db
class TestCheckSanityThumbnail:
def test_missing(self, sample_doc: Document) -> None:
Path(sample_doc.thumbnail_path).unlink()
messages = check_sanity()
assert messages.has_error
assert any(
"Thumbnail of document does not exist" in m["message"]
for m in messages[sample_doc.pk]
)
def test_unreadable(self, sample_doc: Document) -> None:
thumb = Path(sample_doc.thumbnail_path)
thumb.chmod(0o000)
try:
messages = check_sanity()
assert messages.has_error
assert any(
"Cannot read thumbnail" in m["message"] for m in messages[sample_doc.pk]
self.assertEqual(
capture.records[0].message,
f"Detected following issue(s) with document #{doc.pk}, titled {doc.title}",
)
finally:
thumb.chmod(0o644)
self.assertRegex(capture.records[1].message, messageRegex)
@pytest.mark.django_db
class TestCheckSanityOriginal:
def test_missing(self, sample_doc: Document) -> None:
Path(sample_doc.source_path).unlink()
def test_no_issues(self) -> None:
self.make_test_data()
messages = check_sanity()
assert messages.has_error
assert any(
"Original of document does not exist" in m["message"]
for m in messages[sample_doc.pk]
)
def test_checksum_mismatch(self, sample_doc: Document) -> None:
sample_doc.checksum = "badhash"
sample_doc.save()
messages = check_sanity()
assert messages.has_error
assert any(
"Checksum mismatch" in m["message"] and "badhash" in m["message"]
for m in messages[sample_doc.pk]
)
def test_unreadable(self, sample_doc: Document) -> None:
src = Path(sample_doc.source_path)
src.chmod(0o000)
try:
messages = check_sanity()
assert messages.has_error
assert any(
"Cannot read original" in m["message"] for m in messages[sample_doc.pk]
self.assertFalse(messages.has_error)
self.assertFalse(messages.has_warning)
with self.assertLogs() as capture:
messages.log_messages()
self.assertEqual(len(capture.output), 1)
self.assertEqual(capture.records[0].levelno, logging.INFO)
self.assertEqual(
capture.records[0].message,
"Sanity checker detected no issues.",
)
finally:
src.chmod(0o644)
def test_no_docs(self) -> None:
self.assertEqual(len(check_sanity()), 0)
@pytest.mark.django_db
class TestCheckSanityArchive:
def test_checksum_without_filename(self, sample_doc: Document) -> None:
sample_doc.archive_filename = None
sample_doc.save()
def test_success(self) -> None:
self.make_test_data()
self.assertEqual(len(check_sanity()), 0)
def test_no_thumbnail(self) -> None:
doc = self.make_test_data()
Path(doc.thumbnail_path).unlink()
self.assertSanityError(doc, "Thumbnail of document does not exist")
def test_thumbnail_no_access(self) -> None:
doc = self.make_test_data()
Path(doc.thumbnail_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read thumbnail file of document")
Path(doc.thumbnail_path).chmod(0o777)
def test_no_original(self) -> None:
doc = self.make_test_data()
Path(doc.source_path).unlink()
self.assertSanityError(doc, "Original of document does not exist.")
def test_original_no_access(self) -> None:
doc = self.make_test_data()
Path(doc.source_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read original file of document")
Path(doc.source_path).chmod(0o777)
def test_original_checksum_mismatch(self) -> None:
doc = self.make_test_data()
doc.checksum = "WOW"
doc.save()
self.assertSanityError(doc, "Checksum mismatch. Stored: WOW, actual: ")
def test_no_archive(self) -> None:
doc = self.make_test_data()
Path(doc.archive_path).unlink()
self.assertSanityError(doc, "Archived version of document does not exist.")
def test_archive_no_access(self) -> None:
doc = self.make_test_data()
Path(doc.archive_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read archive file of document")
Path(doc.archive_path).chmod(0o777)
def test_archive_checksum_mismatch(self) -> None:
doc = self.make_test_data()
doc.archive_checksum = "WOW"
doc.save()
self.assertSanityError(doc, "Checksum mismatch of archived document")
def test_empty_content(self) -> None:
doc = self.make_test_data()
doc.content = ""
doc.save()
messages = check_sanity()
assert messages.has_error
assert any(
"checksum, but no archive filename" in m["message"]
for m in messages[sample_doc.pk]
self.assertFalse(messages.has_error)
self.assertFalse(messages.has_warning)
self.assertEqual(len(messages), 1)
self.assertRegex(
messages[doc.pk][0]["message"],
"Document contains no OCR data",
)
def test_filename_without_checksum(self, sample_doc: Document) -> None:
sample_doc.archive_checksum = None
sample_doc.save()
def test_orphaned_file(self) -> None:
self.make_test_data()
Path(self.dirs.originals_dir, "orphaned").touch()
messages = check_sanity()
assert messages.has_error
assert any(
"checksum is missing" in m["message"] for m in messages[sample_doc.pk]
self.assertTrue(messages.has_warning)
self.assertRegex(
messages._messages[None][0]["message"],
"Orphaned file in media dir",
)
def test_missing_file(self, sample_doc: Document) -> None:
Path(sample_doc.archive_path).unlink()
messages = check_sanity()
assert messages.has_error
assert any(
"Archived version of document does not exist" in m["message"]
for m in messages[sample_doc.pk]
)
def test_checksum_mismatch(self, sample_doc: Document) -> None:
sample_doc.archive_checksum = "wronghash"
sample_doc.save()
messages = check_sanity()
assert messages.has_error
assert any(
"Checksum mismatch of archived document" in m["message"]
for m in messages[sample_doc.pk]
)
def test_unreadable(self, sample_doc: Document) -> None:
archive = Path(sample_doc.archive_path)
archive.chmod(0o000)
try:
messages = check_sanity()
assert messages.has_error
assert any(
"Cannot read archive" in m["message"] for m in messages[sample_doc.pk]
)
finally:
archive.chmod(0o644)
def test_no_archive_at_all(self, sample_doc: Document) -> None:
"""Document with neither archive checksum nor filename is valid."""
Path(sample_doc.archive_path).unlink()
sample_doc.archive_checksum = None
sample_doc.archive_filename = None
sample_doc.save()
messages = check_sanity()
assert not messages.has_error
@pytest.mark.django_db
class TestCheckSanityContent:
@pytest.mark.parametrize(
"content",
[
pytest.param("", id="empty-string"),
],
@override_settings(
APP_LOGO="logo/logo.png",
)
def test_no_content(self, sample_doc: Document, content: str) -> None:
sample_doc.content = content
sample_doc.save()
def test_ignore_logo(self) -> None:
self.make_test_data()
logo_dir = Path(self.dirs.media_dir, "logo")
logo_dir.mkdir(parents=True, exist_ok=True)
Path(self.dirs.media_dir, "logo", "logo.png").touch()
messages = check_sanity()
assert not messages.has_error
assert not messages.has_warning
assert any("no OCR data" in m["message"] for m in messages[sample_doc.pk])
self.assertFalse(messages.has_warning)
@pytest.mark.django_db
class TestCheckSanityOrphans:
def test_orphaned_file(
self,
sample_doc: Document,
paperless_dirs: PaperlessDirs,
) -> None:
(paperless_dirs.originals / "orphan.pdf").touch()
def test_ignore_ignorable_files(self) -> None:
self.make_test_data()
Path(self.dirs.media_dir, ".DS_Store").touch()
Path(self.dirs.media_dir, "desktop.ini").touch()
messages = check_sanity()
assert messages.has_warning
assert any("Orphaned file" in m["message"] for m in messages[None])
self.assertFalse(messages.has_warning)
@pytest.mark.usefixtures("_media_settings")
def test_ignorable_files_not_flagged(
self,
paperless_dirs: PaperlessDirs,
) -> None:
(paperless_dirs.media / ".DS_Store").touch()
(paperless_dirs.media / "desktop.ini").touch()
messages = check_sanity()
assert not messages.has_warning
def test_archive_filename_no_checksum(self) -> None:
doc = self.make_test_data()
doc.archive_checksum = None
doc.save()
self.assertSanityError(doc, "has an archive file, but its checksum is missing.")
@pytest.mark.django_db
class TestCheckSanityIterWrapper:
def test_wrapper_receives_documents(self, sample_doc: Document) -> None:
seen: list[Document] = []
def tracking(iterable: Iterable[Document]) -> Iterable[Document]:
for item in iterable:
seen.append(item)
yield item
check_sanity(iter_wrapper=tracking)
assert len(seen) == 1
assert seen[0].pk == sample_doc.pk
def test_default_works_without_wrapper(self, sample_doc: Document) -> None:
messages = check_sanity()
assert not messages.has_error
@pytest.mark.django_db
class TestCheckSanityTaskRecording:
@pytest.mark.parametrize(
("expected_type", "scheduled"),
[
pytest.param(PaperlessTask.TaskType.SCHEDULED_TASK, True, id="scheduled"),
pytest.param(PaperlessTask.TaskType.MANUAL_TASK, False, id="manual"),
],
)
@pytest.mark.usefixtures("_media_settings")
def test_task_type(self, expected_type: str, *, scheduled: bool) -> None:
check_sanity(scheduled=scheduled)
task = PaperlessTask.objects.latest("date_created")
assert task.task_name == PaperlessTask.TaskName.CHECK_SANITY
assert task.type == expected_type
def test_success_status(self, sample_doc: Document) -> None:
check_sanity()
task = PaperlessTask.objects.latest("date_created")
assert task.status == "SUCCESS"
def test_failure_status(self, sample_doc: Document) -> None:
Path(sample_doc.source_path).unlink()
check_sanity()
task = PaperlessTask.objects.latest("date_created")
assert task.status == "FAILURE"
assert "Check logs for details" in task.result
@pytest.mark.django_db
class TestCheckSanityLogMessages:
def test_logs_doc_issues(
self,
sample_doc: Document,
caplog: pytest.LogCaptureFixture,
) -> None:
Path(sample_doc.source_path).unlink()
messages = check_sanity()
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
messages.log_messages()
assert f"document #{sample_doc.pk}" in caplog.text
assert "Original of document does not exist" in caplog.text
def test_logs_global_issues(
self,
sample_doc: Document,
paperless_dirs: PaperlessDirs,
caplog: pytest.LogCaptureFixture,
) -> None:
(paperless_dirs.originals / "orphan.pdf").touch()
messages = check_sanity()
with caplog.at_level(logging.WARNING, logger="paperless.sanity_checker"):
messages.log_messages()
assert "Orphaned file" in caplog.text
@pytest.mark.usefixtures("_media_settings")
def test_logs_unknown_doc_pk(self, caplog: pytest.LogCaptureFixture) -> None:
"""A doc PK not in the DB logs 'Unknown' as the title."""
messages = check_sanity()
messages.error(99999, "Ghost document")
with caplog.at_level(logging.INFO, logger="paperless.sanity_checker"):
messages.log_messages()
assert "#99999" in caplog.text
assert "Unknown" in caplog.text
def test_archive_checksum_no_filename(self) -> None:
doc = self.make_test_data()
doc.archive_filename = None
doc.save()
self.assertSanityError(
doc,
"has an archive file checksum, but no archive filename.",
)

View File

@@ -33,11 +33,11 @@ from documents.plugins.helpers import ProgressStatusOptions
def setup_directories():
dirs = namedtuple("Dirs", ())
dirs.data_dir = Path(tempfile.mkdtemp())
dirs.scratch_dir = Path(tempfile.mkdtemp())
dirs.media_dir = Path(tempfile.mkdtemp())
dirs.consumption_dir = Path(tempfile.mkdtemp())
dirs.static_dir = Path(tempfile.mkdtemp())
dirs.data_dir = Path(tempfile.mkdtemp()).resolve()
dirs.scratch_dir = Path(tempfile.mkdtemp()).resolve()
dirs.media_dir = Path(tempfile.mkdtemp()).resolve()
dirs.consumption_dir = Path(tempfile.mkdtemp()).resolve()
dirs.static_dir = Path(tempfile.mkdtemp()).resolve()
dirs.index_dir = dirs.data_dir / "index"
dirs.originals_dir = dirs.media_dir / "documents" / "originals"
dirs.thumbnail_dir = dirs.media_dir / "documents" / "thumbnails"

View File

@@ -2,7 +2,7 @@ msgid ""
msgstr ""
"Project-Id-Version: paperless-ngx\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-02-26 18:09+0000\n"
"POT-Creation-Date: 2026-02-27 22:38+0000\n"
"PO-Revision-Date: 2022-02-17 04:17\n"
"Last-Translator: \n"
"Language-Team: English\n"
@@ -1856,151 +1856,151 @@ msgstr ""
msgid "paperless application settings"
msgstr ""
#: paperless/settings.py:819
#: paperless/settings/__init__.py:746
msgid "English (US)"
msgstr ""
#: paperless/settings.py:820
#: paperless/settings/__init__.py:747
msgid "Arabic"
msgstr ""
#: paperless/settings.py:821
#: paperless/settings/__init__.py:748
msgid "Afrikaans"
msgstr ""
#: paperless/settings.py:822
#: paperless/settings/__init__.py:749
msgid "Belarusian"
msgstr ""
#: paperless/settings.py:823
#: paperless/settings/__init__.py:750
msgid "Bulgarian"
msgstr ""
#: paperless/settings.py:824
#: paperless/settings/__init__.py:751
msgid "Catalan"
msgstr ""
#: paperless/settings.py:825
#: paperless/settings/__init__.py:752
msgid "Czech"
msgstr ""
#: paperless/settings.py:826
#: paperless/settings/__init__.py:753
msgid "Danish"
msgstr ""
#: paperless/settings.py:827
#: paperless/settings/__init__.py:754
msgid "German"
msgstr ""
#: paperless/settings.py:828
#: paperless/settings/__init__.py:755
msgid "Greek"
msgstr ""
#: paperless/settings.py:829
#: paperless/settings/__init__.py:756
msgid "English (GB)"
msgstr ""
#: paperless/settings.py:830
#: paperless/settings/__init__.py:757
msgid "Spanish"
msgstr ""
#: paperless/settings.py:831
#: paperless/settings/__init__.py:758
msgid "Persian"
msgstr ""
#: paperless/settings.py:832
#: paperless/settings/__init__.py:759
msgid "Finnish"
msgstr ""
#: paperless/settings.py:833
#: paperless/settings/__init__.py:760
msgid "French"
msgstr ""
#: paperless/settings.py:834
#: paperless/settings/__init__.py:761
msgid "Hungarian"
msgstr ""
#: paperless/settings.py:835
#: paperless/settings/__init__.py:762
msgid "Indonesian"
msgstr ""
#: paperless/settings.py:836
#: paperless/settings/__init__.py:763
msgid "Italian"
msgstr ""
#: paperless/settings.py:837
#: paperless/settings/__init__.py:764
msgid "Japanese"
msgstr ""
#: paperless/settings.py:838
#: paperless/settings/__init__.py:765
msgid "Korean"
msgstr ""
#: paperless/settings.py:839
#: paperless/settings/__init__.py:766
msgid "Luxembourgish"
msgstr ""
#: paperless/settings.py:840
#: paperless/settings/__init__.py:767
msgid "Norwegian"
msgstr ""
#: paperless/settings.py:841
#: paperless/settings/__init__.py:768
msgid "Dutch"
msgstr ""
#: paperless/settings.py:842
#: paperless/settings/__init__.py:769
msgid "Polish"
msgstr ""
#: paperless/settings.py:843
#: paperless/settings/__init__.py:770
msgid "Portuguese (Brazil)"
msgstr ""
#: paperless/settings.py:844
#: paperless/settings/__init__.py:771
msgid "Portuguese"
msgstr ""
#: paperless/settings.py:845
#: paperless/settings/__init__.py:772
msgid "Romanian"
msgstr ""
#: paperless/settings.py:846
#: paperless/settings/__init__.py:773
msgid "Russian"
msgstr ""
#: paperless/settings.py:847
#: paperless/settings/__init__.py:774
msgid "Slovak"
msgstr ""
#: paperless/settings.py:848
#: paperless/settings/__init__.py:775
msgid "Slovenian"
msgstr ""
#: paperless/settings.py:849
#: paperless/settings/__init__.py:776
msgid "Serbian"
msgstr ""
#: paperless/settings.py:850
#: paperless/settings/__init__.py:777
msgid "Swedish"
msgstr ""
#: paperless/settings.py:851
#: paperless/settings/__init__.py:778
msgid "Turkish"
msgstr ""
#: paperless/settings.py:852
#: paperless/settings/__init__.py:779
msgid "Ukrainian"
msgstr ""
#: paperless/settings.py:853
#: paperless/settings/__init__.py:780
msgid "Vietnamese"
msgstr ""
#: paperless/settings.py:854
#: paperless/settings/__init__.py:781
msgid "Chinese Simplified"
msgstr ""
#: paperless/settings.py:855
#: paperless/settings/__init__.py:782
msgid "Chinese Traditional"
msgstr ""

View File

@@ -202,3 +202,43 @@ def audit_log_check(app_configs, **kwargs):
)
return result
@register()
def check_deprecated_db_settings(
app_configs: object,
**kwargs: object,
) -> list[Warning]:
"""Check for deprecated database environment variables.
Detects legacy advanced options that should be migrated to
PAPERLESS_DB_OPTIONS. Returns one Warning per deprecated variable found.
"""
deprecated_vars: dict[str, str] = {
"PAPERLESS_DB_TIMEOUT": "timeout",
"PAPERLESS_DB_POOLSIZE": "pool.min_size / pool.max_size",
"PAPERLESS_DBSSLMODE": "sslmode",
"PAPERLESS_DBSSLROOTCERT": "sslrootcert",
"PAPERLESS_DBSSLCERT": "sslcert",
"PAPERLESS_DBSSLKEY": "sslkey",
}
warnings: list[Warning] = []
for var_name, db_option_key in deprecated_vars.items():
if not os.getenv(var_name):
continue
warnings.append(
Warning(
f"Deprecated environment variable: {var_name}",
hint=(
f"{var_name} is no longer supported and will be removed in v3.2. "
f"Set the equivalent option via PAPERLESS_DB_OPTIONS instead. "
f'Example: PAPERLESS_DB_OPTIONS=\'{{"{db_option_key}": "<value>"}}\'. '
"See https://docs.paperless-ngx.com/migration/ for the full reference."
),
id="paperless.W001",
),
)
return warnings

View File

@@ -17,6 +17,8 @@ from dateparser.languages.loader import LocaleDataLoader
from django.utils.translation import gettext_lazy as _
from dotenv import load_dotenv
from paperless.settings.custom import parse_db_settings
logger = logging.getLogger("paperless.settings")
# Tap paperless.conf if it's available
@@ -282,7 +284,7 @@ DEBUG = __get_boolean("PAPERLESS_DEBUG", "NO")
# Directories #
###############################################################################
BASE_DIR: Path = Path(__file__).resolve().parent.parent
BASE_DIR: Path = Path(__file__).resolve().parent.parent.parent
STATIC_ROOT = __get_path("PAPERLESS_STATICDIR", BASE_DIR.parent / "static")
@@ -722,83 +724,8 @@ EMAIL_CERTIFICATE_FILE = __get_optional_path("PAPERLESS_EMAIL_CERTIFICATE_LOCATI
###############################################################################
# Database #
###############################################################################
def _parse_db_settings() -> dict:
databases = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": DATA_DIR / "db.sqlite3",
"OPTIONS": {},
},
}
if os.getenv("PAPERLESS_DBHOST"):
# Have sqlite available as a second option for management commands
# This is important when migrating to/from sqlite
databases["sqlite"] = databases["default"].copy()
databases["default"] = {
"HOST": os.getenv("PAPERLESS_DBHOST"),
"NAME": os.getenv("PAPERLESS_DBNAME", "paperless"),
"USER": os.getenv("PAPERLESS_DBUSER", "paperless"),
"PASSWORD": os.getenv("PAPERLESS_DBPASS", "paperless"),
"OPTIONS": {},
}
if os.getenv("PAPERLESS_DBPORT"):
databases["default"]["PORT"] = os.getenv("PAPERLESS_DBPORT")
# Leave room for future extensibility
if os.getenv("PAPERLESS_DBENGINE") == "mariadb":
engine = "django.db.backends.mysql"
# Contrary to Postgres, Django does not natively support connection pooling for MariaDB.
# However, since MariaDB uses threads instead of forks, establishing connections is significantly faster
# compared to PostgreSQL, so the lack of pooling is not an issue
options = {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"ssl_mode": os.getenv("PAPERLESS_DBSSLMODE", "PREFERRED"),
"ssl": {
"ca": os.getenv("PAPERLESS_DBSSLROOTCERT", None),
"cert": os.getenv("PAPERLESS_DBSSLCERT", None),
"key": os.getenv("PAPERLESS_DBSSLKEY", None),
},
}
else: # Default to PostgresDB
engine = "django.db.backends.postgresql"
options = {
"sslmode": os.getenv("PAPERLESS_DBSSLMODE", "prefer"),
"sslrootcert": os.getenv("PAPERLESS_DBSSLROOTCERT", None),
"sslcert": os.getenv("PAPERLESS_DBSSLCERT", None),
"sslkey": os.getenv("PAPERLESS_DBSSLKEY", None),
}
if int(os.getenv("PAPERLESS_DB_POOLSIZE", 0)) > 0:
options.update(
{
"pool": {
"min_size": 1,
"max_size": int(os.getenv("PAPERLESS_DB_POOLSIZE")),
},
},
)
databases["default"]["ENGINE"] = engine
databases["default"]["OPTIONS"].update(options)
if os.getenv("PAPERLESS_DB_TIMEOUT") is not None:
if databases["default"]["ENGINE"] == "django.db.backends.sqlite3":
databases["default"]["OPTIONS"].update(
{"timeout": int(os.getenv("PAPERLESS_DB_TIMEOUT"))},
)
else:
databases["default"]["OPTIONS"].update(
{"connect_timeout": int(os.getenv("PAPERLESS_DB_TIMEOUT"))},
)
databases["sqlite"]["OPTIONS"].update(
{"timeout": int(os.getenv("PAPERLESS_DB_TIMEOUT"))},
)
return databases
DATABASES = _parse_db_settings()
DATABASES = parse_db_settings(DATA_DIR)
if os.getenv("PAPERLESS_DBENGINE") == "mariadb":
# Silence Django error on old MariaDB versions.

View File

@@ -0,0 +1,122 @@
import os
from pathlib import Path
from typing import Any
from paperless.settings.parsers import get_choice_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import parse_dict_from_str
def parse_db_settings(data_dir: Path) -> dict[str, dict[str, Any]]:
"""Parse database settings from environment variables.
Core connection variables (no deprecation):
- PAPERLESS_DBENGINE (sqlite/postgresql/mariadb)
- PAPERLESS_DBHOST, PAPERLESS_DBPORT
- PAPERLESS_DBNAME, PAPERLESS_DBUSER, PAPERLESS_DBPASS
Advanced options can be set via:
- Legacy individual env vars (deprecated in v3.0, removed in v3.2)
- PAPERLESS_DB_OPTIONS (recommended v3+ approach)
Args:
data_dir: The data directory path for SQLite database location.
Returns:
A databases dict suitable for Django DATABASES setting.
"""
try:
engine = get_choice_from_env(
"PAPERLESS_DBENGINE",
{"sqlite", "postgresql", "mariadb"},
default="sqlite",
)
except ValueError:
# MariaDB users already had to set PAPERLESS_DBENGINE, so it was picked up above
# SQLite users didn't need to set anything
engine = "postgresql" if "PAPERLESS_DBHOST" in os.environ else "sqlite"
db_config: dict[str, Any]
base_options: dict[str, Any]
match engine:
case "sqlite":
db_config = {
"ENGINE": "django.db.backends.sqlite3",
"NAME": str((data_dir / "db.sqlite3").resolve()),
}
base_options = {}
case "postgresql":
db_config = {
"ENGINE": "django.db.backends.postgresql",
"HOST": os.getenv("PAPERLESS_DBHOST"),
"NAME": os.getenv("PAPERLESS_DBNAME", "paperless"),
"USER": os.getenv("PAPERLESS_DBUSER", "paperless"),
"PASSWORD": os.getenv("PAPERLESS_DBPASS", "paperless"),
}
base_options = {
"sslmode": os.getenv("PAPERLESS_DBSSLMODE", "prefer"),
"sslrootcert": os.getenv("PAPERLESS_DBSSLROOTCERT"),
"sslcert": os.getenv("PAPERLESS_DBSSLCERT"),
"sslkey": os.getenv("PAPERLESS_DBSSLKEY"),
}
if (pool_size := get_int_from_env("PAPERLESS_DB_POOLSIZE")) is not None:
base_options["pool"] = {
"min_size": 1,
"max_size": pool_size,
}
case "mariadb":
db_config = {
"ENGINE": "django.db.backends.mysql",
"HOST": os.getenv("PAPERLESS_DBHOST"),
"NAME": os.getenv("PAPERLESS_DBNAME", "paperless"),
"USER": os.getenv("PAPERLESS_DBUSER", "paperless"),
"PASSWORD": os.getenv("PAPERLESS_DBPASS", "paperless"),
}
base_options = {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"collation": "utf8mb4_unicode_ci",
"ssl_mode": os.getenv("PAPERLESS_DBSSLMODE", "PREFERRED"),
"ssl": {
"ca": os.getenv("PAPERLESS_DBSSLROOTCERT"),
"cert": os.getenv("PAPERLESS_DBSSLCERT"),
"key": os.getenv("PAPERLESS_DBSSLKEY"),
},
}
case _: # pragma: no cover
raise NotImplementedError(engine)
# Handle port setting for external databases
if (
engine in ("postgresql", "mariadb")
and (port := get_int_from_env("PAPERLESS_DBPORT")) is not None
):
db_config["PORT"] = port
# Handle timeout setting (common across all engines, different key names)
if (timeout := get_int_from_env("PAPERLESS_DB_TIMEOUT")) is not None:
timeout_key = "timeout" if engine == "sqlite" else "connect_timeout"
base_options[timeout_key] = timeout
# Apply PAPERLESS_DB_OPTIONS overrides
db_config["OPTIONS"] = parse_dict_from_str(
os.getenv("PAPERLESS_DB_OPTIONS"),
defaults=base_options,
separator=";",
type_map={
# SQLite options
"timeout": int,
# Postgres/MariaDB options
"connect_timeout": int,
"pool.min_size": int,
"pool.max_size": int,
},
)
return {"default": db_config}

View File

@@ -0,0 +1,192 @@
import copy
import os
from collections.abc import Callable
from collections.abc import Mapping
from pathlib import Path
from typing import Any
from typing import TypeVar
from typing import overload
T = TypeVar("T")
def str_to_bool(value: str) -> bool:
"""
Converts a string representation of truth to a boolean value.
Recognizes 'true', '1', 't', 'y', 'yes' as True, and
'false', '0', 'f', 'n', 'no' as False. Case-insensitive.
Args:
value: The string to convert.
Returns:
The boolean representation of the string.
Raises:
ValueError: If the string is not a recognized boolean value.
"""
val_lower = value.strip().lower()
if val_lower in ("true", "1", "t", "y", "yes"):
return True
elif val_lower in ("false", "0", "f", "n", "no"):
return False
raise ValueError(f"Cannot convert '{value}' to a boolean.")
@overload
def get_int_from_env(key: str) -> int | None: ...
@overload
def get_int_from_env(key: str, default: None) -> int | None: ...
@overload
def get_int_from_env(key: str, default: int) -> int: ...
def get_int_from_env(key: str, default: int | None = None) -> int | None:
"""
Return an integer value based on the environment variable.
If default is provided, returns that value when key is missing.
If default is None, returns None when key is missing.
"""
if key not in os.environ:
return default
return int(os.environ[key])
def parse_dict_from_str(
env_str: str | None,
defaults: dict[str, Any] | None = None,
type_map: Mapping[str, Callable[[str], Any]] | None = None,
separator: str = ",",
) -> dict[str, Any]:
"""
Parses a key-value string into a dictionary, applying defaults and casting types.
Supports nested keys via dot-notation, e.g.:
"database.host=localhost,database.port=5432"
Args:
env_str: The string from the environment variable (e.g., "port=9090,debug=true").
defaults: A dictionary of default values (can contain nested dicts).
type_map: A dictionary mapping keys (dot-notation allowed) to a type or a parsing
function (e.g., {'port': int, 'debug': bool, 'database.port': int}).
The special `bool` type triggers custom boolean parsing.
separator: The character used to separate key-value pairs. Defaults to ','.
Returns:
A dictionary with the parsed and correctly-typed settings.
Raises:
ValueError: If a value cannot be cast to its specified type.
"""
def _set_nested(d: dict, keys: list[str], value: Any) -> None:
"""Set a nested value, creating intermediate dicts as needed."""
cur = d
for k in keys[:-1]:
if k not in cur or not isinstance(cur[k], dict):
cur[k] = {}
cur = cur[k]
cur[keys[-1]] = value
def _get_nested(d: dict, keys: list[str]) -> Any:
"""Get nested value or raise KeyError if not present."""
cur = d
for k in keys:
if not isinstance(cur, dict) or k not in cur:
raise KeyError
cur = cur[k]
return cur
def _has_nested(d: dict, keys: list[str]) -> bool:
try:
_get_nested(d, keys)
return True
except KeyError:
return False
settings: dict[str, Any] = copy.deepcopy(defaults) if defaults else {}
_type_map = type_map if type_map else {}
if not env_str:
return settings
# Parse the environment string using the specified separator
pairs = [p.strip() for p in env_str.split(separator) if p.strip()]
for pair in pairs:
if "=" not in pair:
# ignore malformed pairs
continue
key, val = pair.split("=", 1)
key = key.strip()
val = val.strip()
if not key:
continue
parts = key.split(".")
_set_nested(settings, parts, val)
# Apply type casting to the updated settings (supports nested keys in type_map)
for key, caster in _type_map.items():
key_parts = key.split(".")
if _has_nested(settings, key_parts):
raw_val = _get_nested(settings, key_parts)
# Only cast if it's a string (i.e. from env parsing). If defaults already provided
# a different type we leave it as-is.
if isinstance(raw_val, str):
try:
if caster is bool:
parsed = str_to_bool(raw_val)
elif caster is Path:
parsed = Path(raw_val).resolve()
else:
parsed = caster(raw_val)
except (ValueError, TypeError) as e:
caster_name = getattr(caster, "__name__", repr(caster))
raise ValueError(
f"Error casting key '{key}' with value '{raw_val}' "
f"to type '{caster_name}'",
) from e
_set_nested(settings, key_parts, parsed)
return settings
def get_choice_from_env(
env_key: str,
choices: set[str],
default: str | None = None,
) -> str:
"""
Gets and validates an environment variable against a set of allowed choices.
Args:
env_key: The environment variable key to validate
choices: Set of valid choices for the environment variable
default: Optional default value if environment variable is not set
Returns:
The validated environment variable value
Raises:
ValueError: If the environment variable value is not in choices
or if no default is provided and env var is missing
"""
value = os.environ.get(env_key, default)
if value is None:
raise ValueError(
f"Environment variable '{env_key}' is required but not set.",
)
if value not in choices:
raise ValueError(
f"Environment variable '{env_key}' has invalid value '{value}'. "
f"Valid choices are: {', '.join(sorted(choices))}",
)
return value

View File

View File

@@ -0,0 +1,266 @@
import os
from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from paperless.settings.custom import parse_db_settings
class TestParseDbSettings:
"""Test suite for parse_db_settings function."""
@pytest.mark.parametrize(
("env_vars", "expected_database_settings"),
[
pytest.param(
{},
{
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": None, # Will be replaced with tmp_path
"OPTIONS": {},
},
},
id="default-sqlite",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "sqlite",
"PAPERLESS_DB_OPTIONS": "timeout=30",
},
{
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": None, # Will be replaced with tmp_path
"OPTIONS": {
"timeout": 30,
},
},
},
id="sqlite-with-timeout-override",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "postgresql",
"PAPERLESS_DBHOST": "localhost",
},
{
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": "localhost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"sslmode": "prefer",
"sslrootcert": None,
"sslcert": None,
"sslkey": None,
},
},
},
id="postgresql-defaults",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "postgresql",
"PAPERLESS_DBHOST": "paperless-db-host",
"PAPERLESS_DBPORT": "1111",
"PAPERLESS_DBNAME": "customdb",
"PAPERLESS_DBUSER": "customuser",
"PAPERLESS_DBPASS": "custompass",
"PAPERLESS_DB_OPTIONS": "pool.max_size=50;pool.min_size=2;sslmode=require",
},
{
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": "paperless-db-host",
"PORT": 1111,
"NAME": "customdb",
"USER": "customuser",
"PASSWORD": "custompass",
"OPTIONS": {
"sslmode": "require",
"sslrootcert": None,
"sslcert": None,
"sslkey": None,
"pool": {
"min_size": 2,
"max_size": 50,
},
},
},
},
id="postgresql-overrides",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "postgresql",
"PAPERLESS_DBHOST": "pghost",
"PAPERLESS_DB_POOLSIZE": "10",
},
{
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": "pghost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"sslmode": "prefer",
"sslrootcert": None,
"sslcert": None,
"sslkey": None,
"pool": {
"min_size": 1,
"max_size": 10,
},
},
},
},
id="postgresql-legacy-poolsize",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "postgresql",
"PAPERLESS_DBHOST": "pghost",
"PAPERLESS_DBSSLMODE": "require",
"PAPERLESS_DBSSLROOTCERT": "/certs/ca.crt",
"PAPERLESS_DB_TIMEOUT": "30",
},
{
"default": {
"ENGINE": "django.db.backends.postgresql",
"HOST": "pghost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"sslmode": "require",
"sslrootcert": "/certs/ca.crt",
"sslcert": None,
"sslkey": None,
"connect_timeout": 30,
},
},
},
id="postgresql-legacy-ssl-and-timeout",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "mariadb",
"PAPERLESS_DBHOST": "localhost",
},
{
"default": {
"ENGINE": "django.db.backends.mysql",
"HOST": "localhost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"collation": "utf8mb4_unicode_ci",
"ssl_mode": "PREFERRED",
"ssl": {
"ca": None,
"cert": None,
"key": None,
},
},
},
},
id="mariadb-defaults",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "mariadb",
"PAPERLESS_DBHOST": "paperless-mariadb-host",
"PAPERLESS_DBPORT": "5555",
"PAPERLESS_DBUSER": "my-cool-user",
"PAPERLESS_DBPASS": "my-secure-password",
"PAPERLESS_DB_OPTIONS": "ssl.ca=/path/to/ca.pem;ssl_mode=REQUIRED",
},
{
"default": {
"ENGINE": "django.db.backends.mysql",
"HOST": "paperless-mariadb-host",
"PORT": 5555,
"NAME": "paperless",
"USER": "my-cool-user",
"PASSWORD": "my-secure-password",
"OPTIONS": {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"collation": "utf8mb4_unicode_ci",
"ssl_mode": "REQUIRED",
"ssl": {
"ca": "/path/to/ca.pem",
"cert": None,
"key": None,
},
},
},
},
id="mariadb-overrides",
),
pytest.param(
{
"PAPERLESS_DBENGINE": "mariadb",
"PAPERLESS_DBHOST": "mariahost",
"PAPERLESS_DBSSLMODE": "REQUIRED",
"PAPERLESS_DBSSLROOTCERT": "/certs/ca.pem",
"PAPERLESS_DBSSLCERT": "/certs/client.pem",
"PAPERLESS_DBSSLKEY": "/certs/client.key",
"PAPERLESS_DB_TIMEOUT": "25",
},
{
"default": {
"ENGINE": "django.db.backends.mysql",
"HOST": "mariahost",
"NAME": "paperless",
"USER": "paperless",
"PASSWORD": "paperless",
"OPTIONS": {
"read_default_file": "/etc/mysql/my.cnf",
"charset": "utf8mb4",
"collation": "utf8mb4_unicode_ci",
"ssl_mode": "REQUIRED",
"ssl": {
"ca": "/certs/ca.pem",
"cert": "/certs/client.pem",
"key": "/certs/client.key",
},
"connect_timeout": 25,
},
},
},
id="mariadb-legacy-ssl-and-timeout",
),
],
)
def test_parse_db_settings(
self,
tmp_path: Path,
mocker: MockerFixture,
env_vars: dict[str, str],
expected_database_settings: dict[str, dict],
) -> None:
"""Test various database configurations with defaults and overrides."""
# Clear environment and set test vars
mocker.patch.dict(os.environ, env_vars, clear=True)
# Update expected paths with actual tmp_path
if (
"default" in expected_database_settings
and expected_database_settings["default"]["NAME"] is None
):
expected_database_settings["default"]["NAME"] = str(
tmp_path / "db.sqlite3",
)
settings = parse_db_settings(tmp_path)
assert settings == expected_database_settings

View File

@@ -0,0 +1,414 @@
import os
from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from paperless.settings.parsers import get_choice_from_env
from paperless.settings.parsers import get_int_from_env
from paperless.settings.parsers import parse_dict_from_str
from paperless.settings.parsers import str_to_bool
class TestStringToBool:
@pytest.mark.parametrize(
"true_value",
[
pytest.param("true", id="lowercase_true"),
pytest.param("1", id="digit_1"),
pytest.param("T", id="capital_T"),
pytest.param("y", id="lowercase_y"),
pytest.param("YES", id="uppercase_YES"),
pytest.param(" True ", id="whitespace_true"),
],
)
def test_true_conversion(self, true_value: str):
"""Test that various 'true' strings correctly evaluate to True."""
assert str_to_bool(true_value) is True
@pytest.mark.parametrize(
"false_value",
[
pytest.param("false", id="lowercase_false"),
pytest.param("0", id="digit_0"),
pytest.param("f", id="capital_f"),
pytest.param("N", id="capital_N"),
pytest.param("no", id="lowercase_no"),
pytest.param(" False ", id="whitespace_false"),
],
)
def test_false_conversion(self, false_value: str):
"""Test that various 'false' strings correctly evaluate to False."""
assert str_to_bool(false_value) is False
def test_invalid_conversion(self):
"""Test that an invalid string raises a ValueError."""
with pytest.raises(ValueError, match="Cannot convert 'maybe' to a boolean\\."):
str_to_bool("maybe")
class TestParseDictFromString:
def test_empty_and_none_input(self):
"""Test behavior with None or empty string input."""
assert parse_dict_from_str(None) == {}
assert parse_dict_from_str("") == {}
defaults = {"a": 1}
res = parse_dict_from_str(None, defaults=defaults)
assert res == defaults
# Ensure it returns a copy, not the original object
assert res is not defaults
def test_basic_parsing(self):
"""Test simple key-value parsing without defaults or types."""
env_str = "key1=val1, key2=val2"
expected = {"key1": "val1", "key2": "val2"}
assert parse_dict_from_str(env_str) == expected
def test_with_defaults(self):
"""Test that environment values override defaults correctly."""
defaults = {"host": "localhost", "port": 8000, "user": "default"}
env_str = "port=9090, host=db.example.com"
expected = {"host": "db.example.com", "port": "9090", "user": "default"}
result = parse_dict_from_str(env_str, defaults=defaults)
assert result == expected
def test_type_casting(self):
"""Test successful casting of values to specified types."""
env_str = "port=9090, debug=true, timeout=12.5, user=admin"
type_map = {"port": int, "debug": bool, "timeout": float}
expected = {"port": 9090, "debug": True, "timeout": 12.5, "user": "admin"}
result = parse_dict_from_str(env_str, type_map=type_map)
assert result == expected
def test_type_casting_with_defaults(self):
"""Test casting when values come from both defaults and env string."""
defaults = {"port": 8000, "debug": False, "retries": 3}
env_str = "port=9090, debug=true"
type_map = {"port": int, "debug": bool, "retries": int}
# The 'retries' value comes from defaults and is already an int,
# so it should not be processed by the caster.
expected = {"port": 9090, "debug": True, "retries": 3}
result = parse_dict_from_str(env_str, defaults=defaults, type_map=type_map)
assert result == expected
assert isinstance(result["retries"], int)
def test_path_casting(self, tmp_path: Path):
"""Test successful casting of a string to a resolved pathlib.Path object."""
# Create a dummy file to resolve against
test_file = tmp_path / "test_file.txt"
test_file.touch()
env_str = f"config_path={test_file}"
type_map = {"config_path": Path}
result = parse_dict_from_str(env_str, type_map=type_map)
# The result should be a resolved Path object
assert isinstance(result["config_path"], Path)
assert result["config_path"] == test_file.resolve()
def test_custom_separator(self):
"""Test parsing with a custom separator like a semicolon."""
env_str = "host=db; port=5432; user=test"
expected = {"host": "db", "port": "5432", "user": "test"}
result = parse_dict_from_str(env_str, separator=";")
assert result == expected
def test_edge_cases_in_string(self):
"""Test malformed strings to ensure robustness."""
# Malformed pair 'debug' is skipped, extra comma is ignored
env_str = "key=val,, debug, foo=bar"
expected = {"key": "val", "foo": "bar"}
assert parse_dict_from_str(env_str) == expected
# Value can contain the equals sign
env_str = "url=postgres://user:pass@host:5432/db"
expected = {"url": "postgres://user:pass@host:5432/db"}
assert parse_dict_from_str(env_str) == expected
def test_casting_error_handling(self):
"""Test that a ValueError is raised for invalid casting."""
env_str = "port=not-a-number"
type_map = {"port": int}
with pytest.raises(ValueError) as excinfo:
parse_dict_from_str(env_str, type_map=type_map)
assert "Error casting key 'port'" in str(excinfo.value)
assert "value 'not-a-number'" in str(excinfo.value)
assert "to type 'int'" in str(excinfo.value)
def test_bool_casting_error(self):
"""Test that an invalid boolean string raises a ValueError."""
env_str = "debug=maybe"
type_map = {"debug": bool}
with pytest.raises(ValueError, match="Error casting key 'debug'"):
parse_dict_from_str(env_str, type_map=type_map)
def test_nested_key_parsing_basic(self):
"""Basic nested key parsing using dot-notation."""
env_str = "database.host=db.example.com, database.port=5432, logging.level=INFO"
result = parse_dict_from_str(env_str)
assert result == {
"database": {"host": "db.example.com", "port": "5432"},
"logging": {"level": "INFO"},
}
def test_nested_overrides_defaults_and_deepcopy(self):
"""Nested env keys override defaults and defaults are deep-copied."""
defaults = {"database": {"host": "127.0.0.1", "port": 3306, "user": "default"}}
env_str = "database.host=db.example.com, debug=true"
result = parse_dict_from_str(
env_str,
defaults=defaults,
type_map={"debug": bool},
)
assert result["database"]["host"] == "db.example.com"
# Unchanged default preserved
assert result["database"]["port"] == 3306
assert result["database"]["user"] == "default"
# Default object was deep-copied (no same nested object identity)
assert result is not defaults
assert result["database"] is not defaults["database"]
def test_nested_type_casting(self):
"""Type casting for nested keys (dot-notation) should work."""
env_str = "database.host=db.example.com, database.port=5433, debug=false"
type_map = {"database.port": int, "debug": bool}
result = parse_dict_from_str(env_str, type_map=type_map)
assert result["database"]["host"] == "db.example.com"
assert result["database"]["port"] == 5433
assert isinstance(result["database"]["port"], int)
assert result["debug"] is False
assert isinstance(result["debug"], bool)
def test_nested_casting_error_message(self):
"""Error messages should include the full dotted key name on failure."""
env_str = "database.port=not-a-number"
type_map = {"database.port": int}
with pytest.raises(ValueError) as excinfo:
parse_dict_from_str(env_str, type_map=type_map)
msg = str(excinfo.value)
assert "Error casting key 'database.port'" in msg
assert "value 'not-a-number'" in msg
assert "to type 'int'" in msg
def test_type_map_does_not_recast_non_string_defaults(self):
"""If a default already provides a non-string value, the caster should skip it."""
defaults = {"database": {"port": 3306}}
type_map = {"database.port": int}
result = parse_dict_from_str(None, defaults=defaults, type_map=type_map)
assert result["database"]["port"] == 3306
assert isinstance(result["database"]["port"], int)
class TestGetIntFromEnv:
@pytest.mark.parametrize(
("env_value", "expected"),
[
pytest.param("42", 42, id="positive"),
pytest.param("-10", -10, id="negative"),
pytest.param("0", 0, id="zero"),
pytest.param("999", 999, id="large_positive"),
pytest.param("-999", -999, id="large_negative"),
],
)
def test_existing_env_var_valid_ints(self, mocker, env_value, expected):
"""Test that existing environment variables with valid integers return correct values."""
mocker.patch.dict(os.environ, {"INT_VAR": env_value})
assert get_int_from_env("INT_VAR") == expected
@pytest.mark.parametrize(
("default", "expected"),
[
pytest.param(100, 100, id="positive_default"),
pytest.param(0, 0, id="zero_default"),
pytest.param(-50, -50, id="negative_default"),
pytest.param(None, None, id="none_default"),
],
)
def test_missing_env_var_with_defaults(self, mocker, default, expected):
"""Test that missing environment variables return provided defaults."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_int_from_env("MISSING_VAR", default=default) == expected
def test_missing_env_var_no_default(self, mocker):
"""Test that missing environment variable with no default returns None."""
mocker.patch.dict(os.environ, {}, clear=True)
assert get_int_from_env("MISSING_VAR") is None
@pytest.mark.parametrize(
"invalid_value",
[
pytest.param("not_a_number", id="text"),
pytest.param("42.5", id="float"),
pytest.param("42a", id="alpha_suffix"),
pytest.param("", id="empty"),
pytest.param(" ", id="whitespace"),
pytest.param("true", id="boolean"),
pytest.param("1.0", id="decimal"),
],
)
def test_invalid_int_values_raise_error(self, mocker, invalid_value):
"""Test that invalid integer values raise ValueError."""
mocker.patch.dict(os.environ, {"INVALID_INT": invalid_value})
with pytest.raises(ValueError):
get_int_from_env("INVALID_INT")
class TestGetEnvChoice:
@pytest.fixture
def valid_choices(self) -> set[str]:
"""Fixture providing a set of valid environment choices."""
return {"development", "staging", "production"}
def test_returns_valid_env_value(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function returns the environment value when it's valid."""
mocker.patch.dict("os.environ", {"TEST_ENV": "development"})
result = get_choice_from_env("TEST_ENV", valid_choices)
assert result == "development"
def test_returns_default_when_env_not_set(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function returns default value when env var is not set."""
mocker.patch.dict("os.environ", {}, clear=True)
result = get_choice_from_env("TEST_ENV", valid_choices, default="staging")
assert result == "staging"
def test_raises_error_when_env_not_set_and_no_default(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function raises ValueError when env var is missing and no default."""
mocker.patch.dict("os.environ", {}, clear=True)
with pytest.raises(ValueError) as exc_info:
get_choice_from_env("TEST_ENV", valid_choices)
assert "Environment variable 'TEST_ENV' is required but not set" in str(
exc_info.value,
)
def test_raises_error_when_env_value_invalid(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function raises ValueError when env value is not in choices."""
mocker.patch.dict("os.environ", {"TEST_ENV": "invalid_value"})
with pytest.raises(ValueError) as exc_info:
get_choice_from_env("TEST_ENV", valid_choices)
error_msg = str(exc_info.value)
assert (
"Environment variable 'TEST_ENV' has invalid value 'invalid_value'"
in error_msg
)
assert "Valid choices are:" in error_msg
assert "development" in error_msg
assert "staging" in error_msg
assert "production" in error_msg
def test_raises_error_when_default_invalid(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that function raises ValueError when default value is not in choices."""
mocker.patch.dict("os.environ", {}, clear=True)
with pytest.raises(ValueError) as exc_info:
get_choice_from_env("TEST_ENV", valid_choices, default="invalid_default")
error_msg = str(exc_info.value)
assert (
"Environment variable 'TEST_ENV' has invalid value 'invalid_default'"
in error_msg
)
def test_case_sensitive_validation(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test that validation is case sensitive."""
mocker.patch.dict("os.environ", {"TEST_ENV": "DEVELOPMENT"})
with pytest.raises(ValueError):
get_choice_from_env("TEST_ENV", valid_choices)
def test_empty_string_env_value(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test behavior with empty string environment value."""
mocker.patch.dict("os.environ", {"TEST_ENV": ""})
with pytest.raises(ValueError) as exc_info:
get_choice_from_env("TEST_ENV", valid_choices)
assert "has invalid value ''" in str(exc_info.value)
def test_whitespace_env_value(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test behavior with whitespace-only environment value."""
mocker.patch.dict("os.environ", {"TEST_ENV": " development "})
with pytest.raises(ValueError):
get_choice_from_env("TEST_ENV", valid_choices)
def test_single_choice_set(self, mocker: MockerFixture) -> None:
"""Test function works correctly with single choice set."""
single_choice: set[str] = {"production"}
mocker.patch.dict("os.environ", {"TEST_ENV": "production"})
result = get_choice_from_env("TEST_ENV", single_choice)
assert result == "production"
def test_large_choice_set(self, mocker: MockerFixture) -> None:
"""Test function works correctly with large choice set."""
large_choices: set[str] = {f"option_{i}" for i in range(100)}
mocker.patch.dict("os.environ", {"TEST_ENV": "option_50"})
result = get_choice_from_env("TEST_ENV", large_choices)
assert result == "option_50"
def test_different_env_keys(
self,
mocker: MockerFixture,
valid_choices: set[str],
) -> None:
"""Test function works with different environment variable keys."""
test_cases = [
("DJANGO_ENV", "development"),
("DATABASE_BACKEND", "staging"),
("LOG_LEVEL", "production"),
("APP_MODE", "development"),
]
for env_key, env_value in test_cases:
mocker.patch.dict("os.environ", {env_key: env_value})
result = get_choice_from_env(env_key, valid_choices)
assert result == env_value

View File

@@ -78,11 +78,15 @@ class TestCustomAccountAdapter(TestCase):
adapter = get_adapter()
# Test when PAPERLESS_URL is None
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
with override_settings(
PAPERLESS_URL=None,
ACCOUNT_DEFAULT_HTTP_PROTOCOL="https",
):
expected_url = f"https://foo.org{reverse('account_reset_password_from_key', kwargs={'uidb36': 'UID', 'key': 'KEY'})}"
self.assertEqual(
adapter.get_reset_password_from_key_url("UID-KEY"),
expected_url,
)
# Test when PAPERLESS_URL is not None
with override_settings(PAPERLESS_URL="https://bar.com"):

View File

@@ -2,13 +2,17 @@ import os
from pathlib import Path
from unittest import mock
import pytest
from django.core.checks import Warning
from django.test import TestCase
from django.test import override_settings
from pytest_mock import MockerFixture
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from paperless.checks import audit_log_check
from paperless.checks import binaries_check
from paperless.checks import check_deprecated_db_settings
from paperless.checks import debug_mode_check
from paperless.checks import paths_check
from paperless.checks import settings_values_check
@@ -237,3 +241,157 @@ class TestAuditLogChecks(TestCase):
("auditlog table was found but audit log is disabled."),
msg.msg,
)
DEPRECATED_VARS: dict[str, str] = {
"PAPERLESS_DB_TIMEOUT": "timeout",
"PAPERLESS_DB_POOLSIZE": "pool.min_size / pool.max_size",
"PAPERLESS_DBSSLMODE": "sslmode",
"PAPERLESS_DBSSLROOTCERT": "sslrootcert",
"PAPERLESS_DBSSLCERT": "sslcert",
"PAPERLESS_DBSSLKEY": "sslkey",
}
class TestDeprecatedDbSettings:
"""Test suite for the check_deprecated_db_settings system check."""
def test_no_deprecated_vars_returns_empty(
self,
mocker: MockerFixture,
) -> None:
"""No warnings when none of the deprecated vars are present."""
# clear=True ensures vars from the outer test environment do not leak in
mocker.patch.dict(os.environ, {}, clear=True)
result = check_deprecated_db_settings(None)
assert result == []
@pytest.mark.parametrize(
("env_var", "db_option_key"),
[
("PAPERLESS_DB_TIMEOUT", "timeout"),
("PAPERLESS_DB_POOLSIZE", "pool.min_size / pool.max_size"),
("PAPERLESS_DBSSLMODE", "sslmode"),
("PAPERLESS_DBSSLROOTCERT", "sslrootcert"),
("PAPERLESS_DBSSLCERT", "sslcert"),
("PAPERLESS_DBSSLKEY", "sslkey"),
],
ids=[
"db-timeout",
"db-poolsize",
"ssl-mode",
"ssl-rootcert",
"ssl-cert",
"ssl-key",
],
)
def test_single_deprecated_var_produces_one_warning(
self,
mocker: MockerFixture,
env_var: str,
db_option_key: str,
) -> None:
"""Each deprecated var in isolation produces exactly one warning."""
mocker.patch.dict(os.environ, {env_var: "some_value"}, clear=True)
result = check_deprecated_db_settings(None)
assert len(result) == 1
warning = result[0]
assert isinstance(warning, Warning)
assert warning.id == "paperless.W001"
assert env_var in warning.hint
assert db_option_key in warning.hint
def test_multiple_deprecated_vars_produce_one_warning_each(
self,
mocker: MockerFixture,
) -> None:
"""Each deprecated var present in the environment gets its own warning."""
set_vars = {
"PAPERLESS_DB_TIMEOUT": "30",
"PAPERLESS_DB_POOLSIZE": "10",
"PAPERLESS_DBSSLMODE": "require",
}
mocker.patch.dict(os.environ, set_vars, clear=True)
result = check_deprecated_db_settings(None)
assert len(result) == len(set_vars)
assert all(isinstance(w, Warning) for w in result)
assert all(w.id == "paperless.W001" for w in result)
all_hints = " ".join(w.hint for w in result)
for var_name in set_vars:
assert var_name in all_hints
def test_all_deprecated_vars_produces_one_warning_each(
self,
mocker: MockerFixture,
) -> None:
"""All deprecated vars set simultaneously produces one warning per var."""
all_vars = dict.fromkeys(DEPRECATED_VARS, "some_value")
mocker.patch.dict(os.environ, all_vars, clear=True)
result = check_deprecated_db_settings(None)
assert len(result) == len(DEPRECATED_VARS)
assert all(isinstance(w, Warning) for w in result)
assert all(w.id == "paperless.W001" for w in result)
def test_unset_vars_not_mentioned_in_warnings(
self,
mocker: MockerFixture,
) -> None:
"""Vars absent from the environment do not appear in any warning."""
mocker.patch.dict(
os.environ,
{"PAPERLESS_DB_TIMEOUT": "30"},
clear=True,
)
result = check_deprecated_db_settings(None)
assert len(result) == 1
assert "PAPERLESS_DB_TIMEOUT" in result[0].hint
unset_vars = [v for v in DEPRECATED_VARS if v != "PAPERLESS_DB_TIMEOUT"]
for var_name in unset_vars:
assert var_name not in result[0].hint
def test_empty_string_var_not_treated_as_set(
self,
mocker: MockerFixture,
) -> None:
"""A var set to an empty string is not flagged as a deprecated setting."""
mocker.patch.dict(
os.environ,
{"PAPERLESS_DB_TIMEOUT": ""},
clear=True,
)
result = check_deprecated_db_settings(None)
assert result == []
def test_warning_mentions_migration_target(
self,
mocker: MockerFixture,
) -> None:
"""Each warning hints at PAPERLESS_DB_OPTIONS as the migration target."""
mocker.patch.dict(
os.environ,
{"PAPERLESS_DBSSLMODE": "require"},
clear=True,
)
result = check_deprecated_db_settings(None)
assert len(result) == 1
assert "PAPERLESS_DB_OPTIONS" in result[0].hint
def test_warning_message_identifies_var(
self,
mocker: MockerFixture,
) -> None:
"""The warning message (not just the hint) identifies the offending var."""
mocker.patch.dict(
os.environ,
{"PAPERLESS_DBSSLCERT": "/path/to/cert.pem"},
clear=True,
)
result = check_deprecated_db_settings(None)
assert len(result) == 1
assert "PAPERLESS_DBSSLCERT" in result[0].msg

View File

@@ -9,7 +9,6 @@ from celery.schedules import crontab
from paperless.settings import _parse_base_paths
from paperless.settings import _parse_beat_schedule
from paperless.settings import _parse_dateparser_languages
from paperless.settings import _parse_db_settings
from paperless.settings import _parse_ignore_dates
from paperless.settings import _parse_paperless_url
from paperless.settings import _parse_redis_url
@@ -378,64 +377,6 @@ class TestCeleryScheduleParsing(TestCase):
)
class TestDBSettings(TestCase):
def test_db_timeout_with_sqlite(self) -> None:
"""
GIVEN:
- PAPERLESS_DB_TIMEOUT is set
WHEN:
- Settings are parsed
THEN:
- PAPERLESS_DB_TIMEOUT set for sqlite
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_DB_TIMEOUT": "10",
},
):
databases = _parse_db_settings()
self.assertDictEqual(
{
"timeout": 10.0,
},
databases["default"]["OPTIONS"],
)
def test_db_timeout_with_not_sqlite(self) -> None:
"""
GIVEN:
- PAPERLESS_DB_TIMEOUT is set but db is not sqlite
WHEN:
- Settings are parsed
THEN:
- PAPERLESS_DB_TIMEOUT set correctly in non-sqlite db & for fallback sqlite db
"""
with mock.patch.dict(
os.environ,
{
"PAPERLESS_DBHOST": "127.0.0.1",
"PAPERLESS_DB_TIMEOUT": "10",
},
):
databases = _parse_db_settings()
self.assertDictEqual(
databases["default"]["OPTIONS"],
databases["default"]["OPTIONS"]
| {
"connect_timeout": 10.0,
},
)
self.assertDictEqual(
{
"timeout": 10.0,
},
databases["sqlite"]["OPTIONS"],
)
class TestPaperlessURLSettings(TestCase):
def test_paperless_url(self) -> None:
"""

View File

@@ -1,7 +1,7 @@
import tempfile
from pathlib import Path
from django.conf import settings
from django.test import override_settings
def test_favicon_view(client):
@@ -11,15 +11,14 @@ def test_favicon_view(client):
favicon_path.parent.mkdir(parents=True, exist_ok=True)
favicon_path.write_bytes(b"FAKE ICON DATA")
settings.STATIC_ROOT = static_dir
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
with override_settings(STATIC_ROOT=static_dir):
response = client.get("/favicon.ico")
assert response.status_code == 200
assert response["Content-Type"] == "image/x-icon"
assert b"".join(response.streaming_content) == b"FAKE ICON DATA"
def test_favicon_view_missing_file(client):
settings.STATIC_ROOT = Path(tempfile.mkdtemp())
response = client.get("/favicon.ico")
assert response.status_code == 404
with override_settings(STATIC_ROOT=Path(tempfile.mkdtemp())):
response = client.get("/favicon.ico")
assert response.status_code == 404

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from bleach import clean
from bleach import linkify
from django.conf import settings
from django.utils import timezone
from django.utils.timezone import is_naive
from django.utils.timezone import make_aware
from gotenberg_client import GotenbergClient
@@ -332,7 +333,9 @@ class MailDocumentParser(DocumentParser):
if data["attachments"]:
data["attachments_label"] = "Attachments"
data["date"] = clean_html(mail.date.astimezone().strftime("%Y-%m-%d %H:%M"))
data["date"] = clean_html(
timezone.localtime(mail.date).strftime("%Y-%m-%d %H:%M"),
)
data["content"] = clean_html(mail.text.strip())
from django.template.loader import render_to_string

View File

@@ -6,6 +6,7 @@ from unittest import mock
import httpx
import pytest
from django.test.html import parse_html
from django.utils import timezone
from pytest_django.fixtures import SettingsWrapper
from pytest_httpx import HTTPXMock
from pytest_mock import MockerFixture
@@ -634,13 +635,14 @@ class TestParser:
THEN:
- Resulting HTML is as expected
"""
mail = mail_parser.parse_file_to_message(html_email_file)
html_file = mail_parser.mail_to_html(mail)
with timezone.override("UTC"):
mail = mail_parser.parse_file_to_message(html_email_file)
html_file = mail_parser.mail_to_html(mail)
expected_html = parse_html(html_email_html_file.read_text())
actual_html = parse_html(html_file.read_text())
expected_html = parse_html(html_email_html_file.read_text())
actual_html = parse_html(html_file.read_text())
assert expected_html == actual_html
assert expected_html == actual_html
def test_generate_pdf_from_mail(
self,

View File

@@ -1,5 +1,6 @@
import shutil
import tempfile
import unicodedata
import uuid
from pathlib import Path
from unittest import mock
@@ -847,8 +848,18 @@ class TestParser(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
"application/pdf",
)
# Copied from the PDF to here. Don't even look at it
self.assertIn("ةﯾﻠﺧﺎدﻻ ةرازو", parser.get_text())
# OCR output for RTL text varies across platforms/versions due to
# bidi controls and presentation forms; normalize before assertion.
normalized_text = "".join(
char
for char in unicodedata.normalize("NFKC", parser.get_text())
if unicodedata.category(char) != "Cf" and not char.isspace()
)
self.assertIn("ةرازو", normalized_text)
self.assertTrue(
any(token in normalized_text for token in ("ةیلخادلا", "الاخليد")),
)
@mock.patch("ocrmypdf.ocr")
def test_gs_rendering_error(self, m) -> None:

View File

@@ -18,7 +18,10 @@ nav = [
"setup.md",
"usage.md",
"configuration.md",
"administration.md",
{ Administration = [
"administration.md",
{ "v3 Migration Guide" = "migration-v3.md" },
] },
"advanced_usage.md",
"api.md",
"development.md",