mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-02-16 00:19:32 -06:00
Fixhancement: improve ASN handling with PDF operations (#11689)
This commit is contained in:
@@ -7,7 +7,6 @@ from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Literal
|
||||
|
||||
from celery import chain
|
||||
from celery import chord
|
||||
from celery import group
|
||||
from celery import shared_task
|
||||
@@ -38,6 +37,42 @@ if TYPE_CHECKING:
|
||||
logger: logging.Logger = logging.getLogger("paperless.bulk_edit")
|
||||
|
||||
|
||||
@shared_task(bind=True)
|
||||
def restore_archive_serial_numbers_task(
|
||||
self,
|
||||
backup: dict[int, int | None],
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
restore_archive_serial_numbers(backup)
|
||||
|
||||
|
||||
def release_archive_serial_numbers(doc_ids: list[int]) -> dict[int, int | None]:
|
||||
"""
|
||||
Clears ASNs on documents that are about to be replaced so new documents
|
||||
can be assigned ASNs without uniqueness collisions. Returns a backup map
|
||||
of doc_id -> previous ASN for potential restoration.
|
||||
"""
|
||||
qs = Document.objects.filter(
|
||||
id__in=doc_ids,
|
||||
archive_serial_number__isnull=False,
|
||||
).only("pk", "archive_serial_number")
|
||||
backup = dict(qs.values_list("pk", "archive_serial_number"))
|
||||
qs.update(archive_serial_number=None)
|
||||
logger.info(f"Released archive serial numbers for documents {list(backup.keys())}")
|
||||
return backup
|
||||
|
||||
|
||||
def restore_archive_serial_numbers(backup: dict[int, int | None]) -> None:
|
||||
"""
|
||||
Restores ASNs using the provided backup map, intended for
|
||||
rollback when replacement consumption fails.
|
||||
"""
|
||||
for doc_id, asn in backup.items():
|
||||
Document.objects.filter(pk=doc_id).update(archive_serial_number=asn)
|
||||
logger.info(f"Restored archive serial numbers for documents {list(backup.keys())}")
|
||||
|
||||
|
||||
def set_correspondent(
|
||||
doc_ids: list[int],
|
||||
correspondent: Correspondent,
|
||||
@@ -305,10 +340,10 @@ def reprocess(doc_ids: list[int]) -> Literal["OK"]:
|
||||
|
||||
def set_permissions(
|
||||
doc_ids: list[int],
|
||||
set_permissions,
|
||||
set_permissions: dict,
|
||||
*,
|
||||
owner=None,
|
||||
merge=False,
|
||||
owner: User | None = None,
|
||||
merge: bool = False,
|
||||
) -> Literal["OK"]:
|
||||
qs = Document.objects.filter(id__in=doc_ids).select_related("owner")
|
||||
|
||||
@@ -386,6 +421,7 @@ def merge(
|
||||
|
||||
merged_pdf = pikepdf.new()
|
||||
version: str = merged_pdf.pdf_version
|
||||
handoff_asn: int | None = None
|
||||
# use doc_ids to preserve order
|
||||
for doc_id in doc_ids:
|
||||
doc = qs.get(id=doc_id)
|
||||
@@ -401,6 +437,8 @@ def merge(
|
||||
version = max(version, pdf.pdf_version)
|
||||
merged_pdf.pages.extend(pdf.pages)
|
||||
affected_docs.append(doc.id)
|
||||
if handoff_asn is None and doc.archive_serial_number is not None:
|
||||
handoff_asn = doc.archive_serial_number
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
f"Error merging document {doc.id}, it will not be included in the merge: {e}",
|
||||
@@ -426,6 +464,8 @@ def merge(
|
||||
DocumentMetadataOverrides.from_document(metadata_document)
|
||||
)
|
||||
overrides.title = metadata_document.title + " (merged)"
|
||||
if metadata_document.archive_serial_number is not None:
|
||||
handoff_asn = metadata_document.archive_serial_number
|
||||
else:
|
||||
overrides = DocumentMetadataOverrides()
|
||||
else:
|
||||
@@ -433,8 +473,11 @@ def merge(
|
||||
|
||||
if user is not None:
|
||||
overrides.owner_id = user.id
|
||||
# Avoid copying or detecting ASN from merged PDFs to prevent collision
|
||||
overrides.skip_asn = True
|
||||
if not delete_originals:
|
||||
overrides.skip_asn_if_exists = True
|
||||
|
||||
if delete_originals and handoff_asn is not None:
|
||||
overrides.asn = handoff_asn
|
||||
|
||||
logger.info("Adding merged document to the task queue.")
|
||||
|
||||
@@ -447,12 +490,20 @@ def merge(
|
||||
)
|
||||
|
||||
if delete_originals:
|
||||
backup = release_archive_serial_numbers(affected_docs)
|
||||
logger.info(
|
||||
"Queueing removal of original documents after consumption of merged document",
|
||||
)
|
||||
chain(consume_task, delete.si(affected_docs)).delay()
|
||||
else:
|
||||
consume_task.delay()
|
||||
try:
|
||||
consume_task.apply_async(
|
||||
link=[delete.si(affected_docs)],
|
||||
link_error=[restore_archive_serial_numbers_task.s(backup)],
|
||||
)
|
||||
except Exception:
|
||||
restore_archive_serial_numbers(backup)
|
||||
raise
|
||||
else:
|
||||
consume_task.delay()
|
||||
|
||||
return "OK"
|
||||
|
||||
@@ -494,6 +545,8 @@ def split(
|
||||
overrides.title = f"{doc.title} (split {idx + 1})"
|
||||
if user is not None:
|
||||
overrides.owner_id = user.id
|
||||
if not delete_originals:
|
||||
overrides.skip_asn_if_exists = True
|
||||
logger.info(
|
||||
f"Adding split document with pages {split_doc} to the task queue.",
|
||||
)
|
||||
@@ -508,10 +561,20 @@ def split(
|
||||
)
|
||||
|
||||
if delete_originals:
|
||||
backup = release_archive_serial_numbers([doc.id])
|
||||
logger.info(
|
||||
"Queueing removal of original document after consumption of the split documents",
|
||||
)
|
||||
chord(header=consume_tasks, body=delete.si([doc.id])).delay()
|
||||
try:
|
||||
chord(
|
||||
header=consume_tasks,
|
||||
body=delete.si([doc.id]),
|
||||
).apply_async(
|
||||
link_error=[restore_archive_serial_numbers_task.s(backup)],
|
||||
)
|
||||
except Exception:
|
||||
restore_archive_serial_numbers(backup)
|
||||
raise
|
||||
else:
|
||||
group(consume_tasks).delay()
|
||||
|
||||
@@ -551,7 +614,7 @@ def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
|
||||
|
||||
def edit_pdf(
|
||||
doc_ids: list[int],
|
||||
operations: list[dict],
|
||||
operations: list[dict[str, int]],
|
||||
*,
|
||||
delete_original: bool = False,
|
||||
update_document: bool = False,
|
||||
@@ -614,7 +677,10 @@ def edit_pdf(
|
||||
)
|
||||
if user is not None:
|
||||
overrides.owner_id = user.id
|
||||
|
||||
if not delete_original:
|
||||
overrides.skip_asn_if_exists = True
|
||||
if delete_original and len(pdf_docs) == 1:
|
||||
overrides.asn = doc.archive_serial_number
|
||||
for idx, pdf in enumerate(pdf_docs, start=1):
|
||||
filepath: Path = (
|
||||
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
|
||||
@@ -633,7 +699,17 @@ def edit_pdf(
|
||||
)
|
||||
|
||||
if delete_original:
|
||||
chord(header=consume_tasks, body=delete.si([doc.id])).delay()
|
||||
backup = release_archive_serial_numbers([doc.id])
|
||||
try:
|
||||
chord(
|
||||
header=consume_tasks,
|
||||
body=delete.si([doc.id]),
|
||||
).apply_async(
|
||||
link_error=[restore_archive_serial_numbers_task.s(backup)],
|
||||
)
|
||||
except Exception:
|
||||
restore_archive_serial_numbers(backup)
|
||||
raise
|
||||
else:
|
||||
group(consume_tasks).delay()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user