mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-09-01 01:46:16 +00:00
Merge branch 'dev' into celery-tasks
This commit is contained in:
@@ -3,7 +3,6 @@ import hashlib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
|
||||
from asgiref.sync import async_to_sync
|
||||
from channels.layers import get_channel_layer
|
||||
@@ -13,7 +12,9 @@ from django.utils import timezone
|
||||
|
||||
from paperless.db import GnuPG
|
||||
from .classifier import DocumentClassifier, IncompatibleClassifierVersionError
|
||||
from .models import Document, FileInfo
|
||||
from .file_handling import generate_filename, create_source_path_directory
|
||||
from .loggers import LoggingMixin
|
||||
from .models import Document, FileInfo, Correspondent, DocumentType, Tag
|
||||
from .parsers import ParseError, get_parser_class
|
||||
from .signals import (
|
||||
document_consumption_finished,
|
||||
@@ -25,17 +26,10 @@ class ConsumerError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Consumer:
|
||||
"""
|
||||
Loop over every file found in CONSUMPTION_DIR and:
|
||||
1. Convert it to a greyscale pnm
|
||||
2. Use tesseract on the pnm
|
||||
3. Store the document in the MEDIA_ROOT with optional encryption
|
||||
4. Store the OCR'd text in the database
|
||||
5. Delete the document and image(s)
|
||||
"""
|
||||
class Consumer(LoggingMixin):
|
||||
|
||||
def _send_progress(self, filename, current_progress, max_progress, status, message, document_id=None):
|
||||
def _send_progress(self, filename, current_progress, max_progress, status,
|
||||
message, document_id=None):
|
||||
payload = {
|
||||
'filename': os.path.basename(filename),
|
||||
'current_progress': current_progress,
|
||||
@@ -44,156 +38,226 @@ class Consumer:
|
||||
'message': message,
|
||||
'document_id': document_id
|
||||
}
|
||||
async_to_sync(self.channel_layer.group_send)("status_updates", {'type': 'status_update', 'data': payload})
|
||||
async_to_sync(self.channel_layer.group_send)("status_updates",
|
||||
{'type': 'status_update',
|
||||
'data': payload})
|
||||
|
||||
def __init__(self, consume=settings.CONSUMPTION_DIR,
|
||||
scratch=settings.SCRATCH_DIR):
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.logging_group = None
|
||||
|
||||
self.consume = consume
|
||||
self.scratch = scratch
|
||||
|
||||
self.classifier = DocumentClassifier()
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.path = None
|
||||
self.filename = None
|
||||
self.override_title = None
|
||||
self.override_correspondent_id = None
|
||||
self.override_tag_ids = None
|
||||
self.override_document_type_id = None
|
||||
|
||||
self.channel_layer = get_channel_layer()
|
||||
|
||||
os.makedirs(self.scratch, exist_ok=True)
|
||||
def pre_check_file_exists(self):
|
||||
if not os.path.isfile(self.path):
|
||||
raise ConsumerError("Cannot consume {}: It is not a file".format(
|
||||
self.path))
|
||||
|
||||
self.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
|
||||
if settings.PASSPHRASE:
|
||||
self.storage_type = Document.STORAGE_TYPE_GPG
|
||||
|
||||
if not self.consume:
|
||||
def pre_check_consumption_dir(self):
|
||||
if not settings.CONSUMPTION_DIR:
|
||||
raise ConsumerError(
|
||||
"The CONSUMPTION_DIR settings variable does not appear to be "
|
||||
"set."
|
||||
)
|
||||
"set.")
|
||||
|
||||
if not os.path.exists(self.consume):
|
||||
if not os.path.isdir(settings.CONSUMPTION_DIR):
|
||||
raise ConsumerError(
|
||||
"Consumption directory {} does not exist".format(self.consume))
|
||||
"Consumption directory {} does not exist".format(
|
||||
settings.CONSUMPTION_DIR))
|
||||
|
||||
def log(self, level, message):
|
||||
getattr(self.logger, level)(message, extra={
|
||||
"group": self.logging_group
|
||||
})
|
||||
def pre_check_regex(self):
|
||||
if not re.match(FileInfo.REGEXES["title"], self.filename):
|
||||
raise ConsumerError(
|
||||
"Filename {} does not seem to be safe to "
|
||||
"consume".format(self.filename))
|
||||
|
||||
@transaction.atomic
|
||||
def try_consume_file(self, file):
|
||||
"""
|
||||
Return True if file was consumed
|
||||
"""
|
||||
|
||||
self.logging_group = uuid.uuid4()
|
||||
|
||||
if not re.match(FileInfo.REGEXES["title"], file):
|
||||
return False
|
||||
|
||||
doc = file
|
||||
|
||||
if self._is_duplicate(doc):
|
||||
self.log(
|
||||
"warning",
|
||||
"Skipping {} as it appears to be a duplicate".format(doc)
|
||||
def pre_check_duplicate(self):
|
||||
with open(self.path, "rb") as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
if Document.objects.filter(checksum=checksum).exists():
|
||||
if settings.CONSUMER_DELETE_DUPLICATES:
|
||||
os.unlink(self.path)
|
||||
raise ConsumerError(
|
||||
"Not consuming {}: It is a duplicate.".format(self.filename)
|
||||
)
|
||||
return False
|
||||
|
||||
self.log("info", "Consuming {}".format(doc))
|
||||
def pre_check_directories(self):
|
||||
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
|
||||
os.makedirs(settings.THUMBNAIL_DIR, exist_ok=True)
|
||||
os.makedirs(settings.ORIGINALS_DIR, exist_ok=True)
|
||||
|
||||
def try_consume_file(self,
|
||||
path,
|
||||
override_filename=None,
|
||||
override_title=None,
|
||||
override_correspondent_id=None,
|
||||
override_document_type_id=None,
|
||||
override_tag_ids=None):
|
||||
"""
|
||||
Return the document object if it was successfully created.
|
||||
"""
|
||||
|
||||
parser_class = get_parser_class(doc)
|
||||
self.path = path
|
||||
self.filename = override_filename or os.path.basename(path)
|
||||
self.override_title = override_title
|
||||
self.override_correspondent_id = override_correspondent_id
|
||||
self.override_document_type_id = override_document_type_id
|
||||
self.override_tag_ids = override_tag_ids
|
||||
|
||||
# this is for grouping logging entries for this particular file
|
||||
# together.
|
||||
|
||||
self.renew_logging_group()
|
||||
|
||||
# Make sure that preconditions for consuming the file are met.
|
||||
|
||||
self.pre_check_file_exists()
|
||||
self.pre_check_consumption_dir()
|
||||
self.pre_check_directories()
|
||||
self.pre_check_regex()
|
||||
self.pre_check_duplicate()
|
||||
|
||||
self.log("info", "Consuming {}".format(self.filename))
|
||||
|
||||
# Determine the parser class.
|
||||
|
||||
parser_class = get_parser_class(self.filename)
|
||||
if not parser_class:
|
||||
self.log(
|
||||
"error", "No parsers could be found for {}".format(doc))
|
||||
return False
|
||||
raise ConsumerError("No parsers abvailable for {}".format(self.filename))
|
||||
else:
|
||||
self.log("info", "Parser: {}".format(parser_class.__name__))
|
||||
self.log("debug", "Parser: {}".format(parser_class.__name__))
|
||||
|
||||
self._send_progress(file, 0, 100, 'WORKING', 'Consumption started')
|
||||
# Notify all listeners that we're going to do some work.
|
||||
|
||||
self._send_progress(self.filename, 0, 100, 'WORKING', 'Consumption started')
|
||||
|
||||
document_consumption_started.send(
|
||||
sender=self.__class__,
|
||||
filename=doc,
|
||||
filename=self.path,
|
||||
logging_group=self.logging_group
|
||||
)
|
||||
|
||||
def progress_callback(current_progress, max_progress, message):
|
||||
# recalculate progress to be within 20 and 80
|
||||
p = int((current_progress / max_progress) * 60 + 20)
|
||||
self._send_progress(file, p, 100, "WORKING", message)
|
||||
self._send_progress(self.filename, p, 100, "WORKING", message)
|
||||
|
||||
document_parser = parser_class(doc, self.logging_group, progress_callback)
|
||||
# This doesn't parse the document yet, but gives us a parser.
|
||||
|
||||
document_parser = parser_class(self.path, self.logging_group, progress_callback)
|
||||
|
||||
# However, this already created working directories which we have to
|
||||
# clean up.
|
||||
|
||||
# Parse the document. This may take some time.
|
||||
|
||||
try:
|
||||
self.log("info", "Generating thumbnail for {}...".format(doc))
|
||||
self._send_progress(file, 10, 100, 'WORKING',
|
||||
self.log("debug", "Generating thumbnail for {}...".format(self.filename))
|
||||
self._send_progress(self.filename, 10, 100, 'WORKING',
|
||||
'Generating thumbnail...')
|
||||
thumbnail = document_parser.get_optimised_thumbnail()
|
||||
self._send_progress(file, 20, 100, 'WORKING',
|
||||
self.log("debug", "Parsing {}...".format(self.filename))
|
||||
self._send_progress(self.filename, 20, 100, 'WORKING',
|
||||
'Getting text from document...')
|
||||
text = document_parser.get_text()
|
||||
self._send_progress(file, 80, 100, 'WORKING',
|
||||
self._send_progress(self.filename, 80, 100, 'WORKING',
|
||||
'Getting date from document...')
|
||||
date = document_parser.get_date()
|
||||
self._send_progress(file, 85, 100, 'WORKING',
|
||||
'Storing the document...')
|
||||
document = self._store(
|
||||
text,
|
||||
doc,
|
||||
thumbnail,
|
||||
date
|
||||
)
|
||||
except ParseError as e:
|
||||
self.log("fatal", "PARSE FAILURE for {}: {}".format(doc, e))
|
||||
document_parser.cleanup()
|
||||
self._send_progress(self.filename, 100, 100, 'FAILED',
|
||||
"Failed: {}".format(e))
|
||||
raise ConsumerError(e)
|
||||
|
||||
# Prepare the document classifier.
|
||||
|
||||
# TODO: I don't really like to do this here, but this way we avoid
|
||||
# reloading the classifier multiple times, since there are multiple
|
||||
# post-consume hooks that all require the classifier.
|
||||
|
||||
try:
|
||||
classifier = DocumentClassifier()
|
||||
classifier.reload()
|
||||
except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
|
||||
logging.getLogger(__name__).warning(
|
||||
"Cannot classify documents: {}.".format(e))
|
||||
classifier = None
|
||||
self._send_progress(self.filename, 85, 100, 'WORKING',
|
||||
'Storing the document...')
|
||||
# now that everything is done, we can start to store the document
|
||||
# in the system. This will be a transaction and reasonably fast.
|
||||
try:
|
||||
with transaction.atomic():
|
||||
|
||||
# store the document.
|
||||
document = self._store(
|
||||
text=text,
|
||||
date=date
|
||||
)
|
||||
|
||||
# If we get here, it was successful. Proceed with post-consume
|
||||
# hooks. If they fail, nothing will get changed.
|
||||
|
||||
self._send_progress(self.filename, 90, 100, 'WORKING',
|
||||
'Performing post-consumption tasks...')
|
||||
|
||||
document_consumption_finished.send(
|
||||
sender=self.__class__,
|
||||
document=document,
|
||||
logging_group=self.logging_group,
|
||||
classifier=classifier
|
||||
)
|
||||
|
||||
# After everything is in the database, copy the files into
|
||||
# place. If this fails, we'll also rollback the transaction.
|
||||
|
||||
create_source_path_directory(document.source_path)
|
||||
self._write(document, self.path, document.source_path)
|
||||
self._write(document, thumbnail, document.thumbnail_path)
|
||||
|
||||
# Delete the file only if it was successfully consumed
|
||||
self.log("debug", "Deleting file {}".format(self.path))
|
||||
os.unlink(self.path)
|
||||
except Exception as e:
|
||||
raise ConsumerError(e)
|
||||
self._send_progress(file, 100, 100, 'FAILED',
|
||||
"Failed: {}".format(e))
|
||||
|
||||
finally:
|
||||
document_parser.cleanup()
|
||||
return False
|
||||
else:
|
||||
document_parser.cleanup()
|
||||
self._cleanup_doc(doc)
|
||||
|
||||
self.log(
|
||||
"info",
|
||||
"Document {} consumption finished".format(document)
|
||||
)
|
||||
self.log(
|
||||
"info",
|
||||
"Document {} consumption finished".format(document)
|
||||
)
|
||||
|
||||
classifier = None
|
||||
self._send_progress(file, 100, 100, 'SUCCESS',
|
||||
'Finished.', document.id)
|
||||
|
||||
try:
|
||||
self.classifier.reload()
|
||||
classifier = self.classifier
|
||||
except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
|
||||
logging.getLogger(__name__).warning("Cannot classify documents: {}.".format(e))
|
||||
return document
|
||||
|
||||
self._send_progress(file, 90, 100, 'WORKING',
|
||||
'Performing post-consumption tasks...')
|
||||
def _store(self, text, date):
|
||||
|
||||
document_consumption_finished.send(
|
||||
sender=self.__class__,
|
||||
document=document,
|
||||
logging_group=self.logging_group,
|
||||
classifier=classifier
|
||||
)
|
||||
self._send_progress(file, 100, 100, 'SUCCESS',
|
||||
'Finished.', document.id)
|
||||
return True
|
||||
# If someone gave us the original filename, use it instead of doc.
|
||||
|
||||
def _store(self, text, doc, thumbnail, date):
|
||||
file_info = FileInfo.from_path(self.filename)
|
||||
|
||||
file_info = FileInfo.from_path(doc)
|
||||
|
||||
stats = os.stat(doc)
|
||||
stats = os.stat(self.path)
|
||||
|
||||
self.log("debug", "Saving record to database")
|
||||
|
||||
created = file_info.created or date or timezone.make_aware(
|
||||
datetime.datetime.fromtimestamp(stats.st_mtime))
|
||||
datetime.datetime.fromtimestamp(stats.st_mtime))
|
||||
|
||||
with open(doc, "rb") as f:
|
||||
if settings.PASSPHRASE:
|
||||
storage_type = Document.STORAGE_TYPE_GPG
|
||||
else:
|
||||
storage_type = Document.STORAGE_TYPE_UNENCRYPTED
|
||||
|
||||
with open(self.path, "rb") as f:
|
||||
document = Document.objects.create(
|
||||
correspondent=file_info.correspondent,
|
||||
title=file_info.title,
|
||||
@@ -202,7 +266,7 @@ class Consumer:
|
||||
checksum=hashlib.md5(f.read()).hexdigest(),
|
||||
created=created,
|
||||
modified=created,
|
||||
storage_type=self.storage_type
|
||||
storage_type=storage_type
|
||||
)
|
||||
|
||||
relevant_tags = set(file_info.tags)
|
||||
@@ -211,14 +275,30 @@ class Consumer:
|
||||
self.log("debug", "Tagging with {}".format(tag_names))
|
||||
document.tags.add(*relevant_tags)
|
||||
|
||||
self._write(document, doc, document.source_path)
|
||||
self._write(document, thumbnail, document.thumbnail_path)
|
||||
self.apply_overrides(document)
|
||||
|
||||
#TODO: why do we need to save the document again?
|
||||
document.filename = generate_filename(document)
|
||||
|
||||
# We need to save the document twice, since we need the PK of the
|
||||
# document in order to create its filename above.
|
||||
document.save()
|
||||
|
||||
return document
|
||||
|
||||
def apply_overrides(self, document):
|
||||
if self.override_title:
|
||||
document.title = self.override_title
|
||||
|
||||
if self.override_correspondent_id:
|
||||
document.correspondent = Correspondent.objects.get(pk=self.override_correspondent_id)
|
||||
|
||||
if self.override_document_type_id:
|
||||
document.document_type = DocumentType.objects.get(pk=self.override_document_type_id)
|
||||
|
||||
if self.override_tag_ids:
|
||||
for tag_id in self.override_tag_ids:
|
||||
document.tags.add(Tag.objects.get(pk=tag_id))
|
||||
|
||||
def _write(self, document, source, target):
|
||||
with open(source, "rb") as read_file:
|
||||
with open(target, "wb") as write_file:
|
||||
@@ -227,13 +307,3 @@ class Consumer:
|
||||
return
|
||||
self.log("debug", "Encrypting")
|
||||
write_file.write(GnuPG.encrypted(read_file))
|
||||
|
||||
def _cleanup_doc(self, doc):
|
||||
self.log("debug", "Deleting document {}".format(doc))
|
||||
os.unlink(doc)
|
||||
|
||||
@staticmethod
|
||||
def _is_duplicate(doc):
|
||||
with open(doc, "rb") as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
return Document.objects.filter(checksum=checksum).exists()
|
||||
|
Reference in New Issue
Block a user