diff --git a/src/paperless/settings.py b/src/paperless/settings.py index cf119ea8a..fcd933cfd 100644 --- a/src/paperless/settings.py +++ b/src/paperless/settings.py @@ -109,6 +109,16 @@ def _parse_redis_url(env_redis: Optional[str]) -> Tuple[str]: def _parse_beat_schedule() -> Dict: + """ + Configures the scheduled tasks, according to default or + environment variables. Task expiration is configured so the task will + expire (and not run), shortly before the default frequency will put another + of the same task into the queue + + + https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#beat-entries + https://docs.celeryq.dev/en/latest/userguide/calling.html#expiration + """ schedule = {} tasks = [ { @@ -117,6 +127,11 @@ def _parse_beat_schedule() -> Dict: # Default every ten minutes "env_default": "*/10 * * * *", "task": "paperless_mail.tasks.process_mail_accounts", + "options": { + # 1 minute before default schedule sends again + "expires": 9.0 + * 60.0, + }, }, { "name": "Train the classifier", @@ -124,6 +139,11 @@ def _parse_beat_schedule() -> Dict: # Default hourly at 5 minutes past the hour "env_default": "5 */1 * * *", "task": "documents.tasks.train_classifier", + "options": { + # 1 minute before default schedule sends again + "expires": 59.0 + * 60.0, + }, }, { "name": "Optimize the index", @@ -131,6 +151,12 @@ def _parse_beat_schedule() -> Dict: # Default daily at midnight "env_default": "0 0 * * *", "task": "documents.tasks.index_optimize", + "options": { + # 1 hour before default schedule sends again + "expires": 23.0 + * 60.0 + * 60.0, + }, }, { "name": "Perform sanity check", @@ -138,6 +164,13 @@ def _parse_beat_schedule() -> Dict: # Default Sunday at 00:30 "env_default": "30 0 * * sun", "task": "documents.tasks.sanity_check", + "options": { + # 1 hour before default schedule sends again + "expires": 7.0 + * 23.0 + * 60.0 + * 60.0, + }, }, ] for task in tasks: @@ -151,9 +184,11 @@ def _parse_beat_schedule() -> Dict: # - five time-and-date fields # - separated by at least one blank minute, hour, day_month, month, day_week = value.split(" ") + schedule[task["name"]] = { "task": task["task"], "schedule": crontab(minute, hour, day_week, day_month, month), + "options": task["options"], } return schedule @@ -561,22 +596,21 @@ LOGGING = { # Task queue # ############################################################################### -TASK_WORKERS = __get_int("PAPERLESS_TASK_WORKERS", 1) - -WORKER_TIMEOUT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800) +# https://docs.celeryq.dev/en/stable/userguide/configuration.html CELERY_BROKER_URL = _CELERY_REDIS_URL CELERY_TIMEZONE = TIME_ZONE CELERY_WORKER_HIJACK_ROOT_LOGGER = False -CELERY_WORKER_CONCURRENCY = TASK_WORKERS +CELERY_WORKER_CONCURRENCY: Final[int] = __get_int("PAPERLESS_TASK_WORKERS", 1) +TASK_WORKERS = CELERY_WORKER_CONCURRENCY CELERY_WORKER_MAX_TASKS_PER_CHILD = 1 CELERY_WORKER_SEND_TASK_EVENTS = True - +CELERY_TASK_SEND_SENT_EVENT = True CELERY_SEND_TASK_SENT_EVENT = True CELERY_TASK_TRACK_STARTED = True -CELERY_TASK_TIME_LIMIT = WORKER_TIMEOUT +CELERY_TASK_TIME_LIMIT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800) CELERY_RESULT_EXTENDED = True CELERY_RESULT_BACKEND = "django-db" @@ -608,7 +642,7 @@ def default_threads_per_worker(task_workers) -> int: THREADS_PER_WORKER = os.getenv( "PAPERLESS_THREADS_PER_WORKER", - default_threads_per_worker(TASK_WORKERS), + default_threads_per_worker(CELERY_WORKER_CONCURRENCY), ) ############################################################################### diff --git a/src/paperless/tests/test_settings.py b/src/paperless/tests/test_settings.py index f6d25f6fd..a85f0e06a 100644 --- a/src/paperless/tests/test_settings.py +++ b/src/paperless/tests/test_settings.py @@ -149,6 +149,11 @@ class TestRedisSocketConversion(TestCase): class TestCeleryScheduleParsing(TestCase): + MAIL_EXPIRE_TIME = 9.0 * 60.0 + CLASSIFIER_EXPIRE_TIME = 59.0 * 60.0 + INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0 + SANITY_EXPIRE_TIME = 7.0 * 23.0 * 60.0 * 60.0 + def test_schedule_configuration_default(self): """ GIVEN: @@ -165,18 +170,22 @@ class TestCeleryScheduleParsing(TestCase): "Check all e-mail accounts": { "task": "paperless_mail.tasks.process_mail_accounts", "schedule": crontab(minute="*/10"), + "options": {"expires": self.MAIL_EXPIRE_TIME}, }, "Train the classifier": { "task": "documents.tasks.train_classifier", "schedule": crontab(minute="5", hour="*/1"), + "options": {"expires": self.CLASSIFIER_EXPIRE_TIME}, }, "Optimize the index": { "task": "documents.tasks.index_optimize", "schedule": crontab(minute=0, hour=0), + "options": {"expires": self.INDEX_EXPIRE_TIME}, }, "Perform sanity check": { "task": "documents.tasks.sanity_check", "schedule": crontab(minute=30, hour=0, day_of_week="sun"), + "options": {"expires": self.SANITY_EXPIRE_TIME}, }, }, schedule, @@ -203,18 +212,22 @@ class TestCeleryScheduleParsing(TestCase): "Check all e-mail accounts": { "task": "paperless_mail.tasks.process_mail_accounts", "schedule": crontab(minute="*/50", day_of_week="mon"), + "options": {"expires": self.MAIL_EXPIRE_TIME}, }, "Train the classifier": { "task": "documents.tasks.train_classifier", "schedule": crontab(minute="5", hour="*/1"), + "options": {"expires": self.CLASSIFIER_EXPIRE_TIME}, }, "Optimize the index": { "task": "documents.tasks.index_optimize", "schedule": crontab(minute=0, hour=0), + "options": {"expires": self.INDEX_EXPIRE_TIME}, }, "Perform sanity check": { "task": "documents.tasks.sanity_check", "schedule": crontab(minute=30, hour=0, day_of_week="sun"), + "options": {"expires": self.SANITY_EXPIRE_TIME}, }, }, schedule, @@ -238,14 +251,17 @@ class TestCeleryScheduleParsing(TestCase): "Check all e-mail accounts": { "task": "paperless_mail.tasks.process_mail_accounts", "schedule": crontab(minute="*/10"), + "options": {"expires": self.MAIL_EXPIRE_TIME}, }, "Train the classifier": { "task": "documents.tasks.train_classifier", "schedule": crontab(minute="5", hour="*/1"), + "options": {"expires": self.CLASSIFIER_EXPIRE_TIME}, }, "Perform sanity check": { "task": "documents.tasks.sanity_check", "schedule": crontab(minute=30, hour=0, day_of_week="sun"), + "options": {"expires": self.SANITY_EXPIRE_TIME}, }, }, schedule,