mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
more work on barcode
Signed-off-by: florian on nixos (Florian Brandes) <florian.brandes@posteo.de>
This commit is contained in:
parent
bcce0838dd
commit
4fe966f534
@ -22,6 +22,7 @@ from pyzbar import pyzbar
|
||||
from pdf2image import convert_from_path
|
||||
import tempfile
|
||||
from pikepdf import Pdf
|
||||
import shutil
|
||||
|
||||
logger = logging.getLogger("paperless.tasks")
|
||||
|
||||
@ -91,25 +92,25 @@ def barcode_reader(image) -> list:
|
||||
return barcodes
|
||||
|
||||
|
||||
def scan_file_for_seperating_barcodes(filepath: str) -> list:
|
||||
def scan_file_for_separating_barcodes(filepath: str) -> list:
|
||||
"""
|
||||
Scan the provided file for page seperating barcodes
|
||||
Returns a list of pagenumbers, which seperate the file
|
||||
Scan the provided file for page separating barcodes
|
||||
Returns a list of pagenumbers, which separate the file
|
||||
"""
|
||||
seperator_page_numbers = []
|
||||
separator_page_numbers = []
|
||||
# use a temporary directory in case the file is too big to handle in memory
|
||||
with tempfile.TemporaryDirectory() as path:
|
||||
pages_from_path = convert_from_path(filepath, output_folder=path)
|
||||
for current_page_number, page in enumerate(pages_from_path):
|
||||
current_barcodes = barcode_reader(page)
|
||||
if "b'PATCHT'" in current_barcodes:
|
||||
seperator_page_numbers = seperator_page_numbers + [current_page_number]
|
||||
return seperator_page_numbers
|
||||
separator_page_numbers = separator_page_numbers + [current_page_number]
|
||||
return separator_page_numbers
|
||||
|
||||
|
||||
def seperate_pages(filepath: str, pages_to_split_on: list) -> list:
|
||||
def separate_pages(filepath: str, pages_to_split_on: list) -> list:
|
||||
"""
|
||||
Seperate the provided file on the pages_to_split_on.
|
||||
Separate the provided file on the pages_to_split_on.
|
||||
The pages which are defined by page_numbers will be removed.
|
||||
Returns a list of (temporary) filepaths to consume.
|
||||
These will need to be deleted later.
|
||||
@ -156,6 +157,14 @@ def seperate_pages(filepath: str, pages_to_split_on: list) -> list:
|
||||
logger.debug(f"Temp files are {str(document_paths)}")
|
||||
return document_paths
|
||||
|
||||
def save_to_dir(filepath, target_dir=settings.CONSUMPTION_DIR):
    """
    Copy the file at filepath into the directory target_dir.

    If filepath is not an existing file, or target_dir is not an existing
    directory, nothing is copied and a warning is logged instead.
    """
    source_missing = not os.path.isfile(filepath)
    # The directory is only checked when the source file exists.
    if source_missing or not os.path.isdir(target_dir):
        logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.")
        return
    shutil.copy(filepath, target_dir)
|
||||
|
||||
def consume_file(
|
||||
path,
|
||||
@ -167,10 +176,10 @@ def consume_file(
|
||||
task_id=None,
|
||||
):
|
||||
|
||||
# check for seperators in current document
|
||||
seperator_page_numbers = scan_file_for_seperating_barcodes(path)
|
||||
if seperator_page_numbers != []:
|
||||
logger.debug(f"Pages with seperators found: {str(seperator_page_numbers)}")
|
||||
# check for separators in current document
|
||||
separator_page_numbers = scan_file_for_separating_barcodes(path)
|
||||
if separator_page_numbers != []:
|
||||
logger.debug(f"Pages with separators found: {str(separator_page_numbers)}")
|
||||
|
||||
document = Consumer().try_consume_file(
|
||||
path,
|
||||
|
@ -14,6 +14,7 @@ from documents.sanity_checker import SanityCheckMessages
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
|
||||
from PIL import Image
|
||||
import tempfile
|
||||
|
||||
|
||||
class TestTasks(DirectoriesMixin, TestCase):
|
||||
@ -103,33 +104,72 @@ class TestTasks(DirectoriesMixin, TestCase):
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), [])
|
||||
|
||||
def test_scan_file_for_seperating_barcodes(self):
|
||||
def test_scan_file_for_separating_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__), "samples", "patch-code-t.pdf"
|
||||
)
|
||||
pages = tasks.scan_file_for_seperating_barcodes(test_file)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
def test_scan_file_for_seperating_barcodes2(self):
|
||||
def test_scan_file_for_separating_barcodes2(self):
|
||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||
pages = tasks.scan_file_for_seperating_barcodes(test_file)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [])
|
||||
|
||||
def test_scan_file_for_seperating_barcodes3(self):
|
||||
def test_scan_file_for_separating_barcodes3(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf"
|
||||
)
|
||||
pages = tasks.scan_file_for_seperating_barcodes(test_file)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [1])
|
||||
|
||||
def test_seperate_pages(self):
|
||||
def test_separate_pages(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf"
|
||||
)
|
||||
pages = tasks.seperate_pages(test_file, [1])
|
||||
|
||||
pages = tasks.separate_pages(test_file, [1])
|
||||
self.assertEqual(len(pages), 2)
|
||||
|
||||
def test_save_to_dir(self):
    """A sample PDF passed to save_to_dir ends up in the target directory."""
    sample_name = "patch-code-t.pdf"
    samples_dir = os.path.join(os.path.dirname(__file__), "samples")
    source = os.path.join(samples_dir, sample_name)
    scratch = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)

    tasks.save_to_dir(source, scratch)

    # The copy keeps the source file name inside the target directory.
    self.assertTrue(os.path.isfile(os.path.join(scratch, sample_name)))
|
||||
|
||||
def test_save_to_dir2(self):
    """save_to_dir logs a warning when the target directory does not exist."""
    nonexistingdir = "/nowhere"
    test_file = os.path.join(
        os.path.dirname(__file__), "samples", "patch-code-t.pdf"
    )
    if os.path.isdir(nonexistingdir):
        # Nothing to verify when the path unexpectedly exists on this machine.
        return

    with self.assertLogs("paperless.tasks", level="WARNING") as captured:
        tasks.save_to_dir(test_file, nonexistingdir)

    expected = [
        f"WARNING:paperless.tasks:{str(test_file)} or {str(nonexistingdir)} don't exist."
    ]
    self.assertEqual(captured.output, expected)
|
||||
|
||||
def test_barcode_splitter(self):
    """Scan, split and save a sample PDF; expect two documents in the target dir."""
    sample = os.path.join(
        os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf"
    )
    out_dir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)

    # Step 1: locate the separator pages.
    split_pages = tasks.scan_file_for_separating_barcodes(sample)
    self.assertTrue(split_pages != [])

    # Step 2: split the file on those pages.
    parts = tasks.separate_pages(sample, split_pages)
    self.assertTrue(parts != [])

    # Step 3: copy every resulting document into the target directory.
    for part in parts:
        tasks.save_to_dir(part, out_dir)

    first_target = os.path.join(out_dir, "patch-code-t-middle_document_0.pdf")
    second_target = os.path.join(out_dir, "patch-code-t-middle_document_1.pdf")
    for target in (first_target, second_target):
        self.assertTrue(os.path.isfile(target))
|
||||
|
||||
@mock.patch("documents.tasks.sanity_checker.check_sanity")
|
||||
def test_sanity_check_success(self, m):
|
||||
m.return_value = SanityCheckMessages()
|
||||
|
Loading…
x
Reference in New Issue
Block a user