mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-09-01 01:46:16 +00:00
Chore: Convert the consumer to a plugin (#6361)
This commit is contained in:
@@ -2,15 +2,13 @@ import datetime
|
||||
import hashlib
|
||||
import os
|
||||
import tempfile
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
import magic
|
||||
from asgiref.sync import async_to_sync
|
||||
from channels.layers import get_channel_layer
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.db import transaction
|
||||
@@ -20,6 +18,7 @@ from filelock import FileLock
|
||||
from rest_framework.reverse import reverse
|
||||
|
||||
from documents.classifier import load_classifier
|
||||
from documents.data_models import ConsumableDocument
|
||||
from documents.data_models import DocumentMetadataOverrides
|
||||
from documents.file_handling import create_source_path_directory
|
||||
from documents.file_handling import generate_unique_filename
|
||||
@@ -45,6 +44,8 @@ from documents.plugins.base import AlwaysRunPluginMixin
|
||||
from documents.plugins.base import ConsumeTaskPlugin
|
||||
from documents.plugins.base import NoCleanupPluginMixin
|
||||
from documents.plugins.base import NoSetupPluginMixin
|
||||
from documents.plugins.helpers import ProgressManager
|
||||
from documents.plugins.helpers import ProgressStatusOptions
|
||||
from documents.signals import document_consumption_finished
|
||||
from documents.signals import document_consumption_started
|
||||
from documents.utils import copy_basic_file_stats
|
||||
@@ -247,88 +248,81 @@ class ConsumerStatusShortMessage(str, Enum):
|
||||
FAILED = "failed"
|
||||
|
||||
|
||||
class ConsumerFilePhase(str, Enum):
    """
    Status values attached to a progress update for a file being consumed.

    The ``str`` mix-in makes every member compare equal to its plain string
    value, e.g. ``ConsumerFilePhase.FAILED == "FAILED"``.
    """

    # NOTE(review): the surrounding diff replaces uses of this enum with
    # ProgressStatusOptions - confirm whether this class is still needed.
    STARTED = "STARTED"
    WORKING = "WORKING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
|
||||
|
||||
|
||||
class Consumer(LoggingMixin):
|
||||
class ConsumerPlugin(
|
||||
AlwaysRunPluginMixin,
|
||||
NoSetupPluginMixin,
|
||||
NoCleanupPluginMixin,
|
||||
LoggingMixin,
|
||||
ConsumeTaskPlugin,
|
||||
):
|
||||
logging_name = "paperless.consumer"
|
||||
|
||||
def __init__(
    self,
    input_doc: ConsumableDocument,
    metadata: DocumentMetadataOverrides,
    status_mgr: ProgressManager,
    base_tmp_dir: Path,
    task_id: str,
) -> None:
    """
    Set up the consumer plugin for one document.

    All five arguments are passed unchanged to the parent initialiser.
    Afterwards the logging group is renewed and ``self.filename`` is set to
    ``metadata.filename`` when that is truthy, otherwise to the name of
    ``input_doc.original_file``.
    """
    super().__init__(input_doc, metadata, status_mgr, base_tmp_dir, task_id)

    self.renew_logging_group()

    # An explicit filename override wins over the name of the file on disk.
    self.filename = self.metadata.filename or self.input_doc.original_file.name
|
||||
|
||||
def _send_progress(
    self,
    current_progress: int,
    max_progress: int,
    status: ProgressStatusOptions,
    message: Optional[Union[ConsumerStatusShortMessage, str]] = None,
    document_id=None,
):  # pragma: no cover
    """
    Forward a progress update for the current document to the status manager.

    ``current_progress`` / ``max_progress`` describe how far along the work
    is, ``status`` is the phase being reported and ``message`` is an optional
    short message. ``document_id`` is passed along as extra data, together
    with the owner id taken from the metadata overrides.
    """
    # Note the argument order: send_progress() takes status and message
    # first, then the two progress counters.
    self.status_mgr.send_progress(
        status,
        message,
        current_progress,
        max_progress,
        extra_args={
            "document_id": document_id,
            # Any falsy owner id is reported as None.
            "owner_id": self.metadata.owner_id if self.metadata.owner_id else None,
        },
    )
|
||||
|
||||
def _fail(
    self,
    message: Union[ConsumerStatusShortMessage, str],
    log_message: Optional[str] = None,
    exc_info=None,
    exception: Optional[Exception] = None,
):
    """
    Report a failure and abort consumption.

    Sends a 100/100 progress update with the FAILED status and ``message``,
    logs an error (``log_message`` if given, otherwise ``message``; with
    ``exc_info`` passed through to the logger) and then always raises.

    Raises:
        ConsumerError: always; its text is prefixed with the current
            filename and it is chained from ``exception``.
    """
    self._send_progress(100, 100, ProgressStatusOptions.FAILED, message)
    self.log.error(log_message or message, exc_info=exc_info)
    raise ConsumerError(f"{self.filename}: {log_message or message}") from exception
|
||||
|
||||
def __init__(self):
# Zero-argument constructor: calls the parent initialiser without arguments,
# sets the paths, the filename, the task id and every override_* attribute
# listed below to None, and stores the result of get_channel_layer().
# NOTE(review): this looks like the pre-change constructor shown by the diff.
# The same class body also contains a five-argument __init__ further up, and a
# later definition would replace an earlier one - confirm that this one is
# meant to be removed.
|
||||
super().__init__()
|
||||
self.path: Optional[Path] = None
|
||||
self.original_path: Optional[Path] = None
|
||||
self.filename = None
|
||||
self.override_title = None
|
||||
self.override_correspondent_id = None
|
||||
self.override_tag_ids = None
|
||||
self.override_document_type_id = None
|
||||
self.override_asn = None
|
||||
self.task_id = None
|
||||
self.override_owner_id = None
|
||||
self.override_custom_field_ids = None
|
||||
|
||||
self.channel_layer = get_channel_layer()
|
||||
|
||||
def pre_check_file_exists(self):
    """
    Confirm the input file still exists where it should.

    Returns None when ``self.input_doc.original_file`` is an existing regular
    file; otherwise hands over to ``self._fail`` with the FILE_NOT_FOUND
    short message and a log message naming the missing path.
    """
    if not os.path.isfile(self.input_doc.original_file):
        self._fail(
            ConsumerStatusShortMessage.FILE_NOT_FOUND,
            f"Cannot consume {self.input_doc.original_file}: File not found.",
        )
|
||||
|
||||
def pre_check_duplicate(self):
|
||||
"""
|
||||
Using the MD5 of the file, check this exact file doesn't already exist
|
||||
"""
|
||||
with open(self.original_path, "rb") as f:
|
||||
with open(self.input_doc.original_file, "rb") as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
existing_doc = Document.objects.filter(
|
||||
Q(checksum=checksum) | Q(archive_checksum=checksum),
|
||||
)
|
||||
if existing_doc.exists():
|
||||
if settings.CONSUMER_DELETE_DUPLICATES:
|
||||
os.unlink(self.original_path)
|
||||
os.unlink(self.input_doc.original_file)
|
||||
self._fail(
|
||||
ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS,
|
||||
f"Not consuming {self.filename}: It is a duplicate of"
|
||||
@@ -348,26 +342,26 @@ class Consumer(LoggingMixin):
|
||||
"""
|
||||
Check that if override_asn is given, it is unique and within a valid range
|
||||
"""
|
||||
if not self.override_asn:
|
||||
if not self.metadata.asn:
|
||||
# check not necessary in case no ASN gets set
|
||||
return
|
||||
# Validate the range is above zero and less than uint32_t max
|
||||
# otherwise, Whoosh can't handle it in the index
|
||||
if (
|
||||
self.override_asn < Document.ARCHIVE_SERIAL_NUMBER_MIN
|
||||
or self.override_asn > Document.ARCHIVE_SERIAL_NUMBER_MAX
|
||||
self.metadata.asn < Document.ARCHIVE_SERIAL_NUMBER_MIN
|
||||
or self.metadata.asn > Document.ARCHIVE_SERIAL_NUMBER_MAX
|
||||
):
|
||||
self._fail(
|
||||
ConsumerStatusShortMessage.ASN_RANGE,
|
||||
f"Not consuming {self.filename}: "
|
||||
f"Given ASN {self.override_asn} is out of range "
|
||||
f"Given ASN {self.metadata.asn} is out of range "
|
||||
f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, "
|
||||
f"{Document.ARCHIVE_SERIAL_NUMBER_MAX:,}]",
|
||||
)
|
||||
if Document.objects.filter(archive_serial_number=self.override_asn).exists():
|
||||
if Document.objects.filter(archive_serial_number=self.metadata.asn).exists():
|
||||
self._fail(
|
||||
ConsumerStatusShortMessage.ASN_ALREADY_EXISTS,
|
||||
f"Not consuming {self.filename}: Given ASN {self.override_asn} already exists!",
|
||||
f"Not consuming {self.filename}: Given ASN {self.metadata.asn} already exists!",
|
||||
)
|
||||
|
||||
def run_pre_consume_script(self):
|
||||
@@ -388,7 +382,7 @@ class Consumer(LoggingMixin):
|
||||
self.log.info(f"Executing pre-consume script {settings.PRE_CONSUME_SCRIPT}")
|
||||
|
||||
working_file_path = str(self.working_copy)
|
||||
original_file_path = str(self.original_path)
|
||||
original_file_path = str(self.input_doc.original_file)
|
||||
|
||||
script_env = os.environ.copy()
|
||||
script_env["DOCUMENT_SOURCE_PATH"] = original_file_path
|
||||
@@ -486,50 +480,15 @@ class Consumer(LoggingMixin):
|
||||
exception=e,
|
||||
)
|
||||
|
||||
def try_consume_file(
|
||||
self,
|
||||
path: Path,
|
||||
override_filename=None,
|
||||
override_title=None,
|
||||
override_correspondent_id=None,
|
||||
override_document_type_id=None,
|
||||
override_tag_ids=None,
|
||||
override_storage_path_id=None,
|
||||
task_id=None,
|
||||
override_created=None,
|
||||
override_asn=None,
|
||||
override_owner_id=None,
|
||||
override_view_users=None,
|
||||
override_view_groups=None,
|
||||
override_change_users=None,
|
||||
override_change_groups=None,
|
||||
override_custom_field_ids=None,
|
||||
) -> Document:
|
||||
def run(self) -> str:
|
||||
"""
|
||||
Return the document object if it was successfully created.
|
||||
"""
|
||||
|
||||
self.original_path = Path(path).resolve()
|
||||
self.filename = override_filename or self.original_path.name
|
||||
self.override_title = override_title
|
||||
self.override_correspondent_id = override_correspondent_id
|
||||
self.override_document_type_id = override_document_type_id
|
||||
self.override_tag_ids = override_tag_ids
|
||||
self.override_storage_path_id = override_storage_path_id
|
||||
self.task_id = task_id or str(uuid.uuid4())
|
||||
self.override_created = override_created
|
||||
self.override_asn = override_asn
|
||||
self.override_owner_id = override_owner_id
|
||||
self.override_view_users = override_view_users
|
||||
self.override_view_groups = override_view_groups
|
||||
self.override_change_users = override_change_users
|
||||
self.override_change_groups = override_change_groups
|
||||
self.override_custom_field_ids = override_custom_field_ids
|
||||
|
||||
self._send_progress(
|
||||
0,
|
||||
100,
|
||||
ConsumerFilePhase.STARTED,
|
||||
ProgressStatusOptions.STARTED,
|
||||
ConsumerStatusShortMessage.NEW_FILE,
|
||||
)
|
||||
|
||||
@@ -548,7 +507,7 @@ class Consumer(LoggingMixin):
|
||||
dir=settings.SCRATCH_DIR,
|
||||
)
|
||||
self.working_copy = Path(tempdir.name) / Path(self.filename)
|
||||
copy_file_with_basic_stats(self.original_path, self.working_copy)
|
||||
copy_file_with_basic_stats(self.input_doc.original_file, self.working_copy)
|
||||
|
||||
# Determine the parser class.
|
||||
|
||||
@@ -580,7 +539,7 @@ class Consumer(LoggingMixin):
|
||||
def progress_callback(current_progress, max_progress): # pragma: no cover
|
||||
# recalculate progress to be within 20 and 80
|
||||
p = int((current_progress / max_progress) * 50 + 20)
|
||||
self._send_progress(p, 100, ConsumerFilePhase.WORKING)
|
||||
self._send_progress(p, 100, ProgressStatusOptions.WORKING)
|
||||
|
||||
# This doesn't parse the document yet, but gives us a parser.
|
||||
|
||||
@@ -591,9 +550,6 @@ class Consumer(LoggingMixin):
|
||||
|
||||
self.log.debug(f"Parser: {type(document_parser).__name__}")
|
||||
|
||||
# However, this already created working directories which we have to
|
||||
# clean up.
|
||||
|
||||
# Parse the document. This may take some time.
|
||||
|
||||
text = None
|
||||
@@ -605,7 +561,7 @@ class Consumer(LoggingMixin):
|
||||
self._send_progress(
|
||||
20,
|
||||
100,
|
||||
ConsumerFilePhase.WORKING,
|
||||
ProgressStatusOptions.WORKING,
|
||||
ConsumerStatusShortMessage.PARSING_DOCUMENT,
|
||||
)
|
||||
self.log.debug(f"Parsing {self.filename}...")
|
||||
@@ -615,7 +571,7 @@ class Consumer(LoggingMixin):
|
||||
self._send_progress(
|
||||
70,
|
||||
100,
|
||||
ConsumerFilePhase.WORKING,
|
||||
ProgressStatusOptions.WORKING,
|
||||
ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
|
||||
)
|
||||
thumbnail = document_parser.get_thumbnail(
|
||||
@@ -630,7 +586,7 @@ class Consumer(LoggingMixin):
|
||||
self._send_progress(
|
||||
90,
|
||||
100,
|
||||
ConsumerFilePhase.WORKING,
|
||||
ProgressStatusOptions.WORKING,
|
||||
ConsumerStatusShortMessage.PARSE_DATE,
|
||||
)
|
||||
date = parse_date(self.filename, text)
|
||||
@@ -664,7 +620,7 @@ class Consumer(LoggingMixin):
|
||||
self._send_progress(
|
||||
95,
|
||||
100,
|
||||
ConsumerFilePhase.WORKING,
|
||||
ProgressStatusOptions.WORKING,
|
||||
ConsumerStatusShortMessage.SAVE_DOCUMENT,
|
||||
)
|
||||
# now that everything is done, we can start to store the document
|
||||
@@ -726,13 +682,13 @@ class Consumer(LoggingMixin):
|
||||
|
||||
# Delete the file only if it was successfully consumed
|
||||
self.log.debug(f"Deleting file {self.working_copy}")
|
||||
self.original_path.unlink()
|
||||
self.input_doc.original_file.unlink()
|
||||
self.working_copy.unlink()
|
||||
|
||||
# https://github.com/jonaswinkler/paperless-ng/discussions/1037
|
||||
shadow_file = os.path.join(
|
||||
os.path.dirname(self.original_path),
|
||||
"._" + os.path.basename(self.original_path),
|
||||
os.path.dirname(self.input_doc.original_file),
|
||||
"._" + os.path.basename(self.input_doc.original_file),
|
||||
)
|
||||
|
||||
if os.path.isfile(shadow_file):
|
||||
@@ -758,7 +714,7 @@ class Consumer(LoggingMixin):
|
||||
self._send_progress(
|
||||
100,
|
||||
100,
|
||||
ConsumerFilePhase.SUCCESS,
|
||||
ProgressStatusOptions.SUCCESS,
|
||||
ConsumerStatusShortMessage.FINISHED,
|
||||
document.id,
|
||||
)
|
||||
@@ -766,24 +722,24 @@ class Consumer(LoggingMixin):
|
||||
# Return the most up to date fields
|
||||
document.refresh_from_db()
|
||||
|
||||
return document
|
||||
return f"Success. New document id {document.pk} created"
|
||||
|
||||
def _parse_title_placeholders(self, title: str) -> str:
|
||||
local_added = timezone.localtime(timezone.now())
|
||||
|
||||
correspondent_name = (
|
||||
Correspondent.objects.get(pk=self.override_correspondent_id).name
|
||||
if self.override_correspondent_id is not None
|
||||
Correspondent.objects.get(pk=self.metadata.correspondent_id).name
|
||||
if self.metadata.correspondent_id is not None
|
||||
else None
|
||||
)
|
||||
doc_type_name = (
|
||||
DocumentType.objects.get(pk=self.override_document_type_id).name
|
||||
if self.override_document_type_id is not None
|
||||
DocumentType.objects.get(pk=self.metadata.document_type_id).name
|
||||
if self.metadata.document_type_id is not None
|
||||
else None
|
||||
)
|
||||
owner_username = (
|
||||
User.objects.get(pk=self.override_owner_id).username
|
||||
if self.override_owner_id is not None
|
||||
User.objects.get(pk=self.metadata.owner_id).username
|
||||
if self.metadata.owner_id is not None
|
||||
else None
|
||||
)
|
||||
|
||||
@@ -808,8 +764,8 @@ class Consumer(LoggingMixin):
|
||||
|
||||
self.log.debug("Saving record to database")
|
||||
|
||||
if self.override_created is not None:
|
||||
create_date = self.override_created
|
||||
if self.metadata.created is not None:
|
||||
create_date = self.metadata.created
|
||||
self.log.debug(
|
||||
f"Creation date from post_documents parameter: {create_date}",
|
||||
)
|
||||
@@ -820,7 +776,7 @@ class Consumer(LoggingMixin):
|
||||
create_date = date
|
||||
self.log.debug(f"Creation date from parse_date: {create_date}")
|
||||
else:
|
||||
stats = os.stat(self.original_path)
|
||||
stats = os.stat(self.input_doc.original_file)
|
||||
create_date = timezone.make_aware(
|
||||
datetime.datetime.fromtimestamp(stats.st_mtime),
|
||||
)
|
||||
@@ -829,12 +785,12 @@ class Consumer(LoggingMixin):
|
||||
storage_type = Document.STORAGE_TYPE_UNENCRYPTED
|
||||
|
||||
title = file_info.title
|
||||
if self.override_title is not None:
|
||||
if self.metadata.title is not None:
|
||||
try:
|
||||
title = self._parse_title_placeholders(self.override_title)
|
||||
title = self._parse_title_placeholders(self.metadata.title)
|
||||
except Exception as e:
|
||||
self.log.error(
|
||||
f"Error occurred parsing title override '{self.override_title}', falling back to original. Exception: {e}",
|
||||
f"Error occurred parsing title override '{self.metadata.title}', falling back to original. Exception: {e}",
|
||||
)
|
||||
|
||||
document = Document.objects.create(
|
||||
@@ -855,53 +811,53 @@ class Consumer(LoggingMixin):
|
||||
return document
|
||||
|
||||
def apply_overrides(self, document):
|
||||
if self.override_correspondent_id:
|
||||
if self.metadata.correspondent_id:
|
||||
document.correspondent = Correspondent.objects.get(
|
||||
pk=self.override_correspondent_id,
|
||||
pk=self.metadata.correspondent_id,
|
||||
)
|
||||
|
||||
if self.override_document_type_id:
|
||||
if self.metadata.document_type_id:
|
||||
document.document_type = DocumentType.objects.get(
|
||||
pk=self.override_document_type_id,
|
||||
pk=self.metadata.document_type_id,
|
||||
)
|
||||
|
||||
if self.override_tag_ids:
|
||||
for tag_id in self.override_tag_ids:
|
||||
if self.metadata.tag_ids:
|
||||
for tag_id in self.metadata.tag_ids:
|
||||
document.tags.add(Tag.objects.get(pk=tag_id))
|
||||
|
||||
if self.override_storage_path_id:
|
||||
if self.metadata.storage_path_id:
|
||||
document.storage_path = StoragePath.objects.get(
|
||||
pk=self.override_storage_path_id,
|
||||
pk=self.metadata.storage_path_id,
|
||||
)
|
||||
|
||||
if self.override_asn:
|
||||
document.archive_serial_number = self.override_asn
|
||||
if self.metadata.asn:
|
||||
document.archive_serial_number = self.metadata.asn
|
||||
|
||||
if self.override_owner_id:
|
||||
if self.metadata.owner_id:
|
||||
document.owner = User.objects.get(
|
||||
pk=self.override_owner_id,
|
||||
pk=self.metadata.owner_id,
|
||||
)
|
||||
|
||||
if (
|
||||
self.override_view_users is not None
|
||||
or self.override_view_groups is not None
|
||||
or self.override_change_users is not None
|
||||
or self.override_change_users is not None
|
||||
self.metadata.view_users is not None
|
||||
or self.metadata.view_groups is not None
|
||||
or self.metadata.change_users is not None
|
||||
or self.metadata.change_users is not None
|
||||
):
|
||||
permissions = {
|
||||
"view": {
|
||||
"users": self.override_view_users or [],
|
||||
"groups": self.override_view_groups or [],
|
||||
"users": self.metadata.view_users or [],
|
||||
"groups": self.metadata.view_groups or [],
|
||||
},
|
||||
"change": {
|
||||
"users": self.override_change_users or [],
|
||||
"groups": self.override_change_groups or [],
|
||||
"users": self.metadata.change_users or [],
|
||||
"groups": self.metadata.change_groups or [],
|
||||
},
|
||||
}
|
||||
set_permissions_for_object(permissions=permissions, object=document)
|
||||
|
||||
if self.override_custom_field_ids:
|
||||
for field_id in self.override_custom_field_ids:
|
||||
if self.metadata.custom_field_ids:
|
||||
for field_id in self.metadata.custom_field_ids:
|
||||
field = CustomField.objects.get(pk=field_id)
|
||||
CustomFieldInstance.objects.create(
|
||||
field=field,
|
||||
|
Reference in New Issue
Block a user