Merge pull request #532 from paperless-ngx/feature-barcode-splitter

Feature barcode splitter
This commit is contained in:
Florian
2022-04-11 08:51:48 +02:00
committed by GitHub
27 changed files with 778 additions and 1 deletions

View File

@@ -1,6 +1,12 @@
import logging
import os
import shutil
import tempfile
from typing import List # for type hinting. Can be removed, if only Python >3.8 is used
import tqdm
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.db.models.signals import post_save
from documents import index
@@ -14,8 +20,12 @@ from documents.models import Document
from documents.models import DocumentType
from documents.models import Tag
from documents.sanity_checker import SanityCheckFailedException
from pdf2image import convert_from_path
from pikepdf import Pdf
from pyzbar import pyzbar
from whoosh.writing import AsyncWriter
logger = logging.getLogger("paperless.tasks")
@@ -62,6 +72,115 @@ def train_classifier():
logger.warning("Classifier error: " + str(e))
def barcode_reader(image) -> List[str]:
"""
Read any barcodes contained in image
Returns a list containing all found barcodes
"""
barcodes = []
# Decode the barcode image
detected_barcodes = pyzbar.decode(image)
if detected_barcodes:
# Traverse through all the detected barcodes in image
for barcode in detected_barcodes:
if barcode.data:
decoded_barcode = barcode.data.decode("utf-8")
barcodes.append(decoded_barcode)
logger.debug(
f"Barcode of type {str(barcode.type)} found: {decoded_barcode}",
)
return barcodes
def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
"""
Scan the provided file for page separating barcodes
Returns a list of pagenumbers, which separate the file
"""
separator_page_numbers = []
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
# use a temporary directory in case the file os too big to handle in memory
with tempfile.TemporaryDirectory() as path:
pages_from_path = convert_from_path(filepath, output_folder=path)
for current_page_number, page in enumerate(pages_from_path):
current_barcodes = barcode_reader(page)
if separator_barcode in current_barcodes:
separator_page_numbers.append(current_page_number)
return separator_page_numbers
def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]:
"""
Separate the provided file on the pages_to_split_on.
The pages which are defined by page_numbers will be removed.
Returns a list of (temporary) filepaths to consume.
These will need to be deleted later.
"""
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
fname = os.path.splitext(os.path.basename(filepath))[0]
pdf = Pdf.open(filepath)
document_paths = []
logger.debug(f"Temp dir is {str(tempdir)}")
if not pages_to_split_on:
logger.warning("No pages to split on!")
else:
# go from the first page to the first separator page
dst = Pdf.new()
for n, page in enumerate(pdf.pages):
if n < pages_to_split_on[0]:
dst.pages.append(page)
output_filename = "{}_document_0.pdf".format(fname)
savepath = os.path.join(tempdir, output_filename)
with open(savepath, "wb") as out:
dst.save(out)
document_paths = [savepath]
# iterate through the rest of the document
for count, page_number in enumerate(pages_to_split_on):
logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
dst = Pdf.new()
try:
next_page = pages_to_split_on[count + 1]
except IndexError:
next_page = len(pdf.pages)
# skip the first page_number. This contains the barcode page
for page in range(page_number + 1, next_page):
logger.debug(
f"page_number: {str(page_number)} next_page: {str(next_page)}",
)
dst.pages.append(pdf.pages[page])
output_filename = "{}_document_{}.pdf".format(fname, str(count + 1))
logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
savepath = os.path.join(tempdir, output_filename)
with open(savepath, "wb") as out:
dst.save(out)
document_paths.append(savepath)
logger.debug(f"Temp files are {str(document_paths)}")
return document_paths
def save_to_dir(
filepath: str,
newname: str = None,
target_dir: str = settings.CONSUMPTION_DIR,
):
"""
Copies filepath to target_dir.
Optionally rename the file.
"""
if os.path.isfile(filepath) and os.path.isdir(target_dir):
dst = shutil.copy(filepath, target_dir)
logging.debug(f"saved {str(filepath)} to {str(dst)}")
if newname:
dst_new = os.path.join(target_dir, newname)
logger.debug(f"moving {str(dst)} to {str(dst_new)}")
os.rename(dst, dst_new)
else:
logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.")
def consume_file(
path,
override_filename=None,
@@ -72,6 +191,48 @@ def consume_file(
task_id=None,
):
# check for separators in current document
if settings.CONSUMER_ENABLE_BARCODES:
separators = []
document_list = []
separators = scan_file_for_separating_barcodes(path)
if separators:
logger.debug(f"Pages with separators found in: {str(path)}")
document_list = separate_pages(path, separators)
if document_list:
for n, document in enumerate(document_list):
# save to consumption dir
# rename it to the original filename with number prefix
if override_filename:
newname = f"{str(n)}_" + override_filename
else:
newname = None
save_to_dir(document, newname=newname)
# if we got here, the document was successfully split
# and can safely be deleted
logger.debug("Deleting file {}".format(path))
os.unlink(path)
# notify the sender, otherwise the progress bar
# in the UI stays stuck
payload = {
"filename": override_filename,
"task_id": task_id,
"current_progress": 100,
"max_progress": 100,
"status": "SUCCESS",
"message": "finished",
}
try:
async_to_sync(get_channel_layer().group_send)(
"status_updates",
{"type": "status_update", "data": payload},
)
except OSError as e:
logger.warning("OSError. It could be, the broker cannot be reached.")
logger.warning(str(e))
return "File successfully split"
# continue with consumption if no barcode was found
document = Consumer().try_consume_file(
path,
override_filename=override_filename,

Binary file not shown.

After

Width:  |  Height:  |  Size: 836 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 891 B

File diff suppressed because one or more lines are too long

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 337 B

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.4 KiB

View File

@@ -1,7 +1,10 @@
import os
import shutil
import tempfile
from unittest import mock
from django.conf import settings
from django.test import override_settings
from django.test import TestCase
from django.utils import timezone
from documents import tasks
@@ -12,6 +15,7 @@ from documents.models import Tag
from documents.sanity_checker import SanityCheckFailedException
from documents.sanity_checker import SanityCheckMessages
from documents.tests.utils import DirectoriesMixin
from PIL import Image
class TestTasks(DirectoriesMixin, TestCase):
@@ -89,6 +93,318 @@ class TestTasks(DirectoriesMixin, TestCase):
mtime3 = os.stat(settings.MODEL_FILE).st_mtime
self.assertNotEqual(mtime2, mtime3)
def test_barcode_reader(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-39-PATCHT.png",
)
img = Image.open(test_file)
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
def test_barcode_reader2(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t.pbm",
)
img = Image.open(test_file)
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
def test_barcode_reader_distorsion(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-39-PATCHT-distorsion.png",
)
img = Image.open(test_file)
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
def test_barcode_reader_distorsion2(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-39-PATCHT-distorsion2.png",
)
img = Image.open(test_file)
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
def test_barcode_reader_unreadable(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-39-PATCHT-unreadable.png",
)
img = Image.open(test_file)
self.assertEqual(tasks.barcode_reader(img), [])
def test_barcode_reader_qr(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"qr-code-PATCHT.png",
)
img = Image.open(test_file)
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
def test_barcode_reader_128(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-128-PATCHT.png",
)
img = Image.open(test_file)
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
def test_barcode_reader_no_barcode(self):
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png")
img = Image.open(test_file)
self.assertEqual(tasks.barcode_reader(img), [])
def test_barcode_reader_custom_separator(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-39-custom.png",
)
img = Image.open(test_file)
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
def test_barcode_reader_custom_qr_separator(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-qr-custom.png",
)
img = Image.open(test_file)
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
def test_barcode_reader_custom_128_separator(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-128-custom.png",
)
img = Image.open(test_file)
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
def test_scan_file_for_separating_barcodes(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [0])
def test_scan_file_for_separating_barcodes2(self):
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [])
def test_scan_file_for_separating_barcodes3(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t-middle.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [1])
def test_scan_file_for_separating_barcodes4(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"several-patcht-codes.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [2, 5])
def test_scan_file_for_separating_barcodes_upsidedown(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t-middle_reverse.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [1])
def test_scan_file_for_separating_qr_barcodes(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t-qr.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [0])
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
def test_scan_file_for_separating_custom_barcodes(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-39-custom.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [0])
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
def test_scan_file_for_separating_custom_qr_barcodes(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-qr-custom.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [0])
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
def test_scan_file_for_separating_custom_128_barcodes(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-128-custom.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [0])
def test_scan_file_for_separating_wrong_qr_barcodes(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"barcode-39-custom.pdf",
)
pages = tasks.scan_file_for_separating_barcodes(test_file)
self.assertEqual(pages, [])
def test_separate_pages(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t-middle.pdf",
)
pages = tasks.separate_pages(test_file, [1])
self.assertEqual(len(pages), 2)
def test_separate_pages_no_list(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t-middle.pdf",
)
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
pages = tasks.separate_pages(test_file, [])
self.assertEqual(pages, [])
self.assertEqual(
cm.output,
[
f"WARNING:paperless.tasks:No pages to split on!",
],
)
def test_save_to_dir(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t.pdf",
)
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
tasks.save_to_dir(test_file, target_dir=tempdir)
target_file = os.path.join(tempdir, "patch-code-t.pdf")
self.assertTrue(os.path.isfile(target_file))
def test_save_to_dir2(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t.pdf",
)
nonexistingdir = "/nowhere"
if os.path.isdir(nonexistingdir):
self.fail("non-existing dir exists")
else:
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
tasks.save_to_dir(test_file, target_dir=nonexistingdir)
self.assertEqual(
cm.output,
[
f"WARNING:paperless.tasks:{str(test_file)} or {str(nonexistingdir)} don't exist.",
],
)
def test_save_to_dir3(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t.pdf",
)
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
tasks.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir)
target_file = os.path.join(tempdir, "newname.pdf")
self.assertTrue(os.path.isfile(target_file))
def test_barcode_splitter(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t-middle.pdf",
)
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
separators = tasks.scan_file_for_separating_barcodes(test_file)
self.assertTrue(separators)
document_list = tasks.separate_pages(test_file, separators)
self.assertTrue(document_list)
for document in document_list:
tasks.save_to_dir(document, target_dir=tempdir)
target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf")
target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf")
self.assertTrue(os.path.isfile(target_file1))
self.assertTrue(os.path.isfile(target_file2))
@override_settings(CONSUMER_ENABLE_BARCODES=True)
def test_consume_barcode_file(self):
test_file = os.path.join(
os.path.dirname(__file__),
"samples",
"barcodes",
"patch-code-t-middle.pdf",
)
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pd")
shutil.copy(test_file, dst)
self.assertEqual(tasks.consume_file(dst), "File successfully split")
@mock.patch("documents.tasks.sanity_checker.check_sanity")
def test_sanity_check_success(self, m):
m.return_value = SanityCheckMessages()