Merge branch 'dev' into feature-trigger-any

This commit is contained in:
shamoon
2026-01-24 20:03:50 -08:00
280 changed files with 10822 additions and 14849 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.6 KiB

View File

@@ -1,6 +1,7 @@
import json
from io import BytesIO
from pathlib import Path
from unittest.mock import patch
from django.contrib.auth.models import User
from django.core.files.uploadedfile import SimpleUploadedFile
@@ -66,6 +67,13 @@ class TestApiAppConfig(DirectoriesMixin, APITestCase):
"barcode_max_pages": None,
"barcode_enable_tag": None,
"barcode_tag_mapping": None,
"ai_enabled": False,
"llm_embedding_backend": None,
"llm_embedding_model": None,
"llm_backend": None,
"llm_model": None,
"llm_api_key": None,
"llm_endpoint": None,
},
)
@@ -611,3 +619,76 @@ class TestApiAppConfig(DirectoriesMixin, APITestCase):
)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
self.assertEqual(ApplicationConfiguration.objects.count(), 1)
def test_update_llm_api_key(self):
    """
    GIVEN:
        - Existing config with llm_api_key specified
    WHEN:
        - API to update llm_api_key is called with all *s
        - API to update llm_api_key is called with empty string
    THEN:
        - llm_api_key is unchanged
        - llm_api_key is set to None
    """
    config = ApplicationConfiguration.objects.first()
    config.llm_api_key = "1234567890"
    config.save()

    def patch_key(value):
        # Helper: PATCH the config endpoint with the given llm_api_key value.
        return self.client.patch(
            f"{self.ENDPOINT}1/",
            json.dumps({"llm_api_key": value}),
            content_type="application/json",
        )

    # A fully-masked value (all asterisks) must leave the stored key untouched
    response = patch_key("*" * 32)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    config.refresh_from_db()
    self.assertEqual(config.llm_api_key, "1234567890")

    # An empty string clears the key entirely
    response = patch_key("")
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    config.refresh_from_db()
    self.assertEqual(config.llm_api_key, None)
def test_enable_ai_index_triggers_update(self):
    """
    GIVEN:
        - Existing config with AI disabled
    WHEN:
        - Config is updated to enable AI with llm_embedding_backend
    THEN:
        - LLM index is triggered to update
    """
    config = ApplicationConfiguration.objects.first()
    config.ai_enabled = False
    config.llm_embedding_backend = None
    config.save()

    payload = {
        "ai_enabled": True,
        "llm_embedding_backend": "openai",
    }
    with (
        patch("documents.tasks.llmindex_index.delay") as mock_update,
        patch("paperless_ai.indexing.vector_store_file_exists") as mock_exists,
    ):
        # No vector store on disk yet, so enabling AI must schedule a rebuild
        mock_exists.return_value = False
        self.client.patch(
            f"{self.ENDPOINT}1/",
            json.dumps(payload),
            content_type="application/json",
        )
        mock_update.assert_called_once()

View File

@@ -219,6 +219,30 @@ class TestApiStoragePaths(DirectoriesMixin, APITestCase):
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(StoragePath.objects.count(), 1)
def test_api_create_storage_path_rejects_traversal(self):
    """
    GIVEN:
        - API request to create a storage paths
        - Storage path attempts directory traversal
    WHEN:
        - API is called
    THEN:
        - Correct HTTP 400 response
        - No storage path is created
    """
    payload = {
        "name": "Traversal path",
        "path": "../../../../../tmp/proof",
    }
    response = self.client.post(
        self.ENDPOINT,
        json.dumps(payload),
        content_type="application/json",
    )
    # The serializer must reject the traversal outright...
    self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
    # ...leaving only the pre-existing storage path in the database
    self.assertEqual(StoragePath.objects.count(), 1)
def test_api_storage_path_placeholders(self):
"""
GIVEN:

View File

@@ -1,4 +1,6 @@
import os
import shutil
import tempfile
from pathlib import Path
from unittest import mock
@@ -16,9 +18,19 @@ class TestSystemStatus(APITestCase):
ENDPOINT = "/api/status/"
def setUp(self):
    """Create a superuser and point MEDIA_ROOT at a throwaway directory."""
    super().setUp()
    self.user = User.objects.create_superuser(
        username="temp_admin",
    )
    # Isolate media writes: each test gets a fresh temporary MEDIA_ROOT,
    # enabled here and torn back down in tearDown()
    self.tmp_dir = Path(tempfile.mkdtemp())
    self.override = override_settings(MEDIA_ROOT=self.tmp_dir)
    self.override.enable()
def tearDown(self):
    """Undo the MEDIA_ROOT override and delete the temporary directory."""
    super().tearDown()
    self.override.disable()
    shutil.rmtree(self.tmp_dir)
def test_system_status(self):
"""
@@ -310,3 +322,69 @@ class TestSystemStatus(APITestCase):
"ERROR",
)
self.assertIsNotNone(response.data["tasks"]["sanity_check_error"])
def test_system_status_ai_disabled(self):
    """
    GIVEN:
        - The AI feature is disabled
    WHEN:
        - The user requests the system status
    THEN:
        - The response contains the correct AI status
    """
    with override_settings(AI_ENABLED=False):
        self.client.force_login(self.user)
        response = self.client.get(self.ENDPOINT)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        tasks = response.data["tasks"]
        # Feature off -> index reported disabled, with no error attached
        self.assertEqual(tasks["llmindex_status"], "DISABLED")
        self.assertIsNone(tasks["llmindex_error"])
def test_system_status_ai_enabled(self):
    """
    GIVEN:
        - The AI index feature is enabled, but no tasks are found
        - The AI index feature is enabled and a task is found
    WHEN:
        - The user requests the system status
    THEN:
        - The response contains the correct AI status
    """
    with override_settings(AI_ENABLED=True, LLM_EMBEDDING_BACKEND="openai"):
        self.client.force_login(self.user)

        # Without any recorded index task the status is a warning
        first = self.client.get(self.ENDPOINT)
        self.assertEqual(first.status_code, status.HTTP_200_OK)
        self.assertEqual(first.data["tasks"]["llmindex_status"], "WARNING")

        # A successful scheduled index task flips the status to OK
        PaperlessTask.objects.create(
            type=PaperlessTask.TaskType.SCHEDULED_TASK,
            status=states.SUCCESS,
            task_name=PaperlessTask.TaskName.LLMINDEX_UPDATE,
        )
        second = self.client.get(self.ENDPOINT)
        self.assertEqual(second.status_code, status.HTTP_200_OK)
        self.assertEqual(second.data["tasks"]["llmindex_status"], "OK")
        self.assertIsNone(second.data["tasks"]["llmindex_error"])
def test_system_status_ai_error(self):
    """
    GIVEN:
        - The AI index feature is enabled and a task is found with an error
    WHEN:
        - The user requests the system status
    THEN:
        - The response contains the correct AI status
    """
    with override_settings(AI_ENABLED=True, LLM_EMBEDDING_BACKEND="openai"):
        # Record a failed scheduled index run before querying the status
        PaperlessTask.objects.create(
            type=PaperlessTask.TaskType.SCHEDULED_TASK,
            status=states.FAILURE,
            task_name=PaperlessTask.TaskName.LLMINDEX_UPDATE,
            result="AI index update failed",
        )
        self.client.force_login(self.user)
        response = self.client.get(self.ENDPOINT)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        tasks = response.data["tasks"]
        self.assertEqual(tasks["llmindex_status"], "ERROR")
        self.assertIsNotNone(tasks["llmindex_error"])

View File

@@ -49,6 +49,7 @@ class TestApiUiSettings(DirectoriesMixin, APITestCase):
"backend_setting": "default",
},
"email_enabled": False,
"ai_enabled": False,
},
)

View File

@@ -1,4 +1,3 @@
import textwrap
from unittest import mock
from django.core.checks import Error
@@ -6,60 +5,11 @@ from django.core.checks import Warning
from django.test import TestCase
from django.test import override_settings
from documents.checks import changed_password_check
from documents.checks import filename_format_check
from documents.checks import parser_check
from documents.models import Document
from documents.tests.factories import DocumentFactory
class TestDocumentChecks(TestCase):
def test_changed_password_check_empty_db(self):
self.assertListEqual(changed_password_check(None), [])
def test_changed_password_check_no_encryption(self):
DocumentFactory.create(storage_type=Document.STORAGE_TYPE_UNENCRYPTED)
self.assertListEqual(changed_password_check(None), [])
def test_encrypted_missing_passphrase(self):
DocumentFactory.create(storage_type=Document.STORAGE_TYPE_GPG)
msgs = changed_password_check(None)
self.assertEqual(len(msgs), 1)
msg_text = msgs[0].msg
self.assertEqual(
msg_text,
"The database contains encrypted documents but no password is set.",
)
@override_settings(
PASSPHRASE="test",
)
@mock.patch("paperless.db.GnuPG.decrypted")
@mock.patch("documents.models.Document.source_file")
def test_encrypted_decrypt_fails(self, mock_decrypted, mock_source_file):
mock_decrypted.return_value = None
mock_source_file.return_value = b""
DocumentFactory.create(storage_type=Document.STORAGE_TYPE_GPG)
msgs = changed_password_check(None)
self.assertEqual(len(msgs), 1)
msg_text = msgs[0].msg
self.assertEqual(
msg_text,
textwrap.dedent(
"""
The current password doesn't match the password of the
existing documents.
If you intend to change your password, you must first export
all of the old documents, start fresh with the new password
and then re-import them."
""",
),
)
def test_parser_check(self):
self.assertEqual(parser_check(None), [])

View File

@@ -34,22 +34,14 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_generate_source_filename(self):
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
self.assertEqual(generate_filename(document), Path(f"{document.pk:07d}.pdf"))
document.storage_type = Document.STORAGE_TYPE_GPG
self.assertEqual(
generate_filename(document),
Path(f"{document.pk:07d}.pdf.gpg"),
)
@override_settings(FILENAME_FORMAT="{correspondent}/{correspondent}")
def test_file_renaming(self):
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Test default source_path
@@ -63,11 +55,6 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
# Ensure that filename is properly generated
self.assertEqual(document.filename, Path("none/none.pdf"))
# Enable encryption and check again
document.storage_type = Document.STORAGE_TYPE_GPG
document.filename = generate_filename(document)
self.assertEqual(document.filename, Path("none/none.pdf.gpg"))
document.save()
# test that creating dirs for the source_path creates the correct directory
@@ -87,14 +74,14 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
settings.ORIGINALS_DIR / "none",
)
self.assertIsFile(
settings.ORIGINALS_DIR / "test" / "test.pdf.gpg",
settings.ORIGINALS_DIR / "test" / "test.pdf",
)
@override_settings(FILENAME_FORMAT="{correspondent}/{correspondent}")
def test_file_renaming_missing_permissions(self):
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
@@ -128,14 +115,13 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_file_renaming_database_error(self):
Document.objects.create(
mime_type="application/pdf",
storage_type=Document.STORAGE_TYPE_UNENCRYPTED,
checksum="AAAAA",
)
document = Document()
document.mime_type = "application/pdf"
document.checksum = "BBBBB"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
@@ -170,7 +156,7 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_document_delete(self):
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
@@ -196,7 +182,7 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_document_delete_trash_dir(self):
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
@@ -221,7 +207,7 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
# Create an identical document and ensure it is trashed under a new name
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
document.filename = generate_filename(document)
document.save()
@@ -235,7 +221,7 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_document_delete_nofile(self):
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
document.delete()
@@ -245,7 +231,7 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_directory_not_empty(self):
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
@@ -362,7 +348,7 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_nested_directory_cleanup(self):
document = Document()
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
@@ -390,7 +376,6 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
document = Document()
document.pk = 1
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
self.assertEqual(generate_filename(document), Path("0000001.pdf"))
@@ -403,7 +388,6 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
document = Document()
document.pk = 1
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
self.assertEqual(generate_filename(document), Path("0000001.pdf"))
@@ -429,7 +413,6 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
document = Document()
document.pk = 1
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
self.assertEqual(generate_filename(document), Path("0000001.pdf"))
@@ -438,7 +421,6 @@ class TestFileHandling(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
document = Document()
document.pk = 1
document.mime_type = "application/pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
self.assertEqual(generate_filename(document), Path("0000001.pdf"))
@@ -1258,7 +1240,7 @@ class TestFilenameGeneration(DirectoriesMixin, TestCase):
title="doc1",
mime_type="application/pdf",
)
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
@@ -1732,7 +1714,6 @@ class TestPathDateLocalization:
document = DocumentFactory.create(
title="My Document",
mime_type="application/pdf",
storage_type=Document.STORAGE_TYPE_UNENCRYPTED,
created=self.TEST_DATE, # 2023-10-26 (which is a Thursday)
)
with override_settings(FILENAME_FORMAT=filename_format):

View File

@@ -1,7 +1,5 @@
import filecmp
import hashlib
import shutil
import tempfile
from io import StringIO
from pathlib import Path
from unittest import mock
@@ -96,66 +94,6 @@ class TestArchiver(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
self.assertEqual(doc2.archive_filename, "document_01.pdf")
class TestDecryptDocuments(FileSystemAssertsMixin, TestCase):
@mock.patch("documents.management.commands.decrypt_documents.input")
def test_decrypt(self, m):
media_dir = tempfile.mkdtemp()
originals_dir = Path(media_dir) / "documents" / "originals"
thumb_dir = Path(media_dir) / "documents" / "thumbnails"
originals_dir.mkdir(parents=True, exist_ok=True)
thumb_dir.mkdir(parents=True, exist_ok=True)
with override_settings(
ORIGINALS_DIR=originals_dir,
THUMBNAIL_DIR=thumb_dir,
PASSPHRASE="test",
FILENAME_FORMAT=None,
):
doc = Document.objects.create(
checksum="82186aaa94f0b98697d704b90fd1c072",
title="wow",
filename="0000004.pdf.gpg",
mime_type="application/pdf",
storage_type=Document.STORAGE_TYPE_GPG,
)
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000004.pdf.gpg"
),
originals_dir / "0000004.pdf.gpg",
)
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "thumbnails"
/ "0000004.webp.gpg"
),
thumb_dir / f"{doc.id:07}.webp.gpg",
)
call_command("decrypt_documents")
doc.refresh_from_db()
self.assertEqual(doc.storage_type, Document.STORAGE_TYPE_UNENCRYPTED)
self.assertEqual(doc.filename, "0000004.pdf")
self.assertIsFile(Path(originals_dir) / "0000004.pdf")
self.assertIsFile(doc.source_path)
self.assertIsFile(Path(thumb_dir) / f"{doc.id:07}.webp")
self.assertIsFile(doc.thumbnail_path)
with doc.source_file as f:
checksum: str = hashlib.md5(f.read()).hexdigest()
self.assertEqual(checksum, doc.checksum)
class TestMakeIndex(TestCase):
@mock.patch("documents.management.commands.document_index.index_reindex")
def test_reindex(self, m):

File diff suppressed because it is too large Load Diff

View File

@@ -86,9 +86,8 @@ class TestExportImport(
content="Content",
checksum="82186aaa94f0b98697d704b90fd1c072",
title="wow_dec",
filename="0000004.pdf.gpg",
filename="0000004.pdf",
mime_type="application/pdf",
storage_type=Document.STORAGE_TYPE_GPG,
)
self.note = Note.objects.create(
@@ -242,11 +241,6 @@ class TestExportImport(
checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(checksum, element["fields"]["checksum"])
self.assertEqual(
element["fields"]["storage_type"],
Document.STORAGE_TYPE_UNENCRYPTED,
)
if document_exporter.EXPORTER_ARCHIVE_NAME in element:
fname = (
self.target / element[document_exporter.EXPORTER_ARCHIVE_NAME]
@@ -436,7 +430,7 @@ class TestExportImport(
Document.objects.create(
checksum="AAAAAAAAAAAAAAAAA",
title="wow",
filename="0000004.pdf",
filename="0000010.pdf",
mime_type="application/pdf",
)
self.assertRaises(FileNotFoundError, call_command, "document_exporter", target)

View File

@@ -33,8 +33,7 @@ class TestManageSuperUser(DirectoriesMixin, TestCase):
# just the consumer user which is created
# during migration, and AnonymousUser
self.assertEqual(User.objects.count(), 2)
self.assertTrue(User.objects.filter(username="consumer").exists())
self.assertEqual(User.objects.count(), 1)
self.assertEqual(User.objects.filter(is_superuser=True).count(), 0)
self.assertEqual(
out,
@@ -54,7 +53,7 @@ class TestManageSuperUser(DirectoriesMixin, TestCase):
# count is 3 as there's the consumer
# user already created during migration, and AnonymousUser
user: User = User.objects.get_by_natural_key("admin")
self.assertEqual(User.objects.count(), 3)
self.assertEqual(User.objects.count(), 2)
self.assertTrue(user.is_superuser)
self.assertEqual(user.email, "root@localhost")
self.assertEqual(out, 'Created superuser "admin" with provided password.\n')
@@ -71,7 +70,7 @@ class TestManageSuperUser(DirectoriesMixin, TestCase):
out = self.call_command(environ={"PAPERLESS_ADMIN_PASSWORD": "123456"})
self.assertEqual(User.objects.count(), 3)
self.assertEqual(User.objects.count(), 2)
with self.assertRaises(User.DoesNotExist):
User.objects.get_by_natural_key("admin")
self.assertEqual(
@@ -92,7 +91,7 @@ class TestManageSuperUser(DirectoriesMixin, TestCase):
out = self.call_command(environ={"PAPERLESS_ADMIN_PASSWORD": "123456"})
self.assertEqual(User.objects.count(), 3)
self.assertEqual(User.objects.count(), 2)
user: User = User.objects.get_by_natural_key("admin")
self.assertTrue(user.check_password("password"))
self.assertEqual(out, "Did not create superuser, a user admin already exists\n")
@@ -111,7 +110,7 @@ class TestManageSuperUser(DirectoriesMixin, TestCase):
out = self.call_command(environ={"PAPERLESS_ADMIN_PASSWORD": "123456"})
self.assertEqual(User.objects.count(), 3)
self.assertEqual(User.objects.count(), 2)
user: User = User.objects.get_by_natural_key("admin")
self.assertTrue(user.check_password("password"))
self.assertFalse(user.is_superuser)
@@ -150,7 +149,7 @@ class TestManageSuperUser(DirectoriesMixin, TestCase):
)
user: User = User.objects.get_by_natural_key("admin")
self.assertEqual(User.objects.count(), 3)
self.assertEqual(User.objects.count(), 2)
self.assertTrue(user.is_superuser)
self.assertEqual(user.email, "hello@world.com")
self.assertEqual(user.username, "admin")
@@ -174,7 +173,7 @@ class TestManageSuperUser(DirectoriesMixin, TestCase):
)
user: User = User.objects.get_by_natural_key("super")
self.assertEqual(User.objects.count(), 3)
self.assertEqual(User.objects.count(), 2)
self.assertTrue(user.is_superuser)
self.assertEqual(user.email, "hello@world.com")
self.assertEqual(user.username, "super")

View File

@@ -1,574 +0,0 @@
import hashlib
import importlib
import shutil
from pathlib import Path
from unittest import mock
import pytest
from django.conf import settings
from django.test import override_settings
from documents.parsers import ParseError
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from documents.tests.utils import TestMigrations
STORAGE_TYPE_GPG = "gpg"
migration_1012_obj = importlib.import_module(
"documents.migrations.1012_fix_archive_files",
)
def archive_name_from_filename(filename: Path) -> Path:
    """Return a bare archive name: *filename*'s stem with a ``.pdf`` suffix.

    Note: any directory component of *filename* is dropped (stem only).
    """
    stem = filename.stem
    return Path(f"{stem}.pdf")
def archive_path_old(self) -> Path:
    """Old-layout archive location: stem-derived name under ARCHIVE_DIR,
    falling back to the zero-padded primary key when no filename is set."""
    fname = (
        archive_name_from_filename(Path(self.filename))
        if self.filename
        else Path(f"{self.pk:07}.pdf")
    )
    return Path(settings.ARCHIVE_DIR) / fname
def archive_path_new(doc):
    """New-layout archive path, or ``None`` when the document has no archive."""
    if doc.archive_filename is None:
        return None
    return Path(settings.ARCHIVE_DIR) / str(doc.archive_filename)
def source_path(doc):
    """Location of the original file under ORIGINALS_DIR, honouring the
    legacy ``.gpg`` suffix for GPG-encrypted documents."""
    name = str(doc.filename) if doc.filename else f"{doc.pk:07}{doc.file_type}"
    if doc.storage_type == STORAGE_TYPE_GPG:
        name += ".gpg"  # pragma: no cover
    return Path(settings.ORIGINALS_DIR) / name
def thumbnail_path(doc):
    """PNG thumbnail path keyed on the document pk; ``.gpg`` suffix when encrypted."""
    name = f"{doc.pk:07}.png"
    if doc.storage_type == STORAGE_TYPE_GPG:
        name = name + ".gpg"
    return Path(settings.THUMBNAIL_DIR) / name
def make_test_document(
    document_class,
    title: str,
    mime_type: str,
    original: str,
    original_filename: str,
    archive: str | None = None,
    archive_filename: str | None = None,
):
    """Create and persist a Document plus its on-disk files for migration tests.

    Copies *original* into place, computes its MD5 checksum, optionally copies
    an archive file (new layout when *archive_filename* is given, old layout
    otherwise), and touches an empty thumbnail. Returns the saved document.
    """
    doc = document_class()
    doc.filename = original_filename
    doc.title = title
    doc.mime_type = mime_type
    doc.content = "the content, does not matter for this test"
    # Save first: source_path/archive_path helpers need a pk
    doc.save()
    shutil.copy2(original, source_path(doc))
    with Path(original).open("rb") as f:
        doc.checksum = hashlib.md5(f.read()).hexdigest()
    if archive:
        if archive_filename:
            # New-style layout: explicit archive_filename under ARCHIVE_DIR
            doc.archive_filename = archive_filename
            shutil.copy2(archive, archive_path_new(doc))
        else:
            # Old-style layout: archive name derived from the original filename
            shutil.copy2(archive, archive_path_old(doc))
        with Path(archive).open("rb") as f:
            doc.archive_checksum = hashlib.md5(f.read()).hexdigest()
        doc.save()
    # Thumbnail content is irrelevant here; existence is all that is checked
    Path(thumbnail_path(doc)).touch()
    return doc
# Sample fixture files used by the migration tests below
simple_jpg = Path(__file__).parent / "samples" / "simple.jpg"
simple_pdf = Path(__file__).parent / "samples" / "simple.pdf"
simple_pdf2 = (
    Path(__file__).parent / "samples" / "documents" / "originals" / "0000002.pdf"
)
simple_pdf3 = (
    Path(__file__).parent / "samples" / "documents" / "originals" / "0000003.pdf"
)
simple_txt = Path(__file__).parent / "samples" / "simple.txt"
simple_png = Path(__file__).parent / "samples" / "simple-noalpha.png"
simple_png2 = Path(__file__).parent / "examples" / "no-text.png"
@override_settings(FILENAME_FORMAT="")
class TestMigrateArchiveFiles(DirectoriesMixin, FileSystemAssertsMixin, TestMigrations):
migrate_from = "1006_auto_20201208_2209_squashed_1011_auto_20210101_2340"
migrate_to = "1012_fix_archive_files"
def setUpBeforeMigration(self, apps):
Document = apps.get_model("documents", "Document")
self.unrelated = make_test_document(
Document,
"unrelated",
"application/pdf",
simple_pdf3,
"unrelated.pdf",
simple_pdf,
)
self.no_text = make_test_document(
Document,
"no-text",
"image/png",
simple_png2,
"no-text.png",
simple_pdf,
)
self.doc_no_archive = make_test_document(
Document,
"no_archive",
"text/plain",
simple_txt,
"no_archive.txt",
)
self.clash1 = make_test_document(
Document,
"clash",
"application/pdf",
simple_pdf,
"clash.pdf",
simple_pdf,
)
self.clash2 = make_test_document(
Document,
"clash",
"image/jpeg",
simple_jpg,
"clash.jpg",
simple_pdf,
)
self.clash3 = make_test_document(
Document,
"clash",
"image/png",
simple_png,
"clash.png",
simple_pdf,
)
self.clash4 = make_test_document(
Document,
"clash.png",
"application/pdf",
simple_pdf2,
"clash.png.pdf",
simple_pdf2,
)
self.assertEqual(archive_path_old(self.clash1), archive_path_old(self.clash2))
self.assertEqual(archive_path_old(self.clash1), archive_path_old(self.clash3))
self.assertNotEqual(
archive_path_old(self.clash1),
archive_path_old(self.clash4),
)
def testArchiveFilesMigrated(self):
Document = self.apps.get_model("documents", "Document")
for doc in Document.objects.all():
if doc.archive_checksum:
self.assertIsNotNone(doc.archive_filename)
self.assertIsFile(archive_path_new(doc))
else:
self.assertIsNone(doc.archive_filename)
with Path(source_path(doc)).open("rb") as f:
original_checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(original_checksum, doc.checksum)
if doc.archive_checksum:
self.assertIsFile(archive_path_new(doc))
with archive_path_new(doc).open("rb") as f:
archive_checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(archive_checksum, doc.archive_checksum)
self.assertEqual(
Document.objects.filter(archive_checksum__isnull=False).count(),
6,
)
def test_filenames(self):
Document = self.apps.get_model("documents", "Document")
self.assertEqual(
Document.objects.get(id=self.unrelated.id).archive_filename,
"unrelated.pdf",
)
self.assertEqual(
Document.objects.get(id=self.no_text.id).archive_filename,
"no-text.pdf",
)
self.assertEqual(
Document.objects.get(id=self.doc_no_archive.id).archive_filename,
None,
)
self.assertEqual(
Document.objects.get(id=self.clash1.id).archive_filename,
f"{self.clash1.id:07}.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash2.id).archive_filename,
f"{self.clash2.id:07}.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash3.id).archive_filename,
f"{self.clash3.id:07}.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash4.id).archive_filename,
"clash.png.pdf",
)
@override_settings(FILENAME_FORMAT="{correspondent}/{title}")
class TestMigrateArchiveFilesWithFilenameFormat(TestMigrateArchiveFiles):
    """Re-runs the parent's 1012-migration checks with a filename format set.

    With a format configured, clashing archive names are expected to be
    resolved with ``_NN`` suffixes instead of the zero-padded primary key.
    """

    def test_filenames(self):
        Document = self.apps.get_model("documents", "Document")
        self.assertEqual(
            Document.objects.get(id=self.unrelated.id).archive_filename,
            "unrelated.pdf",
        )
        self.assertEqual(
            Document.objects.get(id=self.no_text.id).archive_filename,
            "no-text.pdf",
        )
        self.assertEqual(
            Document.objects.get(id=self.doc_no_archive.id).archive_filename,
            None,
        )
        self.assertEqual(
            Document.objects.get(id=self.clash1.id).archive_filename,
            "none/clash.pdf",
        )
        self.assertEqual(
            Document.objects.get(id=self.clash2.id).archive_filename,
            "none/clash_01.pdf",
        )
        self.assertEqual(
            Document.objects.get(id=self.clash3.id).archive_filename,
            "none/clash_02.pdf",
        )
        self.assertEqual(
            Document.objects.get(id=self.clash4.id).archive_filename,
            "clash.png.pdf",
        )
def fake_parse_wrapper(parser, path, mime_type, file_name):
    """Stub parser side effect: produces text but deliberately no archive file."""
    parser.archive_path = None
    parser.text = "the text"
@override_settings(FILENAME_FORMAT="")
class TestMigrateArchiveFilesErrors(DirectoriesMixin, TestMigrations):
migrate_from = "1006_auto_20201208_2209_squashed_1011_auto_20210101_2340"
migrate_to = "1012_fix_archive_files"
auto_migrate = False
@pytest.mark.skip(reason="Fails with migration tearDown util. Needs investigation.")
def test_archive_missing(self):
Document = self.apps.get_model("documents", "Document")
doc = make_test_document(
Document,
"clash",
"application/pdf",
simple_pdf,
"clash.pdf",
simple_pdf,
)
archive_path_old(doc).unlink()
self.assertRaisesMessage(
ValueError,
"does not exist at: ",
self.performMigration,
)
@pytest.mark.skip(reason="Fails with migration tearDown util. Needs investigation.")
def test_parser_missing(self):
Document = self.apps.get_model("documents", "Document")
make_test_document(
Document,
"document",
"invalid/typesss768",
simple_png,
"document.png",
simple_pdf,
)
make_test_document(
Document,
"document",
"invalid/typesss768",
simple_jpg,
"document.jpg",
simple_pdf,
)
self.assertRaisesMessage(
ValueError,
"no parsers are available",
self.performMigration,
)
@mock.patch(f"{__name__}.migration_1012_obj.parse_wrapper")
def test_parser_error(self, m):
m.side_effect = ParseError()
Document = self.apps.get_model("documents", "Document")
doc1 = make_test_document(
Document,
"document",
"image/png",
simple_png,
"document.png",
simple_pdf,
)
doc2 = make_test_document(
Document,
"document",
"application/pdf",
simple_jpg,
"document.jpg",
simple_pdf,
)
self.assertIsNotNone(doc1.archive_checksum)
self.assertIsNotNone(doc2.archive_checksum)
with self.assertLogs() as capture:
self.performMigration()
self.assertEqual(m.call_count, 6)
self.assertEqual(
len(
list(
filter(
lambda log: "Parse error, will try again in 5 seconds" in log,
capture.output,
),
),
),
4,
)
self.assertEqual(
len(
list(
filter(
lambda log: "Unable to regenerate archive document for ID:"
in log,
capture.output,
),
),
),
2,
)
Document = self.apps.get_model("documents", "Document")
doc1 = Document.objects.get(id=doc1.id)
doc2 = Document.objects.get(id=doc2.id)
self.assertIsNone(doc1.archive_checksum)
self.assertIsNone(doc2.archive_checksum)
self.assertIsNone(doc1.archive_filename)
self.assertIsNone(doc2.archive_filename)
@mock.patch(f"{__name__}.migration_1012_obj.parse_wrapper")
def test_parser_no_archive(self, m):
m.side_effect = fake_parse_wrapper
Document = self.apps.get_model("documents", "Document")
doc1 = make_test_document(
Document,
"document",
"image/png",
simple_png,
"document.png",
simple_pdf,
)
doc2 = make_test_document(
Document,
"document",
"application/pdf",
simple_jpg,
"document.jpg",
simple_pdf,
)
with self.assertLogs() as capture:
self.performMigration()
self.assertEqual(
len(
list(
filter(
lambda log: "Parser did not return an archive document for document"
in log,
capture.output,
),
),
),
2,
)
Document = self.apps.get_model("documents", "Document")
doc1 = Document.objects.get(id=doc1.id)
doc2 = Document.objects.get(id=doc2.id)
self.assertIsNone(doc1.archive_checksum)
self.assertIsNone(doc2.archive_checksum)
self.assertIsNone(doc1.archive_filename)
self.assertIsNone(doc2.archive_filename)
@override_settings(FILENAME_FORMAT="")
class TestMigrateArchiveFilesBackwards(
    DirectoriesMixin,
    FileSystemAssertsMixin,
    TestMigrations,
):
    """Reverse of migration 1012: archive files return to the old layout
    (name derived from the original filename) without data loss."""

    migrate_from = "1012_fix_archive_files"
    migrate_to = "1006_auto_20201208_2209_squashed_1011_auto_20210101_2340"

    def setUpBeforeMigration(self, apps):
        # One doc with a distinct archive name, one without any archive,
        # and one whose new-style name carries a clash suffix
        Document = apps.get_model("documents", "Document")
        make_test_document(
            Document,
            "unrelated",
            "application/pdf",
            simple_pdf2,
            "unrelated.txt",
            simple_pdf2,
            "unrelated.pdf",
        )
        make_test_document(
            Document,
            "no_archive",
            "text/plain",
            simple_txt,
            "no_archive.txt",
        )
        make_test_document(
            Document,
            "clash",
            "image/jpeg",
            simple_jpg,
            "clash.jpg",
            simple_pdf,
            "clash_02.pdf",
        )

    def testArchiveFilesReverted(self):
        """After reversal every archived doc sits at its old path with intact checksums."""
        Document = self.apps.get_model("documents", "Document")
        for doc in Document.objects.all():
            if doc.archive_checksum:
                self.assertIsFile(archive_path_old(doc))
            with Path(source_path(doc)).open("rb") as f:
                original_checksum = hashlib.md5(f.read()).hexdigest()
            self.assertEqual(original_checksum, doc.checksum)
            if doc.archive_checksum:
                self.assertIsFile(archive_path_old(doc))
                with archive_path_old(doc).open("rb") as f:
                    archive_checksum = hashlib.md5(f.read()).hexdigest()
                self.assertEqual(archive_checksum, doc.archive_checksum)
        self.assertEqual(
            Document.objects.filter(archive_checksum__isnull=False).count(),
            2,
        )
@override_settings(FILENAME_FORMAT="{correspondent}/{title}")
class TestMigrateArchiveFilesBackwardsWithFilenameFormat(
    TestMigrateArchiveFilesBackwards,
):
    # Inherits all assertions from the parent; only the filename format differs
    pass
@override_settings(FILENAME_FORMAT="")
class TestMigrateArchiveFilesBackwardsErrors(DirectoriesMixin, TestMigrations):
    """Reverse migration must abort when old-layout archive names would collide."""

    migrate_from = "1012_fix_archive_files"
    migrate_to = "1006_auto_20201208_2209_squashed_1011_auto_20210101_2340"
    # Migration is run manually inside each test so the error can be asserted
    auto_migrate = False

    def test_filename_clash(self):
        # Two documents whose originals share the stem "clash" map to the
        # same old-style archive path -> reversal must refuse with an error
        Document = self.apps.get_model("documents", "Document")
        self.clashA = make_test_document(
            Document,
            "clash",
            "application/pdf",
            simple_pdf,
            "clash.pdf",
            simple_pdf,
            "clash_02.pdf",
        )
        self.clashB = make_test_document(
            Document,
            "clash",
            "image/jpeg",
            simple_jpg,
            "clash.jpg",
            simple_pdf,
            "clash_01.pdf",
        )
        self.assertRaisesMessage(
            ValueError,
            "would clash with another archive filename",
            self.performMigration,
        )

    def test_filename_exists(self):
        # Here clashA already occupies "clash.pdf" on disk, the old-style
        # target for clashB -> reversal must refuse with a file-exists error
        Document = self.apps.get_model("documents", "Document")
        self.clashA = make_test_document(
            Document,
            "clash",
            "application/pdf",
            simple_pdf,
            "clash.pdf",
            simple_pdf,
            "clash.pdf",
        )
        self.clashB = make_test_document(
            Document,
            "clash",
            "image/jpeg",
            simple_jpg,
            "clash.jpg",
            simple_pdf,
            "clash_01.pdf",
        )
        self.assertRaisesMessage(
            ValueError,
            "file already exists.",
            self.performMigration,
        )

View File

@@ -1,50 +0,0 @@
from django.contrib.auth import get_user_model
from documents.tests.utils import TestMigrations
class TestMigrateConsumptionTemplate(TestMigrations):
    """Forward migration: holders of add_document gain add_consumptiontemplate."""

    migrate_from = "1038_sharelink"
    migrate_to = "1039_consumptiontemplate"

    def setUpBeforeMigration(self, apps):
        """Create a user and a group that both hold the add_document permission."""
        User = get_user_model()
        Group = apps.get_model("auth.Group")
        self.Permission = apps.get_model("auth", "Permission")
        self.user = User.objects.create(username="user1")
        self.group = Group.objects.create(name="group1")
        permission = self.Permission.objects.get(codename="add_document")
        self.user.user_permissions.add(permission.id)
        self.group.permissions.add(permission.id)

    def test_users_with_add_documents_get_add_consumptiontemplate(self):
        permission = self.Permission.objects.get(codename="add_consumptiontemplate")
        self.assertTrue(self.user.has_perm(f"documents.{permission.codename}"))
        # assertIn gives a clearer failure message than assertTrue(x in y).
        self.assertIn(permission, self.group.permissions.all())
class TestReverseMigrateConsumptionTemplate(TestMigrations):
    """Reverse migration: the add_consumptiontemplate permission is revoked."""

    migrate_from = "1039_consumptiontemplate"
    migrate_to = "1038_sharelink"

    def setUpBeforeMigration(self, apps):
        """Grant add_consumptiontemplate (if present) to a user and a group."""
        User = get_user_model()
        Group = apps.get_model("auth.Group")
        self.Permission = apps.get_model("auth", "Permission")
        self.user = User.objects.create(username="user1")
        self.group = Group.objects.create(name="group1")
        permission = self.Permission.objects.filter(
            codename="add_consumptiontemplate",
        ).first()
        if permission is not None:
            self.user.user_permissions.add(permission.id)
            self.group.permissions.add(permission.id)

    def test_remove_consumptiontemplate_permissions(self):
        permission = self.Permission.objects.filter(
            codename="add_consumptiontemplate",
        ).first()
        # can be None ? now that CTs removed
        if permission is not None:
            self.assertFalse(self.user.has_perm(f"documents.{permission.codename}"))
            # assertNotIn gives a clearer failure message than assertFalse(x in y).
            self.assertNotIn(permission, self.group.permissions.all())

View File

@@ -1,33 +0,0 @@
from datetime import date
from datetime import datetime
from datetime import timedelta
from django.utils.timezone import make_aware
from pytz import UTC
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
class TestMigrateDocumentCreated(DirectoriesMixin, TestMigrations):
    """Migration 1067 narrows Document.created from aware datetime to date."""

    migrate_from = "1066_alter_workflowtrigger_schedule_offset_days"
    migrate_to = "1067_alter_document_created"

    def setUpBeforeMigration(self, apps):
        # Create 600 documents so the migration's bulk-update path is
        # exercised; the model lookup is loop-invariant so it is hoisted
        # out of the loop instead of repeated per iteration.
        Document = apps.get_model("documents", "Document")
        for i in range(600):
            naive = datetime(2023, 10, 1, 12, 0, 0) + timedelta(days=i)
            Document.objects.create(
                title=f"test{i}",
                mime_type="application/pdf",
                filename=f"file{i}.pdf",
                created=make_aware(naive, timezone=UTC),
                checksum=i,  # unique checksum per document
            )

    def testDocumentCreatedMigrated(self):
        Document = self.apps.get_model("documents", "Document")
        doc = Document.objects.get(id=1)
        # The aware datetime collapses to its date component.
        self.assertEqual(doc.created, date(2023, 10, 1))

View File

@@ -1,87 +0,0 @@
from unittest.mock import ANY
from documents.tests.utils import TestMigrations
class TestMigrateCustomFieldSelects(TestMigrations):
    """Migration 1060 converts select options to {label, id} dicts.

    Instances that stored the option index in ``value_select`` are rewritten
    to reference the generated option id instead.
    """

    migrate_from = "1059_workflowactionemail_workflowactionwebhook_and_more"
    migrate_to = "1060_alter_customfieldinstance_value_select"

    def setUpBeforeMigration(self, apps):
        CustomField = apps.get_model("documents.CustomField")
        # Old format: plain list of labels; instances refer to options by index.
        self.old_format = CustomField.objects.create(
            name="cf1",
            data_type="select",
            extra_data={"select_options": ["Option 1", "Option 2", "Option 3"]},
        )
        Document = apps.get_model("documents.Document")
        doc = Document.objects.create(title="doc1")
        CustomFieldInstance = apps.get_model("documents.CustomFieldInstance")
        self.old_instance = CustomFieldInstance.objects.create(
            field=self.old_format,
            value_select=0,
            document=doc,
        )

    def test_migrate_old_to_new_select_fields(self):
        self.old_format.refresh_from_db()
        self.old_instance.refresh_from_db()
        # Options become dicts with generated ids (exact id values are opaque).
        self.assertEqual(
            self.old_format.extra_data["select_options"],
            [
                {"label": "Option 1", "id": ANY},
                {"label": "Option 2", "id": ANY},
                {"label": "Option 3", "id": ANY},
            ],
        )
        # The index-based value now points at the first option's generated id.
        self.assertEqual(
            self.old_instance.value_select,
            self.old_format.extra_data["select_options"][0]["id"],
        )
class TestMigrationCustomFieldSelectsReverse(TestMigrations):
    """Reverse of migration 1060: {label, id} options revert to label lists.

    Instances referencing an option id revert to the option's index.
    """

    migrate_from = "1060_alter_customfieldinstance_value_select"
    migrate_to = "1059_workflowactionemail_workflowactionwebhook_and_more"

    def setUpBeforeMigration(self, apps):
        CustomField = apps.get_model("documents.CustomField")
        # New format: option dicts with explicit ids; the instance refers to "id1".
        self.new_format = CustomField.objects.create(
            name="cf1",
            data_type="select",
            extra_data={
                "select_options": [
                    {"label": "Option 1", "id": "id1"},
                    {"label": "Option 2", "id": "id2"},
                    {"label": "Option 3", "id": "id3"},
                ],
            },
        )
        Document = apps.get_model("documents.Document")
        doc = Document.objects.create(title="doc1")
        CustomFieldInstance = apps.get_model("documents.CustomFieldInstance")
        self.new_instance = CustomFieldInstance.objects.create(
            field=self.new_format,
            value_select="id1",
            document=doc,
        )

    def test_migrate_new_to_old_select_fields(self):
        self.new_format.refresh_from_db()
        self.new_instance.refresh_from_db()
        # Options collapse back to a plain list of labels.
        self.assertEqual(
            self.new_format.extra_data["select_options"],
            [
                "Option 1",
                "Option 2",
                "Option 3",
            ],
        )
        # "id1" was the first option, so the value reverts to index 0.
        self.assertEqual(
            self.new_instance.value_select,
            0,
        )

View File

@@ -1,43 +0,0 @@
from django.contrib.auth import get_user_model
from documents.tests.utils import TestMigrations
class TestMigrateCustomFields(TestMigrations):
    """Forward migration: holders of add_document gain add_customfield."""

    migrate_from = "1039_consumptiontemplate"
    migrate_to = "1040_customfield_customfieldinstance_and_more"

    def setUpBeforeMigration(self, apps):
        """Create a user and a group that both hold the add_document permission."""
        User = get_user_model()
        Group = apps.get_model("auth.Group")
        self.Permission = apps.get_model("auth", "Permission")
        self.user = User.objects.create(username="user1")
        self.group = Group.objects.create(name="group1")
        permission = self.Permission.objects.get(codename="add_document")
        self.user.user_permissions.add(permission.id)
        self.group.permissions.add(permission.id)

    def test_users_with_add_documents_get_add_customfields(self):
        permission = self.Permission.objects.get(codename="add_customfield")
        self.assertTrue(self.user.has_perm(f"documents.{permission.codename}"))
        # assertIn gives a clearer failure message than assertTrue(x in y).
        self.assertIn(permission, self.group.permissions.all())
class TestReverseMigrateCustomFields(TestMigrations):
    """Reverse migration: the add_customfield permission is revoked."""

    migrate_from = "1040_customfield_customfieldinstance_and_more"
    migrate_to = "1039_consumptiontemplate"

    def setUpBeforeMigration(self, apps):
        """Grant add_customfield to a user and a group before reversing."""
        User = get_user_model()
        Group = apps.get_model("auth.Group")
        self.Permission = apps.get_model("auth", "Permission")
        self.user = User.objects.create(username="user1")
        self.group = Group.objects.create(name="group1")
        permission = self.Permission.objects.get(codename="add_customfield")
        self.user.user_permissions.add(permission.id)
        self.group.permissions.add(permission.id)

    def test_remove_customfield_permissions(self):
        # Renamed from the copy-pasted "test_remove_consumptiontemplate_permissions":
        # this test is about the customfield permission, not consumption templates.
        permission = self.Permission.objects.get(codename="add_customfield")
        self.assertFalse(self.user.has_perm(f"documents.{permission.codename}"))
        # assertNotIn gives a clearer failure message than assertFalse(x in y).
        self.assertNotIn(permission, self.group.permissions.all())

View File

@@ -1,59 +0,0 @@
import shutil
from pathlib import Path
from django.conf import settings
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
def source_path_before(self) -> Path:
    """Reconstruct a document's original-file path as it was before migration 1053.

    NOTE(review): only the named-file case is handled — `fname` is unbound
    (NameError) when `self.filename` is falsy. That is fine for these tests,
    which always set a filename, but this helper is not general-purpose.
    """
    if self.filename:
        fname = str(self.filename)
    return Path(settings.ORIGINALS_DIR) / fname
class TestMigrateDocumentPageCount(DirectoriesMixin, TestMigrations):
    """Migration 1053 populates Document.page_count from the source file."""

    migrate_from = "1052_document_transaction_id"
    migrate_to = "1053_document_page_count"

    def setUpBeforeMigration(self, apps):
        Document = apps.get_model("documents", "Document")
        doc = Document.objects.create(
            title="test1",
            mime_type="application/pdf",
            filename="file1.pdf",
        )
        self.doc_id = doc.id
        # Place a real one-page PDF where the migration expects the original.
        shutil.copy(
            Path(__file__).parent / "samples" / "simple.pdf",
            source_path_before(doc),
        )

    def testDocumentPageCountMigrated(self):
        Document = self.apps.get_model("documents", "Document")
        doc = Document.objects.get(id=self.doc_id)
        # simple.pdf has exactly one page.
        self.assertEqual(doc.page_count, 1)
class TestMigrateDocumentPageCountBackwards(TestMigrations):
    """Reverse of migration 1053: the page_count field is dropped again."""

    migrate_from = "1053_document_page_count"
    migrate_to = "1052_document_transaction_id"

    def setUpBeforeMigration(self, apps):
        Document = apps.get_model("documents", "Document")
        doc = Document.objects.create(
            title="test1",
            mime_type="application/pdf",
            filename="file1.pdf",
            page_count=8,
        )
        self.doc_id = doc.id

    def test_remove_number_of_pages_to_page_count(self):
        Document = self.apps.get_model("documents", "Document")
        # After reversing, the model no longer declares a page_count field.
        self.assertFalse(
            "page_count" in [field.name for field in Document._meta.get_fields()],
        )

View File

@@ -1,283 +0,0 @@
import importlib
import shutil
import tempfile
from collections.abc import Callable
from collections.abc import Iterable
from pathlib import Path
from unittest import mock
from django.test import override_settings
from documents.tests.utils import TestMigrations
# Migration modules start with a digit, so a normal import statement cannot
# name them; load via importlib instead.
# https://github.com/python/cpython/issues/100950
migration_1037_obj = importlib.import_module(
    "documents.migrations.1037_webp_encrypted_thumbnail_conversion",
)
@override_settings(PASSPHRASE="test")
@mock.patch(
    f"{__name__}.migration_1037_obj.multiprocessing.pool.Pool.map",
)
@mock.patch(f"{__name__}.migration_1037_obj.run_convert")
class TestMigrateToEncrytpedWebPThumbnails(TestMigrations):
    """Migration 1037 converts encrypted PNG thumbnails to encrypted WebP.

    The multiprocessing pool and the external convert invocation are both
    mocked so the migration runs serially and without ImageMagick.

    NOTE(review): class name typo ("Encrytpted") left as-is — renaming would
    change test selection by name.
    """

    migrate_from = (
        "1022_paperlesstask_squashed_1036_alter_savedviewfilterrule_rule_type"
    )
    migrate_to = "1037_webp_encrypted_thumbnail_conversion"
    # Migration is triggered manually after thumbnail fixtures are created.
    auto_migrate = False

    def pretend_convert_output(self, *args, **kwargs):
        """
        Pretends to do the conversion, by copying the input file
        to the output file
        """
        shutil.copy2(
            Path(kwargs["input_file"].rstrip("[0]")),
            Path(kwargs["output_file"]),
        )

    def pretend_map(self, func: Callable, iterable: Iterable):
        """
        Pretends to be the map of a multiprocessing.Pool, but secretly does
        everything in series
        """
        for item in iterable:
            func(item)

    def create_dummy_thumbnails(
        self,
        thumb_dir: Path,
        ext: str,
        count: int,
        start_count: int = 0,
    ):
        """
        Helper to create a certain count of empty files of the given extension
        in the given directory, numbered from start_count
        """
        for idx in range(count):
            (Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
        # Triple check expected files exist
        self.assert_file_count_by_extension(ext, thumb_dir, count)

    def create_webp_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates dummy WebP thumbnail files in the given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)

    def create_encrypted_webp_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates dummy encrypted WebP thumbnail files in the given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "webp.gpg", count, start_count)

    def create_png_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates dummy PNG thumbnail files in the given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)

    def create_encrypted_png_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates dummy encrypted PNG thumbnail files in the given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "png.gpg", count, start_count)

    def assert_file_count_by_extension(
        self,
        ext: str,
        dir: str | Path,
        expected_count: int,
    ):
        """
        Helper to assert a certain count of given extension files in given directory
        """
        if not isinstance(dir, Path):
            dir = Path(dir)
        matching_files = list(dir.glob(f"*.{ext}"))
        self.assertEqual(len(matching_files), expected_count)

    def assert_encrypted_png_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of encrypted PNG extension files in given directory
        """
        self.assert_file_count_by_extension("png.gpg", dir, expected_count)

    def assert_encrypted_webp_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of encrypted WebP extension files in given directory
        """
        self.assert_file_count_by_extension("webp.gpg", dir, expected_count)

    def assert_webp_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of WebP extension files in given directory
        """
        self.assert_file_count_by_extension("webp", dir, expected_count)

    def assert_png_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of PNG extension files in given directory
        """
        self.assert_file_count_by_extension("png", dir, expected_count)

    def setUp(self):
        # Fresh temporary thumbnail directory per test; removed in tearDown.
        self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()
        return super().setUp()

    def tearDown(self) -> None:
        shutil.rmtree(self.thumbnail_dir)
        return super().tearDown()

    def test_do_nothing_if_converted(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted document exists with existing encrypted WebP thumbnail path
        WHEN:
            - Migration is attempted
        THEN:
            - Nothing is converted
        """
        map_mock.side_effect = self.pretend_map
        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_webp_thumbnail_files(self.thumbnail_dir, 3)
            self.performMigration()
            run_convert_mock.assert_not_called()
            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_thumbnails(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted documents exist with PNG thumbnail
        WHEN:
            - Migration is attempted
        THEN:
            - Thumbnails are converted to webp & re-encrypted
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output
        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)
            self.performMigration()
            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)
            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_errors_out(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Encrypted document exists with PNG thumbnail
        WHEN:
            - Migration is attempted, but raises an exception
        THEN:
            - Conversion is attempted for each thumbnail, but none succeed;
              the encrypted PNG thumbnails remain in place
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = OSError
        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)
            self.performMigration()
            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)
            self.assert_encrypted_png_file_count(self.thumbnail_dir, 3)

    def test_convert_mixed(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Documents exist with PNG, encrypted PNG and WebP thumbnails
        WHEN:
            - Migration is attempted
        THEN:
            - Only encrypted PNG thumbnails are converted
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output
        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_png_thumbnail_files(self.thumbnail_dir, 3)
            self.create_encrypted_png_thumbnail_files(
                self.thumbnail_dir,
                3,
                start_count=3,
            )
            self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=6)
            self.create_encrypted_webp_thumbnail_files(
                self.thumbnail_dir,
                3,
                start_count=8,
            )
            self.performMigration()
            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)
            # Unencrypted PNGs untouched; 3 converted + 3 pre-existing encrypted WebPs.
            self.assert_png_file_count(self.thumbnail_dir, 3)
            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 6)
            self.assert_webp_file_count(self.thumbnail_dir, 2)
            self.assert_encrypted_png_file_count(self.thumbnail_dir, 0)

View File

@@ -1,108 +0,0 @@
import shutil
from pathlib import Path
from django.conf import settings
from django.test import override_settings
from documents.parsers import get_default_file_extension
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
# Storage-type markers matching the Document model values at these migrations.
STORAGE_TYPE_UNENCRYPTED = "unencrypted"
STORAGE_TYPE_GPG = "gpg"
def source_path_before(self):
    """Reconstruct a document's original-file path as it was before migration 1003.

    Named files use the stored filename; unnamed ones fall back to the
    zero-padded pk plus file_type, with a ".gpg" suffix for encrypted storage.
    """
    if self.filename:
        name = str(self.filename)
    else:
        name = f"{self.pk:07}.{self.file_type}"
    if self.storage_type == STORAGE_TYPE_GPG:
        name = f"{name}.gpg"
    return Path(settings.ORIGINALS_DIR) / name
def file_type_after(self):
    """Return the file extension derived from the document's MIME type (post-1003)."""
    extension = get_default_file_extension(self.mime_type)
    return extension
def source_path_after(doc):
    """Original-file path for *doc* using the post-migration-1003 naming rules."""
    name = str(doc.filename) if doc.filename else f"{doc.pk:07}{file_type_after(doc)}"
    if doc.storage_type == STORAGE_TYPE_GPG:
        name = f"{name}.gpg"  # pragma: no cover
    return Path(settings.ORIGINALS_DIR) / name
@override_settings(PASSPHRASE="test")
class TestMigrateMimeType(DirectoriesMixin, TestMigrations):
    """Migration 1003 derives Document.mime_type from the stored file."""

    migrate_from = "1002_auto_20201111_1105"
    migrate_to = "1003_mime_types"

    def setUpBeforeMigration(self, apps):
        Document = apps.get_model("documents", "Document")
        # Plain, named PDF document.
        doc = Document.objects.create(
            title="test",
            file_type="pdf",
            filename="file1.pdf",
        )
        self.doc_id = doc.id
        shutil.copy(
            Path(__file__).parent / "samples" / "simple.pdf",
            source_path_before(doc),
        )
        # Unnamed, GPG-encrypted PDF document (decrypted via PASSPHRASE above).
        doc2 = Document.objects.create(
            checksum="B",
            file_type="pdf",
            storage_type=STORAGE_TYPE_GPG,
        )
        self.doc2_id = doc2.id
        shutil.copy(
            (
                Path(__file__).parent
                / "samples"
                / "documents"
                / "originals"
                / "0000004.pdf.gpg"
            ),
            source_path_before(doc2),
        )

    def testMimeTypesMigrated(self):
        Document = self.apps.get_model("documents", "Document")
        doc = Document.objects.get(id=self.doc_id)
        self.assertEqual(doc.mime_type, "application/pdf")
        doc2 = Document.objects.get(id=self.doc2_id)
        self.assertEqual(doc2.mime_type, "application/pdf")
@override_settings(PASSPHRASE="test")
class TestMigrateMimeTypeBackwards(DirectoriesMixin, TestMigrations):
    """Reverse of migration 1003: mime_type reverts to the file_type field."""

    migrate_from = "1003_mime_types"
    migrate_to = "1002_auto_20201111_1105"

    def setUpBeforeMigration(self, apps):
        Document = apps.get_model("documents", "Document")
        doc = Document.objects.create(
            title="test",
            mime_type="application/pdf",
            filename="file1.pdf",
        )
        self.doc_id = doc.id
        shutil.copy(
            Path(__file__).parent / "samples" / "simple.pdf",
            source_path_after(doc),
        )

    def testMimeTypesReverted(self):
        Document = self.apps.get_model("documents", "Document")
        doc = Document.objects.get(id=self.doc_id)
        self.assertEqual(doc.file_type, "pdf")

View File

@@ -1,15 +0,0 @@
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
class TestMigrateNullCharacters(DirectoriesMixin, TestMigrations):
    """Migration 1015 strips NUL characters from document content."""

    migrate_from = "1014_auto_20210228_1614"
    migrate_to = "1015_remove_null_characters"

    def setUpBeforeMigration(self, apps):
        Document = apps.get_model("documents", "Document")
        self.doc = Document.objects.create(content="aaa\0bbb")

    def testNullCharactersRemoved(self):
        # Renamed from the copy-pasted "testMimeTypesMigrated": this test
        # verifies NUL-character removal, not MIME types.
        Document = self.apps.get_model("documents", "Document")
        self.assertNotIn("\0", Document.objects.get(id=self.doc.id).content)

View File

@@ -1,30 +0,0 @@
from documents.models import StoragePath
from documents.tests.utils import TestMigrations
class TestMigrateStoragePathToTemplate(TestMigrations):
    """Migration 1055 rewrites old single-brace storage paths as templates.

    Old ``{placeholder}`` syntax becomes Jinja-style ``{{ placeholder }}``;
    paths already using double braces or containing no placeholders are
    left untouched.
    """

    migrate_from = "1054_customfieldinstance_value_monetary_amount_and_more"
    migrate_to = "1055_alter_storagepath_path"

    def setUpBeforeMigration(self, apps):
        # Old single-brace placeholder format — must be converted.
        self.old_format = StoragePath.objects.create(
            name="sp1",
            path="Something/{title}",
        )
        # Already in template format — must be untouched.
        self.new_format = StoragePath.objects.create(
            name="sp2",
            path="{{asn}}/{{title}}",
        )
        # No placeholders at all — must be untouched.
        self.no_formatting = StoragePath.objects.create(
            name="sp3",
            path="Some/Fixed/Path",
        )

    def test_migrate_old_to_new_storage_path(self):
        self.old_format.refresh_from_db()
        self.new_format.refresh_from_db()
        self.no_formatting.refresh_from_db()
        self.assertEqual(self.old_format.path, "Something/{{ title }}")
        self.assertEqual(self.new_format.path, "{{asn}}/{{title}}")
        self.assertEqual(self.no_formatting.path, "Some/Fixed/Path")

View File

@@ -1,36 +0,0 @@
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
class TestMigrateTagColor(DirectoriesMixin, TestMigrations):
    """Migration 1013 maps integer tag colour codes to hex color strings."""

    migrate_from = "1012_fix_archive_files"
    migrate_to = "1013_migrate_tag_colour"

    def setUpBeforeMigration(self, apps):
        Tag = apps.get_model("documents", "Tag")
        self.t1_id = Tag.objects.create(name="tag1").id  # default colour
        self.t2_id = Tag.objects.create(name="tag2", colour=1).id
        self.t3_id = Tag.objects.create(name="tag3", colour=5).id

    def testTagColorsMigrated(self):
        # Renamed from the copy-pasted "testMimeTypesMigrated": this test
        # verifies tag colour conversion, not MIME types.
        Tag = self.apps.get_model("documents", "Tag")
        self.assertEqual(Tag.objects.get(id=self.t1_id).color, "#a6cee3")
        self.assertEqual(Tag.objects.get(id=self.t2_id).color, "#a6cee3")
        self.assertEqual(Tag.objects.get(id=self.t3_id).color, "#fb9a99")
class TestMigrateTagColorBackwards(DirectoriesMixin, TestMigrations):
    """Reverse of migration 1013: hex color strings map back to colour codes.

    Known palette colors map to their code; unknown colors fall back to 1.
    """

    migrate_from = "1013_migrate_tag_colour"
    migrate_to = "1012_fix_archive_files"

    def setUpBeforeMigration(self, apps):
        Tag = apps.get_model("documents", "Tag")
        self.t1_id = Tag.objects.create(name="tag1").id  # default color
        self.t2_id = Tag.objects.create(name="tag2", color="#cab2d6").id
        self.t3_id = Tag.objects.create(name="tag3", color="#123456").id  # unknown

    def testTagColoursReverted(self):
        # Renamed from the copy-pasted "testMimeTypesReverted": this test
        # verifies tag colour reversal, not MIME types.
        Tag = self.apps.get_model("documents", "Tag")
        self.assertEqual(Tag.objects.get(id=self.t1_id).colour, 1)
        self.assertEqual(Tag.objects.get(id=self.t2_id).colour, 9)
        self.assertEqual(Tag.objects.get(id=self.t3_id).colour, 1)

View File

@@ -1,230 +0,0 @@
import importlib
import shutil
import tempfile
from collections.abc import Callable
from collections.abc import Iterable
from pathlib import Path
from unittest import mock
from django.test import override_settings
from documents.tests.utils import TestMigrations
# Migration modules start with a digit, so a normal import statement cannot
# name them; load via importlib instead.
# https://github.com/python/cpython/issues/100950
migration_1021_obj = importlib.import_module(
    "documents.migrations.1021_webp_thumbnail_conversion",
)
@mock.patch(
    f"{__name__}.migration_1021_obj.multiprocessing.pool.Pool.map",
)
@mock.patch(f"{__name__}.migration_1021_obj.run_convert")
class TestMigrateWebPThumbnails(TestMigrations):
    """Migration 1021 converts PNG thumbnails to WebP.

    The multiprocessing pool and the external convert invocation are both
    mocked so the migration runs serially and without ImageMagick.
    """

    migrate_from = "1016_auto_20210317_1351_squashed_1020_merge_20220518_1839"
    migrate_to = "1021_webp_thumbnail_conversion"
    # Migration is triggered manually after thumbnail fixtures are created.
    auto_migrate = False

    def pretend_convert_output(self, *args, **kwargs):
        """
        Pretends to do the conversion, by copying the input file
        to the output file
        """
        shutil.copy2(
            Path(kwargs["input_file"].rstrip("[0]")),
            Path(kwargs["output_file"]),
        )

    def pretend_map(self, func: Callable, iterable: Iterable):
        """
        Pretends to be the map of a multiprocessing.Pool, but secretly does
        everything in series
        """
        for item in iterable:
            func(item)

    def create_dummy_thumbnails(
        self,
        thumb_dir: Path,
        ext: str,
        count: int,
        start_count: int = 0,
    ):
        """
        Helper to create a certain count of empty files of the given extension
        in the given directory, numbered from start_count
        """
        for idx in range(count):
            (Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
        # Triple check expected files exist
        self.assert_file_count_by_extension(ext, thumb_dir, count)

    def create_webp_thumbnail_files(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates dummy WebP thumbnail files in the given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)

    def create_png_thumbnail_file(
        self,
        thumb_dir: Path,
        count: int,
        start_count: int = 0,
    ):
        """
        Creates dummy PNG thumbnail files in the given directory
        """
        self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)

    def assert_file_count_by_extension(
        self,
        ext: str,
        dir: str | Path,
        expected_count: int,
    ):
        """
        Helper to assert a certain count of given extension files in given directory
        """
        if not isinstance(dir, Path):
            dir = Path(dir)
        matching_files = list(dir.glob(f"*.{ext}"))
        self.assertEqual(len(matching_files), expected_count)

    def assert_png_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of PNG extension files in given directory
        """
        self.assert_file_count_by_extension("png", dir, expected_count)

    def assert_webp_file_count(self, dir: Path, expected_count: int):
        """
        Helper to assert a certain count of WebP extension files in given directory
        """
        self.assert_file_count_by_extension("webp", dir, expected_count)

    def setUp(self):
        # Fresh temporary thumbnail directory per test; removed in tearDown.
        self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()
        return super().setUp()

    def tearDown(self) -> None:
        shutil.rmtree(self.thumbnail_dir)
        return super().tearDown()

    def test_do_nothing_if_converted(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Document exists with default WebP thumbnail path
        WHEN:
            - Thumbnail conversion is attempted
        THEN:
            - Nothing is converted
        """
        map_mock.side_effect = self.pretend_map
        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_webp_thumbnail_files(self.thumbnail_dir, 3)
            self.performMigration()
            run_convert_mock.assert_not_called()
            self.assert_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_single_thumbnail(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Document exists with PNG thumbnail
        WHEN:
            - Thumbnail conversion is attempted
        THEN:
            - Single thumbnail is converted
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output
        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_png_thumbnail_file(self.thumbnail_dir, 3)
            self.performMigration()
            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)
            self.assert_webp_file_count(self.thumbnail_dir, 3)

    def test_convert_errors_out(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Document exists with PNG thumbnail
        WHEN:
            - Thumbnail conversion is attempted, but raises an exception
        THEN:
            - Conversion is attempted for each thumbnail, but none succeed;
              the PNG thumbnails remain in place
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = OSError
        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_png_thumbnail_file(self.thumbnail_dir, 3)
            self.performMigration()
            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)
            self.assert_png_file_count(self.thumbnail_dir, 3)

    def test_convert_mixed(
        self,
        run_convert_mock: mock.MagicMock,
        map_mock: mock.MagicMock,
    ):
        """
        GIVEN:
            - Documents exist with both PNG and WebP thumbnails
        WHEN:
            - Thumbnail conversion is attempted
        THEN:
            - Only the PNG thumbnails are converted; WebP ones are untouched
        """
        map_mock.side_effect = self.pretend_map
        run_convert_mock.side_effect = self.pretend_convert_output
        with override_settings(
            THUMBNAIL_DIR=self.thumbnail_dir,
        ):
            self.create_png_thumbnail_file(self.thumbnail_dir, 3)
            self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=3)
            self.performMigration()
            run_convert_mock.assert_called()
            self.assertEqual(run_convert_mock.call_count, 3)
            # 3 converted + 2 pre-existing WebPs; no PNGs left behind.
            self.assert_png_file_count(self.thumbnail_dir, 0)
            self.assert_webp_file_count(self.thumbnail_dir, 5)

View File

@@ -1,134 +0,0 @@
from documents.data_models import DocumentSource
from documents.tests.utils import TestMigrations
class TestMigrateWorkflow(TestMigrations):
    """Migration 1044 converts ConsumptionTemplates into Workflows.

    Also verifies that holders of add_document gain add_workflow.
    """

    migrate_from = "1043_alter_savedviewfilterrule_rule_type"
    migrate_to = "1044_workflow_workflowaction_workflowtrigger_and_more"
    # paperless_mail must be migrated first so MailAccount/MailRule exist.
    dependencies = (
        (
            "paperless_mail",
            "0029_mailrule_pdf_layout",
        ),
    )

    def setUpBeforeMigration(self, apps):
        """Create permission holders and one fully-populated ConsumptionTemplate."""
        User = apps.get_model("auth", "User")
        Group = apps.get_model("auth", "Group")
        self.Permission = apps.get_model("auth", "Permission")
        self.user = User.objects.create(username="user1")
        self.group = Group.objects.create(name="group1")
        permission = self.Permission.objects.get(codename="add_document")
        self.user.user_permissions.add(permission.id)
        self.group.permissions.add(permission.id)
        # create a CT to migrate
        c = apps.get_model("documents", "Correspondent").objects.create(
            name="Correspondent Name",
        )
        dt = apps.get_model("documents", "DocumentType").objects.create(
            name="DocType Name",
        )
        t1 = apps.get_model("documents", "Tag").objects.create(name="t1")
        sp = apps.get_model("documents", "StoragePath").objects.create(path="/test/")
        cf1 = apps.get_model("documents", "CustomField").objects.create(
            name="Custom Field 1",
            data_type="string",
        )
        ma = apps.get_model("paperless_mail", "MailAccount").objects.create(
            name="MailAccount 1",
        )
        mr = apps.get_model("paperless_mail", "MailRule").objects.create(
            name="MailRule 1",
            order=0,
            account=ma,
        )
        user2 = User.objects.create(username="user2")
        user3 = User.objects.create(username="user3")
        group2 = Group.objects.create(name="group2")
        ConsumptionTemplate = apps.get_model("documents", "ConsumptionTemplate")
        ct = ConsumptionTemplate.objects.create(
            name="Template 1",
            order=0,
            sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
            filter_filename="*simple*",
            filter_path="*/samples/*",
            filter_mailrule=mr,
            assign_title="Doc from {correspondent}",
            assign_correspondent=c,
            assign_document_type=dt,
            assign_storage_path=sp,
            assign_owner=user2,
        )
        ct.assign_tags.add(t1)
        ct.assign_view_users.add(user3)
        ct.assign_view_groups.add(group2)
        ct.assign_change_users.add(user3)
        ct.assign_change_groups.add(group2)
        ct.assign_custom_fields.add(cf1)
        ct.save()

    def test_users_with_add_documents_get_add_and_workflow_templates_get_migrated(self):
        permission = self.Permission.objects.get(codename="add_workflow")
        # assertIn gives a clearer failure message than assertTrue(x in y).
        self.assertIn(permission, self.user.user_permissions.all())
        self.assertIn(permission, self.group.permissions.all())
        # The single ConsumptionTemplate became a single Workflow.
        Workflow = self.apps.get_model("documents", "Workflow")
        self.assertEqual(Workflow.objects.all().count(), 1)
class TestReverseMigrateWorkflow(TestMigrations):
    """Reverse of migration 1044: Workflows become ConsumptionTemplates.

    Also verifies the add_workflow permission is revoked.
    """

    migrate_from = "1044_workflow_workflowaction_workflowtrigger_and_more"
    migrate_to = "1043_alter_savedviewfilterrule_rule_type"

    def setUpBeforeMigration(self, apps):
        """Create permission holders and one Workflow with a trigger and an action."""
        User = apps.get_model("auth", "User")
        Group = apps.get_model("auth", "Group")
        self.Permission = apps.get_model("auth", "Permission")
        self.user = User.objects.create(username="user1")
        self.group = Group.objects.create(name="group1")
        permission = self.Permission.objects.filter(
            codename="add_workflow",
        ).first()
        if permission is not None:
            self.user.user_permissions.add(permission.id)
            self.group.permissions.add(permission.id)
        Workflow = apps.get_model("documents", "Workflow")
        WorkflowTrigger = apps.get_model("documents", "WorkflowTrigger")
        WorkflowAction = apps.get_model("documents", "WorkflowAction")
        trigger = WorkflowTrigger.objects.create(
            type=0,
            sources=[str(DocumentSource.ConsumeFolder)],
            filter_path="*/path/*",
            filter_filename="*file*",
        )
        action = WorkflowAction.objects.create(
            assign_title="assign title",
        )
        workflow = Workflow.objects.create(
            name="workflow 1",
            order=0,
        )
        workflow.triggers.set([trigger])
        workflow.actions.set([action])
        workflow.save()

    def test_remove_workflow_permissions_and_migrate_workflows_to_consumption_templates(
        self,
    ):
        permission = self.Permission.objects.filter(
            codename="add_workflow",
        ).first()
        if permission is not None:
            # assertNotIn gives a clearer failure message than assertFalse(x in y).
            self.assertNotIn(permission, self.user.user_permissions.all())
            self.assertNotIn(permission, self.group.permissions.all())
        # The single Workflow became a single ConsumptionTemplate.
        ConsumptionTemplate = self.apps.get_model("documents", "ConsumptionTemplate")
        self.assertEqual(ConsumptionTemplate.objects.all().count(), 1)

View File

@@ -3,14 +3,17 @@ from datetime import timedelta
from pathlib import Path
from unittest import mock
from celery import states
from django.conf import settings
from django.test import TestCase
from django.test import override_settings
from django.utils import timezone
from documents import tasks
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import PaperlessTask
from documents.models import Tag
from documents.sanity_checker import SanityCheckFailedException
from documents.sanity_checker import SanityCheckMessages
@@ -270,3 +273,103 @@ class TestUpdateContent(DirectoriesMixin, TestCase):
tasks.update_document_content_maybe_archive_file(doc.pk)
self.assertNotEqual(Document.objects.get(pk=doc.pk).content, "test")
class TestAIIndex(DirectoriesMixin, TestCase):
    """Tests for the LLM-index Celery tasks in documents.tasks.

    paperless_ai is mocked throughout — these tests only verify task
    dispatch, PaperlessTask bookkeeping, and error propagation.
    """

    @override_settings(
        AI_ENABLED=True,
        LLM_EMBEDDING_BACKEND="huggingface",
    )
    def test_ai_index_success(self):
        """
        GIVEN:
            - Document exists, AI is enabled, llm index backend is set
        WHEN:
            - llmindex_index task is called
        THEN:
            - update_llm_index is called, and the task is marked as success
        """
        Document.objects.create(
            title="test",
            content="my document",
            checksum="wow",
        )
        # lazy-loaded so mock the actual function
        with mock.patch("paperless_ai.indexing.update_llm_index") as update_llm_index:
            update_llm_index.return_value = "LLM index updated successfully."
            tasks.llmindex_index()
            update_llm_index.assert_called_once()
        task = PaperlessTask.objects.get(
            task_name=PaperlessTask.TaskName.LLMINDEX_UPDATE,
        )
        self.assertEqual(task.status, states.SUCCESS)
        self.assertEqual(task.result, "LLM index updated successfully.")

    @override_settings(
        AI_ENABLED=True,
        LLM_EMBEDDING_BACKEND="huggingface",
    )
    def test_ai_index_failure(self):
        """
        GIVEN:
            - Document exists, AI is enabled, llm index backend is set
        WHEN:
            - llmindex_index task is called
        THEN:
            - update_llm_index raises an exception, and the task is marked as failure
        """
        Document.objects.create(
            title="test",
            content="my document",
            checksum="wow",
        )
        # lazy-loaded so mock the actual function
        with mock.patch("paperless_ai.indexing.update_llm_index") as update_llm_index:
            update_llm_index.side_effect = Exception("LLM index update failed.")
            tasks.llmindex_index()
            update_llm_index.assert_called_once()
        task = PaperlessTask.objects.get(
            task_name=PaperlessTask.TaskName.LLMINDEX_UPDATE,
        )
        self.assertEqual(task.status, states.FAILURE)
        self.assertIn("LLM index update failed.", task.result)

    def test_update_document_in_llm_index(self):
        """
        GIVEN:
            - Nothing
        WHEN:
            - update_document_in_llm_index task is called
        THEN:
            - llm_index_add_or_update_document is called
        """
        doc = Document.objects.create(
            title="test",
            content="my document",
            checksum="wow",
        )
        with mock.patch(
            "documents.tasks.llm_index_add_or_update_document",
        ) as llm_index_add_or_update_document:
            tasks.update_document_in_llm_index(doc)
            llm_index_add_or_update_document.assert_called_once_with(doc)

    def test_remove_document_from_llm_index(self):
        """
        GIVEN:
            - Nothing
        WHEN:
            - remove_document_from_llm_index task is called
        THEN:
            - llm_index_remove_document is called
        """
        doc = Document.objects.create(
            title="test",
            content="my document",
            checksum="wow",
        )
        with mock.patch(
            "documents.tasks.llm_index_remove_document",
        ) as llm_index_remove_document:
            tasks.remove_document_from_llm_index(doc)
            llm_index_remove_document.assert_called_once_with(doc)

View File

@@ -2,6 +2,8 @@ import json
import tempfile
from datetime import timedelta
from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import patch
from django.conf import settings
from django.contrib.auth.models import Group
@@ -15,9 +17,15 @@ from django.utils import timezone
from guardian.shortcuts import assign_perm
from rest_framework import status
from documents.caching import get_llm_suggestion_cache
from documents.caching import set_llm_suggestions_cache
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import ShareLink
from documents.models import StoragePath
from documents.models import Tag
from documents.signals.handlers import update_llm_suggestions_cache
from documents.tests.utils import DirectoriesMixin
from paperless.models import ApplicationConfiguration
@@ -270,3 +278,176 @@ class TestViews(DirectoriesMixin, TestCase):
f"Possible N+1 queries detected: {num_queries_small} queries for 2 tags, "
f"but {num_queries_large} queries for 50 tags"
)
class TestAISuggestions(DirectoriesMixin, TestCase):
    """Tests for the AI-backed document suggestions endpoint and its cache."""

    def setUp(self):
        # One superuser, one document, and one of each matchable object so
        # that AI suggestions can be resolved to existing primary keys.
        self.user = User.objects.create_superuser(username="testuser")
        self.document = Document.objects.create(
            title="Test Document",
            filename="test.pdf",
            mime_type="application/pdf",
        )
        self.tag1 = Tag.objects.create(name="tag1")
        self.correspondent1 = Correspondent.objects.create(name="correspondent1")
        self.document_type1 = DocumentType.objects.create(name="type1")
        self.path1 = StoragePath.objects.create(name="path1")
        super().setUp()

    @patch("documents.views.get_llm_suggestion_cache")
    @patch("documents.views.refresh_suggestions_cache")
    @override_settings(
        AI_ENABLED=True,
        LLM_BACKEND="mock_backend",
    )
    def test_suggestions_with_cached_llm(self, mock_refresh_cache, mock_get_cache):
        # A cache hit is returned as-is and the cache TTL is refreshed.
        mock_get_cache.return_value = MagicMock(suggestions={"tags": ["tag1", "tag2"]})
        self.client.force_login(user=self.user)

        response = self.client.get(f"/api/documents/{self.document.pk}/suggestions/")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.json(), {"tags": ["tag1", "tag2"]})
        mock_refresh_cache.assert_called_once_with(self.document.pk)

    @patch("documents.views.get_ai_document_classification")
    @override_settings(
        AI_ENABLED=True,
        LLM_BACKEND="mock_backend",
    )
    def test_suggestions_with_ai_enabled(
        self,
        mock_get_ai_classification,
    ):
        # Raw AI output mixes names that exist in the DB with novel ones;
        # known names resolve to pks, unknown ones land in suggested_* lists.
        mock_get_ai_classification.return_value = {
            "title": "AI Title",
            "tags": ["tag1", "tag2"],
            "correspondents": ["correspondent1"],
            "document_types": ["type1"],
            "storage_paths": ["path1"],
            "dates": ["2023-01-01"],
        }
        self.client.force_login(user=self.user)

        response = self.client.get(f"/api/documents/{self.document.pk}/suggestions/")

        expected = {
            "title": "AI Title",
            "tags": [self.tag1.pk],
            "suggested_tags": ["tag2"],
            "correspondents": [self.correspondent1.pk],
            "suggested_correspondents": [],
            "document_types": [self.document_type1.pk],
            "suggested_document_types": [],
            "storage_paths": [self.path1.pk],
            "suggested_storage_paths": [],
            "dates": ["2023-01-01"],
        }
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.json(), expected)

    def test_invalidate_suggestions_cache(self):
        self.client.force_login(user=self.user)
        suggestions = {
            "title": "AI Title",
            "tags": ["tag1", "tag2"],
            "correspondents": ["correspondent1"],
            "document_types": ["type1"],
            "storage_paths": ["path1"],
            "dates": ["2023-01-01"],
        }
        set_llm_suggestions_cache(
            self.document.pk,
            suggestions,
            backend="mock_backend",
        )
        cached = get_llm_suggestion_cache(
            self.document.pk,
            backend="mock_backend",
        )
        self.assertEqual(cached.suggestions, suggestions)

        # Invoke the post_save handler directly; saving the document must
        # evict its cached suggestions.
        update_llm_suggestions_cache(
            sender=None,
            instance=self.document,
        )
        self.assertIsNone(
            get_llm_suggestion_cache(
                self.document.pk,
                backend="mock_backend",
            ),
        )
class TestAIChatStreamingView(DirectoriesMixin, TestCase):
    """Tests for the streaming AI chat endpoint."""

    ENDPOINT = "/api/documents/chat/"

    def setUp(self):
        self.user = User.objects.create_user(username="testuser", password="pass")
        self.client.force_login(user=self.user)
        self.document = Document.objects.create(
            title="Test Document",
            filename="test.pdf",
            mime_type="application/pdf",
        )
        super().setUp()

    def _post_chat(self, payload):
        # Every test posts JSON to the same endpoint; centralize the call.
        return self.client.post(
            self.ENDPOINT,
            data=payload,
            content_type="application/json",
        )

    @override_settings(AI_ENABLED=False)
    def test_post_ai_disabled(self):
        response = self._post_chat('{"q": "question"}')
        self.assertEqual(response.status_code, 400)
        self.assertIn(b"AI is required for this feature", response.content)

    @patch("documents.views.stream_chat_with_documents")
    @patch("documents.views.get_objects_for_user_owner_aware")
    @override_settings(AI_ENABLED=True)
    def test_post_no_document_id(self, mock_get_objects, mock_stream_chat):
        # Without a document_id the chat runs over all accessible documents.
        mock_get_objects.return_value = [self.document]
        mock_stream_chat.return_value = iter([b"data"])
        response = self._post_chat('{"q": "question"}')
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response["Content-Type"], "text/event-stream")

    @patch("documents.views.stream_chat_with_documents")
    @override_settings(AI_ENABLED=True)
    def test_post_with_document_id(self, mock_stream_chat):
        mock_stream_chat.return_value = iter([b"data"])
        response = self._post_chat(
            f'{{"q": "question", "document_id": {self.document.pk}}}',
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response["Content-Type"], "text/event-stream")

    @override_settings(AI_ENABLED=True)
    def test_post_with_invalid_document_id(self):
        response = self._post_chat('{"q": "question", "document_id": 999999}')
        self.assertEqual(response.status_code, 400)
        self.assertIn(b"Document not found", response.content)

    @patch("documents.views.has_perms_owner_aware")
    @override_settings(AI_ENABLED=True)
    def test_post_with_document_id_no_permission(self, mock_has_perms):
        mock_has_perms.return_value = False
        response = self._post_chat(
            f'{{"q": "question", "document_id": {self.document.pk}}}',
        )
        self.assertEqual(response.status_code, 403)
        self.assertIn(b"Insufficient permissions", response.content)

View File

@@ -3401,7 +3401,7 @@ class TestWorkflows(
)
webhook_action = WorkflowActionWebhook.objects.create(
use_params=False,
body="Test message: {{doc_url}}",
body="Test message: {{doc_url}} with id {{doc_id}}",
url="http://paperless-ngx.com",
include_document=False,
)
@@ -3431,7 +3431,10 @@ class TestWorkflows(
mock_post.assert_called_once_with(
url="http://paperless-ngx.com",
data=f"Test message: http://localhost:8000/paperless/documents/{doc.id}/",
data=(
f"Test message: http://localhost:8000/paperless/documents/{doc.id}/"
f" with id {doc.id}"
),
headers={},
files=None,
as_json=False,