Bulk edit and actions should update version

This commit is contained in:
shamoon
2026-02-10 16:42:38 -08:00
parent 8014932419
commit daa4586eeb
2 changed files with 147 additions and 54 deletions

View File

@@ -1,6 +1,5 @@
from __future__ import annotations
import hashlib
import logging
import tempfile
from pathlib import Path
@@ -73,6 +72,46 @@ def restore_archive_serial_numbers(backup: dict[int, int | None]) -> None:
logger.info(f"Restored archive serial numbers for documents {list(backup.keys())}")
def _get_root_ids_by_doc_id(doc_ids: list[int]) -> dict[int, int]:
"""
Resolve each provided document id to its root document id.
- If the id is already a root document: root id is itself.
- If the id is a version document: root id is its `root_document_id`.
"""
qs = Document.objects.filter(id__in=doc_ids).only("id", "root_document_id")
return {doc.id: doc.root_document_id or doc.id for doc in qs}
def _get_root_and_current_docs_by_root_id(
root_ids: set[int],
) -> tuple[dict[int, Document], dict[int, Document]]:
"""
Returns:
- root_docs: root_id -> root Document
- current_docs: root_id -> newest version Document (or root if none)
"""
root_docs = {
doc.id: doc
for doc in Document.objects.filter(id__in=root_ids).select_related(
"owner",
)
}
latest_versions_by_root_id: dict[int, Document] = {}
for version_doc in Document.objects.filter(root_document_id__in=root_ids).order_by(
"root_document_id",
"-id",
):
latest_versions_by_root_id.setdefault(version_doc.root_document_id, version_doc)
current_docs = {
root_id: latest_versions_by_root_id.get(root_id, root_docs.get(root_id))
for root_id in root_ids
if root_id in root_docs
}
return root_docs, current_docs
def set_correspondent(
doc_ids: list[int],
correspondent: Correspondent,
@@ -379,43 +418,49 @@ def rotate(doc_ids: list[int], degrees: int) -> Literal["OK"]:
logger.info(
f"Attempting to rotate {len(doc_ids)} documents by {degrees} degrees.",
)
qs = Document.objects.filter(id__in=doc_ids)
doc_to_root_id = _get_root_ids_by_doc_id(doc_ids)
root_ids = set(doc_to_root_id.values())
root_docs_by_id, current_docs_by_root_id = _get_root_and_current_docs_by_root_id(
root_ids,
)
import pikepdf
for doc in qs:
if doc.mime_type != "application/pdf":
for root_id in root_ids:
root_doc = root_docs_by_id[root_id]
source_doc = current_docs_by_root_id[root_id]
if source_doc.mime_type != "application/pdf":
logger.warning(
f"Document {doc.id} is not a PDF, skipping rotation.",
f"Document {root_doc.id} is not a PDF, skipping rotation.",
)
continue
try:
# Write rotated output to a temp file and create a new version via consume pipeline
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{doc.id}_rotated.pdf"
/ f"{root_doc.id}_rotated.pdf"
)
with pikepdf.open(doc.source_path) as pdf:
with pikepdf.open(source_doc.source_path) as pdf:
for page in pdf.pages:
page.rotate(degrees, relative=True)
pdf.remove_unreferenced_resources()
pdf.save(filepath)
# Preserve metadata/permissions via overrides; mark as new version
overrides = DocumentMetadataOverrides().from_document(doc)
overrides = DocumentMetadataOverrides().from_document(root_doc)
consume_file.delay(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=doc.id,
root_document_id=root_doc.id,
),
overrides,
)
logger.info(
f"Queued new rotated version for document {doc.id} by {degrees} degrees",
f"Queued new rotated version for document {root_doc.id} by {degrees} degrees",
)
except Exception as e:
logger.exception(f"Error rotating document {doc.id}: {e}")
logger.exception(f"Error rotating document {root_doc.id}: {e}")
return "OK"
@@ -604,7 +649,14 @@ def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
logger.info(
f"Attempting to delete pages {pages} from {len(doc_ids)} documents",
)
doc = Document.objects.get(id=doc_ids[0])
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
root_doc = doc if doc.root_document_id is None else doc.root_document
source_doc = (
Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc))
.order_by("-id")
.first()
or root_doc
)
pages = sorted(pages) # sort pages to avoid index issues
import pikepdf
@@ -612,9 +664,9 @@ def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
# Produce edited PDF to a temp file and create a new version
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{doc.id}_pages_deleted.pdf"
/ f"{root_doc.id}_pages_deleted.pdf"
)
with pikepdf.open(doc.source_path) as pdf:
with pikepdf.open(source_doc.source_path) as pdf:
offset = 1 # pages are 1-indexed
for page_num in pages:
pdf.pages.remove(pdf.pages[page_num - offset])
@@ -622,20 +674,20 @@ def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
pdf.remove_unreferenced_resources()
pdf.save(filepath)
overrides = DocumentMetadataOverrides().from_document(doc)
overrides = DocumentMetadataOverrides().from_document(root_doc)
consume_file.delay(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=doc.id,
root_document_id=root_doc.id,
),
overrides,
)
logger.info(
f"Queued new version for document {doc.id} after deleting pages {pages}",
f"Queued new version for document {root_doc.id} after deleting pages {pages}",
)
except Exception as e:
logger.exception(f"Error deleting pages from document {doc.id}: {e}")
logger.exception(f"Error deleting pages from document {root_doc.id}: {e}")
return "OK"
@@ -660,13 +712,20 @@ def edit_pdf(
logger.info(
f"Editing PDF of document {doc_ids[0]} with {len(operations)} operations",
)
doc = Document.objects.get(id=doc_ids[0])
doc = Document.objects.select_related("root_document").get(id=doc_ids[0])
root_doc = doc if doc.root_document_id is None else doc.root_document
source_doc = (
Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc))
.order_by("-id")
.first()
or root_doc
)
import pikepdf
pdf_docs: list[pikepdf.Pdf] = []
try:
with pikepdf.open(doc.source_path) as src:
with pikepdf.open(source_doc.source_path) as src:
# prepare output documents
max_idx = max(op.get("doc", 0) for op in operations)
pdf_docs = [pikepdf.new() for _ in range(max_idx + 1)]
@@ -690,11 +749,11 @@ def edit_pdf(
pdf.remove_unreferenced_resources()
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{doc.id}_edited.pdf"
/ f"{root_doc.id}_edited.pdf"
)
pdf.save(filepath)
overrides = (
DocumentMetadataOverrides().from_document(doc)
DocumentMetadataOverrides().from_document(root_doc)
if include_metadata
else DocumentMetadataOverrides()
)
@@ -704,14 +763,14 @@ def edit_pdf(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=doc.id,
root_document_id=root_doc.id,
),
overrides,
)
else:
consume_tasks = []
overrides = (
DocumentMetadataOverrides().from_document(doc)
DocumentMetadataOverrides().from_document(root_doc)
if include_metadata
else DocumentMetadataOverrides()
)
@@ -720,11 +779,11 @@ def edit_pdf(
if not delete_original:
overrides.skip_asn_if_exists = True
if delete_original and len(pdf_docs) == 1:
overrides.asn = doc.archive_serial_number
overrides.asn = root_doc.archive_serial_number
for idx, pdf in enumerate(pdf_docs, start=1):
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{doc.id}_edit_{idx}.pdf"
/ f"{root_doc.id}_edit_{idx}.pdf"
)
pdf.remove_unreferenced_resources()
pdf.save(filepath)
@@ -754,7 +813,7 @@ def edit_pdf(
group(consume_tasks).delay()
except Exception as e:
logger.exception(f"Error editing document {doc.id}: {e}")
logger.exception(f"Error editing document {root_doc.id}: {e}")
raise ValueError(
f"An error occurred while editing the document: {e}",
) from e
@@ -777,38 +836,53 @@ def remove_password(
import pikepdf
for doc_id in doc_ids:
doc = Document.objects.get(id=doc_id)
doc = Document.objects.select_related("root_document").get(id=doc_id)
root_doc = doc if doc.root_document_id is None else doc.root_document
source_doc = (
Document.objects.filter(Q(id=root_doc.id) | Q(root_document=root_doc))
.order_by("-id")
.first()
or root_doc
)
try:
logger.info(
f"Attempting password removal from document {doc_ids[0]}",
)
with pikepdf.open(doc.source_path, password=password) as pdf:
temp_path = doc.source_path.with_suffix(".tmp.pdf")
with pikepdf.open(source_doc.source_path, password=password) as pdf:
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{root_doc.id}_unprotected.pdf"
)
pdf.remove_unreferenced_resources()
pdf.save(temp_path)
pdf.save(filepath)
if update_document:
# replace the original document with the unprotected one
temp_path.replace(doc.source_path)
doc.checksum = hashlib.md5(doc.source_path.read_bytes()).hexdigest()
doc.page_count = len(pdf.pages)
doc.save()
update_document_content_maybe_archive_file.delay(document_id=doc.id)
# Create a new version rather than modifying the root/original in place.
overrides = (
DocumentMetadataOverrides().from_document(root_doc)
if include_metadata
else DocumentMetadataOverrides()
)
if user is not None:
overrides.owner_id = user.id
consume_file.delay(
ConsumableDocument(
source=DocumentSource.ConsumeFolder,
original_file=filepath,
root_document_id=root_doc.id,
),
overrides,
)
else:
consume_tasks = []
overrides = (
DocumentMetadataOverrides().from_document(doc)
DocumentMetadataOverrides().from_document(root_doc)
if include_metadata
else DocumentMetadataOverrides()
)
if user is not None:
overrides.owner_id = user.id
filepath: Path = (
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
/ f"{doc.id}_unprotected.pdf"
)
temp_path.replace(filepath)
consume_tasks.append(
consume_file.s(
ConsumableDocument(
@@ -820,12 +894,17 @@ def remove_password(
)
if delete_original:
chord(header=consume_tasks, body=delete.si([doc.id])).delay()
chord(
header=consume_tasks,
body=delete.si([doc.id]),
).delay()
else:
group(consume_tasks).delay()
except Exception as e:
logger.exception(f"Error removing password from document {doc.id}: {e}")
logger.exception(
f"Error removing password from document {root_doc.id}: {e}",
)
raise ValueError(
f"An error occurred while removing the password: {e}",
) from e

View File

@@ -1,4 +1,3 @@
import hashlib
import shutil
from datetime import date
from pathlib import Path
@@ -1241,10 +1240,20 @@ class TestPDFActions(DirectoriesMixin, TestCase):
mock_consume_file.assert_not_called()
@mock.patch("documents.bulk_edit.update_document_content_maybe_archive_file.delay")
@mock.patch("documents.tasks.consume_file.delay")
@mock.patch("documents.bulk_edit.tempfile.mkdtemp")
@mock.patch("pikepdf.open")
def test_remove_password_update_document(self, mock_open, mock_update_document):
def test_remove_password_update_document(
self,
mock_open,
mock_mkdtemp,
mock_consume_delay,
mock_update_document,
):
doc = self.doc1
original_checksum = doc.checksum
temp_dir = self.dirs.scratch_dir / "remove-password-update"
temp_dir.mkdir(parents=True, exist_ok=True)
mock_mkdtemp.return_value = str(temp_dir)
fake_pdf = mock.MagicMock()
fake_pdf.pages = [mock.Mock(), mock.Mock(), mock.Mock()]
@@ -1264,12 +1273,17 @@ class TestPDFActions(DirectoriesMixin, TestCase):
self.assertEqual(result, "OK")
mock_open.assert_called_once_with(doc.source_path, password="secret")
fake_pdf.remove_unreferenced_resources.assert_called_once()
doc.refresh_from_db()
self.assertNotEqual(doc.checksum, original_checksum)
expected_checksum = hashlib.md5(doc.source_path.read_bytes()).hexdigest()
self.assertEqual(doc.checksum, expected_checksum)
self.assertEqual(doc.page_count, len(fake_pdf.pages))
mock_update_document.assert_called_once_with(document_id=doc.id)
mock_update_document.assert_not_called()
mock_consume_delay.assert_called_once()
consumable, overrides = mock_consume_delay.call_args[0]
expected_path = temp_dir / f"{doc.id}_unprotected.pdf"
self.assertTrue(expected_path.exists())
self.assertEqual(
Path(consumable.original_file).resolve(),
expected_path.resolve(),
)
self.assertEqual(consumable.root_document_id, doc.id)
self.assertIsNotNone(overrides)
@mock.patch("documents.bulk_edit.chord")
@mock.patch("documents.bulk_edit.group")