Just save this

[ci skip]
This commit is contained in:
shamoon
2025-07-01 11:35:08 -07:00
parent f0b6e79d14
commit 0716758db7
11 changed files with 404 additions and 12 deletions

View File

@@ -497,6 +497,77 @@ def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
return "OK"
def edit_pdf(
    doc_ids: list[int],
    operations: list[dict],
    *,
    delete_original: bool = False,
    user: User | None = None,
) -> Literal["OK"]:
    """
    Build one or more new PDFs from the pages of an existing document and
    queue them for consumption.

    Operations is a list of dictionaries describing the final PDF pages.
    Each entry must contain the original page number in `page` (1-based) and
    may specify `rotate` in degrees and `doc` indicating the output
    document index (for splitting, 0-based). Pages omitted from the list are
    discarded.

    Only the first id in `doc_ids` is edited. If `user` is given, it is set
    as the owner in the metadata overrides passed to the consume tasks. If
    `delete_original` is true, the source document is deleted by a chord
    body that runs after the consume tasks.

    Errors raised while building or queueing the new PDFs are logged and
    swallowed; the function returns "OK" in every case.
    """
    if not operations:
        # Without this guard max() below raises ValueError on the empty
        # sequence and the failure is reported as a generic editing error.
        logger.warning(
            f"No operations given for editing PDF of document {doc_ids[0]}, nothing to do",
        )
        return "OK"

    logger.info(
        f"Editing PDF of document {doc_ids[0]} with {len(operations)} operations",
    )
    doc = Document.objects.get(id=doc_ids[0])

    import pikepdf

    pdf_docs: list[pikepdf.Pdf] = []

    try:
        with pikepdf.open(doc.source_path) as src:
            # prepare output documents: one per distinct output index, up to
            # the highest index referenced by any operation
            max_idx = max(op.get("doc", 0) for op in operations)
            pdf_docs = [pikepdf.new() for _ in range(max_idx + 1)]

            for op in operations:
                dst = pdf_docs[op.get("doc", 0)]
                # `page` is 1-based, pikepdf page lists are 0-based
                page = src.pages[op["page"] - 1]
                dst.pages.append(page)
                if op.get("rotate"):
                    dst.pages[-1].rotate(op["rotate"], relative=True)

            consume_tasks = []
            overrides: DocumentMetadataOverrides = (
                DocumentMetadataOverrides().from_document(doc)
            )
            if user is not None:
                overrides.owner_id = user.id

            # Save while the source is still open: the output documents hold
            # pages that were taken from it.
            for idx, pdf in enumerate(pdf_docs, start=1):
                filepath: Path = (
                    Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
                    / f"{doc.id}_edit_{idx}.pdf"
                )
                pdf.remove_unreferenced_resources()
                pdf.save(filepath)
                consume_tasks.append(
                    consume_file.s(
                        ConsumableDocument(
                            source=DocumentSource.ConsumeFolder,
                            original_file=filepath,
                        ),
                        overrides,
                    ),
                )

            if delete_original:
                # the delete runs as the chord body, i.e. after the consume
                # tasks in the header
                chord(header=consume_tasks, body=delete.si([doc.id])).delay()
            else:
                group(consume_tasks).delay()

    except Exception as e:
        logger.exception(f"Error editing document {doc.id}: {e}")
    finally:
        # release the in-memory output documents whether or not saving worked
        for pdf in pdf_docs:
            pdf.close()

    return "OK"
def reflect_doclinks(
document: Document,
field: CustomField,

View File

@@ -1366,6 +1366,8 @@ class BulkEditSerializer(
return bulk_edit.split
elif method == "delete_pages":
return bulk_edit.delete_pages
elif method == "edit_pdf":
return bulk_edit.edit_pdf
else:
raise serializers.ValidationError("Unsupported method.")
@@ -1520,6 +1522,26 @@ class BulkEditSerializer(
else:
parameters["archive_fallback"] = False
def _validate_parameters_edit_pdf(self, parameters):
if "operations" not in parameters:
raise serializers.ValidationError("operations not specified")
if not isinstance(parameters["operations"], list):
raise serializers.ValidationError("operations must be a list")
for op in parameters["operations"]:
if not isinstance(op, dict):
raise serializers.ValidationError("invalid operation entry")
if "page" not in op or not isinstance(op["page"], int):
raise serializers.ValidationError("page must be an integer")
if "rotate" in op and not isinstance(op["rotate"], int):
raise serializers.ValidationError("rotate must be an integer")
if "doc" in op and not isinstance(op["doc"], int):
raise serializers.ValidationError("doc must be an integer")
if "delete_original" in parameters:
if not isinstance(parameters["delete_original"], bool):
raise serializers.ValidationError("delete_original must be a boolean")
else:
parameters["delete_original"] = False
def validate(self, attrs):
method = attrs["method"]
parameters = attrs["parameters"]
@@ -1554,6 +1576,12 @@ class BulkEditSerializer(
self._validate_parameters_delete_pages(parameters)
elif method == bulk_edit.merge:
self._validate_parameters_merge(parameters)
elif method == bulk_edit.edit_pdf:
if len(attrs["documents"]) > 1:
raise serializers.ValidationError(
"Edit PDF method only supports one document",
)
self._validate_parameters_edit_pdf(parameters)
return attrs

View File

@@ -1369,6 +1369,60 @@ class TestBulkEditAPI(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn(b"pages must be a list of integers", response.content)
@mock.patch("documents.serialisers.bulk_edit.edit_pdf")
def test_edit_pdf(self, m):
    """
    Posting a valid edit_pdf request for one document returns 200 and calls
    bulk_edit.edit_pdf once with the document id, the operations and the
    requesting user.
    """
    self.setup_mock(m, "edit_pdf")

    payload = {
        "documents": [self.doc2.id],
        "method": "edit_pdf",
        "parameters": {"operations": [{"page": 1}]},
    }
    response = self.client.post(
        "/api/documents/bulk_edit/",
        json.dumps(payload),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, status.HTTP_200_OK)
    m.assert_called_once()

    positional, keyword = m.call_args
    self.assertCountEqual(positional[0], [self.doc2.id])
    self.assertEqual(keyword["operations"], [{"page": 1}])
    self.assertEqual(keyword["user"], self.user)
def test_edit_pdf_invalid_params(self):
    """
    edit_pdf requests are rejected with 400 when more than one document is
    given, and when the operations parameter is missing.
    """
    cases = [
        # (documents, parameters, expected error text)
        (
            [self.doc2.id, self.doc3.id],
            {"operations": [{"page": 1}]},
            b"Edit PDF method only supports one document",
        ),
        (
            [self.doc2.id],
            {},
            b"operations not specified",
        ),
    ]

    for documents, parameters, expected_error in cases:
        body = json.dumps(
            {
                "documents": documents,
                "method": "edit_pdf",
                "parameters": parameters,
            },
        )
        response = self.client.post(
            "/api/documents/bulk_edit/",
            body,
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(expected_error, response.content)
@override_settings(AUDIT_LOG_ENABLED=True)
def test_bulk_edit_audit_log_enabled_simple_field(self):
"""

View File

@@ -1321,6 +1321,7 @@ class BulkEditView(PassUserMixin):
"delete_pages": "checksum",
"split": None,
"merge": None,
"edit_pdf": None,
"reprocess": "checksum",
}
@@ -1339,6 +1340,7 @@ class BulkEditView(PassUserMixin):
if method in [
bulk_edit.split,
bulk_edit.merge,
bulk_edit.edit_pdf,
]:
parameters["user"] = user
@@ -1358,24 +1360,29 @@ class BulkEditView(PassUserMixin):
# check ownership for methods that change original document
if (
has_perms
and method
in [
bulk_edit.set_permissions,
bulk_edit.delete,
bulk_edit.rotate,
bulk_edit.delete_pages,
]
) or (
method in [bulk_edit.merge, bulk_edit.split]
and parameters["delete_originals"]
(
has_perms
and method
in [
bulk_edit.set_permissions,
bulk_edit.delete,
bulk_edit.rotate,
bulk_edit.delete_pages,
bulk_edit.edit_pdf,
]
)
or (
method in [bulk_edit.merge, bulk_edit.split]
and parameters["delete_originals"]
)
or (method == bulk_edit.edit_pdf and parameters["delete_original"])
):
has_perms = user_is_owner_of_all_documents
# check global add permissions for methods that create documents
if (
has_perms
and method in [bulk_edit.split, bulk_edit.merge]
and method in [bulk_edit.split, bulk_edit.merge, bulk_edit.edit_pdf]
and not user.has_perm(
"documents.add_document",
)
@@ -1391,6 +1398,7 @@ class BulkEditView(PassUserMixin):
method in [bulk_edit.merge, bulk_edit.split]
and parameters["delete_originals"]
)
or (method == bulk_edit.edit_pdf and parameters["delete_original"])
)
and not user.has_perm("documents.delete_document")
):