mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-02-16 00:19:32 -06:00
Moves to the new parser module
This commit is contained in:
@@ -6,7 +6,6 @@ import math
|
||||
import multiprocessing
|
||||
import os
|
||||
import tempfile
|
||||
from os import PathLike
|
||||
from pathlib import Path
|
||||
from typing import Final
|
||||
from urllib.parse import urlparse
|
||||
@@ -17,6 +16,12 @@ from dateparser.languages.loader import LocaleDataLoader
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from paperless.settings.parsers import get_bool_from_env
|
||||
from paperless.settings.parsers import get_float_from_env
|
||||
from paperless.settings.parsers import get_int_from_env
|
||||
from paperless.settings.parsers import get_list_from_env
|
||||
from paperless.settings.parsers import get_path_from_env
|
||||
|
||||
logger = logging.getLogger("paperless.settings")
|
||||
|
||||
# Tap paperless.conf if it's available
|
||||
@@ -43,76 +48,6 @@ for path in [
|
||||
os.environ["OMP_THREAD_LIMIT"] = "1"
|
||||
|
||||
|
||||
def __get_boolean(key: str, default: str = "NO") -> bool:
|
||||
"""
|
||||
Return a boolean value based on whatever the user has supplied in the
|
||||
environment based on whether the value "looks like" it's True or not.
|
||||
"""
|
||||
return bool(os.getenv(key, default).lower() in ("yes", "y", "1", "t", "true"))
|
||||
|
||||
|
||||
def __get_int(key: str, default: int) -> int:
|
||||
"""
|
||||
Return an integer value based on the environment variable or a default
|
||||
"""
|
||||
return int(os.getenv(key, default))
|
||||
|
||||
|
||||
def __get_optional_int(key: str) -> int | None:
|
||||
"""
|
||||
Returns None if the environment key is not present, otherwise an integer
|
||||
"""
|
||||
if key in os.environ:
|
||||
return __get_int(key, -1) # pragma: no cover
|
||||
return None
|
||||
|
||||
|
||||
def __get_float(key: str, default: float) -> float:
|
||||
"""
|
||||
Return an integer value based on the environment variable or a default
|
||||
"""
|
||||
return float(os.getenv(key, default))
|
||||
|
||||
|
||||
def __get_path(
|
||||
key: str,
|
||||
default: PathLike | str,
|
||||
) -> Path:
|
||||
"""
|
||||
Return a normalized, absolute path based on the environment variable or a default,
|
||||
if provided
|
||||
"""
|
||||
if key in os.environ:
|
||||
return Path(os.environ[key]).resolve()
|
||||
return Path(default).resolve()
|
||||
|
||||
|
||||
def __get_optional_path(key: str) -> Path | None:
|
||||
"""
|
||||
Returns None if the environment key is not present, otherwise a fully resolved Path
|
||||
"""
|
||||
if key in os.environ:
|
||||
return __get_path(key, "")
|
||||
return None
|
||||
|
||||
|
||||
def __get_list(
|
||||
key: str,
|
||||
default: list[str] | None = None,
|
||||
sep: str = ",",
|
||||
) -> list[str]:
|
||||
"""
|
||||
Return a list of elements from the environment, as separated by the given
|
||||
string, or the default if the key does not exist
|
||||
"""
|
||||
if key in os.environ:
|
||||
return list(filter(None, os.environ[key].split(sep)))
|
||||
elif default is not None:
|
||||
return default
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
def _parse_redis_url(env_redis: str | None) -> tuple[str, str]:
|
||||
"""
|
||||
Gets the Redis information from the environment or a default and handles
|
||||
@@ -275,7 +210,7 @@ def _parse_beat_schedule() -> dict:
|
||||
|
||||
|
||||
# NEVER RUN WITH DEBUG IN PRODUCTION.
|
||||
DEBUG = __get_boolean("PAPERLESS_DEBUG", "NO")
|
||||
DEBUG = get_bool_from_env("PAPERLESS_DEBUG", "NO")
|
||||
|
||||
|
||||
###############################################################################
|
||||
@@ -284,21 +219,21 @@ DEBUG = __get_boolean("PAPERLESS_DEBUG", "NO")
|
||||
|
||||
BASE_DIR: Path = Path(__file__).resolve().parent.parent
|
||||
|
||||
STATIC_ROOT = __get_path("PAPERLESS_STATICDIR", BASE_DIR.parent / "static")
|
||||
STATIC_ROOT = get_path_from_env("PAPERLESS_STATICDIR", BASE_DIR.parent / "static")
|
||||
|
||||
MEDIA_ROOT = __get_path("PAPERLESS_MEDIA_ROOT", BASE_DIR.parent / "media")
|
||||
MEDIA_ROOT = get_path_from_env("PAPERLESS_MEDIA_ROOT", BASE_DIR.parent / "media")
|
||||
ORIGINALS_DIR = MEDIA_ROOT / "documents" / "originals"
|
||||
ARCHIVE_DIR = MEDIA_ROOT / "documents" / "archive"
|
||||
THUMBNAIL_DIR = MEDIA_ROOT / "documents" / "thumbnails"
|
||||
SHARE_LINK_BUNDLE_DIR = MEDIA_ROOT / "documents" / "share_link_bundles"
|
||||
|
||||
DATA_DIR = __get_path("PAPERLESS_DATA_DIR", BASE_DIR.parent / "data")
|
||||
DATA_DIR = get_path_from_env("PAPERLESS_DATA_DIR", BASE_DIR.parent / "data")
|
||||
|
||||
NLTK_DIR = __get_path("PAPERLESS_NLTK_DIR", "/usr/share/nltk_data")
|
||||
NLTK_DIR = get_path_from_env("PAPERLESS_NLTK_DIR", "/usr/share/nltk_data")
|
||||
|
||||
# Check deprecated setting first
|
||||
EMPTY_TRASH_DIR = (
|
||||
__get_path("PAPERLESS_TRASH_DIR", os.getenv("PAPERLESS_EMPTY_TRASH_DIR"))
|
||||
get_path_from_env("PAPERLESS_TRASH_DIR", os.getenv("PAPERLESS_EMPTY_TRASH_DIR"))
|
||||
if os.getenv("PAPERLESS_TRASH_DIR") or os.getenv("PAPERLESS_EMPTY_TRASH_DIR")
|
||||
else None
|
||||
)
|
||||
@@ -307,21 +242,21 @@ EMPTY_TRASH_DIR = (
|
||||
# threads.
|
||||
MEDIA_LOCK = MEDIA_ROOT / "media.lock"
|
||||
INDEX_DIR = DATA_DIR / "index"
|
||||
MODEL_FILE = __get_path(
|
||||
MODEL_FILE = get_path_from_env(
|
||||
"PAPERLESS_MODEL_FILE",
|
||||
DATA_DIR / "classification_model.pickle",
|
||||
)
|
||||
LLM_INDEX_DIR = DATA_DIR / "llm_index"
|
||||
|
||||
LOGGING_DIR = __get_path("PAPERLESS_LOGGING_DIR", DATA_DIR / "log")
|
||||
LOGGING_DIR = get_path_from_env("PAPERLESS_LOGGING_DIR", DATA_DIR / "log")
|
||||
|
||||
CONSUMPTION_DIR = __get_path(
|
||||
CONSUMPTION_DIR = get_path_from_env(
|
||||
"PAPERLESS_CONSUMPTION_DIR",
|
||||
BASE_DIR.parent / "consume",
|
||||
)
|
||||
|
||||
# This will be created if it doesn't exist
|
||||
SCRATCH_DIR = __get_path(
|
||||
SCRATCH_DIR = get_path_from_env(
|
||||
"PAPERLESS_SCRATCH_DIR",
|
||||
Path(tempfile.gettempdir()) / "paperless",
|
||||
)
|
||||
@@ -330,7 +265,7 @@ SCRATCH_DIR = __get_path(
|
||||
# Application Definition #
|
||||
###############################################################################
|
||||
|
||||
env_apps = __get_list("PAPERLESS_APPS")
|
||||
env_apps = get_list_from_env("PAPERLESS_APPS")
|
||||
|
||||
INSTALLED_APPS = [
|
||||
"whitenoise.runserver_nostatic",
|
||||
@@ -403,7 +338,7 @@ MIDDLEWARE = [
|
||||
]
|
||||
|
||||
# Optional to enable compression
|
||||
if __get_boolean("PAPERLESS_ENABLE_COMPRESSION", "yes"): # pragma: no cover
|
||||
if get_bool_from_env("PAPERLESS_ENABLE_COMPRESSION", "yes"): # pragma: no cover
|
||||
MIDDLEWARE.insert(0, "compression_middleware.middleware.CompressionMiddleware")
|
||||
|
||||
# Workaround to not compress streaming responses (e.g. chat).
|
||||
@@ -512,8 +447,8 @@ EMAIL_PORT: Final[int] = int(os.getenv("PAPERLESS_EMAIL_PORT", 25))
|
||||
EMAIL_HOST_USER: Final[str] = os.getenv("PAPERLESS_EMAIL_HOST_USER", "")
|
||||
EMAIL_HOST_PASSWORD: Final[str] = os.getenv("PAPERLESS_EMAIL_HOST_PASSWORD", "")
|
||||
DEFAULT_FROM_EMAIL: Final[str] = os.getenv("PAPERLESS_EMAIL_FROM", EMAIL_HOST_USER)
|
||||
EMAIL_USE_TLS: Final[bool] = __get_boolean("PAPERLESS_EMAIL_USE_TLS")
|
||||
EMAIL_USE_SSL: Final[bool] = __get_boolean("PAPERLESS_EMAIL_USE_SSL")
|
||||
EMAIL_USE_TLS: Final[bool] = get_bool_from_env("PAPERLESS_EMAIL_USE_TLS")
|
||||
EMAIL_USE_SSL: Final[bool] = get_bool_from_env("PAPERLESS_EMAIL_USE_SSL")
|
||||
EMAIL_SUBJECT_PREFIX: Final[str] = "[Paperless-ngx] "
|
||||
EMAIL_TIMEOUT = 30.0
|
||||
EMAIL_ENABLED = EMAIL_HOST != "localhost" or EMAIL_HOST_USER != ""
|
||||
@@ -538,20 +473,22 @@ ACCOUNT_DEFAULT_HTTP_PROTOCOL = os.getenv(
|
||||
)
|
||||
|
||||
ACCOUNT_ADAPTER = "paperless.adapter.CustomAccountAdapter"
|
||||
ACCOUNT_ALLOW_SIGNUPS = __get_boolean("PAPERLESS_ACCOUNT_ALLOW_SIGNUPS")
|
||||
ACCOUNT_DEFAULT_GROUPS = __get_list("PAPERLESS_ACCOUNT_DEFAULT_GROUPS")
|
||||
ACCOUNT_ALLOW_SIGNUPS = get_bool_from_env("PAPERLESS_ACCOUNT_ALLOW_SIGNUPS")
|
||||
ACCOUNT_DEFAULT_GROUPS = get_list_from_env("PAPERLESS_ACCOUNT_DEFAULT_GROUPS")
|
||||
|
||||
SOCIALACCOUNT_ADAPTER = "paperless.adapter.CustomSocialAccountAdapter"
|
||||
SOCIALACCOUNT_ALLOW_SIGNUPS = __get_boolean(
|
||||
SOCIALACCOUNT_ALLOW_SIGNUPS = get_bool_from_env(
|
||||
"PAPERLESS_SOCIALACCOUNT_ALLOW_SIGNUPS",
|
||||
"yes",
|
||||
)
|
||||
SOCIALACCOUNT_AUTO_SIGNUP = __get_boolean("PAPERLESS_SOCIAL_AUTO_SIGNUP")
|
||||
SOCIALACCOUNT_AUTO_SIGNUP = get_bool_from_env("PAPERLESS_SOCIAL_AUTO_SIGNUP")
|
||||
SOCIALACCOUNT_PROVIDERS = json.loads(
|
||||
os.getenv("PAPERLESS_SOCIALACCOUNT_PROVIDERS", "{}"),
|
||||
)
|
||||
SOCIAL_ACCOUNT_DEFAULT_GROUPS = __get_list("PAPERLESS_SOCIAL_ACCOUNT_DEFAULT_GROUPS")
|
||||
SOCIAL_ACCOUNT_SYNC_GROUPS = __get_boolean("PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS")
|
||||
SOCIAL_ACCOUNT_DEFAULT_GROUPS = get_list_from_env(
|
||||
"PAPERLESS_SOCIAL_ACCOUNT_DEFAULT_GROUPS",
|
||||
)
|
||||
SOCIAL_ACCOUNT_SYNC_GROUPS = get_bool_from_env("PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS")
|
||||
SOCIAL_ACCOUNT_SYNC_GROUPS_CLAIM: Final[str] = os.getenv(
|
||||
"PAPERLESS_SOCIAL_ACCOUNT_SYNC_GROUPS_CLAIM",
|
||||
"groups",
|
||||
@@ -563,8 +500,8 @@ MFA_TOTP_ISSUER = "Paperless-ngx"
|
||||
|
||||
ACCOUNT_EMAIL_SUBJECT_PREFIX = "[Paperless-ngx] "
|
||||
|
||||
DISABLE_REGULAR_LOGIN = __get_boolean("PAPERLESS_DISABLE_REGULAR_LOGIN")
|
||||
REDIRECT_LOGIN_TO_SSO = __get_boolean("PAPERLESS_REDIRECT_LOGIN_TO_SSO")
|
||||
DISABLE_REGULAR_LOGIN = get_bool_from_env("PAPERLESS_DISABLE_REGULAR_LOGIN")
|
||||
REDIRECT_LOGIN_TO_SSO = get_bool_from_env("PAPERLESS_REDIRECT_LOGIN_TO_SSO")
|
||||
|
||||
AUTO_LOGIN_USERNAME = os.getenv("PAPERLESS_AUTO_LOGIN_USERNAME")
|
||||
|
||||
@@ -577,12 +514,15 @@ ACCOUNT_EMAIL_VERIFICATION = (
|
||||
)
|
||||
)
|
||||
|
||||
ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS = __get_boolean(
|
||||
ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS = get_bool_from_env(
|
||||
"PAPERLESS_ACCOUNT_EMAIL_UNKNOWN_ACCOUNTS",
|
||||
"True",
|
||||
)
|
||||
|
||||
ACCOUNT_SESSION_REMEMBER = __get_boolean("PAPERLESS_ACCOUNT_SESSION_REMEMBER", "True")
|
||||
ACCOUNT_SESSION_REMEMBER = get_bool_from_env(
|
||||
"PAPERLESS_ACCOUNT_SESSION_REMEMBER",
|
||||
"True",
|
||||
)
|
||||
SESSION_EXPIRE_AT_BROWSER_CLOSE = not ACCOUNT_SESSION_REMEMBER
|
||||
SESSION_COOKIE_AGE = int(
|
||||
os.getenv("PAPERLESS_SESSION_COOKIE_AGE", 60 * 60 * 24 * 7 * 3),
|
||||
@@ -599,8 +539,8 @@ if AUTO_LOGIN_USERNAME:
|
||||
|
||||
def _parse_remote_user_settings() -> str:
|
||||
global MIDDLEWARE, AUTHENTICATION_BACKENDS, REST_FRAMEWORK
|
||||
enable = __get_boolean("PAPERLESS_ENABLE_HTTP_REMOTE_USER")
|
||||
enable_api = __get_boolean("PAPERLESS_ENABLE_HTTP_REMOTE_USER_API")
|
||||
enable = get_bool_from_env("PAPERLESS_ENABLE_HTTP_REMOTE_USER")
|
||||
enable_api = get_bool_from_env("PAPERLESS_ENABLE_HTTP_REMOTE_USER_API")
|
||||
if enable or enable_api:
|
||||
MIDDLEWARE.append("paperless.auth.HttpRemoteUserMiddleware")
|
||||
AUTHENTICATION_BACKENDS.insert(
|
||||
@@ -628,16 +568,16 @@ HTTP_REMOTE_USER_HEADER_NAME = _parse_remote_user_settings()
|
||||
X_FRAME_OPTIONS = "SAMEORIGIN"
|
||||
|
||||
# The next 3 settings can also be set using just PAPERLESS_URL
|
||||
CSRF_TRUSTED_ORIGINS = __get_list("PAPERLESS_CSRF_TRUSTED_ORIGINS")
|
||||
CSRF_TRUSTED_ORIGINS = get_list_from_env("PAPERLESS_CSRF_TRUSTED_ORIGINS")
|
||||
|
||||
if DEBUG:
|
||||
# Allow access from the angular development server during debugging
|
||||
CSRF_TRUSTED_ORIGINS.append("http://localhost:4200")
|
||||
|
||||
# We allow CORS from localhost:8000
|
||||
CORS_ALLOWED_ORIGINS = __get_list(
|
||||
CORS_ALLOWED_ORIGINS = get_list_from_env(
|
||||
"PAPERLESS_CORS_ALLOWED_HOSTS",
|
||||
["http://localhost:8000"],
|
||||
default=["http://localhost:8000"],
|
||||
)
|
||||
|
||||
if DEBUG:
|
||||
@@ -650,7 +590,7 @@ CORS_EXPOSE_HEADERS = [
|
||||
"Content-Disposition",
|
||||
]
|
||||
|
||||
ALLOWED_HOSTS = __get_list("PAPERLESS_ALLOWED_HOSTS", ["*"])
|
||||
ALLOWED_HOSTS = get_list_from_env("PAPERLESS_ALLOWED_HOSTS", default=["*"])
|
||||
if ALLOWED_HOSTS != ["*"]:
|
||||
# always allow localhost. Necessary e.g. for healthcheck in docker.
|
||||
ALLOWED_HOSTS.append("localhost")
|
||||
@@ -670,10 +610,10 @@ def _parse_paperless_url():
|
||||
PAPERLESS_URL = _parse_paperless_url()
|
||||
|
||||
# For use with trusted proxies
|
||||
TRUSTED_PROXIES = __get_list("PAPERLESS_TRUSTED_PROXIES")
|
||||
TRUSTED_PROXIES = get_list_from_env("PAPERLESS_TRUSTED_PROXIES")
|
||||
|
||||
USE_X_FORWARDED_HOST = __get_boolean("PAPERLESS_USE_X_FORWARD_HOST", "false")
|
||||
USE_X_FORWARDED_PORT = __get_boolean("PAPERLESS_USE_X_FORWARD_PORT", "false")
|
||||
USE_X_FORWARDED_HOST = get_bool_from_env("PAPERLESS_USE_X_FORWARD_HOST", "false")
|
||||
USE_X_FORWARDED_PORT = get_bool_from_env("PAPERLESS_USE_X_FORWARD_PORT", "false")
|
||||
SECURE_PROXY_SSL_HEADER = (
|
||||
tuple(json.loads(os.environ["PAPERLESS_PROXY_SSL_HEADER"]))
|
||||
if "PAPERLESS_PROXY_SSL_HEADER" in os.environ
|
||||
@@ -716,7 +656,7 @@ CSRF_COOKIE_NAME = f"{COOKIE_PREFIX}csrftoken"
|
||||
SESSION_COOKIE_NAME = f"{COOKIE_PREFIX}sessionid"
|
||||
LANGUAGE_COOKIE_NAME = f"{COOKIE_PREFIX}django_language"
|
||||
|
||||
EMAIL_CERTIFICATE_FILE = __get_optional_path("PAPERLESS_EMAIL_CERTIFICATE_LOCATION")
|
||||
EMAIL_CERTIFICATE_FILE = get_path_from_env("PAPERLESS_EMAIL_CERTIFICATE_LOCATION", None)
|
||||
|
||||
|
||||
###############################################################################
|
||||
@@ -942,7 +882,7 @@ CELERY_BROKER_URL = _CELERY_REDIS_URL
|
||||
CELERY_TIMEZONE = TIME_ZONE
|
||||
|
||||
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
|
||||
CELERY_WORKER_CONCURRENCY: Final[int] = __get_int("PAPERLESS_TASK_WORKERS", 1)
|
||||
CELERY_WORKER_CONCURRENCY: Final[int] = get_int_from_env("PAPERLESS_TASK_WORKERS", 1)
|
||||
TASK_WORKERS = CELERY_WORKER_CONCURRENCY
|
||||
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
|
||||
CELERY_WORKER_SEND_TASK_EVENTS = True
|
||||
@@ -955,7 +895,7 @@ CELERY_BROKER_TRANSPORT_OPTIONS = {
|
||||
}
|
||||
|
||||
CELERY_TASK_TRACK_STARTED = True
|
||||
CELERY_TASK_TIME_LIMIT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
|
||||
CELERY_TASK_TIME_LIMIT: Final[int] = get_int_from_env("PAPERLESS_WORKER_TIMEOUT", 1800)
|
||||
|
||||
CELERY_RESULT_EXTENDED = True
|
||||
CELERY_RESULT_BACKEND = "django-db"
|
||||
@@ -975,14 +915,14 @@ CELERY_BEAT_SCHEDULE_FILENAME = str(DATA_DIR / "celerybeat-schedule.db")
|
||||
|
||||
# Cachalot: Database read cache.
|
||||
def _parse_cachalot_settings():
|
||||
ttl = __get_int("PAPERLESS_READ_CACHE_TTL", 3600)
|
||||
ttl = get_int_from_env("PAPERLESS_READ_CACHE_TTL", 3600)
|
||||
ttl = min(ttl, 31536000) if ttl > 0 else 3600
|
||||
_, redis_url = _parse_redis_url(
|
||||
os.getenv("PAPERLESS_READ_CACHE_REDIS_URL", _CHANNELS_REDIS_URL),
|
||||
)
|
||||
result = {
|
||||
"CACHALOT_CACHE": "read-cache",
|
||||
"CACHALOT_ENABLED": __get_boolean(
|
||||
"CACHALOT_ENABLED": get_bool_from_env(
|
||||
"PAPERLESS_DB_READ_CACHE_ENABLED",
|
||||
default="no",
|
||||
),
|
||||
@@ -1067,9 +1007,9 @@ CONSUMER_POLLING_INTERVAL = float(os.getenv("PAPERLESS_CONSUMER_POLLING_INTERVAL
|
||||
|
||||
CONSUMER_STABILITY_DELAY = float(os.getenv("PAPERLESS_CONSUMER_STABILITY_DELAY", 5))
|
||||
|
||||
CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
|
||||
CONSUMER_DELETE_DUPLICATES = get_bool_from_env("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
|
||||
|
||||
CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
|
||||
CONSUMER_RECURSIVE = get_bool_from_env("PAPERLESS_CONSUMER_RECURSIVE")
|
||||
|
||||
# Ignore regex patterns, matched against filename only
|
||||
CONSUMER_IGNORE_PATTERNS = list(
|
||||
@@ -1091,13 +1031,13 @@ CONSUMER_IGNORE_DIRS = list(
|
||||
),
|
||||
)
|
||||
|
||||
CONSUMER_SUBDIRS_AS_TAGS = __get_boolean("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
|
||||
CONSUMER_SUBDIRS_AS_TAGS = get_bool_from_env("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
|
||||
|
||||
CONSUMER_ENABLE_BARCODES: Final[bool] = __get_boolean(
|
||||
CONSUMER_ENABLE_BARCODES: Final[bool] = get_bool_from_env(
|
||||
"PAPERLESS_CONSUMER_ENABLE_BARCODES",
|
||||
)
|
||||
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT: Final[bool] = __get_boolean(
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT: Final[bool] = get_bool_from_env(
|
||||
"PAPERLESS_CONSUMER_BARCODE_TIFF_SUPPORT",
|
||||
)
|
||||
|
||||
@@ -1106,7 +1046,7 @@ CONSUMER_BARCODE_STRING: Final[str] = os.getenv(
|
||||
"PATCHT",
|
||||
)
|
||||
|
||||
CONSUMER_ENABLE_ASN_BARCODE: Final[bool] = __get_boolean(
|
||||
CONSUMER_ENABLE_ASN_BARCODE: Final[bool] = get_bool_from_env(
|
||||
"PAPERLESS_CONSUMER_ENABLE_ASN_BARCODE",
|
||||
)
|
||||
|
||||
@@ -1115,23 +1055,26 @@ CONSUMER_ASN_BARCODE_PREFIX: Final[str] = os.getenv(
|
||||
"ASN",
|
||||
)
|
||||
|
||||
CONSUMER_BARCODE_UPSCALE: Final[float] = __get_float(
|
||||
CONSUMER_BARCODE_UPSCALE: Final[float] = get_float_from_env(
|
||||
"PAPERLESS_CONSUMER_BARCODE_UPSCALE",
|
||||
0.0,
|
||||
)
|
||||
|
||||
CONSUMER_BARCODE_DPI: Final[int] = __get_int("PAPERLESS_CONSUMER_BARCODE_DPI", 300)
|
||||
CONSUMER_BARCODE_DPI: Final[int] = get_int_from_env(
|
||||
"PAPERLESS_CONSUMER_BARCODE_DPI",
|
||||
300,
|
||||
)
|
||||
|
||||
CONSUMER_BARCODE_MAX_PAGES: Final[int] = __get_int(
|
||||
CONSUMER_BARCODE_MAX_PAGES: Final[int] = get_int_from_env(
|
||||
"PAPERLESS_CONSUMER_BARCODE_MAX_PAGES",
|
||||
0,
|
||||
)
|
||||
|
||||
CONSUMER_BARCODE_RETAIN_SPLIT_PAGES = __get_boolean(
|
||||
CONSUMER_BARCODE_RETAIN_SPLIT_PAGES = get_bool_from_env(
|
||||
"PAPERLESS_CONSUMER_BARCODE_RETAIN_SPLIT_PAGES",
|
||||
)
|
||||
|
||||
CONSUMER_ENABLE_TAG_BARCODE: Final[bool] = __get_boolean(
|
||||
CONSUMER_ENABLE_TAG_BARCODE: Final[bool] = get_bool_from_env(
|
||||
"PAPERLESS_CONSUMER_ENABLE_TAG_BARCODE",
|
||||
)
|
||||
|
||||
@@ -1144,11 +1087,11 @@ CONSUMER_TAG_BARCODE_MAPPING = dict(
|
||||
),
|
||||
)
|
||||
|
||||
CONSUMER_TAG_BARCODE_SPLIT: Final[bool] = __get_boolean(
|
||||
CONSUMER_TAG_BARCODE_SPLIT: Final[bool] = get_bool_from_env(
|
||||
"PAPERLESS_CONSUMER_TAG_BARCODE_SPLIT",
|
||||
)
|
||||
|
||||
CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED: Final[bool] = __get_boolean(
|
||||
CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED: Final[bool] = get_bool_from_env(
|
||||
"PAPERLESS_CONSUMER_ENABLE_COLLATE_DOUBLE_SIDED",
|
||||
)
|
||||
|
||||
@@ -1157,13 +1100,13 @@ CONSUMER_COLLATE_DOUBLE_SIDED_SUBDIR_NAME: Final[str] = os.getenv(
|
||||
"double-sided",
|
||||
)
|
||||
|
||||
CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT: Final[bool] = __get_boolean(
|
||||
CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT: Final[bool] = get_bool_from_env(
|
||||
"PAPERLESS_CONSUMER_COLLATE_DOUBLE_SIDED_TIFF_SUPPORT",
|
||||
)
|
||||
|
||||
CONSUMER_PDF_RECOVERABLE_MIME_TYPES = ("application/octet-stream",)
|
||||
|
||||
OCR_PAGES = __get_optional_int("PAPERLESS_OCR_PAGES")
|
||||
OCR_PAGES = get_int_from_env("PAPERLESS_OCR_PAGES", None)
|
||||
|
||||
# The default language that tesseract will attempt to use when parsing
|
||||
# documents. It should be a 3-letter language code consistent with ISO 639.
|
||||
@@ -1177,21 +1120,22 @@ OCR_MODE = os.getenv("PAPERLESS_OCR_MODE", "skip")
|
||||
|
||||
OCR_SKIP_ARCHIVE_FILE = os.getenv("PAPERLESS_OCR_SKIP_ARCHIVE_FILE", "never")
|
||||
|
||||
OCR_IMAGE_DPI = __get_optional_int("PAPERLESS_OCR_IMAGE_DPI")
|
||||
OCR_IMAGE_DPI = get_int_from_env("PAPERLESS_OCR_IMAGE_DPI", None)
|
||||
|
||||
OCR_CLEAN = os.getenv("PAPERLESS_OCR_CLEAN", "clean")
|
||||
|
||||
OCR_DESKEW: Final[bool] = __get_boolean("PAPERLESS_OCR_DESKEW", "true")
|
||||
OCR_DESKEW: Final[bool] = get_bool_from_env("PAPERLESS_OCR_DESKEW", "true")
|
||||
|
||||
OCR_ROTATE_PAGES: Final[bool] = __get_boolean("PAPERLESS_OCR_ROTATE_PAGES", "true")
|
||||
OCR_ROTATE_PAGES: Final[bool] = get_bool_from_env("PAPERLESS_OCR_ROTATE_PAGES", "true")
|
||||
|
||||
OCR_ROTATE_PAGES_THRESHOLD: Final[float] = __get_float(
|
||||
OCR_ROTATE_PAGES_THRESHOLD: Final[float] = get_float_from_env(
|
||||
"PAPERLESS_OCR_ROTATE_PAGES_THRESHOLD",
|
||||
12.0,
|
||||
)
|
||||
|
||||
OCR_MAX_IMAGE_PIXELS: Final[int | None] = __get_optional_int(
|
||||
OCR_MAX_IMAGE_PIXELS: Final[int | None] = get_int_from_env(
|
||||
"PAPERLESS_OCR_MAX_IMAGE_PIXELS",
|
||||
None,
|
||||
)
|
||||
|
||||
OCR_COLOR_CONVERSION_STRATEGY = os.getenv(
|
||||
@@ -1201,8 +1145,9 @@ OCR_COLOR_CONVERSION_STRATEGY = os.getenv(
|
||||
|
||||
OCR_USER_ARGS = os.getenv("PAPERLESS_OCR_USER_ARGS")
|
||||
|
||||
MAX_IMAGE_PIXELS: Final[int | None] = __get_optional_int(
|
||||
MAX_IMAGE_PIXELS: Final[int | None] = get_int_from_env(
|
||||
"PAPERLESS_MAX_IMAGE_PIXELS",
|
||||
None,
|
||||
)
|
||||
|
||||
# GNUPG needs a home directory for some reason
|
||||
@@ -1216,7 +1161,7 @@ CONVERT_MEMORY_LIMIT = os.getenv("PAPERLESS_CONVERT_MEMORY_LIMIT")
|
||||
GS_BINARY = os.getenv("PAPERLESS_GS_BINARY", "gs")
|
||||
|
||||
# Fallback layout for .eml consumption
|
||||
EMAIL_PARSE_DEFAULT_LAYOUT = __get_int(
|
||||
EMAIL_PARSE_DEFAULT_LAYOUT = get_int_from_env(
|
||||
"PAPERLESS_EMAIL_PARSE_DEFAULT_LAYOUT",
|
||||
1, # MailRule.PdfLayout.TEXT_HTML but that can't be imported here
|
||||
)
|
||||
@@ -1257,7 +1202,7 @@ DATE_PARSER_LANGUAGES = (
|
||||
# Maximum number of dates taken from document start to end to show as suggestions for
|
||||
# `created` date in the frontend. Duplicates are removed, which can result in
|
||||
# fewer dates shown.
|
||||
NUMBER_OF_SUGGESTED_DATES = __get_int("PAPERLESS_NUMBER_OF_SUGGESTED_DATES", 3)
|
||||
NUMBER_OF_SUGGESTED_DATES = get_int_from_env("PAPERLESS_NUMBER_OF_SUGGESTED_DATES", 3)
|
||||
|
||||
# Specify the filename format for out files
|
||||
FILENAME_FORMAT = os.getenv("PAPERLESS_FILENAME_FORMAT")
|
||||
@@ -1265,7 +1210,7 @@ FILENAME_FORMAT = os.getenv("PAPERLESS_FILENAME_FORMAT")
|
||||
# If this is enabled, variables in filename format will resolve to
|
||||
# empty-string instead of 'none'.
|
||||
# Directories with 'empty names' are omitted, too.
|
||||
FILENAME_FORMAT_REMOVE_NONE = __get_boolean(
|
||||
FILENAME_FORMAT_REMOVE_NONE = get_bool_from_env(
|
||||
"PAPERLESS_FILENAME_FORMAT_REMOVE_NONE",
|
||||
"NO",
|
||||
)
|
||||
@@ -1276,7 +1221,7 @@ THUMBNAIL_FONT_NAME = os.getenv(
|
||||
)
|
||||
|
||||
# Tika settings
|
||||
TIKA_ENABLED = __get_boolean("PAPERLESS_TIKA_ENABLED", "NO")
|
||||
TIKA_ENABLED = get_bool_from_env("PAPERLESS_TIKA_ENABLED", "NO")
|
||||
TIKA_ENDPOINT = os.getenv("PAPERLESS_TIKA_ENDPOINT", "http://localhost:9998")
|
||||
TIKA_GOTENBERG_ENDPOINT = os.getenv(
|
||||
"PAPERLESS_TIKA_GOTENBERG_ENDPOINT",
|
||||
@@ -1286,7 +1231,7 @@ TIKA_GOTENBERG_ENDPOINT = os.getenv(
|
||||
if TIKA_ENABLED:
|
||||
INSTALLED_APPS.append("paperless_tika.apps.PaperlessTikaConfig")
|
||||
|
||||
AUDIT_LOG_ENABLED = __get_boolean("PAPERLESS_AUDIT_LOG_ENABLED", "true")
|
||||
AUDIT_LOG_ENABLED = get_bool_from_env("PAPERLESS_AUDIT_LOG_ENABLED", "true")
|
||||
if AUDIT_LOG_ENABLED:
|
||||
INSTALLED_APPS.append("auditlog")
|
||||
MIDDLEWARE.append("auditlog.middleware.AuditlogMiddleware")
|
||||
@@ -1331,7 +1276,7 @@ if os.getenv("PAPERLESS_IGNORE_DATES") is not None:
|
||||
|
||||
ENABLE_UPDATE_CHECK = os.getenv("PAPERLESS_ENABLE_UPDATE_CHECK", "default")
|
||||
if ENABLE_UPDATE_CHECK != "default":
|
||||
ENABLE_UPDATE_CHECK = __get_boolean("PAPERLESS_ENABLE_UPDATE_CHECK")
|
||||
ENABLE_UPDATE_CHECK = get_bool_from_env("PAPERLESS_ENABLE_UPDATE_CHECK")
|
||||
|
||||
APP_TITLE = os.getenv("PAPERLESS_APP_TITLE", None)
|
||||
APP_LOGO = os.getenv("PAPERLESS_APP_LOGO", None)
|
||||
@@ -1376,7 +1321,7 @@ def _get_nltk_language_setting(ocr_lang: str) -> str | None:
|
||||
return iso_code_to_nltk.get(ocr_lang)
|
||||
|
||||
|
||||
NLTK_ENABLED: Final[bool] = __get_boolean("PAPERLESS_ENABLE_NLTK", "yes")
|
||||
NLTK_ENABLED: Final[bool] = get_bool_from_env("PAPERLESS_ENABLE_NLTK", "yes")
|
||||
|
||||
NLTK_LANGUAGE: str | None = _get_nltk_language_setting(OCR_LANGUAGE)
|
||||
|
||||
@@ -1385,7 +1330,7 @@ NLTK_LANGUAGE: str | None = _get_nltk_language_setting(OCR_LANGUAGE)
|
||||
###############################################################################
|
||||
|
||||
EMAIL_GNUPG_HOME: Final[str | None] = os.getenv("PAPERLESS_EMAIL_GNUPG_HOME")
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = __get_boolean(
|
||||
EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = get_bool_from_env(
|
||||
"PAPERLESS_ENABLE_GPG_DECRYPTOR",
|
||||
)
|
||||
|
||||
@@ -1393,7 +1338,7 @@ EMAIL_ENABLE_GPG_DECRYPTOR: Final[bool] = __get_boolean(
|
||||
###############################################################################
|
||||
# Soft Delete #
|
||||
###############################################################################
|
||||
EMPTY_TRASH_DELAY = max(__get_int("PAPERLESS_EMPTY_TRASH_DELAY", 30), 1)
|
||||
EMPTY_TRASH_DELAY = max(get_int_from_env("PAPERLESS_EMPTY_TRASH_DELAY", 30), 1)
|
||||
|
||||
|
||||
###############################################################################
|
||||
@@ -1420,19 +1365,19 @@ OUTLOOK_OAUTH_ENABLED = bool(
|
||||
###############################################################################
|
||||
WEBHOOKS_ALLOWED_SCHEMES = set(
|
||||
s.lower()
|
||||
for s in __get_list(
|
||||
for s in get_list_from_env(
|
||||
"PAPERLESS_WEBHOOKS_ALLOWED_SCHEMES",
|
||||
["http", "https"],
|
||||
default=["http", "https"],
|
||||
)
|
||||
)
|
||||
WEBHOOKS_ALLOWED_PORTS = set(
|
||||
int(p)
|
||||
for p in __get_list(
|
||||
for p in get_list_from_env(
|
||||
"PAPERLESS_WEBHOOKS_ALLOWED_PORTS",
|
||||
[],
|
||||
default=[],
|
||||
)
|
||||
)
|
||||
WEBHOOKS_ALLOW_INTERNAL_REQUESTS = __get_boolean(
|
||||
WEBHOOKS_ALLOW_INTERNAL_REQUESTS = get_bool_from_env(
|
||||
"PAPERLESS_WEBHOOKS_ALLOW_INTERNAL_REQUESTS",
|
||||
"true",
|
||||
)
|
||||
@@ -1447,7 +1392,7 @@ REMOTE_OCR_ENDPOINT = os.getenv("PAPERLESS_REMOTE_OCR_ENDPOINT")
|
||||
################################################################################
|
||||
# AI Settings #
|
||||
################################################################################
|
||||
AI_ENABLED = __get_boolean("PAPERLESS_AI_ENABLED", "NO")
|
||||
AI_ENABLED = get_bool_from_env("PAPERLESS_AI_ENABLED", "NO")
|
||||
LLM_EMBEDDING_BACKEND = os.getenv(
|
||||
"PAPERLESS_AI_LLM_EMBEDDING_BACKEND",
|
||||
) # "huggingface" or "openai"
|
||||
|
||||
294
src/paperless/settings/parsers.py
Normal file
294
src/paperless/settings/parsers.py
Normal file
@@ -0,0 +1,294 @@
|
||||
import copy
|
||||
import os
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Mapping
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import TypeVar
|
||||
from typing import overload
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def str_to_bool(value: str) -> bool:
|
||||
"""
|
||||
Converts a string representation of truth to a boolean value.
|
||||
|
||||
Recognizes 'true', '1', 't', 'y', 'yes' as True, and
|
||||
'false', '0', 'f', 'n', 'no' as False. Case-insensitive.
|
||||
|
||||
Args:
|
||||
value: The string to convert.
|
||||
|
||||
Returns:
|
||||
The boolean representation of the string.
|
||||
|
||||
Raises:
|
||||
ValueError: If the string is not a recognized boolean value.
|
||||
"""
|
||||
val_lower = value.strip().lower()
|
||||
if val_lower in ("true", "1", "t", "y", "yes"):
|
||||
return True
|
||||
elif val_lower in ("false", "0", "f", "n", "no"):
|
||||
return False
|
||||
raise ValueError(f"Cannot convert '{value}' to a boolean.")
|
||||
|
||||
|
||||
def parse_dict_from_str(
|
||||
env_str: str | None,
|
||||
defaults: dict[str, Any] | None = None,
|
||||
type_map: Mapping[str, Callable[[str], Any]] | None = None,
|
||||
separator: str = ",",
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Parses a key-value string into a dictionary, applying defaults and casting types.
|
||||
|
||||
Supports nested keys via dot-notation, e.g.:
|
||||
"database.host=localhost,database.port=5432"
|
||||
|
||||
Args:
|
||||
env_str: The string from the environment variable (e.g., "port=9090,debug=true").
|
||||
defaults: A dictionary of default values (can contain nested dicts).
|
||||
type_map: A dictionary mapping keys (dot-notation allowed) to a type or a parsing
|
||||
function (e.g., {'port': int, 'debug': bool, 'database.port': int}).
|
||||
The special `bool` type triggers custom boolean parsing.
|
||||
separator: The character used to separate key-value pairs. Defaults to ','.
|
||||
|
||||
Returns:
|
||||
A dictionary with the parsed and correctly-typed settings.
|
||||
|
||||
Raises:
|
||||
ValueError: If a value cannot be cast to its specified type.
|
||||
"""
|
||||
|
||||
def _set_nested(d: dict, keys: list[str], value: Any) -> None:
|
||||
"""Set a nested value, creating intermediate dicts as needed."""
|
||||
cur = d
|
||||
for k in keys[:-1]:
|
||||
if k not in cur or not isinstance(cur[k], dict):
|
||||
cur[k] = {}
|
||||
cur = cur[k]
|
||||
cur[keys[-1]] = value
|
||||
|
||||
def _get_nested(d: dict, keys: list[str]) -> Any:
|
||||
"""Get nested value or raise KeyError if not present."""
|
||||
cur = d
|
||||
for k in keys:
|
||||
if not isinstance(cur, dict) or k not in cur:
|
||||
raise KeyError
|
||||
cur = cur[k]
|
||||
return cur
|
||||
|
||||
def _has_nested(d: dict, keys: list[str]) -> bool:
|
||||
try:
|
||||
_get_nested(d, keys)
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
settings: dict[str, Any] = copy.deepcopy(defaults) if defaults else {}
|
||||
_type_map = type_map if type_map else {}
|
||||
|
||||
if not env_str:
|
||||
return settings
|
||||
|
||||
# Parse the environment string using the specified separator
|
||||
pairs = [p.strip() for p in env_str.split(separator) if p.strip()]
|
||||
for pair in pairs:
|
||||
if "=" not in pair:
|
||||
# ignore malformed pairs
|
||||
continue
|
||||
key, val = pair.split("=", 1)
|
||||
key = key.strip()
|
||||
val = val.strip()
|
||||
if not key:
|
||||
continue
|
||||
parts = key.split(".")
|
||||
_set_nested(settings, parts, val)
|
||||
|
||||
# Apply type casting to the updated settings (supports nested keys in type_map)
|
||||
for key, caster in _type_map.items():
|
||||
key_parts = key.split(".")
|
||||
if _has_nested(settings, key_parts):
|
||||
raw_val = _get_nested(settings, key_parts)
|
||||
# Only cast if it's a string (i.e. from env parsing). If defaults already provided
|
||||
# a different type we leave it as-is.
|
||||
if isinstance(raw_val, str):
|
||||
try:
|
||||
if caster is bool:
|
||||
parsed = str_to_bool(raw_val)
|
||||
elif caster is Path:
|
||||
parsed = Path(raw_val).resolve()
|
||||
else:
|
||||
parsed = caster(raw_val)
|
||||
except (ValueError, TypeError) as e:
|
||||
caster_name = getattr(caster, "__name__", repr(caster))
|
||||
raise ValueError(
|
||||
f"Error casting key '{key}' with value '{raw_val}' "
|
||||
f"to type '{caster_name}'",
|
||||
) from e
|
||||
_set_nested(settings, key_parts, parsed)
|
||||
|
||||
return settings
|
||||
|
||||
|
||||
def get_bool_from_env(key: str, default: str = "NO") -> bool:
|
||||
"""
|
||||
Return a boolean value based on whatever the user has supplied in the
|
||||
environment based on whether the value "looks like" it's True or not.
|
||||
"""
|
||||
return str_to_bool(os.getenv(key, default))
|
||||
|
||||
|
||||
@overload
|
||||
def get_int_from_env(key: str) -> int | None: ...
|
||||
|
||||
|
||||
@overload
|
||||
def get_int_from_env(key: str, default: None) -> int | None: ...
|
||||
|
||||
|
||||
@overload
|
||||
def get_int_from_env(key: str, default: int) -> int: ...
|
||||
|
||||
|
||||
def get_int_from_env(key: str, default: int | None = None) -> int | None:
|
||||
"""
|
||||
Return an integer value based on the environment variable.
|
||||
If default is provided, returns that value when key is missing.
|
||||
If default is None, returns None when key is missing.
|
||||
"""
|
||||
if key not in os.environ:
|
||||
return default
|
||||
|
||||
return int(os.environ[key])
|
||||
|
||||
|
||||
@overload
|
||||
def get_float_from_env(key: str) -> float | None: ...
|
||||
|
||||
|
||||
@overload
|
||||
def get_float_from_env(key: str, default: None) -> float | None: ...
|
||||
|
||||
|
||||
@overload
|
||||
def get_float_from_env(key: str, default: float) -> float: ...
|
||||
|
||||
|
||||
def get_float_from_env(key: str, default: float | None = None) -> float | None:
|
||||
"""
|
||||
Return a float value based on the environment variable.
|
||||
If default is provided, returns that value when key is missing.
|
||||
If default is None, returns None when key is missing.
|
||||
"""
|
||||
if key not in os.environ:
|
||||
return default
|
||||
|
||||
return float(os.environ[key])
|
||||
|
||||
|
||||
@overload
|
||||
def get_path_from_env(key: str) -> Path | None: ...
|
||||
|
||||
|
||||
@overload
|
||||
def get_path_from_env(key: str, default: None) -> Path | None: ...
|
||||
|
||||
|
||||
@overload
|
||||
def get_path_from_env(key: str, default: Path | str) -> Path: ...
|
||||
|
||||
|
||||
def get_path_from_env(key: str, default: Path | str | None = None) -> Path | None:
|
||||
"""
|
||||
Return a Path object based on the environment variable.
|
||||
If default is provided, returns that value when key is missing.
|
||||
If default is None, returns None when key is missing.
|
||||
"""
|
||||
if key not in os.environ:
|
||||
return default if default is None else Path(default).resolve()
|
||||
|
||||
return Path(os.environ[key]).resolve()
|
||||
|
||||
|
||||
def get_list_from_env(
|
||||
key: str,
|
||||
separator: str = ",",
|
||||
default: list[T] | None = None,
|
||||
*,
|
||||
strip_whitespace: bool = True,
|
||||
remove_empty: bool = True,
|
||||
required: bool = False,
|
||||
) -> list[str] | list[T]:
|
||||
"""
|
||||
Get and parse a list from an environment variable or return a default.
|
||||
|
||||
Args:
|
||||
key: Environment variable name
|
||||
separator: Character(s) to split on (default: ',')
|
||||
default: Default value to return if env var is not set or empty
|
||||
strip_whitespace: Whether to strip whitespace from each element
|
||||
remove_empty: Whether to remove empty strings from the result
|
||||
required: If True, raise an error when the env var is missing and no default provided
|
||||
|
||||
Returns:
|
||||
List of strings, the default if env var is empty/None or an empty list
|
||||
|
||||
Raises:
|
||||
ValueError: If required=True and env var is missing and there is no default
|
||||
"""
|
||||
# Get the environment variable value
|
||||
env_value = os.environ.get(key)
|
||||
|
||||
# Handle required environment variables
|
||||
if required and env_value is None and default is None:
|
||||
raise ValueError(f"Required environment variable '{key}' is not set")
|
||||
|
||||
if env_value:
|
||||
items = env_value.split(separator)
|
||||
if strip_whitespace:
|
||||
items = [item.strip() for item in items]
|
||||
if remove_empty:
|
||||
items = [item for item in items if item]
|
||||
return items
|
||||
elif default is not None:
|
||||
return default
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
def get_choice_from_env(
|
||||
env_key: str,
|
||||
choices: set[str],
|
||||
default: str | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
Gets and validates an environment variable against a set of allowed choices.
|
||||
|
||||
Args:
|
||||
env_key: The environment variable key to validate
|
||||
choices: Set of valid choices for the environment variable
|
||||
default: Optional default value if environment variable is not set
|
||||
|
||||
Returns:
|
||||
The validated environment variable value
|
||||
|
||||
Raises:
|
||||
ValueError: If the environment variable value is not in choices
|
||||
or if no default is provided and env var is missing
|
||||
"""
|
||||
value = os.environ.get(env_key, default)
|
||||
|
||||
if value is None:
|
||||
raise ValueError(
|
||||
f"Environment variable '{env_key}' is required but not set.",
|
||||
)
|
||||
|
||||
if value not in choices:
|
||||
raise ValueError(
|
||||
f"Environment variable '{env_key}' has invalid value '{value}'. "
|
||||
f"Valid choices are: {', '.join(sorted(choices))}",
|
||||
)
|
||||
|
||||
return value
|
||||
Reference in New Issue
Block a user