mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	Merge branch 'dev' into feature-redo-ocr
This commit is contained in:
		
							
								
								
									
										186
									
								
								src/documents/barcodes.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										186
									
								
								src/documents/barcodes.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,186 @@ | ||||
| import logging | ||||
| import os | ||||
| import shutil | ||||
| import tempfile | ||||
| from functools import lru_cache | ||||
| from typing import List  # for type hinting. Can be removed, if only Python >3.8 is used | ||||
|  | ||||
| import magic | ||||
| from django.conf import settings | ||||
| from pdf2image import convert_from_path | ||||
| from pikepdf import Pdf | ||||
| from PIL import Image | ||||
| from PIL import ImageSequence | ||||
| from pyzbar import pyzbar | ||||
|  | ||||
| logger = logging.getLogger("paperless.barcodes") | ||||
|  | ||||
|  | ||||
| @lru_cache(maxsize=8) | ||||
| def supported_file_type(mime_type) -> bool: | ||||
|     """ | ||||
|     Determines if the file is valid for barcode | ||||
|     processing, based on MIME type and settings | ||||
|  | ||||
|     :return: True if the file is supported, False otherwise | ||||
|     """ | ||||
|     supported_mime = ["application/pdf"] | ||||
|     if settings.CONSUMER_BARCODE_TIFF_SUPPORT: | ||||
|         supported_mime += ["image/tiff"] | ||||
|  | ||||
|     return mime_type in supported_mime | ||||
|  | ||||
|  | ||||
| def barcode_reader(image) -> List[str]: | ||||
|     """ | ||||
|     Read any barcodes contained in image | ||||
|     Returns a list containing all found barcodes | ||||
|     """ | ||||
|     barcodes = [] | ||||
|     # Decode the barcode image | ||||
|     detected_barcodes = pyzbar.decode(image) | ||||
|  | ||||
|     if detected_barcodes: | ||||
|         # Traverse through all the detected barcodes in image | ||||
|         for barcode in detected_barcodes: | ||||
|             if barcode.data: | ||||
|                 decoded_barcode = barcode.data.decode("utf-8") | ||||
|                 barcodes.append(decoded_barcode) | ||||
|                 logger.debug( | ||||
|                     f"Barcode of type {str(barcode.type)} found: {decoded_barcode}", | ||||
|                 ) | ||||
|     return barcodes | ||||
|  | ||||
|  | ||||
| def get_file_mime_type(path: str) -> str: | ||||
|     """ | ||||
|     Determines the file type, based on MIME type. | ||||
|  | ||||
|     Returns the MIME type. | ||||
|     """ | ||||
|     mime_type = magic.from_file(path, mime=True) | ||||
|     logger.debug(f"Detected mime type: {mime_type}") | ||||
|     return mime_type | ||||
|  | ||||
|  | ||||
| def convert_from_tiff_to_pdf(filepath: str) -> str: | ||||
|     """ | ||||
|     converts a given TIFF image file to pdf into a temporary directory. | ||||
|  | ||||
|     Returns the new pdf file. | ||||
|     """ | ||||
|     file_name = os.path.splitext(os.path.basename(filepath))[0] | ||||
|     mime_type = get_file_mime_type(filepath) | ||||
|     tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|     # use old file name with pdf extension | ||||
|     if mime_type == "image/tiff": | ||||
|         newpath = os.path.join(tempdir, file_name + ".pdf") | ||||
|     else: | ||||
|         logger.warning( | ||||
|             f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.", | ||||
|         ) | ||||
|         return None | ||||
|     with Image.open(filepath) as image: | ||||
|         images = [] | ||||
|         for i, page in enumerate(ImageSequence.Iterator(image)): | ||||
|             page = page.convert("RGB") | ||||
|             images.append(page) | ||||
|         try: | ||||
|             if len(images) == 1: | ||||
|                 images[0].save(newpath) | ||||
|             else: | ||||
|                 images[0].save(newpath, save_all=True, append_images=images[1:]) | ||||
|         except OSError as e: | ||||
|             logger.warning( | ||||
|                 f"Could not save the file as pdf. Error: {str(e)}", | ||||
|             ) | ||||
|             return None | ||||
|     return newpath | ||||
|  | ||||
|  | ||||
| def scan_file_for_separating_barcodes(filepath: str) -> List[int]: | ||||
|     """ | ||||
|     Scan the provided pdf file for page separating barcodes | ||||
|     Returns a list of pagenumbers, which separate the file | ||||
|     """ | ||||
|     separator_page_numbers = [] | ||||
|     separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|     # use a temporary directory in case the file os too big to handle in memory | ||||
|     with tempfile.TemporaryDirectory() as path: | ||||
|         pages_from_path = convert_from_path(filepath, output_folder=path) | ||||
|         for current_page_number, page in enumerate(pages_from_path): | ||||
|             current_barcodes = barcode_reader(page) | ||||
|             if separator_barcode in current_barcodes: | ||||
|                 separator_page_numbers.append(current_page_number) | ||||
|     return separator_page_numbers | ||||
|  | ||||
|  | ||||
| def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]: | ||||
|     """ | ||||
|     Separate the provided pdf file on the pages_to_split_on. | ||||
|     The pages which are defined by page_numbers will be removed. | ||||
|     Returns a list of (temporary) filepaths to consume. | ||||
|     These will need to be deleted later. | ||||
|     """ | ||||
|     os.makedirs(settings.SCRATCH_DIR, exist_ok=True) | ||||
|     tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|     fname = os.path.splitext(os.path.basename(filepath))[0] | ||||
|     pdf = Pdf.open(filepath) | ||||
|     document_paths = [] | ||||
|     logger.debug(f"Temp dir is {str(tempdir)}") | ||||
|     if not pages_to_split_on: | ||||
|         logger.warning("No pages to split on!") | ||||
|     else: | ||||
|         # go from the first page to the first separator page | ||||
|         dst = Pdf.new() | ||||
|         for n, page in enumerate(pdf.pages): | ||||
|             if n < pages_to_split_on[0]: | ||||
|                 dst.pages.append(page) | ||||
|         output_filename = f"{fname}_document_0.pdf" | ||||
|         savepath = os.path.join(tempdir, output_filename) | ||||
|         with open(savepath, "wb") as out: | ||||
|             dst.save(out) | ||||
|         document_paths = [savepath] | ||||
|  | ||||
|         # iterate through the rest of the document | ||||
|         for count, page_number in enumerate(pages_to_split_on): | ||||
|             logger.debug(f"Count: {str(count)} page_number: {str(page_number)}") | ||||
|             dst = Pdf.new() | ||||
|             try: | ||||
|                 next_page = pages_to_split_on[count + 1] | ||||
|             except IndexError: | ||||
|                 next_page = len(pdf.pages) | ||||
|             # skip the first page_number. This contains the barcode page | ||||
|             for page in range(page_number + 1, next_page): | ||||
|                 logger.debug( | ||||
|                     f"page_number: {str(page_number)} next_page: {str(next_page)}", | ||||
|                 ) | ||||
|                 dst.pages.append(pdf.pages[page]) | ||||
|             output_filename = f"{fname}_document_{str(count + 1)}.pdf" | ||||
|             logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages") | ||||
|             savepath = os.path.join(tempdir, output_filename) | ||||
|             with open(savepath, "wb") as out: | ||||
|                 dst.save(out) | ||||
|             document_paths.append(savepath) | ||||
|     logger.debug(f"Temp files are {str(document_paths)}") | ||||
|     return document_paths | ||||
|  | ||||
|  | ||||
| def save_to_dir( | ||||
|     filepath: str, | ||||
|     newname: str = None, | ||||
|     target_dir: str = settings.CONSUMPTION_DIR, | ||||
| ): | ||||
|     """ | ||||
|     Copies filepath to target_dir. | ||||
|     Optionally rename the file. | ||||
|     """ | ||||
|     if os.path.isfile(filepath) and os.path.isdir(target_dir): | ||||
|         dst = shutil.copy(filepath, target_dir) | ||||
|         logging.debug(f"saved {str(filepath)} to {str(dst)}") | ||||
|         if newname: | ||||
|             dst_new = os.path.join(target_dir, newname) | ||||
|             logger.debug(f"moving {str(dst)} to {str(dst_new)}") | ||||
|             os.rename(dst, dst_new) | ||||
|     else: | ||||
|         logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.") | ||||
| @@ -3,7 +3,9 @@ import sys | ||||
| from django.core.management.commands.loaddata import Command as LoadDataCommand | ||||
|  | ||||
|  | ||||
| class Command(LoadDataCommand): | ||||
| # This class is used to migrate data between databases | ||||
| # That's difficult to test | ||||
| class Command(LoadDataCommand):  # pragma: nocover | ||||
|     """ | ||||
|     Allow the loading of data from standard in.  Sourced originally from: | ||||
|     https://gist.github.com/bmispelon/ad5a2c333443b3a1d051 (MIT licensed) | ||||
|   | ||||
| @@ -87,10 +87,10 @@ def _convert_thumbnails_to_webp(apps, schema_editor): | ||||
|             ) as pool: | ||||
|                 pool.map(_do_convert, work_packages) | ||||
|  | ||||
|         end = time.time() | ||||
|         duration = end - start | ||||
|                 end = time.time() | ||||
|                 duration = end - start | ||||
|  | ||||
|     logger.info(f"Conversion completed in {duration:.3f}s") | ||||
|             logger.info(f"Conversion completed in {duration:.3f}s") | ||||
|  | ||||
|  | ||||
| class Migration(migrations.Migration): | ||||
|   | ||||
| @@ -539,8 +539,6 @@ class BulkDownloadSerializer(DocumentListSerializer): | ||||
|  | ||||
|  | ||||
| class StoragePathSerializer(MatchingModelSerializer): | ||||
|     document_count = serializers.IntegerField(read_only=True) | ||||
|  | ||||
|     class Meta: | ||||
|         model = StoragePath | ||||
|         fields = ( | ||||
| @@ -588,10 +586,6 @@ class UiSettingsViewSerializer(serializers.ModelSerializer): | ||||
|             "settings", | ||||
|         ] | ||||
|  | ||||
|     def update(self, instance, validated_data): | ||||
|         super().update(instance, validated_data) | ||||
|         return instance | ||||
|  | ||||
|     def create(self, validated_data): | ||||
|         ui_settings = UiSettings.objects.update_or_create( | ||||
|             user=validated_data.get("user"), | ||||
|   | ||||
| @@ -6,13 +6,13 @@ from pathlib import Path | ||||
| from typing import List  # for type hinting. Can be removed, if only Python >3.8 is used | ||||
| from typing import Type | ||||
|  | ||||
| import magic | ||||
| import tqdm | ||||
| from asgiref.sync import async_to_sync | ||||
| from channels.layers import get_channel_layer | ||||
| from django.conf import settings | ||||
| from django.core.exceptions import ObjectDoesNotExist | ||||
| from django.db.models.signals import post_save | ||||
| from documents import barcodes | ||||
| from documents import index | ||||
| from documents import sanity_checker | ||||
| from documents.classifier import DocumentClassifier | ||||
| @@ -28,11 +28,6 @@ from documents.parsers import DocumentParser | ||||
| from documents.parsers import get_parser_class_for_mime_type | ||||
| from documents.parsers import ParseError | ||||
| from documents.sanity_checker import SanityCheckFailedException | ||||
| from pdf2image import convert_from_path | ||||
| from pikepdf import Pdf | ||||
| from PIL import Image | ||||
| from PIL import ImageSequence | ||||
| from pyzbar import pyzbar | ||||
| from whoosh.writing import AsyncWriter | ||||
|  | ||||
|  | ||||
| @@ -83,161 +78,6 @@ def train_classifier(): | ||||
|         logger.warning("Classifier error: " + str(e)) | ||||
|  | ||||
|  | ||||
| def barcode_reader(image) -> List[str]: | ||||
|     """ | ||||
|     Read any barcodes contained in image | ||||
|     Returns a list containing all found barcodes | ||||
|     """ | ||||
|     barcodes = [] | ||||
|     # Decode the barcode image | ||||
|     detected_barcodes = pyzbar.decode(image) | ||||
|  | ||||
|     if detected_barcodes: | ||||
|         # Traverse through all the detected barcodes in image | ||||
|         for barcode in detected_barcodes: | ||||
|             if barcode.data: | ||||
|                 decoded_barcode = barcode.data.decode("utf-8") | ||||
|                 barcodes.append(decoded_barcode) | ||||
|                 logger.debug( | ||||
|                     f"Barcode of type {str(barcode.type)} found: {decoded_barcode}", | ||||
|                 ) | ||||
|     return barcodes | ||||
|  | ||||
|  | ||||
| def get_file_type(path: str) -> str: | ||||
|     """ | ||||
|     Determines the file type, based on MIME type. | ||||
|  | ||||
|     Returns the MIME type. | ||||
|     """ | ||||
|     mime_type = magic.from_file(path, mime=True) | ||||
|     logger.debug(f"Detected mime type: {mime_type}") | ||||
|     return mime_type | ||||
|  | ||||
|  | ||||
| def convert_from_tiff_to_pdf(filepath: str) -> str: | ||||
|     """ | ||||
|     converts a given TIFF image file to pdf into a temporary directory. | ||||
|  | ||||
|     Returns the new pdf file. | ||||
|     """ | ||||
|     file_name = os.path.splitext(os.path.basename(filepath))[0] | ||||
|     mime_type = get_file_type(filepath) | ||||
|     tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|     # use old file name with pdf extension | ||||
|     if mime_type == "image/tiff": | ||||
|         newpath = os.path.join(tempdir, file_name + ".pdf") | ||||
|     else: | ||||
|         logger.warning( | ||||
|             f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.", | ||||
|         ) | ||||
|         return None | ||||
|     with Image.open(filepath) as image: | ||||
|         images = [] | ||||
|         for i, page in enumerate(ImageSequence.Iterator(image)): | ||||
|             page = page.convert("RGB") | ||||
|             images.append(page) | ||||
|         try: | ||||
|             if len(images) == 1: | ||||
|                 images[0].save(newpath) | ||||
|             else: | ||||
|                 images[0].save(newpath, save_all=True, append_images=images[1:]) | ||||
|         except OSError as e: | ||||
|             logger.warning( | ||||
|                 f"Could not save the file as pdf. Error: {str(e)}", | ||||
|             ) | ||||
|             return None | ||||
|     return newpath | ||||
|  | ||||
|  | ||||
| def scan_file_for_separating_barcodes(filepath: str) -> List[int]: | ||||
|     """ | ||||
|     Scan the provided pdf file for page separating barcodes | ||||
|     Returns a list of pagenumbers, which separate the file | ||||
|     """ | ||||
|     separator_page_numbers = [] | ||||
|     separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|     # use a temporary directory in case the file os too big to handle in memory | ||||
|     with tempfile.TemporaryDirectory() as path: | ||||
|         pages_from_path = convert_from_path(filepath, output_folder=path) | ||||
|         for current_page_number, page in enumerate(pages_from_path): | ||||
|             current_barcodes = barcode_reader(page) | ||||
|             if separator_barcode in current_barcodes: | ||||
|                 separator_page_numbers.append(current_page_number) | ||||
|     return separator_page_numbers | ||||
|  | ||||
|  | ||||
| def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]: | ||||
|     """ | ||||
|     Separate the provided pdf file on the pages_to_split_on. | ||||
|     The pages which are defined by page_numbers will be removed. | ||||
|     Returns a list of (temporary) filepaths to consume. | ||||
|     These will need to be deleted later. | ||||
|     """ | ||||
|     os.makedirs(settings.SCRATCH_DIR, exist_ok=True) | ||||
|     tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|     fname = os.path.splitext(os.path.basename(filepath))[0] | ||||
|     pdf = Pdf.open(filepath) | ||||
|     document_paths = [] | ||||
|     logger.debug(f"Temp dir is {str(tempdir)}") | ||||
|     if not pages_to_split_on: | ||||
|         logger.warning("No pages to split on!") | ||||
|     else: | ||||
|         # go from the first page to the first separator page | ||||
|         dst = Pdf.new() | ||||
|         for n, page in enumerate(pdf.pages): | ||||
|             if n < pages_to_split_on[0]: | ||||
|                 dst.pages.append(page) | ||||
|         output_filename = f"{fname}_document_0.pdf" | ||||
|         savepath = os.path.join(tempdir, output_filename) | ||||
|         with open(savepath, "wb") as out: | ||||
|             dst.save(out) | ||||
|         document_paths = [savepath] | ||||
|  | ||||
|         # iterate through the rest of the document | ||||
|         for count, page_number in enumerate(pages_to_split_on): | ||||
|             logger.debug(f"Count: {str(count)} page_number: {str(page_number)}") | ||||
|             dst = Pdf.new() | ||||
|             try: | ||||
|                 next_page = pages_to_split_on[count + 1] | ||||
|             except IndexError: | ||||
|                 next_page = len(pdf.pages) | ||||
|             # skip the first page_number. This contains the barcode page | ||||
|             for page in range(page_number + 1, next_page): | ||||
|                 logger.debug( | ||||
|                     f"page_number: {str(page_number)} next_page: {str(next_page)}", | ||||
|                 ) | ||||
|                 dst.pages.append(pdf.pages[page]) | ||||
|             output_filename = f"{fname}_document_{str(count + 1)}.pdf" | ||||
|             logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages") | ||||
|             savepath = os.path.join(tempdir, output_filename) | ||||
|             with open(savepath, "wb") as out: | ||||
|                 dst.save(out) | ||||
|             document_paths.append(savepath) | ||||
|     logger.debug(f"Temp files are {str(document_paths)}") | ||||
|     return document_paths | ||||
|  | ||||
|  | ||||
| def save_to_dir( | ||||
|     filepath: str, | ||||
|     newname: str = None, | ||||
|     target_dir: str = settings.CONSUMPTION_DIR, | ||||
| ): | ||||
|     """ | ||||
|     Copies filepath to target_dir. | ||||
|     Optionally rename the file. | ||||
|     """ | ||||
|     if os.path.isfile(filepath) and os.path.isdir(target_dir): | ||||
|         dst = shutil.copy(filepath, target_dir) | ||||
|         logging.debug(f"saved {str(filepath)} to {str(dst)}") | ||||
|         if newname: | ||||
|             dst_new = os.path.join(target_dir, newname) | ||||
|             logger.debug(f"moving {str(dst)} to {str(dst_new)}") | ||||
|             os.rename(dst, dst_new) | ||||
|     else: | ||||
|         logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.") | ||||
|  | ||||
|  | ||||
| def consume_file( | ||||
|     path, | ||||
|     override_filename=None, | ||||
| @@ -251,32 +91,30 @@ def consume_file( | ||||
|  | ||||
|     # check for separators in current document | ||||
|     if settings.CONSUMER_ENABLE_BARCODES: | ||||
|         separators = [] | ||||
|         document_list = [] | ||||
|         converted_tiff = None | ||||
|         if settings.CONSUMER_BARCODE_TIFF_SUPPORT: | ||||
|             supported_mime = ["image/tiff", "application/pdf"] | ||||
|         else: | ||||
|             supported_mime = ["application/pdf"] | ||||
|         mime_type = get_file_type(path) | ||||
|         if mime_type not in supported_mime: | ||||
|  | ||||
|         mime_type = barcodes.get_file_mime_type(path) | ||||
|  | ||||
|         if not barcodes.supported_file_type(mime_type): | ||||
|             # if not supported, skip this routine | ||||
|             logger.warning( | ||||
|                 f"Unsupported file format for barcode reader: {str(mime_type)}", | ||||
|             ) | ||||
|         else: | ||||
|             separators = [] | ||||
|             document_list = [] | ||||
|  | ||||
|             if mime_type == "image/tiff": | ||||
|                 file_to_process = convert_from_tiff_to_pdf(path) | ||||
|                 file_to_process = barcodes.convert_from_tiff_to_pdf(path) | ||||
|             else: | ||||
|                 file_to_process = path | ||||
|  | ||||
|             separators = scan_file_for_separating_barcodes(file_to_process) | ||||
|             separators = barcodes.scan_file_for_separating_barcodes(file_to_process) | ||||
|  | ||||
|             if separators: | ||||
|                 logger.debug( | ||||
|                     f"Pages with separators found in: {str(path)}", | ||||
|                 ) | ||||
|                 document_list = separate_pages(file_to_process, separators) | ||||
|                 document_list = barcodes.separate_pages(file_to_process, separators) | ||||
|  | ||||
|             if document_list: | ||||
|                 for n, document in enumerate(document_list): | ||||
| @@ -286,14 +124,18 @@ def consume_file( | ||||
|                         newname = f"{str(n)}_" + override_filename | ||||
|                     else: | ||||
|                         newname = None | ||||
|                     save_to_dir(document, newname=newname) | ||||
|                     barcodes.save_to_dir(document, newname=newname) | ||||
|  | ||||
|                 # if we got here, the document was successfully split | ||||
|                 # and can safely be deleted | ||||
|                 if converted_tiff: | ||||
|                 if mime_type == "image/tiff": | ||||
|                     # Remove the TIFF converted to PDF file | ||||
|                     logger.debug(f"Deleting file {file_to_process}") | ||||
|                     os.unlink(file_to_process) | ||||
|                 # Remove the original file (new file is saved above) | ||||
|                 logger.debug(f"Deleting file {path}") | ||||
|                 os.unlink(path) | ||||
|  | ||||
|                 # notify the sender, otherwise the progress bar | ||||
|                 # in the UI stays stuck | ||||
|                 payload = { | ||||
|   | ||||
| @@ -1432,17 +1432,25 @@ class TestDocumentApiV2(DirectoriesMixin, APITestCase): | ||||
|             "#000000", | ||||
|         ) | ||||
|  | ||||
|     def test_ui_settings(self): | ||||
|         test_user = User.objects.create_superuser(username="test") | ||||
|         self.client.force_authenticate(user=test_user) | ||||
|  | ||||
|         response = self.client.get("/api/ui_settings/", format="json") | ||||
| class TestApiUiSettings(DirectoriesMixin, APITestCase): | ||||
|  | ||||
|     ENDPOINT = "/api/ui_settings/" | ||||
|  | ||||
|     def setUp(self): | ||||
|         super().setUp() | ||||
|         self.test_user = User.objects.create_superuser(username="test") | ||||
|         self.client.force_authenticate(user=self.test_user) | ||||
|  | ||||
|     def test_api_get_ui_settings(self): | ||||
|         response = self.client.get(self.ENDPOINT, format="json") | ||||
|         self.assertEqual(response.status_code, 200) | ||||
|         self.assertDictEqual( | ||||
|             response.data["settings"], | ||||
|             {}, | ||||
|         ) | ||||
|  | ||||
|     def test_api_set_ui_settings(self): | ||||
|         settings = { | ||||
|             "settings": { | ||||
|                 "dark_mode": { | ||||
| @@ -1452,18 +1460,16 @@ class TestDocumentApiV2(DirectoriesMixin, APITestCase): | ||||
|         } | ||||
|  | ||||
|         response = self.client.post( | ||||
|             "/api/ui_settings/", | ||||
|             self.ENDPOINT, | ||||
|             json.dumps(settings), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, 200) | ||||
|  | ||||
|         response = self.client.get("/api/ui_settings/", format="json") | ||||
|  | ||||
|         self.assertEqual(response.status_code, 200) | ||||
|         ui_settings = self.test_user.ui_settings | ||||
|         self.assertDictEqual( | ||||
|             response.data["settings"], | ||||
|             ui_settings.settings, | ||||
|             settings["settings"], | ||||
|         ) | ||||
|  | ||||
| @@ -1786,6 +1792,34 @@ class TestBulkEdit(DirectoriesMixin, APITestCase): | ||||
|         self.assertEqual(kwargs["add_tags"], [self.t1.id]) | ||||
|         self.assertEqual(kwargs["remove_tags"], [self.t2.id]) | ||||
|  | ||||
|     @mock.patch("documents.serialisers.bulk_edit.modify_tags") | ||||
|     def test_api_modify_tags_not_provided(self, m): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - API data to modify tags is missing modify_tags field | ||||
|         WHEN: | ||||
|             - API to edit tags is called | ||||
|         THEN: | ||||
|             - API returns HTTP 400 | ||||
|             - modify_tags is not called | ||||
|         """ | ||||
|         m.return_value = "OK" | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc1.id, self.doc3.id], | ||||
|                     "method": "modify_tags", | ||||
|                     "parameters": { | ||||
|                         "add_tags": [self.t1.id], | ||||
|                     }, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|         self.assertEqual(response.status_code, 400) | ||||
|         m.assert_not_called() | ||||
|  | ||||
|     @mock.patch("documents.serialisers.bulk_edit.delete") | ||||
|     def test_api_delete(self, m): | ||||
|         m.return_value = "OK" | ||||
| @@ -1802,6 +1836,118 @@ class TestBulkEdit(DirectoriesMixin, APITestCase): | ||||
|         self.assertEqual(args[0], [self.doc1.id]) | ||||
|         self.assertEqual(len(kwargs), 0) | ||||
|  | ||||
|     @mock.patch("documents.serialisers.bulk_edit.set_storage_path") | ||||
|     def test_api_set_storage_path(self, m): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - API data to set the storage path of a document | ||||
|         WHEN: | ||||
|             - API is called | ||||
|         THEN: | ||||
|             - set_storage_path is called with correct document IDs and storage_path ID | ||||
|         """ | ||||
|         m.return_value = "OK" | ||||
|  | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc1.id], | ||||
|                     "method": "set_storage_path", | ||||
|                     "parameters": {"storage_path": self.sp1.id}, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, 200) | ||||
|         m.assert_called_once() | ||||
|         args, kwargs = m.call_args | ||||
|  | ||||
|         self.assertListEqual(args[0], [self.doc1.id]) | ||||
|         self.assertEqual(kwargs["storage_path"], self.sp1.id) | ||||
|  | ||||
|     @mock.patch("documents.serialisers.bulk_edit.set_storage_path") | ||||
|     def test_api_unset_storage_path(self, m): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - API data to clear/unset the storage path of a document | ||||
|         WHEN: | ||||
|             - API is called | ||||
|         THEN: | ||||
|             - set_storage_path is called with correct document IDs and None storage_path | ||||
|         """ | ||||
|         m.return_value = "OK" | ||||
|  | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc1.id], | ||||
|                     "method": "set_storage_path", | ||||
|                     "parameters": {"storage_path": None}, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, 200) | ||||
|         m.assert_called_once() | ||||
|         args, kwargs = m.call_args | ||||
|  | ||||
|         self.assertListEqual(args[0], [self.doc1.id]) | ||||
|         self.assertEqual(kwargs["storage_path"], None) | ||||
|  | ||||
|     def test_api_invalid_storage_path(self): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - API data to set the storage path of a document | ||||
|             - Given storage_path ID isn't valid | ||||
|         WHEN: | ||||
|             - API is called | ||||
|         THEN: | ||||
|             - set_storage_path is called with correct document IDs and storage_path ID | ||||
|         """ | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc1.id], | ||||
|                     "method": "set_storage_path", | ||||
|                     "parameters": {"storage_path": self.sp1.id + 10}, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, 400) | ||||
|         self.async_task.assert_not_called() | ||||
|  | ||||
|     def test_api_set_storage_path_not_provided(self): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - API data to set the storage path of a document | ||||
|             - API data is missing storage path ID | ||||
|         WHEN: | ||||
|             - API is called | ||||
|         THEN: | ||||
|             - set_storage_path is called with correct document IDs and storage_path ID | ||||
|         """ | ||||
|         response = self.client.post( | ||||
|             "/api/documents/bulk_edit/", | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "documents": [self.doc1.id], | ||||
|                     "method": "set_storage_path", | ||||
|                     "parameters": {}, | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(response.status_code, 400) | ||||
|         self.async_task.assert_not_called() | ||||
|  | ||||
|     def test_api_invalid_doc(self): | ||||
|         self.assertEqual(Document.objects.count(), 5) | ||||
|         response = self.client.post( | ||||
| @@ -2203,7 +2349,7 @@ class TestBulkDownload(DirectoriesMixin, APITestCase): | ||||
|         ) | ||||
|  | ||||
|  | ||||
| class TestApiAuth(APITestCase): | ||||
| class TestApiAuth(DirectoriesMixin, APITestCase): | ||||
|     def test_auth_required(self): | ||||
|  | ||||
|         d = Document.objects.create(title="Test") | ||||
| @@ -2256,7 +2402,7 @@ class TestApiAuth(APITestCase): | ||||
|         self.assertIn("X-Version", response) | ||||
|  | ||||
|  | ||||
| class TestRemoteVersion(APITestCase): | ||||
| class TestApiRemoteVersion(DirectoriesMixin, APITestCase): | ||||
|     ENDPOINT = "/api/remote_version/" | ||||
|  | ||||
|     def setUp(self): | ||||
| @@ -2421,3 +2567,81 @@ class TestRemoteVersion(APITestCase): | ||||
|                 "feature_is_set": True, | ||||
|             }, | ||||
|         ) | ||||
|  | ||||
|  | ||||
| class TestApiStoragePaths(DirectoriesMixin, APITestCase): | ||||
|     ENDPOINT = "/api/storage_paths/" | ||||
|  | ||||
|     def setUp(self) -> None: | ||||
|         super().setUp() | ||||
|  | ||||
|         user = User.objects.create(username="temp_admin") | ||||
|         self.client.force_authenticate(user=user) | ||||
|  | ||||
|         self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{checksum}") | ||||
|  | ||||
|     def test_api_get_storage_path(self): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - API request to get all storage paths | ||||
|         WHEN: | ||||
|             - API is called | ||||
|         THEN: | ||||
|             - Existing storage paths are returned | ||||
|         """ | ||||
|         response = self.client.get(self.ENDPOINT, format="json") | ||||
|         self.assertEqual(response.status_code, 200) | ||||
|  | ||||
|         self.assertEqual(response.status_code, 200) | ||||
|         self.assertEqual(response.data["count"], 1) | ||||
|  | ||||
|         resp_storage_path = response.data["results"][0] | ||||
|         self.assertEqual(resp_storage_path["id"], self.sp1.id) | ||||
|         self.assertEqual(resp_storage_path["path"], self.sp1.path) | ||||
|  | ||||
|     def test_api_create_storage_path(self): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - API request to create a storage paths | ||||
|         WHEN: | ||||
|             - API is called | ||||
|         THEN: | ||||
|             - Correct HTTP response | ||||
|             - New storage path is created | ||||
|         """ | ||||
|         response = self.client.post( | ||||
|             self.ENDPOINT, | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "name": "A storage path", | ||||
|                     "path": "Somewhere/{asn}", | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|         self.assertEqual(response.status_code, 201) | ||||
|         self.assertEqual(StoragePath.objects.count(), 2) | ||||
|  | ||||
|     def test_api_create_invalid_storage_path(self): | ||||
|         """ | ||||
|         GIVEN: | ||||
|             - API request to create a storage paths | ||||
|             - Storage path format is incorrect | ||||
|         WHEN: | ||||
|             - API is called | ||||
|         THEN: | ||||
|             - Correct HTTP 400 response | ||||
|             - No storage path is created | ||||
|         """ | ||||
|         response = self.client.post( | ||||
|             self.ENDPOINT, | ||||
|             json.dumps( | ||||
|                 { | ||||
|                     "name": "Another storage path", | ||||
|                     "path": "Somewhere/{correspdent}", | ||||
|                 }, | ||||
|             ), | ||||
|             content_type="application/json", | ||||
|         ) | ||||
|         self.assertEqual(response.status_code, 400) | ||||
|         self.assertEqual(StoragePath.objects.count(), 1) | ||||
|   | ||||
							
								
								
									
										456
									
								
								src/documents/tests/test_barcodes.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										456
									
								
								src/documents/tests/test_barcodes.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,456 @@ | ||||
| import os | ||||
| import shutil | ||||
| import tempfile | ||||
| from unittest import mock | ||||
|  | ||||
| from django.conf import settings | ||||
| from django.test import override_settings | ||||
| from django.test import TestCase | ||||
| from documents import barcodes | ||||
| from documents import tasks | ||||
| from documents.tests.utils import DirectoriesMixin | ||||
| from PIL import Image | ||||
|  | ||||
|  | ||||
| class TestBarcode(DirectoriesMixin, TestCase): | ||||
|     def test_barcode_reader(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-PATCHT.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader2(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pbm", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_distorsion(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-PATCHT-distorsion.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_distorsion2(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-PATCHT-distorsion2.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_unreadable(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-PATCHT-unreadable.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), []) | ||||
|  | ||||
|     def test_barcode_reader_qr(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "qr-code-PATCHT.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_128(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-128-PATCHT.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_no_barcode(self): | ||||
|         test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png") | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), []) | ||||
|  | ||||
|     def test_barcode_reader_custom_separator(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-custom.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"]) | ||||
|  | ||||
|     def test_barcode_reader_custom_qr_separator(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-qr-custom.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"]) | ||||
|  | ||||
|     def test_barcode_reader_custom_128_separator(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-128-custom.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"]) | ||||
|  | ||||
|     def test_get_mime_type(self): | ||||
|         tiff_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.tiff", | ||||
|         ) | ||||
|         pdf_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.pdf", | ||||
|         ) | ||||
|         png_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-128-custom.png", | ||||
|         ) | ||||
|         tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1") | ||||
|         pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2") | ||||
|         shutil.copy(tiff_file, tiff_file_no_extension) | ||||
|         shutil.copy(pdf_file, pdf_file_no_extension) | ||||
|  | ||||
|         self.assertEqual(barcodes.get_file_mime_type(tiff_file), "image/tiff") | ||||
|         self.assertEqual(barcodes.get_file_mime_type(pdf_file), "application/pdf") | ||||
|         self.assertEqual( | ||||
|             barcodes.get_file_mime_type(tiff_file_no_extension), | ||||
|             "image/tiff", | ||||
|         ) | ||||
|         self.assertEqual( | ||||
|             barcodes.get_file_mime_type(pdf_file_no_extension), | ||||
|             "application/pdf", | ||||
|         ) | ||||
|         self.assertEqual(barcodes.get_file_mime_type(png_file), "image/png") | ||||
|  | ||||
|     def test_convert_from_tiff_to_pdf(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.tiff", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff") | ||||
|         shutil.copy(test_file, dst) | ||||
|         target_file = barcodes.convert_from_tiff_to_pdf(dst) | ||||
|         file_extension = os.path.splitext(os.path.basename(target_file))[1] | ||||
|         self.assertTrue(os.path.isfile(target_file)) | ||||
|         self.assertEqual(file_extension, ".pdf") | ||||
|  | ||||
|     def test_convert_error_from_pdf_to_pdf(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.pdf", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf") | ||||
|         shutil.copy(test_file, dst) | ||||
|         self.assertIsNone(barcodes.convert_from_tiff_to_pdf(dst)) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes2(self): | ||||
|         test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, []) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes3(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [1]) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes4(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "several-patcht-codes.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [2, 5]) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes_upsidedown(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle_reverse.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [1]) | ||||
|  | ||||
|     def test_scan_file_for_separating_qr_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-qr.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") | ||||
|     def test_scan_file_for_separating_custom_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-custom.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") | ||||
|     def test_scan_file_for_separating_custom_qr_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-qr-custom.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") | ||||
|     def test_scan_file_for_separating_custom_128_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-128-custom.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     def test_scan_file_for_separating_wrong_qr_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-custom.pdf", | ||||
|         ) | ||||
|         pages = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, []) | ||||
|  | ||||
|     def test_separate_pages(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         pages = barcodes.separate_pages(test_file, [1]) | ||||
|         self.assertEqual(len(pages), 2) | ||||
|  | ||||
|     def test_separate_pages_no_list(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         with self.assertLogs("paperless.barcodes", level="WARNING") as cm: | ||||
|             pages = barcodes.separate_pages(test_file, []) | ||||
|             self.assertEqual(pages, []) | ||||
|             self.assertEqual( | ||||
|                 cm.output, | ||||
|                 [ | ||||
|                     f"WARNING:paperless.barcodes:No pages to split on!", | ||||
|                 ], | ||||
|             ) | ||||
|  | ||||
|     def test_save_to_dir(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pdf", | ||||
|         ) | ||||
|         tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|         barcodes.save_to_dir(test_file, target_dir=tempdir) | ||||
|         target_file = os.path.join(tempdir, "patch-code-t.pdf") | ||||
|         self.assertTrue(os.path.isfile(target_file)) | ||||
|  | ||||
|     def test_save_to_dir2(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pdf", | ||||
|         ) | ||||
|         nonexistingdir = "/nowhere" | ||||
|         if os.path.isdir(nonexistingdir): | ||||
|             self.fail("non-existing dir exists") | ||||
|         else: | ||||
|             with self.assertLogs("paperless.barcodes", level="WARNING") as cm: | ||||
|                 barcodes.save_to_dir(test_file, target_dir=nonexistingdir) | ||||
|             self.assertEqual( | ||||
|                 cm.output, | ||||
|                 [ | ||||
|                     f"WARNING:paperless.barcodes:{str(test_file)} or {str(nonexistingdir)} don't exist.", | ||||
|                 ], | ||||
|             ) | ||||
|  | ||||
|     def test_save_to_dir3(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pdf", | ||||
|         ) | ||||
|         tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|         barcodes.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir) | ||||
|         target_file = os.path.join(tempdir, "newname.pdf") | ||||
|         self.assertTrue(os.path.isfile(target_file)) | ||||
|  | ||||
|     def test_barcode_splitter(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|         separators = barcodes.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertTrue(separators) | ||||
|         document_list = barcodes.separate_pages(test_file, separators) | ||||
|         self.assertTrue(document_list) | ||||
|         for document in document_list: | ||||
|             barcodes.save_to_dir(document, target_dir=tempdir) | ||||
|         target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf") | ||||
|         target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf") | ||||
|         self.assertTrue(os.path.isfile(target_file1)) | ||||
|         self.assertTrue(os.path.isfile(target_file2)) | ||||
|  | ||||
|     @override_settings(CONSUMER_ENABLE_BARCODES=True) | ||||
|     def test_consume_barcode_file(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf") | ||||
|         shutil.copy(test_file, dst) | ||||
|  | ||||
|         self.assertEqual(tasks.consume_file(dst), "File successfully split") | ||||
|  | ||||
|     @override_settings( | ||||
|         CONSUMER_ENABLE_BARCODES=True, | ||||
|         CONSUMER_BARCODE_TIFF_SUPPORT=True, | ||||
|     ) | ||||
|     def test_consume_barcode_tiff_file(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.tiff", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff") | ||||
|         shutil.copy(test_file, dst) | ||||
|  | ||||
|         self.assertEqual(tasks.consume_file(dst), "File successfully split") | ||||
|  | ||||
|     @override_settings( | ||||
|         CONSUMER_ENABLE_BARCODES=True, | ||||
|         CONSUMER_BARCODE_TIFF_SUPPORT=True, | ||||
|     ) | ||||
|     @mock.patch("documents.consumer.Consumer.try_consume_file") | ||||
|     def test_consume_barcode_unsupported_jpg_file(self, m): | ||||
|         """ | ||||
|         This test assumes barcode and TIFF support are enabled and | ||||
|         the user uploads an unsupported image file (e.g. jpg) | ||||
|  | ||||
|         The function shouldn't try to scan for separating barcodes | ||||
|         and continue archiving the file as is. | ||||
|         """ | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.jpg", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg") | ||||
|         shutil.copy(test_file, dst) | ||||
|         with self.assertLogs("paperless.tasks", level="WARNING") as cm: | ||||
|             self.assertIn("Success", tasks.consume_file(dst)) | ||||
|         self.assertListEqual( | ||||
|             cm.output, | ||||
|             [ | ||||
|                 "WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg", | ||||
|             ], | ||||
|         ) | ||||
|         m.assert_called_once() | ||||
|  | ||||
|         args, kwargs = m.call_args | ||||
|         self.assertIsNone(kwargs["override_filename"]) | ||||
|         self.assertIsNone(kwargs["override_title"]) | ||||
|         self.assertIsNone(kwargs["override_correspondent_id"]) | ||||
|         self.assertIsNone(kwargs["override_document_type_id"]) | ||||
|         self.assertIsNone(kwargs["override_tag_ids"]) | ||||
|  | ||||
|     @override_settings( | ||||
|         CONSUMER_ENABLE_BARCODES=True, | ||||
|         CONSUMER_BARCODE_TIFF_SUPPORT=True, | ||||
|     ) | ||||
|     def test_consume_barcode_supported_no_extension_file(self): | ||||
|         """ | ||||
|         This test assumes barcode and TIFF support are enabled and | ||||
|         the user uploads a supported image file, but without extension | ||||
|         """ | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.tiff", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle") | ||||
|         shutil.copy(test_file, dst) | ||||
|  | ||||
|         self.assertEqual(tasks.consume_file(dst), "File successfully split") | ||||
| @@ -1,23 +1,64 @@ | ||||
| import textwrap | ||||
| import unittest | ||||
| from unittest import mock | ||||
|  | ||||
| from django.core.checks import Error | ||||
| from django.test import override_settings | ||||
| from django.test import TestCase | ||||
| from documents.checks import changed_password_check | ||||
| from documents.checks import parser_check | ||||
| from documents.models import Document | ||||
|  | ||||
| from ..checks import changed_password_check | ||||
| from ..checks import parser_check | ||||
| from ..models import Document | ||||
| from ..signals import document_consumer_declaration | ||||
| from .factories import DocumentFactory | ||||
|  | ||||
|  | ||||
| class ChecksTestCase(TestCase): | ||||
| class TestDocumentChecks(TestCase): | ||||
|     def test_changed_password_check_empty_db(self): | ||||
|         self.assertEqual(changed_password_check(None), []) | ||||
|         self.assertListEqual(changed_password_check(None), []) | ||||
|  | ||||
|     def test_changed_password_check_no_encryption(self): | ||||
|         DocumentFactory.create(storage_type=Document.STORAGE_TYPE_UNENCRYPTED) | ||||
|         self.assertEqual(changed_password_check(None), []) | ||||
|         self.assertListEqual(changed_password_check(None), []) | ||||
|  | ||||
|     def test_encrypted_missing_passphrase(self): | ||||
|         DocumentFactory.create(storage_type=Document.STORAGE_TYPE_GPG) | ||||
|         msgs = changed_password_check(None) | ||||
|         self.assertEqual(len(msgs), 1) | ||||
|         msg_text = msgs[0].msg | ||||
|         self.assertEqual( | ||||
|             msg_text, | ||||
|             "The database contains encrypted documents but no password is set.", | ||||
|         ) | ||||
|  | ||||
|     @override_settings( | ||||
|         PASSPHRASE="test", | ||||
|     ) | ||||
|     @mock.patch("paperless.db.GnuPG.decrypted") | ||||
|     @mock.patch("documents.models.Document.source_file") | ||||
|     def test_encrypted_decrypt_fails(self, mock_decrypted, mock_source_file): | ||||
|  | ||||
|         mock_decrypted.return_value = None | ||||
|         mock_source_file.return_value = b"" | ||||
|  | ||||
|         DocumentFactory.create(storage_type=Document.STORAGE_TYPE_GPG) | ||||
|  | ||||
|         msgs = changed_password_check(None) | ||||
|  | ||||
|         self.assertEqual(len(msgs), 1) | ||||
|         msg_text = msgs[0].msg | ||||
|         self.assertEqual( | ||||
|             msg_text, | ||||
|             textwrap.dedent( | ||||
|                 """ | ||||
|                 The current password doesn't match the password of the | ||||
|                 existing documents. | ||||
|  | ||||
|                 If you intend to change your password, you must first export | ||||
|                 all of the old documents, start fresh with the new password | ||||
|                 and then re-import them." | ||||
|                 """, | ||||
|             ), | ||||
|         ) | ||||
|  | ||||
|     def test_parser_check(self): | ||||
|  | ||||
|   | ||||
| @@ -1,10 +1,7 @@ | ||||
| import os | ||||
| import shutil | ||||
| import tempfile | ||||
| from unittest import mock | ||||
|  | ||||
| from django.conf import settings | ||||
| from django.test import override_settings | ||||
| from django.test import TestCase | ||||
| from django.utils import timezone | ||||
| from documents import tasks | ||||
| @@ -15,10 +12,9 @@ from documents.models import Tag | ||||
| from documents.sanity_checker import SanityCheckFailedException | ||||
| from documents.sanity_checker import SanityCheckMessages | ||||
| from documents.tests.utils import DirectoriesMixin | ||||
| from PIL import Image | ||||
|  | ||||
|  | ||||
| class TestTasks(DirectoriesMixin, TestCase): | ||||
| class TestIndexReindex(DirectoriesMixin, TestCase): | ||||
|     def test_index_reindex(self): | ||||
|         Document.objects.create( | ||||
|             title="test", | ||||
| @@ -43,6 +39,8 @@ class TestTasks(DirectoriesMixin, TestCase): | ||||
|  | ||||
|         tasks.index_optimize() | ||||
|  | ||||
|  | ||||
| class TestClassifier(DirectoriesMixin, TestCase): | ||||
|     @mock.patch("documents.tasks.load_classifier") | ||||
|     def test_train_classifier_no_auto_matching(self, load_classifier): | ||||
|         tasks.train_classifier() | ||||
| @@ -93,442 +91,8 @@ class TestTasks(DirectoriesMixin, TestCase): | ||||
|         mtime3 = os.stat(settings.MODEL_FILE).st_mtime | ||||
|         self.assertNotEqual(mtime2, mtime3) | ||||
|  | ||||
|     def test_barcode_reader(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-PATCHT.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader2(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pbm", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_distorsion(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-PATCHT-distorsion.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_distorsion2(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-PATCHT-distorsion2.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_unreadable(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-PATCHT-unreadable.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(tasks.barcode_reader(img), []) | ||||
|  | ||||
|     def test_barcode_reader_qr(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "qr-code-PATCHT.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_128(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-128-PATCHT.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         separator_barcode = str(settings.CONSUMER_BARCODE_STRING) | ||||
|         self.assertEqual(tasks.barcode_reader(img), [separator_barcode]) | ||||
|  | ||||
|     def test_barcode_reader_no_barcode(self): | ||||
|         test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png") | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(tasks.barcode_reader(img), []) | ||||
|  | ||||
|     def test_barcode_reader_custom_separator(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-custom.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"]) | ||||
|  | ||||
|     def test_barcode_reader_custom_qr_separator(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-qr-custom.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"]) | ||||
|  | ||||
|     def test_barcode_reader_custom_128_separator(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-128-custom.png", | ||||
|         ) | ||||
|         img = Image.open(test_file) | ||||
|         self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"]) | ||||
|  | ||||
|     def test_get_mime_type(self): | ||||
|         tiff_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.tiff", | ||||
|         ) | ||||
|         pdf_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.pdf", | ||||
|         ) | ||||
|         png_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-128-custom.png", | ||||
|         ) | ||||
|         tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1") | ||||
|         pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2") | ||||
|         shutil.copy(tiff_file, tiff_file_no_extension) | ||||
|         shutil.copy(pdf_file, pdf_file_no_extension) | ||||
|  | ||||
|         self.assertEqual(tasks.get_file_type(tiff_file), "image/tiff") | ||||
|         self.assertEqual(tasks.get_file_type(pdf_file), "application/pdf") | ||||
|         self.assertEqual(tasks.get_file_type(tiff_file_no_extension), "image/tiff") | ||||
|         self.assertEqual(tasks.get_file_type(pdf_file_no_extension), "application/pdf") | ||||
|         self.assertEqual(tasks.get_file_type(png_file), "image/png") | ||||
|  | ||||
|     def test_convert_from_tiff_to_pdf(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.tiff", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff") | ||||
|         shutil.copy(test_file, dst) | ||||
|         target_file = tasks.convert_from_tiff_to_pdf(dst) | ||||
|         file_extension = os.path.splitext(os.path.basename(target_file))[1] | ||||
|         self.assertTrue(os.path.isfile(target_file)) | ||||
|         self.assertEqual(file_extension, ".pdf") | ||||
|  | ||||
|     def test_convert_error_from_pdf_to_pdf(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.pdf", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf") | ||||
|         shutil.copy(test_file, dst) | ||||
|         self.assertIsNone(tasks.convert_from_tiff_to_pdf(dst)) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes2(self): | ||||
|         test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, []) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes3(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [1]) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes4(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "several-patcht-codes.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [2, 5]) | ||||
|  | ||||
|     def test_scan_file_for_separating_barcodes_upsidedown(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle_reverse.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [1]) | ||||
|  | ||||
|     def test_scan_file_for_separating_qr_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-qr.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") | ||||
|     def test_scan_file_for_separating_custom_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-custom.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") | ||||
|     def test_scan_file_for_separating_custom_qr_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-qr-custom.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE") | ||||
|     def test_scan_file_for_separating_custom_128_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-128-custom.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, [0]) | ||||
|  | ||||
|     def test_scan_file_for_separating_wrong_qr_barcodes(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "barcode-39-custom.pdf", | ||||
|         ) | ||||
|         pages = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertEqual(pages, []) | ||||
|  | ||||
|     def test_separate_pages(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         pages = tasks.separate_pages(test_file, [1]) | ||||
|         self.assertEqual(len(pages), 2) | ||||
|  | ||||
|     def test_separate_pages_no_list(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         with self.assertLogs("paperless.tasks", level="WARNING") as cm: | ||||
|             pages = tasks.separate_pages(test_file, []) | ||||
|             self.assertEqual(pages, []) | ||||
|             self.assertEqual( | ||||
|                 cm.output, | ||||
|                 [ | ||||
|                     f"WARNING:paperless.tasks:No pages to split on!", | ||||
|                 ], | ||||
|             ) | ||||
|  | ||||
|     def test_save_to_dir(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pdf", | ||||
|         ) | ||||
|         tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|         tasks.save_to_dir(test_file, target_dir=tempdir) | ||||
|         target_file = os.path.join(tempdir, "patch-code-t.pdf") | ||||
|         self.assertTrue(os.path.isfile(target_file)) | ||||
|  | ||||
|     def test_save_to_dir2(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pdf", | ||||
|         ) | ||||
|         nonexistingdir = "/nowhere" | ||||
|         if os.path.isdir(nonexistingdir): | ||||
|             self.fail("non-existing dir exists") | ||||
|         else: | ||||
|             with self.assertLogs("paperless.tasks", level="WARNING") as cm: | ||||
|                 tasks.save_to_dir(test_file, target_dir=nonexistingdir) | ||||
|             self.assertEqual( | ||||
|                 cm.output, | ||||
|                 [ | ||||
|                     f"WARNING:paperless.tasks:{str(test_file)} or {str(nonexistingdir)} don't exist.", | ||||
|                 ], | ||||
|             ) | ||||
|  | ||||
|     def test_save_to_dir3(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t.pdf", | ||||
|         ) | ||||
|         tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|         tasks.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir) | ||||
|         target_file = os.path.join(tempdir, "newname.pdf") | ||||
|         self.assertTrue(os.path.isfile(target_file)) | ||||
|  | ||||
|     def test_barcode_splitter(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR) | ||||
|         separators = tasks.scan_file_for_separating_barcodes(test_file) | ||||
|         self.assertTrue(separators) | ||||
|         document_list = tasks.separate_pages(test_file, separators) | ||||
|         self.assertTrue(document_list) | ||||
|         for document in document_list: | ||||
|             tasks.save_to_dir(document, target_dir=tempdir) | ||||
|         target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf") | ||||
|         target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf") | ||||
|         self.assertTrue(os.path.isfile(target_file1)) | ||||
|         self.assertTrue(os.path.isfile(target_file2)) | ||||
|  | ||||
|     @override_settings(CONSUMER_ENABLE_BARCODES=True) | ||||
|     def test_consume_barcode_file(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.pdf", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf") | ||||
|         shutil.copy(test_file, dst) | ||||
|  | ||||
|         self.assertEqual(tasks.consume_file(dst), "File successfully split") | ||||
|  | ||||
|     @override_settings( | ||||
|         CONSUMER_ENABLE_BARCODES=True, | ||||
|         CONSUMER_BARCODE_TIFF_SUPPORT=True, | ||||
|     ) | ||||
|     def test_consume_barcode_tiff_file(self): | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.tiff", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff") | ||||
|         shutil.copy(test_file, dst) | ||||
|  | ||||
|         self.assertEqual(tasks.consume_file(dst), "File successfully split") | ||||
|  | ||||
|     @override_settings( | ||||
|         CONSUMER_ENABLE_BARCODES=True, | ||||
|         CONSUMER_BARCODE_TIFF_SUPPORT=True, | ||||
|     ) | ||||
|     @mock.patch("documents.consumer.Consumer.try_consume_file") | ||||
|     def test_consume_barcode_unsupported_jpg_file(self, m): | ||||
|         """ | ||||
|         This test assumes barcode and TIFF support are enabled and | ||||
|         the user uploads an unsupported image file (e.g. jpg) | ||||
|  | ||||
|         The function shouldn't try to scan for separating barcodes | ||||
|         and continue archiving the file as is. | ||||
|         """ | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "simple.jpg", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg") | ||||
|         shutil.copy(test_file, dst) | ||||
|         with self.assertLogs("paperless.tasks", level="WARNING") as cm: | ||||
|             self.assertIn("Success", tasks.consume_file(dst)) | ||||
|         self.assertEqual( | ||||
|             cm.output, | ||||
|             [ | ||||
|                 "WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg", | ||||
|             ], | ||||
|         ) | ||||
|         m.assert_called_once() | ||||
|  | ||||
|         args, kwargs = m.call_args | ||||
|         self.assertIsNone(kwargs["override_filename"]) | ||||
|         self.assertIsNone(kwargs["override_title"]) | ||||
|         self.assertIsNone(kwargs["override_correspondent_id"]) | ||||
|         self.assertIsNone(kwargs["override_document_type_id"]) | ||||
|         self.assertIsNone(kwargs["override_tag_ids"]) | ||||
|  | ||||
|     @override_settings( | ||||
|         CONSUMER_ENABLE_BARCODES=True, | ||||
|         CONSUMER_BARCODE_TIFF_SUPPORT=True, | ||||
|     ) | ||||
|     def test_consume_barcode_supported_no_extension_file(self): | ||||
|         """ | ||||
|         This test assumes barcode and TIFF support are enabled and | ||||
|         the user uploads a supported image file, but without extension | ||||
|         """ | ||||
|         test_file = os.path.join( | ||||
|             os.path.dirname(__file__), | ||||
|             "samples", | ||||
|             "barcodes", | ||||
|             "patch-code-t-middle.tiff", | ||||
|         ) | ||||
|         dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle") | ||||
|         shutil.copy(test_file, dst) | ||||
|  | ||||
|         self.assertEqual(tasks.consume_file(dst), "File successfully split") | ||||
|  | ||||
| class TestSanityCheck(DirectoriesMixin, TestCase): | ||||
|     @mock.patch("documents.tasks.sanity_checker.check_sanity") | ||||
|     def test_sanity_check_success(self, m): | ||||
|         m.return_value = SanityCheckMessages() | ||||
| @@ -565,6 +129,8 @@ class TestTasks(DirectoriesMixin, TestCase): | ||||
|         ) | ||||
|         m.assert_called_once() | ||||
|  | ||||
|  | ||||
| class TestBulkUpdate(DirectoriesMixin, TestCase): | ||||
|     def test_bulk_update_documents(self): | ||||
|         doc1 = Document.objects.create( | ||||
|             title="test", | ||||
|   | ||||
| @@ -1,9 +1,28 @@ | ||||
| import shutil | ||||
| import tempfile | ||||
|  | ||||
| from django.conf import settings | ||||
| from django.contrib.auth.models import User | ||||
| from django.test import override_settings | ||||
| from django.test import TestCase | ||||
|  | ||||
|  | ||||
| class TestViews(TestCase): | ||||
|     @classmethod | ||||
|     def setUpClass(cls): | ||||
|         # Provide a dummy static dir to silence whitenoise warnings | ||||
|         cls.static_dir = tempfile.mkdtemp() | ||||
|  | ||||
|         cls.override = override_settings( | ||||
|             STATIC_ROOT=cls.static_dir, | ||||
|         ) | ||||
|         cls.override.enable() | ||||
|  | ||||
|     @classmethod | ||||
|     def tearDownClass(cls): | ||||
|         shutil.rmtree(cls.static_dir, ignore_errors=True) | ||||
|         cls.override.disable() | ||||
|  | ||||
|     def setUp(self) -> None: | ||||
|         self.user = User.objects.create_user("testuser") | ||||
|  | ||||
|   | ||||
| @@ -19,6 +19,7 @@ def setup_directories(): | ||||
|     dirs.scratch_dir = tempfile.mkdtemp() | ||||
|     dirs.media_dir = tempfile.mkdtemp() | ||||
|     dirs.consumption_dir = tempfile.mkdtemp() | ||||
|     dirs.static_dir = tempfile.mkdtemp() | ||||
|     dirs.index_dir = os.path.join(dirs.data_dir, "index") | ||||
|     dirs.originals_dir = os.path.join(dirs.media_dir, "documents", "originals") | ||||
|     dirs.thumbnail_dir = os.path.join(dirs.media_dir, "documents", "thumbnails") | ||||
| @@ -42,6 +43,7 @@ def setup_directories(): | ||||
|         CONSUMPTION_DIR=dirs.consumption_dir, | ||||
|         LOGGING_DIR=dirs.logging_dir, | ||||
|         INDEX_DIR=dirs.index_dir, | ||||
|         STATIC_ROOT=dirs.static_dir, | ||||
|         MODEL_FILE=os.path.join(dirs.data_dir, "classification_model.pickle"), | ||||
|         MEDIA_LOCK=os.path.join(dirs.media_dir, "media.lock"), | ||||
|     ) | ||||
| @@ -55,6 +57,7 @@ def remove_dirs(dirs): | ||||
|     shutil.rmtree(dirs.data_dir, ignore_errors=True) | ||||
|     shutil.rmtree(dirs.scratch_dir, ignore_errors=True) | ||||
|     shutil.rmtree(dirs.consumption_dir, ignore_errors=True) | ||||
|     shutil.rmtree(dirs.static_dir, ignore_errors=True) | ||||
|     dirs.settings_override.disable() | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -746,7 +746,7 @@ class RemoteVersionView(GenericAPIView): | ||||
|  | ||||
|  | ||||
| class StoragePathViewSet(ModelViewSet): | ||||
|     model = DocumentType | ||||
|     model = StoragePath | ||||
|  | ||||
|     queryset = StoragePath.objects.annotate(document_count=Count("documents")).order_by( | ||||
|         Lower("name"), | ||||
|   | ||||
| @@ -1,4 +1,6 @@ | ||||
| import os | ||||
| import shutil | ||||
| import tempfile | ||||
| import uuid | ||||
| from typing import ContextManager | ||||
| from unittest import mock | ||||
| @@ -225,11 +227,18 @@ class TestParser(DirectoriesMixin, TestCase): | ||||
|     def test_image_simple_alpha(self): | ||||
|         parser = RasterisedDocumentParser(None) | ||||
|  | ||||
|         parser.parse(os.path.join(self.SAMPLE_FILES, "simple-alpha.png"), "image/png") | ||||
|         with tempfile.TemporaryDirectory() as tempdir: | ||||
|             # Copy sample file to temp directory, as the parsing changes the file | ||||
|             # and this makes it modified to Git | ||||
|             sample_file = os.path.join(self.SAMPLE_FILES, "simple-alpha.png") | ||||
|             dest_file = os.path.join(tempdir, "simple-alpha.png") | ||||
|             shutil.copy(sample_file, dest_file) | ||||
|  | ||||
|         self.assertTrue(os.path.isfile(parser.archive_path)) | ||||
|             parser.parse(dest_file, "image/png") | ||||
|  | ||||
|         self.assertContainsStrings(parser.get_text(), ["This is a test document."]) | ||||
|             self.assertTrue(os.path.isfile(parser.archive_path)) | ||||
|  | ||||
|             self.assertContainsStrings(parser.get_text(), ["This is a test document."]) | ||||
|  | ||||
|     def test_image_calc_a4_dpi(self): | ||||
|         parser = RasterisedDocumentParser(None) | ||||
|   | ||||
| @@ -16,3 +16,7 @@ source = | ||||
|   ./ | ||||
| omit = | ||||
|   */tests/* | ||||
| 	manage.py | ||||
|   paperless/workers.py | ||||
| 	paperless/wsgi.py | ||||
| 	paperless/auth.py | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 shamoon
					shamoon