diff --git a/src/documents/barcodes.py b/src/documents/barcodes.py
index d435e2956..668b77547 100644
--- a/src/documents/barcodes.py
+++ b/src/documents/barcodes.py
@@ -16,6 +16,7 @@ from pikepdf import Pdf
 from documents.converters import convert_from_tiff_to_pdf
 from documents.data_models import ConsumableDocument
 from documents.data_models import DocumentMetadataOverrides
+from documents.models import Document
 from documents.models import Tag
 from documents.plugins.base import ConsumeTaskPlugin
 from documents.plugins.base import StopConsumeTaskError
@@ -129,6 +130,24 @@ class BarcodePlugin(ConsumeTaskPlugin):
         self._tiff_conversion_done = False
         self.barcodes: list[Barcode] = []

+    def _apply_detected_asn(self, detected_asn: int) -> None:
+        """
+        Apply a detected ASN to the metadata overrides if allowed.
+        """
+        if (
+            self.metadata.skip_asn_if_exists
+            and Document.global_objects.filter(
+                archive_serial_number=detected_asn,
+            ).exists()
+        ):
+            logger.info(
+                f"Found ASN {detected_asn} in barcode but skipping because a document with this ASN already exists.",
+            )
+            return
+
+        logger.info(f"Found ASN in barcode: {detected_asn}")
+        self.metadata.asn = detected_asn
+
     def run(self) -> None:
         # Some operations may use PIL, override pixel setting if needed
         maybe_override_pixel_limit()
@@ -206,13 +225,8 @@ class BarcodePlugin(ConsumeTaskPlugin):

         # Update/overwrite an ASN if possible
         # After splitting, as otherwise each split document gets the same ASN
-        if (
-            self.settings.barcode_enable_asn
-            and not self.metadata.skip_asn
-            and (located_asn := self.asn) is not None
-        ):
-            logger.info(f"Found ASN in barcode: {located_asn}")
-            self.metadata.asn = located_asn
+        if self.settings.barcode_enable_asn and (located_asn := self.asn) is not None:
+            self._apply_detected_asn(located_asn)

     def cleanup(self) -> None:
         self.temp_dir.cleanup()
diff --git a/src/documents/bulk_edit.py b/src/documents/bulk_edit.py
index ec6217a0a..ff88ec287 100644
--- a/src/documents/bulk_edit.py
+++ b/src/documents/bulk_edit.py
@@ -7,7 +7,6 @@ from pathlib import Path
 from typing import TYPE_CHECKING
 from typing import Literal

-from celery import chain
 from celery import chord
 from celery import group
 from celery import shared_task
@@ -38,6 +37,42 @@ if TYPE_CHECKING:
 logger: logging.Logger = logging.getLogger("paperless.bulk_edit")


+@shared_task(bind=True)
+def restore_archive_serial_numbers_task(
+    self,
+    backup: dict[int, int | None],
+    *args,
+    **kwargs,
+) -> None:
+    restore_archive_serial_numbers(backup)
+
+
+def release_archive_serial_numbers(doc_ids: list[int]) -> dict[int, int | None]:
+    """
+    Clears ASNs on documents that are about to be replaced so new documents
+    can be assigned ASNs without uniqueness collisions. Returns a backup map
+    of doc_id -> previous ASN for potential restoration.
+    """
+    qs = Document.objects.filter(
+        id__in=doc_ids,
+        archive_serial_number__isnull=False,
+    ).only("pk", "archive_serial_number")
+    backup = dict(qs.values_list("pk", "archive_serial_number"))
+    qs.update(archive_serial_number=None)
+    logger.info(f"Released archive serial numbers for documents {list(backup.keys())}")
+    return backup
+
+
+def restore_archive_serial_numbers(backup: dict[int, int | None]) -> None:
+    """
+    Restores ASNs using the provided backup map, intended for
+    rollback when replacement consumption fails.
+    """
+    for doc_id, asn in backup.items():
+        Document.objects.filter(pk=doc_id).update(archive_serial_number=asn)
+    logger.info(f"Restored archive serial numbers for documents {list(backup.keys())}")
+
+
 def set_correspondent(
     doc_ids: list[int],
     correspondent: Correspondent,
@@ -305,10 +340,10 @@ def reprocess(doc_ids: list[int]) -> Literal["OK"]:


 def set_permissions(
     doc_ids: list[int],
-    set_permissions,
+    set_permissions: dict,
     *,
-    owner=None,
-    merge=False,
+    owner: User | None = None,
+    merge: bool = False,
 ) -> Literal["OK"]:
     qs = Document.objects.filter(id__in=doc_ids).select_related("owner")
@@ -386,6 +421,7 @@

     merged_pdf = pikepdf.new()
     version: str = merged_pdf.pdf_version
+    handoff_asn: int | None = None
     # use doc_ids to preserve order
     for doc_id in doc_ids:
         doc = qs.get(id=doc_id)
@@ -401,6 +437,8 @@
                 version = max(version, pdf.pdf_version)
                 merged_pdf.pages.extend(pdf.pages)
                 affected_docs.append(doc.id)
+                if handoff_asn is None and doc.archive_serial_number is not None:
+                    handoff_asn = doc.archive_serial_number
         except Exception as e:
             logger.exception(
                 f"Error merging document {doc.id}, it will not be included in the merge: {e}",
@@ -426,6 +464,8 @@
                 DocumentMetadataOverrides.from_document(metadata_document)
             )
             overrides.title = metadata_document.title + " (merged)"
+            if metadata_document.archive_serial_number is not None:
+                handoff_asn = metadata_document.archive_serial_number
         else:
             overrides = DocumentMetadataOverrides()
     else:
@@ -433,8 +473,11 @@
     if user is not None:
         overrides.owner_id = user.id

-    # Avoid copying or detecting ASN from merged PDFs to prevent collision
-    overrides.skip_asn = True
+    if not delete_originals:
+        overrides.skip_asn_if_exists = True
+
+    if delete_originals and handoff_asn is not None:
+        overrides.asn = handoff_asn

     logger.info("Adding merged document to the task queue.")

@@ -447,12 +490,20 @@
     )

     if delete_originals:
+        backup = release_archive_serial_numbers(affected_docs)
         logger.info(
             "Queueing removal of original documents after consumption of merged document",
         )
-        chain(consume_task, delete.si(affected_docs)).delay()
-    else:
-        consume_task.delay()
+        try:
+            consume_task.apply_async(
+                link=[delete.si(affected_docs)],
+                link_error=[restore_archive_serial_numbers_task.s(backup)],
+            )
+        except Exception:
+            restore_archive_serial_numbers(backup)
+            raise
+    else:
+        consume_task.delay()

     return "OK"

@@ -494,6 +545,8 @@
                 overrides.title = f"{doc.title} (split {idx + 1})"
                 if user is not None:
                     overrides.owner_id = user.id
+                if not delete_originals:
+                    overrides.skip_asn_if_exists = True
                 logger.info(
                     f"Adding split document with pages {split_doc} to the task queue.",
                 )
@@ -508,10 +561,20 @@
             )

             if delete_originals:
+                backup = release_archive_serial_numbers([doc.id])
                 logger.info(
                     "Queueing removal of original document after consumption of the split documents",
                 )
-                chord(header=consume_tasks, body=delete.si([doc.id])).delay()
+                try:
+                    chord(
+                        header=consume_tasks,
+                        body=delete.si([doc.id]),
+                    ).apply_async(
+                        link_error=[restore_archive_serial_numbers_task.s(backup)],
+                    )
+                except Exception:
+                    restore_archive_serial_numbers(backup)
+                    raise
             else:
                 group(consume_tasks).delay()
@@ -551,7 +614,7 @@ def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:

 def edit_pdf(
     doc_ids: list[int],
-    operations: list[dict],
+    operations: list[dict[str, int]],
     *,
     delete_original: bool = False,
     update_document: bool = False,
@@ -614,7 +677,10 @@
             )
             if user is not None:
                 overrides.owner_id = user.id
-
+            if not delete_original:
+                overrides.skip_asn_if_exists = True
+            if delete_original and len(pdf_docs) == 1:
+                overrides.asn = doc.archive_serial_number
             for idx, pdf in enumerate(pdf_docs, start=1):
                 filepath: Path = (
                     Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
@@ -633,7 +699,17 @@
             )

     if delete_original:
-        chord(header=consume_tasks, body=delete.si([doc.id])).delay()
+        backup = release_archive_serial_numbers([doc.id])
+        try:
+            chord(
+                header=consume_tasks,
+                body=delete.si([doc.id]),
+            ).apply_async(
+                link_error=[restore_archive_serial_numbers_task.s(backup)],
+            )
+        except Exception:
+            restore_archive_serial_numbers(backup)
+            raise
     else:
         group(consume_tasks).delay()
diff --git a/src/documents/consumer.py b/src/documents/consumer.py
index 10a952015..52b53be42 100644
--- a/src/documents/consumer.py
+++ b/src/documents/consumer.py
@@ -697,7 +697,7 @@ class ConsumerPlugin(
                 pk=self.metadata.storage_path_id,
             )

-        if self.metadata.asn is not None and not self.metadata.skip_asn:
+        if self.metadata.asn is not None:
             document.archive_serial_number = self.metadata.asn

         if self.metadata.owner_id:
@@ -865,8 +865,8 @@ class AsnCheckPlugin(
         """
         Check that if override_asn is given, it is unique and within a valid range
         """
-        if self.metadata.skip_asn or self.metadata.asn is None:
-            # if skip is set or ASN is None
+        if self.metadata.asn is None:
+            # Nothing to check if no ASN is set
             return
         # Validate the range is above zero and less than uint32_t max
         # otherwise, Whoosh can't handle it in the index
diff --git a/src/documents/data_models.py b/src/documents/data_models.py
index 7c023dc13..558450f1f 100644
--- a/src/documents/data_models.py
+++ b/src/documents/data_models.py
@@ -30,7 +30,7 @@ class DocumentMetadataOverrides:
     change_users: list[int] | None = None
     change_groups: list[int] | None = None
     custom_fields: dict | None = None
-    skip_asn: bool = False
+    skip_asn_if_exists: bool = False

     def update(self, other: "DocumentMetadataOverrides") -> "DocumentMetadataOverrides":
         """
@@ -50,8 +50,8 @@ class DocumentMetadataOverrides:
             self.storage_path_id = other.storage_path_id
         if other.owner_id is not None:
             self.owner_id = other.owner_id
-        if other.skip_asn:
-            self.skip_asn = True
+        if other.skip_asn_if_exists:
+            self.skip_asn_if_exists = True

         # merge
         if self.tag_ids is None:
diff --git a/src/documents/tests/test_bulk_edit.py b/src/documents/tests/test_bulk_edit.py
index 0e83d94a8..0a7df909a 100644
--- a/src/documents/tests/test_bulk_edit.py
+++ b/src/documents/tests/test_bulk_edit.py
@@ -603,23 +603,21 @@ class TestPDFActions(DirectoriesMixin, TestCase):
             expected_filename,
         )
         self.assertEqual(consume_file_args[1].title, None)
-        self.assertTrue(consume_file_args[1].skip_asn)
+        # No metadata_document_id and delete_originals is False, so no ASN is set
+        self.assertIsNone(consume_file_args[1].asn)

         # With metadata_document_id overrides
         result = bulk_edit.merge(doc_ids, metadata_document_id=metadata_document_id)
         consume_file_args, _ = mock_consume_file.call_args
         self.assertEqual(consume_file_args[1].title, "B (merged)")
         self.assertEqual(consume_file_args[1].created, self.doc2.created)
-        self.assertTrue(consume_file_args[1].skip_asn)

         self.assertEqual(result, "OK")

     @mock.patch("documents.bulk_edit.delete.si")
     @mock.patch("documents.tasks.consume_file.s")
-    @mock.patch("documents.bulk_edit.chain")
     def test_merge_and_delete_originals(
         self,
-        mock_chain,
         mock_consume_file,
         mock_delete_documents,
     ):
@@ -633,6 +631,12 @@ class TestPDFActions(DirectoriesMixin, TestCase):
             - Document deletion task should be called
         """
         doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id]
+        self.doc1.archive_serial_number = 101
+        self.doc2.archive_serial_number = 102
+        self.doc3.archive_serial_number = 103
+        self.doc1.save()
+        self.doc2.save()
+        self.doc3.save()

         result = bulk_edit.merge(doc_ids, delete_originals=True)
         self.assertEqual(result, "OK")
@@ -643,7 +647,8 @@ class TestPDFActions(DirectoriesMixin, TestCase):

         mock_consume_file.assert_called()
         mock_delete_documents.assert_called()
-        mock_chain.assert_called_once()
+        consume_sig = mock_consume_file.return_value
+        consume_sig.apply_async.assert_called_once()

         consume_file_args, _ = mock_consume_file.call_args
         self.assertEqual(
@@ -651,7 +656,7 @@ class TestPDFActions(DirectoriesMixin, TestCase):
             expected_filename,
         )
         self.assertEqual(consume_file_args[1].title, None)
-        self.assertTrue(consume_file_args[1].skip_asn)
+        self.assertEqual(consume_file_args[1].asn, 101)

         delete_documents_args, _ = mock_delete_documents.call_args
         self.assertEqual(
@@ -659,6 +664,92 @@ class TestPDFActions(DirectoriesMixin, TestCase):
             doc_ids,
         )

+        self.doc1.refresh_from_db()
+        self.doc2.refresh_from_db()
+        self.doc3.refresh_from_db()
+        self.assertIsNone(self.doc1.archive_serial_number)
+        self.assertIsNone(self.doc2.archive_serial_number)
+        self.assertIsNone(self.doc3.archive_serial_number)
+
+    @mock.patch("documents.bulk_edit.delete.si")
+    @mock.patch("documents.tasks.consume_file.s")
+    def test_merge_and_delete_originals_restore_on_failure(
+        self,
+        mock_consume_file,
+        mock_delete_documents,
+    ) -> None:
+        """
+        GIVEN:
+            - Existing documents
+        WHEN:
+            - Merge action with deleting documents is called with 1 document
+            - Error occurs when queuing the consume file task
+        THEN:
+            - Archive serial numbers are restored
+        """
+        doc_ids = [self.doc1.id]
+        self.doc1.archive_serial_number = 111
+        self.doc1.save()
+        sig = mock.Mock()
+        sig.apply_async.side_effect = Exception("boom")
+        mock_consume_file.return_value = sig
+
+        with self.assertRaises(Exception):
+            bulk_edit.merge(doc_ids, delete_originals=True)
+
+        self.doc1.refresh_from_db()
+        self.assertEqual(self.doc1.archive_serial_number, 111)
+
+    @mock.patch("documents.bulk_edit.delete.si")
+    @mock.patch("documents.tasks.consume_file.s")
+    def test_merge_and_delete_originals_metadata_handoff(
+        self,
+        mock_consume_file,
+        mock_delete_documents,
+    ) -> None:
+        """
+        GIVEN:
+            - Existing documents with ASNs
+        WHEN:
+            - Merge with delete_originals=True and metadata_document_id set
+        THEN:
+            - The handed-off ASN comes from the metadata document
+        """
+        doc_ids = [self.doc1.id, self.doc2.id]
+        self.doc1.archive_serial_number = 101
+        self.doc2.archive_serial_number = 202
+        self.doc1.save()
+        self.doc2.save()
+
+        result = bulk_edit.merge(
+            doc_ids,
+            metadata_document_id=self.doc2.id,
+            delete_originals=True,
+        )
+        self.assertEqual(result, "OK")
+
+        consume_file_args, _ = mock_consume_file.call_args
+        self.assertEqual(consume_file_args[1].asn, 202)
+
+    def test_restore_archive_serial_numbers_task(self) -> None:
+        """
+        GIVEN:
+            - Existing document with no archive serial number
+        WHEN:
+            - Restore archive serial number task is called with backup data
+        THEN:
+            - Document archive serial number is restored
+        """
+        self.doc1.archive_serial_number = 444
+        self.doc1.save()
+        Document.objects.filter(pk=self.doc1.id).update(archive_serial_number=None)
+
+        backup: dict[int, int | None] = {self.doc1.id: 444}
+        bulk_edit.restore_archive_serial_numbers_task(backup)
+
+        self.doc1.refresh_from_db()
+        self.assertEqual(self.doc1.archive_serial_number, 444)
+
     @mock.patch("documents.tasks.consume_file.s")
     def test_merge_with_archive_fallback(self, mock_consume_file) -> None:
         """
@@ -727,6 +818,7 @@ class TestPDFActions(DirectoriesMixin, TestCase):

         self.assertEqual(mock_consume_file.call_count, 2)
         consume_file_args, _ = mock_consume_file.call_args
         self.assertEqual(consume_file_args[1].title, "B (split 2)")
+        self.assertIsNone(consume_file_args[1].asn)

         self.assertEqual(result, "OK")
@@ -751,6 +843,8 @@ class TestPDFActions(DirectoriesMixin, TestCase):
         """
         doc_ids = [self.doc2.id]
         pages = [[1, 2], [3]]
+        self.doc2.archive_serial_number = 200
+        self.doc2.save()

         result = bulk_edit.split(doc_ids, pages, delete_originals=True)
         self.assertEqual(result, "OK")
@@ -768,6 +862,42 @@ class TestPDFActions(DirectoriesMixin, TestCase):
             doc_ids,
         )

+        self.doc2.refresh_from_db()
+        self.assertIsNone(self.doc2.archive_serial_number)
+
+    @mock.patch("documents.bulk_edit.delete.si")
+    @mock.patch("documents.tasks.consume_file.s")
+    @mock.patch("documents.bulk_edit.chord")
+    def test_split_restore_on_failure(
+        self,
+        mock_chord,
+        mock_consume_file,
+        mock_delete_documents,
+    ) -> None:
+        """
+        GIVEN:
+            - Existing documents
+        WHEN:
+            - Split action with deleting documents is called with 1 document and 1 page group
+            - Error occurs when queuing the chord task
+        THEN:
+            - Archive serial numbers are restored
+        """
+        doc_ids = [self.doc2.id]
+        pages = [[1, 2]]
+        self.doc2.archive_serial_number = 222
+        self.doc2.save()
+
+        sig = mock.Mock()
+        sig.apply_async.side_effect = Exception("boom")
+        mock_chord.return_value = sig
+
+        result = bulk_edit.split(doc_ids, pages, delete_originals=True)
+        self.assertEqual(result, "OK")
+
+        self.doc2.refresh_from_db()
+        self.assertEqual(self.doc2.archive_serial_number, 222)
+
     @mock.patch("documents.tasks.consume_file.delay")
     @mock.patch("pikepdf.Pdf.save")
     def test_split_with_errors(self, mock_save_pdf, mock_consume_file) -> None:
@@ -977,13 +1107,55 @@ class TestPDFActions(DirectoriesMixin, TestCase):
         mock_chord.return_value.delay.return_value = None
         doc_ids = [self.doc2.id]
         operations = [{"page": 1}, {"page": 2}]
+        self.doc2.archive_serial_number = 250
+        self.doc2.save()

         result = bulk_edit.edit_pdf(doc_ids, operations, delete_original=True)
         self.assertEqual(result, "OK")
         mock_chord.assert_called_once()
+        consume_file_args, _ = mock_consume_file.call_args
+        self.assertEqual(consume_file_args[1].asn, 250)
+        self.doc2.refresh_from_db()
+        self.assertIsNone(self.doc2.archive_serial_number)
+
+    @mock.patch("documents.bulk_edit.delete.si")
+    @mock.patch("documents.tasks.consume_file.s")
+    @mock.patch("documents.bulk_edit.chord")
+    def test_edit_pdf_restore_on_failure(
+        self,
+        mock_chord: mock.Mock,
+        mock_consume_file: mock.Mock,
+        mock_delete_documents: mock.Mock,
+    ) -> None:
+        """
+        GIVEN:
+            - Existing document
+        WHEN:
+            - edit_pdf is called with delete_original=True
+            - Error occurs when queuing the chord task
+        THEN:
+            - Archive serial numbers are restored
+        """
+        doc_ids = [self.doc2.id]
+        operations = [{"page": 1}]
+        self.doc2.archive_serial_number = 333
+        self.doc2.save()
+
+        sig = mock.Mock()
+        sig.apply_async.side_effect = Exception("boom")
+        mock_chord.return_value = sig
+
+        with self.assertRaises(Exception):
+            bulk_edit.edit_pdf(doc_ids, operations, delete_original=True)
+
+        self.doc2.refresh_from_db()
+        self.assertEqual(self.doc2.archive_serial_number, 333)

     @mock.patch("documents.tasks.update_document_content_maybe_archive_file.delay")
-    def test_edit_pdf_with_update_document(self, mock_update_document) -> None:
+    def test_edit_pdf_with_update_document(
+        self,
+        mock_update_document: mock.Mock,
+    ) -> None:
         """
         GIVEN:
             - A single existing PDF document
@@ -1013,7 +1185,11 @@ class TestPDFActions(DirectoriesMixin, TestCase):

     @mock.patch("documents.bulk_edit.group")
     @mock.patch("documents.tasks.consume_file.s")
-    def test_edit_pdf_without_metadata(self, mock_consume_file, mock_group) -> None:
+    def test_edit_pdf_without_metadata(
+        self,
+        mock_consume_file: mock.Mock,
+        mock_group: mock.Mock,
+    ) -> None:
         """
         GIVEN:
             - Existing document
@@ -1032,7 +1208,11 @@ class TestPDFActions(DirectoriesMixin, TestCase):

     @mock.patch("documents.bulk_edit.group")
     @mock.patch("documents.tasks.consume_file.s")
-    def test_edit_pdf_open_failure(self, mock_consume_file, mock_group) -> None:
+    def test_edit_pdf_open_failure(
+        self,
+        mock_consume_file: mock.Mock,
+        mock_group: mock.Mock,
+    ) -> None:
         """
         GIVEN:
             - Existing document
diff --git a/src/documents/tests/test_consumer.py b/src/documents/tests/test_consumer.py
index 6dc979b20..717cffd6e 100644
--- a/src/documents/tests/test_consumer.py
+++ b/src/documents/tests/test_consumer.py
@@ -14,6 +14,7 @@ from django.test import override_settings
 from django.utils import timezone
 from guardian.core import ObjectPermissionChecker

+from documents.barcodes import BarcodePlugin
 from documents.consumer import ConsumerError
 from documents.data_models import DocumentMetadataOverrides
 from documents.data_models import DocumentSource
@@ -412,14 +413,6 @@ class TestConsumer(
         self.assertEqual(document.archive_serial_number, 123)
         self._assert_first_last_send_progress()

-    def testMetadataOverridesSkipAsnPropagation(self) -> None:
-        overrides = DocumentMetadataOverrides()
-        incoming = DocumentMetadataOverrides(skip_asn=True)
-
-        overrides.update(incoming)
-
-        self.assertTrue(overrides.skip_asn)
-
     def testOverrideTitlePlaceholders(self) -> None:
         c = Correspondent.objects.create(name="Correspondent Name")
         dt = DocumentType.objects.create(name="DocType Name")
@@ -1271,3 +1264,46 @@ class PostConsumeTestCase(DirectoriesMixin, GetConsumerMixin, TestCase):
             r"sample\.pdf: Error while executing post-consume script: Command '\[.*\]' returned non-zero exit status \d+\.",
         ):
             consumer.run_post_consume_script(doc)
+
+
+class TestMetadataOverrides(TestCase):
+    def test_update_skip_asn_if_exists(self) -> None:
+        base = DocumentMetadataOverrides()
+        incoming = DocumentMetadataOverrides(skip_asn_if_exists=True)
+        base.update(incoming)
+        self.assertTrue(base.skip_asn_if_exists)
+
+
+class TestBarcodeApplyDetectedASN(TestCase):
+    """
+    GIVEN:
+        - An existing document with ASN 123
+    WHEN:
+        - A BarcodePlugin has detected that ASN
+    THEN:
+        - If skip_asn_if_exists is set and the ASN already exists, the ASN is not applied
+        - If skip_asn_if_exists is set and the ASN does not exist, the ASN is applied
+    """
+
+    def test_apply_detected_asn_skips_existing_when_flag_set(self) -> None:
+        doc = Document.objects.create(
+            checksum="X1",
+            title="D1",
+            archive_serial_number=123,
+        )
+        metadata = DocumentMetadataOverrides(skip_asn_if_exists=True)
+        plugin = BarcodePlugin(
+            input_doc=mock.Mock(),
+            metadata=metadata,
+            status_mgr=mock.Mock(),
+            base_tmp_dir=Path(tempfile.gettempdir()),
+            task_id="test-task",
+        )
+
+        plugin._apply_detected_asn(123)
+        self.assertIsNone(plugin.metadata.asn)
+
+        doc.hard_delete()
+
+        plugin._apply_detected_asn(123)
+        self.assertEqual(plugin.metadata.asn, 123)
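
A note on the shape of the `delete_originals` / `delete_original` branches patched above: all three call sites (merge, split, edit_pdf) follow the same release → enqueue → rollback pattern. The sketch below restates that control flow outside the patch; `ASNS`, `release`, `restore`, and `queue_replacement` are simplified, hypothetical stand-ins for the Django-backed helpers and Celery plumbing, kept only to make the error-handling split visible.

```python
# Illustrative sketch of the rollback pattern used in bulk_edit.py above.
# Everything here is an in-memory stand-in; only the control flow mirrors
# the patch: snapshot ASNs, clear them, queue the replacement with an error
# callback, and restore the snapshot if queuing itself raises.
from typing import Callable

ASNS: dict[int, int | None] = {1: 101, 2: 102}  # doc_id -> ASN


def release(doc_ids: list[int]) -> dict[int, int | None]:
    # Snapshot existing ASNs, then clear them so a replacement document
    # can take one of the (unique) values without a collision.
    backup = {d: ASNS[d] for d in doc_ids if ASNS.get(d) is not None}
    for d in backup:
        ASNS[d] = None
    return backup


def restore(backup: dict[int, int | None]) -> None:
    # Roll the snapshot back; used when the replacement never happens.
    for d, asn in backup.items():
        ASNS[d] = asn


def queue_replacement(doc_ids: list[int], enqueue: Callable) -> None:
    backup = release(doc_ids)
    try:
        # In the patch this is consume_task.apply_async(link=delete.si(...),
        # link_error=restore_archive_serial_numbers_task.s(backup)).
        enqueue(backup)
    except Exception:
        restore(backup)
        raise
```

The division of labor is the point: the `except` handles failures raised while building or sending the task message, while the `link_error` errback (represented here by whatever `enqueue` wires up) handles failures that happen later, inside the worker.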
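One Celery detail the patch relies on but does not spell out (my reading, so treat it as an assumption): a signature attached via `link_error=[restore_archive_serial_numbers_task.s(backup)]` is invoked with extra positional arguments appended by Celery — typically the failed task's id, or a request/exception/traceback triple depending on the errback's declared arity and eager mode. That appears to be why the task accepts `*args, **kwargs` after `backup`. A minimal errback with the same shape (`example_errback` is hypothetical):

```python
from celery import shared_task


@shared_task(bind=True)
def example_errback(self, backup: dict[int, int | None], *args, **kwargs) -> None:
    # `backup` is bound when the errback is attached with .s(backup);
    # Celery appends its own error context (e.g. the failed task's id),
    # which *args/**kwargs absorb so the call never fails on arity.
    print(f"rolling back {len(backup)} ASNs; celery context: {args}")
```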