Expire the scheduled tasks shortly a new one will be added to the queue by default

This commit is contained in:
Trenton H 2023-02-03 14:41:10 -08:00
parent b9f0418038
commit 7af0b47ba9
2 changed files with 57 additions and 7 deletions

View File

@ -109,6 +109,16 @@ def _parse_redis_url(env_redis: Optional[str]) -> Tuple[str]:
def _parse_beat_schedule() -> Dict:
"""
Configures the scheduled tasks, according to default or
environment variables. Task expiration is configured so the task will
expire (and not run), shortly before the default frequency will put another
of the same task into the queue
https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html#beat-entries
https://docs.celeryq.dev/en/latest/userguide/calling.html#expiration
"""
schedule = {}
tasks = [
{
@ -117,6 +127,11 @@ def _parse_beat_schedule() -> Dict:
# Default every ten minutes
"env_default": "*/10 * * * *",
"task": "paperless_mail.tasks.process_mail_accounts",
"options": {
# 1 minute before default schedule sends again
"expires": 9.0
* 60.0,
},
},
{
"name": "Train the classifier",
@ -124,6 +139,11 @@ def _parse_beat_schedule() -> Dict:
# Default hourly at 5 minutes past the hour
"env_default": "5 */1 * * *",
"task": "documents.tasks.train_classifier",
"options": {
# 1 minute before default schedule sends again
"expires": 59.0
* 60.0,
},
},
{
"name": "Optimize the index",
@ -131,6 +151,12 @@ def _parse_beat_schedule() -> Dict:
# Default daily at midnight
"env_default": "0 0 * * *",
"task": "documents.tasks.index_optimize",
"options": {
# 1 hour before default schedule sends again
"expires": 23.0
* 60.0
* 60.0,
},
},
{
"name": "Perform sanity check",
@ -138,6 +164,13 @@ def _parse_beat_schedule() -> Dict:
# Default Sunday at 00:30
"env_default": "30 0 * * sun",
"task": "documents.tasks.sanity_check",
"options": {
# 1 hour before default schedule sends again
"expires": 7.0
* 23.0
* 60.0
* 60.0,
},
},
]
for task in tasks:
@ -151,9 +184,11 @@ def _parse_beat_schedule() -> Dict:
# - five time-and-date fields
# - separated by at least one blank
minute, hour, day_month, month, day_week = value.split(" ")
schedule[task["name"]] = {
"task": task["task"],
"schedule": crontab(minute, hour, day_week, day_month, month),
"options": task["options"],
}
return schedule
@ -561,22 +596,21 @@ LOGGING = {
# Task queue #
###############################################################################
TASK_WORKERS = __get_int("PAPERLESS_TASK_WORKERS", 1)
WORKER_TIMEOUT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
# https://docs.celeryq.dev/en/stable/userguide/configuration.html
CELERY_BROKER_URL = _CELERY_REDIS_URL
CELERY_TIMEZONE = TIME_ZONE
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
CELERY_WORKER_CONCURRENCY = TASK_WORKERS
CELERY_WORKER_CONCURRENCY: Final[int] = __get_int("PAPERLESS_TASK_WORKERS", 1)
TASK_WORKERS = CELERY_WORKER_CONCURRENCY
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True
CELERY_SEND_TASK_SENT_EVENT = True
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = WORKER_TIMEOUT
CELERY_TASK_TIME_LIMIT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
CELERY_RESULT_EXTENDED = True
CELERY_RESULT_BACKEND = "django-db"
@ -608,7 +642,7 @@ def default_threads_per_worker(task_workers) -> int:
THREADS_PER_WORKER = os.getenv(
"PAPERLESS_THREADS_PER_WORKER",
default_threads_per_worker(TASK_WORKERS),
default_threads_per_worker(CELERY_WORKER_CONCURRENCY),
)
###############################################################################

View File

@ -149,6 +149,11 @@ class TestRedisSocketConversion(TestCase):
class TestCeleryScheduleParsing(TestCase):
MAIL_EXPIRE_TIME = 9.0 * 60.0
CLASSIFIER_EXPIRE_TIME = 59.0 * 60.0
INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0
SANITY_EXPIRE_TIME = 7.0 * 23.0 * 60.0 * 60.0
def test_schedule_configuration_default(self):
"""
GIVEN:
@ -165,18 +170,22 @@ class TestCeleryScheduleParsing(TestCase):
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
"options": {"expires": self.INDEX_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
},
schedule,
@ -203,18 +212,22 @@ class TestCeleryScheduleParsing(TestCase):
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/50", day_of_week="mon"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
"options": {"expires": self.INDEX_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
},
schedule,
@ -238,14 +251,17 @@ class TestCeleryScheduleParsing(TestCase):
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
"options": {"expires": self.MAIL_EXPIRE_TIME},
},
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
"options": {"expires": self.CLASSIFIER_EXPIRE_TIME},
},
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
"options": {"expires": self.SANITY_EXPIRE_TIME},
},
},
schedule,