This commit is contained in:
jonaswinkler
2020-12-07 12:44:23 +01:00
parent a76452028f
commit dd60100796
6 changed files with 25 additions and 24 deletions

View File

@@ -43,6 +43,11 @@ class Consumer(LoggingMixin):
{'type': 'status_update',
'data': payload})
def _fail(self, message):
self._send_progress(self.filename, 100, 100, 'FAILED',
message)
raise ConsumerError(f"{self.filename}: {message}")
def __init__(self):
super().__init__()
self.path = None
@@ -56,8 +61,7 @@ class Consumer(LoggingMixin):
def pre_check_file_exists(self):
if not os.path.isfile(self.path):
raise ConsumerError("Cannot consume {}: It is not a file".format(
self.path))
self._fail("File not found")
def pre_check_duplicate(self):
with open(self.path, "rb") as f:
@@ -65,9 +69,7 @@ class Consumer(LoggingMixin):
if Document.objects.filter(Q(checksum=checksum) | Q(archive_checksum=checksum)).exists(): # NOQA: E501
if settings.CONSUMER_DELETE_DUPLICATES:
os.unlink(self.path)
raise ConsumerError(
"Not consuming {}: It is a duplicate.".format(self.filename)
)
self._fail("Document is a duplicate")
def pre_check_directories(self):
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
@@ -93,6 +95,9 @@ class Consumer(LoggingMixin):
self.override_document_type_id = override_document_type_id
self.override_tag_ids = override_tag_ids
self._send_progress(self.filename, 0, 100, 'WORKING',
'Received new file.')
# this is for grouping logging entries for this particular file
# together.
@@ -112,7 +117,7 @@ class Consumer(LoggingMixin):
parser_class = get_parser_class_for_mime_type(mime_type)
if not parser_class:
raise ConsumerError(f"No parsers abvailable for {self.filename}")
self._fail("No parsers abvailable")
else:
self.log("debug",
f"Parser: {parser_class.__name__} "
@@ -120,8 +125,6 @@ class Consumer(LoggingMixin):
# Notify all listeners that we're going to do some work.
self._send_progress(self.filename, 0, 100, 'WORKING', 'Consumption started')
document_consumption_started.send(
sender=self.__class__,
filename=self.path,
@@ -130,7 +133,7 @@ class Consumer(LoggingMixin):
def progress_callback(current_progress, max_progress, message):
# recalculate progress to be within 20 and 80
p = int((current_progress / max_progress) * 60 + 20)
p = int((current_progress / max_progress) * 50 + 20)
self._send_progress(self.filename, p, 100, "WORKING", message)
# This doesn't parse the document yet, but gives us a parser.
@@ -167,9 +170,7 @@ class Consumer(LoggingMixin):
self.log(
"error",
f"Error while consuming document {self.filename}: {e}")
self._send_progress(self.filename, 100, 100, 'FAILED',
"Failed: {}".format(e))
raise ConsumerError(e)
self._fail(e)
# Prepare the document classifier.
@@ -246,9 +247,7 @@ class Consumer(LoggingMixin):
f"The following error occured while consuming "
f"{self.filename}: {e}"
)
self._send_progress(self.filename, 100, 100, 'FAILED',
"Failed: {}".format(e))
raise ConsumerError(e)
self._fail(str(e))
finally:
document_parser.cleanup()