mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
working split pages
Signed-off-by: florian on nixos (Florian Brandes) <florian.brandes@posteo.de>
This commit is contained in:
parent
76e43bcb89
commit
bcce0838dd
@ -69,69 +69,92 @@ def train_classifier():
|
|||||||
logger.warning("Classifier error: " + str(e))
|
logger.warning("Classifier error: " + str(e))
|
||||||
|
|
||||||
|
|
||||||
|
def barcode_reader(image) -> list:
|
||||||
def barcode_reader(page) -> list:
|
|
||||||
"""
|
"""
|
||||||
Read any barcodes contained in page
|
Read any barcodes contained in image
|
||||||
Returns a list containing all found barcodes
|
Returns a list containing all found barcodes
|
||||||
"""
|
"""
|
||||||
barcodes = [ ]
|
barcodes = []
|
||||||
# Decode the barcode image
|
# Decode the barcode image
|
||||||
detected_barcodes = pyzbar.decode(page)
|
detected_barcodes = pyzbar.decode(image)
|
||||||
|
|
||||||
if not detected_barcodes:
|
if not detected_barcodes:
|
||||||
logger.debug(f"No barcode detected")
|
logger.debug(f"No barcode detected")
|
||||||
else:
|
else:
|
||||||
# Traverse through all the detected barcodes in image
|
# Traverse through all the detected barcodes in image
|
||||||
for barcode in detected_barcodes:
|
for barcode in detected_barcodes:
|
||||||
if barcode.data!="":
|
if barcode.data != "":
|
||||||
barcodes = barcodes + [str(barcode.data)]
|
barcodes = barcodes + [str(barcode.data)]
|
||||||
logger.debug(f"Barcode of type {str(barcode.type)} found: {str(barcode.data)}")
|
logger.debug(
|
||||||
|
f"Barcode of type {str(barcode.type)} found: {str(barcode.data)}"
|
||||||
|
)
|
||||||
return barcodes
|
return barcodes
|
||||||
|
|
||||||
def scan_file_for_seperating_barcodes(filepath) -> list:
|
|
||||||
|
def scan_file_for_seperating_barcodes(filepath: str) -> list:
|
||||||
"""
|
"""
|
||||||
Scan the provided file for page seperating barcodes
|
Scan the provided file for page seperating barcodes
|
||||||
Returns a list of pagenumbers, which seperate the file
|
Returns a list of pagenumbers, which seperate the file
|
||||||
"""
|
"""
|
||||||
seperator_page_numbers = [ ]
|
seperator_page_numbers = []
|
||||||
# use a temporary directory in case the file os too big to handle in memory
|
# use a temporary directory in case the file os too big to handle in memory
|
||||||
with tempfile.TemporaryDirectory() as path:
|
with tempfile.TemporaryDirectory() as path:
|
||||||
pages_from_path = convert_from_path(filepath, output_folder=path)
|
pages_from_path = convert_from_path(filepath, output_folder=path)
|
||||||
for current_page_number, page in enumerate(pages_from_path):
|
for current_page_number, page in enumerate(pages_from_path):
|
||||||
current_barcodes = barcode_reader(page)
|
current_barcodes = barcode_reader(page)
|
||||||
if current_barcodes.isin("PATCHT"):
|
if "b'PATCHT'" in current_barcodes:
|
||||||
seperator_page_numbers = seperator_page_numbers + current_page_number
|
seperator_page_numbers = seperator_page_numbers + [current_page_number]
|
||||||
return seperator_page_numbers
|
return seperator_page_numbers
|
||||||
|
|
||||||
def seperate_pages(filepath, pages_to_split_on: list):
|
|
||||||
|
def seperate_pages(filepath: str, pages_to_split_on: list) -> list:
|
||||||
"""
|
"""
|
||||||
Seperate the provided file on the pages_to_split_on.
|
Seperate the provided file on the pages_to_split_on.
|
||||||
The pages which are defined by page_numbers will be removed.
|
The pages which are defined by page_numbers will be removed.
|
||||||
|
Returns a list of (temporary) filepaths to consume.
|
||||||
|
These will need to be deleted later.
|
||||||
"""
|
"""
|
||||||
pages_to_split_on = scan_file_for_seperating_barcodes(filepath)
|
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
|
||||||
|
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||||
fname = os.path.splitext(os.path.basename(filepath))[0]
|
fname = os.path.splitext(os.path.basename(filepath))[0]
|
||||||
pdf = Pdf.open(filepath)
|
pdf = Pdf.open(filepath)
|
||||||
|
document_paths = []
|
||||||
|
logger.debug(f"Temp dir is {str(tempdir)}")
|
||||||
# TODO: Get the directory of the file and save the other files there
|
# TODO: Get the directory of the file and save the other files there
|
||||||
# TODO: Return list of new paths of the new files
|
# TODO: Return list of new paths of the new files
|
||||||
|
if len(pages_to_split_on) <= 0:
|
||||||
|
logger.warning(f"No pages to split on!")
|
||||||
|
else:
|
||||||
|
# go from the first page to the first separator page
|
||||||
|
dst = Pdf.new()
|
||||||
|
for n, page in enumerate(pdf.pages):
|
||||||
|
if n < pages_to_split_on[0]:
|
||||||
|
dst.pages.append(page)
|
||||||
|
output_filename = "{}_document_0.pdf".format(fname)
|
||||||
|
savepath = os.path.join(tempdir, output_filename)
|
||||||
|
with open(savepath, "wb") as out:
|
||||||
|
dst.save(out)
|
||||||
|
document_paths = [savepath]
|
||||||
|
|
||||||
for count, page_number in enumerate(pages_to_split_on):
|
for count, page_number in enumerate(pages_to_split_on):
|
||||||
# First element, so iterate from zero to the first seperator page
|
logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
|
||||||
if count == 0:
|
dst = Pdf.new()
|
||||||
dst = Pdf.new()
|
try:
|
||||||
for page in range(0, page_number):
|
next_page = pages_to_split_on[count + 1]
|
||||||
dst.pages.append(page)
|
except IndexError:
|
||||||
output_filename = '{}_page_{}.pdf'.format(
|
next_page = len(pdf.pages)
|
||||||
fname, str(count))
|
# skip the first page_number. This contains the barcode page
|
||||||
with open(output_filename, 'wb') as out:
|
for page in range(page_number + 1, next_page):
|
||||||
dst.save(out)
|
logger.debug(f"page_number: {str(page_number)} next_page: {str(next_page)}")
|
||||||
else:
|
dst.pages.append(pdf.pages[page])
|
||||||
dst = Pdf.new()
|
output_filename = "{}_document_{}.pdf".format(fname, str(count + 1))
|
||||||
for page in range(pages_to_split_on[count-1], page_number):
|
logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
|
||||||
dst.pages.append(page)
|
savepath = os.path.join(tempdir, output_filename)
|
||||||
output_filename = '{}_page_{}.pdf'.format(
|
with open(savepath, "wb") as out:
|
||||||
fname, page+1)
|
dst.save(out)
|
||||||
with open(output_filename, 'wb') as out:
|
document_paths = document_paths + [savepath]
|
||||||
dst.save(out)
|
logger.debug(f"Temp files are {str(document_paths)}")
|
||||||
|
return document_paths
|
||||||
|
|
||||||
|
|
||||||
def consume_file(
|
def consume_file(
|
||||||
@ -146,7 +169,7 @@ def consume_file(
|
|||||||
|
|
||||||
# check for seperators in current document
|
# check for seperators in current document
|
||||||
seperator_page_numbers = scan_file_for_seperating_barcodes(path)
|
seperator_page_numbers = scan_file_for_seperating_barcodes(path)
|
||||||
if seperator_page_numbers != [ ]:
|
if seperator_page_numbers != []:
|
||||||
logger.debug(f"Pages with seperators found: {str(seperator_page_numbers)}")
|
logger.debug(f"Pages with seperators found: {str(seperator_page_numbers)}")
|
||||||
|
|
||||||
document = Consumer().try_consume_file(
|
document = Consumer().try_consume_file(
|
||||||
|
BIN
src/documents/tests/samples/patch-code-t-middle.pdf
Normal file
BIN
src/documents/tests/samples/patch-code-t-middle.pdf
Normal file
Binary file not shown.
@ -93,13 +93,43 @@ class TestTasks(DirectoriesMixin, TestCase):
|
|||||||
|
|
||||||
def test_barcode_reader(self):
|
def test_barcode_reader(self):
|
||||||
test_file = os.path.join(
|
test_file = os.path.join(
|
||||||
os.path.dirname(__file__),
|
os.path.dirname(__file__), "samples", "patch-code-t.pbm"
|
||||||
"samples",
|
|
||||||
"patch-code-t.pbm"
|
|
||||||
)
|
)
|
||||||
img = Image.open(test_file)
|
img = Image.open(test_file)
|
||||||
self.assertEqual(tasks.barcode_reader(img), ["b'PATCHT'"])
|
self.assertEqual(tasks.barcode_reader(img), ["b'PATCHT'"])
|
||||||
|
|
||||||
|
def test_barcode_reader2(self):
|
||||||
|
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png")
|
||||||
|
img = Image.open(test_file)
|
||||||
|
self.assertEqual(tasks.barcode_reader(img), [])
|
||||||
|
|
||||||
|
def test_scan_file_for_seperating_barcodes(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__), "samples", "patch-code-t.pdf"
|
||||||
|
)
|
||||||
|
pages = tasks.scan_file_for_seperating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [0])
|
||||||
|
|
||||||
|
def test_scan_file_for_seperating_barcodes2(self):
|
||||||
|
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||||
|
pages = tasks.scan_file_for_seperating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [])
|
||||||
|
|
||||||
|
def test_scan_file_for_seperating_barcodes3(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf"
|
||||||
|
)
|
||||||
|
pages = tasks.scan_file_for_seperating_barcodes(test_file)
|
||||||
|
self.assertEqual(pages, [1])
|
||||||
|
|
||||||
|
def test_seperate_pages(self):
|
||||||
|
test_file = os.path.join(
|
||||||
|
os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf"
|
||||||
|
)
|
||||||
|
pages = tasks.seperate_pages(test_file, [1])
|
||||||
|
|
||||||
|
self.assertEqual(len(pages), 2)
|
||||||
|
|
||||||
@mock.patch("documents.tasks.sanity_checker.check_sanity")
|
@mock.patch("documents.tasks.sanity_checker.check_sanity")
|
||||||
def test_sanity_check_success(self, m):
|
def test_sanity_check_success(self, m):
|
||||||
m.return_value = SanityCheckMessages()
|
m.return_value = SanityCheckMessages()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user