diff --git a/src/documents/consumer.py b/src/documents/consumer.py index c78c21d37..d051c4259 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -98,15 +98,7 @@ class ConsumerStatusShortMessage(str, Enum): FAILED = "failed" -class ConsumerPlugin( - AlwaysRunPluginMixin, - NoSetupPluginMixin, - NoCleanupPluginMixin, - LoggingMixin, - ConsumeTaskPlugin, -): - logging_name = "paperless.consumer" - +class ConsumerPluginMixin: def __init__( self, input_doc: ConsumableDocument, @@ -155,88 +147,16 @@ class ConsumerPlugin( self.log.error(log_message or message, exc_info=exc_info) raise ConsumerError(f"{self.filename}: {log_message or message}") from exception - def pre_check_file_exists(self): - """ - Confirm the input file still exists where it should - """ - if TYPE_CHECKING: - assert isinstance(self.input_doc.original_file, Path), ( - self.input_doc.original_file - ) - if not self.input_doc.original_file.is_file(): - self._fail( - ConsumerStatusShortMessage.FILE_NOT_FOUND, - f"Cannot consume {self.input_doc.original_file}: File not found.", - ) - def pre_check_duplicate(self): - """ - Using the MD5 of the file, check this exact file doesn't already exist - """ - with Path(self.input_doc.original_file).open("rb") as f: - checksum = hashlib.md5(f.read()).hexdigest() - existing_doc = Document.global_objects.filter( - Q(checksum=checksum) | Q(archive_checksum=checksum), - ) - if existing_doc.exists(): - msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS - log_msg = f"Not consuming {self.filename}: It is a duplicate of {existing_doc.get().title} (#{existing_doc.get().pk})." - - if existing_doc.first().deleted_at is not None: - msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS_IN_TRASH - log_msg += " Note: existing document is in the trash." - - if settings.CONSUMER_DELETE_DUPLICATES: - Path(self.input_doc.original_file).unlink() - self._fail( - msg, - log_msg, - ) - - def pre_check_directories(self): - """ - Ensure all required directories exist before attempting to use them - """ - settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True) - settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True) - settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True) - settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True) - - def pre_check_asn_value(self): - """ - Check that if override_asn is given, it is unique and within a valid range - """ - if self.metadata.asn is None: - # check not necessary in case no ASN gets set - return - # Validate the range is above zero and less than uint32_t max - # otherwise, Whoosh can't handle it in the index - if ( - self.metadata.asn < Document.ARCHIVE_SERIAL_NUMBER_MIN - or self.metadata.asn > Document.ARCHIVE_SERIAL_NUMBER_MAX - ): - self._fail( - ConsumerStatusShortMessage.ASN_RANGE, - f"Not consuming {self.filename}: " - f"Given ASN {self.metadata.asn} is out of range " - f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, " - f"{Document.ARCHIVE_SERIAL_NUMBER_MAX:,}]", - ) - existing_asn_doc = Document.global_objects.filter( - archive_serial_number=self.metadata.asn, - ) - if existing_asn_doc.exists(): - msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS - log_msg = f"Not consuming {self.filename}: Given ASN {self.metadata.asn} already exists!" - - if existing_asn_doc.first().deleted_at is not None: - msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS_IN_TRASH - log_msg += " Note: existing document is in the trash." - - self._fail( - msg, - log_msg, - ) +class ConsumerPlugin( + AlwaysRunPluginMixin, + NoSetupPluginMixin, + NoCleanupPluginMixin, + LoggingMixin, + ConsumerPluginMixin, + ConsumeTaskPlugin, +): + logging_name = "paperless.consumer" def run_pre_consume_script(self): """ @@ -365,20 +285,7 @@ class ConsumerPlugin( tempdir = None try: - self._send_progress( - 0, - 100, - ProgressStatusOptions.STARTED, - ConsumerStatusShortMessage.NEW_FILE, - ) - - # Make sure that preconditions for consuming the file are met. - - self.pre_check_file_exists() - self.pre_check_directories() - self.pre_check_duplicate() - self.pre_check_asn_value() - + # Preflight has already run including progress update to 0% self.log.info(f"Consuming {self.filename}") # For the actual work, copy the file into a tempdir @@ -836,3 +743,113 @@ class ConsumerPlugin( copy_basic_file_stats(source, target) except Exception: # pragma: no cover pass + + +class ConsumerPreflightPlugin( + NoCleanupPluginMixin, + NoSetupPluginMixin, + AlwaysRunPluginMixin, + LoggingMixin, + ConsumerPluginMixin, + ConsumeTaskPlugin, +): + NAME: str = "ConsumerPreflightPlugin" + logging_name = "paperless.consumer" + + def pre_check_file_exists(self): + """ + Confirm the input file still exists where it should + """ + if TYPE_CHECKING: + assert isinstance(self.input_doc.original_file, Path), ( + self.input_doc.original_file + ) + if not self.input_doc.original_file.is_file(): + self._fail( + ConsumerStatusShortMessage.FILE_NOT_FOUND, + f"Cannot consume {self.input_doc.original_file}: File not found.", + ) + + def pre_check_duplicate(self): + """ + Using the MD5 of the file, check this exact file doesn't already exist + """ + with Path(self.input_doc.original_file).open("rb") as f: + checksum = hashlib.md5(f.read()).hexdigest() + existing_doc = Document.global_objects.filter( + Q(checksum=checksum) | Q(archive_checksum=checksum), + ) + if existing_doc.exists(): + msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS + log_msg = f"Not consuming {self.filename}: It is a duplicate of {existing_doc.get().title} (#{existing_doc.get().pk})." + + if existing_doc.first().deleted_at is not None: + msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS_IN_TRASH + log_msg += " Note: existing document is in the trash." + + if settings.CONSUMER_DELETE_DUPLICATES: + Path(self.input_doc.original_file).unlink() + self._fail( + msg, + log_msg, + ) + + def pre_check_directories(self): + """ + Ensure all required directories exist before attempting to use them + """ + settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True) + settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True) + settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True) + settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True) + + def pre_check_asn_value(self): + """ + Check that if override_asn is given, it is unique and within a valid range + """ + if self.metadata.asn is None: + # check not necessary in case no ASN gets set + return + # Validate the range is above zero and less than uint32_t max + # otherwise, Whoosh can't handle it in the index + if ( + self.metadata.asn < Document.ARCHIVE_SERIAL_NUMBER_MIN + or self.metadata.asn > Document.ARCHIVE_SERIAL_NUMBER_MAX + ): + self._fail( + ConsumerStatusShortMessage.ASN_RANGE, + f"Not consuming {self.filename}: " + f"Given ASN {self.metadata.asn} is out of range " + f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, " + f"{Document.ARCHIVE_SERIAL_NUMBER_MAX:,}]", + ) + existing_asn_doc = Document.global_objects.filter( + archive_serial_number=self.metadata.asn, + ) + if existing_asn_doc.exists(): + msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS + log_msg = f"Not consuming {self.filename}: Given ASN {self.metadata.asn} already exists!" + + if existing_asn_doc.first().deleted_at is not None: + msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS_IN_TRASH + log_msg += " Note: existing document is in the trash." + + self._fail( + msg, + log_msg, + ) + + def run(self) -> None: + self._send_progress( + 0, + 100, + ProgressStatusOptions.STARTED, + ConsumerStatusShortMessage.NEW_FILE, + ) + + # Make sure that preconditions for consuming the file are met. + + self.pre_check_file_exists() + self.pre_check_duplicate() + self.pre_check_directories() + self.pre_check_asn_value() diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 13c104185..cff4168f4 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -26,6 +26,7 @@ from documents.caching import clear_document_caches from documents.classifier import DocumentClassifier from documents.classifier import load_classifier from documents.consumer import ConsumerPlugin +from documents.consumer import ConsumerPreflightPlugin from documents.consumer import WorkflowTriggerPlugin from documents.data_models import ConsumableDocument from documents.data_models import DocumentMetadataOverrides @@ -144,6 +145,7 @@ def consume_file( overrides = DocumentMetadataOverrides() plugins: list[type[ConsumeTaskPlugin]] = [ + ConsumerPreflightPlugin, CollatePlugin, BarcodePlugin, WorkflowTriggerPlugin, diff --git a/src/documents/tests/test_consumer.py b/src/documents/tests/test_consumer.py index 370ff0ef6..876ecb5f6 100644 --- a/src/documents/tests/test_consumer.py +++ b/src/documents/tests/test_consumer.py @@ -484,8 +484,8 @@ class TestConsumer( self._assert_first_last_send_progress() def testNotAFile(self): - with self.get_consumer(Path("non-existing-file")) as consumer: - with self.assertRaisesMessage(ConsumerError, "File not found"): + with self.assertRaisesMessage(ConsumerError, "File not found"): + with self.get_consumer(Path("non-existing-file")) as consumer: consumer.run() self._assert_first_last_send_progress(last_status="FAILED") @@ -493,8 +493,8 @@ class TestConsumer( with self.get_consumer(self.get_test_file()) as consumer: consumer.run() - with self.get_consumer(self.get_test_file()) as consumer: - with self.assertRaisesMessage(ConsumerError, "It is a duplicate"): + with self.assertRaisesMessage(ConsumerError, "It is a duplicate"): + with self.get_consumer(self.get_test_file()) as consumer: consumer.run() self._assert_first_last_send_progress(last_status="FAILED") @@ -503,8 +503,8 @@ class TestConsumer( with self.get_consumer(self.get_test_file()) as consumer: consumer.run() - with self.get_consumer(self.get_test_archive_file()) as consumer: - with self.assertRaisesMessage(ConsumerError, "It is a duplicate"): + with self.assertRaisesMessage(ConsumerError, "It is a duplicate"): + with self.get_consumer(self.get_test_archive_file()) as consumer: consumer.run() self._assert_first_last_send_progress(last_status="FAILED") @@ -521,8 +521,8 @@ class TestConsumer( Document.objects.all().delete() - with self.get_consumer(self.get_test_file()) as consumer: - with self.assertRaisesMessage(ConsumerError, "document is in the trash"): + with self.assertRaisesMessage(ConsumerError, "document is in the trash"): + with self.get_consumer(self.get_test_file()) as consumer: consumer.run() def testAsnExists(self): @@ -532,11 +532,11 @@ class TestConsumer( ) as consumer: consumer.run() - with self.get_consumer( - self.get_test_file2(), - DocumentMetadataOverrides(asn=123), - ) as consumer: - with self.assertRaisesMessage(ConsumerError, "ASN 123 already exists"): + with self.assertRaisesMessage(ConsumerError, "ASN 123 already exists"): + with self.get_consumer( + self.get_test_file2(), + DocumentMetadataOverrides(asn=123), + ) as consumer: consumer.run() def testAsnExistsInTrash(self): @@ -549,22 +549,22 @@ class TestConsumer( document = Document.objects.first() document.delete() - with self.get_consumer( - self.get_test_file2(), - DocumentMetadataOverrides(asn=123), - ) as consumer: - with self.assertRaisesMessage(ConsumerError, "document is in the trash"): + with self.assertRaisesMessage(ConsumerError, "document is in the trash"): + with self.get_consumer( + self.get_test_file2(), + DocumentMetadataOverrides(asn=123), + ) as consumer: consumer.run() @mock.patch("documents.parsers.document_consumer_declaration.send") def testNoParsers(self, m): m.return_value = [] - with self.get_consumer(self.get_test_file()) as consumer: - with self.assertRaisesMessage( - ConsumerError, - "sample.pdf: Unsupported mime type application/pdf", - ): + with self.assertRaisesMessage( + ConsumerError, + "sample.pdf: Unsupported mime type application/pdf", + ): + with self.get_consumer(self.get_test_file()) as consumer: consumer.run() self._assert_first_last_send_progress(last_status="FAILED") @@ -726,8 +726,8 @@ class TestConsumer( dst = self.get_test_file() self.assertIsFile(dst) - with self.get_consumer(dst) as consumer: - with self.assertRaises(ConsumerError): + with self.assertRaises(ConsumerError): + with self.get_consumer(dst) as consumer: consumer.run() self.assertIsNotFile(dst) @@ -751,11 +751,11 @@ class TestConsumer( dst = self.get_test_file() self.assertIsFile(dst) - with self.get_consumer(dst) as consumer: - with self.assertRaisesRegex( - ConsumerError, - r"sample\.pdf: Not consuming sample\.pdf: It is a duplicate of sample \(#\d+\)", - ): + with self.assertRaisesRegex( + ConsumerError, + r"sample\.pdf: Not consuming sample\.pdf: It is a duplicate of sample \(#\d+\)", + ): + with self.get_consumer(dst) as consumer: consumer.run() self.assertIsFile(dst) diff --git a/src/documents/tests/utils.py b/src/documents/tests/utils.py index fc50b3948..8abbac391 100644 --- a/src/documents/tests/utils.py +++ b/src/documents/tests/utils.py @@ -21,6 +21,7 @@ from django.test import TransactionTestCase from django.test import override_settings from documents.consumer import ConsumerPlugin +from documents.consumer import ConsumerPreflightPlugin from documents.data_models import ConsumableDocument from documents.data_models import DocumentMetadataOverrides from documents.data_models import DocumentSource @@ -344,12 +345,21 @@ class GetConsumerMixin: ) -> Generator[ConsumerPlugin, None, None]: # Store this for verification self.status = DummyProgressManager(filepath.name, None) + doc = ConsumableDocument( + source, + original_file=filepath, + mailrule_id=mailrule_id or None, + ) + preflight_plugin = ConsumerPreflightPlugin( + doc, + overrides or DocumentMetadataOverrides(), + self.status, # type: ignore + self.dirs.scratch_dir, + "task-id", + ) + preflight_plugin.setup() reader = ConsumerPlugin( - ConsumableDocument( - source, - original_file=filepath, - mailrule_id=mailrule_id or None, - ), + doc, overrides or DocumentMetadataOverrides(), self.status, # type: ignore self.dirs.scratch_dir, @@ -357,6 +367,7 @@ class GetConsumerMixin: ) reader.setup() try: + preflight_plugin.run() yield reader finally: reader.cleanup()