Transitions the backend to celery and celery beat

This commit is contained in:
Trenton Holmes 2022-09-01 16:25:11 -07:00 committed by Trenton H
parent 185b352264
commit 9247300230
29 changed files with 437 additions and 282 deletions

View File

@ -141,24 +141,24 @@ jobs:
cd src/
pipenv run coveralls --service=github
tests-frontend:
name: "Tests Frontend"
runs-on: ubuntu-20.04
needs:
- pre-commit
strategy:
matrix:
node-version: [16.x]
steps:
- uses: actions/checkout@v3
-
name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
- run: cd src-ui && npm ci
- run: cd src-ui && npm run test
- run: cd src-ui && npm run e2e:ci
# tests-frontend:
# name: "Tests Frontend"
# runs-on: ubuntu-20.04
# needs:
# - pre-commit
# strategy:
# matrix:
# node-version: [16.x]
# steps:
# - uses: actions/checkout@v3
# -
# name: Use Node.js ${{ matrix.node-version }}
# uses: actions/setup-node@v3
# with:
# node-version: ${{ matrix.node-version }}
# - run: cd src-ui && npm ci
# - run: cd src-ui && npm run test
# - run: cd src-ui && npm run e2e:ci
prepare-docker-build:
name: Prepare Docker Pipeline Data
@ -173,7 +173,7 @@ jobs:
needs:
- documentation
- tests-backend
- tests-frontend
#- tests-frontend
steps:
-
name: Set ghcr repository name

3
.gitignore vendored
View File

@ -93,3 +93,6 @@ scripts/nuke
# mac os
.DS_Store
# celery schedule file
celerybeat-schedule*

View File

@ -14,7 +14,6 @@ django = "~=4.0"
django-cors-headers = "*"
django-extensions = "*"
django-filter = "~=22.1"
django-q = {editable = true, ref = "paperless-main", git = "https://github.com/paperless-ngx/django-q.git"}
djangorestframework = "~=3.13"
filelock = "*"
fuzzywuzzy = {extras = ["speedup"], version = "*"}
@ -54,6 +53,8 @@ concurrent-log-handler = "*"
zipp = {version = "*", markers = "python_version < '3.9'"}
pyzbar = "*"
mysqlclient = "*"
celery = {extras = ["redis"], version = "*"}
django-celery-results = "*"
setproctitle = "*"
[dev-packages]

93
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "896665b8ff6d8a99af44b729c581033add1ba5cbd927723ef275649491c92a4f"
"sha256": "79ef8a0dae2a57c93935fa6ee7c591b53a64cf8c6925d16dc95aa8f8a937f9c7"
},
"pipfile-spec": 6,
"requires": {},
@ -26,6 +26,14 @@
],
"version": "==1.3.1"
},
"amqp": {
"hashes": [
"sha256:2c1b13fecc0893e946c65cbd5f36427861cffa4ea2201d8f6fca22e2a373b5e2",
"sha256:6f0956d2c23d8fa6e7691934d8c3930eadb44972cbbd1a7ae3a520f735d43359"
],
"markers": "python_version >= '3.6'",
"version": "==5.1.1"
},
"anyio": {
"hashes": [
"sha256:413adf95f93886e442aea925f3ee43baa5a765a64a0f52c6081894f9992fdd0b",
@ -102,6 +110,13 @@
"markers": "python_version < '3.9'",
"version": "==0.2.1"
},
"billiard": {
"hashes": [
"sha256:299de5a8da28a783d51b197d496bef4f1595dd023a93a4f59dde1886ae905547",
"sha256:87103ea78fa6ab4d5c751c4909bcff74617d985de7fa8b672cf8618afd5a875b"
],
"version": "==3.6.4.0"
},
"blessed": {
"hashes": [
"sha256:63b8554ae2e0e7f43749b6715c734cc8f3883010a809bf16790102563e6cf25b",
@ -110,6 +125,14 @@
"markers": "python_version >= '2.7'",
"version": "==1.19.1"
},
"celery": {
"hashes": [
"sha256:138420c020cd58d6707e6257b6beda91fd39af7afde5d36c6334d175302c0e14",
"sha256:fafbd82934d30f8a004f81e8f7a062e31413a23d444be8ee3326553915958c6d"
],
"index": "pypi",
"version": "==5.2.7"
},
"certifi": {
"hashes": [
"sha256:0d9c601124e5a6ba9712dbc60d9c53c21e34f5f641fe83002317394311bdce14",
@ -219,6 +242,28 @@
"markers": "python_version >= '3.7'",
"version": "==8.1.3"
},
"click-didyoumean": {
"hashes": [
"sha256:a0713dc7a1de3f06bc0df5a9567ad19ead2d3d5689b434768a6145bff77c0667",
"sha256:f184f0d851d96b6d29297354ed981b7dd71df7ff500d82fa6d11f0856bee8035"
],
"markers": "python_full_version >= '3.6.2' and python_full_version < '4.0.0'",
"version": "==0.3.0"
},
"click-plugins": {
"hashes": [
"sha256:46ab999744a9d831159c3411bb0c79346d94a444df9a3a3742e9ed63645f264b",
"sha256:5d262006d3222f5057fd81e1623d4443e41dcda5dc815c06b442aa3c02889fc8"
],
"version": "==1.1.1"
},
"click-repl": {
"hashes": [
"sha256:94b3fbbc9406a236f176e0506524b2937e4b23b6f4c0c0b2a0a83f8a64e9194b",
"sha256:cd12f68d745bf6151210790540b4cb064c7b13e571bc64b6957d98d120dacfd8"
],
"version": "==0.2.0"
},
"coloredlogs": {
"hashes": [
"sha256:612ee75c546f53e92e70049c9dbfcc18c935a2b9a53b66085ce9ef6a6e5c0934",
@ -313,6 +358,14 @@
"index": "pypi",
"version": "==4.1.1"
},
"django-celery-results": {
"hashes": [
"sha256:75aa51970db5691cbf242c6a0ff50c8cdf419e265cd0e9b772335d06436c4b99",
"sha256:be91307c02fbbf0dda21993c3001c60edb74595444ccd6ad696552fe3689e85b"
],
"index": "pypi",
"version": "==2.4.0"
},
"django-cors-headers": {
"hashes": [
"sha256:37e42883b5f1f2295df6b4bba96eb2417a14a03270cb24b2a07f021cd4487cf4",
@ -350,6 +403,14 @@
"git": "https://github.com/paperless-ngx/django-q.git",
"ref": "8b5289d8caf36f67fb99448e76ead20d5b498c1b"
},
"django-timezone-field": {
"hashes": [
"sha256:15746ed367a5a32eda76cfa2886eeec1de8cda79f519b7c5e12f87ed7cdbd663",
"sha256:199f211082eeac7e83563929b8ce41399c1c0f00dfc2f36bc00bea381027eaaa"
],
"markers": "python_version >= '3.7' and python_version < '4'",
"version": "==5.0"
},
"djangorestframework": {
"hashes": [
"sha256:579a333e6256b09489cbe0a067e66abe55c6595d8926be6b99423786334350c8",
@ -563,6 +624,14 @@
"markers": "python_version >= '3.7'",
"version": "==1.2.0"
},
"kombu": {
"hashes": [
"sha256:37cee3ee725f94ea8bb173eaab7c1760203ea53bbebae226328600f9d2799610",
"sha256:8b213b24293d3417bcf0d2f5537b7f756079e3ea232a8386dcc89a59fd2361a4"
],
"markers": "python_version >= '3.7'",
"version": "==5.2.4"
},
"langdetect": {
"hashes": [
"sha256:7cbc0746252f19e76f77c0b1690aadf01963be835ef0cd4b56dddf2a8f1dfc2a",
@ -924,6 +993,14 @@
"markers": "python_version >= '3'",
"version": "==2.5.1"
},
"prompt-toolkit": {
"hashes": [
"sha256:859b283c50bde45f5f97829f77a4674d1c1fcd88539364f1b28a37805cfd89c0",
"sha256:d8916d3f62a7b67ab353a952ce4ced6a1d2587dfe9ef8ebc30dd7c386751f289"
],
"markers": "python_full_version >= '3.6.2'",
"version": "==3.0.30"
},
"psycopg2": {
"hashes": [
"sha256:06f32425949bd5fe8f625c49f17ebb9784e1e4fe928b7cce72edc36fb68e4c0c",
@ -999,6 +1076,12 @@
"markers": "python_full_version >= '3.6.8'",
"version": "==3.0.9"
},
"python-crontab": {
"hashes": [
"sha256:1e35ed7a3cdc3100545b43e196d34754e6551e7f95e4caebbe0e1c0ca41c2f1b"
],
"version": "==2.6.0"
},
"python-dateutil": {
"hashes": [
"sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86",
@ -1522,6 +1605,14 @@
],
"version": "==0.17.0"
},
"vine": {
"hashes": [
"sha256:4c9dceab6f76ed92105027c49c823800dd33cacce13bdedc5b914e3514b7fb30",
"sha256:7d3b1624a953da82ef63462013bbd271d3eb75751489f9807598e8f340bd637e"
],
"markers": "python_version >= '3.6'",
"version": "==5.0.0"
},
"watchdog": {
"hashes": [
"sha256:083171652584e1b8829581f965b9b7723ca5f9a2cd7e20271edf264cfd7c1412",

View File

@ -26,8 +26,21 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:scheduler]
command=python3 manage.py qcluster
[program:celery]
command = celery --app paperless worker --loglevel INFO
user=paperless
stopasgroup = true
stopwaitsecs = 60
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:celery-beat]
command = celery --app paperless beat --loglevel INFO
user=paperless
stopasgroup = true

View File

@ -77,16 +77,16 @@ export class TasksComponent implements OnInit, OnDestroy {
get currentTasks(): PaperlessTask[] {
let tasks: PaperlessTask[]
switch (this.activeTab) {
case 'queued':
case 'PENDING':
tasks = this.tasksService.queuedFileTasks
break
case 'started':
case 'STARTED':
tasks = this.tasksService.startedFileTasks
break
case 'completed':
case 'SUCCESS':
tasks = this.tasksService.completedFileTasks
break
case 'failed':
case 'FAILURE':
tasks = this.tasksService.failedFileTasks
break
default:

View File

@ -6,11 +6,10 @@ export enum PaperlessTaskType {
}
export enum PaperlessTaskStatus {
Queued = 'queued',
Started = 'started',
Complete = 'complete',
Failed = 'failed',
Unknown = 'unknown',
Pending = 'PENDING',
Started = 'STARTED',
Complete = 'SUCCESS',
Failed = 'FAILURE',
}
export interface PaperlessTask extends ObjectWithId {
@ -26,7 +25,7 @@ export interface PaperlessTask extends ObjectWithId {
created: Date
started?: Date
done?: Date
result: string
}

View File

@ -27,7 +27,7 @@ export class TasksService {
}
public get queuedFileTasks(): PaperlessTask[] {
return this.fileTasks.filter((t) => t.status == PaperlessTaskStatus.Queued)
return this.fileTasks.filter((t) => t.status == PaperlessTaskStatus.Pending)
}
public get startedFileTasks(): PaperlessTask[] {

View File

@ -1,11 +1,12 @@
import itertools
from django.db.models import Q
from django_q.tasks import async_task
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.tasks import bulk_update_documents
from documents.tasks import update_document_archive_file
def set_correspondent(doc_ids, correspondent):
@ -16,7 +17,7 @@ def set_correspondent(doc_ids, correspondent):
affected_docs = [doc.id for doc in qs]
qs.update(correspondent=correspondent)
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -31,8 +32,7 @@ def set_storage_path(doc_ids, storage_path):
affected_docs = [doc.id for doc in qs]
qs.update(storage_path=storage_path)
async_task(
"documents.tasks.bulk_update_documents",
bulk_update_documents.delay(
document_ids=affected_docs,
)
@ -47,7 +47,7 @@ def set_document_type(doc_ids, document_type):
affected_docs = [doc.id for doc in qs]
qs.update(document_type=document_type)
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -63,7 +63,7 @@ def add_tag(doc_ids, tag):
[DocumentTagRelationship(document_id=doc, tag_id=tag) for doc in affected_docs],
)
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -79,7 +79,7 @@ def remove_tag(doc_ids, tag):
Q(document_id__in=affected_docs) & Q(tag_id=tag),
).delete()
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -103,7 +103,7 @@ def modify_tags(doc_ids, add_tags, remove_tags):
ignore_conflicts=True,
)
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -123,8 +123,7 @@ def delete(doc_ids):
def redo_ocr(doc_ids):
for document_id in doc_ids:
async_task(
"documents.tasks.update_document_archive_file",
update_document_archive_file.delay(
document_id=document_id,
)

View File

@ -11,9 +11,9 @@ from typing import Final
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django_q.tasks import async_task
from documents.models import Tag
from documents.parsers import is_file_ext_supported
from documents.tasks import consume_file
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
@ -92,11 +92,9 @@ def _consume(filepath):
try:
logger.info(f"Adding {filepath} to the task queue.")
async_task(
"documents.tasks.consume_file",
consume_file.delay(
filepath,
override_tag_ids=tag_ids if tag_ids else None,
task_name=os.path.basename(filepath)[:100],
)
except Exception:
# Catch all so that the consumer won't crash.

View File

@ -1,34 +1,14 @@
# Generated by Django 3.1.3 on 2020-11-09 16:36
from django.db import migrations
from django.db.migrations import RunPython
from django_q.models import Schedule
from django_q.tasks import schedule
def add_schedules(apps, schema_editor):
schedule(
"documents.tasks.train_classifier",
name="Train the classifier",
schedule_type=Schedule.HOURLY,
)
schedule(
"documents.tasks.index_optimize",
name="Optimize the index",
schedule_type=Schedule.DAILY,
)
def remove_schedules(apps, schema_editor):
Schedule.objects.filter(func="documents.tasks.train_classifier").delete()
Schedule.objects.filter(func="documents.tasks.index_optimize").delete()
class Migration(migrations.Migration):
dependencies = [
("documents", "1000_update_paperless_all"),
("django_q", "0013_task_attempt_count"),
]
operations = [RunPython(add_schedules, remove_schedules)]
operations = [
migrations.RunPython(migrations.RunPython.noop, migrations.RunPython.noop)
]

View File

@ -2,27 +2,12 @@
from django.db import migrations
from django.db.migrations import RunPython
from django_q.models import Schedule
from django_q.tasks import schedule
def add_schedules(apps, schema_editor):
schedule(
"documents.tasks.sanity_check",
name="Perform sanity check",
schedule_type=Schedule.WEEKLY,
)
def remove_schedules(apps, schema_editor):
Schedule.objects.filter(func="documents.tasks.sanity_check").delete()
class Migration(migrations.Migration):
dependencies = [
("documents", "1003_mime_types"),
("django_q", "0013_task_attempt_count"),
]
operations = [RunPython(add_schedules, remove_schedules)]
operations = [RunPython(migrations.RunPython.noop, migrations.RunPython.noop)]

View File

@ -4,28 +4,9 @@ from django.db import migrations, models
import django.db.models.deletion
def init_paperless_tasks(apps, schema_editor):
PaperlessTask = apps.get_model("documents", "PaperlessTask")
Task = apps.get_model("django_q", "Task")
for task in Task.objects.filter(func="documents.tasks.consume_file"):
if not hasattr(task, "paperlesstask"):
paperlesstask = PaperlessTask.objects.create(
attempted_task=task,
task_id=task.id,
name=task.name,
created=task.started,
started=task.started,
acknowledged=True,
)
task.paperlesstask = paperlesstask
task.save()
class Migration(migrations.Migration):
dependencies = [
("django_q", "0014_schedule_cluster"),
("documents", "1021_webp_thumbnail_conversion"),
]
@ -60,10 +41,12 @@ class Migration(migrations.Migration):
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="attempted_task",
to="django_q.task",
# This is a dummy field, AlterField in 1026 will set to correct value
# This manual change is required, as django doesn't django doesn't really support
# removing an app which has migration deps like this
to="documents.document",
),
),
],
),
migrations.RunPython(init_paperless_tasks, migrations.RunPython.noop),
)
]

View File

@ -0,0 +1,42 @@
# Generated by Django 4.0.7 on 2022-09-02 22:08
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
("django_celery_results", "0011_taskresult_periodic_task_name"),
("documents", "1025_alter_savedviewfilterrule_rule_type"),
]
operations = [
migrations.RemoveField(
model_name="paperlesstask",
name="created",
),
migrations.RemoveField(
model_name="paperlesstask",
name="name",
),
migrations.RemoveField(
model_name="paperlesstask",
name="started",
),
migrations.RemoveField(
model_name="paperlesstask",
name="task_id",
),
migrations.AlterField(
model_name="paperlesstask",
name="attempted_task",
field=models.OneToOneField(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="attempted_task",
to="django_celery_results.taskresult",
),
),
]

View File

@ -0,0 +1,24 @@
# Generated by Django 4.0.7 on 2022-09-05 21:39
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("documents", "1026_remove_paperlesstask_created_and_more"),
]
# Manual SQL commands to drop the django_q related tables
# if they exist
operations = [
migrations.RunSQL(
"DROP TABLE IF EXISTS django_q_ormq", reverse_sql=migrations.RunSQL.noop
),
migrations.RunSQL(
"DROP TABLE IF EXISTS django_q_schedule", reverse_sql=migrations.RunSQL.noop
),
migrations.RunSQL(
"DROP TABLE IF EXISTS django_q_task", reverse_sql=migrations.RunSQL.noop
),
]

View File

@ -12,7 +12,7 @@ from django.contrib.auth.models import User
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django_q.tasks import Task
from django_celery_results.models import TaskResult
from documents.parsers import get_default_file_extension
@ -527,19 +527,15 @@ class UiSettings(models.Model):
class PaperlessTask(models.Model):
acknowledged = models.BooleanField(default=False)
task_id = models.CharField(max_length=128)
name = models.CharField(max_length=256)
created = models.DateTimeField(_("created"), auto_now=True)
started = models.DateTimeField(_("started"), null=True)
attempted_task = models.OneToOneField(
Task,
TaskResult,
on_delete=models.CASCADE,
related_name="attempted_task",
null=True,
blank=True,
)
acknowledged = models.BooleanField(default=False)
class Comment(models.Model):

View File

@ -18,12 +18,12 @@ from .models import Correspondent
from .models import Document
from .models import DocumentType
from .models import MatchingModel
from .models import PaperlessTask
from .models import SavedView
from .models import SavedViewFilterRule
from .models import StoragePath
from .models import Tag
from .models import UiSettings
from .models import PaperlessTask
from .parsers import is_mime_type_supported
@ -620,7 +620,17 @@ class TasksViewSerializer(serializers.ModelSerializer):
class Meta:
model = PaperlessTask
depth = 1
fields = "__all__"
fields = (
"id",
"type",
"status",
"result",
"acknowledged",
"date_created",
"date_done",
"task_name",
"task_id",
)
type = serializers.SerializerMethodField()
@ -639,17 +649,42 @@ class TasksViewSerializer(serializers.ModelSerializer):
status = serializers.SerializerMethodField()
def get_status(self, obj):
if obj.attempted_task is None:
if obj.started:
return "started"
else:
return "queued"
elif obj.attempted_task.success:
return "complete"
elif not obj.attempted_task.success:
return "failed"
else:
return "unknown"
result = "unknown"
if hasattr(obj, "attempted_task") and obj.attempted_task:
result = obj.attempted_task.status
return result
date_created = serializers.SerializerMethodField()
def get_date_created(self, obj):
result = ""
if hasattr(obj, "attempted_task") and obj.attempted_task:
result = obj.attempted_task.date_created
return result
date_done = serializers.SerializerMethodField()
def get_date_done(self, obj):
result = ""
if hasattr(obj, "attempted_task") and obj.attempted_task:
result = obj.attempted_task.date_done
return result
task_name = serializers.SerializerMethodField()
def get_task_name(self, obj):
result = ""
if hasattr(obj, "attempted_task") and obj.attempted_task:
result = obj.attempted_task.task_name
return result
task_id = serializers.SerializerMethodField()
def get_task_id(self, obj):
result = ""
if hasattr(obj, "attempted_task") and obj.attempted_task:
result = obj.attempted_task.task_id
return result
class AcknowledgeTasksViewSerializer(serializers.Serializer):

View File

@ -2,7 +2,6 @@ import logging
import os
import shutil
import django_q
from django.conf import settings
from django.contrib.admin.models import ADDITION
from django.contrib.admin.models import LogEntry
@ -14,6 +13,7 @@ from django.db.models import Q
from django.dispatch import receiver
from django.utils import termcolors
from django.utils import timezone
from django_celery_results.models import TaskResult
from filelock import FileLock
from .. import matching
@ -25,7 +25,6 @@ from ..models import MatchingModel
from ..models import PaperlessTask
from ..models import Tag
logger = logging.getLogger("paperless.handlers")
@ -503,47 +502,16 @@ def add_to_index(sender, document, **kwargs):
index.add_or_update_document(document)
@receiver(django_q.signals.pre_enqueue)
def init_paperless_task(sender, task, **kwargs):
if task["func"] == "documents.tasks.consume_file":
try:
paperless_task, created = PaperlessTask.objects.get_or_create(
task_id=task["id"],
)
paperless_task.name = task["name"]
paperless_task.created = task["started"]
paperless_task.save()
except Exception as e:
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Creating PaperlessTask failed: {e}")
@receiver(django_q.signals.pre_execute)
def paperless_task_started(sender, task, **kwargs):
@receiver(models.signals.post_save, sender=TaskResult)
def update_paperless_task(sender, instance: TaskResult, **kwargs):
try:
if task["func"] == "documents.tasks.consume_file":
paperless_task, created = PaperlessTask.objects.get_or_create(
task_id=task["id"],
)
paperless_task.started = timezone.now()
paperless_task.save()
except PaperlessTask.DoesNotExist:
pass
except Exception as e:
logger.error(f"Creating PaperlessTask failed: {e}")
@receiver(models.signals.post_save, sender=django_q.models.Task)
def update_paperless_task(sender, instance, **kwargs):
try:
if instance.func == "documents.tasks.consume_file":
paperless_task, created = PaperlessTask.objects.get_or_create(
task_id=instance.id,
if instance.task_name == "documents.tasks.consume_file":
paperless_task, _ = PaperlessTask.objects.get_or_create(
task_id=instance.task_id,
)
paperless_task.attempted_task = instance
paperless_task.save()
except PaperlessTask.DoesNotExist:
pass
except Exception as e:
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Creating PaperlessTask failed: {e}")

View File

@ -8,6 +8,7 @@ from typing import Type
import tqdm
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.conf import settings
from django.db import transaction
@ -36,6 +37,7 @@ from whoosh.writing import AsyncWriter
logger = logging.getLogger("paperless.tasks")
@shared_task
def index_optimize():
ix = index.open_index()
writer = AsyncWriter(ix)
@ -52,6 +54,7 @@ def index_reindex(progress_bar_disable=False):
index.update_document(writer, document)
@shared_task
def train_classifier():
if (
not Tag.objects.filter(matching_algorithm=Tag.MATCH_AUTO).exists()
@ -80,6 +83,7 @@ def train_classifier():
logger.warning("Classifier error: " + str(e))
@shared_task
def consume_file(
path,
override_filename=None,
@ -171,6 +175,7 @@ def consume_file(
)
@shared_task
def sanity_check():
messages = sanity_checker.check_sanity()
@ -186,6 +191,7 @@ def sanity_check():
return "No issues detected."
@shared_task
def bulk_update_documents(document_ids):
documents = Document.objects.filter(id__in=document_ids)
@ -199,6 +205,7 @@ def bulk_update_documents(document_ids):
index.update_document(writer, doc)
@shared_task
def update_document_archive_file(document_id):
"""
Re-creates the archive file of a document, including new OCR content and thumbnail

View File

@ -10,6 +10,8 @@ import zipfile
from unittest import mock
from unittest.mock import MagicMock
import celery
try:
import zoneinfo
except ImportError:
@ -32,6 +34,7 @@ from documents.models import SavedView
from documents.models import StoragePath
from documents.models import Tag
from documents.models import UiSettings
from django_celery_results.models import TaskResult
from documents.models import Comment
from documents.models import StoragePath
from documents.tests.utils import DirectoriesMixin
@ -790,7 +793,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["documents_inbox"], None)
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload(self, m):
with open(
@ -813,7 +816,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertIsNone(kwargs["override_document_type_id"])
self.assertIsNone(kwargs["override_tag_ids"])
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_empty_metadata(self, m):
with open(
@ -836,7 +839,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertIsNone(kwargs["override_document_type_id"])
self.assertIsNone(kwargs["override_tag_ids"])
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_invalid_form(self, m):
with open(
@ -850,7 +853,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, 400)
m.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_invalid_file(self, m):
with open(
@ -864,7 +867,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, 400)
m.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_title(self, async_task):
with open(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
@ -882,7 +885,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(kwargs["override_title"], "my custom title")
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_correspondent(self, async_task):
c = Correspondent.objects.create(name="test-corres")
with open(
@ -901,7 +904,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(kwargs["override_correspondent_id"], c.id)
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_invalid_correspondent(self, async_task):
with open(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
@ -915,7 +918,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
async_task.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_document_type(self, async_task):
dt = DocumentType.objects.create(name="invoice")
with open(
@ -934,7 +937,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(kwargs["override_document_type_id"], dt.id)
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_invalid_document_type(self, async_task):
with open(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
@ -948,7 +951,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
async_task.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_tags(self, async_task):
t1 = Tag.objects.create(name="tag1")
t2 = Tag.objects.create(name="tag2")
@ -968,7 +971,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertCountEqual(kwargs["override_tag_ids"], [t1.id, t2.id])
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_invalid_tags(self, async_task):
t1 = Tag.objects.create(name="tag1")
t2 = Tag.objects.create(name="tag2")
@ -984,7 +987,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
async_task.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_created(self, async_task):
created = datetime.datetime(
2022,
@ -1615,7 +1618,7 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
patcher = mock.patch("documents.bulk_edit.async_task")
patcher = mock.patch("documents.bulk_edit.bulk_update_documents.delay")
self.async_task = patcher.start()
self.addCleanup(patcher.stop)
self.c1 = Correspondent.objects.create(name="c1")
@ -2783,7 +2786,7 @@ class TestApiStoragePaths(DirectoriesMixin, APITestCase):
class TestTasks(APITestCase):
ENDPOINT = "/api/tasks/"
ENDPOINT_ACKOWLEDGE = "/api/acknowledge_tasks/"
ENDPOINT_ACKNOWLEDGE = "/api/acknowledge_tasks/"
def setUp(self):
super().setUp()
@ -2792,39 +2795,60 @@ class TestTasks(APITestCase):
self.client.force_authenticate(user=self.user)
def test_get_tasks(self):
task_id1 = str(uuid.uuid4())
PaperlessTask.objects.create(task_id=task_id1)
Task.objects.create(
id=task_id1,
started=timezone.now() - datetime.timedelta(seconds=30),
stopped=timezone.now(),
func="documents.tasks.consume_file",
"""
GIVEN:
- Attempted celery tasks
WHEN:
- API call is made to get tasks
THEN:
- Attempting and pending tasks are serialized and provided
"""
result1 = TaskResult.objects.create(
task_id=str(uuid.uuid4()),
task_name="documents.tasks.some_task",
status=celery.states.PENDING,
)
task_id2 = str(uuid.uuid4())
PaperlessTask.objects.create(task_id=task_id2)
PaperlessTask.objects.create(attempted_task=result1)
result2 = TaskResult.objects.create(
task_id=str(uuid.uuid4()),
task_name="documents.tasks.other_task",
status=celery.states.STARTED,
)
PaperlessTask.objects.create(attempted_task=result2)
response = self.client.get(self.ENDPOINT)
from pprint import pprint
for x in response.data:
pprint(x)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 2)
returned_task1 = response.data[1]
returned_task2 = response.data[0]
self.assertEqual(returned_task1["task_id"], task_id1)
self.assertEqual(returned_task1["status"], "complete")
self.assertIsNotNone(returned_task1["attempted_task"])
self.assertEqual(returned_task2["task_id"], task_id2)
self.assertEqual(returned_task2["status"], "queued")
self.assertIsNone(returned_task2["attempted_task"])
self.assertEqual(returned_task1["task_id"], result1.task_id)
self.assertEqual(returned_task1["status"], celery.states.PENDING)
self.assertEqual(returned_task1["task_name"], result1.task_name)
self.assertEqual(returned_task2["task_id"], result2.task_id)
self.assertEqual(returned_task2["status"], celery.states.STARTED)
self.assertEqual(returned_task2["task_name"], result2.task_name)
def test_acknowledge_tasks(self):
task_id = str(uuid.uuid4())
task = PaperlessTask.objects.create(task_id=task_id)
result1 = TaskResult.objects.create(
task_id=str(uuid.uuid4()),
task_name="documents.tasks.some_task",
status=celery.states.PENDING,
)
task = PaperlessTask.objects.create(attempted_task=result1)
response = self.client.get(self.ENDPOINT)
self.assertEqual(len(response.data), 1)
response = self.client.post(
self.ENDPOINT_ACKOWLEDGE,
self.ENDPOINT_ACKNOWLEDGE,
{"tasks": [task.id]},
)
self.assertEqual(response.status_code, 200)

View File

@ -43,7 +43,7 @@ class ConsumerMixin:
super().setUp()
self.t = None
patcher = mock.patch(
"documents.management.commands.document_consumer.async_task",
"documents.tasks.consume_file.delay",
)
self.task_mock = patcher.start()
self.addCleanup(patcher.stop)
@ -76,7 +76,7 @@ class ConsumerMixin:
# A bogus async_task that will simply check the file for
# completeness and raise an exception otherwise.
def bogus_task(self, func, filename, **kwargs):
def bogus_task(self, filename, **kwargs):
eq = filecmp.cmp(filename, self.sample_file, shallow=False)
if not eq:
print("Consumed an INVALID file.")
@ -115,7 +115,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)
self.assertEqual(args[0], f)
def test_consume_file_invalid_ext(self):
self.t_start()
@ -135,7 +135,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)
self.assertEqual(args[0], f)
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_pdf(self, error_logger):
@ -155,7 +155,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], fname)
self.assertEqual(args[0], fname)
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_and_move(self, error_logger):
@ -175,7 +175,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], fname2)
self.assertEqual(args[0], fname2)
error_logger.assert_not_called()
@ -193,7 +193,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], fname)
self.assertEqual(args[0], fname)
# assert that we have an error logged with this invalid file.
error_logger.assert_called_once()
@ -241,7 +241,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.assertEqual(2, self.task_mock.call_count)
fnames = [
os.path.basename(args[1]) for args, _ in self.task_mock.call_args_list
os.path.basename(args[0]) for args, _ in self.task_mock.call_args_list
]
self.assertCountEqual(fnames, ["my_file.pdf", "my_second_file.pdf"])
@ -338,7 +338,7 @@ class TestConsumerTags(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
tag_ids.append(Tag.objects.get(name=tag_names[1]).pk)
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)
self.assertEqual(args[0], f)
# assertCountEqual has a bad name, but test that the first
# sequence contains the same elements as second, regardless of

View File

@ -28,7 +28,7 @@ from django.utils.translation import get_language
from django.views.decorators.cache import cache_control
from django.views.generic import TemplateView
from django_filters.rest_framework import DjangoFilterBackend
from django_q.tasks import async_task
from documents.tasks import consume_file
from packaging import version as packaging_version
from paperless import version
from paperless.db import GnuPG
@ -612,8 +612,7 @@ class PostDocumentView(GenericAPIView):
task_id = str(uuid.uuid4())
async_task(
"documents.tasks.consume_file",
consume_file.delay(
temp_filename,
override_filename=doc_name,
override_title=title,
@ -621,7 +620,6 @@ class PostDocumentView(GenericAPIView):
override_document_type_id=document_type_id,
override_tag_ids=tag_ids,
task_id=task_id,
task_name=os.path.basename(doc_name)[:100],
override_created=created,
)
@ -882,7 +880,7 @@ class TasksViewSet(ReadOnlyModelViewSet):
PaperlessTask.objects.filter(
acknowledged=False,
)
.order_by("created")
.order_by("attempted_task__date_created")
.reverse()
)

View File

@ -1,5 +1,11 @@
from .celery import app as celery_app
from .checks import binaries_check
from .checks import paths_check
from .checks import settings_values_check
__all__ = ["binaries_check", "paths_check", "settings_values_check"]
__all__ = [
"celery_app",
"binaries_check",
"paths_check",
"settings_values_check",
]

17
src/paperless/celery.py Normal file
View File

@ -0,0 +1,17 @@
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "paperless.settings")
app = Celery("paperless")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django apps.
app.autodiscover_tasks()

View File

@ -10,6 +10,7 @@ from typing import Optional
from typing import Set
from urllib.parse import urlparse
from celery.schedules import crontab
from concurrent_log_handler.queue import setup_logging_queues
from django.utils.translation import gettext_lazy as _
from dotenv import load_dotenv
@ -128,7 +129,7 @@ INSTALLED_APPS = [
"rest_framework",
"rest_framework.authtoken",
"django_filters",
"django_q",
"django_celery_results",
] + env_apps
if DEBUG:
@ -179,6 +180,8 @@ ASGI_APPLICATION = "paperless.asgi.application"
STATIC_URL = os.getenv("PAPERLESS_STATIC_URL", BASE_URL + "static/")
WHITENOISE_STATIC_PREFIX = "/static/"
_REDIS_URL = os.getenv("PAPERLESS_REDIS", "redis://localhost:6379")
# TODO: what is this used for?
TEMPLATES = [
{
@ -200,7 +203,7 @@ CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [os.getenv("PAPERLESS_REDIS", "redis://localhost:6379")],
"hosts": [_REDIS_URL],
"capacity": 2000, # default 100
"expiry": 15, # default 60
},
@ -458,24 +461,48 @@ TASK_WORKERS = __get_int("PAPERLESS_TASK_WORKERS", 1)
WORKER_TIMEOUT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
# Per django-q docs, timeout must be smaller than retry
# We default retry to 10s more than the timeout to silence the
# warning, as retry functionality isn't used.
WORKER_RETRY: Final[int] = __get_int(
"PAPERLESS_WORKER_RETRY",
WORKER_TIMEOUT + 10,
)
CELERY_BROKER_URL = _REDIS_URL
CELERY_TIMEZONE = TIME_ZONE
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
CELERY_WORKER_CONCURRENCY = TASK_WORKERS
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXTENDED = True
CELERY_SEND_TASK_SENT_EVENT = True
CELERY_TASK_TIME_LIMIT = WORKER_TIMEOUT
CELERY_RESULT_BACKEND = "django-db"
CELERY_CACHE_BACKEND = "default"
Q_CLUSTER = {
"name": "paperless",
"guard_cycle": 5,
"catch_up": False,
"recycle": 1,
"retry": WORKER_RETRY,
"timeout": WORKER_TIMEOUT,
"workers": TASK_WORKERS,
"redis": os.getenv("PAPERLESS_REDIS", "redis://localhost:6379"),
"log_level": "DEBUG" if DEBUG else "INFO",
CELERY_BEAT_SCHEDULE = {
# Every ten minutes
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
},
# Hourly at 5 minutes past the hour
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
},
# Daily at midnight
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
},
# Weekly, Sunday at 00:30
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
},
}
CELERY_BEAT_SCHEDULE_FILENAME = os.path.join(DATA_DIR, "celerybeat-schedule.db")
# django setting.
CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"LOCATION": _REDIS_URL,
},
}

View File

@ -9,10 +9,10 @@ import magic
import pathvalidate
from django.conf import settings
from django.db import DatabaseError
from django_q.tasks import async_task
from documents.loggers import LoggingMixin
from documents.models import Correspondent
from documents.parsers import is_mime_type_supported
from documents.tasks import consume_file
from imap_tools import AND
from imap_tools import MailBox
from imap_tools import MailboxFolderSelectError
@ -389,8 +389,7 @@ class MailAccountHandler(LoggingMixin):
f"{message.subject} from {message.from_}",
)
async_task(
"documents.tasks.consume_file",
consume_file.delay(
path=temp_filename,
override_filename=pathvalidate.sanitize_filename(
att.filename,

View File

@ -2,28 +2,12 @@
from django.db import migrations
from django.db.migrations import RunPython
from django_q.models import Schedule
from django_q.tasks import schedule
def add_schedules(apps, schema_editor):
schedule(
"paperless_mail.tasks.process_mail_accounts",
name="Check all e-mail accounts",
schedule_type=Schedule.MINUTES,
minutes=10,
)
def remove_schedules(apps, schema_editor):
Schedule.objects.filter(func="paperless_mail.tasks.process_mail_accounts").delete()
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0001_initial"),
("django_q", "0013_task_attempt_count"),
]
operations = [RunPython(add_schedules, remove_schedules)]
operations = [RunPython(migrations.RunPython.noop, migrations.RunPython.noop)]

View File

@ -1,13 +1,14 @@
import logging
from celery import shared_task
from paperless_mail.mail import MailAccountHandler
from paperless_mail.mail import MailError
from paperless_mail.models import MailAccount
logger = logging.getLogger("paperless.mail.tasks")
@shared_task
def process_mail_accounts():
total_new_documents = 0
for account in MailAccount.objects.all():
@ -20,11 +21,3 @@ def process_mail_accounts():
return f"Added {total_new_documents} document(s)."
else:
return "No new documents were added."
def process_mail_account(name):
try:
account = MailAccount.objects.get(name=name)
MailAccountHandler().handle_mail_account(account)
except MailAccount.DoesNotExist:
logger.error(f"Unknown mail acccount: {name}")

View File

@ -247,7 +247,7 @@ class TestMail(DirectoriesMixin, TestCase):
m.return_value = self.bogus_mailbox
self.addCleanup(patcher.stop)
patcher = mock.patch("paperless_mail.mail.async_task")
patcher = mock.patch("paperless_mail.mail.consume_file.delay")
self.async_task = patcher.start()
self.addCleanup(patcher.stop)
@ -1030,20 +1030,3 @@ class TestTasks(TestCase):
m.side_effect = lambda account: 0
result = tasks.process_mail_accounts()
self.assertIn("No new", result)
@mock.patch("paperless_mail.tasks.MailAccountHandler.handle_mail_account")
def test_single_accounts(self, m):
MailAccount.objects.create(
name="A",
imap_server="A",
username="A",
password="A",
)
tasks.process_mail_account("A")
m.assert_called_once()
m.reset_mock()
tasks.process_mail_account("B")
m.assert_not_called()