Merge pull request #1648 from paperless-ngx/feature-use-celery

Feature: Transition to celery for background tasks
This commit is contained in:
shamoon 2022-10-10 00:07:55 -07:00 committed by GitHub
commit 6f50285f47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 959 additions and 297 deletions

3
.gitignore vendored
View File

@ -93,3 +93,6 @@ scripts/nuke
# mac os
.DS_Store
# celery schedule file
celerybeat-schedule*

View File

@ -14,7 +14,6 @@ django = "~=4.0"
django-cors-headers = "*"
django-extensions = "*"
django-filter = "~=22.1"
django-q = {editable = true, ref = "paperless-main", git = "https://github.com/paperless-ngx/django-q.git"}
djangorestframework = "~=3.13"
filelock = "*"
fuzzywuzzy = {extras = ["speedup"], version = "*"}
@ -54,6 +53,8 @@ concurrent-log-handler = "*"
zipp = {version = "*", markers = "python_version < '3.9'"}
pyzbar = "*"
mysqlclient = "*"
celery = {extras = ["redis"], version = "*"}
django-celery-results = "*"
setproctitle = "*"
[dev-packages]

96
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "ebfb2f03a5e15c2ff5b40d2a406f41d8f2a9705f2d4e3e339b2aaad464d69855"
"sha256": "79ef8a0dae2a57c93935fa6ee7c591b53a64cf8c6925d16dc95aa8f8a937f9c7"
},
"pipfile-spec": 6,
"requires": {},
@ -26,6 +26,14 @@
],
"version": "==1.3.1"
},
"amqp": {
"hashes": [
"sha256:2c1b13fecc0893e946c65cbd5f36427861cffa4ea2201d8f6fca22e2a373b5e2",
"sha256:6f0956d2c23d8fa6e7691934d8c3930eadb44972cbbd1a7ae3a520f735d43359"
],
"markers": "python_version >= '3.6'",
"version": "==5.1.1"
},
"anyio": {
"hashes": [
"sha256:413adf95f93886e442aea925f3ee43baa5a765a64a0f52c6081894f9992fdd0b",
@ -102,6 +110,13 @@
"markers": "python_version < '3.9'",
"version": "==0.2.1"
},
"billiard": {
"hashes": [
"sha256:299de5a8da28a783d51b197d496bef4f1595dd023a93a4f59dde1886ae905547",
"sha256:87103ea78fa6ab4d5c751c4909bcff74617d985de7fa8b672cf8618afd5a875b"
],
"version": "==3.6.4.0"
},
"blessed": {
"hashes": [
"sha256:63b8554ae2e0e7f43749b6715c734cc8f3883010a809bf16790102563e6cf25b",
@ -110,6 +125,14 @@
"markers": "python_version >= '2.7'",
"version": "==1.19.1"
},
"celery": {
"hashes": [
"sha256:138420c020cd58d6707e6257b6beda91fd39af7afde5d36c6334d175302c0e14",
"sha256:fafbd82934d30f8a004f81e8f7a062e31413a23d444be8ee3326553915958c6d"
],
"index": "pypi",
"version": "==5.2.7"
},
"certifi": {
"hashes": [
"sha256:0d9c601124e5a6ba9712dbc60d9c53c21e34f5f641fe83002317394311bdce14",
@ -219,6 +242,28 @@
"markers": "python_version >= '3.7'",
"version": "==8.1.3"
},
"click-didyoumean": {
"hashes": [
"sha256:a0713dc7a1de3f06bc0df5a9567ad19ead2d3d5689b434768a6145bff77c0667",
"sha256:f184f0d851d96b6d29297354ed981b7dd71df7ff500d82fa6d11f0856bee8035"
],
"markers": "python_full_version >= '3.6.2' and python_full_version < '4.0.0'",
"version": "==0.3.0"
},
"click-plugins": {
"hashes": [
"sha256:46ab999744a9d831159c3411bb0c79346d94a444df9a3a3742e9ed63645f264b",
"sha256:5d262006d3222f5057fd81e1623d4443e41dcda5dc815c06b442aa3c02889fc8"
],
"version": "==1.1.1"
},
"click-repl": {
"hashes": [
"sha256:94b3fbbc9406a236f176e0506524b2937e4b23b6f4c0c0b2a0a83f8a64e9194b",
"sha256:cd12f68d745bf6151210790540b4cb064c7b13e571bc64b6957d98d120dacfd8"
],
"version": "==0.2.0"
},
"coloredlogs": {
"hashes": [
"sha256:612ee75c546f53e92e70049c9dbfcc18c935a2b9a53b66085ce9ef6a6e5c0934",
@ -313,6 +358,14 @@
"index": "pypi",
"version": "==4.1.2"
},
"django-celery-results": {
"hashes": [
"sha256:75aa51970db5691cbf242c6a0ff50c8cdf419e265cd0e9b772335d06436c4b99",
"sha256:be91307c02fbbf0dda21993c3001c60edb74595444ccd6ad696552fe3689e85b"
],
"index": "pypi",
"version": "==2.4.0"
},
"django-cors-headers": {
"hashes": [
"sha256:37e42883b5f1f2295df6b4bba96eb2417a14a03270cb24b2a07f021cd4487cf4",
@ -345,10 +398,13 @@
"markers": "python_version >= '3'",
"version": "==3.1"
},
"django-q": {
"editable": true,
"git": "https://github.com/paperless-ngx/django-q.git",
"ref": "8b5289d8caf36f67fb99448e76ead20d5b498c1b"
"django-timezone-field": {
"hashes": [
"sha256:15746ed367a5a32eda76cfa2886eeec1de8cda79f519b7c5e12f87ed7cdbd663",
"sha256:199f211082eeac7e83563929b8ce41399c1c0f00dfc2f36bc00bea381027eaaa"
],
"markers": "python_version >= '3.7' and python_version < '4'",
"version": "==5.0"
},
"djangorestframework": {
"hashes": [
@ -563,6 +619,14 @@
"markers": "python_version >= '3.7'",
"version": "==1.2.0"
},
"kombu": {
"hashes": [
"sha256:37cee3ee725f94ea8bb173eaab7c1760203ea53bbebae226328600f9d2799610",
"sha256:8b213b24293d3417bcf0d2f5537b7f756079e3ea232a8386dcc89a59fd2361a4"
],
"markers": "python_version >= '3.7'",
"version": "==5.2.4"
},
"langdetect": {
"hashes": [
"sha256:7cbc0746252f19e76f77c0b1690aadf01963be835ef0cd4b56dddf2a8f1dfc2a",
@ -1033,6 +1097,14 @@
"markers": "python_version >= '3'",
"version": "==2.5.1"
},
"prompt-toolkit": {
"hashes": [
"sha256:859b283c50bde45f5f97829f77a4674d1c1fcd88539364f1b28a37805cfd89c0",
"sha256:d8916d3f62a7b67ab353a952ce4ced6a1d2587dfe9ef8ebc30dd7c386751f289"
],
"markers": "python_full_version >= '3.6.2'",
"version": "==3.0.30"
},
"psycopg2": {
"hashes": [
"sha256:06f32425949bd5fe8f625c49f17ebb9784e1e4fe928b7cce72edc36fb68e4c0c",
@ -1108,6 +1180,12 @@
"markers": "python_full_version >= '3.6.8'",
"version": "==3.0.9"
},
"python-crontab": {
"hashes": [
"sha256:1e35ed7a3cdc3100545b43e196d34754e6551e7f95e4caebbe0e1c0ca41c2f1b"
],
"version": "==2.6.0"
},
"python-dateutil": {
"hashes": [
"sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86",
@ -1740,6 +1818,14 @@
],
"version": "==0.17.0"
},
"vine": {
"hashes": [
"sha256:4c9dceab6f76ed92105027c49c823800dd33cacce13bdedc5b914e3514b7fb30",
"sha256:7d3b1624a953da82ef63462013bbd271d3eb75751489f9807598e8f340bd637e"
],
"markers": "python_version >= '3.6'",
"version": "==5.0.0"
},
"watchdog": {
"hashes": [
"sha256:083171652584e1b8829581f965b9b7723ca5f9a2cd7e20271edf264cfd7c1412",

View File

@ -26,8 +26,21 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:scheduler]
command=python3 manage.py qcluster
[program:celery]
command = celery --app paperless worker --loglevel INFO
user=paperless
stopasgroup = true
stopwaitsecs = 60
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:celery-beat]
command = celery --app paperless beat --loglevel INFO
user=paperless
stopasgroup = true

View File

@ -112,7 +112,7 @@ To do the setup you need to perform the steps from the following chapters in a c
.. code:: shell-session
python3 manage.py runserver & python3 manage.py document_consumer & python3 manage.py qcluster
python3 manage.py runserver & python3 manage.py document_consumer & celery --app paperless worker
11. Login with the superuser credentials provided in step 8 at ``http://localhost:8000`` to create a session that enables you to use the backend.
@ -128,14 +128,14 @@ Configure the IDE to use the src/ folder as the base source folder. Configure th
launch configurations in your IDE:
* python3 manage.py runserver
* python3 manage.py qcluster
* celery --app paperless worker
* python3 manage.py document_consumer
To start them all:
.. code:: shell-session
python3 manage.py runserver & python3 manage.py document_consumer & python3 manage.py qcluster
python3 manage.py runserver & python3 manage.py document_consumer & celery --app paperless worker
Testing and code style:

View File

@ -39,7 +39,7 @@ Paperless consists of the following components:
.. _setup-task_processor:
* **The task processor:** Paperless relies on `Django Q <https://django-q.readthedocs.io/en/latest/>`_
* **The task processor:** Paperless relies on `Celery - Distributed Task Queue <https://docs.celeryq.dev/en/stable/index.html>`_
for doing most of the heavy lifting. This is a task queue that accepts tasks from
multiple sources and processes these in parallel. It also comes with a scheduler that executes
certain commands periodically.
@ -62,13 +62,6 @@ Paperless consists of the following components:
tasks fail and inspect the errors (i.e., wrong email credentials, errors during consuming a specific
file, etc).
You may start the task processor by executing:
.. code:: shell-session
$ cd /path/to/paperless/src/
$ python3 manage.py qcluster
* A `redis <https://redis.io/>`_ message broker: This is a really lightweight service that is responsible
for getting the tasks from the webserver and the consumer to the task scheduler. These run in a different
process (maybe even on different machines!), and therefore, this is necessary.
@ -291,7 +284,20 @@ Build the Docker image yourself
.. code:: yaml
webserver:
build: .
build:
context: .
args:
QPDF_VERSION: x.y.z
PIKEPDF_VERSION: x.y.z
PSYCOPG2_VERSION: x.y.z
JBIG2ENC_VERSION: 0.29
.. note::
You should match the build argument versions to the version for the release you have
checked out. These are pre-built images with certain, more updated software.
If you want to build these images yourself, that is possible, but beyond
the scope of these steps.
4. Follow steps 3 to 8 of :ref:`setup-docker_hub`. When asked to run
``docker-compose pull`` to pull the image, do
@ -332,7 +338,7 @@ writing. Windows is not and will never be supported.
.. code::
python3 python3-pip python3-dev imagemagick fonts-liberation gnupg libpq-dev libmagic-dev mime-support libzbar0 poppler-utils
python3 python3-pip python3-dev imagemagick fonts-liberation gnupg libpq-dev default-libmysqlclient-dev libmagic-dev mime-support libzbar0 poppler-utils
These dependencies are required for OCRmyPDF, which is used for text recognition.
@ -361,7 +367,7 @@ writing. Windows is not and will never be supported.
You will also need ``build-essential``, ``python3-setuptools`` and ``python3-wheel``
for installing some of the python dependencies.
2. Install ``redis`` >= 5.0 and configure it to start automatically.
2. Install ``redis`` >= 6.0 and configure it to start automatically.
3. Optional. Install ``postgresql`` and configure a database, user and password for paperless. If you do not wish
to use PostgreSQL, MariaDB and SQLite are available as well.
@ -461,8 +467,9 @@ writing. Windows is not and will never be supported.
as a starting point.
Paperless needs the ``webserver`` script to run the webserver, the
``consumer`` script to watch the input folder, and the ``scheduler``
script to run tasks such as email checking and document consumption.
``consumer`` script to watch the input folder, ``taskqueue`` for the background workers
used to handle things like document consumption, and the ``scheduler`` script to run tasks such as
email checking at certain times.
The ``socket`` script enables ``gunicorn`` to run on port 80 without
root privileges. For this you need to uncomment the ``Require=paperless-webserver.socket``

View File

@ -19,7 +19,7 @@ Check for the following issues:
.. code:: shell-session
$ python3 manage.py qcluster
$ celery --app paperless worker
* Look at the output of paperless and inspect it for any errors.
* Go to the admin interface, and check if there are failed tasks. If so, the

View File

@ -1,12 +1,12 @@
[Unit]
Description=Paperless scheduler
Description=Paperless Celery Beat
Requires=redis.service
[Service]
User=paperless
Group=paperless
WorkingDirectory=/opt/paperless/src
ExecStart=python3 manage.py qcluster
ExecStart=celery --app paperless beat --loglevel INFO
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,12 @@
[Unit]
Description=Paperless Celery Workers
Requires=redis.service
[Service]
User=paperless
Group=paperless
WorkingDirectory=/opt/paperless/src
ExecStart=celery --app paperless worker --loglevel INFO
[Install]
WantedBy=multi-user.target

File diff suppressed because one or more lines are too long

View File

@ -54,7 +54,7 @@
</div>
</th>
<td class="overflow-auto">{{ task.name }}</td>
<td class="d-none d-lg-table-cell">{{ task.created | customDate:'short' }}</td>
<td class="d-none d-lg-table-cell">{{ task.date_created | customDate:'short' }}</td>
<td class="d-none d-lg-table-cell" *ngIf="activeTab != 'started' && activeTab != 'queued'">
<div *ngIf="task.result.length > 50" class="result" (click)="expandTask(task); $event.stopPropagation();"
[ngbPopover]="resultPopover" popoverClass="shadow small mobile" triggers="mouseenter:mouseleave" container="body">
@ -74,11 +74,18 @@
</button>
</td>
<td scope="row">
<button class="btn btn-sm btn-outline-secondary" (click)="dismissTask(task); $event.stopPropagation();">
<svg class="sidebaricon" fill="currentColor">
<use xlink:href="assets/bootstrap-icons.svg#check"/>
</svg>&nbsp;<ng-container i18n>Dismiss</ng-container>
</button>
<div class="btn-group" role="group">
<button class="btn btn-sm btn-outline-secondary" (click)="dismissTask(task); $event.stopPropagation();">
<svg class="sidebaricon" fill="currentColor">
<use xlink:href="assets/bootstrap-icons.svg#check"/>
</svg>&nbsp;<ng-container i18n>Dismiss</ng-container>
</button>
<button *ngIf="task.related_document" class="btn btn-sm btn-outline-primary" (click)="dismissAndGo(task); $event.stopPropagation();">
<svg class="sidebaricon" fill="currentColor">
<use xlink:href="assets/bootstrap-icons.svg#file-text"/>
</svg>&nbsp;<ng-container i18n>Open Document</ng-container>
</button>
</div>
</td>
</tr>
<tr>

View File

@ -1,6 +1,7 @@
import { Component, OnInit, OnDestroy } from '@angular/core'
import { Router } from '@angular/router'
import { NgbModal } from '@ng-bootstrap/ng-bootstrap'
import { takeUntil, Subject, first } from 'rxjs'
import { Subject, first } from 'rxjs'
import { PaperlessTask } from 'src/app/data/paperless-task'
import { TasksService } from 'src/app/services/tasks.service'
import { ConfirmDialogComponent } from '../../common/confirm-dialog/confirm-dialog.component'
@ -24,7 +25,8 @@ export class TasksComponent implements OnInit, OnDestroy {
constructor(
public tasksService: TasksService,
private modalService: NgbModal
private modalService: NgbModal,
private readonly router: Router
) {}
ngOnInit() {
@ -64,6 +66,11 @@ export class TasksComponent implements OnInit, OnDestroy {
}
}
/**
 * Dismiss the given task and then navigate to the document it refers to.
 *
 * The route is built from the task's `related_document` value.
 */
dismissAndGo(task: PaperlessTask) {
  this.dismissTask(task)
  const documentRoute = ['documents', task.related_document]
  this.router.navigate(documentRoute)
}
/**
 * Toggle which task row is expanded.
 *
 * Selecting the task that is already expanded clears the selection;
 * selecting any other task makes it the expanded one.
 */
expandTask(task: PaperlessTask) {
  if (this.expandedTask == task.id) {
    this.expandedTask = undefined
  } else {
    this.expandedTask = task.id
  }
}

View File

@ -6,11 +6,10 @@ export enum PaperlessTaskType {
}
export enum PaperlessTaskStatus {
Queued = 'queued',
Started = 'started',
Complete = 'complete',
Failed = 'failed',
Unknown = 'unknown',
Pending = 'PENDING',
Started = 'STARTED',
Complete = 'SUCCESS',
Failed = 'FAILURE',
}
export interface PaperlessTask extends ObjectWithId {
@ -24,9 +23,11 @@ export interface PaperlessTask extends ObjectWithId {
name: string
created: Date
date_created: Date
started?: Date
done?: Date
result: string
related_document?: number
}

View File

@ -1,6 +1,6 @@
import { HttpClient } from '@angular/common/http'
import { Injectable } from '@angular/core'
import { first, map } from 'rxjs/operators'
import { first } from 'rxjs/operators'
import {
PaperlessTask,
PaperlessTaskStatus,
@ -27,7 +27,7 @@ export class TasksService {
}
public get queuedFileTasks(): PaperlessTask[] {
return this.fileTasks.filter((t) => t.status == PaperlessTaskStatus.Queued)
return this.fileTasks.filter((t) => t.status == PaperlessTaskStatus.Pending)
}
public get startedFileTasks(): PaperlessTask[] {

View File

@ -1,11 +1,12 @@
import itertools
from django.db.models import Q
from django_q.tasks import async_task
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.tasks import bulk_update_documents
from documents.tasks import update_document_archive_file
def set_correspondent(doc_ids, correspondent):
@ -16,7 +17,7 @@ def set_correspondent(doc_ids, correspondent):
affected_docs = [doc.id for doc in qs]
qs.update(correspondent=correspondent)
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -31,8 +32,7 @@ def set_storage_path(doc_ids, storage_path):
affected_docs = [doc.id for doc in qs]
qs.update(storage_path=storage_path)
async_task(
"documents.tasks.bulk_update_documents",
bulk_update_documents.delay(
document_ids=affected_docs,
)
@ -47,7 +47,7 @@ def set_document_type(doc_ids, document_type):
affected_docs = [doc.id for doc in qs]
qs.update(document_type=document_type)
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -63,7 +63,7 @@ def add_tag(doc_ids, tag):
[DocumentTagRelationship(document_id=doc, tag_id=tag) for doc in affected_docs],
)
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -79,7 +79,7 @@ def remove_tag(doc_ids, tag):
Q(document_id__in=affected_docs) & Q(tag_id=tag),
).delete()
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -103,7 +103,7 @@ def modify_tags(doc_ids, add_tags, remove_tags):
ignore_conflicts=True,
)
async_task("documents.tasks.bulk_update_documents", document_ids=affected_docs)
bulk_update_documents.delay(document_ids=affected_docs)
return "OK"
@ -123,8 +123,7 @@ def delete(doc_ids):
def redo_ocr(doc_ids):
for document_id in doc_ids:
async_task(
"documents.tasks.update_document_archive_file",
update_document_archive_file.delay(
document_id=document_id,
)

View File

@ -11,9 +11,9 @@ from typing import Final
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
from django_q.tasks import async_task
from documents.models import Tag
from documents.parsers import is_file_ext_supported
from documents.tasks import consume_file
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
@ -92,11 +92,9 @@ def _consume(filepath):
try:
logger.info(f"Adding {filepath} to the task queue.")
async_task(
"documents.tasks.consume_file",
consume_file.delay(
filepath,
override_tag_ids=tag_ids if tag_ids else None,
task_name=os.path.basename(filepath)[:100],
)
except Exception:
# Catch all so that the consumer won't crash.

View File

@ -1,34 +1,14 @@
# Generated by Django 3.1.3 on 2020-11-09 16:36
from django.db import migrations
from django.db.migrations import RunPython
from django_q.models import Schedule
from django_q.tasks import schedule
def add_schedules(apps, schema_editor):
schedule(
"documents.tasks.train_classifier",
name="Train the classifier",
schedule_type=Schedule.HOURLY,
)
schedule(
"documents.tasks.index_optimize",
name="Optimize the index",
schedule_type=Schedule.DAILY,
)
def remove_schedules(apps, schema_editor):
Schedule.objects.filter(func="documents.tasks.train_classifier").delete()
Schedule.objects.filter(func="documents.tasks.index_optimize").delete()
class Migration(migrations.Migration):
dependencies = [
("documents", "1000_update_paperless_all"),
("django_q", "0013_task_attempt_count"),
]
operations = [RunPython(add_schedules, remove_schedules)]
operations = [
migrations.RunPython(migrations.RunPython.noop, migrations.RunPython.noop)
]

View File

@ -2,27 +2,12 @@
from django.db import migrations
from django.db.migrations import RunPython
from django_q.models import Schedule
from django_q.tasks import schedule
def add_schedules(apps, schema_editor):
schedule(
"documents.tasks.sanity_check",
name="Perform sanity check",
schedule_type=Schedule.WEEKLY,
)
def remove_schedules(apps, schema_editor):
Schedule.objects.filter(func="documents.tasks.sanity_check").delete()
class Migration(migrations.Migration):
dependencies = [
("documents", "1003_mime_types"),
("django_q", "0013_task_attempt_count"),
]
operations = [RunPython(add_schedules, remove_schedules)]
operations = [RunPython(migrations.RunPython.noop, migrations.RunPython.noop)]

View File

@ -4,28 +4,9 @@ from django.db import migrations, models
import django.db.models.deletion
def init_paperless_tasks(apps, schema_editor):
PaperlessTask = apps.get_model("documents", "PaperlessTask")
Task = apps.get_model("django_q", "Task")
for task in Task.objects.filter(func="documents.tasks.consume_file"):
if not hasattr(task, "paperlesstask"):
paperlesstask = PaperlessTask.objects.create(
attempted_task=task,
task_id=task.id,
name=task.name,
created=task.started,
started=task.started,
acknowledged=True,
)
task.paperlesstask = paperlesstask
task.save()
class Migration(migrations.Migration):
dependencies = [
("django_q", "0014_schedule_cluster"),
("documents", "1021_webp_thumbnail_conversion"),
]
@ -60,10 +41,12 @@ class Migration(migrations.Migration):
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="attempted_task",
to="django_q.task",
# This is a dummy field, 1026 will fix up the column
# This manual change is required, as django doesn't really support
# removing an app which has migration deps like this
to="documents.document",
),
),
],
),
migrations.RunPython(init_paperless_tasks, migrations.RunPython.noop),
)
]

View File

@ -0,0 +1,57 @@
# Generated by Django 4.1.1 on 2022-09-27 19:31
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
    """
    Re-point PaperlessTask at django-celery-results and drop the django-q tables.

    Operations run in this order: remove the ``created``, ``name`` and
    ``started`` columns, remove and re-add ``attempted_task`` so that it
    references ``django_celery_results.taskresult``, then drop the three
    ``django_q_*`` tables.
    """

    dependencies = [
        ("django_celery_results", "0011_taskresult_periodic_task_name"),
        ("documents", "1025_alter_savedviewfilterrule_rule_type"),
    ]

    operations = [
        # Remove the obsolete columns and, last of the four, the
        # attempted_task relation itself.
        *(
            migrations.RemoveField(model_name="paperlesstask", name=field_name)
            for field_name in ("created", "name", "started", "attempted_task")
        ),
        # Add attempted_task back, pointing to the correct model.
        # This resolves a problem where the temporary change in 1022
        # results in a type mismatch.
        migrations.AddField(
            model_name="paperlesstask",
            name="attempted_task",
            field=models.OneToOneField(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.CASCADE,
                related_name="attempted_task",
                to="django_celery_results.taskresult",
            ),
        ),
        # Drop the django-q tables entirely.
        # Must be done last or there could be references here.
        *(
            migrations.RunSQL(drop_statement, reverse_sql=migrations.RunSQL.noop)
            for drop_statement in (
                "DROP TABLE IF EXISTS django_q_ormq",
                "DROP TABLE IF EXISTS django_q_schedule",
                "DROP TABLE IF EXISTS django_q_task",
            )
        ),
    ]

View File

@ -12,7 +12,7 @@ from django.contrib.auth.models import User
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django_q.tasks import Task
from django_celery_results.models import TaskResult
from documents.parsers import get_default_file_extension
@ -527,19 +527,16 @@ class UiSettings(models.Model):
class PaperlessTask(models.Model):
task_id = models.CharField(max_length=128)
name = models.CharField(max_length=256)
created = models.DateTimeField(_("created"), auto_now=True)
started = models.DateTimeField(_("started"), null=True)
acknowledged = models.BooleanField(default=False)
attempted_task = models.OneToOneField(
Task,
TaskResult,
on_delete=models.CASCADE,
related_name="attempted_task",
null=True,
blank=True,
)
acknowledged = models.BooleanField(default=False)
class Comment(models.Model):

View File

@ -1,6 +1,14 @@
import datetime
import math
import re
from ast import literal_eval
from asyncio.log import logger
from pathlib import Path
from typing import Dict
from typing import Optional
from typing import Tuple
from celery import states
try:
import zoneinfo
@ -18,12 +26,12 @@ from .models import Correspondent
from .models import Document
from .models import DocumentType
from .models import MatchingModel
from .models import PaperlessTask
from .models import SavedView
from .models import SavedViewFilterRule
from .models import StoragePath
from .models import Tag
from .models import UiSettings
from .models import PaperlessTask
from .parsers import is_mime_type_supported
@ -629,7 +637,19 @@ class TasksViewSerializer(serializers.ModelSerializer):
class Meta:
model = PaperlessTask
depth = 1
fields = "__all__"
fields = (
"id",
"task_id",
"date_created",
"date_done",
"type",
"status",
"result",
"acknowledged",
"task_name",
"name",
"related_document",
)
type = serializers.SerializerMethodField()
@ -641,24 +661,108 @@ class TasksViewSerializer(serializers.ModelSerializer):
def get_result(self, obj):
result = ""
if hasattr(obj, "attempted_task") and obj.attempted_task:
result = obj.attempted_task.result
if (
hasattr(obj, "attempted_task")
and obj.attempted_task
and obj.attempted_task.result
):
try:
result: str = obj.attempted_task.result
if "exc_message" in result:
# This is a dict in this case
result: Dict = literal_eval(result)
# This is a list, grab the first item (most recent)
result = result["exc_message"][0]
except Exception as e: # pragma: no cover
# Extra security if something is malformed
logger.warn(f"Error getting task result: {e}", exc_info=True)
return result
status = serializers.SerializerMethodField()
def get_status(self, obj):
if obj.attempted_task is None:
if obj.started:
return "started"
else:
return "queued"
elif obj.attempted_task.success:
return "complete"
elif not obj.attempted_task.success:
return "failed"
else:
return "unknown"
result = "unknown"
if hasattr(obj, "attempted_task") and obj.attempted_task:
result = obj.attempted_task.status
return result
date_created = serializers.SerializerMethodField()
def get_date_created(self, obj):
    """
    Report when the linked task result was created.

    Returns the ``date_created`` value of ``obj.attempted_task``, or an empty
    string when ``obj`` has no truthy ``attempted_task``.
    """
    linked_task = getattr(obj, "attempted_task", None)
    return linked_task.date_created if linked_task else ""
date_done = serializers.SerializerMethodField()
def get_date_done(self, obj):
    """
    Report when the linked task result finished.

    Returns the ``date_done`` value of ``obj.attempted_task``, or an empty
    string when ``obj`` has no truthy ``attempted_task``.
    """
    linked_task = getattr(obj, "attempted_task", None)
    if not linked_task:
        return ""
    return linked_task.date_done
task_id = serializers.SerializerMethodField()
def get_task_id(self, obj):
    """
    Expose the task id recorded on the linked task result.

    Returns ``obj.attempted_task.task_id``, or an empty string when ``obj``
    has no truthy ``attempted_task``.
    """
    linked_task = getattr(obj, "attempted_task", None)
    if linked_task:
        return linked_task.task_id
    return ""
task_name = serializers.SerializerMethodField()
def get_task_name(self, obj):
    """
    Expose the task name recorded on the linked task result.

    Returns ``obj.attempted_task.task_name``, or an empty string when ``obj``
    has no truthy ``attempted_task``.
    """
    linked_task = getattr(obj, "attempted_task", None)
    return "" if not linked_task else linked_task.task_name
name = serializers.SerializerMethodField()
def get_name(self, obj):
    """
    Work out a display file name for the task.

    The ``override_filename`` keyword argument recorded on the linked task
    result is preferred when it holds a non-empty value. Otherwise the base
    name of the task's first positional argument is used.

    Returns an empty string when ``obj`` has no truthy ``attempted_task`` or
    when the recorded arguments cannot be parsed (a warning is logged in the
    latter case).
    """
    result = ""
    if hasattr(obj, "attempted_task") and obj.attempted_task:
        try:
            task_kwargs: Optional[str] = obj.attempted_task.task_kwargs
            # Try the override filename first (this is a webui created task?)
            if task_kwargs is not None:
                # The stored value is a string holding the string form of a
                # dict, hence the two rounds of literal_eval.
                kwargs = literal_eval(literal_eval(task_kwargs))
                # An explicit override_filename=None (or "") must not mask
                # the fallback below, so normalise it to the empty string.
                result = kwargs.get("override_filename") or ""

            # Nothing was found, report the task's first argument
            if not result:
                # The first positional argument is treated as the file path
                task_args: Tuple = literal_eval(
                    literal_eval(obj.attempted_task.task_args),
                )
                result = Path(task_args[0]).name
        except Exception as e:  # pragma: no cover
            # Extra security if something is malformed
            logger.warning(f"Error getting file name from task: {e}", exc_info=True)

    return result
related_document = serializers.SerializerMethodField()
def get_related_document(self, obj):
    """
    Extract the id of the document created by a successful task.

    The id is taken from the linked task result's ``result`` text when it
    contains ``New document id <digits> created`` and the task status equals
    ``states.SUCCESS``. The id is returned as the matched string of digits.
    In every other case an empty string is returned.
    """
    linked_task = getattr(obj, "attempted_task", None)
    if not linked_task:
        return ""

    task_output = linked_task.result
    if not task_output or linked_task.status != states.SUCCESS:
        return ""

    # Anything that is not text cannot be searched; report "no document".
    if not isinstance(task_output, str):
        return ""

    found = re.search(r"New document id (\d+) created", task_output)
    return found.group(1) if found is not None else ""
class AcknowledgeTasksViewSerializer(serializers.Serializer):

View File

@ -2,7 +2,6 @@ import logging
import os
import shutil
import django_q
from django.conf import settings
from django.contrib.admin.models import ADDITION
from django.contrib.admin.models import LogEntry
@ -14,6 +13,7 @@ from django.db.models import Q
from django.dispatch import receiver
from django.utils import termcolors
from django.utils import timezone
from django_celery_results.models import TaskResult
from filelock import FileLock
from .. import matching
@ -25,7 +25,6 @@ from ..models import MatchingModel
from ..models import PaperlessTask
from ..models import Tag
logger = logging.getLogger("paperless.handlers")
@ -503,47 +502,19 @@ def add_to_index(sender, document, **kwargs):
index.add_or_update_document(document)
@receiver(django_q.signals.pre_enqueue)
def init_paperless_task(sender, task, **kwargs):
if task["func"] == "documents.tasks.consume_file":
try:
paperless_task, created = PaperlessTask.objects.get_or_create(
task_id=task["id"],
)
paperless_task.name = task["name"]
paperless_task.created = task["started"]
paperless_task.save()
except Exception as e:
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Creating PaperlessTask failed: {e}")
@receiver(django_q.signals.pre_execute)
def paperless_task_started(sender, task, **kwargs):
@receiver(models.signals.post_save, sender=TaskResult)
def update_paperless_task(sender, instance: TaskResult, **kwargs):
try:
if task["func"] == "documents.tasks.consume_file":
paperless_task, created = PaperlessTask.objects.get_or_create(
task_id=task["id"],
)
paperless_task.started = timezone.now()
paperless_task.save()
except PaperlessTask.DoesNotExist:
pass
except Exception as e:
logger.error(f"Creating PaperlessTask failed: {e}")
@receiver(models.signals.post_save, sender=django_q.models.Task)
def update_paperless_task(sender, instance, **kwargs):
try:
if instance.func == "documents.tasks.consume_file":
paperless_task, created = PaperlessTask.objects.get_or_create(
task_id=instance.id,
if instance.task_name == "documents.tasks.consume_file":
paperless_task, _ = PaperlessTask.objects.get_or_create(
task_id=instance.task_id,
)
paperless_task.name = instance.task_name
paperless_task.created = instance.date_created
paperless_task.completed = instance.date_done
paperless_task.attempted_task = instance
paperless_task.save()
except PaperlessTask.DoesNotExist:
pass
except Exception as e:
# Don't let an exception in the signal handlers prevent
# a document from being consumed.
logger.error(f"Creating PaperlessTask failed: {e}")

View File

@ -8,6 +8,7 @@ from typing import Type
import tqdm
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.conf import settings
from django.db import transaction
@ -36,6 +37,7 @@ from whoosh.writing import AsyncWriter
logger = logging.getLogger("paperless.tasks")
@shared_task
def index_optimize():
ix = index.open_index()
writer = AsyncWriter(ix)
@ -52,6 +54,7 @@ def index_reindex(progress_bar_disable=False):
index.update_document(writer, document)
@shared_task
def train_classifier():
if (
not Tag.objects.filter(matching_algorithm=Tag.MATCH_AUTO).exists()
@ -80,6 +83,7 @@ def train_classifier():
logger.warning("Classifier error: " + str(e))
@shared_task
def consume_file(
path,
override_filename=None,
@ -183,6 +187,7 @@ def consume_file(
)
@shared_task
def sanity_check():
messages = sanity_checker.check_sanity()
@ -198,6 +203,7 @@ def sanity_check():
return "No issues detected."
@shared_task
def bulk_update_documents(document_ids):
documents = Document.objects.filter(id__in=document_ids)
@ -211,6 +217,7 @@ def bulk_update_documents(document_ids):
index.update_document(writer, doc)
@shared_task
def update_document_archive_file(document_id):
"""
Re-creates the archive file of a document, including new OCR content and thumbnail

View File

@ -10,6 +10,8 @@ import zipfile
from unittest import mock
from unittest.mock import MagicMock
import celery
try:
import zoneinfo
except ImportError:
@ -20,7 +22,6 @@ from django.conf import settings
from django.contrib.auth.models import User
from django.test import override_settings
from django.utils import timezone
from django_q.models import Task
from documents import bulk_edit
from documents import index
from documents.models import Correspondent
@ -31,7 +32,7 @@ from documents.models import PaperlessTask
from documents.models import SavedView
from documents.models import StoragePath
from documents.models import Tag
from documents.models import UiSettings
from django_celery_results.models import TaskResult
from documents.models import Comment
from documents.models import StoragePath
from documents.tests.utils import DirectoriesMixin
@ -790,7 +791,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["documents_inbox"], None)
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload(self, m):
with open(
@ -813,7 +814,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertIsNone(kwargs["override_document_type_id"])
self.assertIsNone(kwargs["override_tag_ids"])
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_empty_metadata(self, m):
with open(
@ -836,7 +837,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertIsNone(kwargs["override_document_type_id"])
self.assertIsNone(kwargs["override_tag_ids"])
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_invalid_form(self, m):
with open(
@ -850,7 +851,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, 400)
m.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_invalid_file(self, m):
with open(
@ -864,7 +865,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, 400)
m.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_title(self, async_task):
with open(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
@ -882,7 +883,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(kwargs["override_title"], "my custom title")
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_correspondent(self, async_task):
c = Correspondent.objects.create(name="test-corres")
with open(
@ -901,7 +902,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(kwargs["override_correspondent_id"], c.id)
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_invalid_correspondent(self, async_task):
with open(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
@ -915,7 +916,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
async_task.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_document_type(self, async_task):
dt = DocumentType.objects.create(name="invoice")
with open(
@ -934,7 +935,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertEqual(kwargs["override_document_type_id"], dt.id)
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_invalid_document_type(self, async_task):
with open(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
@ -948,7 +949,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
async_task.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_tags(self, async_task):
t1 = Tag.objects.create(name="tag1")
t2 = Tag.objects.create(name="tag2")
@ -968,7 +969,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
self.assertCountEqual(kwargs["override_tag_ids"], [t1.id, t2.id])
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_invalid_tags(self, async_task):
t1 = Tag.objects.create(name="tag1")
t2 = Tag.objects.create(name="tag2")
@ -984,7 +985,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
async_task.assert_not_called()
@mock.patch("documents.views.async_task")
@mock.patch("documents.views.consume_file.delay")
def test_upload_with_created(self, async_task):
created = datetime.datetime(
2022,
@ -1619,7 +1620,7 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
patcher = mock.patch("documents.bulk_edit.async_task")
patcher = mock.patch("documents.bulk_edit.bulk_update_documents.delay")
self.async_task = patcher.start()
self.addCleanup(patcher.stop)
self.c1 = Correspondent.objects.create(name="c1")
@ -2738,7 +2739,7 @@ class TestApiStoragePaths(DirectoriesMixin, APITestCase):
class TestTasks(APITestCase):
ENDPOINT = "/api/tasks/"
ENDPOINT_ACKOWLEDGE = "/api/acknowledge_tasks/"
ENDPOINT_ACKNOWLEDGE = "/api/acknowledge_tasks/"
def setUp(self):
super().setUp()
@ -2747,16 +2748,27 @@ class TestTasks(APITestCase):
self.client.force_authenticate(user=self.user)
def test_get_tasks(self):
task_id1 = str(uuid.uuid4())
PaperlessTask.objects.create(task_id=task_id1)
Task.objects.create(
id=task_id1,
started=timezone.now() - datetime.timedelta(seconds=30),
stopped=timezone.now(),
func="documents.tasks.consume_file",
"""
GIVEN:
- Attempted celery tasks
WHEN:
- API call is made to get tasks
THEN:
- Started and pending tasks are serialized and provided
"""
result1 = TaskResult.objects.create(
task_id=str(uuid.uuid4()),
task_name="documents.tasks.some_great_task",
status=celery.states.PENDING,
)
task_id2 = str(uuid.uuid4())
PaperlessTask.objects.create(task_id=task_id2)
PaperlessTask.objects.create(attempted_task=result1)
result2 = TaskResult.objects.create(
task_id=str(uuid.uuid4()),
task_name="documents.tasks.some_awesome_task",
status=celery.states.STARTED,
)
PaperlessTask.objects.create(attempted_task=result2)
response = self.client.get(self.ENDPOINT)
@ -2764,25 +2776,155 @@ class TestTasks(APITestCase):
self.assertEqual(len(response.data), 2)
returned_task1 = response.data[1]
returned_task2 = response.data[0]
self.assertEqual(returned_task1["task_id"], task_id1)
self.assertEqual(returned_task1["status"], "complete")
self.assertIsNotNone(returned_task1["attempted_task"])
self.assertEqual(returned_task2["task_id"], task_id2)
self.assertEqual(returned_task2["status"], "queued")
self.assertIsNone(returned_task2["attempted_task"])
self.assertEqual(returned_task1["task_id"], result1.task_id)
self.assertEqual(returned_task1["status"], celery.states.PENDING)
self.assertEqual(returned_task1["task_name"], result1.task_name)
self.assertEqual(returned_task2["task_id"], result2.task_id)
self.assertEqual(returned_task2["status"], celery.states.STARTED)
self.assertEqual(returned_task2["task_name"], result2.task_name)
def test_acknowledge_tasks(self):
task_id = str(uuid.uuid4())
task = PaperlessTask.objects.create(task_id=task_id)
"""
GIVEN:
- Attempted celery tasks
WHEN:
- API call is made to mark the task as acknowledged
THEN:
- Task is marked as acknowledged
"""
result1 = TaskResult.objects.create(
task_id=str(uuid.uuid4()),
task_name="documents.tasks.some_task",
status=celery.states.PENDING,
)
task = PaperlessTask.objects.create(attempted_task=result1)
response = self.client.get(self.ENDPOINT)
self.assertEqual(len(response.data), 1)
response = self.client.post(
self.ENDPOINT_ACKOWLEDGE,
self.ENDPOINT_ACKNOWLEDGE,
{"tasks": [task.id]},
)
self.assertEqual(response.status_code, 200)
response = self.client.get(self.ENDPOINT)
self.assertEqual(len(response.data), 0)
def test_task_result_no_error(self):
    """
    GIVEN:
        - A stored celery task result with status SUCCESS and a
          plain-text result message
    WHEN:
        - The tasks endpoint is queried
    THEN:
        - Exactly one task is returned
        - Its result is the stored message
        - Its related document is reported as "1"
    """
    success_message = "Success. New document id 1 created"

    task_result = TaskResult.objects.create(
        task_id=str(uuid.uuid4()),
        task_name="documents.tasks.some_task",
        status=celery.states.SUCCESS,
        result=success_message,
    )
    PaperlessTask.objects.create(attempted_task=task_result)

    response = self.client.get(self.ENDPOINT)

    self.assertEqual(response.status_code, 200)
    self.assertEqual(len(response.data), 1)

    payload = response.data[0]

    self.assertEqual(payload["result"], success_message)
    # NOTE(review): presumably extracted from the "New document id 1"
    # part of the result message - confirm against the serializer.
    self.assertEqual(payload["related_document"], "1")
def test_task_result_with_error(self):
    """
    GIVEN:
        - A celery task completed with an exception
    WHEN:
        - API call is made to get tasks
    THEN:
        - The returned result is the exception info
    """
    result1 = TaskResult.objects.create(
        task_id=str(uuid.uuid4()),
        task_name="documents.tasks.some_task",
        # A task that raised is recorded by celery as FAILURE, not
        # SUCCESS, so the fixture must use the failure state to match
        # the scenario described above.
        status=celery.states.FAILURE,
        # Exception details in the shape of a dict with the exception
        # type, its message list and the module that defines it.
        result={
            "exc_type": "ConsumerError",
            "exc_message": ["test.pdf: Not consuming test.pdf: It is a duplicate."],
            "exc_module": "documents.consumer",
        },
    )
    _ = PaperlessTask.objects.create(attempted_task=result1)

    response = self.client.get(self.ENDPOINT)

    self.assertEqual(response.status_code, 200)
    self.assertEqual(len(response.data), 1)

    returned_data = response.data[0]

    # Only the first exception message is expected back, not the
    # whole exception dict.
    self.assertEqual(
        returned_data["result"],
        "test.pdf: Not consuming test.pdf: It is a duplicate.",
    )
def test_task_name_webui(self):
    """
    GIVEN:
        - A stored celery task result whose arguments point at a
          temporary upload file and whose keyword arguments carry
          an override filename, as produced by a web UI upload
    WHEN:
        - The tasks endpoint is queried
    THEN:
        - The returned task name is the override filename
    """
    upload_args = "\"('/tmp/paperless/paperless-upload-5iq7skzc',)\""
    upload_kwargs = (
        "\"{'override_filename': 'test.pdf', 'override_title': None, "
        "'override_correspondent_id': None, "
        "'override_document_type_id': None, 'override_tag_ids': None, "
        "'task_id': '466e8fe7-7193-4698-9fff-72f0340e2082', "
        "'override_created': None}\""
    )

    task_result = TaskResult.objects.create(
        task_id=str(uuid.uuid4()),
        task_name="documents.tasks.some_task",
        status=celery.states.SUCCESS,
        task_args=upload_args,
        task_kwargs=upload_kwargs,
    )
    PaperlessTask.objects.create(attempted_task=task_result)

    response = self.client.get(self.ENDPOINT)

    self.assertEqual(response.status_code, 200)
    self.assertEqual(len(response.data), 1)

    payload = response.data[0]

    self.assertEqual(payload["name"], "test.pdf")
def test_task_name_consume_folder(self):
    """
    GIVEN:
        - A stored celery task result whose only positional argument
          is a path inside the consume folder and whose keyword
          arguments carry no override filename
    WHEN:
        - The tasks endpoint is queried
    THEN:
        - The returned task name is the file name of that path
    """
    consume_args = "\"('/consume/anothertest.pdf',)\""
    consume_kwargs = "\"{'override_tag_ids': None}\""

    task_result = TaskResult.objects.create(
        task_id=str(uuid.uuid4()),
        task_name="documents.tasks.some_task",
        status=celery.states.SUCCESS,
        task_args=consume_args,
        task_kwargs=consume_kwargs,
    )
    PaperlessTask.objects.create(attempted_task=task_result)

    response = self.client.get(self.ENDPOINT)

    self.assertEqual(response.status_code, 200)
    self.assertEqual(len(response.data), 1)

    payload = response.data[0]

    self.assertEqual(payload["name"], "anothertest.pdf")

View File

@ -43,7 +43,7 @@ class ConsumerMixin:
super().setUp()
self.t = None
patcher = mock.patch(
"documents.management.commands.document_consumer.async_task",
"documents.tasks.consume_file.delay",
)
self.task_mock = patcher.start()
self.addCleanup(patcher.stop)
@ -76,7 +76,7 @@ class ConsumerMixin:
# A bogus stand-in for the consume task that will simply check the file for
# completeness and raise an exception otherwise.
def bogus_task(self, func, filename, **kwargs):
def bogus_task(self, filename, **kwargs):
eq = filecmp.cmp(filename, self.sample_file, shallow=False)
if not eq:
print("Consumed an INVALID file.")
@ -115,7 +115,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)
self.assertEqual(args[0], f)
def test_consume_file_invalid_ext(self):
self.t_start()
@ -135,7 +135,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)
self.assertEqual(args[0], f)
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_pdf(self, error_logger):
@ -155,7 +155,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], fname)
self.assertEqual(args[0], fname)
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_and_move(self, error_logger):
@ -175,7 +175,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], fname2)
self.assertEqual(args[0], fname2)
error_logger.assert_not_called()
@ -193,7 +193,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], fname)
self.assertEqual(args[0], fname)
# assert that we have an error logged with this invalid file.
error_logger.assert_called_once()
@ -241,7 +241,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
self.assertEqual(2, self.task_mock.call_count)
fnames = [
os.path.basename(args[1]) for args, _ in self.task_mock.call_args_list
os.path.basename(args[0]) for args, _ in self.task_mock.call_args_list
]
self.assertCountEqual(fnames, ["my_file.pdf", "my_second_file.pdf"])
@ -338,7 +338,7 @@ class TestConsumerTags(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
tag_ids.append(Tag.objects.get(name=tag_names[1]).pk)
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)
self.assertEqual(args[0], f)
# assertCountEqual has a bad name, but test that the first
# sequence contains the same elements as second, regardless of

View File

@ -28,7 +28,7 @@ from django.utils.translation import get_language
from django.views.decorators.cache import cache_control
from django.views.generic import TemplateView
from django_filters.rest_framework import DjangoFilterBackend
from django_q.tasks import async_task
from documents.tasks import consume_file
from packaging import version as packaging_version
from paperless import version
from paperless.db import GnuPG
@ -615,8 +615,7 @@ class PostDocumentView(GenericAPIView):
task_id = str(uuid.uuid4())
async_task(
"documents.tasks.consume_file",
consume_file.delay(
temp_filename,
override_filename=doc_name,
override_title=title,
@ -624,7 +623,6 @@ class PostDocumentView(GenericAPIView):
override_document_type_id=document_type_id,
override_tag_ids=tag_ids,
task_id=task_id,
task_name=os.path.basename(doc_name)[:100],
override_created=created,
)
@ -888,8 +886,9 @@ class TasksViewSet(ReadOnlyModelViewSet):
queryset = (
PaperlessTask.objects.filter(
acknowledged=False,
attempted_task__isnull=False,
)
.order_by("created")
.order_by("attempted_task__date_created")
.reverse()
)

View File

@ -1,5 +1,11 @@
from .celery import app as celery_app
from .checks import binaries_check
from .checks import paths_check
from .checks import settings_values_check
__all__ = ["binaries_check", "paths_check", "settings_values_check"]
__all__ = [
"celery_app",
"binaries_check",
"paths_check",
"settings_values_check",
]

17
src/paperless/celery.py Normal file
View File

@ -0,0 +1,17 @@
"""
Celery application definition for paperless.

Creates the module-level ``app`` object, configures it from the Django
settings (keys prefixed with ``CELERY_``) and lets it discover task
modules automatically.
"""
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
# setdefault leaves an already exported DJANGO_SETTINGS_MODULE untouched.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "paperless.settings")
# The Celery application instance, named "paperless".
app = Celery("paperless")
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django apps.
app.autodiscover_tasks()

View File

@ -10,6 +10,7 @@ from typing import Optional
from typing import Set
from urllib.parse import urlparse
from celery.schedules import crontab
from concurrent_log_handler.queue import setup_logging_queues
from django.utils.translation import gettext_lazy as _
from dotenv import load_dotenv
@ -128,7 +129,7 @@ INSTALLED_APPS = [
"rest_framework",
"rest_framework.authtoken",
"django_filters",
"django_q",
"django_celery_results",
] + env_apps
if DEBUG:
@ -179,6 +180,8 @@ ASGI_APPLICATION = "paperless.asgi.application"
STATIC_URL = os.getenv("PAPERLESS_STATIC_URL", BASE_URL + "static/")
WHITENOISE_STATIC_PREFIX = "/static/"
_REDIS_URL = os.getenv("PAPERLESS_REDIS", "redis://localhost:6379")
# TODO: what is this used for?
TEMPLATES = [
{
@ -200,7 +203,7 @@ CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [os.getenv("PAPERLESS_REDIS", "redis://localhost:6379")],
"hosts": [_REDIS_URL],
"capacity": 2000, # default 100
"expiry": 15, # default 60
},
@ -456,24 +459,53 @@ TASK_WORKERS = __get_int("PAPERLESS_TASK_WORKERS", 1)
WORKER_TIMEOUT: Final[int] = __get_int("PAPERLESS_WORKER_TIMEOUT", 1800)
# Per django-q docs, timeout must be smaller than retry
# We default retry to 10s more than the timeout to silence the
# warning, as retry functionality isn't used.
WORKER_RETRY: Final[int] = __get_int(
"PAPERLESS_WORKER_RETRY",
WORKER_TIMEOUT + 10,
)
CELERY_BROKER_URL = _REDIS_URL
CELERY_TIMEZONE = TIME_ZONE
Q_CLUSTER = {
"name": "paperless",
"guard_cycle": 5,
"catch_up": False,
"recycle": 1,
"retry": WORKER_RETRY,
"timeout": WORKER_TIMEOUT,
"workers": TASK_WORKERS,
"redis": os.getenv("PAPERLESS_REDIS", "redis://localhost:6379"),
"log_level": "DEBUG" if DEBUG else "INFO",
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
CELERY_WORKER_CONCURRENCY = TASK_WORKERS
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_SEND_TASK_SENT_EVENT = True
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = WORKER_TIMEOUT
CELERY_RESULT_EXTENDED = True
CELERY_RESULT_BACKEND = "django-db"
CELERY_CACHE_BACKEND = "default"
CELERY_BEAT_SCHEDULE = {
# Every ten minutes
"Check all e-mail accounts": {
"task": "paperless_mail.tasks.process_mail_accounts",
"schedule": crontab(minute="*/10"),
},
# Hourly at 5 minutes past the hour
"Train the classifier": {
"task": "documents.tasks.train_classifier",
"schedule": crontab(minute="5", hour="*/1"),
},
# Daily at midnight
"Optimize the index": {
"task": "documents.tasks.index_optimize",
"schedule": crontab(minute=0, hour=0),
},
# Weekly, Sunday at 00:30
"Perform sanity check": {
"task": "documents.tasks.sanity_check",
"schedule": crontab(minute=30, hour=0, day_of_week="sun"),
},
}
CELERY_BEAT_SCHEDULE_FILENAME = os.path.join(DATA_DIR, "celerybeat-schedule.db")
# django setting.
CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"LOCATION": _REDIS_URL,
},
}

View File

@ -10,10 +10,10 @@ import magic
import pathvalidate
from django.conf import settings
from django.db import DatabaseError
from django_q.tasks import async_task
from documents.loggers import LoggingMixin
from documents.models import Correspondent
from documents.parsers import is_mime_type_supported
from documents.tasks import consume_file
from imap_tools import AND
from imap_tools import MailBox
from imap_tools import MailboxFolderSelectError
@ -414,8 +414,7 @@ class MailAccountHandler(LoggingMixin):
f"{message.subject} from {message.from_}",
)
async_task(
"documents.tasks.consume_file",
consume_file.delay(
path=temp_filename,
override_filename=pathvalidate.sanitize_filename(
att.filename,

View File

@ -2,28 +2,12 @@
from django.db import migrations
from django.db.migrations import RunPython
from django_q.models import Schedule
from django_q.tasks import schedule
def add_schedules(apps, schema_editor):
schedule(
"paperless_mail.tasks.process_mail_accounts",
name="Check all e-mail accounts",
schedule_type=Schedule.MINUTES,
minutes=10,
)
def remove_schedules(apps, schema_editor):
Schedule.objects.filter(func="paperless_mail.tasks.process_mail_accounts").delete()
class Migration(migrations.Migration):
dependencies = [
("paperless_mail", "0001_initial"),
("django_q", "0013_task_attempt_count"),
]
operations = [RunPython(add_schedules, remove_schedules)]
operations = [RunPython(migrations.RunPython.noop, migrations.RunPython.noop)]

View File

@ -1,13 +1,14 @@
import logging
from celery import shared_task
from paperless_mail.mail import MailAccountHandler
from paperless_mail.mail import MailError
from paperless_mail.models import MailAccount
logger = logging.getLogger("paperless.mail.tasks")
@shared_task
def process_mail_accounts():
total_new_documents = 0
for account in MailAccount.objects.all():
@ -20,11 +21,3 @@ def process_mail_accounts():
return f"Added {total_new_documents} document(s)."
else:
return "No new documents were added."
def process_mail_account(name):
try:
account = MailAccount.objects.get(name=name)
MailAccountHandler().handle_mail_account(account)
except MailAccount.DoesNotExist:
logger.error(f"Unknown mail acccount: {name}")

View File

@ -248,7 +248,7 @@ class TestMail(DirectoriesMixin, TestCase):
m.return_value = self.bogus_mailbox
self.addCleanup(patcher.stop)
patcher = mock.patch("paperless_mail.mail.async_task")
patcher = mock.patch("paperless_mail.mail.consume_file.delay")
self.async_task = patcher.start()
self.addCleanup(patcher.stop)
@ -1032,20 +1032,3 @@ class TestTasks(TestCase):
m.side_effect = lambda account: 0
result = tasks.process_mail_accounts()
self.assertIn("No new", result)
@mock.patch("paperless_mail.tasks.MailAccountHandler.handle_mail_account")
def test_single_accounts(self, m):
MailAccount.objects.create(
name="A",
imap_server="A",
username="A",
password="A",
)
tasks.process_mail_account("A")
m.assert_called_once()
m.reset_mock()
tasks.process_mail_account("B")
m.assert_not_called()