First, release ASNs before document replacement (and restore if needed)

shamoon
2025-12-31 09:49:20 -08:00
parent cf89d81b9e
commit 8e12f3e93c
2 changed files with 98 additions and 13 deletions


@@ -7,7 +7,6 @@ from pathlib import Path
from typing import TYPE_CHECKING
from typing import Literal
from celery import chain
from celery import chord
from celery import group
from celery import shared_task
@@ -38,6 +37,42 @@ if TYPE_CHECKING:
logger: logging.Logger = logging.getLogger("paperless.bulk_edit")


@shared_task(bind=True)
def restore_archive_serial_numbers_task(
    self,
    backup: dict[int, int],
    *args,
    **kwargs,
) -> None:
    restore_archive_serial_numbers(backup)


def release_archive_serial_numbers(doc_ids: list[int]) -> dict[int, int]:
    """
    Clears ASNs on documents that are about to be replaced so new documents
    can be assigned ASNs without uniqueness collisions. Returns a backup map
    of doc_id -> previous ASN for potential restoration.
    """
    qs = Document.objects.filter(
        id__in=doc_ids,
        archive_serial_number__isnull=False,
    ).only("pk", "archive_serial_number")
    backup = dict(qs.values_list("pk", "archive_serial_number"))
    qs.update(archive_serial_number=None)
    logger.info(f"Released archive serial numbers for documents {list(backup.keys())}")
    return backup


def restore_archive_serial_numbers(backup: dict[int, int]) -> None:
    """
    Restores ASNs using the provided backup map, intended for
    rollback when replacement consumption fails.
    """
    for doc_id, asn in backup.items():
        Document.objects.filter(pk=doc_id).update(archive_serial_number=asn)
    logger.info(f"Restored archive serial numbers for documents {list(backup.keys())}")


def set_correspondent(
    doc_ids: list[int],
    correspondent: Correspondent,
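For illustration, a round trip through the two new helpers in a Django shell might look like the following (document ids and ASN values are made up; only documents that currently hold an ASN end up in the backup map, because of the archive_serial_number__isnull=False filter):

# Hypothetical session, not part of the change itself.
backup = release_archive_serial_numbers([10, 11, 12])
# e.g. {10: 1001, 11: 1002} -- document 12 had no ASN, so it is absent
assert not Document.objects.filter(
    id__in=backup.keys(),
    archive_serial_number__isnull=False,
).exists()

# ... consumption of the replacement document fails ...
restore_archive_serial_numbers(backup)  # originals get their ASNs back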
@@ -433,8 +468,6 @@ def merge(
    if user is not None:
        overrides.owner_id = user.id

    # Avoid copying or detecting ASN from merged PDFs to prevent collision
    overrides.skip_asn = True

    logger.info("Adding merged document to the task queue.")
@@ -447,10 +480,18 @@ def merge(
    )

    if delete_originals:
        backup = release_archive_serial_numbers(affected_docs)
        logger.info(
            "Queueing removal of original documents after consumption of merged document",
        )
        chain(consume_task, delete.si(affected_docs)).delay()
        try:
            consume_task.apply_async(
                link=[delete.si(affected_docs)],
                link_error=[restore_archive_serial_numbers_task.s(backup)],
            )
        except Exception:
            restore_archive_serial_numbers(backup)
            raise
    else:
        consume_task.delay()
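The merge path above swaps the old chain(consume_task, delete.si(...)) for an explicit apply_async: link runs the delete only after the merged document is consumed successfully, link_error queues the ASN restore if consumption fails, and the surrounding try/except restores synchronously if queueing itself raises. A minimal, generic sketch of that wiring with toy tasks (the app, broker, and task names here are stand-ins, not paperless code):

from celery import Celery

app = Celery("sketch", broker="memory://")


@app.task
def consume(path):
    print(f"consuming {path}")


@app.task
def cleanup(doc_ids):
    print(f"deleting originals {doc_ids}")


@app.task(bind=True)
def rollback(self, backup, *args, **kwargs):
    # Error callbacks receive extra positional arguments from Celery (such as
    # the failing task's id), so the signature has to absorb them -- the same
    # reason restore_archive_serial_numbers_task takes *args/**kwargs.
    print(f"restoring {backup}")


backup = {10: 1001, 11: 1002}
consume.s("/tmp/merged.pdf").apply_async(
    link=[cleanup.si([10, 11])],      # runs only on success
    link_error=[rollback.s(backup)],  # runs only on failure
)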
@@ -508,10 +549,20 @@ def split(
        )

    if delete_originals:
        backup = release_archive_serial_numbers([doc.id])
        logger.info(
            "Queueing removal of original document after consumption of the split documents",
        )
        chord(header=consume_tasks, body=delete.si([doc.id])).delay()
        try:
            chord(
                header=consume_tasks,
                body=delete.si([doc.id]),
            ).apply_async(
                link_error=[restore_archive_serial_numbers_task.s(backup)],
            )
        except Exception:
            restore_archive_serial_numbers(backup)
            raise
    else:
        group(consume_tasks).delay()
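split and edit_pdf use the same rollback hook, but with a chord so the original is only deleted after every new document has been consumed. Reusing the toy tasks from the sketch above, the shape is roughly:

from celery import chord

pages = [f"/tmp/page_{i}.pdf" for i in range(3)]
chord(
    header=[consume.s(p) for p in pages],  # consume each new document
    body=cleanup.si([10]),                 # then delete the original
).apply_async(
    link_error=[rollback.s({10: 1001})],   # restore ASNs on failure, as in the diff
)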
@@ -614,7 +665,6 @@ def edit_pdf(
    )
    if user is not None:
        overrides.owner_id = user.id

    for idx, pdf in enumerate(pdf_docs, start=1):
        filepath: Path = (
            Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
@@ -633,7 +683,17 @@ def edit_pdf(
        )

    if delete_original:
        chord(header=consume_tasks, body=delete.si([doc.id])).delay()
        backup = release_archive_serial_numbers([doc.id])
        try:
            chord(
                header=consume_tasks,
                body=delete.si([doc.id]),
            ).apply_async(
                link_error=[restore_archive_serial_numbers_task.s(backup)],
            )
        except Exception:
            restore_archive_serial_numbers(backup)
            raise
    else:
        group(consume_tasks).delay()