From ec045e81f217e8b667614c32879f873c220ae035 Mon Sep 17 00:00:00 2001 From: Trenton Holmes Date: Mon, 13 Jun 2022 17:11:22 -0700 Subject: [PATCH] Moves the barcode related functionality out of tasks and into its own location. Splits up the testing based on that --- src/documents/barcodes.py | 186 +++++++++++ src/documents/tasks.py | 192 +---------- src/documents/tests/test_barcodes.py | 456 +++++++++++++++++++++++++++ src/documents/tests/test_tasks.py | 442 -------------------------- 4 files changed, 656 insertions(+), 620 deletions(-) create mode 100644 src/documents/barcodes.py create mode 100644 src/documents/tests/test_barcodes.py diff --git a/src/documents/barcodes.py b/src/documents/barcodes.py new file mode 100644 index 000000000..ccfae37cb --- /dev/null +++ b/src/documents/barcodes.py @@ -0,0 +1,186 @@ +import logging +import os +import shutil +import tempfile +from functools import lru_cache +from typing import List # for type hinting. Can be removed, if only Python >3.8 is used + +import magic +from django.conf import settings +from pdf2image import convert_from_path +from pikepdf import Pdf +from PIL import Image +from PIL import ImageSequence +from pyzbar import pyzbar + +logger = logging.getLogger("paperless.barcodes") + + +@lru_cache(maxsize=8) +def supported_file_type(mime_type) -> bool: + """ + Determines if the file is valid for barcode + processing, based on MIME type and settings + + :return: True if the file is supported, False otherwise + """ + supported_mime = ["application/pdf"] + if settings.CONSUMER_BARCODE_TIFF_SUPPORT: + supported_mime += ["image/tiff"] + + return mime_type in supported_mime + + +def barcode_reader(image) -> List[str]: + """ + Read any barcodes contained in image + Returns a list containing all found barcodes + """ + barcodes = [] + # Decode the barcode image + detected_barcodes = pyzbar.decode(image) + + if detected_barcodes: + # Traverse through all the detected barcodes in image + for barcode in detected_barcodes: + if barcode.data: + decoded_barcode = barcode.data.decode("utf-8") + barcodes.append(decoded_barcode) + logger.debug( + f"Barcode of type {str(barcode.type)} found: {decoded_barcode}", + ) + return barcodes + + +def get_file_mime_type(path: str) -> str: + """ + Determines the file type, based on MIME type. + + Returns the MIME type. + """ + mime_type = magic.from_file(path, mime=True) + logger.debug(f"Detected mime type: {mime_type}") + return mime_type + + +def convert_from_tiff_to_pdf(filepath: str) -> str: + """ + converts a given TIFF image file to pdf into a temporary directory. + + Returns the new pdf file. + """ + file_name = os.path.splitext(os.path.basename(filepath))[0] + mime_type = get_file_mime_type(filepath) + tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) + # use old file name with pdf extension + if mime_type == "image/tiff": + newpath = os.path.join(tempdir, file_name + ".pdf") + else: + logger.warning( + f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.", + ) + return None + with Image.open(filepath) as image: + images = [] + for i, page in enumerate(ImageSequence.Iterator(image)): + page = page.convert("RGB") + images.append(page) + try: + if len(images) == 1: + images[0].save(newpath) + else: + images[0].save(newpath, save_all=True, append_images=images[1:]) + except OSError as e: + logger.warning( + f"Could not save the file as pdf. Error: {str(e)}", + ) + return None + return newpath + + +def scan_file_for_separating_barcodes(filepath: str) -> List[int]: + """ + Scan the provided pdf file for page separating barcodes + Returns a list of pagenumbers, which separate the file + """ + separator_page_numbers = [] + separator_barcode = str(settings.CONSUMER_BARCODE_STRING) + # use a temporary directory in case the file os too big to handle in memory + with tempfile.TemporaryDirectory() as path: + pages_from_path = convert_from_path(filepath, output_folder=path) + for current_page_number, page in enumerate(pages_from_path): + current_barcodes = barcode_reader(page) + if separator_barcode in current_barcodes: + separator_page_numbers.append(current_page_number) + return separator_page_numbers + + +def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]: + """ + Separate the provided pdf file on the pages_to_split_on. + The pages which are defined by page_numbers will be removed. + Returns a list of (temporary) filepaths to consume. + These will need to be deleted later. + """ + os.makedirs(settings.SCRATCH_DIR, exist_ok=True) + tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) + fname = os.path.splitext(os.path.basename(filepath))[0] + pdf = Pdf.open(filepath) + document_paths = [] + logger.debug(f"Temp dir is {str(tempdir)}") + if not pages_to_split_on: + logger.warning("No pages to split on!") + else: + # go from the first page to the first separator page + dst = Pdf.new() + for n, page in enumerate(pdf.pages): + if n < pages_to_split_on[0]: + dst.pages.append(page) + output_filename = f"{fname}_document_0.pdf" + savepath = os.path.join(tempdir, output_filename) + with open(savepath, "wb") as out: + dst.save(out) + document_paths = [savepath] + + # iterate through the rest of the document + for count, page_number in enumerate(pages_to_split_on): + logger.debug(f"Count: {str(count)} page_number: {str(page_number)}") + dst = Pdf.new() + try: + next_page = pages_to_split_on[count + 1] + except IndexError: + next_page = len(pdf.pages) + # skip the first page_number. This contains the barcode page + for page in range(page_number + 1, next_page): + logger.debug( + f"page_number: {str(page_number)} next_page: {str(next_page)}", + ) + dst.pages.append(pdf.pages[page]) + output_filename = f"{fname}_document_{str(count + 1)}.pdf" + logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages") + savepath = os.path.join(tempdir, output_filename) + with open(savepath, "wb") as out: + dst.save(out) + document_paths.append(savepath) + logger.debug(f"Temp files are {str(document_paths)}") + return document_paths + + +def save_to_dir( + filepath: str, + newname: str = None, + target_dir: str = settings.CONSUMPTION_DIR, +): + """ + Copies filepath to target_dir. + Optionally rename the file. + """ + if os.path.isfile(filepath) and os.path.isdir(target_dir): + dst = shutil.copy(filepath, target_dir) + logging.debug(f"saved {str(filepath)} to {str(dst)}") + if newname: + dst_new = os.path.join(target_dir, newname) + logger.debug(f"moving {str(dst)} to {str(dst_new)}") + os.rename(dst, dst_new) + else: + logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.") diff --git a/src/documents/tasks.py b/src/documents/tasks.py index 4c57b2eee..b27ecd1dd 100644 --- a/src/documents/tasks.py +++ b/src/documents/tasks.py @@ -1,15 +1,12 @@ import logging import os -import shutil -import tempfile -from typing import List # for type hinting. Can be removed, if only Python >3.8 is used -import magic import tqdm from asgiref.sync import async_to_sync from channels.layers import get_channel_layer from django.conf import settings from django.db.models.signals import post_save +from documents import barcodes from documents import index from documents import sanity_checker from documents.classifier import DocumentClassifier @@ -22,11 +19,6 @@ from documents.models import DocumentType from documents.models import StoragePath from documents.models import Tag from documents.sanity_checker import SanityCheckFailedException -from pdf2image import convert_from_path -from pikepdf import Pdf -from PIL import Image -from PIL import ImageSequence -from pyzbar import pyzbar from whoosh.writing import AsyncWriter @@ -77,161 +69,6 @@ def train_classifier(): logger.warning("Classifier error: " + str(e)) -def barcode_reader(image) -> List[str]: - """ - Read any barcodes contained in image - Returns a list containing all found barcodes - """ - barcodes = [] - # Decode the barcode image - detected_barcodes = pyzbar.decode(image) - - if detected_barcodes: - # Traverse through all the detected barcodes in image - for barcode in detected_barcodes: - if barcode.data: - decoded_barcode = barcode.data.decode("utf-8") - barcodes.append(decoded_barcode) - logger.debug( - f"Barcode of type {str(barcode.type)} found: {decoded_barcode}", - ) - return barcodes - - -def get_file_type(path: str) -> str: - """ - Determines the file type, based on MIME type. - - Returns the MIME type. - """ - mime_type = magic.from_file(path, mime=True) - logger.debug(f"Detected mime type: {mime_type}") - return mime_type - - -def convert_from_tiff_to_pdf(filepath: str) -> str: - """ - converts a given TIFF image file to pdf into a temporary directory. - - Returns the new pdf file. - """ - file_name = os.path.splitext(os.path.basename(filepath))[0] - mime_type = get_file_type(filepath) - tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) - # use old file name with pdf extension - if mime_type == "image/tiff": - newpath = os.path.join(tempdir, file_name + ".pdf") - else: - logger.warning( - f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.", - ) - return None - with Image.open(filepath) as image: - images = [] - for i, page in enumerate(ImageSequence.Iterator(image)): - page = page.convert("RGB") - images.append(page) - try: - if len(images) == 1: - images[0].save(newpath) - else: - images[0].save(newpath, save_all=True, append_images=images[1:]) - except OSError as e: - logger.warning( - f"Could not save the file as pdf. Error: {str(e)}", - ) - return None - return newpath - - -def scan_file_for_separating_barcodes(filepath: str) -> List[int]: - """ - Scan the provided pdf file for page separating barcodes - Returns a list of pagenumbers, which separate the file - """ - separator_page_numbers = [] - separator_barcode = str(settings.CONSUMER_BARCODE_STRING) - # use a temporary directory in case the file os too big to handle in memory - with tempfile.TemporaryDirectory() as path: - pages_from_path = convert_from_path(filepath, output_folder=path) - for current_page_number, page in enumerate(pages_from_path): - current_barcodes = barcode_reader(page) - if separator_barcode in current_barcodes: - separator_page_numbers.append(current_page_number) - return separator_page_numbers - - -def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]: - """ - Separate the provided pdf file on the pages_to_split_on. - The pages which are defined by page_numbers will be removed. - Returns a list of (temporary) filepaths to consume. - These will need to be deleted later. - """ - os.makedirs(settings.SCRATCH_DIR, exist_ok=True) - tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) - fname = os.path.splitext(os.path.basename(filepath))[0] - pdf = Pdf.open(filepath) - document_paths = [] - logger.debug(f"Temp dir is {str(tempdir)}") - if not pages_to_split_on: - logger.warning("No pages to split on!") - else: - # go from the first page to the first separator page - dst = Pdf.new() - for n, page in enumerate(pdf.pages): - if n < pages_to_split_on[0]: - dst.pages.append(page) - output_filename = f"{fname}_document_0.pdf" - savepath = os.path.join(tempdir, output_filename) - with open(savepath, "wb") as out: - dst.save(out) - document_paths = [savepath] - - # iterate through the rest of the document - for count, page_number in enumerate(pages_to_split_on): - logger.debug(f"Count: {str(count)} page_number: {str(page_number)}") - dst = Pdf.new() - try: - next_page = pages_to_split_on[count + 1] - except IndexError: - next_page = len(pdf.pages) - # skip the first page_number. This contains the barcode page - for page in range(page_number + 1, next_page): - logger.debug( - f"page_number: {str(page_number)} next_page: {str(next_page)}", - ) - dst.pages.append(pdf.pages[page]) - output_filename = f"{fname}_document_{str(count + 1)}.pdf" - logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages") - savepath = os.path.join(tempdir, output_filename) - with open(savepath, "wb") as out: - dst.save(out) - document_paths.append(savepath) - logger.debug(f"Temp files are {str(document_paths)}") - return document_paths - - -def save_to_dir( - filepath: str, - newname: str = None, - target_dir: str = settings.CONSUMPTION_DIR, -): - """ - Copies filepath to target_dir. - Optionally rename the file. - """ - if os.path.isfile(filepath) and os.path.isdir(target_dir): - dst = shutil.copy(filepath, target_dir) - logging.debug(f"saved {str(filepath)} to {str(dst)}") - if newname: - dst_new = os.path.join(target_dir, newname) - logger.debug(f"moving {str(dst)} to {str(dst_new)}") - os.rename(dst, dst_new) - else: - logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.") - - def consume_file( path, override_filename=None, @@ -245,32 +82,31 @@ def consume_file( # check for separators in current document if settings.CONSUMER_ENABLE_BARCODES: - separators = [] - document_list = [] - converted_tiff = None - if settings.CONSUMER_BARCODE_TIFF_SUPPORT: - supported_mime = ["image/tiff", "application/pdf"] - else: - supported_mime = ["application/pdf"] - mime_type = get_file_type(path) - if mime_type not in supported_mime: + + mime_type = barcodes.get_file_mime_type(path) + + if not barcodes.supported_file_type(mime_type): # if not supported, skip this routine logger.warning( f"Unsupported file format for barcode reader: {str(mime_type)}", ) else: + separators = [] + document_list = [] + converted_tiff = None + if mime_type == "image/tiff": - file_to_process = convert_from_tiff_to_pdf(path) + file_to_process = barcodes.convert_from_tiff_to_pdf(path) else: file_to_process = path - separators = scan_file_for_separating_barcodes(file_to_process) + separators = barcodes.scan_file_for_separating_barcodes(file_to_process) if separators: logger.debug( f"Pages with separators found in: {str(path)}", ) - document_list = separate_pages(file_to_process, separators) + document_list = barcodes.separate_pages(file_to_process, separators) if document_list: for n, document in enumerate(document_list): @@ -280,10 +116,10 @@ def consume_file( newname = f"{str(n)}_" + override_filename else: newname = None - save_to_dir(document, newname=newname) + barcodes.save_to_dir(document, newname=newname) # if we got here, the document was successfully split # and can safely be deleted - if converted_tiff: + if converted_tiff is not None: logger.debug(f"Deleting file {file_to_process}") os.unlink(file_to_process) logger.debug(f"Deleting file {path}") diff --git a/src/documents/tests/test_barcodes.py b/src/documents/tests/test_barcodes.py new file mode 100644 index 000000000..e4e7566ad --- /dev/null +++ b/src/documents/tests/test_barcodes.py @@ -0,0 +1,456 @@ +import os +import shutil +import tempfile +from unittest import mock + +from django.conf import settings +from django.test import override_settings +from django.test import TestCase +from documents import barcodes +from documents import tasks +from documents.tests.utils import DirectoriesMixin +from PIL import Image + + +class TestBarcode(DirectoriesMixin, TestCase): + def test_barcode_reader(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-39-PATCHT.png", + ) + img = Image.open(test_file) + separator_barcode = str(settings.CONSUMER_BARCODE_STRING) + self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) + + def test_barcode_reader2(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t.pbm", + ) + img = Image.open(test_file) + separator_barcode = str(settings.CONSUMER_BARCODE_STRING) + self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) + + def test_barcode_reader_distorsion(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-39-PATCHT-distorsion.png", + ) + img = Image.open(test_file) + separator_barcode = str(settings.CONSUMER_BARCODE_STRING) + self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) + + def test_barcode_reader_distorsion2(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-39-PATCHT-distorsion2.png", + ) + img = Image.open(test_file) + separator_barcode = str(settings.CONSUMER_BARCODE_STRING) + self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) + + def test_barcode_reader_unreadable(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-39-PATCHT-unreadable.png", + ) + img = Image.open(test_file) + self.assertEqual(barcodes.barcode_reader(img), []) + + def test_barcode_reader_qr(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "qr-code-PATCHT.png", + ) + img = Image.open(test_file) + separator_barcode = str(settings.CONSUMER_BARCODE_STRING) + self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) + + def test_barcode_reader_128(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-128-PATCHT.png", + ) + img = Image.open(test_file) + separator_barcode = str(settings.CONSUMER_BARCODE_STRING) + self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) + + def test_barcode_reader_no_barcode(self): + test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png") + img = Image.open(test_file) + self.assertEqual(barcodes.barcode_reader(img), []) + + def test_barcode_reader_custom_separator(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-39-custom.png", + ) + img = Image.open(test_file) + self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"]) + + def test_barcode_reader_custom_qr_separator(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-qr-custom.png", + ) + img = Image.open(test_file) + self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"]) + + def test_barcode_reader_custom_128_separator(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-128-custom.png", + ) + img = Image.open(test_file) + self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"]) + + def test_get_mime_type(self): + tiff_file = os.path.join( + os.path.dirname(__file__), + "samples", + "simple.tiff", + ) + pdf_file = os.path.join( + os.path.dirname(__file__), + "samples", + "simple.pdf", + ) + png_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-128-custom.png", + ) + tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1") + pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2") + shutil.copy(tiff_file, tiff_file_no_extension) + shutil.copy(pdf_file, pdf_file_no_extension) + + self.assertEqual(barcodes.get_file_mime_type(tiff_file), "image/tiff") + self.assertEqual(barcodes.get_file_mime_type(pdf_file), "application/pdf") + self.assertEqual( + barcodes.get_file_mime_type(tiff_file_no_extension), + "image/tiff", + ) + self.assertEqual( + barcodes.get_file_mime_type(pdf_file_no_extension), + "application/pdf", + ) + self.assertEqual(barcodes.get_file_mime_type(png_file), "image/png") + + def test_convert_from_tiff_to_pdf(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "simple.tiff", + ) + dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff") + shutil.copy(test_file, dst) + target_file = barcodes.convert_from_tiff_to_pdf(dst) + file_extension = os.path.splitext(os.path.basename(target_file))[1] + self.assertTrue(os.path.isfile(target_file)) + self.assertEqual(file_extension, ".pdf") + + def test_convert_error_from_pdf_to_pdf(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "simple.pdf", + ) + dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf") + shutil.copy(test_file, dst) + self.assertIsNone(barcodes.convert_from_tiff_to_pdf(dst)) + + def test_scan_file_for_separating_barcodes(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, [0]) + + def test_scan_file_for_separating_barcodes2(self): + test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, []) + + def test_scan_file_for_separating_barcodes3(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-middle.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, [1]) + + def test_scan_file_for_separating_barcodes4(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "several-patcht-codes.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, [2, 5]) + + def test_scan_file_for_separating_barcodes_upsidedown(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-middle_reverse.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, [1]) + + def test_scan_file_for_separating_qr_barcodes(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-qr.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, [0]) + + @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") + def test_scan_file_for_separating_custom_barcodes(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-39-custom.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, [0]) + + @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") + def test_scan_file_for_separating_custom_qr_barcodes(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-qr-custom.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, [0]) + + @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") + def test_scan_file_for_separating_custom_128_barcodes(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-128-custom.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, [0]) + + def test_scan_file_for_separating_wrong_qr_barcodes(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "barcode-39-custom.pdf", + ) + pages = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertEqual(pages, []) + + def test_separate_pages(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-middle.pdf", + ) + pages = barcodes.separate_pages(test_file, [1]) + self.assertEqual(len(pages), 2) + + def test_separate_pages_no_list(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-middle.pdf", + ) + with self.assertLogs("paperless.barcodes", level="WARNING") as cm: + pages = barcodes.separate_pages(test_file, []) + self.assertEqual(pages, []) + self.assertEqual( + cm.output, + [ + f"WARNING:paperless.barcodes:No pages to split on!", + ], + ) + + def test_save_to_dir(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t.pdf", + ) + tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) + barcodes.save_to_dir(test_file, target_dir=tempdir) + target_file = os.path.join(tempdir, "patch-code-t.pdf") + self.assertTrue(os.path.isfile(target_file)) + + def test_save_to_dir2(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t.pdf", + ) + nonexistingdir = "/nowhere" + if os.path.isdir(nonexistingdir): + self.fail("non-existing dir exists") + else: + with self.assertLogs("paperless.barcodes", level="WARNING") as cm: + barcodes.save_to_dir(test_file, target_dir=nonexistingdir) + self.assertEqual( + cm.output, + [ + f"WARNING:paperless.barcodes:{str(test_file)} or {str(nonexistingdir)} don't exist.", + ], + ) + + def test_save_to_dir3(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t.pdf", + ) + tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) + barcodes.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir) + target_file = os.path.join(tempdir, "newname.pdf") + self.assertTrue(os.path.isfile(target_file)) + + def test_barcode_splitter(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-middle.pdf", + ) + tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) + separators = barcodes.scan_file_for_separating_barcodes(test_file) + self.assertTrue(separators) + document_list = barcodes.separate_pages(test_file, separators) + self.assertTrue(document_list) + for document in document_list: + barcodes.save_to_dir(document, target_dir=tempdir) + target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf") + target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf") + self.assertTrue(os.path.isfile(target_file1)) + self.assertTrue(os.path.isfile(target_file2)) + + @override_settings(CONSUMER_ENABLE_BARCODES=True) + def test_consume_barcode_file(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-middle.pdf", + ) + dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf") + shutil.copy(test_file, dst) + + self.assertEqual(tasks.consume_file(dst), "File successfully split") + + @override_settings( + CONSUMER_ENABLE_BARCODES=True, + CONSUMER_BARCODE_TIFF_SUPPORT=True, + ) + def test_consume_barcode_tiff_file(self): + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-middle.tiff", + ) + dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff") + shutil.copy(test_file, dst) + + self.assertEqual(tasks.consume_file(dst), "File successfully split") + + @override_settings( + CONSUMER_ENABLE_BARCODES=True, + CONSUMER_BARCODE_TIFF_SUPPORT=True, + ) + @mock.patch("documents.consumer.Consumer.try_consume_file") + def test_consume_barcode_unsupported_jpg_file(self, m): + """ + This test assumes barcode and TIFF support are enabled and + the user uploads an unsupported image file (e.g. jpg) + + The function shouldn't try to scan for separating barcodes + and continue archiving the file as is. + """ + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "simple.jpg", + ) + dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg") + shutil.copy(test_file, dst) + with self.assertLogs("paperless.tasks", level="WARNING") as cm: + self.assertIn("Success", tasks.consume_file(dst)) + self.assertListEqual( + cm.output, + [ + "WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg", + ], + ) + m.assert_called_once() + + args, kwargs = m.call_args + self.assertIsNone(kwargs["override_filename"]) + self.assertIsNone(kwargs["override_title"]) + self.assertIsNone(kwargs["override_correspondent_id"]) + self.assertIsNone(kwargs["override_document_type_id"]) + self.assertIsNone(kwargs["override_tag_ids"]) + + @override_settings( + CONSUMER_ENABLE_BARCODES=True, + CONSUMER_BARCODE_TIFF_SUPPORT=True, + ) + def test_consume_barcode_supported_no_extension_file(self): + """ + This test assumes barcode and TIFF support are enabled and + the user uploads a supported image file, but without extension + """ + test_file = os.path.join( + os.path.dirname(__file__), + "samples", + "barcodes", + "patch-code-t-middle.tiff", + ) + dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle") + shutil.copy(test_file, dst) + + self.assertEqual(tasks.consume_file(dst), "File successfully split") diff --git a/src/documents/tests/test_tasks.py b/src/documents/tests/test_tasks.py index 4db6e3369..a2b4ef000 100644 --- a/src/documents/tests/test_tasks.py +++ b/src/documents/tests/test_tasks.py @@ -1,10 +1,7 @@ import os -import shutil -import tempfile from unittest import mock from django.conf import settings -from django.test import override_settings from django.test import TestCase from django.utils import timezone from documents import tasks @@ -15,7 +12,6 @@ from documents.models import Tag from documents.sanity_checker import SanityCheckFailedException from documents.sanity_checker import SanityCheckMessages from documents.tests.utils import DirectoriesMixin -from PIL import Image class TestIndexReindex(DirectoriesMixin, TestCase): @@ -96,444 +92,6 @@ class TestClassifier(DirectoriesMixin, TestCase): self.assertNotEqual(mtime2, mtime3) -class TestBarcode(DirectoriesMixin, TestCase): - def test_barcode_reader(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-39-PATCHT.png", - ) - img = Image.open(test_file) - separator_barcode = str(settings.CONSUMER_BARCODE_STRING) - self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) - - def test_barcode_reader2(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t.pbm", - ) - img = Image.open(test_file) - separator_barcode = str(settings.CONSUMER_BARCODE_STRING) - self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) - - def test_barcode_reader_distorsion(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-39-PATCHT-distorsion.png", - ) - img = Image.open(test_file) - separator_barcode = str(settings.CONSUMER_BARCODE_STRING) - self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) - - def test_barcode_reader_distorsion2(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-39-PATCHT-distorsion2.png", - ) - img = Image.open(test_file) - separator_barcode = str(settings.CONSUMER_BARCODE_STRING) - self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) - - def test_barcode_reader_unreadable(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-39-PATCHT-unreadable.png", - ) - img = Image.open(test_file) - self.assertEqual(tasks.barcode_reader(img), []) - - def test_barcode_reader_qr(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "qr-code-PATCHT.png", - ) - img = Image.open(test_file) - separator_barcode = str(settings.CONSUMER_BARCODE_STRING) - self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) - - def test_barcode_reader_128(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-128-PATCHT.png", - ) - img = Image.open(test_file) - separator_barcode = str(settings.CONSUMER_BARCODE_STRING) - self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) - - def test_barcode_reader_no_barcode(self): - test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png") - img = Image.open(test_file) - self.assertEqual(tasks.barcode_reader(img), []) - - def test_barcode_reader_custom_separator(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-39-custom.png", - ) - img = Image.open(test_file) - self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"]) - - def test_barcode_reader_custom_qr_separator(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-qr-custom.png", - ) - img = Image.open(test_file) - self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"]) - - def test_barcode_reader_custom_128_separator(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-128-custom.png", - ) - img = Image.open(test_file) - self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"]) - - def test_get_mime_type(self): - tiff_file = os.path.join( - os.path.dirname(__file__), - "samples", - "simple.tiff", - ) - pdf_file = os.path.join( - os.path.dirname(__file__), - "samples", - "simple.pdf", - ) - png_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-128-custom.png", - ) - tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1") - pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2") - shutil.copy(tiff_file, tiff_file_no_extension) - shutil.copy(pdf_file, pdf_file_no_extension) - - self.assertEqual(tasks.get_file_type(tiff_file), "image/tiff") - self.assertEqual(tasks.get_file_type(pdf_file), "application/pdf") - self.assertEqual(tasks.get_file_type(tiff_file_no_extension), "image/tiff") - self.assertEqual(tasks.get_file_type(pdf_file_no_extension), "application/pdf") - self.assertEqual(tasks.get_file_type(png_file), "image/png") - - def test_convert_from_tiff_to_pdf(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "simple.tiff", - ) - dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff") - shutil.copy(test_file, dst) - target_file = tasks.convert_from_tiff_to_pdf(dst) - file_extension = os.path.splitext(os.path.basename(target_file))[1] - self.assertTrue(os.path.isfile(target_file)) - self.assertEqual(file_extension, ".pdf") - - def test_convert_error_from_pdf_to_pdf(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "simple.pdf", - ) - dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf") - shutil.copy(test_file, dst) - self.assertIsNone(tasks.convert_from_tiff_to_pdf(dst)) - - def test_scan_file_for_separating_barcodes(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, [0]) - - def test_scan_file_for_separating_barcodes2(self): - test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, []) - - def test_scan_file_for_separating_barcodes3(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-middle.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, [1]) - - def test_scan_file_for_separating_barcodes4(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "several-patcht-codes.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, [2, 5]) - - def test_scan_file_for_separating_barcodes_upsidedown(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-middle_reverse.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, [1]) - - def test_scan_file_for_separating_qr_barcodes(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-qr.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, [0]) - - @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") - def test_scan_file_for_separating_custom_barcodes(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-39-custom.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, [0]) - - @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") - def test_scan_file_for_separating_custom_qr_barcodes(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-qr-custom.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, [0]) - - @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") - def test_scan_file_for_separating_custom_128_barcodes(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-128-custom.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, [0]) - - def test_scan_file_for_separating_wrong_qr_barcodes(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "barcode-39-custom.pdf", - ) - pages = tasks.scan_file_for_separating_barcodes(test_file) - self.assertEqual(pages, []) - - def test_separate_pages(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-middle.pdf", - ) - pages = tasks.separate_pages(test_file, [1]) - self.assertEqual(len(pages), 2) - - def test_separate_pages_no_list(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-middle.pdf", - ) - with self.assertLogs("paperless.tasks", level="WARNING") as cm: - pages = tasks.separate_pages(test_file, []) - self.assertEqual(pages, []) - self.assertEqual( - cm.output, - [ - f"WARNING:paperless.tasks:No pages to split on!", - ], - ) - - def test_save_to_dir(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t.pdf", - ) - tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) - tasks.save_to_dir(test_file, target_dir=tempdir) - target_file = os.path.join(tempdir, "patch-code-t.pdf") - self.assertTrue(os.path.isfile(target_file)) - - def test_save_to_dir2(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t.pdf", - ) - nonexistingdir = "/nowhere" - if os.path.isdir(nonexistingdir): - self.fail("non-existing dir exists") - else: - with self.assertLogs("paperless.tasks", level="WARNING") as cm: - tasks.save_to_dir(test_file, target_dir=nonexistingdir) - self.assertEqual( - cm.output, - [ - f"WARNING:paperless.tasks:{str(test_file)} or {str(nonexistingdir)} don't exist.", - ], - ) - - def test_save_to_dir3(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t.pdf", - ) - tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) - tasks.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir) - target_file = os.path.join(tempdir, "newname.pdf") - self.assertTrue(os.path.isfile(target_file)) - - def test_barcode_splitter(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-middle.pdf", - ) - tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) - separators = tasks.scan_file_for_separating_barcodes(test_file) - self.assertTrue(separators) - document_list = tasks.separate_pages(test_file, separators) - self.assertTrue(document_list) - for document in document_list: - tasks.save_to_dir(document, target_dir=tempdir) - target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf") - target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf") - self.assertTrue(os.path.isfile(target_file1)) - self.assertTrue(os.path.isfile(target_file2)) - - @override_settings(CONSUMER_ENABLE_BARCODES=True) - def test_consume_barcode_file(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-middle.pdf", - ) - dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf") - shutil.copy(test_file, dst) - - self.assertEqual(tasks.consume_file(dst), "File successfully split") - - @override_settings( - CONSUMER_ENABLE_BARCODES=True, - CONSUMER_BARCODE_TIFF_SUPPORT=True, - ) - def test_consume_barcode_tiff_file(self): - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-middle.tiff", - ) - dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff") - shutil.copy(test_file, dst) - - self.assertEqual(tasks.consume_file(dst), "File successfully split") - - @override_settings( - CONSUMER_ENABLE_BARCODES=True, - CONSUMER_BARCODE_TIFF_SUPPORT=True, - ) - @mock.patch("documents.consumer.Consumer.try_consume_file") - def test_consume_barcode_unsupported_jpg_file(self, m): - """ - This test assumes barcode and TIFF support are enabled and - the user uploads an unsupported image file (e.g. jpg) - - The function shouldn't try to scan for separating barcodes - and continue archiving the file as is. - """ - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "simple.jpg", - ) - dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg") - shutil.copy(test_file, dst) - with self.assertLogs("paperless.tasks", level="WARNING") as cm: - self.assertIn("Success", tasks.consume_file(dst)) - self.assertEqual( - cm.output, - [ - "WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg", - ], - ) - m.assert_called_once() - - args, kwargs = m.call_args - self.assertIsNone(kwargs["override_filename"]) - self.assertIsNone(kwargs["override_title"]) - self.assertIsNone(kwargs["override_correspondent_id"]) - self.assertIsNone(kwargs["override_document_type_id"]) - self.assertIsNone(kwargs["override_tag_ids"]) - - @override_settings( - CONSUMER_ENABLE_BARCODES=True, - CONSUMER_BARCODE_TIFF_SUPPORT=True, - ) - def test_consume_barcode_supported_no_extension_file(self): - """ - This test assumes barcode and TIFF support are enabled and - the user uploads a supported image file, but without extension - """ - test_file = os.path.join( - os.path.dirname(__file__), - "samples", - "barcodes", - "patch-code-t-middle.tiff", - ) - dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle") - shutil.copy(test_file, dst) - - self.assertEqual(tasks.consume_file(dst), "File successfully split") - - class TestSanityCheck(DirectoriesMixin, TestCase): @mock.patch("documents.tasks.sanity_checker.check_sanity") def test_sanity_check_success(self, m):