From 4fe966f53402c40d48be8a1972aa7297dd3dd7d0 Mon Sep 17 00:00:00 2001 From: "florian on nixos (Florian Brandes)" Date: Thu, 24 Mar 2022 11:33:24 +0100 Subject: [PATCH] more work on barcode Signed-off-by: florian on nixos (Florian Brandes) --- src/documents/tasks.py | 33 +++++++++++------- src/documents/tests/test_tasks.py | 58 ++++++++++++++++++++++++++----- 2 files changed, 70 insertions(+), 21 deletions(-) diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 1dd41b740..5161fcc01 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -22,6 +22,7 @@ from pyzbar import pyzbar from pdf2image import convert_from_path import tempfile from pikepdf import Pdf +import shutil logger = logging.getLogger("paperless.tasks") @@ -91,25 +92,25 @@ def barcode_reader(image) -> list: return barcodes -def scan_file_for_seperating_barcodes(filepath: str) -> list: +def scan_file_for_separating_barcodes(filepath: str) -> list: """ - Scan the provided file for page seperating barcodes - Returns a list of pagenumbers, which seperate the file + Scan the provided file for page separating barcodes + Returns a list of pagenumbers, which separate the file """ - seperator_page_numbers = [] + separator_page_numbers = [] # use a temporary directory in case the file os too big to handle in memory with tempfile.TemporaryDirectory() as path: pages_from_path = convert_from_path(filepath, output_folder=path) for current_page_number, page in enumerate(pages_from_path): current_barcodes = barcode_reader(page) if "b'PATCHT'" in current_barcodes: - seperator_page_numbers = seperator_page_numbers + [current_page_number] - return seperator_page_numbers + separator_page_numbers = separator_page_numbers + [current_page_number] + return separator_page_numbers -def seperate_pages(filepath: str, pages_to_split_on: list) -> list: +def separate_pages(filepath: str, pages_to_split_on: list) -> list: """ - Seperate the provided file on the pages_to_split_on. + Separate the provided file on the pages_to_split_on. The pages which are defined by page_numbers will be removed. Returns a list of (temporary) filepaths to consume. These will need to be deleted later. @@ -156,6 +157,14 @@ def seperate_pages(filepath: str, pages_to_split_on: list) -> list: logger.debug(f"Temp files are {str(document_paths)}") return document_paths +def save_to_dir(filepath, target_dir=settings.CONSUMPTION_DIR): + """ + Copies filepath to target_dir. + """ + if os.path.isfile(filepath) and os.path.isdir(target_dir): + shutil.copy(filepath, target_dir) + else: + logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.") def consume_file( path, @@ -167,10 +176,10 @@ def consume_file( task_id=None, ): - # check for seperators in current document - seperator_page_numbers = scan_file_for_seperating_barcodes(path) - if seperator_page_numbers != []: - logger.debug(f"Pages with seperators found: {str(seperator_page_numbers)}") + # check for separators in current document + separator_page_numbers = scan_file_for_separating_barcodes(path) + if separator_page_numbers != []: + logger.debug(f"Pages with separators found: {str(separator_page_numbers)}") document = Consumer().try_consume_file( path, diff --git a/src/documents/tests/test_tasks.py b/src/documents/tests/test_tasks.py index 02c747e3e..fcb6909ff 100644 --- a/src/documents/tests/test_tasks.py +++ b/src/documents/tests/test_tasks.py @@ -14,6 +14,7 @@ from documents.sanity_checker import SanityCheckMessages from documents.tests.utils import DirectoriesMixin from PIL import Image +import tempfile class TestTasks(DirectoriesMixin, TestCase): @@ -103,33 +104,72 @@ class TestTasks(DirectoriesMixin, TestCase): img = Image.open(test_file) self.assertEqual(tasks.barcode_reader(img), []) - def test_scan_file_for_seperating_barcodes(self): + def test_scan_file_for_separating_barcodes(self): test_file = os.path.join( os.path.dirname(__file__), "samples", "patch-code-t.pdf" ) - pages = tasks.scan_file_for_seperating_barcodes(test_file) + pages = tasks.scan_file_for_separating_barcodes(test_file) self.assertEqual(pages, [0]) - def test_scan_file_for_seperating_barcodes2(self): + def test_scan_file_for_separating_barcodes2(self): test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") - pages = tasks.scan_file_for_seperating_barcodes(test_file) + pages = tasks.scan_file_for_separating_barcodes(test_file) self.assertEqual(pages, []) - def test_scan_file_for_seperating_barcodes3(self): + def test_scan_file_for_separating_barcodes3(self): test_file = os.path.join( os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf" ) - pages = tasks.scan_file_for_seperating_barcodes(test_file) + pages = tasks.scan_file_for_separating_barcodes(test_file) self.assertEqual(pages, [1]) - def test_seperate_pages(self): + def test_separate_pages(self): test_file = os.path.join( os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf" ) - pages = tasks.seperate_pages(test_file, [1]) - + pages = tasks.separate_pages(test_file, [1]) self.assertEqual(len(pages), 2) + def test_save_to_dir(self): + test_file = os.path.join( + os.path.dirname(__file__), "samples", "patch-code-t.pdf" + ) + tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) + tasks.save_to_dir(test_file, tempdir) + target_file = os.path.join(tempdir, "patch-code-t.pdf") + self.assertTrue(os.path.isfile(target_file)) + + def test_save_to_dir2(self): + test_file = os.path.join( + os.path.dirname(__file__), "samples", "patch-code-t.pdf" + ) + nonexistingdir = "/nowhere" + if not os.path.isdir(nonexistingdir): + with self.assertLogs("paperless.tasks", level="WARNING") as cm: + tasks.save_to_dir(test_file, nonexistingdir) + self.assertEqual( + cm.output, + [ + f"WARNING:paperless.tasks:{str(test_file)} or {str(nonexistingdir)} don't exist." + ], + ) + + def test_barcode_splitter(self): + test_file = os.path.join( + os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf" + ) + tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) + separators = tasks.scan_file_for_separating_barcodes(test_file) + self.assertTrue(separators != []) + document_list = tasks.separate_pages(test_file, separators) + self.assertTrue(document_list != []) + for document in document_list: + tasks.save_to_dir(document, tempdir) + target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf") + target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf") + self.assertTrue(os.path.isfile(target_file1)) + self.assertTrue(os.path.isfile(target_file2)) + @mock.patch("documents.tasks.sanity_checker.check_sanity") def test_sanity_check_success(self, m): m.return_value = SanityCheckMessages()