mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-07-28 18:24:38 -05:00
Merge branch 'dev' into feature-frontend-task-queue
This commit is contained in:
186
src/documents/barcodes.py
Normal file
186
src/documents/barcodes.py
Normal file
@@ -0,0 +1,186 @@
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from functools import lru_cache
|
||||
from typing import List # for type hinting. Can be removed, if only Python >3.8 is used
|
||||
|
||||
import magic
|
||||
from django.conf import settings
|
||||
from pdf2image import convert_from_path
|
||||
from pikepdf import Pdf
|
||||
from PIL import Image
|
||||
from PIL import ImageSequence
|
||||
from pyzbar import pyzbar
|
||||
|
||||
logger = logging.getLogger("paperless.barcodes")
|
||||
|
||||
|
||||
@lru_cache(maxsize=8)
def supported_file_type(mime_type) -> bool:
    """
    Tells whether barcode processing can handle a file of the given
    MIME type.

    PDF is always accepted; TIFF is accepted only while
    settings.CONSUMER_BARCODE_TIFF_SUPPORT is truthy.

    NOTE(review): answers are memoised per MIME type by lru_cache, so a
    change of the setting after the first call is presumably not picked up
    for MIME types that were already asked about - confirm this is intended.

    :return: True if the file is supported, False otherwise
    """
    tiff_enabled = settings.CONSUMER_BARCODE_TIFF_SUPPORT
    if tiff_enabled:
        accepted = ("application/pdf", "image/tiff")
    else:
        accepted = ("application/pdf",)
    return mime_type in accepted
|
||||
|
||||
|
||||
def barcode_reader(image) -> List[str]:
    """
    Decode all barcodes which pyzbar detects in the given image.

    Detections with an empty payload are ignored.

    Returns the payloads as UTF-8 decoded strings, in detection order.
    """
    found = []
    # A falsy detection result simply leaves the list empty
    for detection in pyzbar.decode(image) or ():
        if not detection.data:
            continue
        text = detection.data.decode("utf-8")
        found.append(text)
        logger.debug(
            f"Barcode of type {str(detection.type)} found: {text}",
        )
    return found
|
||||
|
||||
|
||||
def get_file_mime_type(path: str) -> str:
    """
    Looks up the MIME type of the file at the given path using
    magic.from_file and writes it to the debug log.

    Returns the MIME type.
    """
    detected = magic.from_file(path, mime=True)
    logger.debug(f"Detected mime type: {detected}")
    return detected
|
||||
|
||||
|
||||
def convert_from_tiff_to_pdf(filepath: str) -> str:
    """
    Converts a given TIFF image file to pdf into a temporary directory.

    Every frame of the TIFF is converted to RGB and becomes one page of
    the pdf. The pdf keeps the base name of the original file and is
    written into a fresh "paperless-" directory below settings.SCRATCH_DIR.

    Returns the path of the new pdf file, or None if the file is not
    detected as image/tiff or the pdf could not be saved. In both failure
    cases no temporary directory is left behind.
    """
    mime_type = get_file_mime_type(filepath)
    if mime_type != "image/tiff":
        # Reject before touching the scratch dir, so nothing needs cleanup
        logger.warning(
            f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.",
        )
        return None

    # use old file name with pdf extension
    file_name = os.path.splitext(os.path.basename(filepath))[0]
    # Same preparation as separate_pages: the scratch dir may not exist yet
    os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
    tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
    newpath = os.path.join(tempdir, file_name + ".pdf")

    with Image.open(filepath) as image:
        images = [page.convert("RGB") for page in ImageSequence.Iterator(image)]
        try:
            if len(images) == 1:
                images[0].save(newpath)
            else:
                images[0].save(newpath, save_all=True, append_images=images[1:])
        except OSError as e:
            logger.warning(
                f"Could not save the file as pdf. Error: {str(e)}",
            )
            # The caller only gets None back, so it cannot clean this up
            shutil.rmtree(tempdir, ignore_errors=True)
            return None
    return newpath
|
||||
|
||||
|
||||
def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
    """
    Find the pages of the provided pdf file which carry the separator
    barcode, i.e. a barcode equal to str(settings.CONSUMER_BARCODE_STRING).

    Returns a list of zero-based page numbers, which separate the file
    """
    wanted = str(settings.CONSUMER_BARCODE_STRING)
    # render into a temporary directory in case the file is too big to
    # handle in memory
    with tempfile.TemporaryDirectory() as render_dir:
        rendered_pages = convert_from_path(filepath, output_folder=render_dir)
        hits = [
            index
            for index, rendered in enumerate(rendered_pages)
            if wanted in barcode_reader(rendered)
        ]
    return hits
|
||||
|
||||
|
||||
def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]:
    """
    Separate the provided pdf file on the pages_to_split_on.

    The pages which are listed in pages_to_split_on (zero-based indexes)
    are dropped. The pages before the first separator become
    "<name>_document_0.pdf"; the pages after the n-th separator (up to the
    next separator or the end of the file) become "<name>_document_<n>.pdf".
    A part is written even if it contains no pages.

    Returns a list of (temporary) filepaths to consume, located in a fresh
    "paperless-" directory below settings.SCRATCH_DIR.
    These will need to be deleted later.
    If pages_to_split_on is empty, nothing is created and an empty list is
    returned.
    """
    document_paths = []

    if not pages_to_split_on:
        # Nothing to do: do not open the pdf or create a scratch directory
        logger.warning("No pages to split on!")
        logger.debug(f"Temp files are {str(document_paths)}")
        return document_paths

    os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
    tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
    fname = os.path.splitext(os.path.basename(filepath))[0]
    logger.debug(f"Temp dir is {str(tempdir)}")

    # The context manager closes the source pdf, even if writing a part fails
    with Pdf.open(filepath) as pdf:
        total_pages = len(pdf.pages)

        # go from the first page to the first separator page
        dst = Pdf.new()
        for page in range(min(pages_to_split_on[0], total_pages)):
            dst.pages.append(pdf.pages[page])
        output_filename = f"{fname}_document_0.pdf"
        savepath = os.path.join(tempdir, output_filename)
        with open(savepath, "wb") as out:
            dst.save(out)
        document_paths.append(savepath)

        # iterate through the rest of the document
        for count, page_number in enumerate(pages_to_split_on):
            logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
            dst = Pdf.new()
            try:
                next_page = pages_to_split_on[count + 1]
            except IndexError:
                # Last separator: this part runs to the end of the file
                next_page = total_pages
            logger.debug(
                f"page_number: {str(page_number)} next_page: {str(next_page)}",
            )
            # skip the first page_number. This contains the barcode page
            for page in range(page_number + 1, next_page):
                dst.pages.append(pdf.pages[page])
            output_filename = f"{fname}_document_{str(count + 1)}.pdf"
            logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
            savepath = os.path.join(tempdir, output_filename)
            with open(savepath, "wb") as out:
                dst.save(out)
            document_paths.append(savepath)

    logger.debug(f"Temp files are {str(document_paths)}")
    return document_paths
|
||||
|
||||
|
||||
def save_to_dir(
    filepath: str,
    newname: str = None,
    target_dir: str = settings.CONSUMPTION_DIR,
):
    """
    Copies filepath to target_dir.
    Optionally rename the file.

    :param filepath: the file to copy; must be an existing regular file
    :param newname: if given, the copy is renamed to this name inside
        target_dir after copying
    :param target_dir: existing directory to copy into. The default is
        read from settings.CONSUMPTION_DIR once, when this function is
        defined, not on every call.

    If filepath is not a file or target_dir is not a directory, nothing is
    copied and a warning is logged.
    """
    if not (os.path.isfile(filepath) and os.path.isdir(target_dir)):
        logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.")
        return

    dst = shutil.copy(filepath, target_dir)
    # Use the module logger, like every other message of this module
    logger.debug(f"saved {str(filepath)} to {str(dst)}")
    if newname:
        dst_new = os.path.join(target_dir, newname)
        logger.debug(f"moving {str(dst)} to {str(dst_new)}")
        os.rename(dst, dst_new)
|
@@ -118,3 +118,10 @@ def delete(doc_ids):
|
||||
index.remove_document_by_id(writer, id)
|
||||
|
||||
return "OK"
|
||||
|
||||
|
||||
def redo_ocr(doc_ids):
    """
    Hands the given document ids to async_task as the document_ids
    argument of the "documents.tasks.redo_ocr" task.

    Returns the string "OK" once async_task has been called.
    """
    async_task(
        "documents.tasks.redo_ocr",
        document_ids=doc_ids,
    )
    return "OK"
|
||||
|
@@ -11,7 +11,6 @@ from documents.signals import document_consumer_declaration
|
||||
|
||||
@register()
|
||||
def changed_password_check(app_configs, **kwargs):
|
||||
|
||||
from documents.models import Document
|
||||
from paperless.db import GnuPG
|
||||
|
||||
|
@@ -273,7 +273,7 @@ class Consumer(LoggingMixin):
|
||||
|
||||
self.log("debug", f"Generating thumbnail for {self.filename}...")
|
||||
self._send_progress(70, 100, "WORKING", MESSAGE_GENERATING_THUMBNAIL)
|
||||
thumbnail = document_parser.get_optimised_thumbnail(
|
||||
thumbnail = document_parser.get_thumbnail(
|
||||
self.path,
|
||||
mime_type,
|
||||
self.filename,
|
||||
|
@@ -41,7 +41,7 @@ def handle_document(document_id):
|
||||
try:
|
||||
parser.parse(document.source_path, mime_type, document.get_public_filename())
|
||||
|
||||
thumbnail = parser.get_optimised_thumbnail(
|
||||
thumbnail = parser.get_thumbnail(
|
||||
document.source_path,
|
||||
mime_type,
|
||||
document.get_public_filename(),
|
||||
|
@@ -189,7 +189,7 @@ class Command(BaseCommand):
|
||||
original_target = os.path.join(self.target, original_name)
|
||||
document_dict[EXPORTER_FILE_NAME] = original_name
|
||||
|
||||
thumbnail_name = base_name + "-thumbnail.png"
|
||||
thumbnail_name = base_name + "-thumbnail.webp"
|
||||
thumbnail_target = os.path.join(self.target, thumbnail_name)
|
||||
document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name
|
||||
|
||||
|
35
src/documents/management/commands/document_redo_ocr.py
Normal file
35
src/documents/management/commands/document_redo_ocr.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import tqdm
|
||||
from django.core.management.base import BaseCommand
|
||||
from documents.tasks import redo_ocr
|
||||
|
||||
|
||||
class Command(BaseCommand):
    """
    Management command which runs documents.tasks.redo_ocr for the
    documents whose primary keys are given on the command line.
    """

    # The previous text described renaming documents, which this command
    # does not do.
    help = "This will redo the OCR for the documents with the given primary keys."

    def add_arguments(self, parser):
        """Register the --no-progress-bar flag and the document primary keys."""
        parser.add_argument(
            "--no-progress-bar",
            default=False,
            action="store_true",
            help="If set, the progress bar will not be shown",
        )

        parser.add_argument(
            "documents",
            nargs="+",
            help="Document primary keys for re-processing OCR on",
        )

    def handle(self, *args, **options):
        """Pass the requested primary keys to redo_ocr, wrapped in a progress bar."""
        # tqdm advances as redo_ocr iterates; disable= turns the bar off
        doc_pks = tqdm.tqdm(
            options["documents"],
            disable=options["no_progress_bar"],
        )
        redo_ocr(doc_pks)
|
@@ -11,7 +11,7 @@ from ...parsers import get_parser_class_for_mime_type
|
||||
|
||||
|
||||
def _process_document(doc_in):
|
||||
document = Document.objects.get(id=doc_in)
|
||||
document: Document = Document.objects.get(id=doc_in)
|
||||
parser_class = get_parser_class_for_mime_type(document.mime_type)
|
||||
|
||||
if parser_class:
|
||||
@@ -21,7 +21,8 @@ def _process_document(doc_in):
|
||||
return
|
||||
|
||||
try:
|
||||
thumb = parser.get_optimised_thumbnail(
|
||||
|
||||
thumb = parser.get_thumbnail(
|
||||
document.source_path,
|
||||
document.mime_type,
|
||||
document.get_public_filename(),
|
||||
@@ -69,7 +70,7 @@ class Command(BaseCommand):
|
||||
ids = [doc.id for doc in documents]
|
||||
|
||||
# Note to future self: this prevents django from reusing database
|
||||
# conncetions between processes, which is bad and does not work
|
||||
# connections between processes, which is bad and does not work
|
||||
# with postgres.
|
||||
db.connections.close_all()
|
||||
|
||||
|
@@ -3,7 +3,9 @@ import sys
|
||||
from django.core.management.commands.loaddata import Command as LoadDataCommand
|
||||
|
||||
|
||||
class Command(LoadDataCommand):
|
||||
# This class is used to migrate data between databases
|
||||
# That's difficult to test
|
||||
class Command(LoadDataCommand): # pragma: nocover
|
||||
"""
|
||||
Allow the loading of data from standard in. Sourced originally from:
|
||||
https://gist.github.com/bmispelon/ad5a2c333443b3a1d051 (MIT licensed)
|
||||
|
107
src/documents/migrations/1021_webp_thumbnail_conversion.py
Normal file
107
src/documents/migrations/1021_webp_thumbnail_conversion.py
Normal file
@@ -0,0 +1,107 @@
|
||||
# Generated by Django 4.0.5 on 2022-06-11 15:40
|
||||
import logging
|
||||
import multiprocessing.pool
|
||||
import shutil
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import migrations
|
||||
from documents.parsers import run_convert
|
||||
|
||||
logger = logging.getLogger("paperless.migrations")
|
||||
|
||||
|
||||
def _do_convert(work_package):
    """
    Convert a single thumbnail and replace the original with the result.

    work_package is a pair (existing_thumbnail, converted_thumbnail) of
    paths: the first frame of the existing file is converted with
    run_convert into converted_thumbnail, the result is copied next to the
    existing file and the existing file is then removed.

    Any exception raised along the way is logged as an error and not
    re-raised.
    """
    source_path, target_path = work_package
    try:
        logger.info(f"Converting thumbnail: {source_path}")

        # Run actual conversion
        convert_options = {
            "density": 300,
            "scale": "500x5000>",
            "alpha": "remove",
            "strip": True,
            "trim": False,
            "auto_orient": True,
            "input_file": f"{source_path}[0]",
            "output_file": str(target_path),
        }
        run_convert(**convert_options)

        # Copy newly created thumbnail to thumbnail directory
        thumbnail_dir = source_path.parent
        shutil.copy(target_path, thumbnail_dir)

        # Remove the PNG version
        source_path.unlink()

        logger.info(
            "Conversion to WebP completed, "
            f"replaced {source_path.name} with {target_path.name}",
        )
    except Exception as e:
        logger.error(f"Error converting thumbnail (existing file unchanged): {e}")
|
||||
|
||||
|
||||
def _convert_thumbnails_to_webp(apps, schema_editor):
    """
    RunPython entry point: convert every "*.png" file found in
    settings.THUMBNAIL_DIR to WebP.

    The converted files are first written into a temporary directory; the
    actual work per file is done by _do_convert in a pool of at most 4
    worker processes. Nothing is started or logged if no PNG is found.
    """
    started_at = time.time()

    with tempfile.TemporaryDirectory() as tempdir:
        scratch = Path(tempdir)
        jobs = []

        for found in Path(settings.THUMBNAIL_DIR).glob("*.png"):
            png_path = found.resolve()
            # Same file name with the suffix changed from png to webp,
            # placed in the tempdir
            webp_name = png_path.with_suffix(".webp").name
            webp_path = (scratch / Path(webp_name)).resolve()
            # Package up the necessary info
            jobs.append((png_path, webp_path))

        if jobs:
            logger.info(
                "\n\n"
                " This is a one-time only migration to convert thumbnails for all of your\n"
                " documents into WebP format. If you have a lot of documents though, \n"
                " this may take a while, so a coffee break may be in order."
                "\n",
            )

            worker_count = min(multiprocessing.cpu_count(), 4)
            with multiprocessing.pool.Pool(
                processes=worker_count,
                maxtasksperchild=4,
            ) as pool:
                pool.map(_do_convert, jobs)
                duration = time.time() - started_at

            logger.info(f"Conversion completed in {duration:.3f}s")
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
    # This migration only runs Python code (_convert_thumbnails_to_webp);
    # no schema operation is listed here.

    dependencies = [
        ("documents", "1020_merge_20220518_1839"),
    ]

    operations = [
        migrations.RunPython(
            code=_convert_thumbnails_to_webp,
            # Reversing is a no-op: no reverse conversion is defined here
            reverse_code=migrations.RunPython.noop,
        ),
    ]
|
@@ -3,6 +3,7 @@ import logging
|
||||
import os
|
||||
import re
|
||||
from collections import OrderedDict
|
||||
from typing import Optional
|
||||
|
||||
import dateutil.parser
|
||||
import pathvalidate
|
||||
@@ -229,7 +230,7 @@ class Document(models.Model):
|
||||
verbose_name = _("document")
|
||||
verbose_name_plural = _("documents")
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
|
||||
# Convert UTC database time to local time
|
||||
created = datetime.date.isoformat(timezone.localdate(self.created))
|
||||
@@ -243,7 +244,7 @@ class Document(models.Model):
|
||||
return res
|
||||
|
||||
@property
|
||||
def source_path(self):
|
||||
def source_path(self) -> str:
|
||||
if self.filename:
|
||||
fname = str(self.filename)
|
||||
else:
|
||||
@@ -258,11 +259,11 @@ class Document(models.Model):
|
||||
return open(self.source_path, "rb")
|
||||
|
||||
@property
|
||||
def has_archive_version(self):
|
||||
def has_archive_version(self) -> bool:
|
||||
return self.archive_filename is not None
|
||||
|
||||
@property
|
||||
def archive_path(self):
|
||||
def archive_path(self) -> Optional[str]:
|
||||
if self.has_archive_version:
|
||||
return os.path.join(settings.ARCHIVE_DIR, str(self.archive_filename))
|
||||
else:
|
||||
@@ -272,7 +273,7 @@ class Document(models.Model):
|
||||
def archive_file(self):
|
||||
return open(self.archive_path, "rb")
|
||||
|
||||
def get_public_filename(self, archive=False, counter=0, suffix=None):
|
||||
def get_public_filename(self, archive=False, counter=0, suffix=None) -> str:
|
||||
result = str(self)
|
||||
|
||||
if counter:
|
||||
@@ -293,12 +294,14 @@ class Document(models.Model):
|
||||
return get_default_file_extension(self.mime_type)
|
||||
|
||||
@property
|
||||
def thumbnail_path(self):
|
||||
file_name = f"{self.pk:07}.png"
|
||||
def thumbnail_path(self) -> str:
|
||||
webp_file_name = f"{self.pk:07}.webp"
|
||||
if self.storage_type == self.STORAGE_TYPE_GPG:
|
||||
file_name += ".gpg"
|
||||
webp_file_name += ".gpg"
|
||||
|
||||
return os.path.join(settings.THUMBNAIL_DIR, file_name)
|
||||
webp_file_path = os.path.join(settings.THUMBNAIL_DIR, webp_file_name)
|
||||
|
||||
return os.path.normpath(webp_file_path)
|
||||
|
||||
@property
|
||||
def thumbnail_file(self):
|
||||
|
@@ -150,11 +150,14 @@ def run_convert(
|
||||
|
||||
|
||||
def get_default_thumbnail() -> str:
    """
    Builds the path to a generic thumbnail: the file "document.png" in
    the "resources" directory next to this module.
    """
    module_dir = os.path.dirname(__file__)
    resources_dir = os.path.join(module_dir, "resources")
    return os.path.join(resources_dir, "document.png")
|
||||
|
||||
|
||||
def make_thumbnail_from_pdf_gs_fallback(in_path, temp_dir, logging_group=None) -> str:
|
||||
out_path = os.path.join(temp_dir, "convert_gs.png")
|
||||
out_path = os.path.join(temp_dir, "convert_gs.webp")
|
||||
|
||||
# if convert fails, fall back to extracting
|
||||
# the first PDF page as a PNG using Ghostscript
|
||||
@@ -191,7 +194,7 @@ def make_thumbnail_from_pdf(in_path, temp_dir, logging_group=None) -> str:
|
||||
"""
|
||||
The thumbnail of a PDF is just a 500px wide image of the first page.
|
||||
"""
|
||||
out_path = os.path.join(temp_dir, "convert.png")
|
||||
out_path = os.path.join(temp_dir, "convert.webp")
|
||||
|
||||
# Run convert to get a decent thumbnail
|
||||
try:
|
||||
@@ -319,29 +322,6 @@ class DocumentParser(LoggingMixin):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_optimised_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
thumbnail = self.get_thumbnail(document_path, mime_type, file_name)
|
||||
if settings.OPTIMIZE_THUMBNAILS:
|
||||
out_path = os.path.join(self.tempdir, "thumb_optipng.png")
|
||||
|
||||
args = (
|
||||
settings.OPTIPNG_BINARY,
|
||||
"-silent",
|
||||
"-o5",
|
||||
thumbnail,
|
||||
"-out",
|
||||
out_path,
|
||||
)
|
||||
|
||||
self.log("debug", f"Execute: {' '.join(args)}")
|
||||
|
||||
if not subprocess.Popen(args).wait() == 0:
|
||||
raise ParseError(f"Optipng failed at {args}")
|
||||
|
||||
return out_path
|
||||
else:
|
||||
return thumbnail
|
||||
|
||||
def get_text(self):
|
||||
return self.text
|
||||
|
||||
|
@@ -324,6 +324,7 @@ class BulkEditSerializer(DocumentListSerializer):
|
||||
"remove_tag",
|
||||
"modify_tags",
|
||||
"delete",
|
||||
"redo_ocr",
|
||||
],
|
||||
label="Method",
|
||||
write_only=True,
|
||||
@@ -357,6 +358,8 @@ class BulkEditSerializer(DocumentListSerializer):
|
||||
return bulk_edit.modify_tags
|
||||
elif method == "delete":
|
||||
return bulk_edit.delete
|
||||
elif method == "redo_ocr":
|
||||
return bulk_edit.redo_ocr
|
||||
else:
|
||||
raise serializers.ValidationError("Unsupported method.")
|
||||
|
||||
@@ -537,8 +540,6 @@ class BulkDownloadSerializer(DocumentListSerializer):
|
||||
|
||||
|
||||
class StoragePathSerializer(MatchingModelSerializer):
|
||||
document_count = serializers.IntegerField(read_only=True)
|
||||
|
||||
class Meta:
|
||||
model = StoragePath
|
||||
fields = (
|
||||
@@ -586,10 +587,6 @@ class UiSettingsViewSerializer(serializers.ModelSerializer):
|
||||
"settings",
|
||||
]
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
super().update(instance, validated_data)
|
||||
return instance
|
||||
|
||||
def create(self, validated_data):
|
||||
ui_settings = UiSettings.objects.update_or_create(
|
||||
user=validated_data.get("user"),
|
||||
|
@@ -1,15 +1,16 @@
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from typing import List # for type hinting. Can be removed, if only Python >3.8 is used
|
||||
from pathlib import Path
|
||||
from typing import Type
|
||||
|
||||
import magic
|
||||
import tqdm
|
||||
from asgiref.sync import async_to_sync
|
||||
from channels.layers import get_channel_layer
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.db.models.signals import post_save
|
||||
from documents import barcodes
|
||||
from documents import index
|
||||
from documents import sanity_checker
|
||||
from documents.classifier import DocumentClassifier
|
||||
@@ -21,12 +22,10 @@ from documents.models import Document
|
||||
from documents.models import DocumentType
|
||||
from documents.models import StoragePath
|
||||
from documents.models import Tag
|
||||
from documents.parsers import DocumentParser
|
||||
from documents.parsers import get_parser_class_for_mime_type
|
||||
from documents.parsers import ParseError
|
||||
from documents.sanity_checker import SanityCheckFailedException
|
||||
from pdf2image import convert_from_path
|
||||
from pikepdf import Pdf
|
||||
from PIL import Image
|
||||
from PIL import ImageSequence
|
||||
from pyzbar import pyzbar
|
||||
from whoosh.writing import AsyncWriter
|
||||
|
||||
|
||||
@@ -77,161 +76,6 @@ def train_classifier():
|
||||
logger.warning("Classifier error: " + str(e))
|
||||
|
||||
|
||||
def barcode_reader(image) -> List[str]:
|
||||
"""
|
||||
Read any barcodes contained in image
|
||||
Returns a list containing all found barcodes
|
||||
"""
|
||||
barcodes = []
|
||||
# Decode the barcode image
|
||||
detected_barcodes = pyzbar.decode(image)
|
||||
|
||||
if detected_barcodes:
|
||||
# Traverse through all the detected barcodes in image
|
||||
for barcode in detected_barcodes:
|
||||
if barcode.data:
|
||||
decoded_barcode = barcode.data.decode("utf-8")
|
||||
barcodes.append(decoded_barcode)
|
||||
logger.debug(
|
||||
f"Barcode of type {str(barcode.type)} found: {decoded_barcode}",
|
||||
)
|
||||
return barcodes
|
||||
|
||||
|
||||
def get_file_type(path: str) -> str:
|
||||
"""
|
||||
Determines the file type, based on MIME type.
|
||||
|
||||
Returns the MIME type.
|
||||
"""
|
||||
mime_type = magic.from_file(path, mime=True)
|
||||
logger.debug(f"Detected mime type: {mime_type}")
|
||||
return mime_type
|
||||
|
||||
|
||||
def convert_from_tiff_to_pdf(filepath: str) -> str:
|
||||
"""
|
||||
converts a given TIFF image file to pdf into a temporary directory.
|
||||
|
||||
Returns the new pdf file.
|
||||
"""
|
||||
file_name = os.path.splitext(os.path.basename(filepath))[0]
|
||||
mime_type = get_file_type(filepath)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
# use old file name with pdf extension
|
||||
if mime_type == "image/tiff":
|
||||
newpath = os.path.join(tempdir, file_name + ".pdf")
|
||||
else:
|
||||
logger.warning(
|
||||
f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.",
|
||||
)
|
||||
return None
|
||||
with Image.open(filepath) as image:
|
||||
images = []
|
||||
for i, page in enumerate(ImageSequence.Iterator(image)):
|
||||
page = page.convert("RGB")
|
||||
images.append(page)
|
||||
try:
|
||||
if len(images) == 1:
|
||||
images[0].save(newpath)
|
||||
else:
|
||||
images[0].save(newpath, save_all=True, append_images=images[1:])
|
||||
except OSError as e:
|
||||
logger.warning(
|
||||
f"Could not save the file as pdf. Error: {str(e)}",
|
||||
)
|
||||
return None
|
||||
return newpath
|
||||
|
||||
|
||||
def scan_file_for_separating_barcodes(filepath: str) -> List[int]:
|
||||
"""
|
||||
Scan the provided pdf file for page separating barcodes
|
||||
Returns a list of pagenumbers, which separate the file
|
||||
"""
|
||||
separator_page_numbers = []
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
# use a temporary directory in case the file os too big to handle in memory
|
||||
with tempfile.TemporaryDirectory() as path:
|
||||
pages_from_path = convert_from_path(filepath, output_folder=path)
|
||||
for current_page_number, page in enumerate(pages_from_path):
|
||||
current_barcodes = barcode_reader(page)
|
||||
if separator_barcode in current_barcodes:
|
||||
separator_page_numbers.append(current_page_number)
|
||||
return separator_page_numbers
|
||||
|
||||
|
||||
def separate_pages(filepath: str, pages_to_split_on: List[int]) -> List[str]:
|
||||
"""
|
||||
Separate the provided pdf file on the pages_to_split_on.
|
||||
The pages which are defined by page_numbers will be removed.
|
||||
Returns a list of (temporary) filepaths to consume.
|
||||
These will need to be deleted later.
|
||||
"""
|
||||
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
fname = os.path.splitext(os.path.basename(filepath))[0]
|
||||
pdf = Pdf.open(filepath)
|
||||
document_paths = []
|
||||
logger.debug(f"Temp dir is {str(tempdir)}")
|
||||
if not pages_to_split_on:
|
||||
logger.warning("No pages to split on!")
|
||||
else:
|
||||
# go from the first page to the first separator page
|
||||
dst = Pdf.new()
|
||||
for n, page in enumerate(pdf.pages):
|
||||
if n < pages_to_split_on[0]:
|
||||
dst.pages.append(page)
|
||||
output_filename = f"{fname}_document_0.pdf"
|
||||
savepath = os.path.join(tempdir, output_filename)
|
||||
with open(savepath, "wb") as out:
|
||||
dst.save(out)
|
||||
document_paths = [savepath]
|
||||
|
||||
# iterate through the rest of the document
|
||||
for count, page_number in enumerate(pages_to_split_on):
|
||||
logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
|
||||
dst = Pdf.new()
|
||||
try:
|
||||
next_page = pages_to_split_on[count + 1]
|
||||
except IndexError:
|
||||
next_page = len(pdf.pages)
|
||||
# skip the first page_number. This contains the barcode page
|
||||
for page in range(page_number + 1, next_page):
|
||||
logger.debug(
|
||||
f"page_number: {str(page_number)} next_page: {str(next_page)}",
|
||||
)
|
||||
dst.pages.append(pdf.pages[page])
|
||||
output_filename = f"{fname}_document_{str(count + 1)}.pdf"
|
||||
logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
|
||||
savepath = os.path.join(tempdir, output_filename)
|
||||
with open(savepath, "wb") as out:
|
||||
dst.save(out)
|
||||
document_paths.append(savepath)
|
||||
logger.debug(f"Temp files are {str(document_paths)}")
|
||||
return document_paths
|
||||
|
||||
|
||||
def save_to_dir(
|
||||
filepath: str,
|
||||
newname: str = None,
|
||||
target_dir: str = settings.CONSUMPTION_DIR,
|
||||
):
|
||||
"""
|
||||
Copies filepath to target_dir.
|
||||
Optionally rename the file.
|
||||
"""
|
||||
if os.path.isfile(filepath) and os.path.isdir(target_dir):
|
||||
dst = shutil.copy(filepath, target_dir)
|
||||
logging.debug(f"saved {str(filepath)} to {str(dst)}")
|
||||
if newname:
|
||||
dst_new = os.path.join(target_dir, newname)
|
||||
logger.debug(f"moving {str(dst)} to {str(dst_new)}")
|
||||
os.rename(dst, dst_new)
|
||||
else:
|
||||
logger.warning(f"{str(filepath)} or {str(target_dir)} don't exist.")
|
||||
|
||||
|
||||
def consume_file(
|
||||
path,
|
||||
override_filename=None,
|
||||
@@ -245,32 +89,30 @@ def consume_file(
|
||||
|
||||
# check for separators in current document
|
||||
if settings.CONSUMER_ENABLE_BARCODES:
|
||||
separators = []
|
||||
document_list = []
|
||||
converted_tiff = None
|
||||
if settings.CONSUMER_BARCODE_TIFF_SUPPORT:
|
||||
supported_mime = ["image/tiff", "application/pdf"]
|
||||
else:
|
||||
supported_mime = ["application/pdf"]
|
||||
mime_type = get_file_type(path)
|
||||
if mime_type not in supported_mime:
|
||||
|
||||
mime_type = barcodes.get_file_mime_type(path)
|
||||
|
||||
if not barcodes.supported_file_type(mime_type):
|
||||
# if not supported, skip this routine
|
||||
logger.warning(
|
||||
f"Unsupported file format for barcode reader: {str(mime_type)}",
|
||||
)
|
||||
else:
|
||||
separators = []
|
||||
document_list = []
|
||||
|
||||
if mime_type == "image/tiff":
|
||||
file_to_process = convert_from_tiff_to_pdf(path)
|
||||
file_to_process = barcodes.convert_from_tiff_to_pdf(path)
|
||||
else:
|
||||
file_to_process = path
|
||||
|
||||
separators = scan_file_for_separating_barcodes(file_to_process)
|
||||
separators = barcodes.scan_file_for_separating_barcodes(file_to_process)
|
||||
|
||||
if separators:
|
||||
logger.debug(
|
||||
f"Pages with separators found in: {str(path)}",
|
||||
)
|
||||
document_list = separate_pages(file_to_process, separators)
|
||||
document_list = barcodes.separate_pages(file_to_process, separators)
|
||||
|
||||
if document_list:
|
||||
for n, document in enumerate(document_list):
|
||||
@@ -280,14 +122,18 @@ def consume_file(
|
||||
newname = f"{str(n)}_" + override_filename
|
||||
else:
|
||||
newname = None
|
||||
save_to_dir(document, newname=newname)
|
||||
barcodes.save_to_dir(document, newname=newname)
|
||||
|
||||
# if we got here, the document was successfully split
|
||||
# and can safely be deleted
|
||||
if converted_tiff:
|
||||
if mime_type == "image/tiff":
|
||||
# Remove the TIFF converted to PDF file
|
||||
logger.debug(f"Deleting file {file_to_process}")
|
||||
os.unlink(file_to_process)
|
||||
# Remove the original file (new file is saved above)
|
||||
logger.debug(f"Deleting file {path}")
|
||||
os.unlink(path)
|
||||
|
||||
# notify the sender, otherwise the progress bar
|
||||
# in the UI stays stuck
|
||||
payload = {
|
||||
@@ -359,3 +205,46 @@ def bulk_update_documents(document_ids):
|
||||
with AsyncWriter(ix) as writer:
|
||||
for doc in documents:
|
||||
index.update_document(writer, doc)
|
||||
|
||||
|
||||
def redo_ocr(document_ids):
    """
    Re-parse the given documents and store the newly extracted text as
    their content.

    For every primary key in document_ids the original file is copied into
    the parser's temporary directory, parsed again, and the resulting text
    is saved to the document.

    A document is skipped with an error log entry if it does not exist, if
    no parser is available for its mime type, or if parsing raises a
    ParseError. Other exceptions are not handled here.
    """
    all_docs = Document.objects.all()

    for doc_pk in document_ids:
        try:
            logger.info(f"Parsing document {doc_pk}")
            doc: Document = all_docs.get(pk=doc_pk)
        except ObjectDoesNotExist:
            logger.error(f"Document {doc_pk} does not exist")
            continue

        # Get the correct parser for this mime type
        parser_class: Type[DocumentParser] = get_parser_class_for_mime_type(
            doc.mime_type,
        )
        if not parser_class:
            # Without this check the falsy result would be called below
            logger.error(
                f"No parser found for mime type {doc.mime_type} "
                f"of document {doc_pk}",
            )
            continue

        document_parser: DocumentParser = parser_class(
            "redo-ocr",
        )

        # Create a file path to copy the original file to for working on
        temp_file = (Path(document_parser.tempdir) / Path("new-ocr-file")).resolve()

        try:
            # Copy inside the try, so a partial copy is removed as well
            shutil.copy(doc.source_path, temp_file)

            logger.info(
                f"Using {type(document_parser).__name__} for document",
            )
            # Try to re-parse the document into text
            document_parser.parse(str(temp_file), doc.mime_type)

            doc.content = document_parser.get_text()
            doc.save()
            logger.info("Document OCR updated")

        except ParseError as e:
            logger.error(f"Error parsing document: {e}")
        finally:
            # Remove the file path if it was created
            if temp_file.exists() and temp_file.is_file():
                temp_file.unlink()
|
||||
|
Binary file not shown.
Before Width: | Height: | Size: 7.7 KiB |
BIN
src/documents/tests/samples/documents/thumbnails/0000001.webp
Normal file
BIN
src/documents/tests/samples/documents/thumbnails/0000001.webp
Normal file
Binary file not shown.
After Width: | Height: | Size: 2.6 KiB |
Binary file not shown.
Before Width: | Height: | Size: 7.7 KiB |
BIN
src/documents/tests/samples/documents/thumbnails/0000002.webp
Normal file
BIN
src/documents/tests/samples/documents/thumbnails/0000002.webp
Normal file
Binary file not shown.
After Width: | Height: | Size: 2.6 KiB |
Binary file not shown.
Before Width: | Height: | Size: 7.7 KiB |
BIN
src/documents/tests/samples/documents/thumbnails/0000003.webp
Normal file
BIN
src/documents/tests/samples/documents/thumbnails/0000003.webp
Normal file
Binary file not shown.
After Width: | Height: | Size: 2.6 KiB |
Binary file not shown.
Binary file not shown.
@@ -179,7 +179,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
|
||||
)
|
||||
|
||||
with open(
|
||||
os.path.join(self.dirs.thumbnail_dir, f"{doc.pk:07d}.png"),
|
||||
os.path.join(self.dirs.thumbnail_dir, f"{doc.pk:07d}.webp"),
|
||||
"wb",
|
||||
) as f:
|
||||
f.write(content_thumbnail)
|
||||
@@ -1025,7 +1025,7 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
|
||||
"samples",
|
||||
"documents",
|
||||
"thumbnails",
|
||||
"0000001.png",
|
||||
"0000001.webp",
|
||||
)
|
||||
archive_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||
|
||||
@@ -1435,17 +1435,25 @@ class TestDocumentApiV2(DirectoriesMixin, APITestCase):
|
||||
"#000000",
|
||||
)
|
||||
|
||||
def test_ui_settings(self):
|
||||
test_user = User.objects.create_superuser(username="test")
|
||||
self.client.force_authenticate(user=test_user)
|
||||
|
||||
response = self.client.get("/api/ui_settings/", format="json")
|
||||
class TestApiUiSettings(DirectoriesMixin, APITestCase):
|
||||
|
||||
ENDPOINT = "/api/ui_settings/"
|
||||
|
||||
def setUp(self):
    """Create a superuser and authenticate the test client as that user."""
    super().setUp()

    superuser = User.objects.create_superuser(username="test")
    self.test_user = superuser
    self.client.force_authenticate(user=superuser)
|
||||
|
||||
def test_api_get_ui_settings(self):
    """
    GIVEN:
        - An authenticated client
    WHEN:
        - The UI settings endpoint is fetched
    THEN:
        - HTTP 200 is returned
        - The returned settings are an empty dict
    """
    resp = self.client.get(self.ENDPOINT, format="json")

    self.assertEqual(resp.status_code, 200)
    self.assertDictEqual(resp.data["settings"], {})
|
||||
|
||||
def test_api_set_ui_settings(self):
|
||||
settings = {
|
||||
"settings": {
|
||||
"dark_mode": {
|
||||
@@ -1455,18 +1463,16 @@ class TestDocumentApiV2(DirectoriesMixin, APITestCase):
|
||||
}
|
||||
|
||||
response = self.client.post(
|
||||
"/api/ui_settings/",
|
||||
self.ENDPOINT,
|
||||
json.dumps(settings),
|
||||
content_type="application/json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.get("/api/ui_settings/", format="json")
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
ui_settings = self.test_user.ui_settings
|
||||
self.assertDictEqual(
|
||||
response.data["settings"],
|
||||
ui_settings.settings,
|
||||
settings["settings"],
|
||||
)
|
||||
|
||||
@@ -1789,6 +1795,34 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(kwargs["add_tags"], [self.t1.id])
|
||||
self.assertEqual(kwargs["remove_tags"], [self.t2.id])
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.modify_tags")
def test_api_modify_tags_not_provided(self, m):
    """
    GIVEN:
        - API data to modify tags only provides add_tags; the
          remove_tags parameter is missing
    WHEN:
        - API to edit tags is called
    THEN:
        - API returns HTTP 400
        - modify_tags is not called
    """
    m.return_value = "OK"
    response = self.client.post(
        "/api/documents/bulk_edit/",
        json.dumps(
            {
                "documents": [self.doc1.id, self.doc3.id],
                "method": "modify_tags",
                # remove_tags is intentionally left out of the parameters
                "parameters": {
                    "add_tags": [self.t1.id],
                },
            },
        ),
        content_type="application/json",
    )
    self.assertEqual(response.status_code, 400)
    m.assert_not_called()
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.delete")
|
||||
def test_api_delete(self, m):
|
||||
m.return_value = "OK"
|
||||
@@ -1805,6 +1839,118 @@ class TestBulkEdit(DirectoriesMixin, APITestCase):
|
||||
self.assertEqual(args[0], [self.doc1.id])
|
||||
self.assertEqual(len(kwargs), 0)
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
def test_api_set_storage_path(self, m):
    """
    GIVEN:
        - A bulk edit request assigning a storage path to one document
    WHEN:
        - The bulk edit API is called
    THEN:
        - HTTP 200 is returned
        - set_storage_path is called once with the document IDs and
          the storage path ID
    """
    m.return_value = "OK"
    payload = {
        "documents": [self.doc1.id],
        "method": "set_storage_path",
        "parameters": {"storage_path": self.sp1.id},
    }

    response = self.client.post(
        "/api/documents/bulk_edit/",
        json.dumps(payload),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, 200)
    m.assert_called_once()

    positional, keyword = m.call_args
    self.assertListEqual(positional[0], [self.doc1.id])
    self.assertEqual(keyword["storage_path"], self.sp1.id)
|
||||
|
||||
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
def test_api_unset_storage_path(self, m):
    """
    GIVEN:
        - API data to clear/unset the storage path of a document
    WHEN:
        - API is called
    THEN:
        - API returns HTTP 200
        - set_storage_path is called with correct document IDs and None storage_path
    """
    m.return_value = "OK"

    response = self.client.post(
        "/api/documents/bulk_edit/",
        json.dumps(
            {
                "documents": [self.doc1.id],
                "method": "set_storage_path",
                "parameters": {"storage_path": None},
            },
        ),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, 200)
    m.assert_called_once()
    args, kwargs = m.call_args

    self.assertListEqual(args[0], [self.doc1.id])
    # Identity check: the explicit null must arrive as None, not merely
    # as something that compares equal to it
    self.assertIsNone(kwargs["storage_path"])
|
||||
|
||||
def test_api_invalid_storage_path(self):
    """
    GIVEN:
        - API data to set the storage path of a document
        - Given storage_path ID isn't valid
    WHEN:
        - API is called
    THEN:
        - API returns HTTP 400
        - self.async_task is not called
    """
    response = self.client.post(
        "/api/documents/bulk_edit/",
        json.dumps(
            {
                "documents": [self.doc1.id],
                "method": "set_storage_path",
                # Offset the known ID to get one that is not self.sp1
                "parameters": {"storage_path": self.sp1.id + 10},
            },
        ),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, 400)
    self.async_task.assert_not_called()
|
||||
|
||||
def test_api_set_storage_path_not_provided(self):
    """
    GIVEN:
        - API data to set the storage path of a document
        - API data is missing storage path ID
    WHEN:
        - API is called
    THEN:
        - API returns HTTP 400
        - self.async_task is not called
    """
    response = self.client.post(
        "/api/documents/bulk_edit/",
        json.dumps(
            {
                "documents": [self.doc1.id],
                "method": "set_storage_path",
                # No "storage_path" key at all
                "parameters": {},
            },
        ),
        content_type="application/json",
    )

    self.assertEqual(response.status_code, 400)
    self.async_task.assert_not_called()
|
||||
|
||||
def test_api_invalid_doc(self):
|
||||
self.assertEqual(Document.objects.count(), 5)
|
||||
response = self.client.post(
|
||||
@@ -2206,7 +2352,7 @@ class TestBulkDownload(DirectoriesMixin, APITestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestApiAuth(APITestCase):
|
||||
class TestApiAuth(DirectoriesMixin, APITestCase):
|
||||
def test_auth_required(self):
|
||||
|
||||
d = Document.objects.create(title="Test")
|
||||
@@ -2259,7 +2405,7 @@ class TestApiAuth(APITestCase):
|
||||
self.assertIn("X-Version", response)
|
||||
|
||||
|
||||
class TestRemoteVersion(APITestCase):
|
||||
class TestApiRemoteVersion(DirectoriesMixin, APITestCase):
|
||||
ENDPOINT = "/api/remote_version/"
|
||||
|
||||
def setUp(self):
|
||||
@@ -2426,6 +2572,84 @@ class TestRemoteVersion(APITestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestApiStoragePaths(DirectoriesMixin, APITestCase):
    """API tests for listing and creating storage paths."""

    ENDPOINT = "/api/storage_paths/"

    def setUp(self) -> None:
        """Authenticate a fresh user and create one storage path to query."""
        super().setUp()

        user = User.objects.create(username="temp_admin")
        self.client.force_authenticate(user=user)

        self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{checksum}")

    def test_api_get_storage_path(self):
        """
        GIVEN:
            - API request to get all storage paths
        WHEN:
            - API is called
        THEN:
            - Existing storage paths are returned
        """
        response = self.client.get(self.ENDPOINT, format="json")

        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.data["count"], 1)

        resp_storage_path = response.data["results"][0]
        self.assertEqual(resp_storage_path["id"], self.sp1.id)
        self.assertEqual(resp_storage_path["path"], self.sp1.path)

    def test_api_create_storage_path(self):
        """
        GIVEN:
            - API request to create a storage path
        WHEN:
            - API is called
        THEN:
            - Correct HTTP response
            - New storage path is created
        """
        response = self.client.post(
            self.ENDPOINT,
            json.dumps(
                {
                    "name": "A storage path",
                    "path": "Somewhere/{asn}",
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, 201)
        # The path from setUp plus the one just created
        self.assertEqual(StoragePath.objects.count(), 2)

    def test_api_create_invalid_storage_path(self):
        """
        GIVEN:
            - API request to create a storage path
            - Storage path format is incorrect
        WHEN:
            - API is called
        THEN:
            - Correct HTTP 400 response
            - No storage path is created
        """
        response = self.client.post(
            self.ENDPOINT,
            json.dumps(
                {
                    "name": "Another storage path",
                    # The misspelled placeholder is what makes this path invalid
                    "path": "Somewhere/{correspdent}",
                },
            ),
            content_type="application/json",
        )
        self.assertEqual(response.status_code, 400)
        # Only the path from setUp remains
        self.assertEqual(StoragePath.objects.count(), 1)
|
||||
|
||||
|
||||
class TestTasks(APITestCase):
|
||||
ENDPOINT = "/api/tasks/"
|
||||
ENDPOINT_ACKOWLEDGE = "/api/acknowledge_tasks/"
|
||||
@@ -2477,4 +2701,4 @@ class TestTasks(APITestCase):
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.get(self.ENDPOINT)
|
||||
self.assertEqual(len(response.data), 0)
|
||||
self.assertEqual(len(response.data), 0)
|
456
src/documents/tests/test_barcodes.py
Normal file
456
src/documents/tests/test_barcodes.py
Normal file
@@ -0,0 +1,456 @@
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
from django.conf import settings
|
||||
from django.test import override_settings
|
||||
from django.test import TestCase
|
||||
from documents import barcodes
|
||||
from documents import tasks
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class TestBarcode(DirectoriesMixin, TestCase):
    """
    Tests for the helpers in documents.barcodes and for the barcode
    splitting path of tasks.consume_file.
    """

    # Locations of the sample files shipped next to this test module
    SAMPLE_DIR = os.path.join(os.path.dirname(__file__), "samples")
    BARCODE_SAMPLE_DIR = os.path.join(SAMPLE_DIR, "barcodes")

    def _sample(self, name):
        """Return the path of a file in the general samples directory."""
        return os.path.join(self.SAMPLE_DIR, name)

    def _barcode_sample(self, name):
        """Return the path of a file in the barcode samples directory."""
        return os.path.join(self.BARCODE_SAMPLE_DIR, name)

    def _read_barcodes(self, image_path):
        """Open the image, run barcode_reader on it and close the file again."""
        with Image.open(image_path) as img:
            return barcodes.barcode_reader(img)

    def _copy_to_scratch(self, src, name):
        """Copy src into settings.SCRATCH_DIR under the given name; return the copy's path."""
        dst = os.path.join(settings.SCRATCH_DIR, name)
        shutil.copy(src, dst)
        return dst

    def _make_scratch_subdir(self):
        """Create and return a fresh directory inside settings.SCRATCH_DIR."""
        return tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)

    def test_barcode_reader(self):
        test_file = self._barcode_sample("barcode-39-PATCHT.png")
        separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
        self.assertEqual(self._read_barcodes(test_file), [separator_barcode])

    def test_barcode_reader2(self):
        test_file = self._barcode_sample("patch-code-t.pbm")
        separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
        self.assertEqual(self._read_barcodes(test_file), [separator_barcode])

    def test_barcode_reader_distorsion(self):
        test_file = self._barcode_sample("barcode-39-PATCHT-distorsion.png")
        separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
        self.assertEqual(self._read_barcodes(test_file), [separator_barcode])

    def test_barcode_reader_distorsion2(self):
        test_file = self._barcode_sample("barcode-39-PATCHT-distorsion2.png")
        separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
        self.assertEqual(self._read_barcodes(test_file), [separator_barcode])

    def test_barcode_reader_unreadable(self):
        test_file = self._barcode_sample("barcode-39-PATCHT-unreadable.png")
        self.assertEqual(self._read_barcodes(test_file), [])

    def test_barcode_reader_qr(self):
        test_file = self._barcode_sample("qr-code-PATCHT.png")
        separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
        self.assertEqual(self._read_barcodes(test_file), [separator_barcode])

    def test_barcode_reader_128(self):
        test_file = self._barcode_sample("barcode-128-PATCHT.png")
        separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
        self.assertEqual(self._read_barcodes(test_file), [separator_barcode])

    def test_barcode_reader_no_barcode(self):
        test_file = self._sample("simple.png")
        self.assertEqual(self._read_barcodes(test_file), [])

    def test_barcode_reader_custom_separator(self):
        test_file = self._barcode_sample("barcode-39-custom.png")
        self.assertEqual(self._read_barcodes(test_file), ["CUSTOM BARCODE"])

    def test_barcode_reader_custom_qr_separator(self):
        test_file = self._barcode_sample("barcode-qr-custom.png")
        self.assertEqual(self._read_barcodes(test_file), ["CUSTOM BARCODE"])

    def test_barcode_reader_custom_128_separator(self):
        test_file = self._barcode_sample("barcode-128-custom.png")
        self.assertEqual(self._read_barcodes(test_file), ["CUSTOM BARCODE"])

    def test_get_mime_type(self):
        tiff_file = self._sample("simple.tiff")
        pdf_file = self._sample("simple.pdf")
        png_file = self._barcode_sample("barcode-128-custom.png")
        # Copies without an extension, so detection cannot rely on the name
        tiff_file_no_extension = self._copy_to_scratch(tiff_file, "testfile1")
        pdf_file_no_extension = self._copy_to_scratch(pdf_file, "testfile2")

        self.assertEqual(barcodes.get_file_mime_type(tiff_file), "image/tiff")
        self.assertEqual(barcodes.get_file_mime_type(pdf_file), "application/pdf")
        self.assertEqual(
            barcodes.get_file_mime_type(tiff_file_no_extension),
            "image/tiff",
        )
        self.assertEqual(
            barcodes.get_file_mime_type(pdf_file_no_extension),
            "application/pdf",
        )
        self.assertEqual(barcodes.get_file_mime_type(png_file), "image/png")

    def test_convert_from_tiff_to_pdf(self):
        dst = self._copy_to_scratch(self._sample("simple.tiff"), "simple.tiff")
        target_file = barcodes.convert_from_tiff_to_pdf(dst)
        file_extension = os.path.splitext(os.path.basename(target_file))[1]
        self.assertTrue(os.path.isfile(target_file))
        self.assertEqual(file_extension, ".pdf")

    def test_convert_error_from_pdf_to_pdf(self):
        dst = self._copy_to_scratch(self._sample("simple.pdf"), "simple.pdf")
        self.assertIsNone(barcodes.convert_from_tiff_to_pdf(dst))

    def test_scan_file_for_separating_barcodes(self):
        test_file = self._barcode_sample("patch-code-t.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [0])

    def test_scan_file_for_separating_barcodes2(self):
        test_file = self._sample("simple.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [])

    def test_scan_file_for_separating_barcodes3(self):
        test_file = self._barcode_sample("patch-code-t-middle.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [1])

    def test_scan_file_for_separating_barcodes4(self):
        test_file = self._barcode_sample("several-patcht-codes.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [2, 5])

    def test_scan_file_for_separating_barcodes_upsidedown(self):
        test_file = self._barcode_sample("patch-code-t-middle_reverse.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [1])

    def test_scan_file_for_separating_qr_barcodes(self):
        test_file = self._barcode_sample("patch-code-t-qr.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [0])

    @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
    def test_scan_file_for_separating_custom_barcodes(self):
        test_file = self._barcode_sample("barcode-39-custom.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [0])

    @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
    def test_scan_file_for_separating_custom_qr_barcodes(self):
        test_file = self._barcode_sample("barcode-qr-custom.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [0])

    @override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
    def test_scan_file_for_separating_custom_128_barcodes(self):
        test_file = self._barcode_sample("barcode-128-custom.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [0])

    def test_scan_file_for_separating_wrong_qr_barcodes(self):
        # Same sample as the custom-barcode test, but without overriding
        # CONSUMER_BARCODE_STRING, so no page is expected to match
        test_file = self._barcode_sample("barcode-39-custom.pdf")
        pages = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertEqual(pages, [])

    def test_separate_pages(self):
        test_file = self._barcode_sample("patch-code-t-middle.pdf")
        pages = barcodes.separate_pages(test_file, [1])
        self.assertEqual(len(pages), 2)

    def test_separate_pages_no_list(self):
        test_file = self._barcode_sample("patch-code-t-middle.pdf")
        with self.assertLogs("paperless.barcodes", level="WARNING") as cm:
            pages = barcodes.separate_pages(test_file, [])
        self.assertEqual(pages, [])
        self.assertEqual(
            cm.output,
            [
                "WARNING:paperless.barcodes:No pages to split on!",
            ],
        )

    def test_save_to_dir(self):
        test_file = self._barcode_sample("patch-code-t.pdf")
        tempdir = self._make_scratch_subdir()
        barcodes.save_to_dir(test_file, target_dir=tempdir)
        target_file = os.path.join(tempdir, "patch-code-t.pdf")
        self.assertTrue(os.path.isfile(target_file))

    def test_save_to_dir2(self):
        test_file = self._barcode_sample("patch-code-t.pdf")
        nonexistingdir = "/nowhere"
        # The test is meaningless if the supposedly missing directory exists
        if os.path.isdir(nonexistingdir):
            self.fail("non-existing dir exists")

        with self.assertLogs("paperless.barcodes", level="WARNING") as cm:
            barcodes.save_to_dir(test_file, target_dir=nonexistingdir)
        self.assertEqual(
            cm.output,
            [
                f"WARNING:paperless.barcodes:{str(test_file)} or {str(nonexistingdir)} don't exist.",
            ],
        )

    def test_save_to_dir3(self):
        test_file = self._barcode_sample("patch-code-t.pdf")
        tempdir = self._make_scratch_subdir()
        barcodes.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir)
        target_file = os.path.join(tempdir, "newname.pdf")
        self.assertTrue(os.path.isfile(target_file))

    def test_barcode_splitter(self):
        test_file = self._barcode_sample("patch-code-t-middle.pdf")
        tempdir = self._make_scratch_subdir()
        separators = barcodes.scan_file_for_separating_barcodes(test_file)
        self.assertTrue(separators)
        document_list = barcodes.separate_pages(test_file, separators)
        self.assertTrue(document_list)
        for document in document_list:
            barcodes.save_to_dir(document, target_dir=tempdir)
        target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf")
        target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf")
        self.assertTrue(os.path.isfile(target_file1))
        self.assertTrue(os.path.isfile(target_file2))

    @override_settings(CONSUMER_ENABLE_BARCODES=True)
    def test_consume_barcode_file(self):
        dst = self._copy_to_scratch(
            self._barcode_sample("patch-code-t-middle.pdf"),
            "patch-code-t-middle.pdf",
        )

        self.assertEqual(tasks.consume_file(dst), "File successfully split")

    @override_settings(
        CONSUMER_ENABLE_BARCODES=True,
        CONSUMER_BARCODE_TIFF_SUPPORT=True,
    )
    def test_consume_barcode_tiff_file(self):
        dst = self._copy_to_scratch(
            self._barcode_sample("patch-code-t-middle.tiff"),
            "patch-code-t-middle.tiff",
        )

        self.assertEqual(tasks.consume_file(dst), "File successfully split")

    @override_settings(
        CONSUMER_ENABLE_BARCODES=True,
        CONSUMER_BARCODE_TIFF_SUPPORT=True,
    )
    @mock.patch("documents.consumer.Consumer.try_consume_file")
    def test_consume_barcode_unsupported_jpg_file(self, m):
        """
        This test assumes barcode and TIFF support are enabled and
        the user uploads an unsupported image file (e.g. jpg)

        The function shouldn't try to scan for separating barcodes
        and continue archiving the file as is.
        """
        dst = self._copy_to_scratch(self._sample("simple.jpg"), "simple.jpg")
        with self.assertLogs("paperless.tasks", level="WARNING") as cm:
            self.assertIn("Success", tasks.consume_file(dst))
        self.assertListEqual(
            cm.output,
            [
                "WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg",
            ],
        )
        m.assert_called_once()

        args, kwargs = m.call_args
        self.assertIsNone(kwargs["override_filename"])
        self.assertIsNone(kwargs["override_title"])
        self.assertIsNone(kwargs["override_correspondent_id"])
        self.assertIsNone(kwargs["override_document_type_id"])
        self.assertIsNone(kwargs["override_tag_ids"])

    @override_settings(
        CONSUMER_ENABLE_BARCODES=True,
        CONSUMER_BARCODE_TIFF_SUPPORT=True,
    )
    def test_consume_barcode_supported_no_extension_file(self):
        """
        This test assumes barcode and TIFF support are enabled and
        the user uploads a supported image file, but without extension
        """
        dst = self._copy_to_scratch(
            self._barcode_sample("patch-code-t-middle.tiff"),
            "patch-code-t-middle",
        )

        self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
@@ -1,23 +1,64 @@
|
||||
import textwrap
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
from django.core.checks import Error
|
||||
from django.test import override_settings
|
||||
from django.test import TestCase
|
||||
from documents.checks import changed_password_check
|
||||
from documents.checks import parser_check
|
||||
from documents.models import Document
|
||||
|
||||
from ..checks import changed_password_check
|
||||
from ..checks import parser_check
|
||||
from ..models import Document
|
||||
from ..signals import document_consumer_declaration
|
||||
from .factories import DocumentFactory
|
||||
|
||||
|
||||
class ChecksTestCase(TestCase):
|
||||
class TestDocumentChecks(TestCase):
|
||||
def test_changed_password_check_empty_db(self):
    """No documents are created here, so the check must return an empty list."""
    self.assertListEqual(changed_password_check(None), [])
|
||||
|
||||
def test_changed_password_check_no_encryption(self):
    """With only an unencrypted document the check must return an empty list."""
    DocumentFactory.create(storage_type=Document.STORAGE_TYPE_UNENCRYPTED)
    self.assertListEqual(changed_password_check(None), [])
|
||||
|
||||
def test_encrypted_missing_passphrase(self):
    """A GPG-stored document yields exactly one message about the missing password."""
    DocumentFactory.create(storage_type=Document.STORAGE_TYPE_GPG)

    messages = changed_password_check(None)

    self.assertEqual(len(messages), 1)
    self.assertEqual(
        messages[0].msg,
        "The database contains encrypted documents but no password is set.",
    )
|
||||
|
||||
@override_settings(
    PASSPHRASE="test",
)
@mock.patch("paperless.db.GnuPG.decrypted")
@mock.patch("documents.models.Document.source_file")
def test_encrypted_decrypt_fails(self, mock_source_file, mock_decrypted):
    """
    A GPG-stored document whose decryption yields nothing produces exactly
    one message telling the user the password does not match.
    """
    # Stacked mock.patch decorators inject their mocks bottom-up: the
    # decorator closest to the function (source_file) supplies the first
    # argument, the one above it (GnuPG.decrypted) the second.
    mock_decrypted.return_value = None
    mock_source_file.return_value = b""

    DocumentFactory.create(storage_type=Document.STORAGE_TYPE_GPG)

    msgs = changed_password_check(None)

    self.assertEqual(len(msgs), 1)
    msg_text = msgs[0].msg
    self.assertEqual(
        msg_text,
        textwrap.dedent(
            """
            The current password doesn't match the password of the
            existing documents.

            If you intend to change your password, you must first export
            all of the old documents, start fresh with the new password
            and then re-import them."
            """,
        ),
    )
|
||||
|
||||
def test_parser_check(self):
|
||||
|
||||
|
@@ -180,10 +180,10 @@ class DummyParser(DocumentParser):
|
||||
|
||||
def __init__(self, logging_group, scratch_dir, archive_path):
|
||||
super().__init__(logging_group, None)
|
||||
_, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=scratch_dir)
|
||||
_, self.fake_thumb = tempfile.mkstemp(suffix=".webp", dir=scratch_dir)
|
||||
self.archive_path = archive_path
|
||||
|
||||
def get_optimised_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
def get_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
return self.fake_thumb
|
||||
|
||||
def parse(self, document_path, mime_type, file_name=None):
|
||||
@@ -194,12 +194,12 @@ class CopyParser(DocumentParser):
|
||||
def get_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
return self.fake_thumb
|
||||
|
||||
def get_optimised_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
def get_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
return self.fake_thumb
|
||||
|
||||
def __init__(self, logging_group, progress_callback=None):
|
||||
super().__init__(logging_group, progress_callback)
|
||||
_, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=self.tempdir)
|
||||
_, self.fake_thumb = tempfile.mkstemp(suffix=".webp", dir=self.tempdir)
|
||||
|
||||
def parse(self, document_path, mime_type, file_name=None):
|
||||
self.text = "The text"
|
||||
@@ -214,9 +214,9 @@ class FaultyParser(DocumentParser):
|
||||
|
||||
def __init__(self, logging_group, scratch_dir):
|
||||
super().__init__(logging_group)
|
||||
_, self.fake_thumb = tempfile.mkstemp(suffix=".png", dir=scratch_dir)
|
||||
_, self.fake_thumb = tempfile.mkstemp(suffix=".webp", dir=scratch_dir)
|
||||
|
||||
def get_optimised_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
def get_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
return self.fake_thumb
|
||||
|
||||
def parse(self, document_path, mime_type, file_name=None):
|
||||
@@ -230,6 +230,8 @@ def fake_magic_from_file(file, mime=False):
|
||||
return "application/pdf"
|
||||
elif os.path.splitext(file)[1] == ".png":
|
||||
return "image/png"
|
||||
elif os.path.splitext(file)[1] == ".webp":
|
||||
return "image/webp"
|
||||
else:
|
||||
return "unknown"
|
||||
else:
|
||||
|
@@ -150,9 +150,9 @@ class TestDecryptDocuments(TestCase):
|
||||
"samples",
|
||||
"documents",
|
||||
"thumbnails",
|
||||
f"0000004.png.gpg",
|
||||
f"0000004.webp.gpg",
|
||||
),
|
||||
os.path.join(thumb_dir, f"{doc.id:07}.png.gpg"),
|
||||
os.path.join(thumb_dir, f"{doc.id:07}.webp.gpg"),
|
||||
)
|
||||
|
||||
call_command("decrypt_documents")
|
||||
@@ -163,7 +163,7 @@ class TestDecryptDocuments(TestCase):
|
||||
self.assertEqual(doc.filename, "0000004.pdf")
|
||||
self.assertTrue(os.path.isfile(os.path.join(originals_dir, "0000004.pdf")))
|
||||
self.assertTrue(os.path.isfile(doc.source_path))
|
||||
self.assertTrue(os.path.isfile(os.path.join(thumb_dir, f"{doc.id:07}.png")))
|
||||
self.assertTrue(os.path.isfile(os.path.join(thumb_dir, f"{doc.id:07}.webp")))
|
||||
self.assertTrue(os.path.isfile(doc.thumbnail_path))
|
||||
|
||||
with doc.source_file as f:
|
||||
|
231
src/documents/tests/test_migration_webp_conversion.py
Normal file
231
src/documents/tests/test_migration_webp_conversion.py
Normal file
@@ -0,0 +1,231 @@
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
from typing import Iterable
|
||||
from typing import Union
|
||||
from unittest import mock
|
||||
|
||||
from django.test import override_settings
|
||||
from documents.tests.test_migration_archive_files import thumbnail_path
|
||||
from documents.tests.utils import TestMigrations
|
||||
|
||||
|
||||
# NOTE: class-level mock.patch decorators are applied to every test_* method.
# The decorator closest to the class (run_convert) supplies the first mock
# argument, the outer one (Pool.map) supplies the second.
@mock.patch(
    "documents.migrations.1021_webp_thumbnail_conversion.multiprocessing.pool.Pool.map",
)
@mock.patch("documents.migrations.1021_webp_thumbnail_conversion.run_convert")
class TestMigrateWebPThumbnails(TestMigrations):
    """
    Tests for the migration which converts PNG thumbnails to WebP.

    The real conversion and the multiprocessing pool are both mocked, so the
    tests only exercise the file selection and bookkeeping of the migration.
    """

    migrate_from = "1020_merge_20220518_1839"
    migrate_to = "1021_webp_thumbnail_conversion"
    auto_migrate = False

    def pretend_convert_output(self, *args, **kwargs):
        """
        Pretends to do the conversion, by copying the input file
        to the output file

        Reads the "input_file" and "output_file" keyword arguments. A single
        trailing "[0]" on the input file name is removed before copying
        (presumably the frame selector appended by the migration - confirm
        against the migration itself).
        """
        input_file = kwargs["input_file"]
        # Remove the literal suffix only. str.rstrip("[0]") would strip any
        # run of the characters "[", "0" and "]", which corrupts paths that
        # legitimately end in one of them.
        frame_selector = "[0]"
        if input_file.endswith(frame_selector):
            input_file = input_file[: -len(frame_selector)]

        shutil.copy2(
            Path(input_file),
            Path(kwargs["output_file"]),
        )

    def pretend_map(self, func: Callable, iterable: Iterable):
        """
        Pretends to be the map of a multiprocessing.Pool, but secretly does
        everything in series
        """
        for item in iterable:
            func(item)

    def create_dummy_thumbnails(
        self,
        thumb_dir: Path,
        ext: str,
        count: int,
        start_count: int = 0,
    ):
        """
        Helper to create a certain count of files of given extension in a given directory

        Files are empty and named with a zero padded, 7 digit index, starting
        at start_count.
        """
        for idx in range(count):
            (Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
        # Triple check expected files exist
        self.assert_file_count_by_extension(ext, thumb_dir, count)

    def create_webp_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates count dummy (empty) WebP thumbnail files in the given
        directory, numbered from start_count
        """
        self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)

    def create_png_thumbnail_file(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates count dummy (empty) PNG thumbnail files in the given
        directory, numbered from start_count
        """
        self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)

    def assert_file_count_by_extension(
        self,
        ext: str,
        dir: Union[str, Path],
        expected_count: int,
    ):
        """
        Helper to assert a certain count of given extension files in given directory
        """
        # "dir" shadows the builtin, but the parameter name is kept so
        # keyword callers continue to work
        matching_files = list(Path(dir).glob(f"*.{ext}"))
        self.assertEqual(len(matching_files), expected_count)

    def assert_png_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of PNG extension files in given directory
        """
        self.assert_file_count_by_extension("png", dir, expected_count)

    def assert_webp_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of WebP extension files in given directory
        """
        self.assert_file_count_by_extension("webp", dir, expected_count)

    def setUp(self):
        # Each test gets its own, fresh thumbnail directory
        self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()

        return super().setUp()

    def tearDown(self) -> None:
        # Always let the base class clean up, even if removing the
        # directory fails
        try:
            shutil.rmtree(self.thumbnail_dir)
        finally:
            super().tearDown()

    def test_do_nothing_if_converted(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Only WebP thumbnails exist in the thumbnail directory
        WHEN:
            - Thumbnail conversion is attempted
        THEN:
            - Nothing is converted
        """
        map_mock.side_effect = self.pretend_map

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):

            self.create_webp_thumbnail_files(self.thumbnail_dir, 3)

            self.performMigration()
            run_convert_mock.assert_not_called()

            self.assert_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_single_thumbnail(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Three PNG thumbnails exist
        WHEN:
            - Thumbnail conversion is attempted
        THEN:
            - Conversion is run once per PNG thumbnail
            - Three WebP thumbnails exist afterwards
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_png_thumbnail_file(self.thumbnail_dir, 3)

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_errors_out(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Three PNG thumbnails exist
        WHEN:
            - Thumbnail conversion is attempted, but raises an exception
        THEN:
            - Conversion is attempted once per PNG thumbnail
            - The PNG thumbnails are still present
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = OSError

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):

            self.create_png_thumbnail_file(self.thumbnail_dir, 3)

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_png_file_count(self.thumbnail_dir, 3)

    def test_convert_mixed(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Three PNG thumbnails and two WebP thumbnails exist
        WHEN:
            - Thumbnail conversion is attempted
        THEN:
            - Conversion is run only for the three PNG thumbnails
            - No PNG thumbnails remain and five WebP thumbnails exist
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output

        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):

            self.create_png_thumbnail_file(self.thumbnail_dir, 3)
            self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=3)

            self.performMigration()

            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)

            self.assert_png_file_count(self.thumbnail_dir, 0)
            self.assert_webp_file_count(self.thumbnail_dir, 5)
|
@@ -87,31 +87,6 @@ def fake_get_thumbnail(self, path, mimetype, file_name):
|
||||
return os.path.join(os.path.dirname(__file__), "examples", "no-text.png")
|
||||
|
||||
|
||||
class TestBaseParser(TestCase):
|
||||
def setUp(self) -> None:
|
||||
|
||||
self.scratch = tempfile.mkdtemp()
|
||||
override_settings(SCRATCH_DIR=self.scratch).enable()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
shutil.rmtree(self.scratch)
|
||||
|
||||
@mock.patch("documents.parsers.DocumentParser.get_thumbnail", fake_get_thumbnail)
|
||||
@override_settings(OPTIMIZE_THUMBNAILS=True)
|
||||
def test_get_optimised_thumbnail(self):
|
||||
parser = DocumentParser(None)
|
||||
|
||||
parser.get_optimised_thumbnail("any", "not important", "document.pdf")
|
||||
|
||||
@mock.patch("documents.parsers.DocumentParser.get_thumbnail", fake_get_thumbnail)
|
||||
@override_settings(OPTIMIZE_THUMBNAILS=False)
|
||||
def test_get_optimised_thumb_disabled(self):
|
||||
parser = DocumentParser(None)
|
||||
|
||||
path = parser.get_optimised_thumbnail("any", "not important", "document.pdf")
|
||||
self.assertEqual(path, fake_get_thumbnail(None, None, None, None))
|
||||
|
||||
|
||||
class TestParserAvailability(TestCase):
|
||||
def test_file_extensions(self):
|
||||
|
||||
|
@@ -42,9 +42,9 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
|
||||
"samples",
|
||||
"documents",
|
||||
"thumbnails",
|
||||
"0000001.png",
|
||||
"0000001.webp",
|
||||
),
|
||||
os.path.join(self.dirs.thumbnail_dir, "0000001.png"),
|
||||
os.path.join(self.dirs.thumbnail_dir, "0000001.webp"),
|
||||
)
|
||||
|
||||
return Document.objects.create(
|
||||
|
@@ -1,10 +1,7 @@
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
from django.conf import settings
|
||||
from django.test import override_settings
|
||||
from django.test import TestCase
|
||||
from django.utils import timezone
|
||||
from documents import tasks
|
||||
@@ -15,10 +12,9 @@ from documents.models import Tag
|
||||
from documents.sanity_checker import SanityCheckFailedException
|
||||
from documents.sanity_checker import SanityCheckMessages
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class TestTasks(DirectoriesMixin, TestCase):
|
||||
class TestIndexReindex(DirectoriesMixin, TestCase):
|
||||
def test_index_reindex(self):
|
||||
Document.objects.create(
|
||||
title="test",
|
||||
@@ -43,6 +39,8 @@ class TestTasks(DirectoriesMixin, TestCase):
|
||||
|
||||
tasks.index_optimize()
|
||||
|
||||
|
||||
class TestClassifier(DirectoriesMixin, TestCase):
|
||||
@mock.patch("documents.tasks.load_classifier")
|
||||
def test_train_classifier_no_auto_matching(self, load_classifier):
|
||||
tasks.train_classifier()
|
||||
@@ -93,442 +91,8 @@ class TestTasks(DirectoriesMixin, TestCase):
|
||||
mtime3 = os.stat(settings.MODEL_FILE).st_mtime
|
||||
self.assertNotEqual(mtime2, mtime3)
|
||||
|
||||
def test_barcode_reader(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pbm",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_distorsion(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-distorsion.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_distorsion2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-distorsion2.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_unreadable(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-PATCHT-unreadable.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), [])
|
||||
|
||||
def test_barcode_reader_qr(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"qr-code-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_128(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-PATCHT.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
separator_barcode = str(settings.CONSUMER_BARCODE_STRING)
|
||||
self.assertEqual(tasks.barcode_reader(img), [separator_barcode])
|
||||
|
||||
def test_barcode_reader_no_barcode(self):
|
||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png")
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), [])
|
||||
|
||||
def test_barcode_reader_custom_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_barcode_reader_custom_qr_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-qr-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_barcode_reader_custom_128_separator(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.png",
|
||||
)
|
||||
img = Image.open(test_file)
|
||||
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
|
||||
|
||||
def test_get_mime_type(self):
|
||||
tiff_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.tiff",
|
||||
)
|
||||
pdf_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.pdf",
|
||||
)
|
||||
png_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.png",
|
||||
)
|
||||
tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1")
|
||||
pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2")
|
||||
shutil.copy(tiff_file, tiff_file_no_extension)
|
||||
shutil.copy(pdf_file, pdf_file_no_extension)
|
||||
|
||||
self.assertEqual(tasks.get_file_type(tiff_file), "image/tiff")
|
||||
self.assertEqual(tasks.get_file_type(pdf_file), "application/pdf")
|
||||
self.assertEqual(tasks.get_file_type(tiff_file_no_extension), "image/tiff")
|
||||
self.assertEqual(tasks.get_file_type(pdf_file_no_extension), "application/pdf")
|
||||
self.assertEqual(tasks.get_file_type(png_file), "image/png")
|
||||
|
||||
def test_convert_from_tiff_to_pdf(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.tiff")
|
||||
shutil.copy(test_file, dst)
|
||||
target_file = tasks.convert_from_tiff_to_pdf(dst)
|
||||
file_extension = os.path.splitext(os.path.basename(target_file))[1]
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
self.assertEqual(file_extension, ".pdf")
|
||||
|
||||
def test_convert_error_from_pdf_to_pdf(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.pdf",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.pdf")
|
||||
shutil.copy(test_file, dst)
|
||||
self.assertIsNone(tasks.convert_from_tiff_to_pdf(dst))
|
||||
|
||||
def test_scan_file_for_separating_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
def test_scan_file_for_separating_barcodes2(self):
|
||||
test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [])
|
||||
|
||||
def test_scan_file_for_separating_barcodes3(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [1])
|
||||
|
||||
def test_scan_file_for_separating_barcodes4(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"several-patcht-codes.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [2, 5])
|
||||
|
||||
def test_scan_file_for_separating_barcodes_upsidedown(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle_reverse.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [1])
|
||||
|
||||
def test_scan_file_for_separating_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-qr.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-qr-custom.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
@override_settings(CONSUMER_BARCODE_STRING="CUSTOM BARCODE")
|
||||
def test_scan_file_for_separating_custom_128_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-128-custom.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [0])
|
||||
|
||||
def test_scan_file_for_separating_wrong_qr_barcodes(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"barcode-39-custom.pdf",
|
||||
)
|
||||
pages = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertEqual(pages, [])
|
||||
|
||||
def test_separate_pages(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
pages = tasks.separate_pages(test_file, [1])
|
||||
self.assertEqual(len(pages), 2)
|
||||
|
||||
def test_separate_pages_no_list(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
||||
pages = tasks.separate_pages(test_file, [])
|
||||
self.assertEqual(pages, [])
|
||||
self.assertEqual(
|
||||
cm.output,
|
||||
[
|
||||
f"WARNING:paperless.tasks:No pages to split on!",
|
||||
],
|
||||
)
|
||||
|
||||
def test_save_to_dir(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
tasks.save_to_dir(test_file, target_dir=tempdir)
|
||||
target_file = os.path.join(tempdir, "patch-code-t.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
|
||||
def test_save_to_dir2(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
nonexistingdir = "/nowhere"
|
||||
if os.path.isdir(nonexistingdir):
|
||||
self.fail("non-existing dir exists")
|
||||
else:
|
||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
||||
tasks.save_to_dir(test_file, target_dir=nonexistingdir)
|
||||
self.assertEqual(
|
||||
cm.output,
|
||||
[
|
||||
f"WARNING:paperless.tasks:{str(test_file)} or {str(nonexistingdir)} don't exist.",
|
||||
],
|
||||
)
|
||||
|
||||
def test_save_to_dir3(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
tasks.save_to_dir(test_file, newname="newname.pdf", target_dir=tempdir)
|
||||
target_file = os.path.join(tempdir, "newname.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file))
|
||||
|
||||
def test_barcode_splitter(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
|
||||
separators = tasks.scan_file_for_separating_barcodes(test_file)
|
||||
self.assertTrue(separators)
|
||||
document_list = tasks.separate_pages(test_file, separators)
|
||||
self.assertTrue(document_list)
|
||||
for document in document_list:
|
||||
tasks.save_to_dir(document, target_dir=tempdir)
|
||||
target_file1 = os.path.join(tempdir, "patch-code-t-middle_document_0.pdf")
|
||||
target_file2 = os.path.join(tempdir, "patch-code-t-middle_document_1.pdf")
|
||||
self.assertTrue(os.path.isfile(target_file1))
|
||||
self.assertTrue(os.path.isfile(target_file2))
|
||||
|
||||
@override_settings(CONSUMER_ENABLE_BARCODES=True)
|
||||
def test_consume_barcode_file(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.pdf",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.pdf")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
def test_consume_barcode_tiff_file(self):
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle.tiff")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
@mock.patch("documents.consumer.Consumer.try_consume_file")
|
||||
def test_consume_barcode_unsupported_jpg_file(self, m):
|
||||
"""
|
||||
This test assumes barcode and TIFF support are enabled and
|
||||
the user uploads an unsupported image file (e.g. jpg)
|
||||
|
||||
The function shouldn't try to scan for separating barcodes
|
||||
and continue archiving the file as is.
|
||||
"""
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"simple.jpg",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "simple.jpg")
|
||||
shutil.copy(test_file, dst)
|
||||
with self.assertLogs("paperless.tasks", level="WARNING") as cm:
|
||||
self.assertIn("Success", tasks.consume_file(dst))
|
||||
self.assertEqual(
|
||||
cm.output,
|
||||
[
|
||||
"WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg",
|
||||
],
|
||||
)
|
||||
m.assert_called_once()
|
||||
|
||||
args, kwargs = m.call_args
|
||||
self.assertIsNone(kwargs["override_filename"])
|
||||
self.assertIsNone(kwargs["override_title"])
|
||||
self.assertIsNone(kwargs["override_correspondent_id"])
|
||||
self.assertIsNone(kwargs["override_document_type_id"])
|
||||
self.assertIsNone(kwargs["override_tag_ids"])
|
||||
|
||||
@override_settings(
|
||||
CONSUMER_ENABLE_BARCODES=True,
|
||||
CONSUMER_BARCODE_TIFF_SUPPORT=True,
|
||||
)
|
||||
def test_consume_barcode_supported_no_extension_file(self):
|
||||
"""
|
||||
This test assumes barcode and TIFF support are enabled and
|
||||
the user uploads a supported image file, but without extension
|
||||
"""
|
||||
test_file = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"barcodes",
|
||||
"patch-code-t-middle.tiff",
|
||||
)
|
||||
dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle")
|
||||
shutil.copy(test_file, dst)
|
||||
|
||||
self.assertEqual(tasks.consume_file(dst), "File successfully split")
|
||||
|
||||
class TestSanityCheck(DirectoriesMixin, TestCase):
|
||||
@mock.patch("documents.tasks.sanity_checker.check_sanity")
|
||||
def test_sanity_check_success(self, m):
|
||||
m.return_value = SanityCheckMessages()
|
||||
@@ -565,6 +129,8 @@ class TestTasks(DirectoriesMixin, TestCase):
|
||||
)
|
||||
m.assert_called_once()
|
||||
|
||||
|
||||
class TestBulkUpdate(DirectoriesMixin, TestCase):
|
||||
def test_bulk_update_documents(self):
|
||||
doc1 = Document.objects.create(
|
||||
title="test",
|
||||
|
@@ -1,9 +1,28 @@
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import override_settings
|
||||
from django.test import TestCase
|
||||
|
||||
|
||||
class TestViews(TestCase):
|
||||
@classmethod
def setUpClass(cls):
    """
    Class-wide setup: points STATIC_ROOT at a temporary directory.

    The base implementation is called first, as Django's test case classes
    rely on setUpClass()/tearDownClass() for their own class-wide
    initialisation and cleanup.
    """
    super().setUpClass()

    # Provide a dummy static dir to silence whitenoise warnings
    cls.static_dir = tempfile.mkdtemp()

    cls.override = override_settings(
        STATIC_ROOT=cls.static_dir,
    )
    cls.override.enable()

@classmethod
def tearDownClass(cls):
    """
    Removes the temporary static directory, restores the settings and then
    lets the base class perform its own class-wide cleanup.
    """
    shutil.rmtree(cls.static_dir, ignore_errors=True)
    cls.override.disable()

    super().tearDownClass()
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.user = User.objects.create_user("testuser")
|
||||
|
||||
|
@@ -19,6 +19,7 @@ def setup_directories():
|
||||
dirs.scratch_dir = tempfile.mkdtemp()
|
||||
dirs.media_dir = tempfile.mkdtemp()
|
||||
dirs.consumption_dir = tempfile.mkdtemp()
|
||||
dirs.static_dir = tempfile.mkdtemp()
|
||||
dirs.index_dir = os.path.join(dirs.data_dir, "index")
|
||||
dirs.originals_dir = os.path.join(dirs.media_dir, "documents", "originals")
|
||||
dirs.thumbnail_dir = os.path.join(dirs.media_dir, "documents", "thumbnails")
|
||||
@@ -42,6 +43,7 @@ def setup_directories():
|
||||
CONSUMPTION_DIR=dirs.consumption_dir,
|
||||
LOGGING_DIR=dirs.logging_dir,
|
||||
INDEX_DIR=dirs.index_dir,
|
||||
STATIC_ROOT=dirs.static_dir,
|
||||
MODEL_FILE=os.path.join(dirs.data_dir, "classification_model.pickle"),
|
||||
MEDIA_LOCK=os.path.join(dirs.media_dir, "media.lock"),
|
||||
)
|
||||
@@ -55,6 +57,7 @@ def remove_dirs(dirs):
|
||||
shutil.rmtree(dirs.data_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.scratch_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.consumption_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.static_dir, ignore_errors=True)
|
||||
dirs.settings_override.disable()
|
||||
|
||||
|
||||
|
@@ -366,7 +366,8 @@ class DocumentViewSet(
|
||||
handle = doc.thumbnail_file
|
||||
# TODO: Send ETag information and use that to send new thumbnails
|
||||
# if available
|
||||
return HttpResponse(handle, content_type="image/png")
|
||||
|
||||
return HttpResponse(handle, content_type="image/webp")
|
||||
except (FileNotFoundError, Document.DoesNotExist):
|
||||
raise Http404()
|
||||
|
||||
@@ -749,7 +750,7 @@ class RemoteVersionView(GenericAPIView):
|
||||
|
||||
|
||||
class StoragePathViewSet(ModelViewSet):
|
||||
model = DocumentType
|
||||
model = StoragePath
|
||||
|
||||
queryset = StoragePath.objects.annotate(document_count=Count("documents")).order_by(
|
||||
Lower("name"),
|
||||
|
@@ -72,7 +72,7 @@ def binaries_check(app_configs, **kwargs):
|
||||
error = "Paperless can't find {}. Without it, consumption is impossible."
|
||||
hint = "Either it's not in your ${PATH} or it's not installed."
|
||||
|
||||
binaries = (settings.CONVERT_BINARY, settings.OPTIPNG_BINARY, "tesseract")
|
||||
binaries = (settings.CONVERT_BINARY, "tesseract")
|
||||
|
||||
check_messages = []
|
||||
for binary in binaries:
|
||||
|
@@ -526,8 +526,6 @@ CONSUMER_BARCODE_TIFF_SUPPORT = __get_boolean(
|
||||
|
||||
CONSUMER_BARCODE_STRING = os.getenv("PAPERLESS_CONSUMER_BARCODE_STRING", "PATCHT")
|
||||
|
||||
OPTIMIZE_THUMBNAILS = __get_boolean("PAPERLESS_OPTIMIZE_THUMBNAILS", "true")
|
||||
|
||||
OCR_PAGES = int(os.getenv("PAPERLESS_OCR_PAGES", 0))
|
||||
|
||||
# The default language that tesseract will attempt to use when parsing
|
||||
@@ -570,8 +568,6 @@ CONVERT_MEMORY_LIMIT = os.getenv("PAPERLESS_CONVERT_MEMORY_LIMIT")
|
||||
|
||||
GS_BINARY = os.getenv("PAPERLESS_GS_BINARY", "gs")
|
||||
|
||||
OPTIPNG_BINARY = os.getenv("PAPERLESS_OPTIPNG_BINARY", "optipng")
|
||||
|
||||
|
||||
# Pre-2.x versions of Paperless stored your documents locally with GPG
|
||||
# encryption, but that is no longer the default. This behaviour is still
|
||||
|
@@ -13,9 +13,9 @@ class TestChecks(DirectoriesMixin, TestCase):
|
||||
def test_binaries(self):
|
||||
self.assertEqual(binaries_check(None), [])
|
||||
|
||||
@override_settings(CONVERT_BINARY="uuuhh", OPTIPNG_BINARY="forgot")
|
||||
@override_settings(CONVERT_BINARY="uuuhh")
|
||||
def test_binaries_fail(self):
|
||||
self.assertEqual(len(binaries_check(None)), 2)
|
||||
self.assertEqual(len(binaries_check(None)), 1)
|
||||
|
||||
def test_paths_check(self):
|
||||
self.assertEqual(paths_check(None), [])
|
||||
|
@@ -1,4 +1,6 @@
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import uuid
|
||||
from typing import ContextManager
|
||||
from unittest import mock
|
||||
@@ -225,11 +227,18 @@ class TestParser(DirectoriesMixin, TestCase):
|
||||
def test_image_simple_alpha(self):
|
||||
parser = RasterisedDocumentParser(None)
|
||||
|
||||
parser.parse(os.path.join(self.SAMPLE_FILES, "simple-alpha.png"), "image/png")
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
# Copy sample file to temp directory, as the parsing changes the file
|
||||
# and this makes it modified to Git
|
||||
sample_file = os.path.join(self.SAMPLE_FILES, "simple-alpha.png")
|
||||
dest_file = os.path.join(tempdir, "simple-alpha.png")
|
||||
shutil.copy(sample_file, dest_file)
|
||||
|
||||
self.assertTrue(os.path.isfile(parser.archive_path))
|
||||
parser.parse(dest_file, "image/png")
|
||||
|
||||
self.assertContainsStrings(parser.get_text(), ["This is a test document."])
|
||||
self.assertTrue(os.path.isfile(parser.archive_path))
|
||||
|
||||
self.assertContainsStrings(parser.get_text(), ["This is a test document."])
|
||||
|
||||
def test_image_calc_a4_dpi(self):
|
||||
parser = RasterisedDocumentParser(None)
|
||||
|
@@ -30,8 +30,8 @@ class TextDocumentParser(DocumentParser):
|
||||
)
|
||||
draw.text((5, 5), read_text(), font=font, fill="black")
|
||||
|
||||
out_path = os.path.join(self.tempdir, "thumb.png")
|
||||
img.save(out_path)
|
||||
out_path = os.path.join(self.tempdir, "thumb.webp")
|
||||
img.save(out_path, format="WEBP")
|
||||
|
||||
return out_path
|
||||
|
||||
|
@@ -16,3 +16,7 @@ source =
|
||||
./
|
||||
omit =
|
||||
*/tests/*
|
||||
manage.py
|
||||
paperless/workers.py
|
||||
paperless/wsgi.py
|
||||
paperless/auth.py
|
||||
|
Reference in New Issue
Block a user