mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-07-28 18:24:38 -05:00
Merge branch 'dev' into feature-ocrmypdf
This commit is contained in:
@@ -64,15 +64,18 @@ def get_schema():
|
||||
|
||||
|
||||
def open_index(recreate=False):
|
||||
if exists_in(settings.INDEX_DIR) and not recreate:
|
||||
return open_dir(settings.INDEX_DIR)
|
||||
else:
|
||||
# TODO: this is not thread safe. If 2 instances try to create the index
|
||||
# at the same time, this fails. This currently prevents parallel
|
||||
# tests.
|
||||
if not os.path.isdir(settings.INDEX_DIR):
|
||||
os.makedirs(settings.INDEX_DIR, exist_ok=True)
|
||||
return create_in(settings.INDEX_DIR, get_schema())
|
||||
# TODO: this is not thread safe. If 2 instances try to create the index
|
||||
# at the same time, this fails. This currently prevents parallel
|
||||
# tests.
|
||||
try:
|
||||
if exists_in(settings.INDEX_DIR) and not recreate:
|
||||
return open_dir(settings.INDEX_DIR)
|
||||
except Exception as e:
|
||||
logger.error(f"Error while opening the index: {e}, recreating.")
|
||||
|
||||
if not os.path.isdir(settings.INDEX_DIR):
|
||||
os.makedirs(settings.INDEX_DIR, exist_ok=True)
|
||||
return create_in(settings.INDEX_DIR, get_schema())
|
||||
|
||||
|
||||
def update_document(writer, doc):
|
||||
|
@@ -74,7 +74,7 @@ class Command(BaseCommand):
|
||||
f"Abort: encrypted file {document.source_path} does not "
|
||||
f"end with .gpg")
|
||||
|
||||
document.filename = os.path.splitext(document.source_path)[0]
|
||||
document.filename = os.path.splitext(document.filename)[0]
|
||||
|
||||
with open(document.source_path, "wb") as f:
|
||||
f.write(raw_document)
|
||||
|
@@ -95,14 +95,8 @@ class Command(BaseCommand):
|
||||
def handle(self, *args, **options):
|
||||
directory = options["directory"]
|
||||
|
||||
logging.getLogger(__name__).info(
|
||||
f"Starting document consumer at {directory}")
|
||||
|
||||
for entry in os.scandir(directory):
|
||||
if entry.is_file():
|
||||
async_task("documents.tasks.consume_file",
|
||||
entry.path,
|
||||
task_name=os.path.basename(entry.path)[:100])
|
||||
_consume(entry.path)
|
||||
|
||||
if options["oneshot"]:
|
||||
return
|
||||
|
@@ -22,13 +22,6 @@ class Command(Renderable, BaseCommand):
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument("target")
|
||||
parser.add_argument(
|
||||
"--legacy",
|
||||
action="store_true",
|
||||
help="Don't try to export all of the document data, just dump the "
|
||||
"original document files out in a format that makes "
|
||||
"re-consuming them easy."
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
BaseCommand.__init__(self, *args, **kwargs)
|
||||
@@ -44,10 +37,7 @@ class Command(Renderable, BaseCommand):
|
||||
if not os.access(self.target, os.W_OK):
|
||||
raise CommandError("That path doesn't appear to be writable")
|
||||
|
||||
if options["legacy"]:
|
||||
self.dump_legacy()
|
||||
else:
|
||||
self.dump()
|
||||
self.dump()
|
||||
|
||||
def dump(self):
|
||||
|
||||
@@ -102,33 +92,3 @@ class Command(Renderable, BaseCommand):
|
||||
|
||||
with open(os.path.join(self.target, "manifest.json"), "w") as f:
|
||||
json.dump(manifest, f, indent=2)
|
||||
|
||||
def dump_legacy(self):
|
||||
|
||||
for document in Document.objects.all():
|
||||
|
||||
target = os.path.join(
|
||||
self.target, self._get_legacy_file_name(document))
|
||||
|
||||
print("Exporting: {}".format(target))
|
||||
|
||||
with open(target, "wb") as f:
|
||||
f.write(GnuPG.decrypted(document.source_file))
|
||||
t = int(time.mktime(document.created.timetuple()))
|
||||
os.utime(target, times=(t, t))
|
||||
|
||||
@staticmethod
|
||||
def _get_legacy_file_name(doc):
|
||||
|
||||
if not doc.correspondent and not doc.title:
|
||||
return os.path.basename(doc.source_path)
|
||||
|
||||
created = doc.created.strftime("%Y%m%d%H%M%SZ")
|
||||
tags = ",".join([t.slug for t in doc.tags.all()])
|
||||
|
||||
if tags:
|
||||
return "{} - {} - {} - {}{}".format(
|
||||
created, doc.correspondent, doc.title, tags, doc.file_type)
|
||||
|
||||
return "{} - {} - {}{}".format(
|
||||
created, doc.correspondent, doc.title, doc.file_type)
|
||||
|
BIN
src/documents/tests/samples/originals/0000001.pdf
Normal file
BIN
src/documents/tests/samples/originals/0000001.pdf
Normal file
Binary file not shown.
BIN
src/documents/tests/samples/originals/0000002.pdf.gpg
Normal file
BIN
src/documents/tests/samples/originals/0000002.pdf.gpg
Normal file
Binary file not shown.
BIN
src/documents/tests/samples/thumb/0000001.png
Normal file
BIN
src/documents/tests/samples/thumb/0000001.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 7.7 KiB |
BIN
src/documents/tests/samples/thumb/0000002.png.gpg
Normal file
BIN
src/documents/tests/samples/thumb/0000002.png.gpg
Normal file
Binary file not shown.
@@ -1,41 +1,24 @@
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.test import override_settings
|
||||
from pathvalidate import ValidationError
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from documents.models import Document, Correspondent, DocumentType, Tag
|
||||
from documents.tests.utils import setup_directories, remove_dirs
|
||||
|
||||
|
||||
class DocumentApiTest(APITestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.scratch_dir = tempfile.mkdtemp()
|
||||
self.media_dir = tempfile.mkdtemp()
|
||||
self.originals_dir = os.path.join(self.media_dir, "documents", "originals")
|
||||
self.thumbnail_dir = os.path.join(self.media_dir, "documents", "thumbnails")
|
||||
|
||||
os.makedirs(self.originals_dir, exist_ok=True)
|
||||
os.makedirs(self.thumbnail_dir, exist_ok=True)
|
||||
|
||||
override_settings(
|
||||
SCRATCH_DIR=self.scratch_dir,
|
||||
MEDIA_ROOT=self.media_dir,
|
||||
ORIGINALS_DIR=self.originals_dir,
|
||||
THUMBNAIL_DIR=self.thumbnail_dir
|
||||
).enable()
|
||||
self.dirs = setup_directories()
|
||||
self.addCleanup(remove_dirs, self.dirs)
|
||||
|
||||
user = User.objects.create_superuser(username="temp_admin")
|
||||
self.client.force_login(user=user)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.scratch_dir, ignore_errors=True)
|
||||
shutil.rmtree(self.media_dir, ignore_errors=True)
|
||||
|
||||
def testDocuments(self):
|
||||
|
||||
response = self.client.get("/api/documents/").data
|
||||
@@ -88,7 +71,7 @@ class DocumentApiTest(APITestCase):
|
||||
|
||||
def test_document_actions(self):
|
||||
|
||||
_, filename = tempfile.mkstemp(dir=self.originals_dir)
|
||||
_, filename = tempfile.mkstemp(dir=self.dirs.originals_dir)
|
||||
|
||||
content = b"This is a test"
|
||||
content_thumbnail = b"thumbnail content"
|
||||
@@ -98,7 +81,7 @@ class DocumentApiTest(APITestCase):
|
||||
|
||||
doc = Document.objects.create(title="none", filename=os.path.basename(filename), mime_type="application/pdf")
|
||||
|
||||
with open(os.path.join(self.thumbnail_dir, "{:07d}.png".format(doc.pk)), "wb") as f:
|
||||
with open(os.path.join(self.dirs.thumbnail_dir, "{:07d}.png".format(doc.pk)), "wb") as f:
|
||||
f.write(content_thumbnail)
|
||||
|
||||
response = self.client.get('/api/documents/{}/download/'.format(doc.pk))
|
||||
@@ -227,7 +210,8 @@ class DocumentApiTest(APITestCase):
|
||||
|
||||
m.assert_called_once()
|
||||
|
||||
self.assertEqual(m.call_args.kwargs['override_filename'], "simple.pdf")
|
||||
args, kwargs = m.call_args
|
||||
self.assertEqual(kwargs['override_filename'], "simple.pdf")
|
||||
|
||||
@mock.patch("documents.forms.async_task")
|
||||
def test_upload_invalid_form(self, m):
|
||||
|
@@ -11,7 +11,6 @@ from documents.models import Correspondent, Document, Tag, DocumentType
|
||||
class TestClassifier(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
self.classifier = DocumentClassifier()
|
||||
|
||||
def generate_test_data(self):
|
||||
|
@@ -1,12 +1,12 @@
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
from .utils import setup_directories, remove_dirs
|
||||
from ..consumer import Consumer, ConsumerError
|
||||
from ..models import FileInfo, Tag, Correspondent, DocumentType, Document
|
||||
from ..parsers import DocumentParser, ParseError
|
||||
@@ -410,24 +410,15 @@ def fake_magic_from_file(file, mime=False):
|
||||
@mock.patch("documents.consumer.magic.from_file", fake_magic_from_file)
|
||||
class TestConsumer(TestCase):
|
||||
|
||||
def make_dummy_parser(self, logging_group):
|
||||
return DummyParser(logging_group, self.scratch_dir)
|
||||
def make_dummy_parser(self, path, logging_group):
|
||||
return DummyParser(logging_group, self.dirs.scratch_dir)
|
||||
|
||||
def make_faulty_parser(self, logging_group):
|
||||
return FaultyParser(logging_group, self.scratch_dir)
|
||||
def make_faulty_parser(self, path, logging_group):
|
||||
return FaultyParser(logging_group, self.dirs.scratch_dir)
|
||||
|
||||
def setUp(self):
|
||||
self.scratch_dir = tempfile.mkdtemp()
|
||||
self.media_dir = tempfile.mkdtemp()
|
||||
self.consumption_dir = tempfile.mkdtemp()
|
||||
|
||||
override_settings(
|
||||
SCRATCH_DIR=self.scratch_dir,
|
||||
MEDIA_ROOT=self.media_dir,
|
||||
ORIGINALS_DIR=os.path.join(self.media_dir, "documents", "originals"),
|
||||
THUMBNAIL_DIR=os.path.join(self.media_dir, "documents", "thumbnails"),
|
||||
CONSUMPTION_DIR=self.consumption_dir
|
||||
).enable()
|
||||
self.dirs = setup_directories()
|
||||
self.addCleanup(remove_dirs, self.dirs)
|
||||
|
||||
patcher = mock.patch("documents.parsers.document_consumer_declaration.send")
|
||||
m = patcher.start()
|
||||
@@ -441,13 +432,8 @@ class TestConsumer(TestCase):
|
||||
|
||||
self.consumer = Consumer()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.scratch_dir, ignore_errors=True)
|
||||
shutil.rmtree(self.media_dir, ignore_errors=True)
|
||||
shutil.rmtree(self.consumption_dir, ignore_errors=True)
|
||||
|
||||
def get_test_file(self):
|
||||
fd, f = tempfile.mkstemp(suffix=".pdf", dir=self.scratch_dir)
|
||||
fd, f = tempfile.mkstemp(suffix=".pdf", dir=self.dirs.scratch_dir)
|
||||
return f
|
||||
|
||||
def testNormalOperation(self):
|
||||
|
7
src/documents/tests/test_document_retagger.py
Normal file
7
src/documents/tests/test_document_retagger.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from django.test import TestCase
|
||||
|
||||
|
||||
class TestRetagger(TestCase):
|
||||
|
||||
def test_overwrite(self):
|
||||
pass
|
@@ -1,7 +1,6 @@
|
||||
import filecmp
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
from unittest import mock
|
||||
@@ -11,6 +10,7 @@ from django.test import TestCase, override_settings
|
||||
|
||||
from documents.consumer import ConsumerError
|
||||
from documents.management.commands import document_consumer
|
||||
from documents.tests.utils import setup_directories, remove_dirs
|
||||
|
||||
|
||||
class ConsumerThread(Thread):
|
||||
@@ -41,9 +41,8 @@ class TestConsumer(TestCase):
|
||||
self.task_mock = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
|
||||
self.consume_dir = tempfile.mkdtemp()
|
||||
|
||||
override_settings(CONSUMPTION_DIR=self.consume_dir).enable()
|
||||
self.dirs = setup_directories()
|
||||
self.addCleanup(remove_dirs, self.dirs)
|
||||
|
||||
def t_start(self):
|
||||
self.t = ConsumerThread()
|
||||
@@ -94,25 +93,29 @@ class TestConsumer(TestCase):
|
||||
def test_consume_file(self):
|
||||
self.t_start()
|
||||
|
||||
f = os.path.join(self.consume_dir, "my_file.pdf")
|
||||
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||
shutil.copy(self.sample_file, f)
|
||||
|
||||
self.wait_for_task_mock_call()
|
||||
|
||||
self.task_mock.assert_called_once()
|
||||
self.assertEqual(self.task_mock.call_args.args[1], f)
|
||||
|
||||
args, kwargs = self.task_mock.call_args
|
||||
self.assertEqual(args[1], f)
|
||||
|
||||
@override_settings(CONSUMER_POLLING=1)
|
||||
def test_consume_file_polling(self):
|
||||
self.test_consume_file()
|
||||
|
||||
def test_consume_existing_file(self):
|
||||
f = os.path.join(self.consume_dir, "my_file.pdf")
|
||||
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||
shutil.copy(self.sample_file, f)
|
||||
|
||||
self.t_start()
|
||||
self.task_mock.assert_called_once()
|
||||
self.assertEqual(self.task_mock.call_args.args[1], f)
|
||||
|
||||
args, kwargs = self.task_mock.call_args
|
||||
self.assertEqual(args[1], f)
|
||||
|
||||
@override_settings(CONSUMER_POLLING=1)
|
||||
def test_consume_existing_file_polling(self):
|
||||
@@ -125,7 +128,7 @@ class TestConsumer(TestCase):
|
||||
|
||||
self.t_start()
|
||||
|
||||
fname = os.path.join(self.consume_dir, "my_file.pdf")
|
||||
fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||
|
||||
self.slow_write_file(fname)
|
||||
|
||||
@@ -135,7 +138,8 @@ class TestConsumer(TestCase):
|
||||
|
||||
self.task_mock.assert_called_once()
|
||||
|
||||
self.assertEqual(self.task_mock.call_args.args[1], fname)
|
||||
args, kwargs = self.task_mock.call_args
|
||||
self.assertEqual(args[1], fname)
|
||||
|
||||
@override_settings(CONSUMER_POLLING=1)
|
||||
def test_slow_write_pdf_polling(self):
|
||||
@@ -148,8 +152,8 @@ class TestConsumer(TestCase):
|
||||
|
||||
self.t_start()
|
||||
|
||||
fname = os.path.join(self.consume_dir, "my_file.~df")
|
||||
fname2 = os.path.join(self.consume_dir, "my_file.pdf")
|
||||
fname = os.path.join(self.dirs.consumption_dir, "my_file.~df")
|
||||
fname2 = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||
|
||||
self.slow_write_file(fname)
|
||||
shutil.move(fname, fname2)
|
||||
@@ -157,7 +161,9 @@ class TestConsumer(TestCase):
|
||||
self.wait_for_task_mock_call()
|
||||
|
||||
self.task_mock.assert_called_once()
|
||||
self.assertEqual(self.task_mock.call_args.args[1], fname2)
|
||||
|
||||
args, kwargs = self.task_mock.call_args
|
||||
self.assertEqual(args[1], fname2)
|
||||
|
||||
error_logger.assert_not_called()
|
||||
|
||||
@@ -172,13 +178,14 @@ class TestConsumer(TestCase):
|
||||
|
||||
self.t_start()
|
||||
|
||||
fname = os.path.join(self.consume_dir, "my_file.pdf")
|
||||
fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||
self.slow_write_file(fname, incomplete=True)
|
||||
|
||||
self.wait_for_task_mock_call()
|
||||
|
||||
self.task_mock.assert_called_once()
|
||||
self.assertEqual(self.task_mock.call_args.args[1], fname)
|
||||
args, kwargs = self.task_mock.call_args
|
||||
self.assertEqual(args[1], fname)
|
||||
|
||||
# assert that we have an error logged with this invalid file.
|
||||
error_logger.assert_called_once()
|
||||
|
56
src/documents/tests/test_management_decrypt.py
Normal file
56
src/documents/tests/test_management_decrypt.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from unittest import mock
|
||||
|
||||
from django.core.management import call_command
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
from documents.management.commands import document_exporter
|
||||
from documents.models import Document, Tag, DocumentType, Correspondent
|
||||
|
||||
|
||||
class TestDecryptDocuments(TestCase):
|
||||
|
||||
@override_settings(
|
||||
ORIGINALS_DIR=os.path.join(os.path.dirname(__file__), "samples", "originals"),
|
||||
THUMBNAIL_DIR=os.path.join(os.path.dirname(__file__), "samples", "thumb"),
|
||||
PASSPHRASE="test"
|
||||
)
|
||||
@mock.patch("documents.management.commands.decrypt_documents.input")
|
||||
def test_decrypt(self, m):
|
||||
|
||||
media_dir = tempfile.mkdtemp()
|
||||
originals_dir = os.path.join(media_dir, "documents", "originals")
|
||||
thumb_dir = os.path.join(media_dir, "documents", "thumbnails")
|
||||
os.makedirs(originals_dir, exist_ok=True)
|
||||
os.makedirs(thumb_dir, exist_ok=True)
|
||||
|
||||
override_settings(
|
||||
ORIGINALS_DIR=originals_dir,
|
||||
THUMBNAIL_DIR=thumb_dir,
|
||||
PASSPHRASE="test"
|
||||
).enable()
|
||||
|
||||
shutil.copy(os.path.join(os.path.dirname(__file__), "samples", "originals", "0000002.pdf.gpg"), os.path.join(originals_dir, "0000002.pdf.gpg"))
|
||||
shutil.copy(os.path.join(os.path.dirname(__file__), "samples", "thumb", "0000002.png.gpg"), os.path.join(thumb_dir, "0000002.png.gpg"))
|
||||
|
||||
Document.objects.create(checksum="9c9691e51741c1f4f41a20896af31770", title="wow", filename="0000002.pdf.gpg", id=2, mime_type="application/pdf", storage_type=Document.STORAGE_TYPE_GPG)
|
||||
|
||||
call_command('decrypt_documents')
|
||||
|
||||
doc = Document.objects.get(id=2)
|
||||
|
||||
self.assertEqual(doc.storage_type, Document.STORAGE_TYPE_UNENCRYPTED)
|
||||
self.assertEqual(doc.filename, "0000002.pdf")
|
||||
self.assertTrue(os.path.isfile(os.path.join(originals_dir, "0000002.pdf")))
|
||||
self.assertTrue(os.path.isfile(doc.source_path))
|
||||
self.assertTrue(os.path.isfile(os.path.join(thumb_dir, "0000002.png")))
|
||||
self.assertTrue(os.path.isfile(doc.thumbnail_path))
|
||||
|
||||
with doc.source_file as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
self.assertEqual(checksum, doc.checksum)
|
||||
|
53
src/documents/tests/test_management_exporter.py
Normal file
53
src/documents/tests/test_management_exporter.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from django.core.management import call_command
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
from documents.management.commands import document_exporter
|
||||
from documents.models import Document, Tag, DocumentType, Correspondent
|
||||
|
||||
|
||||
class TestExporter(TestCase):
|
||||
|
||||
@override_settings(
|
||||
ORIGINALS_DIR=os.path.join(os.path.dirname(__file__), "samples", "originals"),
|
||||
THUMBNAIL_DIR=os.path.join(os.path.dirname(__file__), "samples", "thumb"),
|
||||
PASSPHRASE="test"
|
||||
)
|
||||
def test_exporter(self):
|
||||
file = os.path.join(os.path.dirname(__file__), "samples", "originals", "0000001.pdf")
|
||||
|
||||
with open(file, "rb") as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
|
||||
Document.objects.create(checksum=checksum, title="wow", filename="0000001.pdf", id=1, mime_type="application/pdf")
|
||||
Document.objects.create(checksum="9c9691e51741c1f4f41a20896af31770", title="wow", filename="0000002.pdf.gpg", id=2, mime_type="application/pdf", storage_type=Document.STORAGE_TYPE_GPG)
|
||||
Tag.objects.create(name="t")
|
||||
DocumentType.objects.create(name="dt")
|
||||
Correspondent.objects.create(name="c")
|
||||
|
||||
target = tempfile.mkdtemp()
|
||||
|
||||
call_command('document_exporter', target)
|
||||
|
||||
with open(os.path.join(target, "manifest.json")) as f:
|
||||
manifest = json.load(f)
|
||||
|
||||
self.assertEqual(len(manifest), 5)
|
||||
|
||||
for element in manifest:
|
||||
if element['model'] == 'documents.document':
|
||||
fname = os.path.join(target, element[document_exporter.EXPORTER_FILE_NAME])
|
||||
self.assertTrue(os.path.exists(fname))
|
||||
self.assertTrue(os.path.exists(os.path.join(target, element[document_exporter.EXPORTER_THUMBNAIL_NAME])))
|
||||
|
||||
with open(fname, "rb") as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
self.assertEqual(checksum, element['fields']['checksum'])
|
||||
|
||||
Document.objects.create(checksum="AAAAAAAAAAAAAAAAA", title="wow", filename="0000004.pdf", id=3, mime_type="application/pdf")
|
||||
|
||||
self.assertRaises(FileNotFoundError, call_command, 'document_exporter', target)
|
@@ -1,3 +1,5 @@
|
||||
import shutil
|
||||
import tempfile
|
||||
from random import randint
|
||||
|
||||
from django.contrib.admin.models import LogEntry
|
||||
@@ -215,6 +217,13 @@ class TestDocumentConsumptionFinishedSignal(TestCase):
|
||||
self.doc_contains = Document.objects.create(
|
||||
content="I contain the keyword.", mime_type="application/pdf")
|
||||
|
||||
self.index_dir = tempfile.mkdtemp()
|
||||
# TODO: we should not need the index here.
|
||||
override_settings(INDEX_DIR=self.index_dir).enable()
|
||||
|
||||
def tearDown(self) -> None:
|
||||
shutil.rmtree(self.index_dir, ignore_errors=True)
|
||||
|
||||
def test_tag_applied_any(self):
|
||||
t1 = Tag.objects.create(
|
||||
name="test", match="keyword", matching_algorithm=Tag.MATCH_ANY)
|
||||
|
41
src/documents/tests/utils.py
Normal file
41
src/documents/tests/utils.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from collections import namedtuple
|
||||
|
||||
from django.test import override_settings
|
||||
|
||||
|
||||
def setup_directories():
|
||||
|
||||
dirs = namedtuple("Dirs", ())
|
||||
|
||||
dirs.data_dir = tempfile.mkdtemp()
|
||||
dirs.scratch_dir = tempfile.mkdtemp()
|
||||
dirs.media_dir = tempfile.mkdtemp()
|
||||
dirs.consumption_dir = tempfile.mkdtemp()
|
||||
dirs.index_dir = os.path.join(dirs.data_dir, "documents", "originals")
|
||||
dirs.originals_dir = os.path.join(dirs.media_dir, "documents", "originals")
|
||||
dirs.thumbnail_dir = os.path.join(dirs.media_dir, "documents", "thumbnails")
|
||||
os.makedirs(dirs.index_dir)
|
||||
os.makedirs(dirs.originals_dir)
|
||||
os.makedirs(dirs.thumbnail_dir)
|
||||
|
||||
override_settings(
|
||||
DATA_DIR=dirs.data_dir,
|
||||
SCRATCH_DIR=dirs.scratch_dir,
|
||||
MEDIA_ROOT=dirs.media_dir,
|
||||
ORIGINALS_DIR=dirs.originals_dir,
|
||||
THUMBNAIL_DIR=dirs.thumbnail_dir,
|
||||
CONSUMPTION_DIR=dirs.consumption_dir,
|
||||
INDEX_DIR=dirs.index_dir
|
||||
).enable()
|
||||
|
||||
return dirs
|
||||
|
||||
|
||||
def remove_dirs(dirs):
|
||||
shutil.rmtree(dirs.media_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.data_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.scratch_dir, ignore_errors=True)
|
||||
shutil.rmtree(dirs.consumption_dir, ignore_errors=True)
|
@@ -3,7 +3,7 @@ exclude = migrations, paperless/settings.py, .tox, */tests/*
|
||||
|
||||
[tool:pytest]
|
||||
DJANGO_SETTINGS_MODULE=paperless.settings
|
||||
addopts = --pythonwarnings=all
|
||||
addopts = --pythonwarnings=all --cov --cov-report=html
|
||||
env =
|
||||
PAPERLESS_SECRET=paperless
|
||||
PAPERLESS_EMAIL_SECRET=paperless
|
||||
|
Reference in New Issue
Block a user