Merge branch 'feature-websockets-status' into dev

This commit is contained in:
jonaswinkler
2021-01-31 14:37:15 +01:00
45 changed files with 1763 additions and 371 deletions

View File

@@ -1,9 +1,12 @@
import datetime
import hashlib
import os
import uuid
from subprocess import Popen
import magic
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.db import transaction
from django.db.models import Q
@@ -27,8 +30,43 @@ class ConsumerError(Exception):
pass
MESSAGE_DOCUMENT_ALREADY_EXISTS = "document_already_exists"
MESSAGE_FILE_NOT_FOUND = "file_not_found"
MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND = "pre_consume_script_not_found"
MESSAGE_PRE_CONSUME_SCRIPT_ERROR = "pre_consume_script_error"
MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND = "post_consume_script_not_found"
MESSAGE_POST_CONSUME_SCRIPT_ERROR = "post_consume_script_error"
MESSAGE_NEW_FILE = "new_file"
MESSAGE_UNSUPPORTED_TYPE = "unsupported_type"
MESSAGE_PARSING_DOCUMENT = "parsing_document"
MESSAGE_GENERATING_THUMBNAIL = "generating_thumbnail"
MESSAGE_PARSE_DATE = "parse_date"
MESSAGE_SAVE_DOCUMENT = "save_document"
MESSAGE_FINISHED = "finished"
class Consumer(LoggingMixin):
def _send_progress(self, current_progress, max_progress, status,
message=None, document_id=None):
payload = {
'filename': os.path.basename(self.filename) if self.filename else None, # NOQA: E501
'task_id': self.task_id,
'current_progress': current_progress,
'max_progress': max_progress,
'status': status,
'message': message,
'document_id': document_id
}
async_to_sync(self.channel_layer.group_send)("status_updates",
{'type': 'status_update',
'data': payload})
def _fail(self, message, log_message=None):
self._send_progress(100, 100, 'FAILED', message)
self.log("error", log_message or message)
raise ConsumerError(f"{self.filename}: {log_message or message}")
def __init__(self):
super().__init__()
self.path = None
@@ -37,15 +75,16 @@ class Consumer(LoggingMixin):
self.override_correspondent_id = None
self.override_tag_ids = None
self.override_document_type_id = None
self.task_id = None
self.channel_layer = get_channel_layer()
def pre_check_file_exists(self):
if not os.path.isfile(self.path):
self.log(
"error",
"Cannot consume {}: It is not a file.".format(self.path)
self._fail(
MESSAGE_FILE_NOT_FOUND,
f"Cannot consume {self.path}: File not found."
)
raise ConsumerError("Cannot consume {}: It is not a file".format(
self.path))
def pre_check_duplicate(self):
with open(self.path, "rb") as f:
@@ -53,12 +92,9 @@ class Consumer(LoggingMixin):
if Document.objects.filter(Q(checksum=checksum) | Q(archive_checksum=checksum)).exists(): # NOQA: E501
if settings.CONSUMER_DELETE_DUPLICATES:
os.unlink(self.path)
self.log(
"error",
"Not consuming {}: It is a duplicate.".format(self.filename)
)
raise ConsumerError(
"Not consuming {}: It is a duplicate.".format(self.filename)
self._fail(
MESSAGE_DOCUMENT_ALREADY_EXISTS,
f"Not consuming {self.filename}: It is a duplicate."
)
def pre_check_directories(self):
@@ -72,14 +108,16 @@ class Consumer(LoggingMixin):
return
if not os.path.isfile(settings.PRE_CONSUME_SCRIPT):
raise ConsumerError(
self._fail(
MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND,
f"Configured pre-consume script "
f"{settings.PRE_CONSUME_SCRIPT} does not exist.")
try:
Popen((settings.PRE_CONSUME_SCRIPT, self.path)).wait()
except Exception as e:
raise ConsumerError(
self._fail(
MESSAGE_PRE_CONSUME_SCRIPT_ERROR,
f"Error while executing pre-consume script: {e}"
)
@@ -88,9 +126,11 @@ class Consumer(LoggingMixin):
return
if not os.path.isfile(settings.POST_CONSUME_SCRIPT):
raise ConsumerError(
self._fail(
MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND,
f"Configured post-consume script "
f"{settings.POST_CONSUME_SCRIPT} does not exist.")
f"{settings.POST_CONSUME_SCRIPT} does not exist."
)
try:
Popen((
@@ -106,8 +146,9 @@ class Consumer(LoggingMixin):
"name", flat=True)))
)).wait()
except Exception as e:
raise ConsumerError(
f"Error while executing pre-consume script: {e}"
self._fail(
MESSAGE_POST_CONSUME_SCRIPT_ERROR,
f"Error while executing post-consume script: {e}"
)
def try_consume_file(self,
@@ -116,7 +157,8 @@ class Consumer(LoggingMixin):
override_title=None,
override_correspondent_id=None,
override_document_type_id=None,
override_tag_ids=None):
override_tag_ids=None,
task_id=None):
"""
Return the document object if it was successfully created.
"""
@@ -127,6 +169,9 @@ class Consumer(LoggingMixin):
self.override_correspondent_id = override_correspondent_id
self.override_document_type_id = override_document_type_id
self.override_tag_ids = override_tag_ids
self.task_id = task_id or str(uuid.uuid4())
self._send_progress(0, 100, 'STARTING', MESSAGE_NEW_FILE)
# this is for grouping logging entries for this particular file
# together.
@@ -149,11 +194,12 @@ class Consumer(LoggingMixin):
parser_class = get_parser_class_for_mime_type(mime_type)
if not parser_class:
raise ConsumerError(
f"Unsupported mime type {mime_type} of file {self.filename}")
self._fail(
MESSAGE_UNSUPPORTED_TYPE,
f"Unsupported mime type {mime_type}"
)
else:
self.log("debug",
f"Parser: {parser_class.__name__}")
self.log("debug", f"Parser: {parser_class.__name__}")
# Notify all listeners that we're going to do some work.
@@ -165,35 +211,50 @@ class Consumer(LoggingMixin):
self.run_pre_consume_script()
def progress_callback(current_progress, max_progress):
# recalculate progress to be within 20 and 80
p = int((current_progress / max_progress) * 50 + 20)
self._send_progress(p, 100, "WORKING")
# This doesn't parse the document yet, but gives us a parser.
document_parser = parser_class(self.logging_group)
document_parser = parser_class(self.logging_group, progress_callback)
# However, this already created working directories which we have to
# clean up.
# Parse the document. This may take some time.
text = None
date = None
thumbnail = None
archive_path = None
try:
self._send_progress(20, 100, 'WORKING', MESSAGE_PARSING_DOCUMENT)
self.log("debug", "Parsing {}...".format(self.filename))
document_parser.parse(self.path, mime_type, self.filename)
self.log("debug", f"Generating thumbnail for {self.filename}...")
self._send_progress(70, 100, 'WORKING',
MESSAGE_GENERATING_THUMBNAIL)
thumbnail = document_parser.get_optimised_thumbnail(
self.path, mime_type)
text = document_parser.get_text()
date = document_parser.get_date()
if not date:
self._send_progress(90, 100, 'WORKING',
MESSAGE_PARSE_DATE)
date = parse_date(self.filename, text)
archive_path = document_parser.get_archive_path()
except ParseError as e:
document_parser.cleanup()
self.log(
"error",
f"Error while consuming document {self.filename}: {e}")
raise ConsumerError(e)
self._fail(
str(e),
f"Error while consuming document {self.filename}: {e}"
)
# Prepare the document classifier.
@@ -203,6 +264,7 @@ class Consumer(LoggingMixin):
classifier = load_classifier()
self._send_progress(95, 100, 'WORKING', MESSAGE_SAVE_DOCUMENT)
# now that everything is done, we can start to store the document
# in the system. This will be a transaction and reasonably fast.
try:
@@ -256,12 +318,11 @@ class Consumer(LoggingMixin):
os.unlink(self.path)
except Exception as e:
self.log(
"error",
self._fail(
str(e),
f"The following error occured while consuming "
f"{self.filename}: {e}"
)
raise ConsumerError(e)
finally:
document_parser.cleanup()
@@ -272,6 +333,8 @@ class Consumer(LoggingMixin):
"Document {} consumption finished".format(document)
)
self._send_progress(100, 100, 'SUCCESS', MESSAGE_FINISHED, document.id)
return document
def _store(self, text, date, mime_type):

View File

@@ -261,7 +261,7 @@ class DocumentParser(LoggingMixin):
`paperless_tesseract.parsers` for inspiration.
"""
def __init__(self, logging_group):
def __init__(self, logging_group, progress_callback=None):
super().__init__()
self.logging_group = logging_group
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
@@ -271,6 +271,12 @@ class DocumentParser(LoggingMixin):
self.archive_path = None
self.text = None
self.date = None
self.progress_callback = progress_callback
def progress(self, current, max):
print(self.progress_callback)
if self.progress_callback:
self.progress_callback(current, max)
def extract_metadata(self, document_path, mime_type):
return []

View File

@@ -8,6 +8,8 @@ from .models import Correspondent, Tag, Document, Log, DocumentType, \
SavedView, SavedViewFilterRule
from .parsers import is_mime_type_supported
from django.utils.translation import gettext as _
# https://www.django-rest-framework.org/api-guide/serializers/#example
class DynamicFieldsModelSerializer(serializers.ModelSerializer):
@@ -378,7 +380,9 @@ class PostDocumentSerializer(serializers.Serializer):
if not is_mime_type_supported(mime_type):
raise serializers.ValidationError(
"This file type is not supported.")
_("File type %(type)s not supported") %
{'type': mime_type}
)
return document.name, document_data

View File

@@ -66,7 +66,8 @@ def consume_file(path,
override_title=None,
override_correspondent_id=None,
override_document_type_id=None,
override_tag_ids=None):
override_tag_ids=None,
task_id=None):
document = Consumer().try_consume_file(
path,
@@ -74,7 +75,9 @@ def consume_file(path,
override_title=override_title,
override_correspondent_id=override_correspondent_id,
override_document_type_id=override_document_type_id,
override_tag_ids=override_tag_ids)
override_tag_ids=override_tag_ids,
task_id=task_id
)
if document:
return "Success. New document id {} created".format(

View File

@@ -170,7 +170,7 @@ class DummyParser(DocumentParser):
raise NotImplementedError()
def __init__(self, logging_group, scratch_dir, archive_path):
super(DummyParser, self).__init__(logging_group)
super(DummyParser, self).__init__(logging_group, None)
_, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=scratch_dir)
self.archive_path = archive_path
@@ -212,10 +212,24 @@ def fake_magic_from_file(file, mime=False):
@mock.patch("documents.consumer.magic.from_file", fake_magic_from_file)
class TestConsumer(DirectoriesMixin, TestCase):
def make_dummy_parser(self, logging_group):
def _assert_first_last_send_progress(self, first_status="STARTING", last_status="SUCCESS", first_progress=0, first_progress_max=100, last_progress=100, last_progress_max=100):
self._send_progress.assert_called()
args, kwargs = self._send_progress.call_args_list[0]
self.assertEqual(args[0], first_progress)
self.assertEqual(args[1], first_progress_max)
self.assertEqual(args[2], first_status)
args, kwargs = self._send_progress.call_args_list[len(self._send_progress.call_args_list) - 1]
self.assertEqual(args[0], last_progress)
self.assertEqual(args[1], last_progress_max)
self.assertEqual(args[2], last_status)
def make_dummy_parser(self, logging_group, progress_callback=None):
return DummyParser(logging_group, self.dirs.scratch_dir, self.get_test_archive_file())
def make_faulty_parser(self, logging_group):
def make_faulty_parser(self, logging_group, progress_callback=None):
return FaultyParser(logging_group, self.dirs.scratch_dir)
def setUp(self):
@@ -228,7 +242,11 @@ class TestConsumer(DirectoriesMixin, TestCase):
"mime_types": {"application/pdf": ".pdf"},
"weight": 0
})]
self.addCleanup(patcher.stop)
# this prevents websocket message reports during testing.
patcher = mock.patch("documents.consumer.Consumer._send_progress")
self._send_progress = patcher.start()
self.addCleanup(patcher.stop)
self.consumer = Consumer()
@@ -274,6 +292,8 @@ class TestConsumer(DirectoriesMixin, TestCase):
self.assertFalse(os.path.isfile(filename))
self._assert_first_last_send_progress()
def testOverrideFilename(self):
filename = self.get_test_file()
override_filename = "Statement for November.pdf"
@@ -282,21 +302,26 @@ class TestConsumer(DirectoriesMixin, TestCase):
self.assertEqual(document.title, "Statement for November")
self._assert_first_last_send_progress()
def testOverrideTitle(self):
document = self.consumer.try_consume_file(self.get_test_file(), override_title="Override Title")
self.assertEqual(document.title, "Override Title")
self._assert_first_last_send_progress()
def testOverrideCorrespondent(self):
c = Correspondent.objects.create(name="test")
document = self.consumer.try_consume_file(self.get_test_file(), override_correspondent_id=c.pk)
self.assertEqual(document.correspondent.id, c.id)
self._assert_first_last_send_progress()
def testOverrideDocumentType(self):
dt = DocumentType.objects.create(name="test")
document = self.consumer.try_consume_file(self.get_test_file(), override_document_type_id=dt.pk)
self.assertEqual(document.document_type.id, dt.id)
self._assert_first_last_send_progress()
def testOverrideTags(self):
t1 = Tag.objects.create(name="t1")
@@ -307,37 +332,42 @@ class TestConsumer(DirectoriesMixin, TestCase):
self.assertIn(t1, document.tags.all())
self.assertNotIn(t2, document.tags.all())
self.assertIn(t3, document.tags.all())
self._assert_first_last_send_progress()
def testNotAFile(self):
try:
self.consumer.try_consume_file("non-existing-file")
except ConsumerError as e:
self.assertTrue(str(e).endswith('It is not a file'))
return
self.fail("Should throw exception")
self.assertRaisesMessage(
ConsumerError,
"File not found",
self.consumer.try_consume_file,
"non-existing-file"
)
self._assert_first_last_send_progress(last_status="FAILED")
def testDuplicates1(self):
self.consumer.try_consume_file(self.get_test_file())
try:
self.consumer.try_consume_file(self.get_test_file())
except ConsumerError as e:
self.assertTrue(str(e).endswith("It is a duplicate."))
return
self.assertRaisesMessage(
ConsumerError,
"It is a duplicate",
self.consumer.try_consume_file,
self.get_test_file()
)
self.fail("Should throw exception")
self._assert_first_last_send_progress(last_status="FAILED")
def testDuplicates2(self):
self.consumer.try_consume_file(self.get_test_file())
try:
self.consumer.try_consume_file(self.get_test_archive_file())
except ConsumerError as e:
self.assertTrue(str(e).endswith("It is a duplicate."))
return
self.assertRaisesMessage(
ConsumerError,
"It is a duplicate",
self.consumer.try_consume_file,
self.get_test_archive_file()
)
self.fail("Should throw exception")
self._assert_first_last_send_progress(last_status="FAILED")
def testDuplicates3(self):
self.consumer.try_consume_file(self.get_test_archive_file())
@@ -347,13 +377,15 @@ class TestConsumer(DirectoriesMixin, TestCase):
def testNoParsers(self, m):
m.return_value = []
try:
self.consumer.try_consume_file(self.get_test_file())
except ConsumerError as e:
self.assertEqual("Unsupported mime type application/pdf of file sample.pdf", str(e))
return
self.assertRaisesMessage(
ConsumerError,
"sample.pdf: Unsupported mime type application/pdf",
self.consumer.try_consume_file,
self.get_test_file()
)
self._assert_first_last_send_progress(last_status="FAILED")
self.fail("Should throw exception")
@mock.patch("documents.parsers.document_consumer_declaration.send")
def testFaultyParser(self, m):
@@ -363,24 +395,28 @@ class TestConsumer(DirectoriesMixin, TestCase):
"weight": 0
})]
try:
self.consumer.try_consume_file(self.get_test_file())
except ConsumerError as e:
self.assertEqual(str(e), "Does not compute.")
return
self.assertRaisesMessage(
ConsumerError,
"sample.pdf: Error while consuming document sample.pdf: Does not compute.",
self.consumer.try_consume_file,
self.get_test_file()
)
self.fail("Should throw exception.")
self._assert_first_last_send_progress(last_status="FAILED")
@mock.patch("documents.consumer.Consumer._write")
def testPostSaveError(self, m):
filename = self.get_test_file()
m.side_effect = OSError("NO.")
try:
self.consumer.try_consume_file(filename)
except ConsumerError as e:
self.assertEqual(str(e), "NO.")
else:
self.fail("Should raise exception")
self.assertRaisesMessage(
ConsumerError,
"sample.pdf: The following error occured while consuming sample.pdf: NO.",
self.consumer.try_consume_file,
filename
)
self._assert_first_last_send_progress(last_status="FAILED")
# file not deleted
self.assertTrue(os.path.isfile(filename))
@@ -397,6 +433,8 @@ class TestConsumer(DirectoriesMixin, TestCase):
self.assertEqual(document.title, "new docs")
self.assertEqual(document.filename, "none/new docs.pdf")
self._assert_first_last_send_progress()
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{title}")
@mock.patch("documents.signals.handlers.generate_unique_filename")
def testFilenameHandlingUnstableFormat(self, m):
@@ -420,6 +458,8 @@ class TestConsumer(DirectoriesMixin, TestCase):
self.assertIsNotNone(os.path.isfile(document.title))
self.assertTrue(os.path.isfile(document.source_path))
self._assert_first_last_send_progress()
@mock.patch("documents.consumer.load_classifier")
def testClassifyDocument(self, m):
correspondent = Correspondent.objects.create(name="test")
@@ -439,19 +479,26 @@ class TestConsumer(DirectoriesMixin, TestCase):
self.assertIn(t1, document.tags.all())
self.assertNotIn(t2, document.tags.all())
self._assert_first_last_send_progress()
@override_settings(CONSUMER_DELETE_DUPLICATES=True)
def test_delete_duplicate(self):
dst = self.get_test_file()
self.assertTrue(os.path.isfile(dst))
doc = self.consumer.try_consume_file(dst)
self._assert_first_last_send_progress()
self.assertFalse(os.path.isfile(dst))
self.assertIsNotNone(doc)
self._send_progress.reset_mock()
dst = self.get_test_file()
self.assertTrue(os.path.isfile(dst))
self.assertRaises(ConsumerError, self.consumer.try_consume_file, dst)
self.assertFalse(os.path.isfile(dst))
self._assert_first_last_send_progress(last_status="FAILED")
@override_settings(CONSUMER_DELETE_DUPLICATES=False)
def test_no_delete_duplicate(self):
@@ -467,6 +514,8 @@ class TestConsumer(DirectoriesMixin, TestCase):
self.assertRaises(ConsumerError, self.consumer.try_consume_file, dst)
self.assertTrue(os.path.isfile(dst))
self._assert_first_last_send_progress(last_status="FAILED")
class PreConsumeTestCase(TestCase):
@@ -479,9 +528,11 @@ class PreConsumeTestCase(TestCase):
m.assert_not_called()
@mock.patch("documents.consumer.Popen")
@mock.patch("documents.consumer.Consumer._send_progress")
@override_settings(PRE_CONSUME_SCRIPT="does-not-exist")
def test_pre_consume_script_not_found(self, m):
def test_pre_consume_script_not_found(self, m, m2):
c = Consumer()
c.filename = "somefile.pdf"
c.path = "path-to-file"
self.assertRaises(ConsumerError, c.run_pre_consume_script)
@@ -503,7 +554,6 @@ class PreConsumeTestCase(TestCase):
self.assertEqual(command[1], "path-to-file")
class PostConsumeTestCase(TestCase):
@mock.patch("documents.consumer.Popen")
@@ -519,12 +569,13 @@ class PostConsumeTestCase(TestCase):
m.assert_not_called()
@override_settings(POST_CONSUME_SCRIPT="does-not-exist")
def test_post_consume_script_not_found(self):
@mock.patch("documents.consumer.Consumer._send_progress")
def test_post_consume_script_not_found(self, m):
doc = Document.objects.create(title="Test", mime_type="application/pdf")
self.assertRaises(ConsumerError, Consumer().run_post_consume_script, doc)
c = Consumer()
c.filename = "somefile.pdf"
self.assertRaises(ConsumerError, c.run_post_consume_script, doc)
@mock.patch("documents.consumer.Popen")
def test_post_consume_script_simple(self, m):

View File

@@ -1,6 +1,7 @@
import logging
import os
import tempfile
import uuid
from datetime import datetime
from time import mktime
@@ -213,7 +214,7 @@ class DocumentViewSet(RetrieveModelMixin,
parser_class = get_parser_class_for_mime_type(mime_type)
if parser_class:
parser = parser_class(logging_group=None)
parser = parser_class(progress_callback=None, logging_group=None)
try:
return parser.extract_metadata(file, mime_type)
@@ -403,6 +404,8 @@ class PostDocumentView(APIView):
os.utime(f.name, times=(t, t))
temp_filename = f.name
task_id = str(uuid.uuid4())
async_task("documents.tasks.consume_file",
temp_filename,
override_filename=doc_name,
@@ -410,6 +413,7 @@ class PostDocumentView(APIView):
override_correspondent_id=correspondent_id,
override_document_type_id=document_type_id,
override_tag_ids=tag_ids,
task_id=task_id,
task_name=os.path.basename(doc_name)[:100])
return Response("OK")