diff --git a/Dockerfile b/Dockerfile index 2eeaba1dc..8b562e73e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -151,6 +151,7 @@ COPY [ \ "docker/paperless_cmd.sh", \ "docker/wait-for-redis.py", \ "docker/management_script.sh", \ + "docker/flower-conditional.sh", \ "docker/install_management_commands.sh", \ "/usr/src/paperless/src/docker/" \ ] @@ -170,6 +171,8 @@ RUN set -eux \ && chmod 755 /sbin/wait-for-redis.py \ && mv paperless_cmd.sh /usr/local/bin/paperless_cmd.sh \ && chmod 755 /usr/local/bin/paperless_cmd.sh \ + && mv flower-conditional.sh /usr/local/bin/flower-conditional.sh \ + && chmod 755 /usr/local/bin/flower-conditional.sh \ && echo "Installing managment commands" \ && chmod +x install_management_commands.sh \ && ./install_management_commands.sh diff --git a/Pipfile b/Pipfile index df511483b..62b70f12e 100644 --- a/Pipfile +++ b/Pipfile @@ -60,6 +60,7 @@ django-celery-results = "*" setproctitle = "*" nltk = "*" pdf2image = "*" +flower = "*" bleach = "*" [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index 4fa1aa8aa..9bbb72bc7 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "a13540e996f7e6988c49809d728e854118886dd9e99f2e67c7bb077eb4baf794" + "sha256": "5558c489e948de1779e547beae0dd36a2e551aa6be8505c26b651fd87eac2834" }, "pipfile-spec": 6, "requires": {}, @@ -401,6 +401,14 @@ "index": "pypi", "version": "==3.8.0" }, + "flower": { + "hashes": [ + "sha256:46493c7e8d9ca2167e8a46eb97ae8d280997cb40a81993230124d74f0fe40bac", + "sha256:ae2977cf7343c526cf44def8c7e7173db8dedb8249b91ba4b88cfd18e7a2d486" + ], + "index": "pypi", + "version": "==1.2.0" + }, "fuzzywuzzy": { "extras": [ "speedup" @@ -529,6 +537,14 @@ "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", "version": "==10.0" }, + "humanize": { + "hashes": [ + "sha256:8830ebf2d65d0395c1bd4c79189ad71e023f277c2c7ae00f263124432e6f2ffa", + "sha256:efb2584565cc86b7ea87a977a15066de34cdedaf341b11c851cfcfd2b964779c" + ], + 
"markers": "python_version >= '3.7'", + "version": "==4.4.0" + }, "hyperlink": { "hashes": [ "sha256:427af957daa58bc909471c6c40f74c5450fa123dd093fc53efd2e91d2705a56b", @@ -1092,6 +1108,14 @@ "markers": "python_version >= '3'", "version": "==2.6.0" }, + "prometheus-client": { + "hashes": [ + "sha256:be26aa452490cfcf6da953f9436e95a9f2b4d578ca80094b4458930e5f584ab1", + "sha256:db7c05cbd13a0f79975592d112320f2605a325969b270a94b71dcabc47b931d2" + ], + "markers": "python_version >= '3.6'", + "version": "==0.15.0" + }, "prompt-toolkit": { "hashes": [ "sha256:9696f386133df0fc8ca5af4895afe5d78f5fcfe5258111c2a79a1c3e41ffa96d", @@ -1705,6 +1729,23 @@ "index": "pypi", "version": "==1.24" }, + "tornado": { + "hashes": [ + "sha256:1d54d13ab8414ed44de07efecb97d4ef7c39f7438cf5e976ccd356bebb1b5fca", + "sha256:20f638fd8cc85f3cbae3c732326e96addff0a15e22d80f049e00121651e82e72", + "sha256:5c87076709343557ef8032934ce5f637dbb552efa7b21d08e89ae7619ed0eb23", + "sha256:5f8c52d219d4995388119af7ccaa0bcec289535747620116a58d830e7c25d8a8", + "sha256:6fdfabffd8dfcb6cf887428849d30cf19a3ea34c2c248461e1f7d718ad30b66b", + "sha256:87dcafae3e884462f90c90ecc200defe5e580a7fbbb4365eda7c7c1eb809ebc9", + "sha256:9b630419bde84ec666bfd7ea0a4cb2a8a651c2d5cccdbdd1972a0c859dfc3c13", + "sha256:b8150f721c101abdef99073bf66d3903e292d851bee51910839831caba341a75", + "sha256:ba09ef14ca9893954244fd872798b4ccb2367c165946ce2dd7376aebdde8e3ac", + "sha256:d3a2f5999215a3a06a4fc218026cd84c61b8b2b40ac5296a6db1f1451ef04c1e", + "sha256:e5f923aa6a47e133d1cf87d60700889d7eae68988704e20c75fb2d65677a8e4b" + ], + "markers": "python_version >= '3.7'", + "version": "==6.2" + }, "tqdm": { "hashes": [ "sha256:5f4f682a004951c1b450bc753c710e9280c5746ce6ffedee253ddbcbf54cf1e4", @@ -2028,11 +2069,11 @@ }, "zipp": { "hashes": [ - "sha256:3a7af91c3db40ec72dd9d154ae18e008c69efe8ca88dde4f9a731bb82fe2f9eb", - "sha256:972cfa31bc2fedd3fa838a51e9bc7e64b7fb725a8c00e7431554311f180e9980" + 
"sha256:4fcb6f278987a6605757302a6e40e896257570d11c51628968ccb2a47e80c6c1", + "sha256:7a7262fd930bd3e36c50b9a64897aec3fafff3dfdeec9623ae22b40e93f99bb8" ], "markers": "python_version < '3.9'", - "version": "==3.9.0" + "version": "==3.10.0" }, "zope.interface": { "hashes": [ @@ -2530,11 +2571,11 @@ }, "pytest-env": { "hashes": [ - "sha256:89b6a7a00174ee289029358364b37b688e964d2cb05a8b9e4514d4ab52b134bb", - "sha256:a11102037f91ab765390e6da684a99797ede08971f355eed53c7d1365d852467" + "sha256:8c0605ae09a5b7e41c20ebcc44f2c906eea9654095b4b0c342b3814bcc3a8492", + "sha256:d7b2f5273ec6d1e221757998bc2f50d2474ed7d0b9331b92556011fadc4e9abf" ], "index": "pypi", - "version": "==0.7.0" + "version": "==0.8.1" }, "pytest-forked": { "hashes": [ @@ -2761,7 +2802,7 @@ "sha256:d3a2f5999215a3a06a4fc218026cd84c61b8b2b40ac5296a6db1f1451ef04c1e", "sha256:e5f923aa6a47e133d1cf87d60700889d7eae68988704e20c75fb2d65677a8e4b" ], - "markers": "python_version > '2.7'", + "markers": "python_version >= '3.7'", "version": "==6.2" }, "tox": { @@ -2798,11 +2839,11 @@ }, "zipp": { "hashes": [ - "sha256:3a7af91c3db40ec72dd9d154ae18e008c69efe8ca88dde4f9a731bb82fe2f9eb", - "sha256:972cfa31bc2fedd3fa838a51e9bc7e64b7fb725a8c00e7431554311f180e9980" + "sha256:4fcb6f278987a6605757302a6e40e896257570d11c51628968ccb2a47e80c6c1", + "sha256:7a7262fd930bd3e36c50b9a64897aec3fafff3dfdeec9623ae22b40e93f99bb8" ], "markers": "python_version < '3.9'", - "version": "==3.9.0" + "version": "==3.10.0" } } } diff --git a/docker/flower-conditional.sh b/docker/flower-conditional.sh new file mode 100644 index 000000000..04319a8e3 --- /dev/null +++ b/docker/flower-conditional.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +echo "Checking if we should start flower..." 
+ +if [[ -n "${PAPERLESS_ENABLE_FLOWER}" ]]; then + celery --app paperless flower +fi diff --git a/docker/supervisord.conf b/docker/supervisord.conf index 0199b86fe..bfb78330a 100644 --- a/docker/supervisord.conf +++ b/docker/supervisord.conf @@ -10,7 +10,7 @@ user=root [program:gunicorn] command=gunicorn -c /usr/src/paperless/gunicorn.conf.py paperless.asgi:application user=paperless - +priority = 1 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr @@ -20,7 +20,7 @@ stderr_logfile_maxbytes=0 command=python3 manage.py document_consumer user=paperless stopsignal=INT - +priority = 20 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr @@ -32,7 +32,7 @@ command = celery --app paperless worker --loglevel INFO user=paperless stopasgroup = true stopwaitsecs = 60 - +priority = 5 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr @@ -43,7 +43,17 @@ stderr_logfile_maxbytes=0 command = celery --app paperless beat --loglevel INFO user=paperless stopasgroup = true - +priority = 10 +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 + +[program:celery-flower] +command = /usr/local/bin/flower-conditional.sh +user = paperless +startsecs = 0 +priority = 40 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr diff --git a/docs/advanced_usage.rst b/docs/advanced_usage.rst index 4723ff4b7..844a73817 100644 --- a/docs/advanced_usage.rst +++ b/docs/advanced_usage.rst @@ -364,3 +364,25 @@ For simplicity, `By Year` defines the same structure as in the previous example If you adjust the format of an existing storage path, old documents don't get relocated automatically. You need to run the :ref:`document renamer ` to adjust their pathes. + +.. 
_advanced-celery-monitoring: + +Celery Monitoring +################# + +The monitoring tool `Flower <https://flower.readthedocs.io/en/latest/index.html>`_ can be used to view more +detailed information about the health of the celery workers used for asynchronous tasks. This includes details +on currently running, queued and completed tasks, timing and more. Flower can also be used with Prometheus, as it +exports metrics. For details on its capabilities, refer to the Flower documentation. + +To configure Flower further, create a `flowerconfig.py` and place it into the `src/paperless` directory. For +a Docker installation, you can use volumes to accomplish this: + +.. code:: yaml + + services: + # ... + webserver: + # ... + volumes: + - /path/to/my/flowerconfig.py:/usr/src/paperless/src/paperless/flowerconfig.py:ro diff --git a/docs/configuration.rst b/docs/configuration.rst index 5824f69f9..a2d3176d9 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -705,6 +705,17 @@ PAPERLESS_CONSUMER_ENABLE_BARCODES= Defaults to false. +PAPERLESS_CONSUMER_USE_LEGACY_DETECTION= + Enables the legacy method of detecting barcodes. By default, images are + extracted directly from the PDF structure for barcode detection. If this + configuration value is set, images of the whole PDF page will be used instead. + + This is a slower and more memory intensive process, but may be required for + certain files, depending on how they are produced and how images are encoded. + + Defaults to false. + + PAPERLESS_CONSUMER_BARCODE_TIFF_SUPPORT= Whether TIFF image files should be scanned for barcodes. This will automatically convert any TIFF image(s) to pdfs for later @@ -905,6 +916,14 @@ PAPERLESS_OCR_LANGUAGES= Defaults to none, which does not install any additional languages. +PAPERLESS_ENABLE_FLOWER= + If this environment variable is defined, the Celery monitoring tool + `Flower <https://flower.readthedocs.io/en/latest/index.html>`_ will + be started by the container. + + You can read more about this in the :ref:`advanced setup <advanced-celery-monitoring>` + documentation. + ..
_configuration-update-checking: diff --git a/src-ui/messages.xlf b/src-ui/messages.xlf index 3b037ff25..8260dff93 100644 --- a/src-ui/messages.xlf +++ b/src-ui/messages.xlf @@ -305,8 +305,8 @@ 122 - - Drag-and-drop documents here to start uploading or place them in the consume folder. You can also drag-and-drop documents anywhere on all other pages of the web app. Once you do, Paperless-ngx will start training it's machine learning algorithms. + + Drag-and-drop documents here to start uploading or place them in the consume folder. You can also drag-and-drop documents anywhere on all other pages of the web app. Once you do, Paperless-ngx will start training its machine learning algorithms. src/app/app.component.ts 129 @@ -2507,14 +2507,14 @@ View "" saved successfully. src/app/components/document-list/document-list.component.ts - 176 + 170 View "" created successfully. src/app/components/document-list/document-list.component.ts - 206 + 210 diff --git a/src-ui/src/app/app.component.ts b/src-ui/src/app/app.component.ts index 4ab48f32c..1d1280eaa 100644 --- a/src-ui/src/app/app.component.ts +++ b/src-ui/src/app/app.component.ts @@ -126,7 +126,7 @@ export class AppComponent implements OnInit, OnDestroy { }, { anchorId: 'tour.upload-widget', - content: $localize`Drag-and-drop documents here to start uploading or place them in the consume folder. You can also drag-and-drop documents anywhere on all other pages of the web app. Once you do, Paperless-ngx will start training it's machine learning algorithms.`, + content: $localize`Drag-and-drop documents here to start uploading or place them in the consume folder. You can also drag-and-drop documents anywhere on all other pages of the web app. 
Once you do, Paperless-ngx will start training its machine learning algorithms.`, route: '/dashboard', enableBackdrop: true, }, diff --git a/src-ui/src/app/components/manage/tasks/tasks.component.html b/src-ui/src/app/components/manage/tasks/tasks.component.html index 961b8b091..08c065247 100644 --- a/src-ui/src/app/components/manage/tasks/tasks.component.html +++ b/src-ui/src/app/components/manage/tasks/tasks.component.html @@ -53,7 +53,7 @@ - {{ task.name }} + {{ task.task_file_name }} {{ task.date_created | customDate:'short' }}
Tuple[Optional[str], Lis if mime_type == "image/tiff": pdf_filepath = convert_from_tiff_to_pdf(filepath) - try: - _pikepdf_barcode_scan(pdf_filepath) - except Exception as e: - - logger.warning( - f"Exception using pikepdf for barcodes, falling back to pdf2image: {e}", - ) - # Reset this incase pikepdf got part way through - separator_page_numbers = [] + if settings.CONSUMER_USE_LEGACY_DETECTION: _pdf2image_barcode_scan(pdf_filepath) + else: + try: + _pikepdf_barcode_scan(pdf_filepath) + except Exception as e: + + logger.warning( + f"Exception using pikepdf for barcodes," + f" falling back to pdf2image: {e}", + ) + # Reset this incase pikepdf got part way through + separator_page_numbers = [] + _pdf2image_barcode_scan(pdf_filepath) else: logger.warning( diff --git a/src/documents/migrations/1027_remove_paperlesstask_attempted_task_and_more.py b/src/documents/migrations/1027_remove_paperlesstask_attempted_task_and_more.py new file mode 100644 index 000000000..fc8ff8ec2 --- /dev/null +++ b/src/documents/migrations/1027_remove_paperlesstask_attempted_task_and_more.py @@ -0,0 +1,134 @@ +# Generated by Django 4.1.2 on 2022-10-17 16:31 + +from django.db import migrations, models +import django.utils.timezone + + +class Migration(migrations.Migration): + + dependencies = [ + ("documents", "1026_transition_to_celery"), + ] + + operations = [ + migrations.RemoveField( + model_name="paperlesstask", + name="attempted_task", + ), + migrations.AddField( + model_name="paperlesstask", + name="date_created", + field=models.DateTimeField( + default=django.utils.timezone.now, + help_text="Datetime field when the task result was created in UTC", + null=True, + verbose_name="Created DateTime", + ), + ), + migrations.AddField( + model_name="paperlesstask", + name="date_done", + field=models.DateTimeField( + default=None, + help_text="Datetime field when the task was completed in UTC", + null=True, + verbose_name="Completed DateTime", + ), + ), + migrations.AddField( + 
model_name="paperlesstask", + name="date_started", + field=models.DateTimeField( + default=None, + help_text="Datetime field when the task was started in UTC", + null=True, + verbose_name="Started DateTime", + ), + ), + migrations.AddField( + model_name="paperlesstask", + name="result", + field=models.TextField( + default=None, + help_text="The data returned by the task", + null=True, + verbose_name="Result Data", + ), + ), + migrations.AddField( + model_name="paperlesstask", + name="status", + field=models.CharField( + choices=[ + ("FAILURE", "FAILURE"), + ("PENDING", "PENDING"), + ("RECEIVED", "RECEIVED"), + ("RETRY", "RETRY"), + ("REVOKED", "REVOKED"), + ("STARTED", "STARTED"), + ("SUCCESS", "SUCCESS"), + ], + default="PENDING", + help_text="Current state of the task being run", + max_length=30, + verbose_name="Task State", + ), + ), + migrations.AddField( + model_name="paperlesstask", + name="task_args", + field=models.JSONField( + help_text="JSON representation of the positional arguments used with the task", + null=True, + verbose_name="Task Positional Arguments", + ), + ), + migrations.AddField( + model_name="paperlesstask", + name="task_file_name", + field=models.CharField( + help_text="Name of the file which the Task was run for", + max_length=255, + null=True, + verbose_name="Task Name", + ), + ), + migrations.AddField( + model_name="paperlesstask", + name="task_kwargs", + field=models.JSONField( + help_text="JSON representation of the named arguments used with the task", + null=True, + verbose_name="Task Named Arguments", + ), + ), + migrations.AddField( + model_name="paperlesstask", + name="task_name", + field=models.CharField( + help_text="Name of the Task which was run", + max_length=255, + null=True, + verbose_name="Task Name", + ), + ), + migrations.AlterField( + model_name="paperlesstask", + name="acknowledged", + field=models.BooleanField( + default=False, + help_text="If the task is acknowledged via the frontend or API", + 
verbose_name="Acknowledged", + ), + ), + migrations.AlterField( + model_name="paperlesstask", + name="task_id", + field=models.CharField( + help_text="Celery ID for the Task that was run", + max_length=255, + unique=True, + verbose_name="Task ID", + ), + ), + ] diff --git a/src/documents/models.py b/src/documents/models.py index 5a84c467b..c1b9c88bc 100644 --- a/src/documents/models.py +++ b/src/documents/models.py @@ -7,14 +7,17 @@ from typing import Optional import dateutil.parser import pathvalidate +from celery import states from django.conf import settings from django.contrib.auth.models import User from django.db import models from django.utils import timezone from django.utils.translation import gettext_lazy as _ -from django_celery_results.models import TaskResult from documents.parsers import get_default_file_extension +ALL_STATES = sorted(states.ALL_STATES) +TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES)) + class MatchingModel(models.Model): @@ -527,15 +530,79 @@ class UiSettings(models.Model): class PaperlessTask(models.Model): - task_id = models.CharField(max_length=128) - acknowledged = models.BooleanField(default=False) + task_id = models.CharField( + max_length=255, + unique=True, + verbose_name=_("Task ID"), + help_text=_("Celery ID for the Task that was run"), + ) - attempted_task = models.OneToOneField( - TaskResult, - on_delete=models.CASCADE, - related_name="attempted_task", + acknowledged = models.BooleanField( + default=False, + verbose_name=_("Acknowledged"), + help_text=_("If the task is acknowledged via the frontend or API"), + ) + + task_file_name = models.CharField( null=True, - blank=True, + max_length=255, + verbose_name=_("Task Name"), + help_text=_("Name of the file which the Task was run for"), + ) + + task_name = models.CharField( + null=True, + max_length=255, + verbose_name=_("Task Name"), + help_text=_("Name of the Task which was run"), + ) + + task_args = models.JSONField( + null=True, + verbose_name=_("Task Positional 
Arguments"), + help_text=_( + "JSON representation of the positional arguments used with the task", + ), + ) + task_kwargs = models.JSONField( + null=True, + verbose_name=_("Task Named Arguments"), + help_text=_( + "JSON representation of the named arguments used with the task", + ), + ) + status = models.CharField( + max_length=30, + default=states.PENDING, + choices=TASK_STATE_CHOICES, + verbose_name=_("Task State"), + help_text=_("Current state of the task being run"), + ) + date_created = models.DateTimeField( + null=True, + default=timezone.now, + verbose_name=_("Created DateTime"), + help_text=_("Datetime field when the task result was created in UTC"), + ) + date_started = models.DateTimeField( + null=True, + default=None, + verbose_name=_("Started DateTime"), + help_text=_("Datetime field when the task was started in UTC"), + ) + date_done = models.DateTimeField( + null=True, + default=None, + verbose_name=_("Completed DateTime"), + help_text=_("Datetime field when the task was completed in UTC"), + ) + result = models.TextField( + null=True, + default=None, + verbose_name=_("Result Data"), + help_text=_( + "The data returned by the task", + ), ) diff --git a/src/documents/serialisers.py b/src/documents/serialisers.py index 5f59d2c17..db282cacd 100644 --- a/src/documents/serialisers.py +++ b/src/documents/serialisers.py @@ -1,12 +1,6 @@ import datetime import math import re -from ast import literal_eval -from asyncio.log import logger -from pathlib import Path -from typing import Dict -from typing import Optional -from typing import Tuple from celery import states @@ -640,14 +634,13 @@ class TasksViewSerializer(serializers.ModelSerializer): fields = ( "id", "task_id", + "task_file_name", "date_created", "date_done", "type", "status", "result", "acknowledged", - "task_name", - "name", "related_document", ) @@ -657,108 +650,14 @@ class TasksViewSerializer(serializers.ModelSerializer): # just file tasks, for now return "file" - result = 
serializers.SerializerMethodField() - - def get_result(self, obj): - result = "" - if ( - hasattr(obj, "attempted_task") - and obj.attempted_task - and obj.attempted_task.result - ): - try: - result: str = obj.attempted_task.result - if "exc_message" in result: - # This is a dict in this case - result: Dict = literal_eval(result) - # This is a list, grab the first item (most recent) - result = result["exc_message"][0] - except Exception as e: # pragma: no cover - # Extra security if something is malformed - logger.warn(f"Error getting task result: {e}", exc_info=True) - return result - - status = serializers.SerializerMethodField() - - def get_status(self, obj): - result = "unknown" - if hasattr(obj, "attempted_task") and obj.attempted_task: - result = obj.attempted_task.status - return result - - date_created = serializers.SerializerMethodField() - - def get_date_created(self, obj): - result = "" - if hasattr(obj, "attempted_task") and obj.attempted_task: - result = obj.attempted_task.date_created - return result - - date_done = serializers.SerializerMethodField() - - def get_date_done(self, obj): - result = "" - if hasattr(obj, "attempted_task") and obj.attempted_task: - result = obj.attempted_task.date_done - return result - - task_id = serializers.SerializerMethodField() - - def get_task_id(self, obj): - result = "" - if hasattr(obj, "attempted_task") and obj.attempted_task: - result = obj.attempted_task.task_id - return result - - task_name = serializers.SerializerMethodField() - - def get_task_name(self, obj): - result = "" - if hasattr(obj, "attempted_task") and obj.attempted_task: - result = obj.attempted_task.task_name - return result - - name = serializers.SerializerMethodField() - - def get_name(self, obj): - result = "" - if hasattr(obj, "attempted_task") and obj.attempted_task: - try: - task_kwargs: Optional[str] = obj.attempted_task.task_kwargs - # Try the override filename first (this is a webui created task?) 
- if task_kwargs is not None: - # It's a string, string of a dict. Who knows why... - kwargs = literal_eval(literal_eval(task_kwargs)) - if "override_filename" in kwargs: - result = kwargs["override_filename"] - - # Nothing was found, report the task first argument - if not len(result): - # There are always some arguments to the consume - task_args: Tuple = literal_eval( - literal_eval(obj.attempted_task.task_args), - ) - filepath = Path(task_args[0]) - result = filepath.name - except Exception as e: # pragma: no cover - # Extra security if something is malformed - logger.warning(f"Error getting file name from task: {e}", exc_info=True) - - return result - related_document = serializers.SerializerMethodField() + related_doc_re = re.compile(r"New document id (\d+) created") def get_related_document(self, obj): - result = "" - regexp = r"New document id (\d+) created" - if ( - hasattr(obj, "attempted_task") - and obj.attempted_task - and obj.attempted_task.result - and obj.attempted_task.status == states.SUCCESS - ): + result = None + if obj.status is not None and obj.status == states.SUCCESS: try: - result = re.search(regexp, obj.attempted_task.result).group(1) + result = self.related_doc_re.search(obj.result).group(1) except Exception: pass diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index 76ae974c6..1b180626a 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -1,7 +1,13 @@ import logging import os import shutil +from ast import literal_eval +from pathlib import Path +from celery import states +from celery.signals import before_task_publish +from celery.signals import task_postrun +from celery.signals import task_prerun from django.conf import settings from django.contrib.admin.models import ADDITION from django.contrib.admin.models import LogEntry @@ -13,7 +19,6 @@ from django.db.models import Q from django.dispatch import receiver from django.utils import termcolors from django.utils 
import timezone -from django_celery_results.models import TaskResult from filelock import FileLock from .. import matching @@ -502,19 +507,94 @@ def add_to_index(sender, document, **kwargs): index.add_or_update_document(document) -@receiver(models.signals.post_save, sender=TaskResult) -def update_paperless_task(sender, instance: TaskResult, **kwargs): +@before_task_publish.connect +def before_task_publish_handler(sender=None, headers=None, body=None, **kwargs): + """ + Creates the PaperlessTask object in a pending state. This is sent before + the task reaches the broker, but + + https://docs.celeryq.dev/en/stable/userguide/signals.html#before-task-publish + + """ + if "task" not in headers or headers["task"] != "documents.tasks.consume_file": + # Assumption: this is only ever a v2 message + return + try: - if instance.task_name == "documents.tasks.consume_file": - paperless_task, _ = PaperlessTask.objects.get_or_create( - task_id=instance.task_id, - ) - paperless_task.name = instance.task_name - paperless_task.created = instance.date_created - paperless_task.completed = instance.date_done - paperless_task.attempted_task = instance - paperless_task.save() - except Exception as e: + task_file_name = "" + if headers["kwargsrepr"] is not None: + task_kwargs = literal_eval(headers["kwargsrepr"]) + if "override_filename" in task_kwargs: + task_file_name = task_kwargs["override_filename"] + else: + task_kwargs = None + + task_args = literal_eval(headers["argsrepr"]) + + # Nothing was found, report the task first argument + if not len(task_file_name): + # There are always some arguments to the consume, first is always filename + filepath = Path(task_args[0]) + task_file_name = filepath.name + + PaperlessTask.objects.create( + task_id=headers["id"], + status=states.PENDING, + task_file_name=task_file_name, + task_name=headers["task"], + task_args=task_args, + task_kwargs=task_kwargs, + result=None, + date_created=timezone.now(), + date_started=None, + date_done=None, + ) + 
except Exception as e: # pragma: no cover # Don't let an exception in the signal handlers prevent # a document from being consumed. logger.error(f"Creating PaperlessTask failed: {e}") + + +@task_prerun.connect +def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs): + """ + + Updates the PaperlessTask to be started. Sent before the task begins execution + on a worker. + + https://docs.celeryq.dev/en/stable/userguide/signals.html#task-prerun + """ + try: + task_instance = PaperlessTask.objects.filter(task_id=task_id).first() + + if task_instance is not None: + task_instance.status = states.STARTED + task_instance.date_started = timezone.now() + task_instance.save() + except Exception as e: # pragma: no cover + # Don't let an exception in the signal handlers prevent + # a document from being consumed. + logger.error(f"Setting PaperlessTask started failed: {e}") + + +@task_postrun.connect +def task_postrun_handler( + sender=None, task_id=None, task=None, retval=None, state=None, **kwargs +): + """ + Updates the result of the PaperlessTask. + + https://docs.celeryq.dev/en/stable/userguide/signals.html#task-postrun + """ + try: + task_instance = PaperlessTask.objects.filter(task_id=task_id).first() + + if task_instance is not None: + task_instance.status = state + task_instance.result = retval + task_instance.date_done = timezone.now() + task_instance.save() + except Exception as e: # pragma: no cover + # Don't let an exception in the signal handlers prevent + # a document from being consumed. 
+ logger.error(f"Updating PaperlessTask failed: {e}") diff --git a/src/documents/tests/test_api.py b/src/documents/tests/test_api.py index 0a8d72155..d876984bd 100644 --- a/src/documents/tests/test_api.py +++ b/src/documents/tests/test_api.py @@ -32,7 +32,6 @@ from documents.models import PaperlessTask from documents.models import SavedView from documents.models import StoragePath from documents.models import Tag -from django_celery_results.models import TaskResult from documents.models import Comment from documents.models import StoragePath from documents.tests.utils import DirectoriesMixin @@ -2756,19 +2755,16 @@ class TestTasks(APITestCase): THEN: - Attempting and pending tasks are serialized and provided """ - result1 = TaskResult.objects.create( - task_id=str(uuid.uuid4()), - task_name="documents.tasks.some_great_task", - status=celery.states.PENDING, - ) - PaperlessTask.objects.create(attempted_task=result1) - result2 = TaskResult.objects.create( + task1 = PaperlessTask.objects.create( task_id=str(uuid.uuid4()), - task_name="documents.tasks.some_awesome_task", - status=celery.states.STARTED, + task_file_name="task_one.pdf", + ) + + task2 = PaperlessTask.objects.create( + task_id=str(uuid.uuid4()), + task_file_name="task_two.pdf", ) - PaperlessTask.objects.create(attempted_task=result2) response = self.client.get(self.ENDPOINT) @@ -2777,13 +2773,18 @@ class TestTasks(APITestCase): returned_task1 = response.data[1] returned_task2 = response.data[0] - self.assertEqual(returned_task1["task_id"], result1.task_id) - self.assertEqual(returned_task1["status"], celery.states.PENDING) - self.assertEqual(returned_task1["task_name"], result1.task_name) + from pprint import pprint - self.assertEqual(returned_task2["task_id"], result2.task_id) - self.assertEqual(returned_task2["status"], celery.states.STARTED) - self.assertEqual(returned_task2["task_name"], result2.task_name) + pprint(returned_task1) + pprint(returned_task2) + + self.assertEqual(returned_task1["task_id"], 
task1.task_id) + self.assertEqual(returned_task1["status"], celery.states.PENDING) + self.assertEqual(returned_task1["task_file_name"], task1.task_file_name) + + self.assertEqual(returned_task2["task_id"], task2.task_id) + self.assertEqual(returned_task2["status"], celery.states.PENDING) + self.assertEqual(returned_task2["task_file_name"], task2.task_file_name) def test_acknowledge_tasks(self): """ @@ -2794,12 +2795,10 @@ class TestTasks(APITestCase): THEN: - Task is marked as acknowledged """ - result1 = TaskResult.objects.create( + task = PaperlessTask.objects.create( task_id=str(uuid.uuid4()), - task_name="documents.tasks.some_task", - status=celery.states.PENDING, + task_file_name="task_one.pdf", ) - task = PaperlessTask.objects.create(attempted_task=result1) response = self.client.get(self.ENDPOINT) self.assertEqual(len(response.data), 1) @@ -2822,13 +2821,12 @@ class TestTasks(APITestCase): THEN: - The returned data includes the task result """ - result1 = TaskResult.objects.create( + task = PaperlessTask.objects.create( task_id=str(uuid.uuid4()), - task_name="documents.tasks.some_task", + task_file_name="task_one.pdf", status=celery.states.SUCCESS, result="Success. 
New document id 1 created", ) - _ = PaperlessTask.objects.create(attempted_task=result1) response = self.client.get(self.ENDPOINT) @@ -2849,17 +2847,12 @@ class TestTasks(APITestCase): THEN: - The returned result is the exception info """ - result1 = TaskResult.objects.create( + task = PaperlessTask.objects.create( task_id=str(uuid.uuid4()), - task_name="documents.tasks.some_task", - status=celery.states.SUCCESS, - result={ - "exc_type": "ConsumerError", - "exc_message": ["test.pdf: Not consuming test.pdf: It is a duplicate."], - "exc_module": "documents.consumer", - }, + task_file_name="task_one.pdf", + status=celery.states.FAILURE, + result="test.pdf: Not consuming test.pdf: It is a duplicate.", ) - _ = PaperlessTask.objects.create(attempted_task=result1) response = self.client.get(self.ENDPOINT) @@ -2883,14 +2876,22 @@ class TestTasks(APITestCase): THEN: - Returned data include the filename """ - result1 = TaskResult.objects.create( + task = PaperlessTask.objects.create( task_id=str(uuid.uuid4()), + task_file_name="test.pdf", task_name="documents.tasks.some_task", status=celery.states.SUCCESS, - task_args="\"('/tmp/paperless/paperless-upload-5iq7skzc',)\"", - task_kwargs="\"{'override_filename': 'test.pdf', 'override_title': None, 'override_correspondent_id': None, 'override_document_type_id': None, 'override_tag_ids': None, 'task_id': '466e8fe7-7193-4698-9fff-72f0340e2082', 'override_created': None}\"", + task_args=("/tmp/paperless/paperless-upload-5iq7skzc",), + task_kwargs={ + "override_filename": "test.pdf", + "override_title": None, + "override_correspondent_id": None, + "override_document_type_id": None, + "override_tag_ids": None, + "task_id": "466e8fe7-7193-4698-9fff-72f0340e2082", + "override_created": None, + }, ) - _ = PaperlessTask.objects.create(attempted_task=result1) response = self.client.get(self.ENDPOINT) @@ -2899,7 +2900,7 @@ class TestTasks(APITestCase): returned_data = response.data[0] - self.assertEqual(returned_data["name"], "test.pdf") + 
self.assertEqual(returned_data["task_file_name"], "test.pdf") def test_task_name_consume_folder(self): """ @@ -2911,14 +2912,14 @@ class TestTasks(APITestCase): THEN: - Returned data include the filename """ - result1 = TaskResult.objects.create( + task = PaperlessTask.objects.create( task_id=str(uuid.uuid4()), + task_file_name="anothertest.pdf", task_name="documents.tasks.some_task", status=celery.states.SUCCESS, - task_args="\"('/consume/anothertest.pdf',)\"", - task_kwargs="\"{'override_tag_ids': None}\"", + task_args=("/consume/anothertest.pdf",), + task_kwargs={"override_tag_ids": None}, ) - _ = PaperlessTask.objects.create(attempted_task=result1) response = self.client.get(self.ENDPOINT) @@ -2927,4 +2928,4 @@ class TestTasks(APITestCase): returned_data = response.data[0] - self.assertEqual(returned_data["name"], "anothertest.pdf") + self.assertEqual(returned_data["task_file_name"], "anothertest.pdf") diff --git a/src/documents/tests/test_barcodes.py b/src/documents/tests/test_barcodes.py index ee8df9f34..1c4ab7cc3 100644 --- a/src/documents/tests/test_barcodes.py +++ b/src/documents/tests/test_barcodes.py @@ -468,6 +468,41 @@ class TestBarcode(DirectoriesMixin, TestCase): self.assertTrue(os.path.isfile(target_file1)) self.assertTrue(os.path.isfile(target_file2)) + @override_settings(CONSUMER_USE_LEGACY_DETECTION=True) + def test_barcode_splitter_legacy_fallback(self): + """ + GIVEN: + - File containing barcode + - Legacy method of detection is enabled + WHEN: + - File is scanned for barcodes + THEN: + - Barcodes are properly detected + """ + test_file = os.path.join( + self.BARCODE_SAMPLE_DIR, + "patch-code-t-middle.pdf", + ) + tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) + + pdf_file, separator_page_numbers = barcodes.scan_file_for_separating_barcodes( + test_file, + ) + + self.assertEqual(test_file, pdf_file) + self.assertTrue(len(separator_page_numbers) > 0) + + document_list = barcodes.separate_pages(test_file, 
separator_page_numbers) + self.assertTrue(document_list) + for document in document_list: + barcodes.save_to_dir(document, target_dir=tempdir) + + target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf") + target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf") + + self.assertTrue(os.path.isfile(target_file1)) + self.assertTrue(os.path.isfile(target_file2)) + @override_settings(CONSUMER_ENABLE_BARCODES=True) def test_consume_barcode_file(self): test_file = os.path.join( diff --git a/src/documents/tests/test_task_signals.py b/src/documents/tests/test_task_signals.py new file mode 100644 index 000000000..8aafc1f12 --- /dev/null +++ b/src/documents/tests/test_task_signals.py @@ -0,0 +1,126 @@ +import celery +from django.test import TestCase +from documents.models import PaperlessTask +from documents.signals.handlers import before_task_publish_handler +from documents.signals.handlers import task_postrun_handler +from documents.signals.handlers import task_prerun_handler +from documents.tests.utils import DirectoriesMixin + + +class TestTaskSignalHandler(DirectoriesMixin, TestCase): + + HEADERS_CONSUME = { + "lang": "py", + "task": "documents.tasks.consume_file", + "id": "52d31e24-9dcc-4c32-9e16-76007e9add5e", + "shadow": None, + "eta": None, + "expires": None, + "group": None, + "group_index": None, + "retries": 0, + "timelimit": [None, None], + "root_id": "52d31e24-9dcc-4c32-9e16-76007e9add5e", + "parent_id": None, + "argsrepr": "('/consume/hello-999.pdf',)", + "kwargsrepr": "{'override_tag_ids': None}", + "origin": "gen260@paperless-ngx-dev-webserver", + "ignore_result": False, + } + + HEADERS_WEB_UI = { + "lang": "py", + "task": "documents.tasks.consume_file", + "id": "6e88a41c-e5f8-4631-9972-68c314512498", + "shadow": None, + "eta": None, + "expires": None, + "group": None, + "group_index": None, + "retries": 0, + "timelimit": [None, None], + "root_id": "6e88a41c-e5f8-4631-9972-68c314512498", + "parent_id": None, + "argsrepr": 
"('/tmp/paperless/paperless-upload-st9lmbvx',)", + "kwargsrepr": "{'override_filename': 'statement.pdf', 'override_title': None, 'override_correspondent_id': None, 'override_document_type_id': None, 'override_tag_ids': None, 'task_id': 'f5622ca9-3707-4ed0-b418-9680b912572f', 'override_created': None}", + "origin": "gen342@paperless-ngx-dev-webserver", + "ignore_result": False, + } + + def util_call_before_task_publish_handler(self, headers_to_use): + self.assertEqual(PaperlessTask.objects.all().count(), 0) + + before_task_publish_handler(headers=headers_to_use) + + self.assertEqual(PaperlessTask.objects.all().count(), 1) + + def test_before_task_publish_handler_consume(self): + """ + GIVEN: + - Celery headers for a consume_file task on a file under /consume + WHEN: + - The before_task_publish handler is called with those headers + THEN: + - A pending PaperlessTask exists with the id, args, kwargs, file name and task name + """ + self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME) + + task = PaperlessTask.objects.get() + self.assertIsNotNone(task) + self.assertEqual(self.HEADERS_CONSUME["id"], task.task_id) + self.assertListEqual(["/consume/hello-999.pdf"], task.task_args) + self.assertDictEqual({"override_tag_ids": None}, task.task_kwargs) + self.assertEqual("hello-999.pdf", task.task_file_name) + self.assertEqual("documents.tasks.consume_file", task.task_name) + self.assertEqual(celery.states.PENDING, task.status) + + def test_before_task_publish_handler_webui(self): + + self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_WEB_UI) + + task = PaperlessTask.objects.get() + + self.assertIsNotNone(task) + + self.assertEqual(self.HEADERS_WEB_UI["id"], task.task_id) + self.assertListEqual( + ["/tmp/paperless/paperless-upload-st9lmbvx"], + task.task_args, + ) + self.assertDictEqual( + { + "override_filename": "statement.pdf", + "override_title": None, + "override_correspondent_id": None, + "override_document_type_id": None, + 
"override_tag_ids": None, + "task_id": "f5622ca9-3707-4ed0-b418-9680b912572f", + 
"override_created": None, + }, + task.task_kwargs, + ) + self.assertEqual("statement.pdf", task.task_file_name) + self.assertEqual("documents.tasks.consume_file", task.task_name) + self.assertEqual(celery.states.PENDING, task.status) + + def test_task_prerun_handler(self): + self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME) + + task_prerun_handler(task_id=self.HEADERS_CONSUME["id"]) + + task = PaperlessTask.objects.get() + + self.assertEqual(celery.states.STARTED, task.status) + + def test_task_postrun_handler(self): + self.util_call_before_task_publish_handler(headers_to_use=self.HEADERS_CONSUME) + + task_postrun_handler( + task_id=self.HEADERS_CONSUME["id"], + retval="Success. New document id 1 created", + state=celery.states.SUCCESS, + ) + + task = PaperlessTask.objects.get() + + self.assertEqual(celery.states.SUCCESS, task.status) diff --git a/src/documents/views.py b/src/documents/views.py index 025ff2f67..10225be6f 100644 --- a/src/documents/views.py +++ b/src/documents/views.py @@ -886,9 +886,8 @@ class TasksViewSet(ReadOnlyModelViewSet): queryset = ( PaperlessTask.objects.filter( acknowledged=False, - attempted_task__isnull=False, ) - .order_by("attempted_task__date_created") + .order_by("date_created") .reverse() ) diff --git a/src/paperless/settings.py b/src/paperless/settings.py index a262bd501..1fb6ba913 100644 --- a/src/paperless/settings.py +++ b/src/paperless/settings.py @@ -558,15 +558,23 @@ CONSUMER_IGNORE_PATTERNS = list( CONSUMER_SUBDIRS_AS_TAGS = __get_boolean("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS") -CONSUMER_ENABLE_BARCODES = __get_boolean( +CONSUMER_ENABLE_BARCODES: Final[bool] = __get_boolean( "PAPERLESS_CONSUMER_ENABLE_BARCODES", ) -CONSUMER_BARCODE_TIFF_SUPPORT = __get_boolean( +CONSUMER_BARCODE_TIFF_SUPPORT: Final[bool] = __get_boolean( "PAPERLESS_CONSUMER_BARCODE_TIFF_SUPPORT", ) -CONSUMER_BARCODE_STRING = os.getenv("PAPERLESS_CONSUMER_BARCODE_STRING", "PATCHT") +CONSUMER_USE_LEGACY_DETECTION: Final[bool] = 
__get_boolean( + "PAPERLESS_CONSUMER_USE_LEGACY_DETECTION", + "NO", +) + +CONSUMER_BARCODE_STRING: Final[str] = os.getenv( + "PAPERLESS_CONSUMER_BARCODE_STRING", + "PATCHT", +) OCR_PAGES = int(os.getenv("PAPERLESS_OCR_PAGES", 0))