Small improvement to the consumer status with stronger typing

Trenton Holmes 2023-07-22 16:51:05 -07:00 committed by Trenton H
parent 802e5591ce
commit 07e7bcd30b
3 changed files with 103 additions and 41 deletions

@@ -164,7 +164,7 @@ export class ConsumerStatusService {
       }
       status.documentId = statusMessage.document_id
-      if (created && statusMessage.status == 'STARTING') {
+      if (created && statusMessage.status == 'STARTED') {
         this.documentDetectedSubject.next(status)
       }
       if (statusMessage.status == 'SUCCESS') {

@@ -3,6 +3,7 @@ import hashlib
 import os
 import tempfile
 import uuid
+from enum import Enum
 from pathlib import Path
 from subprocess import CompletedProcess
 from subprocess import run
@ -44,21 +45,30 @@ class ConsumerError(Exception):
pass pass
MESSAGE_DOCUMENT_ALREADY_EXISTS = "document_already_exists" class ConsumerStatusShortMessage(str, Enum):
MESSAGE_ASN_ALREADY_EXISTS = "asn_already_exists" DOCUMENT_ALREADY_EXISTS = "document_already_exists"
MESSAGE_ASN_RANGE = "asn_value_out_of_range" ASN_ALREADY_EXISTS = "asn_already_exists"
MESSAGE_FILE_NOT_FOUND = "file_not_found" ASN_RANGE = "asn_value_out_of_range"
MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND = "pre_consume_script_not_found" FILE_NOT_FOUND = "file_not_found"
MESSAGE_PRE_CONSUME_SCRIPT_ERROR = "pre_consume_script_error" PRE_CONSUME_SCRIPT_NOT_FOUND = "pre_consume_script_not_found"
MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND = "post_consume_script_not_found" PRE_CONSUME_SCRIPT_ERROR = "pre_consume_script_error"
MESSAGE_POST_CONSUME_SCRIPT_ERROR = "post_consume_script_error" POST_CONSUME_SCRIPT_NOT_FOUND = "post_consume_script_not_found"
MESSAGE_NEW_FILE = "new_file" POST_CONSUME_SCRIPT_ERROR = "post_consume_script_error"
MESSAGE_UNSUPPORTED_TYPE = "unsupported_type" NEW_FILE = "new_file"
MESSAGE_PARSING_DOCUMENT = "parsing_document" UNSUPPORTED_TYPE = "unsupported_type"
MESSAGE_GENERATING_THUMBNAIL = "generating_thumbnail" PARSING_DOCUMENT = "parsing_document"
MESSAGE_PARSE_DATE = "parse_date" GENERATING_THUMBNAIL = "generating_thumbnail"
MESSAGE_SAVE_DOCUMENT = "save_document" PARSE_DATE = "parse_date"
MESSAGE_FINISHED = "finished" SAVE_DOCUMENT = "save_document"
FINISHED = "finished"
FAILED = "failed"
class ConsumerFilePhase(str, Enum):
STARTED = "STARTED"
WORKING = "WORKING"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
class Consumer(LoggingMixin): class Consumer(LoggingMixin):
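
The str mixin on both enums is what keeps this change backward compatible: members remain real str instances, so existing comparisons against the old literal values still hold. The same applies to ConsumerStatusShortMessage, whose members replace the former MESSAGE_* module constants one for one (plus a new FAILED message). A minimal, standard-library-only sketch of that behaviour, using a trimmed copy of the enum above rather than an import from the project:

from enum import Enum


class ConsumerFilePhase(str, Enum):
    # Trimmed copy of the enum introduced in the hunk above.
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


# Members are genuine str instances, so plain string comparisons keep working.
assert isinstance(ConsumerFilePhase.FAILED, str)
assert ConsumerFilePhase.SUCCESS == "SUCCESS"
assert "FAILED" == ConsumerFilePhase.FAILED
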
@@ -66,10 +76,10 @@ class Consumer(LoggingMixin):

     def _send_progress(
         self,
-        current_progress,
-        max_progress,
-        status,
-        message=None,
+        current_progress: int,
+        max_progress: int,
+        status: ConsumerFilePhase,
+        message: Optional[ConsumerStatusShortMessage] = None,
         document_id=None,
     ):  # pragma: no cover
         payload = {
@@ -88,12 +98,12 @@ class Consumer(LoggingMixin):

     def _fail(
         self,
-        message,
-        log_message=None,
+        message: ConsumerStatusShortMessage,
+        log_message: Optional[str] = None,
         exc_info=None,
         exception: Optional[Exception] = None,
     ):
-        self._send_progress(100, 100, "FAILED", message)
+        self._send_progress(100, 100, ConsumerFilePhase.FAILED, message)
         self.log.error(log_message or message, exc_info=exc_info)
         raise ConsumerError(f"{self.filename}: {log_message or message}") from exception
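
Beyond readability, the annotations on _send_progress and _fail let a static type checker flag call sites that still pass bare strings. A self-contained sketch of that effect, using a stand-in function with the same parameter types as the diff (not the project's real method; the mypy wording in the comment is approximate):

from enum import Enum
from typing import Optional


class ConsumerFilePhase(str, Enum):
    STARTED = "STARTED"
    WORKING = "WORKING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


class ConsumerStatusShortMessage(str, Enum):
    NEW_FILE = "new_file"
    FINISHED = "finished"


def send_progress(
    current_progress: int,
    max_progress: int,
    status: ConsumerFilePhase,
    message: Optional[ConsumerStatusShortMessage] = None,
    document_id=None,
) -> None:
    """Stand-in mirroring the parameter types added in the diff above."""


send_progress(0, 100, ConsumerFilePhase.STARTED, ConsumerStatusShortMessage.NEW_FILE)  # OK
send_progress(0, 100, "STARTING")  # mypy: incompatible type "str"; expected "ConsumerFilePhase"
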
@@ -113,13 +123,19 @@ class Consumer(LoggingMixin):
         self.channel_layer = get_channel_layer()

     def pre_check_file_exists(self):
+        """
+        Confirm the input file still exists where it should
+        """
         if not os.path.isfile(self.path):
             self._fail(
-                MESSAGE_FILE_NOT_FOUND,
+                ConsumerStatusShortMessage.FILE_NOT_FOUND,
                 f"Cannot consume {self.path}: File not found.",
             )

     def pre_check_duplicate(self):
+        """
+        Using the MD5 of the file, check this exact file doesn't already exist
+        """
         with open(self.path, "rb") as f:
             checksum = hashlib.md5(f.read()).hexdigest()
         existing_doc = Document.objects.filter(
@@ -129,12 +145,15 @@ class Consumer(LoggingMixin):
             if settings.CONSUMER_DELETE_DUPLICATES:
                 os.unlink(self.path)
             self._fail(
-                MESSAGE_DOCUMENT_ALREADY_EXISTS,
+                ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS,
                 f"Not consuming {self.filename}: It is a duplicate of"
                 f" {existing_doc.get().title} (#{existing_doc.get().pk})",
             )

     def pre_check_directories(self):
+        """
+        Ensure all required directories exist before attempting to use them
+        """
         os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
         os.makedirs(settings.THUMBNAIL_DIR, exist_ok=True)
         os.makedirs(settings.ORIGINALS_DIR, exist_ok=True)
@@ -154,7 +173,7 @@ class Consumer(LoggingMixin):
             or self.override_asn > Document.ARCHIVE_SERIAL_NUMBER_MAX
         ):
             self._fail(
-                MESSAGE_ASN_RANGE,
+                ConsumerStatusShortMessage.ASN_RANGE,
                 f"Not consuming {self.filename}: "
                 f"Given ASN {self.override_asn} is out of range "
                 f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, "
@ -162,17 +181,21 @@ class Consumer(LoggingMixin):
) )
if Document.objects.filter(archive_serial_number=self.override_asn).exists(): if Document.objects.filter(archive_serial_number=self.override_asn).exists():
self._fail( self._fail(
MESSAGE_ASN_ALREADY_EXISTS, ConsumerStatusShortMessage.ASN_ALREADY_EXISTS,
f"Not consuming {self.filename}: Given ASN already exists!", f"Not consuming {self.filename}: Given ASN already exists!",
) )
def run_pre_consume_script(self): def run_pre_consume_script(self):
"""
If one is configured and exists, run the pre-consume script and
handle its output and/or errors
"""
if not settings.PRE_CONSUME_SCRIPT: if not settings.PRE_CONSUME_SCRIPT:
return return
if not os.path.isfile(settings.PRE_CONSUME_SCRIPT): if not os.path.isfile(settings.PRE_CONSUME_SCRIPT):
self._fail( self._fail(
MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND, ConsumerStatusShortMessage.PRE_CONSUME_SCRIPT_NOT_FOUND,
f"Configured pre-consume script " f"Configured pre-consume script "
f"{settings.PRE_CONSUME_SCRIPT} does not exist.", f"{settings.PRE_CONSUME_SCRIPT} does not exist.",
) )
@@ -203,19 +226,23 @@ class Consumer(LoggingMixin):
         except Exception as e:
             self._fail(
-                MESSAGE_PRE_CONSUME_SCRIPT_ERROR,
+                ConsumerStatusShortMessage.PRE_CONSUME_SCRIPT_ERROR,
                 f"Error while executing pre-consume script: {e}",
                 exc_info=True,
                 exception=e,
             )

     def run_post_consume_script(self, document: Document):
+        """
+        If one is configured and exists, run the post-consume script and
+        handle its output and/or errors
+        """
         if not settings.POST_CONSUME_SCRIPT:
             return

         if not os.path.isfile(settings.POST_CONSUME_SCRIPT):
             self._fail(
-                MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND,
+                ConsumerStatusShortMessage.POST_CONSUME_SCRIPT_NOT_FOUND,
                 f"Configured post-consume script "
                 f"{settings.POST_CONSUME_SCRIPT} does not exist.",
             )
@@ -276,7 +303,7 @@ class Consumer(LoggingMixin):
         except Exception as e:
             self._fail(
-                MESSAGE_POST_CONSUME_SCRIPT_ERROR,
+                ConsumerStatusShortMessage.POST_CONSUME_SCRIPT_ERROR,
                 f"Error while executing post-consume script: {e}",
                 exc_info=True,
                 exception=e,
             )
@@ -310,7 +337,12 @@ class Consumer(LoggingMixin):
         self.override_asn = override_asn
         self.override_owner_id = override_owner_id

-        self._send_progress(0, 100, "STARTING", MESSAGE_NEW_FILE)
+        self._send_progress(
+            0,
+            100,
+            ConsumerFilePhase.STARTED,
+            ConsumerStatusShortMessage.NEW_FILE,
+        )

         # Make sure that preconditions for consuming the file are met.
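
This is the hunk the Angular change in the first file tracks: the initial phase is now reported as STARTED instead of the old "STARTING" literal, and because ConsumerFilePhase mixes in str, what reaches the websocket payload is the enum's plain string value. A minimal sketch of the wire format, using a hypothetical payload that holds only the two fields the frontend reads in the first hunk (status and document_id), not the service's full message shape:

import json
from enum import Enum


class ConsumerFilePhase(str, Enum):  # trimmed copy of the enum from the diff
    STARTED = "STARTED"


payload = {"status": ConsumerFilePhase.STARTED, "document_id": None}
print(json.dumps(payload))  # {"status": "STARTED", "document_id": null}
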
@ -342,7 +374,10 @@ class Consumer(LoggingMixin):
) )
if not parser_class: if not parser_class:
tempdir.cleanup() tempdir.cleanup()
self._fail(MESSAGE_UNSUPPORTED_TYPE, f"Unsupported mime type {mime_type}") self._fail(
ConsumerStatusShortMessage.UNSUPPORTED_TYPE,
f"Unsupported mime type {mime_type}",
)
# Notify all listeners that we're going to do some work. # Notify all listeners that we're going to do some work.
@@ -357,7 +392,7 @@ class Consumer(LoggingMixin):
         def progress_callback(current_progress, max_progress):  # pragma: no cover
             # recalculate progress to be within 20 and 80
             p = int((current_progress / max_progress) * 50 + 20)
-            self._send_progress(p, 100, "WORKING")
+            self._send_progress(p, 100, ConsumerFilePhase.WORKING)

         # This doesn't parse the document yet, but gives us a parser.
@@ -379,12 +414,22 @@ class Consumer(LoggingMixin):
         archive_path = None

         try:
-            self._send_progress(20, 100, "WORKING", MESSAGE_PARSING_DOCUMENT)
+            self._send_progress(
+                20,
+                100,
+                ConsumerFilePhase.WORKING,
+                ConsumerStatusShortMessage.PARSING_DOCUMENT,
+            )
             self.log.debug(f"Parsing {self.filename}...")
             document_parser.parse(self.path, mime_type, self.filename)

             self.log.debug(f"Generating thumbnail for {self.filename}...")
-            self._send_progress(70, 100, "WORKING", MESSAGE_GENERATING_THUMBNAIL)
+            self._send_progress(
+                70,
+                100,
+                ConsumerFilePhase.WORKING,
+                ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
+            )
             thumbnail = document_parser.get_thumbnail(
                 self.path,
                 mime_type,
@@ -394,7 +439,12 @@ class Consumer(LoggingMixin):
             text = document_parser.get_text()
             date = document_parser.get_date()
             if date is None:
-                self._send_progress(90, 100, "WORKING", MESSAGE_PARSE_DATE)
+                self._send_progress(
+                    90,
+                    100,
+                    ConsumerFilePhase.WORKING,
+                    ConsumerStatusShortMessage.PARSE_DATE,
+                )
                 date = parse_date(self.filename, text)
             archive_path = document_parser.get_archive_path()
@@ -416,7 +466,12 @@ class Consumer(LoggingMixin):
         classifier = load_classifier()

-        self._send_progress(95, 100, "WORKING", MESSAGE_SAVE_DOCUMENT)
+        self._send_progress(
+            95,
+            100,
+            ConsumerFilePhase.WORKING,
+            ConsumerStatusShortMessage.SAVE_DOCUMENT,
+        )

         # now that everything is done, we can start to store the document
         # in the system. This will be a transaction and reasonably fast.
         try:
@@ -501,7 +556,13 @@ class Consumer(LoggingMixin):
         self.log.info(f"Document {document} consumption finished")

-        self._send_progress(100, 100, "SUCCESS", MESSAGE_FINISHED, document.id)
+        self._send_progress(
+            100,
+            100,
+            ConsumerFilePhase.SUCCESS,
+            ConsumerStatusShortMessage.FINISHED,
+            document.id,
+        )

         # Return the most up to date fields
         document.refresh_from_db()

@@ -21,6 +21,7 @@ from django.utils import timezone

 from documents.consumer import Consumer
 from documents.consumer import ConsumerError
+from documents.consumer import ConsumerFilePhase
 from documents.models import Correspondent
 from documents.models import Document
 from documents.models import DocumentType
@@ -228,8 +229,8 @@ def fake_magic_from_file(file, mime=False):
 class TestConsumer(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
     def _assert_first_last_send_progress(
         self,
-        first_status="STARTING",
-        last_status="SUCCESS",
+        first_status=ConsumerFilePhase.STARTED,
+        last_status=ConsumerFilePhase.SUCCESS,
         first_progress=0,
         first_progress_max=100,
         last_progress=100,
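
With the test helper now defaulting to enum members, assertions that compare recorded status values against either the enum or its raw string behave identically, again because of the str mixin. A minimal sketch with hypothetical captured data (not taken from the real test suite):

from enum import Enum


class ConsumerFilePhase(str, Enum):  # trimmed copy of the enum from the diff
    STARTED = "STARTED"
    WORKING = "WORKING"
    SUCCESS = "SUCCESS"


# Hypothetical (status, progress) pairs recorded from a mocked _send_progress.
recorded = [("STARTED", 0), ("WORKING", 50), ("SUCCESS", 100)]

first_status = ConsumerFilePhase.STARTED  # new default in the helper above
last_status = ConsumerFilePhase.SUCCESS

assert recorded[0][0] == first_status
assert recorded[-1][0] == last_status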