mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-11 10:00:48 -05:00
Moves the barcode-related functionality out of tasks and into its own module. Splits up the tests accordingly.
This commit is contained in:
parent
d8a7828cb5
commit
ec045e81f2
186
src/documents/barcodes.py
Normal file
186
src/documents/barcodes.py
Normal file
@ -0,0 +1,186 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
from functools import lru_cache
|
||||||
|
from typing import List # for type hinting. Can be removed, if only Python >3.8 is used
|
||||||
|
|
||||||
|
import magic
|
||||||
|
from django.conf import settings
|
||||||
|
from pdf2image import convert_from_path
|
||||||
|
from pikepdf import Pdf
|
||||||
|
from PIL import Image
|
||||||
|
from PIL import ImageSequence
|
||||||
|
from pyzbar import pyzbar
|
||||||
|
|
||||||
|
# Module-level logger; all messages from this module are emitted under the
# "paperless.barcodes" logger name.
logger = logging.getLogger("paperless.barcodes")
|
||||||
|
|
||||||
|
|
||||||
|
def supported_file_type(mime_type) -> bool:
    """
    Determines if the file is valid for barcode
    processing, based on MIME type and settings

    PDF is always supported; TIFF only when
    settings.CONSUMER_BARCODE_TIFF_SUPPORT is enabled.

    The setting is read on every call and the result is deliberately not
    memoized: a cache keyed only on mime_type keeps returning a stale answer
    after the setting changes (for example under override_settings in tests),
    and the check is far too cheap to be worth caching.

    :param mime_type: MIME type string of the file, e.g. "application/pdf"
    :return: True if the file is supported, False otherwise
    """
    if mime_type == "application/pdf":
        return True
    # TIFF input has to be enabled explicitly
    tiff_enabled = bool(settings.CONSUMER_BARCODE_TIFF_SUPPORT)
    return tiff_enabled and mime_type == "image/tiff"
|
||||||
|
|
||||||
|
|
||||||
|
def barcode_reader(image) -> List[str]:
    """
    Decode every barcode pyzbar detects in the given image

    Returns the decoded payloads as a list of strings, which is
    empty when nothing was detected
    """
    found = []
    # a falsy decode result (nothing detected) means there is nothing to loop over
    for detected in pyzbar.decode(image) or ():
        if not detected.data:
            # detection without a payload, nothing to record
            continue
        text = detected.data.decode("utf-8")
        found.append(text)
        logger.debug(
            f"Barcode of type {str(detected.type)} found: {text}",
        )
    return found
|
||||||
|
|
||||||
|
|
||||||
|
def get_file_mime_type(path: str) -> str:
    """
    Looks up the MIME type of the file at path via magic.from_file.

    The detected type is written to the debug log and returned.
    """
    detected = magic.from_file(path, mime=True)
    logger.debug(f"Detected mime type: {detected}")
    return detected
|
||||||
|
|
||||||
|
|
||||||
|
def convert_from_tiff_to_pdf(filepath: str) -> str:
    """
    Converts a given TIFF image file to pdf into a temporary directory.

    The temporary directory lives below settings.SCRATCH_DIR. It is only
    created once the input is known to be a TIFF, so rejected inputs leave
    nothing behind, and it is removed again if writing the pdf fails.

    :param filepath: path of the TIFF file to convert
    :return: path of the new pdf file (old file name with a .pdf extension),
        or None if the file is not a TIFF or the pdf could not be saved
    """
    mime_type = get_file_mime_type(filepath)
    if mime_type != "image/tiff":
        logger.warning(
            f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.",
        )
        return None

    # make sure the scratch directory exists, as separate_pages does
    os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
    tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
    # use old file name with pdf extension
    file_name = os.path.splitext(os.path.basename(filepath))[0]
    newpath = os.path.join(tempdir, file_name + ".pdf")

    with Image.open(filepath) as image:
        # each frame of the TIFF becomes one page of the pdf
        images = [page.convert("RGB") for page in ImageSequence.Iterator(image)]
        try:
            if len(images) == 1:
                images[0].save(newpath)
            else:
                images[0].save(newpath, save_all=True, append_images=images[1:])
        except OSError as e:
            logger.warning(
                f"Could not save the file as pdf. Error: {str(e)}",
            )
            # do not leave the now useless temporary directory behind
            shutil.rmtree(tempdir, ignore_errors=True)
            return None
    return newpath
|
||||||
|
|
||||||
|
|
||||||
|
def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
    """
    Look through the provided pdf file for pages carrying the
    separator barcode (settings.CONSUMER_BARCODE_STRING)

    Returns the zero-based numbers of those pages, in page order
    """
    wanted = str(settings.CONSUMER_BARCODE_STRING)
    # render into a temporary directory in case the file is too big to
    # handle in memory
    with tempfile.TemporaryDirectory() as workdir:
        rendered_pages = convert_from_path(filepath, output_folder=workdir)
        hits = [
            index
            for index, rendered in enumerate(rendered_pages)
            if wanted in barcode_reader(rendered)
        ]
    return hits
|
||||||
|
|
||||||
|
|
||||||
|
def _save_page_range(pdf, page_indices, savepath: str) -> int:
    """
    Write the pages of pdf selected by page_indices into a new pdf at savepath.

    Returns the number of pages written.
    """
    # the context manager closes the new document once it has been written
    with Pdf.new() as dst:
        for index in page_indices:
            dst.pages.append(pdf.pages[index])
        page_count = len(dst.pages)
        with open(savepath, "wb") as out:
            dst.save(out)
    return page_count


def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]:
    """
    Separate the provided pdf file on the pages_to_split_on.
    The pages which are defined by pages_to_split_on will be removed.

    One document is written for the pages before the first separator and one
    for the pages following each separator. No attempt is made to skip
    documents that end up without pages (separator on the first page, or two
    separators in a row).

    :param filepath: path of the pdf file to split
    :param pages_to_split_on: zero-based page numbers of the separator pages
    :return: a list of (temporary) filepaths to consume, empty if
        pages_to_split_on is empty. These will need to be deleted later.
    """
    os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
    fname = os.path.splitext(os.path.basename(filepath))[0]
    document_paths = []

    # open the source as a context manager so it is closed again even if
    # splitting fails part-way through
    with Pdf.open(filepath) as pdf:
        if not pages_to_split_on:
            logger.warning("No pages to split on!")
        else:
            # only create the output directory when there is something to write
            tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
            logger.debug(f"Temp dir is {str(tempdir)}")
            total_pages = len(pdf.pages)

            # go from the first page to the first separator page
            savepath = os.path.join(tempdir, f"{fname}_document_0.pdf")
            first_separator = min(pages_to_split_on[0], total_pages)
            _save_page_range(pdf, range(first_separator), savepath)
            document_paths.append(savepath)

            # iterate through the rest of the document
            for count, page_number in enumerate(pages_to_split_on):
                logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
                # the next separator, or the end of the file, closes this document
                if count + 1 < len(pages_to_split_on):
                    next_page = pages_to_split_on[count + 1]
                else:
                    next_page = total_pages
                logger.debug(
                    f"page_number: {str(page_number)} next_page: {str(next_page)}",
                )
                output_filename = f"{fname}_document_{str(count + 1)}.pdf"
                savepath = os.path.join(tempdir, output_filename)
                # skip page_number itself. This contains the barcode page
                written = _save_page_range(
                    pdf,
                    range(page_number + 1, next_page),
                    savepath,
                )
                logger.debug(f"pdf no:{str(count)} has {str(written)} pages")
                document_paths.append(savepath)

    logger.debug(f"Temp files are {str(document_paths)}")
    return document_paths
|
||||||
|
|
||||||
|
|
||||||
|
def save_to_dir(
    filepath: str,
    newname: str = None,
    target_dir: str = None,
):
    """
    Copies filepath to target_dir.
    Optionally rename the file.

    The file is copied straight to its final name, so it never shows up in
    target_dir under its original name first.

    :param filepath: path of the file to copy
    :param newname: file name to use inside target_dir; the original file
        name is kept if omitted
    :param target_dir: directory to copy into. Defaults to
        settings.CONSUMPTION_DIR, looked up at call time so that later
        changes to the setting are honoured.

    If filepath is not a file or target_dir is not a directory, only a
    warning is logged and nothing is copied.
    """
    if target_dir is None:
        target_dir = settings.CONSUMPTION_DIR

    if os.path.isfile(filepath) and os.path.isdir(target_dir):
        if newname:
            destination = os.path.join(target_dir, newname)
        else:
            # shutil.copy keeps the source file name when given a directory
            destination = target_dir
        dst = shutil.copy(filepath, destination)
        logger.debug(f"saved {str(filepath)} to {str(dst)}")
    else:
        logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.")
|
@ -1,15 +1,12 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import shutil
|
|
||||||
import tempfile
|
|
||||||
from typing import List # for type hinting. Can be removed, if only Python >3.8 is used
|
|
||||||
|
|
||||||
import magic
|
|
||||||
import tqdm
|
import tqdm
|
||||||
from asgiref.sync import async_to_sync
|
from asgiref.sync import async_to_sync
|
||||||
from channels.layers import get_channel_layer
|
from channels.layers import get_channel_layer
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.db.models.signals import post_save
|
from django.db.models.signals import post_save
|
||||||
|
from documents import barcodes
|
||||||
from documents import index
|
from documents import index
|
||||||
from documents import sanity_checker
|
from documents import sanity_checker
|
||||||
from documents.classifier import DocumentClassifier
|
from documents.classifier import DocumentClassifier
|
||||||
@ -22,11 +19,6 @@ from documents.models import DocumentType
|
|||||||
from documents.models import StoragePath
|
from documents.models import StoragePath
|
||||||
from documents.models import Tag
|
from documents.models import Tag
|
||||||
from documents.sanity_checker import SanityCheckFailedException
|
from documents.sanity_checker import SanityCheckFailedException
|
||||||
from pdf2image import convert_from_path
|
|
||||||
from pikepdf import Pdf
|
|
||||||
from PIL import Image
|
|
||||||
from PIL import ImageSequence
|
|
||||||
from pyzbar import pyzbar
|
|
||||||
from whoosh.writing import AsyncWriter
|
from whoosh.writing import AsyncWriter
|
||||||
|
|
||||||
|
|
||||||
@ -77,161 +69,6 @@ def train_classifier():
|
|||||||
logger.warning("Classifier error: " + str(e))
|
logger.warning("Classifier error: " + str(e))
|
||||||
|
|
||||||
|
|
||||||
def barcode_reader(image) -> List[str]:
|
|
||||||
"""
|
|
||||||
Read any barcodes contained in image
|
|
||||||
Returns a list containing all found barcodes
|
|
||||||
"""
|
|
||||||
barcodes = []
|
|
||||||
# Decode the barcode image
|
|
||||||
detected_barcodes = pyzbar.decode(image)
|
|
||||||
|
|
||||||
if detected_barcodes:
|
|
||||||
# Traverse through all the detected barcodes in image
|
|
||||||
for barcode in detected_barcodes:
|
|
||||||
if barcode.data:
|
|
||||||
decoded_barcode = barcode.data.decode("utf-8")
|
|
||||||
barcodes.append(decoded_barcode)
|
|
||||||
logger.debug(
|
|
||||||
f"Barcode of type {str(barcode.type)} found: {decoded_barcode}",
|
|
||||||
)
|
|
||||||
return barcodes
|
|
||||||
|
|
||||||
|
|
||||||
def get_file_type(path: str) -> str:
|
|
||||||
"""
|
|
||||||
Determines the file type, based on MIME type.
|
|
||||||
|
|
||||||
Returns the MIME type.
|
|
||||||
"""
|
|
||||||
mime_type = magic.from_file(path, mime=True)
|
|
||||||
logger.debug(f"Detected mime type: {mime_type}")
|
|
||||||
return mime_type
|
|
||||||
|
|
||||||
|
|
||||||
def convert_from_tiff_to_pdf(filepath: str) -> str:
|
|
||||||
"""
|
|
||||||
converts a given TIFF image file to pdf into a temporary directory.
|
|
||||||
|
|
||||||
Returns the new pdf file.
|
|
||||||
"""
|
|
||||||
file_name = os.path.splitext(os.path.basename(filepath))[0]
|
|
||||||
mime_type = get_file_type(filepath)
|
|
||||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
|
||||||
# use old file name with pdf extension
|
|
||||||
if mime_type == "image/tiff":
|
|
||||||
newpath = os.path.join(tempdir, file_name + ".pdf")
|
|
||||||
else:
|
|
||||||
logger.warning(
|
|
||||||
f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.",
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
with Image.open(filepath) as image:
|
|
||||||
images = []
|
|
||||||
for i, page in enumerate(ImageSequence.Iterator(image)):
|
|
||||||
page = page.convert("RGB")
|
|
||||||
images.append(page)
|
|
||||||
try:
|
|
||||||
if len(images) == 1:
|
|
||||||
images[0].save(newpath)
|
|
||||||
else:
|
|
||||||
images[0].save(newpath, save_all=True, append_images=images[1:])
|
|
||||||
except OSError as e:
|
|
||||||
logger.warning(
|
|
||||||
f"Could not save the file as pdf. Error: {str(e)}",
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
return newpath
|
|
||||||
|
|
||||||
|
|
||||||
def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
|
|
||||||
"""
|
|
||||||
Scan the provided pdf file for page separating barcodes
|
|
||||||
Returns a list of pagenumbers, which separate the file
|
|
||||||
"""
|
|
||||||
separator_page_numbers = []
|
|
||||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
|
||||||
# use a temporary directory in case the file os too big to handle in memory
|
|
||||||
with tempfile.TemporaryDirectory() as path:
|
|
||||||
pages_from_path = convert_from_path(filepath, output_folder=path)
|
|
||||||
for current_page_number, page in enumerate(pages_from_path):
|
|
||||||
current_barcodes = barcode_reader(page)
|
|
||||||
if separator_barcode in current_barcodes:
|
|
||||||
separator_page_numbers.append(current_page_number)
|
|
||||||
return separator_page_numbers
|
|
||||||
|
|
||||||
|
|
||||||
def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]:
|
|
||||||
"""
|
|
||||||
Separate the provided pdf file on the pages_to_split_on.
|
|
||||||
The pages which are defined by page_numbers will be removed.
|
|
||||||
Returns a list of (temporary) filepaths to consume.
|
|
||||||
These will need to be deleted later.
|
|
||||||
"""
|
|
||||||
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
|
|
||||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
|
||||||
fname = os.path.splitext(os.path.basename(filepath))[0]
|
|
||||||
pdf = Pdf.open(filepath)
|
|
||||||
document_paths = []
|
|
||||||
logger.debug(f"Temp dir is {str(tempdir)}")
|
|
||||||
if not pages_to_split_on:
|
|
||||||
logger.warning("No pages to split on!")
|
|
||||||
else:
|
|
||||||
# go from the first page to the first separator page
|
|
||||||
dst = Pdf.new()
|
|
||||||
for n, page in enumerate(pdf.pages):
|
|
||||||
if n < pages_to_split_on[0]:
|
|
||||||
dst.pages.append(page)
|
|
||||||
output_filename = f"{fname}_document_0.pdf"
|
|
||||||
savepath = os.path.join(tempdir, output_filename)
|
|
||||||
with open(savepath, "wb") as out:
|
|
||||||
dst.save(out)
|
|
||||||
document_paths = [savepath]
|
|
||||||
|
|
||||||
# iterate through the rest of the document
|
|
||||||
for count, page_number in enumerate(pages_to_split_on):
|
|
||||||
logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
|
|
||||||
dst = Pdf.new()
|
|
||||||
try:
|
|
||||||
next_page = pages_to_split_on[count + 1]
|
|
||||||
except IndexError:
|
|
||||||
next_page = len(pdf.pages)
|
|
||||||
# skip the first page_number. This contains the barcode page
|
|
||||||
for page in range(page_number + 1, next_page):
|
|
||||||
logger.debug(
|
|
||||||
f"page_number: {str(page_number)} next_page: {str(next_page)}",
|
|
||||||
)
|
|
||||||
dst.pages.append(pdf.pages[page])
|
|
||||||
output_filename = f"{fname}_document_{str(count + 1)}.pdf"
|
|
||||||
logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
|
|
||||||
savepath = os.path.join(tempdir, output_filename)
|
|
||||||
with open(savepath, "wb") as out:
|
|
||||||
dst.save(out)
|
|
||||||
document_paths.append(savepath)
|
|
||||||
logger.debug(f"Temp files are {str(document_paths)}")
|
|
||||||
return document_paths
|
|
||||||
|
|
||||||
|
|
||||||
def save_to_dir(
|
|
||||||
filepath: str,
|
|
||||||
newname: str = None,
|
|
||||||
target_dir: str = settings.CONSUMPTION_DIR,
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Copies filepath to target_dir.
|
|
||||||
Optionally rename the file.
|
|
||||||
"""
|
|
||||||
if os.path.isfile(filepath) and os.path.isdir(target_dir):
|
|
||||||
dst = shutil.copy(filepath, target_dir)
|
|
||||||
logging.debug(f"saved {str(filepath)} to {str(dst)}")
|
|
||||||
if newname:
|
|
||||||
dst_new = os.path.join(target_dir, newname)
|
|
||||||
logger.debug(f"moving {str(dst)} to {str(dst_new)}")
|
|
||||||
os.rename(dst, dst_new)
|
|
||||||
else:
|
|
||||||
logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.")
|
|
||||||
|
|
||||||
|
|
||||||
def consume_file(
|
def consume_file(
|
||||||
path,
|
path,
|
||||||
override_filename=None,
|
override_filename=None,
|
||||||
@ -245,32 +82,31 @@ def consume_file(
|
|||||||
|
|
||||||
# check for separators in current document
|
# check for separators in current document
|
||||||
if settings.CONSUMER_ENABLE_BARCODES:
|
if settings.CONSUMER_ENABLE_BARCODES:
|
||||||
separators = []
|
|
||||||
document_list = []
|
mime_type = barcodes.get_file_mime_type(path)
|
||||||
converted_tiff = None
|
|
||||||
if settings.CONSUMER_BARCODE_TIFF_SUPPORT:
|
if not barcodes.supported_file_type(mime_type):
|
||||||
supported_mime = ["image/tiff", "application/pdf"]
|
|
||||||
else:
|
|
||||||
supported_mime = ["application/pdf"]
|
|
||||||
mime_type = get_file_type(path)
|
|
||||||
if mime_type not in supported_mime:
|
|
||||||
# if not supported, skip this routine
|
# if not supported, skip this routine
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Unsupported file format for barcode reader: {str(mime_type)}",
|
f"Unsupported file format for barcode reader: {str(mime_type)}",
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
separators = []
|
||||||
|
document_list = []
|
||||||
|
converted_tiff = None
|
||||||
|
|
||||||
if mime_type == "image/tiff":
|
if mime_type == "image/tiff":
|
||||||
file_to_process = convert_from_tiff_to_pdf(path)
|
file_to_process = barcodes.convert_from_tiff_to_pdf(path)
|
||||||
else:
|
else:
|
||||||
file_to_process = path
|
file_to_process = path
|
||||||
|
|
||||||
separators = scan_file_for_separating_barcodes(file_to_process)
|
separators = barcodes.scan_file_for_separating_barcodes(file_to_process)
|
||||||
|
|
||||||
if separators:
|
if separators:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Pages with separators found in: {str(path)}",
|
f"Pages with separators found in: {str(path)}",
|
||||||
)
|
)
|
||||||
document_list = separate_pages(file_to_process, separators)
|
document_list = barcodes.separate_pages(file_to_process, separators)
|
||||||
|
|
||||||
if document_list:
|
if document_list:
|
||||||
for n, document in enumerate(document_list):
|
for n, document in enumerate(document_list):
|
||||||
@ -280,10 +116,10 @@ def consume_file(
|
|||||||
newname = f"{str(n)}_" + override_filename
|
newname = f"{str(n)}_" + override_filename
|
||||||
else:
|
else:
|
||||||
newname = None
|
newname = None
|
||||||
save_to_dir(document, newname=newname)
|
barcodes.save_to_dir(document, newname=newname)
|
||||||
# if we got here, the document was successfully split
|
# if we got here, the document was successfully split
|
||||||
# and can safely be deleted
|
# and can safely be deleted
|
||||||
if converted_tiff:
|
if converted_tiff is not None:
|
||||||
logger.debug(f"Deleting file {file_to_process}")
|
logger.debug(f"Deleting file {file_to_process}")
|
||||||
os.unlink(file_to_process)
|
os.unlink(file_to_process)
|
||||||
logger.debug(f"Deleting file {path}")
|
logger.debug(f"Deleting file {path}")
|
||||||
|
456
src/documents/tests/test_barcodes.py
Normal file
456
src/documents/tests/test_barcodes.py
Normal file
@ -0,0 +1,456 @@
|
|||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
|
from django.test import override_settings
|
||||||
|
from django.test import TestCase
|
||||||
|
from documents import barcodes
|
||||||
|
from documents import tasks
|
||||||
|
from documents.tests.utils import DirectoriesMixin
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
|
||||||
|
class TestBarcode(DirectoriesMixin, TestCase):
|
||||||
|
def test_barcode_reader(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-39-PATCHT.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||||
|
|
||||||
|
def test_barcode_reader2(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t.pbm",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||||
|
|
||||||
|
def test_barcode_reader_distorsion(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-39-PATCHT-distorsion.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||||
|
|
||||||
|
def test_barcode_reader_distorsion2(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-39-PATCHT-distorsion2.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||||
|
|
||||||
|
def test_barcode_reader_unreadable(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-39-PATCHT-unreadable.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), [])
|
||||||
|
|
||||||
|
def test_barcode_reader_qr(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"qr-code-PATCHT.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||||
|
|
||||||
|
def test_barcode_reader_128(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-128-PATCHT.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||||
|
|
||||||
|
def test_barcode_reader_no_barcode(self):
|
||||||
|
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png")
|
||||||
|
img = Image.open(test_file)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), [])
|
||||||
|
|
||||||
|
def test_barcode_reader_custom_separator(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-39-custom.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||||
|
|
||||||
|
def test_barcode_reader_custom_qr_separator(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-qr-custom.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||||
|
|
||||||
|
def test_barcode_reader_custom_128_separator(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-128-custom.png",
|
||||||
|
)
|
||||||
|
img = Image.open(test_file)
|
||||||
|
self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||||
|
|
||||||
|
def test_get_mime_type(self):
|
||||||
|
tiff_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"simple.tiff",
|
||||||
|
)
|
||||||
|
pdf_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"simple.pdf",
|
||||||
|
)
|
||||||
|
png_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-128-custom.png",
|
||||||
|
)
|
||||||
|
tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1")
|
||||||
|
pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2")
|
||||||
|
shutil.copy(tiff_file, tiff_file_no_extension)
|
||||||
|
shutil.copy(pdf_file, pdf_file_no_extension)
|
||||||
|
|
||||||
|
self.assertEqual(barcodes.get_file_mime_type(tiff_file), "image/tiff")
|
||||||
|
self.assertEqual(barcodes.get_file_mime_type(pdf_file), "application/pdf")
|
||||||
|
self.assertEqual(
|
||||||
|
barcodes.get_file_mime_type(tiff_file_no_extension),
|
||||||
|
"image/tiff",
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
barcodes.get_file_mime_type(pdf_file_no_extension),
|
||||||
|
"application/pdf",
|
||||||
|
)
|
||||||
|
self.assertEqual(barcodes.get_file_mime_type(png_file), "image/png")
|
||||||
|
|
||||||
|
def test_convert_from_tiff_to_pdf(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"simple.tiff",
|
||||||
|
)
|
||||||
|
dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff")
|
||||||
|
shutil.copy(test_file, dst)
|
||||||
|
target_file = barcodes.convert_from_tiff_to_pdf(dst)
|
||||||
|
file_extension = os.path.splitext(os.path.basename(target_file))[1]
|
||||||
|
self.assertTrue(os.path.isfile(target_file))
|
||||||
|
self.assertEqual(file_extension, ".pdf")
|
||||||
|
|
||||||
|
def test_convert_error_from_pdf_to_pdf(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"simple.pdf",
|
||||||
|
)
|
||||||
|
dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf")
|
||||||
|
shutil.copy(test_file, dst)
|
||||||
|
self.assertIsNone(barcodes.convert_from_tiff_to_pdf(dst))
|
||||||
|
|
||||||
|
def test_scan_file_for_separating_barcodes(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [0])
|
||||||
|
|
||||||
|
def test_scan_file_for_separating_barcodes2(self):
|
||||||
|
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [])
|
||||||
|
|
||||||
|
def test_scan_file_for_separating_barcodes3(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t-middle.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [1])
|
||||||
|
|
||||||
|
def test_scan_file_for_separating_barcodes4(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"several-patcht-codes.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [2, 5])
|
||||||
|
|
||||||
|
def test_scan_file_for_separating_barcodes_upsidedown(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t-middle_reverse.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [1])
|
||||||
|
|
||||||
|
def test_scan_file_for_separating_qr_barcodes(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t-qr.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [0])
|
||||||
|
|
||||||
|
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||||
|
def test_scan_file_for_separating_custom_barcodes(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-39-custom.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [0])
|
||||||
|
|
||||||
|
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||||
|
def test_scan_file_for_separating_custom_qr_barcodes(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-qr-custom.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [0])
|
||||||
|
|
||||||
|
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||||
|
def test_scan_file_for_separating_custom_128_barcodes(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-128-custom.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [0])
|
||||||
|
|
||||||
|
def test_scan_file_for_separating_wrong_qr_barcodes(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"barcode-39-custom.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [])
|
||||||
|
|
||||||
|
def test_separate_pages(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t-middle.pdf",
|
||||||
|
)
|
||||||
|
pages = barcodes.separate_pages(test_file, [1])
|
||||||
|
self.assertEqual(len(pages), 2)
|
||||||
|
|
||||||
|
def test_separate_pages_no_list(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t-middle.pdf",
|
||||||
|
)
|
||||||
|
with self.assertLogs("paperless.barcodes", level="WARNING") as cm:
|
||||||
|
pages = barcodes.separate_pages(test_file, [])
|
||||||
|
self.assertEqual(pages, [])
|
||||||
|
self.assertEqual(
|
||||||
|
cm.output,
|
||||||
|
[
|
||||||
|
f"WARNING:paperless.barcodes:No pages to split on!",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_save_to_dir(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t.pdf",
|
||||||
|
)
|
||||||
|
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||||
|
barcodes.save_to_dir(test_file, target_dir=tempdir)
|
||||||
|
target_file = os.path.join(tempdir, "patch-code-t.pdf")
|
||||||
|
self.assertTrue(os.path.isfile(target_file))
|
||||||
|
|
||||||
|
def test_save_to_dir2(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__),
|
||||||
|
"samples",
|
||||||
|
"barcodes",
|
||||||
|
"patch-code-t.pdf",
|
||||||
|
)
|
||||||
|
nonexistingdir = "/nowhere"
|
||||||
|
if os.path.isdir(nonexistingdir):
|
||||||
|
self.fail("non-existing dir exists")
|
||||||
|
else:
|
||||||
|
with self.assertLogs("paperless.barcodes", level="WARNING") as cm:
|
||||||
|
barcodes.save_to_dir(test_file, target_dir=nonexistingdir)
|
||||||
|
self.assertEqual(
|
||||||
|
cm.output,
|
||||||
|
[
|
||||||
|
f"WARNING:paperless.barcodes:{str(test_file)} or {str(nonexistingdir)} don't exist.",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_save_to_dir3(self):
    """
    Saving a sample PDF with an explicit new name; the test expects the
    file to appear under that name in the target directory.
    """
    sample_path = os.path.join(
        os.path.dirname(__file__),
        "samples",
        "barcodes",
        "patch-code-t.pdf",
    )
    output_dir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)

    barcodes.save_to_dir(sample_path, newname="newname.pdf", target_dir=output_dir)

    renamed_path = os.path.join(output_dir, "newname.pdf")
    self.assertTrue(os.path.isfile(renamed_path))
|
||||||
|
|
||||||
|
def test_barcode_splitter(self):
    """
    Runs the three steps in sequence: scan for separator pages, split
    the file on them, save every resulting document. Two numbered PDFs
    are expected in the target directory afterwards.
    """
    sample_path = os.path.join(
        os.path.dirname(__file__),
        "samples",
        "barcodes",
        "patch-code-t-middle.pdf",
    )
    output_dir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)

    separator_pages = barcodes.scan_file_for_separating_barcodes(sample_path)
    self.assertTrue(separator_pages)

    split_documents = barcodes.separate_pages(sample_path, separator_pages)
    self.assertTrue(split_documents)

    for split_document in split_documents:
        barcodes.save_to_dir(split_document, target_dir=output_dir)

    # Both expected paths are built before either is checked
    expected_paths = [
        os.path.join(output_dir, expected_name)
        for expected_name in (
            "patch-code-t-middle_document_0.pdf",
            "patch-code-t-middle_document_1.pdf",
        )
    ]
    for expected_path in expected_paths:
        self.assertTrue(os.path.isfile(expected_path))
|
||||||
|
|
||||||
|
@override_settings(CONSUMER_ENABLE_BARCODES=True)
def test_consume_barcode_file(self):
    """
    With barcode handling enabled, consuming a copy of the sample PDF
    is expected to return the "File successfully split" message.
    """
    sample_path = os.path.join(
        os.path.dirname(__file__),
        "samples",
        "barcodes",
        "patch-code-t-middle.pdf",
    )
    scratch_copy = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf")
    shutil.copy(sample_path, scratch_copy)

    outcome = tasks.consume_file(scratch_copy)
    self.assertEqual(outcome, "File successfully split")
|
||||||
|
|
||||||
|
@override_settings(
    CONSUMER_ENABLE_BARCODES=True,
    CONSUMER_BARCODE_TIFF_SUPPORT=True,
)
def test_consume_barcode_tiff_file(self):
    """
    With barcode and TIFF support enabled, consuming a copy of the
    sample TIFF is expected to return "File successfully split".
    """
    sample_path = os.path.join(
        os.path.dirname(__file__),
        "samples",
        "barcodes",
        "patch-code-t-middle.tiff",
    )
    scratch_copy = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff")
    shutil.copy(sample_path, scratch_copy)

    outcome = tasks.consume_file(scratch_copy)
    self.assertEqual(outcome, "File successfully split")
|
||||||
|
|
||||||
|
@override_settings(
    CONSUMER_ENABLE_BARCODES=True,
    CONSUMER_BARCODE_TIFF_SUPPORT=True,
)
@mock.patch("documents.consumer.Consumer.try_consume_file")
def test_consume_barcode_unsupported_jpg_file(self, m):
    """
    Barcode and TIFF support are both enabled and the user uploads an
    image type that is not supported for barcode reading (a jpg).

    Expected: no scan for separating barcodes, one warning about the
    unsupported format, and the file handed to the (mocked) consumer
    once, with every override left as None.
    """
    sample_path = os.path.join(
        os.path.dirname(__file__),
        "samples",
        "simple.jpg",
    )
    scratch_copy = os.path.join(settings.SCRATCH_DIR, "simple.jpg")
    shutil.copy(sample_path, scratch_copy)

    with self.assertLogs("paperless.tasks", level="WARNING") as captured:
        self.assertIn("Success", tasks.consume_file(scratch_copy))

    expected_warnings = [
        "WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg",
    ]
    self.assertListEqual(captured.output, expected_warnings)
    m.assert_called_once()

    # Only the keyword arguments of the single consumer call matter here
    _, call_kwargs = m.call_args
    for override_key in (
        "override_filename",
        "override_title",
        "override_correspondent_id",
        "override_document_type_id",
        "override_tag_ids",
    ):
        self.assertIsNone(call_kwargs[override_key])
|
||||||
|
|
||||||
|
@override_settings(
    CONSUMER_ENABLE_BARCODES=True,
    CONSUMER_BARCODE_TIFF_SUPPORT=True,
)
def test_consume_barcode_supported_no_extension_file(self):
    """
    Barcode and TIFF support are both enabled and the user uploads a
    supported image whose file name has no extension. The test expects
    the file to be split all the same.
    """
    sample_path = os.path.join(
        os.path.dirname(__file__),
        "samples",
        "barcodes",
        "patch-code-t-middle.tiff",
    )
    # The copy deliberately drops the ".tiff" suffix
    scratch_copy = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle")
    shutil.copy(sample_path, scratch_copy)

    outcome = tasks.consume_file(scratch_copy)
    self.assertEqual(outcome, "File successfully split")
|
@ -1,10 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
import shutil
|
|
||||||
import tempfile
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.test import override_settings
|
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from documents import tasks
|
from documents import tasks
|
||||||
@ -15,7 +12,6 @@ from documents.models import Tag
|
|||||||
from documents.sanity_checker import SanityCheckFailedException
|
from documents.sanity_checker import SanityCheckFailedException
|
||||||
from documents.sanity_checker import SanityCheckMessages
|
from documents.sanity_checker import SanityCheckMessages
|
||||||
from documents.tests.utils import DirectoriesMixin
|
from documents.tests.utils import DirectoriesMixin
|
||||||
from PIL import Image
|
|
||||||
|
|
||||||
|
|
||||||
class TestIndexReindex(DirectoriesMixin, TestCase):
|
class TestIndexReindex(DirectoriesMixin, TestCase):
|
||||||
@ -96,444 +92,6 @@ class TestClassifier(DirectoriesMixin, TestCase):
|
|||||||
self.assertNotEqual(mtime2, mtime3)
|
self.assertNotEqual(mtime2, mtime3)
|
||||||
|
|
||||||
|
|
||||||
class TestBarcode(DirectoriesMixin, TestCase):
|
|
||||||
def test_barcode_reader(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-39-PATCHT.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
|
||||||
|
|
||||||
def test_barcode_reader2(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t.pbm",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
|
||||||
|
|
||||||
def test_barcode_reader_distorsion(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-39-PATCHT-distorsion.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
|
||||||
|
|
||||||
def test_barcode_reader_distorsion2(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-39-PATCHT-distorsion2.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
|
||||||
|
|
||||||
def test_barcode_reader_unreadable(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-39-PATCHT-unreadable.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), [])
|
|
||||||
|
|
||||||
def test_barcode_reader_qr(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"qr-code-PATCHT.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
|
||||||
|
|
||||||
def test_barcode_reader_128(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-128-PATCHT.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
|
||||||
|
|
||||||
def test_barcode_reader_no_barcode(self):
|
|
||||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png")
|
|
||||||
img = Image.open(test_file)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), [])
|
|
||||||
|
|
||||||
def test_barcode_reader_custom_separator(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-39-custom.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
|
||||||
|
|
||||||
def test_barcode_reader_custom_qr_separator(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-qr-custom.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
|
||||||
|
|
||||||
def test_barcode_reader_custom_128_separator(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-128-custom.png",
|
|
||||||
)
|
|
||||||
img = Image.open(test_file)
|
|
||||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
|
||||||
|
|
||||||
def test_get_mime_type(self):
|
|
||||||
tiff_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"simple.tiff",
|
|
||||||
)
|
|
||||||
pdf_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"simple.pdf",
|
|
||||||
)
|
|
||||||
png_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-128-custom.png",
|
|
||||||
)
|
|
||||||
tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1")
|
|
||||||
pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2")
|
|
||||||
shutil.copy(tiff_file, tiff_file_no_extension)
|
|
||||||
shutil.copy(pdf_file, pdf_file_no_extension)
|
|
||||||
|
|
||||||
self.assertEqual(tasks.get_file_type(tiff_file), "image/tiff")
|
|
||||||
self.assertEqual(tasks.get_file_type(pdf_file), "application/pdf")
|
|
||||||
self.assertEqual(tasks.get_file_type(tiff_file_no_extension), "image/tiff")
|
|
||||||
self.assertEqual(tasks.get_file_type(pdf_file_no_extension), "application/pdf")
|
|
||||||
self.assertEqual(tasks.get_file_type(png_file), "image/png")
|
|
||||||
|
|
||||||
def test_convert_from_tiff_to_pdf(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"simple.tiff",
|
|
||||||
)
|
|
||||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff")
|
|
||||||
shutil.copy(test_file, dst)
|
|
||||||
target_file = tasks.convert_from_tiff_to_pdf(dst)
|
|
||||||
file_extension = os.path.splitext(os.path.basename(target_file))[1]
|
|
||||||
self.assertTrue(os.path.isfile(target_file))
|
|
||||||
self.assertEqual(file_extension, ".pdf")
|
|
||||||
|
|
||||||
def test_convert_error_from_pdf_to_pdf(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"simple.pdf",
|
|
||||||
)
|
|
||||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf")
|
|
||||||
shutil.copy(test_file, dst)
|
|
||||||
self.assertIsNone(tasks.convert_from_tiff_to_pdf(dst))
|
|
||||||
|
|
||||||
def test_scan_file_for_separating_barcodes(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [0])
|
|
||||||
|
|
||||||
def test_scan_file_for_separating_barcodes2(self):
|
|
||||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [])
|
|
||||||
|
|
||||||
def test_scan_file_for_separating_barcodes3(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-middle.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [1])
|
|
||||||
|
|
||||||
def test_scan_file_for_separating_barcodes4(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"several-patcht-codes.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [2, 5])
|
|
||||||
|
|
||||||
def test_scan_file_for_separating_barcodes_upsidedown(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-middle_reverse.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [1])
|
|
||||||
|
|
||||||
def test_scan_file_for_separating_qr_barcodes(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-qr.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [0])
|
|
||||||
|
|
||||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
|
||||||
def test_scan_file_for_separating_custom_barcodes(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-39-custom.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [0])
|
|
||||||
|
|
||||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
|
||||||
def test_scan_file_for_separating_custom_qr_barcodes(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-qr-custom.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [0])
|
|
||||||
|
|
||||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
|
||||||
def test_scan_file_for_separating_custom_128_barcodes(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-128-custom.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [0])
|
|
||||||
|
|
||||||
def test_scan_file_for_separating_wrong_qr_barcodes(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"barcode-39-custom.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertEqual(pages, [])
|
|
||||||
|
|
||||||
def test_separate_pages(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-middle.pdf",
|
|
||||||
)
|
|
||||||
pages = tasks.separate_pages(test_file, [1])
|
|
||||||
self.assertEqual(len(pages), 2)
|
|
||||||
|
|
||||||
def test_separate_pages_no_list(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-middle.pdf",
|
|
||||||
)
|
|
||||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
|
||||||
pages = tasks.separate_pages(test_file, [])
|
|
||||||
self.assertEqual(pages, [])
|
|
||||||
self.assertEqual(
|
|
||||||
cm.output,
|
|
||||||
[
|
|
||||||
f"WARNING:paperless.tasks:No pages to split on!",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_save_to_dir(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t.pdf",
|
|
||||||
)
|
|
||||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
|
||||||
tasks.save_to_dir(test_file, target_dir=tempdir)
|
|
||||||
target_file = os.path.join(tempdir, "patch-code-t.pdf")
|
|
||||||
self.assertTrue(os.path.isfile(target_file))
|
|
||||||
|
|
||||||
def test_save_to_dir2(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t.pdf",
|
|
||||||
)
|
|
||||||
nonexistingdir = "/nowhere"
|
|
||||||
if os.path.isdir(nonexistingdir):
|
|
||||||
self.fail("non-existing dir exists")
|
|
||||||
else:
|
|
||||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
|
||||||
tasks.save_to_dir(test_file, target_dir=nonexistingdir)
|
|
||||||
self.assertEqual(
|
|
||||||
cm.output,
|
|
||||||
[
|
|
||||||
f"WARNING:paperless.tasks:{str(test_file)} or {str(nonexistingdir)} don't exist.",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_save_to_dir3(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t.pdf",
|
|
||||||
)
|
|
||||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
|
||||||
tasks.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir)
|
|
||||||
target_file = os.path.join(tempdir, "newname.pdf")
|
|
||||||
self.assertTrue(os.path.isfile(target_file))
|
|
||||||
|
|
||||||
def test_barcode_splitter(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-middle.pdf",
|
|
||||||
)
|
|
||||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
|
||||||
separators = tasks.scan_file_for_separating_barcodes(test_file)
|
|
||||||
self.assertTrue(separators)
|
|
||||||
document_list = tasks.separate_pages(test_file, separators)
|
|
||||||
self.assertTrue(document_list)
|
|
||||||
for document in document_list:
|
|
||||||
tasks.save_to_dir(document, target_dir=tempdir)
|
|
||||||
target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf")
|
|
||||||
target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf")
|
|
||||||
self.assertTrue(os.path.isfile(target_file1))
|
|
||||||
self.assertTrue(os.path.isfile(target_file2))
|
|
||||||
|
|
||||||
@override_settings(CONSUMER_ENABLE_BARCODES=True)
|
|
||||||
def test_consume_barcode_file(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-middle.pdf",
|
|
||||||
)
|
|
||||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf")
|
|
||||||
shutil.copy(test_file, dst)
|
|
||||||
|
|
||||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
|
||||||
|
|
||||||
@override_settings(
|
|
||||||
CONSUMER_ENABLE_BARCODES=True,
|
|
||||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
|
||||||
)
|
|
||||||
def test_consume_barcode_tiff_file(self):
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-middle.tiff",
|
|
||||||
)
|
|
||||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff")
|
|
||||||
shutil.copy(test_file, dst)
|
|
||||||
|
|
||||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
|
||||||
|
|
||||||
@override_settings(
|
|
||||||
CONSUMER_ENABLE_BARCODES=True,
|
|
||||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
|
||||||
)
|
|
||||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
|
||||||
def test_consume_barcode_unsupported_jpg_file(self, m):
|
|
||||||
"""
|
|
||||||
This test assumes barcode and TIFF support are enabled and
|
|
||||||
the user uploads an unsupported image file (e.g. jpg)
|
|
||||||
|
|
||||||
The function shouldn't try to scan for separating barcodes
|
|
||||||
and continue archiving the file as is.
|
|
||||||
"""
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"simple.jpg",
|
|
||||||
)
|
|
||||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg")
|
|
||||||
shutil.copy(test_file, dst)
|
|
||||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
|
||||||
self.assertIn("Success", tasks.consume_file(dst))
|
|
||||||
self.assertEqual(
|
|
||||||
cm.output,
|
|
||||||
[
|
|
||||||
"WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
m.assert_called_once()
|
|
||||||
|
|
||||||
args, kwargs = m.call_args
|
|
||||||
self.assertIsNone(kwargs["override_filename"])
|
|
||||||
self.assertIsNone(kwargs["override_title"])
|
|
||||||
self.assertIsNone(kwargs["override_correspondent_id"])
|
|
||||||
self.assertIsNone(kwargs["override_document_type_id"])
|
|
||||||
self.assertIsNone(kwargs["override_tag_ids"])
|
|
||||||
|
|
||||||
@override_settings(
|
|
||||||
CONSUMER_ENABLE_BARCODES=True,
|
|
||||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
|
||||||
)
|
|
||||||
def test_consume_barcode_supported_no_extension_file(self):
|
|
||||||
"""
|
|
||||||
This test assumes barcode and TIFF support are enabled and
|
|
||||||
the user uploads a supported image file, but without extension
|
|
||||||
"""
|
|
||||||
test_file = os.path.join(
|
|
||||||
os.path.dirname(__file__),
|
|
||||||
"samples",
|
|
||||||
"barcodes",
|
|
||||||
"patch-code-t-middle.tiff",
|
|
||||||
)
|
|
||||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle")
|
|
||||||
shutil.copy(test_file, dst)
|
|
||||||
|
|
||||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
|
||||||
|
|
||||||
|
|
||||||
class TestSanityCheck(DirectoriesMixin, TestCase):
|
class TestSanityCheck(DirectoriesMixin, TestCase):
|
||||||
@mock.patch("documents.tasks.sanity_checker.check_sanity")
|
@mock.patch("documents.tasks.sanity_checker.check_sanity")
|
||||||
def test_sanity_check_success(self, m):
|
def test_sanity_check_success(self, m):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user