mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-28 03:46:06 -05:00 
			
		
		
		
	add first tests for barcode reader
Signed-off-by: florian on nixos (Florian Brandes) <florian.brandes@posteo.de>
This commit is contained in:
		 florian on nixos (Florian Brandes)
					florian on nixos (Florian Brandes)
				
			
				
					committed by
					
						 Florian Brandes
						Florian Brandes
					
				
			
			
				
	
			
			
			 Florian Brandes
						Florian Brandes
					
				
			
						parent
						
							c35814bfd3
						
					
				
				
					commit
					aa46b06d95
				
			| @@ -16,6 +16,13 @@ from documents.models import Tag | |||||||
| from documents.sanity_checker import SanityCheckFailedException | from documents.sanity_checker import SanityCheckFailedException | ||||||
| from whoosh.writing import AsyncWriter | from whoosh.writing import AsyncWriter | ||||||
|  |  | ||||||
|  | # barcode decoder | ||||||
|  | import os | ||||||
|  | from pyzbar import pyzbar | ||||||
|  | from pdf2image import convert_from_path | ||||||
|  | import tempfile | ||||||
|  | from pikepdf import Pdf | ||||||
|  |  | ||||||
| logger = logging.getLogger("paperless.tasks") | logger = logging.getLogger("paperless.tasks") | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -62,6 +69,71 @@ def train_classifier(): | |||||||
|         logger.warning("Classifier error: " + str(e)) |         logger.warning("Classifier error: " + str(e)) | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def barcode_reader(page) -> list: | ||||||
|  |     """ | ||||||
|  |     Read any barcodes contained in page | ||||||
|  |     Returns a list containing all found barcodes | ||||||
|  |     """ | ||||||
|  |     barcodes = [ ] | ||||||
|  |     # Decode the barcode image | ||||||
|  |     detected_barcodes = pyzbar.decode(page) | ||||||
|  |  | ||||||
|  |     if not detected_barcodes: | ||||||
|  |         logger.debug(f"No barcode detected") | ||||||
|  |     else: | ||||||
|  |         # Traverse through all the detected barcodes in image | ||||||
|  |         for barcode in detected_barcodes: | ||||||
|  |             if barcode.data!="": | ||||||
|  |                 barcodes = barcodes + [str(barcode.data)] | ||||||
|  |                 logger.debug(f"Barcode of type {str(barcode.type)} found: {str(barcode.data)}") | ||||||
|  |     return barcodes | ||||||
|  |  | ||||||
|  | def scan_file_for_seperating_barcodes(filepath) -> list: | ||||||
|  |     """ | ||||||
|  |     Scan the provided file for page seperating barcodes | ||||||
|  |     Returns a list of pagenumbers, which seperate the file | ||||||
|  |     """ | ||||||
|  |     seperator_page_numbers = [ ] | ||||||
|  |     # use a temporary directory in case the file os too big to handle in memory | ||||||
|  |     with tempfile.TemporaryDirectory() as path: | ||||||
|  |         pages_from_path = convert_from_path(filepath, output_folder=path) | ||||||
|  |         for current_page_number, page in enumerate(pages_from_path): | ||||||
|  |             current_barcodes = barcode_reader(page) | ||||||
|  |             if current_barcodes.isin("PATCHT"): | ||||||
|  |                 seperator_page_numbers = seperator_page_numbers + current_page_number | ||||||
|  |     return seperator_page_numbers | ||||||
|  |  | ||||||
|  | def seperate_pages(filepath, pages_to_split_on: list): | ||||||
|  |     """ | ||||||
|  |     Seperate the provided file on the pages_to_split_on. | ||||||
|  |     The pages which are defined by page_numbers will be removed. | ||||||
|  |     """ | ||||||
|  |     pages_to_split_on = scan_file_for_seperating_barcodes(filepath) | ||||||
|  |     fname = os.path.splitext(os.path.basename(filepath))[0] | ||||||
|  |     pdf = Pdf.open(filepath) | ||||||
|  |     # TODO: Get the directory of the file and save the other files there | ||||||
|  |     # TODO: Return list of new paths of the new files | ||||||
|  |     for count, page_number in enumerate(pages_to_split_on): | ||||||
|  |         # First element, so iterate from zero to the first seperator page | ||||||
|  |         if count == 0: | ||||||
|  |             dst = Pdf.new() | ||||||
|  |             for page in range(0, page_number): | ||||||
|  |                 dst.pages.append(page) | ||||||
|  |             output_filename = '{}_page_{}.pdf'.format( | ||||||
|  |                 fname, str(count)) | ||||||
|  |             with open(output_filename, 'wb') as out: | ||||||
|  |                 dst.save(out) | ||||||
|  |         else: | ||||||
|  |             dst = Pdf.new() | ||||||
|  |             for page in range(pages_to_split_on[count-1], page_number): | ||||||
|  |                 dst.pages.append(page) | ||||||
|  |             output_filename = '{}_page_{}.pdf'.format( | ||||||
|  |                 fname, page+1) | ||||||
|  |             with open(output_filename, 'wb') as out: | ||||||
|  |                 dst.save(out) | ||||||
|  |  | ||||||
|  |  | ||||||
| def consume_file( | def consume_file( | ||||||
|     path, |     path, | ||||||
|     override_filename=None, |     override_filename=None, | ||||||
| @@ -72,6 +144,11 @@ def consume_file( | |||||||
|     task_id=None, |     task_id=None, | ||||||
| ): | ): | ||||||
|  |  | ||||||
|  |     # check for seperators in current document | ||||||
|  |     seperator_page_numbers = scan_file_for_seperating_barcodes(path) | ||||||
|  |     if seperator_page_numbers != [ ]: | ||||||
|  |         logger.debug(f"Pages with seperators found: {str(seperator_page_numbers)}") | ||||||
|  |  | ||||||
|     document = Consumer().try_consume_file( |     document = Consumer().try_consume_file( | ||||||
|         path, |         path, | ||||||
|         override_filename=override_filename, |         override_filename=override_filename, | ||||||
|   | |||||||
							
								
								
									
										
											BIN
										
									
								
								src/documents/tests/samples/patch-code-t.pbm
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								src/documents/tests/samples/patch-code-t.pbm
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							
							
								
								
									
										
											BIN
										
									
								
								src/documents/tests/samples/patch-code-t.pdf
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								src/documents/tests/samples/patch-code-t.pdf
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							| @@ -13,6 +13,8 @@ from documents.sanity_checker import SanityCheckFailedException | |||||||
| from documents.sanity_checker import SanityCheckMessages | from documents.sanity_checker import SanityCheckMessages | ||||||
| from documents.tests.utils import DirectoriesMixin | from documents.tests.utils import DirectoriesMixin | ||||||
|  |  | ||||||
|  | from PIL import Image | ||||||
|  |  | ||||||
|  |  | ||||||
| class TestTasks(DirectoriesMixin, TestCase): | class TestTasks(DirectoriesMixin, TestCase): | ||||||
|     def test_index_reindex(self): |     def test_index_reindex(self): | ||||||
| @@ -89,6 +91,15 @@ class TestTasks(DirectoriesMixin, TestCase): | |||||||
|         mtime3 = os.stat(settings.MODEL_FILE).st_mtime |         mtime3 = os.stat(settings.MODEL_FILE).st_mtime | ||||||
|         self.assertNotEqual(mtime2, mtime3) |         self.assertNotEqual(mtime2, mtime3) | ||||||
|  |  | ||||||
|  |     def test_barcode_reader(self): | ||||||
|  |         test_file = os.path.join( | ||||||
|  |             os.path.dirname(__file__), | ||||||
|  |             "samples", | ||||||
|  |             "patch-code-t.pbm" | ||||||
|  |         ) | ||||||
|  |         img = Image.open(test_file) | ||||||
|  |         self.assertEqual(tasks.barcode_reader(img), ["b'PATCHT'"]) | ||||||
|  |  | ||||||
|     @mock.patch("documents.tasks.sanity_checker.check_sanity") |     @mock.patch("documents.tasks.sanity_checker.check_sanity") | ||||||
|     def test_sanity_check_success(self, m): |     def test_sanity_check_success(self, m): | ||||||
|         m.return_value = SanityCheckMessages() |         m.return_value = SanityCheckMessages() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user