Merge branch 'dev' into feature-ai

shamoon authored 2025-08-11 10:50:15 -07:00, committed by GitHub.
29 changed files with 1549 additions and 912 deletions


@@ -497,6 +497,103 @@ def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
    return "OK"


def edit_pdf(
    doc_ids: list[int],
    operations: list[dict],
    *,
    delete_original: bool = False,
    update_document: bool = False,
    include_metadata: bool = True,
    user: User | None = None,
) -> Literal["OK"]:
    """
    Operations is a list of dictionaries describing the final PDF pages.
    Each entry must contain the original page number in `page` and may
    specify `rotate` in degrees and `doc` indicating the output
    document index (for splitting). Pages omitted from the list are
    discarded.
    """
    logger.info(
        f"Editing PDF of document {doc_ids[0]} with {len(operations)} operations",
    )
    doc = Document.objects.get(id=doc_ids[0])
    import pikepdf

    pdf_docs: list[pikepdf.Pdf] = []

    try:
        with pikepdf.open(doc.source_path) as src:
            # prepare output documents
            max_idx = max(op.get("doc", 0) for op in operations)
            pdf_docs = [pikepdf.new() for _ in range(max_idx + 1)]
            if update_document and len(pdf_docs) > 1:
                logger.error(
                    "Update requested but multiple output documents specified",
                )
                raise ValueError("Multiple output documents specified")

            for op in operations:
                dst = pdf_docs[op.get("doc", 0)]
                page = src.pages[op["page"] - 1]
                dst.pages.append(page)
                if op.get("rotate"):
                    dst.pages[-1].rotate(op["rotate"], relative=True)

            if update_document:
                temp_path = doc.source_path.with_suffix(".tmp.pdf")
                pdf = pdf_docs[0]
                pdf.remove_unreferenced_resources()
                # save the edited PDF to a temporary file in case of errors
                pdf.save(temp_path)
                # replace the original document with the edited one
                temp_path.replace(doc.source_path)
                doc.checksum = hashlib.md5(doc.source_path.read_bytes()).hexdigest()
                doc.page_count = len(pdf.pages)
                doc.save()
                update_document_content_maybe_archive_file.delay(document_id=doc.id)
            else:
                consume_tasks = []
                overrides = (
                    DocumentMetadataOverrides().from_document(doc)
                    if include_metadata
                    else DocumentMetadataOverrides()
                )
                if user is not None:
                    overrides.owner_id = user.id

                for idx, pdf in enumerate(pdf_docs, start=1):
                    filepath: Path = (
                        Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
                        / f"{doc.id}_edit_{idx}.pdf"
                    )
                    pdf.remove_unreferenced_resources()
                    pdf.save(filepath)
                    consume_tasks.append(
                        consume_file.s(
                            ConsumableDocument(
                                source=DocumentSource.ConsumeFolder,
                                original_file=filepath,
                            ),
                            overrides,
                        ),
                    )

                if delete_original:
                    chord(header=consume_tasks, body=delete.si([doc.id])).delay()
                else:
                    group(consume_tasks).delay()
    except Exception as e:
        logger.exception(f"Error editing document {doc.id}: {e}")
        raise ValueError(
            f"An error occurred while editing the document: {e}",
        ) from e

    return "OK"


def reflect_doclinks(
    document: Document,
    field: CustomField,

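For illustration, a minimal sketch of how the new `operations` format drives edit_pdf, assuming a hypothetical four-page document with ID 123: pages 1 and 2 stay in the first output, page 3 is rotated and split into a second output, and the omitted page 4 is discarded.

# Illustrative sketch only -- the document ID and page numbers are invented.
from documents import bulk_edit

operations = [
    {"page": 1, "doc": 0},                 # first output document, unchanged
    {"page": 2, "doc": 0, "rotate": 180},  # same output, rotated 180 degrees
    {"page": 3, "doc": 1},                 # second output document (split)
    # page 4 is not listed, so it is discarded
]

# Queues consume_file tasks for the two new PDFs; the original document is kept
# because delete_original defaults to False.
bulk_edit.edit_pdf([123], operations)
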

@@ -1293,6 +1293,7 @@ class BulkEditSerializer(
            "merge",
            "split",
            "delete_pages",
            "edit_pdf",
        ],
        label="Method",
        write_only=True,
@@ -1366,7 +1367,10 @@ class BulkEditSerializer(
            return bulk_edit.split
        elif method == "delete_pages":
            return bulk_edit.delete_pages
-        else:
+        elif method == "edit_pdf":
+            return bulk_edit.edit_pdf
+        else:  # pragma: no cover
            # This will never happen as it is handled by the ChoiceField
            raise serializers.ValidationError("Unsupported method.")

    def _validate_parameters_tags(self, parameters):
@@ -1520,6 +1524,47 @@ class BulkEditSerializer(
        else:
            parameters["archive_fallback"] = False

    def _validate_parameters_edit_pdf(self, parameters, document_id):
        if "operations" not in parameters:
            raise serializers.ValidationError("operations not specified")
        if not isinstance(parameters["operations"], list):
            raise serializers.ValidationError("operations must be a list")
        for op in parameters["operations"]:
            if not isinstance(op, dict):
                raise serializers.ValidationError("invalid operation entry")
            if "page" not in op or not isinstance(op["page"], int):
                raise serializers.ValidationError("page must be an integer")
            if "rotate" in op and not isinstance(op["rotate"], int):
                raise serializers.ValidationError("rotate must be an integer")
            if "doc" in op and not isinstance(op["doc"], int):
                raise serializers.ValidationError("doc must be an integer")

        if "update_document" in parameters:
            if not isinstance(parameters["update_document"], bool):
                raise serializers.ValidationError("update_document must be a boolean")
        else:
            parameters["update_document"] = False

        if "include_metadata" in parameters:
            if not isinstance(parameters["include_metadata"], bool):
                raise serializers.ValidationError("include_metadata must be a boolean")
        else:
            parameters["include_metadata"] = True

        if parameters["update_document"]:
            max_idx = max(op.get("doc", 0) for op in parameters["operations"])
            if max_idx > 0:
                raise serializers.ValidationError(
                    "update_document only allowed with a single output document",
                )

        doc = Document.objects.get(id=document_id)
        # doc existence is already validated
        if doc.page_count:
            for op in parameters["operations"]:
                if op["page"] < 1 or op["page"] > doc.page_count:
                    raise serializers.ValidationError(
                        f"Page {op['page']} is out of bounds for document with {doc.page_count} pages.",
                    )

    def validate(self, attrs):
        method = attrs["method"]
        parameters = attrs["parameters"]
@@ -1554,6 +1599,12 @@ class BulkEditSerializer(
            self._validate_parameters_delete_pages(parameters)
        elif method == bulk_edit.merge:
            self._validate_parameters_merge(parameters)
        elif method == bulk_edit.edit_pdf:
            if len(attrs["documents"]) > 1:
                raise serializers.ValidationError(
                    "Edit PDF method only supports one document",
                )
            self._validate_parameters_edit_pdf(parameters, attrs["documents"][0])

        return attrs

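For comparison, a hedged example of a request body that would pass the validation above; the document ID and field values are invented for illustration.

# Hypothetical payload for POST /api/documents/bulk_edit/ (IDs are made up).
payload = {
    "documents": [42],  # edit_pdf accepts exactly one document
    "method": "edit_pdf",
    "parameters": {
        "operations": [{"page": 1}, {"page": 2, "rotate": 90}],
        "update_document": True,   # only valid while all pages target one output
        "include_metadata": True,
    },
}
# A non-list "operations", a non-integer "page", a page outside the document's
# page_count, or update_document=True combined with several "doc" indices is
# rejected with a ValidationError before any task runs.
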

@@ -1,9 +1,12 @@
from __future__ import annotations

import ipaddress
import logging
import shutil
import socket
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlparse

import httpx
from celery import shared_task
@@ -671,6 +674,28 @@ def run_workflows_updated(sender, document: Document, logging_group=None, **kwar
)
def _is_public_ip(ip: str) -> bool:
    try:
        obj = ipaddress.ip_address(ip)
        return not (
            obj.is_private
            or obj.is_loopback
            or obj.is_link_local
            or obj.is_multicast
            or obj.is_unspecified
        )
    except ValueError:  # pragma: no cover
        return False


def _resolve_first_ip(host: str) -> str | None:
    try:
        info = socket.getaddrinfo(host, None)
        return info[0][4][0] if info else None
    except Exception:  # pragma: no cover
        return None
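
A rough sketch of how these helpers are intended to behave; the sample addresses are arbitrary and not taken from the diff.

# Sketch only, assuming the helpers above are in scope.
assert _is_public_ip("52.207.186.75") is True   # public address passes
assert _is_public_ip("10.0.0.8") is False       # private range is rejected
assert _is_public_ip("127.0.0.1") is False      # loopback is rejected
assert _is_public_ip("169.254.10.1") is False   # link-local is rejected

# _resolve_first_ip returns the first resolved address, or None on failure,
# so callers can treat "could not resolve" the same as "not allowed".
first_ip = _resolve_first_ip("example.com")
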
@shared_task(
    retry_backoff=True,
    autoretry_for=(httpx.HTTPStatusError,),
@@ -685,11 +710,35 @@ def send_webhook(
    *,
    as_json: bool = False,
):
    p = urlparse(url)
    if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname:
        logger.warning("Webhook blocked: invalid scheme/hostname")
        raise ValueError("Invalid URL scheme or hostname.")

    port = p.port or (443 if p.scheme == "https" else 80)
    if (
        len(settings.WEBHOOKS_ALLOWED_PORTS) > 0
        and port not in settings.WEBHOOKS_ALLOWED_PORTS
    ):
        logger.warning("Webhook blocked: port not permitted")
        raise ValueError("Destination port not permitted.")

    ip = _resolve_first_ip(p.hostname)
    if not ip or (
        not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS
    ):
        logger.warning("Webhook blocked: destination not allowed")
        raise ValueError("Destination host is not allowed.")

    try:
        post_args = {
            "url": url,
-            "headers": headers,
-            "files": files,
+            "headers": {
+                k: v for k, v in (headers or {}).items() if k.lower() != "host"
+            },
+            "files": files or None,
+            "timeout": 5.0,
+            "follow_redirects": False,
        }
        if as_json:
            post_args["json"] = data
@@ -710,15 +759,6 @@ def send_webhook(
            )
            raise e

-        logger.info(
-            f"Webhook sent to {url}",
-        )
-    except Exception as e:
-        logger.error(
-            f"Failed attempt sending webhook to {url}: {e}",
-        )
-        raise e


def run_workflows(
    trigger_type: WorkflowTrigger.WorkflowTriggerType,

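The gate above is driven by three settings referenced in the diff; their defaults are not shown here, so the values below are assumptions for illustration only.

# Hypothetical settings excerpt -- names come from the diff, values are guesses.
WEBHOOKS_ALLOWED_SCHEMES = ["http", "https"]  # p.scheme must appear in this list
WEBHOOKS_ALLOWED_PORTS = []                   # an empty list permits any port
WEBHOOKS_ALLOW_INTERNAL_REQUESTS = True       # set False to block private/loopback IPs
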

@@ -41,6 +41,7 @@ class TestBulkEditAPI(DirectoriesMixin, APITestCase):
            title="B",
            correspondent=self.c1,
            document_type=self.dt1,
            page_count=5,
        )
        self.doc3 = Document.objects.create(
            checksum="C",
@@ -1369,6 +1370,218 @@ class TestBulkEditAPI(DirectoriesMixin, APITestCase):
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"pages must be a list of integers", response.content)
    @mock.patch("documents.serialisers.bulk_edit.edit_pdf")
    def test_edit_pdf(self, m):
        self.setup_mock(m, "edit_pdf")
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {"operations": [{"page": 1}]},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        m.assert_called_once()
        args, kwargs = m.call_args
        self.assertCountEqual(args[0], [self.doc2.id])
        self.assertEqual(kwargs["operations"], [{"page": 1}])
        self.assertEqual(kwargs["user"], self.user)

    def test_edit_pdf_invalid_params(self):
        # multiple documents
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id, self.doc3.id],
                    "method": "edit_pdf",
                    "parameters": {"operations": [{"page": 1}]},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"Edit PDF method only supports one document", response.content)

        # no operations specified
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"operations not specified", response.content)

        # operations not a list
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {"operations": "not_a_list"},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"operations must be a list", response.content)

        # invalid operation
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {"operations": ["invalid_operation"]},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"invalid operation entry", response.content)

        # page not an int
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {"operations": [{"page": "not_an_int"}]},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"page must be an integer", response.content)

        # rotate not an int
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {"operations": [{"page": 1, "rotate": "not_an_int"}]},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"rotate must be an integer", response.content)

        # doc not an int
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {"operations": [{"page": 1, "doc": "not_an_int"}]},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"doc must be an integer", response.content)

        # update_document not a boolean
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {
                        "update_document": "not_a_bool",
                        "operations": [{"page": 1}],
                    },
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"update_document must be a boolean", response.content)

        # include_metadata not a boolean
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {
                        "include_metadata": "not_a_bool",
                        "operations": [{"page": 1}],
                    },
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"include_metadata must be a boolean", response.content)

        # update_document True but output would be multiple documents
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {
                        "update_document": True,
                        "operations": [{"page": 1, "doc": 1}, {"page": 2, "doc": 2}],
                    },
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(
            b"update_document only allowed with a single output document",
            response.content,
        )

    @mock.patch("documents.serialisers.bulk_edit.edit_pdf")
    def test_edit_pdf_page_out_of_bounds(self, m):
        """
        GIVEN:
            - API data for editing PDF is called
            - The page number is out of bounds
        WHEN:
            - API is called
        THEN:
            - The API fails with a correct error code
        """
        self.setup_mock(m, "edit_pdf")
        response = self.client.post(
            "/api/documents/bulk_edit/",
            json.dumps(
                {
                    "documents": [self.doc2.id],
                    "method": "edit_pdf",
                    "parameters": {"operations": [{"page": 99}]},
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertIn(b"out of bounds", response.content)

    @override_settings(AUDIT_LOG_ENABLED=True)
    def test_bulk_edit_audit_log_enabled_simple_field(self):
        """


@@ -909,3 +909,156 @@ class TestPDFActions(DirectoriesMixin, TestCase):
        expected_str = "Error deleting pages from document"
        self.assertIn(expected_str, error_str)
        mock_update_archive_file.assert_not_called()

    @mock.patch("documents.bulk_edit.group")
    @mock.patch("documents.tasks.consume_file.s")
    def test_edit_pdf_basic_operations(self, mock_consume_file, mock_group):
        """
        GIVEN:
            - Existing document
        WHEN:
            - edit_pdf is called with two operations to split the doc and rotate pages
        THEN:
            - A grouped task is generated and delay() is called
        """
        mock_group.return_value.delay.return_value = None
        doc_ids = [self.doc2.id]
        operations = [{"page": 1, "doc": 0}, {"page": 2, "doc": 1, "rotate": 90}]

        result = bulk_edit.edit_pdf(doc_ids, operations)

        self.assertEqual(result, "OK")
        mock_group.return_value.delay.assert_called_once()

    @mock.patch("documents.bulk_edit.group")
    @mock.patch("documents.tasks.consume_file.s")
    def test_edit_pdf_with_user_override(self, mock_consume_file, mock_group):
        """
        GIVEN:
            - Existing document
        WHEN:
            - edit_pdf is called with user override
        THEN:
            - Task is created with user context
        """
        mock_group.return_value.delay.return_value = None
        doc_ids = [self.doc2.id]
        operations = [{"page": 1, "doc": 0}, {"page": 2, "doc": 1}]
        user = User.objects.create(username="editor")

        result = bulk_edit.edit_pdf(doc_ids, operations, user=user)

        self.assertEqual(result, "OK")
        mock_group.return_value.delay.assert_called_once()

    @mock.patch("documents.bulk_edit.chord")
    @mock.patch("documents.tasks.consume_file.s")
    def test_edit_pdf_with_delete_original(self, mock_consume_file, mock_chord):
        """
        GIVEN:
            - Existing document
        WHEN:
            - edit_pdf is called with delete_original=True
        THEN:
            - Task group is triggered
        """
        mock_chord.return_value.delay.return_value = None
        doc_ids = [self.doc2.id]
        operations = [{"page": 1}, {"page": 2}]

        result = bulk_edit.edit_pdf(doc_ids, operations, delete_original=True)

        self.assertEqual(result, "OK")
        mock_chord.assert_called_once()

    @mock.patch("documents.tasks.update_document_content_maybe_archive_file.delay")
    def test_edit_pdf_with_update_document(self, mock_update_document):
        """
        GIVEN:
            - A single existing PDF document
        WHEN:
            - edit_pdf is called with update_document=True and a single output
        THEN:
            - The original document is updated in-place
            - The update_document_content_maybe_archive_file task is triggered
        """
        doc_ids = [self.doc2.id]
        operations = [{"page": 1}, {"page": 2}]
        original_checksum = self.doc2.checksum
        original_page_count = self.doc2.page_count

        result = bulk_edit.edit_pdf(
            doc_ids,
            operations=operations,
            update_document=True,
            delete_original=False,
        )

        self.assertEqual(result, "OK")
        self.doc2.refresh_from_db()
        self.assertNotEqual(self.doc2.checksum, original_checksum)
        self.assertNotEqual(self.doc2.page_count, original_page_count)
        mock_update_document.assert_called_once_with(document_id=self.doc2.id)

    @mock.patch("documents.bulk_edit.group")
    @mock.patch("documents.tasks.consume_file.s")
    def test_edit_pdf_without_metadata(self, mock_consume_file, mock_group):
        """
        GIVEN:
            - Existing document
        WHEN:
            - edit_pdf is called with include_metadata=False
        THEN:
            - Tasks are created with empty metadata
        """
        mock_group.return_value.delay.return_value = None
        doc_ids = [self.doc2.id]
        operations = [{"page": 1}]

        result = bulk_edit.edit_pdf(doc_ids, operations, include_metadata=False)

        self.assertEqual(result, "OK")
        mock_group.return_value.delay.assert_called_once()

    @mock.patch("documents.bulk_edit.group")
    @mock.patch("documents.tasks.consume_file.s")
    def test_edit_pdf_open_failure(self, mock_consume_file, mock_group):
        """
        GIVEN:
            - Existing document
        WHEN:
            - edit_pdf fails to open PDF
        THEN:
            - Task group is not called
        """
        doc_ids = [self.doc2.id]
        operations = [
            {"page": 9999},  # invalid page, forces error during PDF load
        ]

        with self.assertLogs("paperless.bulk_edit", level="ERROR"):
            with self.assertRaises(Exception):
                bulk_edit.edit_pdf(doc_ids, operations)

        mock_group.assert_not_called()
        mock_consume_file.assert_not_called()

    @mock.patch("documents.bulk_edit.group")
    @mock.patch("documents.tasks.consume_file.s")
    def test_edit_pdf_multiple_outputs_with_update_flag_errors(
        self,
        mock_consume_file,
        mock_group,
    ):
        """
        GIVEN:
            - Existing document
        WHEN:
            - edit_pdf is called with multiple outputs and update_document=True
        THEN:
            - An error is logged and task group is not called
        """
        doc_ids = [self.doc2.id]
        operations = [
            {"page": 1, "doc": 0},
            {"page": 2, "doc": 1},
        ]

        with self.assertLogs("paperless.bulk_edit", level="ERROR"):
            with self.assertRaises(ValueError):
                bulk_edit.edit_pdf(doc_ids, operations, update_document=True)

        mock_group.assert_not_called()
        mock_consume_file.assert_not_called()

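For readers unfamiliar with the Celery primitives these tests assert on, a small sketch of the dispatch logic in edit_pdf; the task signatures here are placeholders, not code from the diff.

# Sketch only: chord() runs the header tasks first and invokes the body callback
# once all of them succeed, so the original is deleted only after every new PDF
# has been consumed; group() simply fans the consume tasks out in parallel.
from celery import chord, group

consume_tasks = [consume_file.s(first_pdf), consume_file.s(second_pdf)]  # placeholders

if delete_original:
    chord(header=consume_tasks, body=delete.si([doc.id])).delay()
else:
    group(consume_tasks).delay()
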

@@ -1,8 +1,10 @@
import shutil
import socket
from datetime import timedelta
from typing import TYPE_CHECKING
from unittest import mock

import pytest
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from django.test import override_settings
@@ -10,6 +12,7 @@ from django.utils import timezone
from guardian.shortcuts import assign_perm
from guardian.shortcuts import get_groups_with_perms
from guardian.shortcuts import get_users_with_perms
from httpx import HTTPError
from httpx import HTTPStatusError
from pytest_httpx import HTTPXMock
from rest_framework.test import APITestCase
@@ -2825,6 +2828,8 @@ class TestWorkflows(
                content="Test message",
                headers={},
                files=None,
                follow_redirects=False,
                timeout=5,
            )

        expected_str = "Webhook sent to http://paperless-ngx.com"
@@ -2842,6 +2847,8 @@ class TestWorkflows(
                data={"message": "Test message"},
                headers={},
                files=None,
                follow_redirects=False,
                timeout=5,
            )

    @mock.patch("httpx.post")
@@ -2962,3 +2969,164 @@ class TestWebhookSend:
            as_json=True,
        )

        assert httpx_mock.get_request().headers["Content-Type"] == "application/json"
@pytest.fixture
def resolve_to(monkeypatch):
    """
    Force DNS resolution to a specific IP for any hostname.
    """

    def _set(ip: str):
        def fake_getaddrinfo(host, *_args, **_kwargs):
            return [(socket.AF_INET, None, None, "", (ip, 0))]

        monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo)

    return _set


class TestWebhookSecurity:
    def test_blocks_invalid_scheme_or_hostname(self, httpx_mock: HTTPXMock):
        """
        GIVEN:
            - Invalid URL schemes or hostnames
        WHEN:
            - send_webhook is called with such URLs
        THEN:
            - ValueError is raised
        """
        with pytest.raises(ValueError):
            send_webhook(
                "ftp://example.com",
                data="",
                headers={},
                files=None,
                as_json=False,
            )

        with pytest.raises(ValueError):
            send_webhook(
                "http:///nohost",
                data="",
                headers={},
                files=None,
                as_json=False,
            )

    @override_settings(WEBHOOKS_ALLOWED_PORTS=[80, 443])
    def test_blocks_disallowed_port(self, httpx_mock: HTTPXMock):
        """
        GIVEN:
            - URL with a disallowed port
        WHEN:
            - send_webhook is called with such URL
        THEN:
            - ValueError is raised
        """
        with pytest.raises(ValueError):
            send_webhook(
                "http://paperless-ngx.com:8080",
                data="",
                headers={},
                files=None,
                as_json=False,
            )

        assert httpx_mock.get_request() is None

    @override_settings(WEBHOOKS_ALLOW_INTERNAL_REQUESTS=False)
    def test_blocks_private_loopback_linklocal(self, httpx_mock: HTTPXMock, resolve_to):
        """
        GIVEN:
            - URL with a private, loopback, or link-local IP address
            - WEBHOOKS_ALLOW_INTERNAL_REQUESTS is False
        WHEN:
            - send_webhook is called with such URL
        THEN:
            - ValueError is raised
        """
        resolve_to("127.0.0.1")

        with pytest.raises(ValueError):
            send_webhook(
                "http://paperless-ngx.com",
                data="",
                headers={},
                files=None,
                as_json=False,
            )

    def test_allows_public_ip_and_sends(self, httpx_mock: HTTPXMock, resolve_to):
        """
        GIVEN:
            - URL with a public IP address
        WHEN:
            - send_webhook is called with such URL
        THEN:
            - Request is sent successfully
        """
        resolve_to("52.207.186.75")
        httpx_mock.add_response(content=b"ok")

        send_webhook(
            url="http://paperless-ngx.com",
            data="hi",
            headers={},
            files=None,
            as_json=False,
        )

        req = httpx_mock.get_request()
        assert req.url.host == "paperless-ngx.com"

    def test_follow_redirects_disabled(self, httpx_mock: HTTPXMock, resolve_to):
        """
        GIVEN:
            - A URL that redirects
        WHEN:
            - send_webhook is called with follow_redirects=False
        THEN:
            - Request is made to the original URL and does not follow the redirect
        """
        resolve_to("52.207.186.75")
        # Return a redirect and ensure we don't follow it (only one request recorded)
        httpx_mock.add_response(
            status_code=302,
            headers={"location": "http://internal-service.local"},
            content=b"",
        )

        with pytest.raises(HTTPError):
            send_webhook(
                "http://paperless-ngx.com",
                data="",
                headers={},
                files=None,
                as_json=False,
            )

        assert len(httpx_mock.get_requests()) == 1

    def test_strips_user_supplied_host_header(self, httpx_mock: HTTPXMock, resolve_to):
        """
        GIVEN:
            - A URL with a user-supplied Host header
        WHEN:
            - send_webhook is called with a malicious Host header
        THEN:
            - The Host header is stripped and replaced with the resolved hostname
        """
        resolve_to("52.207.186.75")
        httpx_mock.add_response(content=b"ok")

        send_webhook(
            url="http://paperless-ngx.com",
            data="ok",
            headers={"Host": "evil.test"},
            files=None,
            as_json=False,
        )

        req = httpx_mock.get_request()
        assert req.headers["Host"] == "paperless-ngx.com"
        assert "evil.test" not in req.headers.get("Host", "")


@@ -1448,6 +1448,7 @@ class BulkEditView(PassUserMixin):
        "delete_pages": "checksum",
        "split": None,
        "merge": None,
        "edit_pdf": "checksum",
        "reprocess": "checksum",
    }
@@ -1466,6 +1467,7 @@ class BulkEditView(PassUserMixin):
        if method in [
            bulk_edit.split,
            bulk_edit.merge,
            bulk_edit.edit_pdf,
        ]:
            parameters["user"] = user
@@ -1485,27 +1487,36 @@ class BulkEditView(PassUserMixin):
        # check ownership for methods that change original document
        if (
-            has_perms
-            and method
-            in [
-                bulk_edit.set_permissions,
-                bulk_edit.delete,
-                bulk_edit.rotate,
-                bulk_edit.delete_pages,
-            ]
-        ) or (
-            method in [bulk_edit.merge, bulk_edit.split]
-            and parameters["delete_originals"]
+            (
+                has_perms
+                and method
+                in [
+                    bulk_edit.set_permissions,
+                    bulk_edit.delete,
+                    bulk_edit.rotate,
+                    bulk_edit.delete_pages,
+                    bulk_edit.edit_pdf,
+                ]
+            )
+            or (
+                method in [bulk_edit.merge, bulk_edit.split]
+                and parameters["delete_originals"]
+            )
+            or (method == bulk_edit.edit_pdf and parameters["update_document"])
        ):
            has_perms = user_is_owner_of_all_documents

        # check global add permissions for methods that create documents
        if (
            has_perms
-            and method in [bulk_edit.split, bulk_edit.merge]
-            and not user.has_perm(
-                "documents.add_document",
+            and (
+                method in [bulk_edit.split, bulk_edit.merge]
+                or (
+                    method == bulk_edit.edit_pdf
+                    and not parameters["update_document"]
+                )
+            )
+            and not user.has_perm("documents.add_document")
        ):
            has_perms = False
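
A hedged restatement of what the two conditions above require for the new method; the helper name below is invented for illustration.

def _edit_pdf_allowed(user, user_is_owner_of_all_documents, update_document):
    # edit_pdf can change or replace originals, so ownership is always required.
    if not user_is_owner_of_all_documents:
        return False
    # When it produces new documents instead of rewriting in place, the global
    # add permission is also required, just like split and merge.
    if not update_document and not user.has_perm("documents.add_document"):
        return False
    return True
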
@@ -1543,7 +1554,6 @@ class BulkEditView(PassUserMixin):
                )
            }

-        # TODO: parameter validation
        result = method(documents, **parameters)

        if settings.AUDIT_LOG_ENABLED and modified_field: