mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-07-28 18:24:38 -05:00
Merge branch 'dev' into feature-redo-ocr
This commit is contained in:
186
src/documents/barcodes.py
Normal file
186
src/documents/barcodes.py
Normal file
@@ -0,0 +1,186 @@
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from functools import lru_cache
|
||||
from typing import List # for type hinting. Can be removed, if only Python >3.8 is used
|
||||
|
||||
import magic
|
||||
from django.conf import settings
|
||||
from pdf2image import convert_from_path
|
||||
from pikepdf import Pdf
|
||||
from PIL import Image
|
||||
from PIL import ImageSequence
|
||||
from pyzbar import pyzbar
|
||||
|
||||
logger = logging.getLogger("paperless.barcodes")
|
||||
|
||||
|
||||
@lru_cache(maxsize=8)
|
||||
def supported_file_type(mime_type) -> bool:
|
||||
"""
|
||||
Determines if the file is valid for barcode
|
||||
processing, based on MIME type and settings
|
||||
|
||||
:return: True if the file is supported, False otherwise
|
||||
"""
|
||||
supported_mime = ["application/pdf"]
|
||||
if settings.CONSUMER_BARCODE_TIFF_SUPPORT:
|
||||
supported_mime += ["image/tiff"]
|
||||
|
||||
return mime_type in supported_mime
|
||||
|
||||
|
||||
def barcode_reader(image) -> List[str]:
|
||||
"""
|
||||
Read any barcodes contained in image
|
||||
Returns a list containing all found barcodes
|
||||
"""
|
||||
barcodes = []
|
||||
# Decode the barcode image
|
||||
detected_barcodes = pyzbar.decode(image)
|
||||
|
||||
if detected_barcodes:
|
||||
# Traverse through all the detected barcodes in image
|
||||
for barcode in detected_barcodes:
|
||||
if barcode.data:
|
||||
decoded_barcode = barcode.data.decode("utf-8")
|
||||
barcodes.append(decoded_barcode)
|
||||
logger.debug(
|
||||
f"Barcode of type {str(barcode.type)} found: {decoded_barcode}",
|
||||
)
|
||||
return barcodes
|
||||
|
||||
|
||||
def get_file_mime_type(path: str) -> str:
|
||||
"""
|
||||
Determines the file type, based on MIME type.
|
||||
|
||||
Returns the MIME type.
|
||||
"""
|
||||
mime_type = magic.from_file(path, mime=True)
|
||||
logger.debug(f"Detected mime type: {mime_type}")
|
||||
return mime_type
|
||||
|
||||
|
||||
def convert_from_tiff_to_pdf(filepath: str) -> str:
|
||||
"""
|
||||
converts a given TIFF image file to pdf into a temporary directory.
|
||||
|
||||
Returns the new pdf file.
|
||||
"""
|
||||
file_name = os.path.splitext(os.path.basename(filepath))[0]
|
||||
mime_type = get_file_mime_type(filepath)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
# use old file name with pdf extension
|
||||
if mime_type == "image/tiff":
|
||||
newpath = os.path.join(tempdir, file_name + ".pdf")
|
||||
else:
|
||||
logger.warning(
|
||||
f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.",
|
||||
)
|
||||
return None
|
||||
with Image.open(filepath) as image:
|
||||
images = []
|
||||
for i, page in enumerate(ImageSequence.Iterator(image)):
|
||||
page = page.convert("RGB")
|
||||
images.append(page)
|
||||
try:
|
||||
if len(images) == 1:
|
||||
images[0].save(newpath)
|
||||
else:
|
||||
images[0].save(newpath, save_all=True, append_images=images[1:])
|
||||
except OSError as e:
|
||||
logger.warning(
|
||||
f"Could not save the file as pdf. Error: {str(e)}",
|
||||
)
|
||||
return None
|
||||
return newpath
|
||||
|
||||
|
||||
def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
|
||||
"""
|
||||
Scan the provided pdf file for page separating barcodes
|
||||
Returns a list of pagenumbers, which separate the file
|
||||
"""
|
||||
separator_page_numbers = []
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
# use a temporary directory in case the file os too big to handle in memory
|
||||
with tempfile.TemporaryDirectory() as path:
|
||||
pages_from_path = convert_from_path(filepath, output_folder=path)
|
||||
for current_page_number, page in enumerate(pages_from_path):
|
||||
current_barcodes = barcode_reader(page)
|
||||
if separator_barcode in current_barcodes:
|
||||
separator_page_numbers.append(current_page_number)
|
||||
return separator_page_numbers
|
||||
|
||||
|
||||
def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]:
|
||||
"""
|
||||
Separate the provided pdf file on the pages_to_split_on.
|
||||
The pages which are defined by page_numbers will be removed.
|
||||
Returns a list of (temporary) filepaths to consume.
|
||||
These will need to be deleted later.
|
||||
"""
|
||||
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
fname = os.path.splitext(os.path.basename(filepath))[0]
|
||||
pdf = Pdf.open(filepath)
|
||||
document_paths = []
|
||||
logger.debug(f"Temp dir is {str(tempdir)}")
|
||||
if not pages_to_split_on:
|
||||
logger.warning("No pages to split on!")
|
||||
else:
|
||||
# go from the first page to the first separator page
|
||||
dst = Pdf.new()
|
||||
for n, page in enumerate(pdf.pages):
|
||||
if n < pages_to_split_on[0]:
|
||||
dst.pages.append(page)
|
||||
output_filename = f"{fname}_document_0.pdf"
|
||||
savepath = os.path.join(tempdir, output_filename)
|
||||
with open(savepath, "wb") as out:
|
||||
dst.save(out)
|
||||
document_paths = [savepath]
|
||||
|
||||
# iterate through the rest of the document
|
||||
for count, page_number in enumerate(pages_to_split_on):
|
||||
logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
|
||||
dst = Pdf.new()
|
||||
try:
|
||||
next_page = pages_to_split_on[count + 1]
|
||||
except IndexError:
|
||||
next_page = len(pdf.pages)
|
||||
# skip the first page_number. This contains the barcode page
|
||||
for page in range(page_number + 1, next_page):
|
||||
logger.debug(
|
||||
f"page_number: {str(page_number)} next_page: {str(next_page)}",
|
||||
)
|
||||
dst.pages.append(pdf.pages[page])
|
||||
output_filename = f"{fname}_document_{str(count + 1)}.pdf"
|
||||
logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
|
||||
savepath = os.path.join(tempdir, output_filename)
|
||||
with open(savepath, "wb") as out:
|
||||
dst.save(out)
|
||||
document_paths.append(savepath)
|
||||
logger.debug(f"Temp files are {str(document_paths)}")
|
||||
return document_paths
|
||||
|
||||
|
||||
def save_to_dir(
|
||||
filepath: str,
|
||||
newname: str = None,
|
||||
target_dir: str = settings.CONSUMPTION_DIR,
|
||||
):
|
||||
"""
|
||||
Copies filepath to target_dir.
|
||||
Optionally rename the file.
|
||||
"""
|
||||
if os.path.isfile(filepath) and os.path.isdir(target_dir):
|
||||
dst = shutil.copy(filepath, target_dir)
|
||||
logging.debug(f"saved {str(filepath)} to {str(dst)}")
|
||||
if newname:
|
||||
dst_new = os.path.join(target_dir, newname)
|
||||
logger.debug(f"moving {str(dst)} to {str(dst_new)}")
|
||||
os.rename(dst, dst_new)
|
||||
else:
|
||||
logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.")
|
@@ -3,7 +3,9 @@ import sys
|
||||
from django.core.management.commands.loaddata import Command as LoadDataCommand
|
||||
|
||||
|
||||
class Command(LoadDataCommand):
|
||||
# This class is used to migrate data between databases
|
||||
# That's difficult to test
|
||||
class Command(LoadDataCommand): # pragma: nocover
|
||||
"""
|
||||
Allow the loading of data from standard in. Sourced originally from:
|
||||
https://gist.github.com/bmispelon/ad5a2c333443b3a1d051 (MIT licensed)
|
||||
|
@@ -87,10 +87,10 @@ def _convert_thumbnails_to_webp(apps, schema_editor):
|
||||
) as pool:
|
||||
pool.map(_do_convert, work_packages)
|
||||
|
||||
end = time.time()
|
||||
duration = end - start
|
||||
end = time.time()
|
||||
duration = end - start
|
||||
|
||||
logger.info(f"Conversion completed in {duration:.3f}s")
|
||||
logger.info(f"Conversion completed in {duration:.3f}s")
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
@@ -539,8 +539,6 @@ class BulkDownloadSerializer(DocumentListSerializer):
|
||||
|
||||
|
||||
class StoragePathSerializer(MatchingModelSerializer):
|
||||
document_count = serializers.IntegerField(read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = StoragePath
|
||||
fields = (
|
||||
@@ -588,10 +586,6 @@ class UiSettingsViewSerializer(serializers.ModelSerializer):
|
||||
"settings",
|
||||
]
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
super().update(instance, validated_data)
|
||||
return instance
|
||||
|
||||
def create(self, validated_data):
|
||||
ui_settings = UiSettings.objects.update_or_create(
|
||||
user=validated_data.get("user"),
|
||||
|
@@ -6,13 +6,13 @@ from pathlib import Path
|
||||
from typing import List # for type hinting. Can be removed, if only Python >3.8 is used
|
||||
from typing import Type
|
||||
|
||||
import magic
|
||||
import tqdm
|
||||
from asgiref.sync import async_to_sync
|
||||
from channels.layers import get_channel_layer
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.db.models.signals import post_save
|
||||
from documents import barcodes
|
||||
from documents import index
|
||||
from documents import sanity_checker
|
||||
from documents.classifier import DocumentClassifier
|
||||
@@ -28,11 +28,6 @@ from documents.parsers import DocumentParser
|
||||
from documents.parsers import get_parser_class_for_mime_type
|
||||
from documents.parsers import ParseError
|
||||
from documents.sanity_checker import SanityCheckFailedException
|
||||
from pdf2image import convert_from_path
|
||||
from pikepdf import Pdf
|
||||
from PIL import Image
|
||||
from PIL import ImageSequence
|
||||
from pyzbar import pyzbar
|
||||
from whoosh.writing import AsyncWriter
|
||||
|
||||
|
||||
@@ -83,161 +78,6 @@ def train_classifier():
|
||||
logger.warning("Classifier error: " + str(e))
|
||||
|
||||
|
||||
def barcode_reader(image) -> List[str]:
|
||||
"""
|
||||
Read any barcodes contained in image
|
||||
Returns a list containing all found barcodes
|
||||
"""
|
||||
barcodes = []
|
||||
# Decode the barcode image
|
||||
detected_barcodes = pyzbar.decode(image)
|
||||
|
||||
if detected_barcodes:
|
||||
# Traverse through all the detected barcodes in image
|
||||
for barcode in detected_barcodes:
|
||||
if barcode.data:
|
||||
decoded_barcode = barcode.data.decode("utf-8")
|
||||
barcodes.append(decoded_barcode)
|
||||
logger.debug(
|
||||
f"Barcode of type {str(barcode.type)} found: {decoded_barcode}",
|
||||
)
|
||||
return barcodes
|
||||
|
||||
|
||||
def get_file_type(path: str) -> str:
|
||||
"""
|
||||
Determines the file type, based on MIME type.
|
||||
|
||||
Returns the MIME type.
|
||||
"""
|
||||
mime_type = magic.from_file(path, mime=True)
|
||||
logger.debug(f"Detected mime type: {mime_type}")
|
||||
return mime_type
|
||||
|
||||
|
||||
def convert_from_tiff_to_pdf(filepath: str) -> str:
|
||||
"""
|
||||
converts a given TIFF image file to pdf into a temporary directory.
|
||||
|
||||
Returns the new pdf file.
|
||||
"""
|
||||
file_name = os.path.splitext(os.path.basename(filepath))[0]
|
||||
mime_type = get_file_type(filepath)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
# use old file name with pdf extension
|
||||
if mime_type == "image/tiff":
|
||||
newpath = os.path.join(tempdir, file_name + ".pdf")
|
||||
else:
|
||||
logger.warning(
|
||||
f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.",
|
||||
)
|
||||
return None
|
||||
with Image.open(filepath) as image:
|
||||
images = []
|
||||
for i, page in enumerate(ImageSequence.Iterator(image)):
|
||||
page = page.convert("RGB")
|
||||
images.append(page)
|
||||
try:
|
||||
if len(images) == 1:
|
||||
images[0].save(newpath)
|
||||
else:
|
||||
images[0].save(newpath, save_all=True, append_images=images[1:])
|
||||
except OSError as e:
|
||||
logger.warning(
|
||||
f"Could not save the file as pdf. Error: {str(e)}",
|
||||
)
|
||||
return None
|
||||
return newpath
|
||||
|
||||
|
||||
def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
|
||||
"""
|
||||
Scan the provided pdf file for page separating barcodes
|
||||
Returns a list of pagenumbers, which separate the file
|
||||
"""
|
||||
separator_page_numbers = []
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
# use a temporary directory in case the file os too big to handle in memory
|
||||
with tempfile.TemporaryDirectory() as path:
|
||||
pages_from_path = convert_from_path(filepath, output_folder=path)
|
||||
for current_page_number, page in enumerate(pages_from_path):
|
||||
current_barcodes = barcode_reader(page)
|
||||
if separator_barcode in current_barcodes:
|
||||
separator_page_numbers.append(current_page_number)
|
||||
return separator_page_numbers
|
||||
|
||||
|
||||
def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]:
|
||||
"""
|
||||
Separate the provided pdf file on the pages_to_split_on.
|
||||
The pages which are defined by page_numbers will be removed.
|
||||
Returns a list of (temporary) filepaths to consume.
|
||||
These will need to be deleted later.
|
||||
"""
|
||||
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
fname = os.path.splitext(os.path.basename(filepath))[0]
|
||||
pdf = Pdf.open(filepath)
|
||||
document_paths = []
|
||||
logger.debug(f"Temp dir is {str(tempdir)}")
|
||||
if not pages_to_split_on:
|
||||
logger.warning("No pages to split on!")
|
||||
else:
|
||||
# go from the first page to the first separator page
|
||||
dst = Pdf.new()
|
||||
for n, page in enumerate(pdf.pages):
|
||||
if n < pages_to_split_on[0]:
|
||||
dst.pages.append(page)
|
||||
output_filename = f"{fname}_document_0.pdf"
|
||||
savepath = os.path.join(tempdir, output_filename)
|
||||
with open(savepath, "wb") as out:
|
||||
dst.save(out)
|
||||
document_paths = [savepath]
|
||||
|
||||
# iterate through the rest of the document
|
||||
for count, page_number in enumerate(pages_to_split_on):
|
||||
logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
|
||||
dst = Pdf.new()
|
||||
try:
|
||||
next_page = pages_to_split_on[count + 1]
|
||||
except IndexError:
|
||||
next_page = len(pdf.pages)
|
||||
# skip the first page_number. This contains the barcode page
|
||||
for page in range(page_number + 1, next_page):
|
||||
logger.debug(
|
||||
f"page_number: {str(page_number)} next_page: {str(next_page)}",
|
||||
)
|
||||
dst.pages.append(pdf.pages[page])
|
||||
output_filename = f"{fname}_document_{str(count + 1)}.pdf"
|
||||
logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
|
||||
savepath = os.path.join(tempdir, output_filename)
|
||||
with open(savepath, "wb") as out:
|
||||
dst.save(out)
|
||||
document_paths.append(savepath)
|
||||
logger.debug(f"Temp files are {str(document_paths)}")
|
||||
return document_paths
|
||||
|
||||
|
||||
def save_to_dir(
|
||||
filepath: str,
|
||||
newname: str = None,
|
||||
target_dir: str = settings.CONSUMPTION_DIR,
|
||||
):
|
||||
"""
|
||||
Copies filepath to target_dir.
|
||||
Optionally rename the file.
|
||||
"""
|
||||
if os.path.isfile(filepath) and os.path.isdir(target_dir):
|
||||
dst = shutil.copy(filepath, target_dir)
|
||||
logging.debug(f"saved {str(filepath)} to {str(dst)}")
|
||||
if newname:
|
||||
dst_new = os.path.join(target_dir, newname)
|
||||
logger.debug(f"moving {str(dst)} to {str(dst_new)}")
|
||||
os.rename(dst, dst_new)
|
||||
else:
|
||||
logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.")
|
||||
|
||||
|
||||
def consume_file(
|
||||
path,
|
||||
override_filename=None,
|
||||
@@ -251,32 +91,30 @@ def consume_file(
|
||||
|
||||
# check for separators in current document
|
||||
if settings.CONSUMER_ENABLE_BARCODES:
|
||||
separators = []
|
||||
document_list = []
|
||||
converted_tiff = None
|
||||
if settings.CONSUMER_BARCODE_TIFF_SUPPORT:
|
||||
supported_mime = ["image/tiff", "application/pdf"]
|
||||
else:
|
||||
supported_mime = ["application/pdf"]
|
||||
mime_type = get_file_type(path)
|
||||
if mime_type not in supported_mime:
|
||||
|
||||
mime_type = barcodes.get_file_mime_type(path)
|
||||
|
||||
if not barcodes.supported_file_type(mime_type):
|
||||
# if not supported, skip this routine
|
||||
logger.warning(
|
||||
f"Unsupported file format for barcode reader: {str(mime_type)}",
|
||||
)
|
||||
else:
|
||||
separators = []
|
||||
document_list = []
|
||||
|
||||
if mime_type == "image/tiff":
|
||||
file_to_process = convert_from_tiff_to_pdf(path)
|
||||
file_to_process = barcodes.convert_from_tiff_to_pdf(path)
|
||||
else:
|
||||
file_to_process = path
|
||||
|
||||
separators = scan_file_for_separating_barcodes(file_to_process)
|
||||
separators = barcodes.scan_file_for_separating_barcodes(file_to_process)
|
||||
|
||||
if separators:
|
||||
logger.debug(
|
||||
f"Pages with separators found in: {str(path)}",
|
||||
)
|
||||
document_list = separate_pages(file_to_process, separators)
|
||||
document_list = barcodes.separate_pages(file_to_process, separators)
|
||||
|
||||
if document_list:
|
||||
for n, document in enumerate(document_list):
|
||||
@@ -286,14 +124,18 @@ def consume_file(
|
||||
newname = f"{str(n)}_" + override_filename
|
||||
else:
|
||||
newname = None
|
||||
save_to_dir(document, newname=newname)
|
||||
barcodes.save_to_dir(document, newname=newname)
|
||||
|
||||
# if we got here, the document was successfully split
|
||||
# and can safely be deleted
|
||||
if converted_tiff:
|
||||
if mime_type == "image/tiff":
|
||||
# Remove the TIFF converted to PDF file
|
||||
logger.debug(f"Deleting file {file_to_process}")
|
||||
os.unlink(file_to_process)
|
||||
# Remove the original file (new file is saved above)
|
||||
logger.debug(f"Deleting file {path}")
|
||||
os.unlink(path)
|
||||
|
||||
# notify the sender, otherwise the progress bar
|
||||
# in the UI stays stuck
|
||||
payload = {
|
||||
|
@@ -1432,17 +1432,25 @@ class TestDocumentApiV2(DirectoriesMixin, APITestCase):
|
||||
"#000000",
|
||||
)
|
||||
|
||||
def test_ui_settings(self):
|
||||
test_user = User.objects.create_superuser(username="test")
|
||||
self.client.force_authenticate(user=test_user)
|
||||
|
||||
response = self.client.get("/api/ui_settings/", format="json")
|
||||
class TestApiUiSettings(DirectoriesMixin, APITestCase):
|
||||
|
||||
ENDPOINT = "/api/ui_settings/"
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.test_user = User.objects.create_superuser(username="test")
|
||||
self.client.force_authenticate(user=self.test_user)
|
||||
|
||||
def test_api_get_ui_settings(self):
|
||||
response = self.client.get(self.ENDPOINT, format="json")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertDictEqual(
|
||||
response.data["settings"],
|
||||
{},
|
||||
)
|
||||
|
||||
def test_api_set_ui_settings(self):
|
||||
settings = {
|
||||
"settings": {
|
||||
"dark_mode": {
|
||||
@@ -1452,18 +1460,16 @@ class TestDocumentApiV2(DirectoriesMixin, APITestCase):
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
"/api/ui_settings/",
|
||||
self.ENDPOINT,
|
||||
json.dumps(settings),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.get("/api/ui_settings/", format="json")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
ui_settings = self.test_user.ui_settings
|
||||
self.assertDictEqual(
|
||||
response.data["settings"],
|
||||
ui_settings.settings,
|
||||
settings["settings"],
|
||||
)
|
||||
|
||||
@@ -1786,6 +1792,34 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(kwargs["add_tags"], [self.t1.id])
|
||||
self.assertEqual(kwargs["remove_tags"], [self.t2.id])
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.modify_tags")
|
||||
def test_api_modify_tags_not_provided(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- API data to modify tags is missing modify_tags field
|
||||
WHEN:
|
||||
- API to edit tags is called
|
||||
THEN:
|
||||
- API returns HTTP 400
|
||||
- modify_tags is not called
|
||||
"""
|
||||
m.return_value = "OK"
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc1.id, self.doc3.id],
|
||||
"method": "modify_tags",
|
||||
"parameters": {
|
||||
"add_tags": [self.t1.id],
|
||||
},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
m.assert_not_called()
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.delete")
|
||||
def test_api_delete(self, m):
|
||||
m.return_value = "OK"
|
||||
@@ -1802,6 +1836,118 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(args[0], [self.doc1.id])
|
||||
self.assertEqual(len(kwargs), 0)
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
|
||||
def test_api_set_storage_path(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- API data to set the storage path of a document
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- set_storage_path is called with correct document IDs and storage_path ID
|
||||
"""
|
||||
m.return_value = "OK"
|
||||
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc1.id],
|
||||
"method": "set_storage_path",
|
||||
"parameters": {"storage_path": self.sp1.id},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
m.assert_called_once()
|
||||
args, kwargs = m.call_args
|
||||
|
||||
self.assertListEqual(args[0], [self.doc1.id])
|
||||
self.assertEqual(kwargs["storage_path"], self.sp1.id)
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
|
||||
def test_api_unset_storage_path(self, m):
|
||||
"""
|
||||
GIVEN:
|
||||
- API data to clear/unset the storage path of a document
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- set_storage_path is called with correct document IDs and None storage_path
|
||||
"""
|
||||
m.return_value = "OK"
|
||||
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc1.id],
|
||||
"method": "set_storage_path",
|
||||
"parameters": {"storage_path": None},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
m.assert_called_once()
|
||||
args, kwargs = m.call_args
|
||||
|
||||
self.assertListEqual(args[0], [self.doc1.id])
|
||||
self.assertEqual(kwargs["storage_path"], None)
|
||||
|
||||
def test_api_invalid_storage_path(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API data to set the storage path of a document
|
||||
- Given storage_path ID isn't valid
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- set_storage_path is called with correct document IDs and storage_path ID
|
||||
"""
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc1.id],
|
||||
"method": "set_storage_path",
|
||||
"parameters": {"storage_path": self.sp1.id + 10},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.async_task.assert_not_called()
|
||||
|
||||
def test_api_set_storage_path_not_provided(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API data to set the storage path of a document
|
||||
- API data is missing storage path ID
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- set_storage_path is called with correct document IDs and storage_path ID
|
||||
"""
|
||||
response = self.client.post(
|
||||
"/api/documents/bulk_edit/",
|
||||
json.dumps(
|
||||
{
|
||||
"documents": [self.doc1.id],
|
||||
"method": "set_storage_path",
|
||||
"parameters": {},
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.async_task.assert_not_called()
|
||||
|
||||
def test_api_invalid_doc(self):
|
||||
self.assertEqual(Document.objects.count(), 5)
|
||||
response = self.client.post(
|
||||
@@ -2203,7 +2349,7 @@ class TestBulkDownload(DirectoriesMixin, APITestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestApiAuth(APITestCase):
|
||||
class TestApiAuth(DirectoriesMixin, APITestCase):
|
||||
def test_auth_required(self):
|
||||
|
||||
d = Document.objects.create(title="Test")
|
||||
@@ -2256,7 +2402,7 @@ class TestApiAuth(APITestCase):
|
||||
self.assertIn("X-Version", response)
|
||||
|
||||
|
||||
class TestRemoteVersion(APITestCase):
|
||||
class TestApiRemoteVersion(DirectoriesMixin, APITestCase):
|
||||
ENDPOINT = "/api/remote_version/"
|
||||
|
||||
def setUp(self):
|
||||
@@ -2421,3 +2567,81 @@ class TestRemoteVersion(APITestCase):
|
||||
"feature_is_set": True,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class TestApiStoragePaths(DirectoriesMixin, APITestCase):
|
||||
ENDPOINT = "/api/storage_paths/"
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
|
||||
user = User.objects.create(username="temp_admin")
|
||||
self.client.force_authenticate(user=user)
|
||||
|
||||
self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{checksum}")
|
||||
|
||||
def test_api_get_storage_path(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API request to get all storage paths
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- Existing storage paths are returned
|
||||
"""
|
||||
response = self.client.get(self.ENDPOINT, format="json")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertEqual(response.data["count"], 1)
|
||||
|
||||
resp_storage_path = response.data["results"][0]
|
||||
self.assertEqual(resp_storage_path["id"], self.sp1.id)
|
||||
self.assertEqual(resp_storage_path["path"], self.sp1.path)
|
||||
|
||||
def test_api_create_storage_path(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API request to create a storage paths
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- Correct HTTP response
|
||||
- New storage path is created
|
||||
"""
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"name": "A storage path",
|
||||
"path": "Somewhere/{asn}",
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertEqual(StoragePath.objects.count(), 2)
|
||||
|
||||
def test_api_create_invalid_storage_path(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- API request to create a storage paths
|
||||
- Storage path format is incorrect
|
||||
WHEN:
|
||||
- API is called
|
||||
THEN:
|
||||
- Correct HTTP 400 response
|
||||
- No storage path is created
|
||||
"""
|
||||
response = self.client.post(
|
||||
self.ENDPOINT,
|
||||
json.dumps(
|
||||
{
|
||||
"name": "Another storage path",
|
||||
"path": "Somewhere/{correspdent}",
|
||||
},
|
||||
),
|
||||
content_type="application/json",
|
||||
)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertEqual(StoragePath.objects.count(), 1)
|
||||
|
456
src/documents/tests/test_barcodes.py
Normal file
456
src/documents/tests/test_barcodes.py
Normal file
@@ -0,0 +1,456 @@
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
from django.conf import settings
|
||||
from django.test import override_settings
|
||||
from django.test import TestCase
|
||||
from documents import barcodes
|
||||
from documents import tasks
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class TestBarcode(DirectoriesMixin, TestCase):
|
||||
def test_barcode_reader(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pbm",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_distorsion(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-distorsion.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_distorsion2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-distorsion2.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_unreadable(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-unreadable.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(barcodes.barcode_reader(img), [])
|
||||
|
||||
def test_barcode_reader_qr(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"qr-code-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_128(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(barcodes.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_no_barcode(self):
|
||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png")
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(barcodes.barcode_reader(img), [])
|
||||
|
||||
def test_barcode_reader_custom_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_barcode_reader_custom_qr_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-qr-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_barcode_reader_custom_128_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(barcodes.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_get_mime_type(self):
|
||||
tiff_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.tiff",
|
||||
)
|
||||
pdf_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.pdf",
|
||||
)
|
||||
png_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.png",
|
||||
)
|
||||
tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1")
|
||||
pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2")
|
||||
shutil.copy(tiff_file, tiff_file_no_extension)
|
||||
shutil.copy(pdf_file, pdf_file_no_extension)
|
||||
|
||||
self.assertEqual(barcodes.get_file_mime_type(tiff_file), "image/tiff")
|
||||
self.assertEqual(barcodes.get_file_mime_type(pdf_file), "application/pdf")
|
||||
self.assertEqual(
|
||||
barcodes.get_file_mime_type(tiff_file_no_extension),
|
||||
"image/tiff",
|
||||
)
|
||||
self.assertEqual(
|
||||
barcodes.get_file_mime_type(pdf_file_no_extension),
|
||||
"application/pdf",
|
||||
)
|
||||
self.assertEqual(barcodes.get_file_mime_type(png_file), "image/png")
|
||||
|
||||
def test_convert_from_tiff_to_pdf(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff")
|
||||
shutil.copy(test_file, dst)
|
||||
target_file = barcodes.convert_from_tiff_to_pdf(dst)
|
||||
file_extension = os.path.splitext(os.path.basename(target_file))[1]
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
self.assertEqual(file_extension, ".pdf")
|
||||
|
||||
def test_convert_error_from_pdf_to_pdf(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.pdf",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf")
|
||||
shutil.copy(test_file, dst)
|
||||
self.assertIsNone(barcodes.convert_from_tiff_to_pdf(dst))
|
||||
|
||||
def test_scan_file_for_separating_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
def test_scan_file_for_separating_barcodes2(self):
|
||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [])
|
||||
|
||||
def test_scan_file_for_separating_barcodes3(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [1])
|
||||
|
||||
def test_scan_file_for_separating_barcodes4(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"several-patcht-codes.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [2, 5])
|
||||
|
||||
def test_scan_file_for_separating_barcodes_upsidedown(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle_reverse.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [1])
|
||||
|
||||
def test_scan_file_for_separating_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-qr.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-qr-custom.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_128_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
def test_scan_file_for_separating_wrong_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.pdf",
|
||||
)
|
||||
pages = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [])
|
||||
|
||||
def test_separate_pages(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
pages = barcodes.separate_pages(test_file, [1])
|
||||
self.assertEqual(len(pages), 2)
|
||||
|
||||
def test_separate_pages_no_list(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
with self.assertLogs("paperless.barcodes", level="WARNING") as cm:
|
||||
pages = barcodes.separate_pages(test_file, [])
|
||||
self.assertEqual(pages, [])
|
||||
self.assertEqual(
|
||||
cm.output,
|
||||
[
|
||||
f"WARNING:paperless.barcodes:No pages to split on!",
|
||||
],
|
||||
)
|
||||
|
||||
def test_save_to_dir(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
barcodes.save_to_dir(test_file, target_dir=tempdir)
|
||||
target_file = os.path.join(tempdir, "patch-code-t.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
|
||||
def test_save_to_dir2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
nonexistingdir = "/nowhere"
|
||||
if os.path.isdir(nonexistingdir):
|
||||
self.fail("non-existing dir exists")
|
||||
else:
|
||||
with self.assertLogs("paperless.barcodes", level="WARNING") as cm:
|
||||
barcodes.save_to_dir(test_file, target_dir=nonexistingdir)
|
||||
self.assertEqual(
|
||||
cm.output,
|
||||
[
|
||||
f"WARNING:paperless.barcodes:{str(test_file)} or {str(nonexistingdir)} don't exist.",
|
||||
],
|
||||
)
|
||||
|
||||
def test_save_to_dir3(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
barcodes.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir)
|
||||
target_file = os.path.join(tempdir, "newname.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
|
||||
def test_barcode_splitter(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
separators = barcodes.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertTrue(separators)
|
||||
document_list = barcodes.separate_pages(test_file, separators)
|
||||
self.assertTrue(document_list)
|
||||
for document in document_list:
|
||||
barcodes.save_to_dir(document, target_dir=tempdir)
|
||||
target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf")
|
||||
target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file1))
|
||||
self.assertTrue(os.path.isfile(target_file2))
|
||||
|
||||
@override_settings(CONSUMER_ENABLE_BARCODES=True)
|
||||
def test_consume_barcode_file(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
def test_consume_barcode_tiff_file(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consume_barcode_unsupported_jpg_file(self, m):
|
||||
"""
|
||||
This test assumes barcode and TIFF support are enabled and
|
||||
the user uploads an unsupported image file (e.g. jpg)
|
||||
|
||||
The function shouldn't try to scan for separating barcodes
|
||||
and continue archiving the file as is.
|
||||
"""
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.jpg",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg")
|
||||
shutil.copy(test_file, dst)
|
||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
||||
self.assertIn("Success", tasks.consume_file(dst))
|
||||
self.assertListEqual(
|
||||
cm.output,
|
||||
[
|
||||
"WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg",
|
||||
],
|
||||
)
|
||||
m.assert_called_once()
|
||||
|
||||
args, kwargs = m.call_args
|
||||
self.assertIsNone(kwargs["override_filename"])
|
||||
self.assertIsNone(kwargs["override_title"])
|
||||
self.assertIsNone(kwargs["override_correspondent_id"])
|
||||
self.assertIsNone(kwargs["override_document_type_id"])
|
||||
self.assertIsNone(kwargs["override_tag_ids"])
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
def test_consume_barcode_supported_no_extension_file(self):
|
||||
"""
|
||||
This test assumes barcode and TIFF support are enabled and
|
||||
the user uploads a supported image file, but without extension
|
||||
"""
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
@@ -1,23 +1,64 @@
|
||||
import textwrap
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
from django.core.checks import Error
|
||||
from django.test import override_settings
|
||||
from django.test import TestCase
|
||||
from documents.checks import changed_password_check
|
||||
from documents.checks import parser_check
|
||||
from documents.models import Document
|
||||
|
||||
from ..checks import changed_password_check
|
||||
from ..checks import parser_check
|
||||
from ..models import Document
|
||||
from ..signals import document_consumer_declaration
|
||||
from .factories import DocumentFactory
|
||||
|
||||
|
||||
class ChecksTestCase(TestCase):
|
||||
class TestDocumentChecks(TestCase):
|
||||
def test_changed_password_check_empty_db(self):
|
||||
self.assertEqual(changed_password_check(None), [])
|
||||
self.assertListEqual(changed_password_check(None), [])
|
||||
|
||||
def test_changed_password_check_no_encryption(self):
|
||||
DocumentFactory.create(storage_type=Document.STORAGE_TYPE_UNENCRYPTED)
|
||||
self.assertEqual(changed_password_check(None), [])
|
||||
self.assertListEqual(changed_password_check(None), [])
|
||||
|
||||
def test_encrypted_missing_passphrase(self):
|
||||
DocumentFactory.create(storage_type=Document.STORAGE_TYPE_GPG)
|
||||
msgs = changed_password_check(None)
|
||||
self.assertEqual(len(msgs), 1)
|
||||
msg_text = msgs[0].msg
|
||||
self.assertEqual(
|
||||
msg_text,
|
||||
"The database contains encrypted documents but no password is set.",
|
||||
)
|
||||
|
||||
@override_settings(
|
||||
PASSPHRASE="test",
|
||||
)
|
||||
@mock.patch("paperless.db.GnuPG.decrypted")
|
||||
@mock.patch("documents.models.Document.source_file")
|
||||
def test_encrypted_decrypt_fails(self, mock_decrypted, mock_source_file):
|
||||
|
||||
mock_decrypted.return_value = None
|
||||
mock_source_file.return_value = b""
|
||||
|
||||
DocumentFactory.create(storage_type=Document.STORAGE_TYPE_GPG)
|
||||
|
||||
msgs = changed_password_check(None)
|
||||
|
||||
self.assertEqual(len(msgs), 1)
|
||||
msg_text = msgs[0].msg
|
||||
self.assertEqual(
|
||||
msg_text,
|
||||
textwrap.dedent(
|
||||
"""
|
||||
The current password doesn't match the password of the
|
||||
existing documents.
|
||||
|
||||
If you intend to change your password, you must first export
|
||||
all of the old documents, start fresh with the new password
|
||||
and then re-import them."
|
||||
""",
|
||||
),
|
||||
)
|
||||
|
||||
def test_parser_check(self):
|
||||
|
||||
|
@@ -1,10 +1,7 @@
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
from django.conf import settings
|
||||
from django.test import override_settings
|
||||
from django.test import TestCase
|
||||
from django.utils import timezone
|
||||
from documents import tasks
|
||||
@@ -15,10 +12,9 @@ from documents.models import Tag
|
||||
from documents.sanity_checker import SanityCheckFailedException
|
||||
from documents.sanity_checker import SanityCheckMessages
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class TestTasks(DirectoriesMixin, TestCase):
|
||||
class TestIndexReindex(DirectoriesMixin, TestCase):
|
||||
def test_index_reindex(self):
|
||||
Document.objects.create(
|
||||
title="test",
|
||||
@@ -43,6 +39,8 @@ class TestTasks(DirectoriesMixin, TestCase):
|
||||
|
||||
tasks.index_optimize()
|
||||
|
||||
|
||||
class TestClassifier(DirectoriesMixin, TestCase):
|
||||
@mock.patch("documents.tasks.load_classifier")
|
||||
def test_train_classifier_no_auto_matching(self, load_classifier):
|
||||
tasks.train_classifier()
|
||||
@@ -93,442 +91,8 @@ class TestTasks(DirectoriesMixin, TestCase):
|
||||
mtime3 = os.stat(settings.MODEL_FILE).st_mtime
|
||||
self.assertNotEqual(mtime2, mtime3)
|
||||
|
||||
def test_barcode_reader(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pbm",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_distorsion(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-distorsion.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_distorsion2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-distorsion2.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_unreadable(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-unreadable.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), [])
|
||||
|
||||
def test_barcode_reader_qr(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"qr-code-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_128(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_no_barcode(self):
|
||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png")
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), [])
|
||||
|
||||
def test_barcode_reader_custom_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_barcode_reader_custom_qr_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-qr-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_barcode_reader_custom_128_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_get_mime_type(self):
|
||||
tiff_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.tiff",
|
||||
)
|
||||
pdf_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.pdf",
|
||||
)
|
||||
png_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.png",
|
||||
)
|
||||
tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1")
|
||||
pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2")
|
||||
shutil.copy(tiff_file, tiff_file_no_extension)
|
||||
shutil.copy(pdf_file, pdf_file_no_extension)
|
||||
|
||||
self.assertEqual(tasks.get_file_type(tiff_file), "image/tiff")
|
||||
self.assertEqual(tasks.get_file_type(pdf_file), "application/pdf")
|
||||
self.assertEqual(tasks.get_file_type(tiff_file_no_extension), "image/tiff")
|
||||
self.assertEqual(tasks.get_file_type(pdf_file_no_extension), "application/pdf")
|
||||
self.assertEqual(tasks.get_file_type(png_file), "image/png")
|
||||
|
||||
def test_convert_from_tiff_to_pdf(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff")
|
||||
shutil.copy(test_file, dst)
|
||||
target_file = tasks.convert_from_tiff_to_pdf(dst)
|
||||
file_extension = os.path.splitext(os.path.basename(target_file))[1]
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
self.assertEqual(file_extension, ".pdf")
|
||||
|
||||
def test_convert_error_from_pdf_to_pdf(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.pdf",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf")
|
||||
shutil.copy(test_file, dst)
|
||||
self.assertIsNone(tasks.convert_from_tiff_to_pdf(dst))
|
||||
|
||||
def test_scan_file_for_separating_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
def test_scan_file_for_separating_barcodes2(self):
|
||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [])
|
||||
|
||||
def test_scan_file_for_separating_barcodes3(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [1])
|
||||
|
||||
def test_scan_file_for_separating_barcodes4(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"several-patcht-codes.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [2, 5])
|
||||
|
||||
def test_scan_file_for_separating_barcodes_upsidedown(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle_reverse.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [1])
|
||||
|
||||
def test_scan_file_for_separating_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-qr.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-qr-custom.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_128_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
def test_scan_file_for_separating_wrong_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [])
|
||||
|
||||
def test_separate_pages(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
pages = tasks.separate_pages(test_file, [1])
|
||||
self.assertEqual(len(pages), 2)
|
||||
|
||||
def test_separate_pages_no_list(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
||||
pages = tasks.separate_pages(test_file, [])
|
||||
self.assertEqual(pages, [])
|
||||
self.assertEqual(
|
||||
cm.output,
|
||||
[
|
||||
f"WARNING:paperless.tasks:No pages to split on!",
|
||||
],
|
||||
)
|
||||
|
||||
def test_save_to_dir(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
tasks.save_to_dir(test_file, target_dir=tempdir)
|
||||
target_file = os.path.join(tempdir, "patch-code-t.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
|
||||
def test_save_to_dir2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
nonexistingdir = "/nowhere"
|
||||
if os.path.isdir(nonexistingdir):
|
||||
self.fail("non-existing dir exists")
|
||||
else:
|
||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
||||
tasks.save_to_dir(test_file, target_dir=nonexistingdir)
|
||||
self.assertEqual(
|
||||
cm.output,
|
||||
[
|
||||
f"WARNING:paperless.tasks:{str(test_file)} or {str(nonexistingdir)} don't exist.",
|
||||
],
|
||||
)
|
||||
|
||||
def test_save_to_dir3(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
tasks.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir)
|
||||
target_file = os.path.join(tempdir, "newname.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
|
||||
def test_barcode_splitter(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
separators = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertTrue(separators)
|
||||
document_list = tasks.separate_pages(test_file, separators)
|
||||
self.assertTrue(document_list)
|
||||
for document in document_list:
|
||||
tasks.save_to_dir(document, target_dir=tempdir)
|
||||
target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf")
|
||||
target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file1))
|
||||
self.assertTrue(os.path.isfile(target_file2))
|
||||
|
||||
@override_settings(CONSUMER_ENABLE_BARCODES=True)
|
||||
def test_consume_barcode_file(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
def test_consume_barcode_tiff_file(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consume_barcode_unsupported_jpg_file(self, m):
|
||||
"""
|
||||
This test assumes barcode and TIFF support are enabled and
|
||||
the user uploads an unsupported image file (e.g. jpg)
|
||||
|
||||
The function shouldn't try to scan for separating barcodes
|
||||
and continue archiving the file as is.
|
||||
"""
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.jpg",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg")
|
||||
shutil.copy(test_file, dst)
|
||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
||||
self.assertIn("Success", tasks.consume_file(dst))
|
||||
self.assertEqual(
|
||||
cm.output,
|
||||
[
|
||||
"WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg",
|
||||
],
|
||||
)
|
||||
m.assert_called_once()
|
||||
|
||||
args, kwargs = m.call_args
|
||||
self.assertIsNone(kwargs["override_filename"])
|
||||
self.assertIsNone(kwargs["override_title"])
|
||||
self.assertIsNone(kwargs["override_correspondent_id"])
|
||||
self.assertIsNone(kwargs["override_document_type_id"])
|
||||
self.assertIsNone(kwargs["override_tag_ids"])
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
def test_consume_barcode_supported_no_extension_file(self):
|
||||
"""
|
||||
This test assumes barcode and TIFF support are enabled and
|
||||
the user uploads a supported image file, but without extension
|
||||
"""
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
||||
|
||||
class TestSanityCheck(DirectoriesMixin, TestCase):
|
||||
@mock.patch("documents.tasks.sanity_checker.check_sanity")
|
||||
def test_sanity_check_success(self, m):
|
||||
m.return_value = SanityCheckMessages()
|
||||
@@ -565,6 +129,8 @@ class TestTasks(DirectoriesMixin, TestCase):
|
||||
)
|
||||
m.assert_called_once()
|
||||
|
||||
|
||||
class TestBulkUpdate(DirectoriesMixin, TestCase):
|
||||
def test_bulk_update_documents(self):
|
||||
doc1 = Document.objects.create(
|
||||
title="test",
|
||||
|
@@ -1,9 +1,28 @@
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import override_settings
|
||||
from django.test import TestCase
|
||||
|
||||
|
||||
class TestViews(TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# Provide a dummy static dir to silence whitenoise warnings
|
||||
cls.static_dir = tempfile.mkdtemp()
|
||||
|
||||
cls.override = override_settings(
|
||||
STATIC_ROOT=cls.static_dir,
|
||||
)
|
||||
cls.override.enable()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
shutil.rmtree(cls.static_dir, ignore_errors=True)
|
||||
cls.override.disable()
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.user = User.objects.create_user("testuser")
|
||||
|
||||
|
@@ -19,6 +19,7 @@ def setup_directories():
|
||||
dirs.scratch_dir = tempfile.mkdtemp()
|
||||
dirs.media_dir = tempfile.mkdtemp()
|
||||
dirs.consumption_dir = tempfile.mkdtemp()
|
||||
dirs.static_dir = tempfile.mkdtemp()
|
||||
dirs.index_dir = os.path.join(dirs.data_dir, "index")
|
||||
dirs.originals_dir = os.path.join(dirs.media_dir, "documents", "originals")
|
||||
dirs.thumbnail_dir = os.path.join(dirs.media_dir, "documents", "thumbnails")
|
||||
@@ -42,6 +43,7 @@ def setup_directories():
|
||||
CONSUMPTION_DIR=dirs.consumption_dir,
|
||||
LOGGING_DIR=dirs.logging_dir,
|
||||
INDEX_DIR=dirs.index_dir,
|
||||
STATIC_ROOT=dirs.static_dir,
|
||||
MODEL_FILE=os.path.join(dirs.data_dir, "classification_model.pickle"),
|
||||
MEDIA_LOCK=os.path.join(dirs.media_dir, "media.lock"),
|
||||
)
|
||||
@@ -55,6 +57,7 @@ def remove_dirs(dirs):
|
||||
shutil.rmtree(dirs.data_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.scratch_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.consumption_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.static_dir, ignore_errors=True)
|
||||
dirs.settings_override.disable()
|
||||
|
||||
|
||||
|
@@ -746,7 +746,7 @@ class RemoteVersionView(GenericAPIView):
|
||||
|
||||
|
||||
class StoragePathViewSet(ModelViewSet):
|
||||
model = DocumentType
|
||||
model = StoragePath
|
||||
|
||||
queryset = StoragePath.objects.annotate(document_count=Count("documents")).order_by(
|
||||
Lower("name"),
|
||||
|
Reference in New Issue
Block a user