mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-08-12 00:19:48 +00:00
Feature: PDF actions - merge, split & rotate (#6094)
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
from django.contrib.auth.models import Group
|
||||
@@ -275,3 +277,262 @@ class TestBulkEdit(DirectoriesMixin, TestCase):
|
||||
self.doc1,
|
||||
)
|
||||
self.assertEqual(groups_with_perms.count(), 2)
|
||||
|
||||
|
||||
class TestPDFActions(DirectoriesMixin, TestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
sample1 = self.dirs.scratch_dir / "sample.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000001.pdf",
|
||||
sample1,
|
||||
)
|
||||
sample1_archive = self.dirs.archive_dir / "sample_archive.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000001.pdf",
|
||||
sample1_archive,
|
||||
)
|
||||
sample2 = self.dirs.scratch_dir / "sample2.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000002.pdf",
|
||||
sample2,
|
||||
)
|
||||
sample2_archive = self.dirs.archive_dir / "sample2_archive.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000002.pdf",
|
||||
sample2_archive,
|
||||
)
|
||||
sample3 = self.dirs.scratch_dir / "sample3.pdf"
|
||||
shutil.copy(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000003.pdf",
|
||||
sample3,
|
||||
)
|
||||
self.doc1 = Document.objects.create(
|
||||
checksum="A",
|
||||
title="A",
|
||||
filename=sample1,
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
self.doc1.archive_filename = sample1_archive
|
||||
self.doc1.save()
|
||||
self.doc2 = Document.objects.create(
|
||||
checksum="B",
|
||||
title="B",
|
||||
filename=sample2,
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
self.doc2.archive_filename = sample2_archive
|
||||
self.doc2.save()
|
||||
self.doc3 = Document.objects.create(
|
||||
checksum="C",
|
||||
title="C",
|
||||
filename=sample3,
|
||||
mime_type="application/pdf",
|
||||
)
|
||||
img_doc = self.dirs.scratch_dir / "sample_image.jpg"
|
||||
shutil.copy(
|
||||
Path(__file__).parent / "samples" / "simple.jpg",
|
||||
img_doc,
|
||||
)
|
||||
self.img_doc = Document.objects.create(
|
||||
checksum="D",
|
||||
title="D",
|
||||
filename=img_doc,
|
||||
mime_type="image/jpeg",
|
||||
)
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
def test_merge(self, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Merge action is called with 3 documents
|
||||
THEN:
|
||||
- Consume file should be called
|
||||
"""
|
||||
doc_ids = [self.doc1.id, self.doc2.id, self.doc3.id]
|
||||
metadata_document_id = self.doc1.id
|
||||
|
||||
result = bulk_edit.merge(doc_ids)
|
||||
|
||||
expected_filename = (
|
||||
f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf"
|
||||
)
|
||||
|
||||
mock_consume_file.assert_called()
|
||||
consume_file_args, _ = mock_consume_file.call_args
|
||||
self.assertEqual(
|
||||
Path(consume_file_args[0].original_file).name,
|
||||
expected_filename,
|
||||
)
|
||||
self.assertEqual(consume_file_args[1].title, None)
|
||||
|
||||
# With metadata_document_id overrides
|
||||
result = bulk_edit.merge(doc_ids, metadata_document_id=metadata_document_id)
|
||||
consume_file_args, _ = mock_consume_file.call_args
|
||||
self.assertEqual(consume_file_args[1].title, "A (merged)")
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
@mock.patch("pikepdf.open")
|
||||
def test_merge_with_errors(self, mock_open_pdf, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Merge action is called with 2 documents
|
||||
- Error occurs when opening both files
|
||||
THEN:
|
||||
- Consume file should not be called
|
||||
"""
|
||||
mock_open_pdf.side_effect = Exception("Error opening PDF")
|
||||
doc_ids = [self.doc2.id, self.doc3.id]
|
||||
|
||||
with self.assertLogs("paperless.bulk_edit", level="ERROR") as cm:
|
||||
bulk_edit.merge(doc_ids)
|
||||
error_str = cm.output[0]
|
||||
expected_str = (
|
||||
"Error merging document 2, it will not be included in the merge"
|
||||
)
|
||||
self.assertIn(expected_str, error_str)
|
||||
|
||||
mock_consume_file.assert_not_called()
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
def test_split(self, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Split action is called with 1 document and 2 pages
|
||||
THEN:
|
||||
- Consume file should be called twice
|
||||
"""
|
||||
doc_ids = [self.doc2.id]
|
||||
pages = [[1, 2], [3]]
|
||||
result = bulk_edit.split(doc_ids, pages)
|
||||
self.assertEqual(mock_consume_file.call_count, 2)
|
||||
consume_file_args, _ = mock_consume_file.call_args
|
||||
self.assertEqual(consume_file_args[1].title, "B (split 2)")
|
||||
|
||||
self.assertEqual(result, "OK")
|
||||
|
||||
@mock.patch("documents.tasks.consume_file.delay")
|
||||
@mock.patch("pikepdf.Pdf.save")
|
||||
def test_split_with_errors(self, mock_save_pdf, mock_consume_file):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Split action is called with 1 document and 2 page groups
|
||||
- Error occurs when saving the files
|
||||
THEN:
|
||||
- Consume file should not be called
|
||||
"""
|
||||
mock_save_pdf.side_effect = Exception("Error saving PDF")
|
||||
doc_ids = [self.doc2.id]
|
||||
pages = [[1, 2], [3]]
|
||||
|
||||
with self.assertLogs("paperless.bulk_edit", level="ERROR") as cm:
|
||||
bulk_edit.split(doc_ids, pages)
|
||||
error_str = cm.output[0]
|
||||
expected_str = "Error splitting document 2"
|
||||
self.assertIn(expected_str, error_str)
|
||||
|
||||
mock_consume_file.assert_not_called()
|
||||
|
||||
@mock.patch("documents.tasks.bulk_update_documents.s")
|
||||
@mock.patch("documents.tasks.update_document_archive_file.s")
|
||||
@mock.patch("celery.chord.delay")
|
||||
def test_rotate(self, mock_chord, mock_update_document, mock_update_documents):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Rotate action is called with 2 documents
|
||||
THEN:
|
||||
- Rotate action should be called twice
|
||||
"""
|
||||
doc_ids = [self.doc1.id, self.doc2.id]
|
||||
result = bulk_edit.rotate(doc_ids, 90)
|
||||
self.assertEqual(mock_update_document.call_count, 2)
|
||||
mock_update_documents.assert_called_once()
|
||||
mock_chord.assert_called_once()
|
||||
self.assertEqual(result, "OK")
|
||||
|
||||
@mock.patch("documents.tasks.bulk_update_documents.s")
|
||||
@mock.patch("documents.tasks.update_document_archive_file.s")
|
||||
@mock.patch("pikepdf.Pdf.save")
|
||||
def test_rotate_with_error(
|
||||
self,
|
||||
mock_pdf_save,
|
||||
mock_update_archive_file,
|
||||
mock_update_documents,
|
||||
):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Rotate action is called with 2 documents
|
||||
- PikePDF raises an error
|
||||
THEN:
|
||||
- Rotate action should be called 0 times
|
||||
"""
|
||||
mock_pdf_save.side_effect = Exception("Error saving PDF")
|
||||
doc_ids = [self.doc2.id, self.doc3.id]
|
||||
|
||||
with self.assertLogs("paperless.bulk_edit", level="ERROR") as cm:
|
||||
bulk_edit.rotate(doc_ids, 90)
|
||||
error_str = cm.output[0]
|
||||
expected_str = "Error rotating document"
|
||||
self.assertIn(expected_str, error_str)
|
||||
mock_update_archive_file.assert_not_called()
|
||||
|
||||
@mock.patch("documents.tasks.bulk_update_documents.s")
|
||||
@mock.patch("documents.tasks.update_document_archive_file.s")
|
||||
@mock.patch("celery.chord.delay")
|
||||
def test_rotate_non_pdf(
|
||||
self,
|
||||
mock_chord,
|
||||
mock_update_document,
|
||||
mock_update_documents,
|
||||
):
|
||||
"""
|
||||
GIVEN:
|
||||
- Existing documents
|
||||
WHEN:
|
||||
- Rotate action is called with 2 documents, one of which is not a PDF
|
||||
THEN:
|
||||
- Rotate action should be performed 1 time, with the non-PDF document skipped
|
||||
"""
|
||||
with self.assertLogs("paperless.bulk_edit", level="INFO") as cm:
|
||||
result = bulk_edit.rotate([self.doc2.id, self.img_doc.id], 90)
|
||||
output_str = cm.output[1]
|
||||
expected_str = "Document 4 is not a PDF, skipping rotation"
|
||||
self.assertIn(expected_str, output_str)
|
||||
self.assertEqual(mock_update_document.call_count, 1)
|
||||
mock_update_documents.assert_called_once()
|
||||
mock_chord.assert_called_once()
|
||||
self.assertEqual(result, "OK")
|
||||
|
Reference in New Issue
Block a user