mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-01-12 21:44:21 -06:00
Feature: password removal action (#11656)
This commit is contained in:
@@ -646,6 +646,77 @@ def edit_pdf(
|
||||
return "OK"
|
||||
|
||||
|
||||
def remove_password(
|
||||
doc_ids: list[int],
|
||||
password: str,
|
||||
*,
|
||||
update_document: bool = False,
|
||||
delete_original: bool = False,
|
||||
include_metadata: bool = True,
|
||||
user: User | None = None,
|
||||
) -> Literal["OK"]:
|
||||
"""
|
||||
Remove password protection from PDF documents.
|
||||
"""
|
||||
import pikepdf
|
||||
|
||||
for doc_id in doc_ids:
|
||||
doc = Document.objects.get(id=doc_id)
|
||||
try:
|
||||
logger.info(
|
||||
f"Attempting password removal from document {doc_ids[0]}",
|
||||
)
|
||||
with pikepdf.open(doc.source_path, password=password) as pdf:
|
||||
temp_path = doc.source_path.with_suffix(".tmp.pdf")
|
||||
pdf.remove_unreferenced_resources()
|
||||
pdf.save(temp_path)
|
||||
|
||||
if update_document:
|
||||
# replace the original document with the unprotected one
|
||||
temp_path.replace(doc.source_path)
|
||||
doc.checksum = hashlib.md5(doc.source_path.read_bytes()).hexdigest()
|
||||
doc.page_count = len(pdf.pages)
|
||||
doc.save()
|
||||
update_document_content_maybe_archive_file.delay(document_id=doc.id)
|
||||
else:
|
||||
consume_tasks = []
|
||||
overrides = (
|
||||
DocumentMetadataOverrides().from_document(doc)
|
||||
if include_metadata
|
||||
else DocumentMetadataOverrides()
|
||||
)
|
||||
if user is not None:
|
||||
overrides.owner_id = user.id
|
||||
|
||||
filepath: Path = (
|
||||
Path(tempfile.mkdtemp(dir=settings.SCRATCH_DIR))
|
||||
/ f"{doc.id}_unprotected.pdf"
|
||||
)
|
||||
temp_path.replace(filepath)
|
||||
consume_tasks.append(
|
||||
consume_file.s(
|
||||
ConsumableDocument(
|
||||
source=DocumentSource.ConsumeFolder,
|
||||
original_file=filepath,
|
||||
),
|
||||
overrides,
|
||||
),
|
||||
)
|
||||
|
||||
if delete_original:
|
||||
chord(header=consume_tasks, body=delete.si([doc.id])).delay()
|
||||
else:
|
||||
group(consume_tasks).delay()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error removing password from document {doc.id}: {e}")
|
||||
raise ValueError(
|
||||
f"An error occurred while removing the password: {e}",
|
||||
) from e
|
||||
|
||||
return "OK"
|
||||
|
||||
|
||||
def reflect_doclinks(
|
||||
document: Document,
|
||||
field: CustomField,
|
||||
|
||||
@@ -1430,6 +1430,7 @@ class BulkEditSerializer(
|
||||
"split",
|
||||
"delete_pages",
|
||||
"edit_pdf",
|
||||
"remove_password",
|
||||
],
|
||||
label="Method",
|
||||
write_only=True,
|
||||
@@ -1505,6 +1506,8 @@ class BulkEditSerializer(
|
||||
return bulk_edit.delete_pages
|
||||
elif method == "edit_pdf":
|
||||
return bulk_edit.edit_pdf
|
||||
elif method == "remove_password":
|
||||
return bulk_edit.remove_password
|
||||
else: # pragma: no cover
|
||||
# This will never happen as it is handled by the ChoiceField
|
||||
raise serializers.ValidationError("Unsupported method.")
|
||||
@@ -1701,6 +1704,12 @@ class BulkEditSerializer(
|
||||
f"Page {op['page']} is out of bounds for document with {doc.page_count} pages.",
|
||||
)
|
||||
|
||||
def validate_parameters_remove_password(self, parameters):
|
||||
if "password" not in parameters:
|
||||
raise serializers.ValidationError("password not specified")
|
||||
if not isinstance(parameters["password"], str):
|
||||
raise serializers.ValidationError("password must be a string")
|
||||
|
||||
def validate(self, attrs):
|
||||
method = attrs["method"]
|
||||
parameters = attrs["parameters"]
|
||||
@@ -1741,6 +1750,8 @@ class BulkEditSerializer(
|
||||
"Edit PDF method only supports one document",
|
||||
)
|
||||
self._validate_parameters_edit_pdf(parameters, attrs["documents"][0])
|
||||
elif method == bulk_edit.remove_password:
|
||||
self.validate_parameters_remove_password(parameters)
|
||||
|
||||
return attrs
|
||||
|
||||
|
||||
@@ -1582,6 +1582,58 @@ class TestBulkEditAPI(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertIn(b"out of bounds", response.content)
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.remove_password")
|
||||
def test_remove_password(self, m):
|
||||
self.setup_mock(m, "remove_password")
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc2.id],
|
||||
"method": "remove_password",
|
||||
"parameters": {"password": "secret", "update_document": True},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
m.assert_called_once()
|
||||
args, kwargs = m.call_args
|
||||
self.assertCountEqual(args[0], [self.doc2.id])
|
||||
self.assertEqual(kwargs["password"], "secret")
|
||||
self.assertTrue(kwargs["update_document"])
|
||||
self.assertEqual(kwargs["user"], self.user)
|
||||
|
||||
def test_remove_password_invalid_params(self):
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc2.id],
|
||||
"method": "remove_password",
|
||||
"parameters": {},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertIn(b"password not specified", response.content)
|
||||
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc2.id],
|
||||
"method": "remove_password",
|
||||
"parameters": {"password": 123},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
|
||||
self.assertIn(b"password must be a string", response.content)
|
||||
|
||||
@override_settings(AUDIT_LOG_ENABLED=True)
|
||||
def test_bulk_edit_audit_log_enabled_simple_field(self):
|
||||
"""
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import hashlib
|
||||
import shutil
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
@@ -1066,3 +1067,147 @@ class TestPDFActions(DirectoriesMixin, TestCase):
|
||||
bulk_edit.edit_pdf(doc_ids, operations, update_document=True)
|
||||
mock_group.assert_not_called()
|
||||
mock_consume_file.assert_not_called()
|
||||
|
||||
@mock.patch("documents.bulk_edit.update_document_content_maybe_archive_file.delay")
|
||||
@mock.patch("pikepdf.open")
|
||||
def test_remove_password_update_document(self, mock_open, mock_update_document):
|
||||
doc = self.doc1
|
||||
original_checksum = doc.checksum
|
||||
|
||||
fake_pdf = mock.MagicMock()
|
||||
fake_pdf.pages = [mock.Mock(), mock.Mock(), mock.Mock()]
|
||||
|
||||
def save_side_effect(target_path):
|
||||
Path(target_path).write_bytes(b"new pdf content")
|
||||
|
||||
fake_pdf.save.side_effect = save_side_effect
|
||||
mock_open.return_value.__enter__.return_value = fake_pdf
|
||||
|
||||
result = bulk_edit.remove_password(
|
||||
[doc.id],
|
||||
password="secret",
|
||||
update_document=True,
|
||||
)
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
mock_open.assert_called_once_with(doc.source_path, password="secret")
|
||||
fake_pdf.remove_unreferenced_resources.assert_called_once()
|
||||
doc.refresh_from_db()
|
||||
self.assertNotEqual(doc.checksum, original_checksum)
|
||||
expected_checksum = hashlib.md5(doc.source_path.read_bytes()).hexdigest()
|
||||
self.assertEqual(doc.checksum, expected_checksum)
|
||||
self.assertEqual(doc.page_count, len(fake_pdf.pages))
|
||||
mock_update_document.assert_called_once_with(document_id=doc.id)
|
||||
|
||||
@mock.patch("documents.bulk_edit.chord")
|
||||
@mock.patch("documents.bulk_edit.group")
|
||||
@mock.patch("documents.tasks.consume_file.s")
|
||||
@mock.patch("documents.bulk_edit.tempfile.mkdtemp")
|
||||
@mock.patch("pikepdf.open")
|
||||
def test_remove_password_creates_consumable_document(
|
||||
self,
|
||||
mock_open,
|
||||
mock_mkdtemp,
|
||||
mock_consume_file,
|
||||
mock_group,
|
||||
mock_chord,
|
||||
):
|
||||
doc = self.doc2
|
||||
temp_dir = self.dirs.scratch_dir / "remove-password"
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
mock_mkdtemp.return_value = str(temp_dir)
|
||||
|
||||
fake_pdf = mock.MagicMock()
|
||||
fake_pdf.pages = [mock.Mock(), mock.Mock()]
|
||||
|
||||
def save_side_effect(target_path):
|
||||
Path(target_path).write_bytes(b"password removed")
|
||||
|
||||
fake_pdf.save.side_effect = save_side_effect
|
||||
mock_open.return_value.__enter__.return_value = fake_pdf
|
||||
mock_group.return_value.delay.return_value = None
|
||||
|
||||
user = User.objects.create(username="owner")
|
||||
|
||||
result = bulk_edit.remove_password(
|
||||
[doc.id],
|
||||
password="secret",
|
||||
include_metadata=False,
|
||||
update_document=False,
|
||||
delete_original=False,
|
||||
user=user,
|
||||
)
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
mock_open.assert_called_once_with(doc.source_path, password="secret")
|
||||
mock_consume_file.assert_called_once()
|
||||
consume_args, _ = mock_consume_file.call_args
|
||||
consumable_document = consume_args[0]
|
||||
overrides = consume_args[1]
|
||||
expected_path = temp_dir / f"{doc.id}_unprotected.pdf"
|
||||
self.assertTrue(expected_path.exists())
|
||||
self.assertEqual(
|
||||
Path(consumable_document.original_file).resolve(),
|
||||
expected_path.resolve(),
|
||||
)
|
||||
self.assertEqual(overrides.owner_id, user.id)
|
||||
mock_group.assert_called_once_with([mock_consume_file.return_value])
|
||||
mock_group.return_value.delay.assert_called_once()
|
||||
mock_chord.assert_not_called()
|
||||
|
||||
@mock.patch("documents.bulk_edit.delete")
|
||||
@mock.patch("documents.bulk_edit.chord")
|
||||
@mock.patch("documents.bulk_edit.group")
|
||||
@mock.patch("documents.tasks.consume_file.s")
|
||||
@mock.patch("documents.bulk_edit.tempfile.mkdtemp")
|
||||
@mock.patch("pikepdf.open")
|
||||
def test_remove_password_deletes_original(
|
||||
self,
|
||||
mock_open,
|
||||
mock_mkdtemp,
|
||||
mock_consume_file,
|
||||
mock_group,
|
||||
mock_chord,
|
||||
mock_delete,
|
||||
):
|
||||
doc = self.doc2
|
||||
temp_dir = self.dirs.scratch_dir / "remove-password-delete"
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
mock_mkdtemp.return_value = str(temp_dir)
|
||||
|
||||
fake_pdf = mock.MagicMock()
|
||||
fake_pdf.pages = [mock.Mock(), mock.Mock()]
|
||||
|
||||
def save_side_effect(target_path):
|
||||
Path(target_path).write_bytes(b"password removed")
|
||||
|
||||
fake_pdf.save.side_effect = save_side_effect
|
||||
mock_open.return_value.__enter__.return_value = fake_pdf
|
||||
mock_chord.return_value.delay.return_value = None
|
||||
|
||||
result = bulk_edit.remove_password(
|
||||
[doc.id],
|
||||
password="secret",
|
||||
include_metadata=False,
|
||||
update_document=False,
|
||||
delete_original=True,
|
||||
)
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
mock_open.assert_called_once_with(doc.source_path, password="secret")
|
||||
mock_consume_file.assert_called_once()
|
||||
mock_group.assert_not_called()
|
||||
mock_chord.assert_called_once()
|
||||
mock_chord.return_value.delay.assert_called_once()
|
||||
mock_delete.si.assert_called_once_with([doc.id])
|
||||
|
||||
@mock.patch("pikepdf.open")
|
||||
def test_remove_password_open_failure(self, mock_open):
|
||||
mock_open.side_effect = RuntimeError("wrong password")
|
||||
|
||||
with self.assertLogs("paperless.bulk_edit", level="ERROR") as cm:
|
||||
with self.assertRaises(ValueError) as exc:
|
||||
bulk_edit.remove_password([self.doc1.id], password="secret")
|
||||
|
||||
self.assertIn("wrong password", str(exc.exception))
|
||||
self.assertIn("Error removing password from document", cm.output[0])
|
||||
|
||||
@@ -1504,6 +1504,7 @@ class BulkEditView(PassUserMixin):
|
||||
"merge": None,
|
||||
"edit_pdf": "checksum",
|
||||
"reprocess": "checksum",
|
||||
"remove_password": "checksum",
|
||||
}
|
||||
|
||||
permission_classes = (IsAuthenticated,)
|
||||
@@ -1522,6 +1523,7 @@ class BulkEditView(PassUserMixin):
|
||||
bulk_edit.split,
|
||||
bulk_edit.merge,
|
||||
bulk_edit.edit_pdf,
|
||||
bulk_edit.remove_password,
|
||||
]:
|
||||
parameters["user"] = user
|
||||
|
||||
@@ -1550,6 +1552,7 @@ class BulkEditView(PassUserMixin):
|
||||
bulk_edit.rotate,
|
||||
bulk_edit.delete_pages,
|
||||
bulk_edit.edit_pdf,
|
||||
bulk_edit.remove_password,
|
||||
]
|
||||
)
|
||||
or (
|
||||
@@ -1566,7 +1569,7 @@ class BulkEditView(PassUserMixin):
|
||||
and (
|
||||
method in [bulk_edit.split, bulk_edit.merge]
|
||||
or (
|
||||
method == bulk_edit.edit_pdf
|
||||
method in [bulk_edit.edit_pdf, bulk_edit.remove_password]
|
||||
and not parameters["update_document"]
|
||||
)
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user