Merge branch 'dev' into feature-websockets-status

This commit is contained in:
jonaswinkler
2021-01-04 22:45:56 +01:00
257 changed files with 18414 additions and 3310 deletions

View File

@@ -1,7 +1,7 @@
import datetime
import hashlib
import logging
import os
from subprocess import Popen
import magic
from asgiref.sync import async_to_sync
@@ -10,13 +10,15 @@ from django.conf import settings
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
from filelock import FileLock
from rest_framework.reverse import reverse
from .classifier import DocumentClassifier, IncompatibleClassifierVersionError
from .file_handling import create_source_path_directory
from .file_handling import create_source_path_directory, \
generate_unique_filename
from .loggers import LoggingMixin
from .models import Document, FileInfo, Correspondent, DocumentType, Tag
from .parsers import ParseError, get_parser_class_for_mime_type, \
get_supported_file_extensions, parse_date
from .parsers import ParseError, get_parser_class_for_mime_type, parse_date
from .signals import (
document_consumption_finished,
document_consumption_started
@@ -61,6 +63,10 @@ class Consumer(LoggingMixin):
def pre_check_file_exists(self):
if not os.path.isfile(self.path):
self.log(
"error",
"Cannot consume {}: It is not a file.".format(self.path)
)
self._fail("File not found")
def pre_check_duplicate(self):
@@ -69,6 +75,10 @@ class Consumer(LoggingMixin):
if Document.objects.filter(Q(checksum=checksum) | Q(archive_checksum=checksum)).exists(): # NOQA: E501
if settings.CONSUMER_DELETE_DUPLICATES:
os.unlink(self.path)
self.log(
"error",
"Not consuming {}: It is a duplicate.".format(self.filename)
)
self._fail("Document is a duplicate")
def pre_check_directories(self):
@@ -77,6 +87,39 @@ class Consumer(LoggingMixin):
os.makedirs(settings.ORIGINALS_DIR, exist_ok=True)
os.makedirs(settings.ARCHIVE_DIR, exist_ok=True)
def run_pre_consume_script(self):
if not settings.PRE_CONSUME_SCRIPT:
return
try:
Popen((settings.PRE_CONSUME_SCRIPT, self.path)).wait()
except Exception as e:
raise ConsumerError(
f"Error while executing pre-consume script: {e}"
)
def run_post_consume_script(self, document):
if not settings.POST_CONSUME_SCRIPT:
return
try:
Popen((
settings.POST_CONSUME_SCRIPT,
str(document.pk),
document.get_public_filename(),
os.path.normpath(document.source_path),
os.path.normpath(document.thumbnail_path),
reverse("document-download", kwargs={"pk": document.pk}),
reverse("document-thumb", kwargs={"pk": document.pk}),
str(document.correspondent),
str(",".join(document.tags.all().values_list(
"name", flat=True)))
)).wait()
except Exception as e:
raise ConsumerError(
f"Error while executing pre-consume script: {e}"
)
def try_consume_file(self,
path,
override_filename=None,
@@ -109,19 +152,20 @@ class Consumer(LoggingMixin):
self.pre_check_directories()
self.pre_check_duplicate()
self.log("info", "Consuming {}".format(self.filename))
self.log("info", f"Consuming {self.filename}")
# Determine the parser class.
mime_type = magic.from_file(self.path, mime=True)
self.log("debug", f"Detected mime type: {mime_type}")
parser_class = get_parser_class_for_mime_type(mime_type)
if not parser_class:
self._fail("No parsers abvailable")
self._fail(f"Unsupported mime type {mime_type}")
else:
self.log("debug",
f"Parser: {parser_class.__name__} "
f"based on mime type {mime_type}")
f"Parser: {parser_class.__name__}")
# Notify all listeners that we're going to do some work.
@@ -131,6 +175,8 @@ class Consumer(LoggingMixin):
logging_group=self.logging_group
)
self.run_pre_consume_script()
def progress_callback(current_progress, max_progress, message):
# recalculate progress to be within 20 and 80
p = int((current_progress / max_progress) * 50 + 20)
@@ -149,7 +195,7 @@ class Consumer(LoggingMixin):
self._send_progress(self.filename, 20, 100, 'WORKING',
'Parsing document...')
self.log("debug", "Parsing {}...".format(self.filename))
document_parser.parse(self.path, mime_type)
document_parser.parse(self.path, mime_type, self.filename)
self.log("debug", f"Generating thumbnail for {self.filename}...")
self._send_progress(self.filename, 70, 100, 'WORKING',
@@ -181,9 +227,10 @@ class Consumer(LoggingMixin):
try:
classifier = DocumentClassifier()
classifier.reload()
except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
logging.getLogger(__name__).warning(
"Cannot classify documents: {}.".format(e))
except (OSError, EOFError, IncompatibleClassifierVersionError) as e:
self.log(
"warning",
f"Cannot classify documents: {e}.")
classifier = None
self._send_progress(self.filename, 95, 100, 'WORKING',
'Storing the document...')
@@ -211,36 +258,34 @@ class Consumer(LoggingMixin):
# After everything is in the database, copy the files into
# place. If this fails, we'll also rollback the transaction.
with FileLock(settings.MEDIA_LOCK):
document.filename = generate_unique_filename(
document, settings.ORIGINALS_DIR)
create_source_path_directory(document.source_path)
# TODO: not required, since this is done by the file handling
# logic
create_source_path_directory(document.source_path)
self._write(document.storage_type,
self.path, document.source_path)
self._write(document.storage_type,
thumbnail, document.thumbnail_path)
if archive_path and os.path.isfile(archive_path):
self._write(document.storage_type,
archive_path, document.archive_path)
self.path, document.source_path)
with open(archive_path, 'rb') as f:
document.archive_checksum = hashlib.md5(
f.read()).hexdigest()
document.save()
self._write(document.storage_type,
thumbnail, document.thumbnail_path)
# Afte performing all database operations and moving files
# into place, tell paperless where the file is.
document.filename = os.path.basename(document.source_path)
# Saving the document now will trigger the filename handling
# logic.
if archive_path and os.path.isfile(archive_path):
create_source_path_directory(document.archive_path)
self._write(document.storage_type,
archive_path, document.archive_path)
with open(archive_path, 'rb') as f:
document.archive_checksum = hashlib.md5(
f.read()).hexdigest()
# Don't save with the lock active. Saving will cause the file
# renaming logic to aquire the lock as well.
document.save()
# Delete the file only if it was successfully consumed
self.log("debug", "Deleting file {}".format(self.path))
os.unlink(self.path)
except Exception as e:
self.log(
"error",
@@ -251,6 +296,8 @@ class Consumer(LoggingMixin):
finally:
document_parser.cleanup()
self.run_post_consume_script(document)
self.log(
"info",
"Document {} consumption finished".format(document)
@@ -278,8 +325,7 @@ class Consumer(LoggingMixin):
with open(self.path, "rb") as f:
document = Document.objects.create(
correspondent=file_info.correspondent,
title=file_info.title,
title=(self.override_title or file_info.title)[:127],
content=text,
mime_type=mime_type,
checksum=hashlib.md5(f.read()).hexdigest(),
@@ -288,20 +334,13 @@ class Consumer(LoggingMixin):
storage_type=storage_type
)
relevant_tags = set(file_info.tags)
if relevant_tags:
tag_names = ", ".join([t.slug for t in relevant_tags])
self.log("debug", "Tagging with {}".format(tag_names))
document.tags.add(*relevant_tags)
self.apply_overrides(document)
document.save()
return document
def apply_overrides(self, document):
if self.override_title:
document.title = self.override_title
if self.override_correspondent_id:
document.correspondent = Correspondent.objects.get(
pk=self.override_correspondent_id)