From 8f8a99a645412160fcd0264d0362cd69c40da159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Heuer?= Date: Sun, 20 Aug 2023 20:55:18 +0200 Subject: [PATCH 1/3] Added task id to pre/post consume script as env --- src/documents/consumer.py | 10 ++++---- src/documents/tasks.py | 4 +++- src/documents/tests/test_consumer.py | 34 ++++++++++++++++++++-------- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 0ec6090c2..fa756a3ce 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -186,7 +186,7 @@ class Consumer(LoggingMixin): f"Not consuming {self.filename}: Given ASN already exists!", ) - def run_pre_consume_script(self): + def run_pre_consume_script(self, task_id): """ If one is configured and exists, run the pre-consume script and handle its output and/or errors @@ -209,6 +209,7 @@ class Consumer(LoggingMixin): script_env = os.environ.copy() script_env["DOCUMENT_SOURCE_PATH"] = original_file_path script_env["DOCUMENT_WORKING_PATH"] = working_file_path + script_env["TASK_ID"] = task_id try: completed_proc = run( @@ -233,7 +234,7 @@ class Consumer(LoggingMixin): exception=e, ) - def run_post_consume_script(self, document: Document): + def run_post_consume_script(self, document: Document, task_id): """ If one is configured and exists, run the pre-consume script and handle its output and/or errors @@ -279,6 +280,7 @@ class Consumer(LoggingMixin): ",".join(document.tags.all().values_list("name", flat=True)), ) script_env["DOCUMENT_ORIGINAL_FILENAME"] = str(document.original_filename) + script_env["TASK_ID"] = task_id try: completed_proc = run( @@ -388,7 +390,7 @@ class Consumer(LoggingMixin): logging_group=self.logging_group, ) - self.run_pre_consume_script() + self.run_pre_consume_script(task_id=self.task_id) def progress_callback(current_progress, max_progress): # pragma: no cover # recalculate progress to be within 20 and 80 @@ -553,7 +555,7 @@ class Consumer(LoggingMixin): document_parser.cleanup() tempdir.cleanup() - self.run_post_consume_script(document) + self.run_post_consume_script(document, task_id=self.task_id) self.log.info(f"Document {document} consumption finished") diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 2dbc9d6eb..7d73f852a 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -91,8 +91,9 @@ def train_classifier(): logger.warning("Classifier error: " + str(e)) -@shared_task +@shared_task(bind=True) def consume_file( + self, input_doc: ConsumableDocument, overrides: Optional[DocumentMetadataOverrides] = None, ): @@ -163,6 +164,7 @@ def consume_file( override_created=overrides.created, override_asn=overrides.asn, override_owner_id=overrides.owner_id, + task_id=self.request.id, ) if document: diff --git a/src/documents/tests/test_consumer.py b/src/documents/tests/test_consumer.py index a8f427c37..3285104b3 100644 --- a/src/documents/tests/test_consumer.py +++ b/src/documents/tests/test_consumer.py @@ -4,6 +4,7 @@ import re import shutil import stat import tempfile +import uuid from unittest import mock from unittest.mock import MagicMock @@ -802,7 +803,7 @@ class PreConsumeTestCase(TestCase): def test_no_pre_consume_script(self, m): c = Consumer() c.path = "path-to-file" - c.run_pre_consume_script() + c.run_pre_consume_script(str(uuid.uuid4())) m.assert_not_called() @mock.patch("documents.consumer.run") @@ -812,7 +813,7 @@ class PreConsumeTestCase(TestCase): c = Consumer() c.filename = "somefile.pdf" c.path = "path-to-file" - self.assertRaises(ConsumerError, c.run_pre_consume_script) + self.assertRaises(ConsumerError, c.run_pre_consume_script, str(uuid.uuid4())) @mock.patch("documents.consumer.run") def test_pre_consume_script(self, m): @@ -821,7 +822,8 @@ class PreConsumeTestCase(TestCase): c = Consumer() c.original_path = "path-to-file" c.path = "/tmp/somewhere/path-to-file" - c.run_pre_consume_script() + task_id = str(uuid.uuid4()) + c.run_pre_consume_script(task_id) m.assert_called_once() @@ -836,6 +838,7 @@ class PreConsumeTestCase(TestCase): subset = { "DOCUMENT_SOURCE_PATH": c.original_path, "DOCUMENT_WORKING_PATH": c.path, + "TASK_ID": task_id, } self.assertDictEqual(environment, {**environment, **subset}) @@ -864,7 +867,7 @@ class PreConsumeTestCase(TestCase): c = Consumer() c.path = "path-to-file" - c.run_pre_consume_script() + c.run_pre_consume_script(str(uuid.uuid4())) self.assertIn( "INFO:paperless.consumer:This message goes to stdout", cm.output, @@ -896,7 +899,11 @@ class PreConsumeTestCase(TestCase): with override_settings(PRE_CONSUME_SCRIPT=script.name): c = Consumer() c.path = "path-to-file" - self.assertRaises(ConsumerError, c.run_pre_consume_script) + self.assertRaises( + ConsumerError, + c.run_pre_consume_script, + str(uuid.uuid4()), + ) class PostConsumeTestCase(TestCase): @@ -917,7 +924,7 @@ class PostConsumeTestCase(TestCase): doc.tags.add(tag1) doc.tags.add(tag2) - Consumer().run_post_consume_script(doc) + Consumer().run_post_consume_script(doc, str(uuid.uuid4())) m.assert_not_called() @@ -927,7 +934,12 @@ class PostConsumeTestCase(TestCase): doc = Document.objects.create(title="Test", mime_type="application/pdf") c = Consumer() c.filename = "somefile.pdf" - self.assertRaises(ConsumerError, c.run_post_consume_script, doc) + self.assertRaises( + ConsumerError, + c.run_post_consume_script, + doc, + str(uuid.uuid4()), + ) @mock.patch("documents.consumer.run") def test_post_consume_script_simple(self, m): @@ -935,7 +947,7 @@ class PostConsumeTestCase(TestCase): with override_settings(POST_CONSUME_SCRIPT=script.name): doc = Document.objects.create(title="Test", mime_type="application/pdf") - Consumer().run_post_consume_script(doc) + Consumer().run_post_consume_script(doc, str(uuid.uuid4())) m.assert_called_once() @@ -953,8 +965,9 @@ class PostConsumeTestCase(TestCase): tag2 = Tag.objects.create(name="b") doc.tags.add(tag1) doc.tags.add(tag2) + task_id = str(uuid.uuid4()) - Consumer().run_post_consume_script(doc) + Consumer().run_post_consume_script(doc, task_id) m.assert_called_once() @@ -976,6 +989,7 @@ class PostConsumeTestCase(TestCase): "DOCUMENT_THUMBNAIL_URL": f"/api/documents/{doc.pk}/thumb/", "DOCUMENT_CORRESPONDENT": "my_bank", "DOCUMENT_TAGS": "a,b", + "TASK_ID": task_id, } self.assertDictEqual(environment, {**environment, **subset}) @@ -1004,4 +1018,4 @@ class PostConsumeTestCase(TestCase): doc = Document.objects.create(title="Test", mime_type="application/pdf") c.path = "path-to-file" with self.assertRaises(ConsumerError): - c.run_post_consume_script(doc) + c.run_post_consume_script(doc, str(uuid.uuid4())) From 88ee3bdb6d0728f0a27508af07aaa8c4e07ed931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Heuer?= Date: Mon, 21 Aug 2023 13:33:34 +0200 Subject: [PATCH 2/3] Removed parameter, added documentation --- docs/advanced_usage.md | 32 +++++++++++++++------------- src/documents/consumer.py | 12 +++++------ src/documents/tests/test_consumer.py | 27 +++++++++++------------ 3 files changed, 36 insertions(+), 35 deletions(-) diff --git a/docs/advanced_usage.md b/docs/advanced_usage.md index 957d5287e..e279e48af 100644 --- a/docs/advanced_usage.md +++ b/docs/advanced_usage.md @@ -126,6 +126,7 @@ script can access the following relevant environment variables set: | ----------------------- | ------------------------------------------------------------ | | `DOCUMENT_SOURCE_PATH` | Original path of the consumed document | | `DOCUMENT_WORKING_PATH` | Path to a copy of the original that consumption will work on | +| `TASK_ID` | UUID of the task used to process the new document (if any) | !!! note @@ -168,21 +169,22 @@ Executed after the consumer has successfully processed a document and has moved it into paperless. It receives the following environment variables: -| Environment Variable | Description | -| ---------------------------- | --------------------------------------------- | -| `DOCUMENT_ID` | Database primary key of the document | -| `DOCUMENT_FILE_NAME` | Formatted filename, not including paths | -| `DOCUMENT_CREATED` | Date & time when document created | -| `DOCUMENT_MODIFIED` | Date & time when document was last modified | -| `DOCUMENT_ADDED` | Date & time when document was added | -| `DOCUMENT_SOURCE_PATH` | Path to the original document file | -| `DOCUMENT_ARCHIVE_PATH` | Path to the generate archive file (if any) | -| `DOCUMENT_THUMBNAIL_PATH` | Path to the generated thumbnail | -| `DOCUMENT_DOWNLOAD_URL` | URL for document download | -| `DOCUMENT_THUMBNAIL_URL` | URL for the document thumbnail | -| `DOCUMENT_CORRESPONDENT` | Assigned correspondent (if any) | -| `DOCUMENT_TAGS` | Comma separated list of tags applied (if any) | -| `DOCUMENT_ORIGINAL_FILENAME` | Filename of original document | +| Environment Variable | Description | +| ---------------------------- | ---------------------------------------------- | +| `DOCUMENT_ID` | Database primary key of the document | +| `DOCUMENT_FILE_NAME` | Formatted filename, not including paths | +| `DOCUMENT_CREATED` | Date & time when document created | +| `DOCUMENT_MODIFIED` | Date & time when document was last modified | +| `DOCUMENT_ADDED` | Date & time when document was added | +| `DOCUMENT_SOURCE_PATH` | Path to the original document file | +| `DOCUMENT_ARCHIVE_PATH` | Path to the generate archive file (if any) | +| `DOCUMENT_THUMBNAIL_PATH` | Path to the generated thumbnail | +| `DOCUMENT_DOWNLOAD_URL` | URL for document download | +| `DOCUMENT_THUMBNAIL_URL` | URL for the document thumbnail | +| `DOCUMENT_CORRESPONDENT` | Assigned correspondent (if any) | +| `DOCUMENT_TAGS` | Comma separated list of tags applied (if any) | +| `DOCUMENT_ORIGINAL_FILENAME` | Filename of original document | +| `TASK_ID` | Task UUID used to import the document (if any) | The script can be in any language, A simple shell script example: diff --git a/src/documents/consumer.py b/src/documents/consumer.py index fa756a3ce..c3e8bd056 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -186,7 +186,7 @@ class Consumer(LoggingMixin): f"Not consuming {self.filename}: Given ASN already exists!", ) - def run_pre_consume_script(self, task_id): + def run_pre_consume_script(self): """ If one is configured and exists, run the pre-consume script and handle its output and/or errors @@ -209,7 +209,7 @@ class Consumer(LoggingMixin): script_env = os.environ.copy() script_env["DOCUMENT_SOURCE_PATH"] = original_file_path script_env["DOCUMENT_WORKING_PATH"] = working_file_path - script_env["TASK_ID"] = task_id + script_env["TASK_ID"] = self.task_id or "" try: completed_proc = run( @@ -234,7 +234,7 @@ class Consumer(LoggingMixin): exception=e, ) - def run_post_consume_script(self, document: Document, task_id): + def run_post_consume_script(self, document: Document): """ If one is configured and exists, run the pre-consume script and handle its output and/or errors @@ -280,7 +280,7 @@ class Consumer(LoggingMixin): ",".join(document.tags.all().values_list("name", flat=True)), ) script_env["DOCUMENT_ORIGINAL_FILENAME"] = str(document.original_filename) - script_env["TASK_ID"] = task_id + script_env["TASK_ID"] = self.task_id or "" try: completed_proc = run( @@ -390,7 +390,7 @@ class Consumer(LoggingMixin): logging_group=self.logging_group, ) - self.run_pre_consume_script(task_id=self.task_id) + self.run_pre_consume_script() def progress_callback(current_progress, max_progress): # pragma: no cover # recalculate progress to be within 20 and 80 @@ -555,7 +555,7 @@ class Consumer(LoggingMixin): document_parser.cleanup() tempdir.cleanup() - self.run_post_consume_script(document, task_id=self.task_id) + self.run_post_consume_script(document) self.log.info(f"Document {document} consumption finished") diff --git a/src/documents/tests/test_consumer.py b/src/documents/tests/test_consumer.py index 3285104b3..bb4577484 100644 --- a/src/documents/tests/test_consumer.py +++ b/src/documents/tests/test_consumer.py @@ -803,7 +803,7 @@ class PreConsumeTestCase(TestCase): def test_no_pre_consume_script(self, m): c = Consumer() c.path = "path-to-file" - c.run_pre_consume_script(str(uuid.uuid4())) + c.run_pre_consume_script() m.assert_not_called() @mock.patch("documents.consumer.run") @@ -813,7 +813,7 @@ class PreConsumeTestCase(TestCase): c = Consumer() c.filename = "somefile.pdf" c.path = "path-to-file" - self.assertRaises(ConsumerError, c.run_pre_consume_script, str(uuid.uuid4())) + self.assertRaises(ConsumerError, c.run_pre_consume_script) @mock.patch("documents.consumer.run") def test_pre_consume_script(self, m): @@ -822,8 +822,8 @@ class PreConsumeTestCase(TestCase): c = Consumer() c.original_path = "path-to-file" c.path = "/tmp/somewhere/path-to-file" - task_id = str(uuid.uuid4()) - c.run_pre_consume_script(task_id) + c.task_id = str(uuid.uuid4()) + c.run_pre_consume_script() m.assert_called_once() @@ -838,7 +838,7 @@ class PreConsumeTestCase(TestCase): subset = { "DOCUMENT_SOURCE_PATH": c.original_path, "DOCUMENT_WORKING_PATH": c.path, - "TASK_ID": task_id, + "TASK_ID": c.task_id, } self.assertDictEqual(environment, {**environment, **subset}) @@ -867,7 +867,7 @@ class PreConsumeTestCase(TestCase): c = Consumer() c.path = "path-to-file" - c.run_pre_consume_script(str(uuid.uuid4())) + c.run_pre_consume_script() self.assertIn( "INFO:paperless.consumer:This message goes to stdout", cm.output, @@ -902,7 +902,6 @@ class PreConsumeTestCase(TestCase): self.assertRaises( ConsumerError, c.run_pre_consume_script, - str(uuid.uuid4()), ) @@ -924,7 +923,7 @@ class PostConsumeTestCase(TestCase): doc.tags.add(tag1) doc.tags.add(tag2) - Consumer().run_post_consume_script(doc, str(uuid.uuid4())) + Consumer().run_post_consume_script(doc) m.assert_not_called() @@ -938,7 +937,6 @@ class PostConsumeTestCase(TestCase): ConsumerError, c.run_post_consume_script, doc, - str(uuid.uuid4()), ) @mock.patch("documents.consumer.run") @@ -947,7 +945,7 @@ class PostConsumeTestCase(TestCase): with override_settings(POST_CONSUME_SCRIPT=script.name): doc = Document.objects.create(title="Test", mime_type="application/pdf") - Consumer().run_post_consume_script(doc, str(uuid.uuid4())) + Consumer().run_post_consume_script(doc) m.assert_called_once() @@ -965,9 +963,10 @@ class PostConsumeTestCase(TestCase): tag2 = Tag.objects.create(name="b") doc.tags.add(tag1) doc.tags.add(tag2) - task_id = str(uuid.uuid4()) - Consumer().run_post_consume_script(doc, task_id) + consumer = Consumer() + consumer.task_id = str(uuid.uuid4()) + consumer.run_post_consume_script(doc) m.assert_called_once() @@ -989,7 +988,7 @@ class PostConsumeTestCase(TestCase): "DOCUMENT_THUMBNAIL_URL": f"/api/documents/{doc.pk}/thumb/", "DOCUMENT_CORRESPONDENT": "my_bank", "DOCUMENT_TAGS": "a,b", - "TASK_ID": task_id, + "TASK_ID": consumer.task_id, } self.assertDictEqual(environment, {**environment, **subset}) @@ -1018,4 +1017,4 @@ class PostConsumeTestCase(TestCase): doc = Document.objects.create(title="Test", mime_type="application/pdf") c.path = "path-to-file" with self.assertRaises(ConsumerError): - c.run_post_consume_script(doc, str(uuid.uuid4())) + c.run_post_consume_script(doc) From 52bdb1a80c94f8ecf5d3833283b3cd3f078cd245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Heuer?= Date: Mon, 21 Aug 2023 21:13:26 +0200 Subject: [PATCH 3/3] Added self type --- src/documents/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 7d73f852a..0f16b717c 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -7,6 +7,7 @@ from typing import Type import tqdm from asgiref.sync import async_to_sync +from celery import Task from celery import shared_task from channels.layers import get_channel_layer from django.conf import settings @@ -93,7 +94,7 @@ def train_classifier(): @shared_task(bind=True) def consume_file( - self, + self: Task, input_doc: ConsumableDocument, overrides: Optional[DocumentMetadataOverrides] = None, ):