Removes the migration tests

This commit is contained in:
Trenton H
2026-01-20 10:45:49 -08:00
parent a527f5e244
commit 3b3f372439
13 changed files with 0 additions and 1682 deletions

View File

@@ -1,574 +0,0 @@
import hashlib
import importlib
import shutil
from pathlib import Path
from unittest import mock
import pytest
from django.conf import settings
from django.test import override_settings
from documents.parsers import ParseError
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from documents.tests.utils import TestMigrations
STORAGE_TYPE_GPG = "gpg"
migration_1012_obj = importlib.import_module(
"documents.migrations.1012_fix_archive_files",
)
def archive_name_from_filename(filename: Path) -> Path:
return Path(filename.stem + ".pdf")
def archive_path_old(self) -> Path:
if self.filename:
fname = archive_name_from_filename(Path(self.filename))
else:
fname = Path(f"{self.pk:07}.pdf")
return Path(settings.ARCHIVE_DIR) / fname
def archive_path_new(doc):
if doc.archive_filename is not None:
return Path(settings.ARCHIVE_DIR) / str(doc.archive_filename)
else:
return None
def source_path(doc):
if doc.filename:
fname = str(doc.filename)
else:
fname = f"{doc.pk:07}{doc.file_type}"
if doc.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg" # pragma: no cover
return Path(settings.ORIGINALS_DIR) / fname
def thumbnail_path(doc):
file_name = f"{doc.pk:07}.png"
if doc.storage_type == STORAGE_TYPE_GPG:
file_name += ".gpg"
return Path(settings.THUMBNAIL_DIR) / file_name
def make_test_document(
document_class,
title: str,
mime_type: str,
original: str,
original_filename: str,
archive: str | None = None,
archive_filename: str | None = None,
):
doc = document_class()
doc.filename = original_filename
doc.title = title
doc.mime_type = mime_type
doc.content = "the content, does not matter for this test"
doc.save()
shutil.copy2(original, source_path(doc))
with Path(original).open("rb") as f:
doc.checksum = hashlib.md5(f.read()).hexdigest()
if archive:
if archive_filename:
doc.archive_filename = archive_filename
shutil.copy2(archive, archive_path_new(doc))
else:
shutil.copy2(archive, archive_path_old(doc))
with Path(archive).open("rb") as f:
doc.archive_checksum = hashlib.md5(f.read()).hexdigest()
doc.save()
Path(thumbnail_path(doc)).touch()
return doc
simple_jpg = Path(__file__).parent / "samples" / "simple.jpg"
simple_pdf = Path(__file__).parent / "samples" / "simple.pdf"
simple_pdf2 = (
Path(__file__).parent / "samples" / "documents" / "originals" / "0000002.pdf"
)
simple_pdf3 = (
Path(__file__).parent / "samples" / "documents" / "originals" / "0000003.pdf"
)
simple_txt = Path(__file__).parent / "samples" / "simple.txt"
simple_png = Path(__file__).parent / "samples" / "simple-noalpha.png"
simple_png2 = Path(__file__).parent / "examples" / "no-text.png"
@override_settings(FILENAME_FORMAT="")
class TestMigrateArchiveFiles(DirectoriesMixin, FileSystemAssertsMixin, TestMigrations):
migrate_from = "1006_auto_20201208_2209_squashed_1011_auto_20210101_2340"
migrate_to = "1012_fix_archive_files"
def setUpBeforeMigration(self, apps):
Document = apps.get_model("documents", "Document")
self.unrelated = make_test_document(
Document,
"unrelated",
"application/pdf",
simple_pdf3,
"unrelated.pdf",
simple_pdf,
)
self.no_text = make_test_document(
Document,
"no-text",
"image/png",
simple_png2,
"no-text.png",
simple_pdf,
)
self.doc_no_archive = make_test_document(
Document,
"no_archive",
"text/plain",
simple_txt,
"no_archive.txt",
)
self.clash1 = make_test_document(
Document,
"clash",
"application/pdf",
simple_pdf,
"clash.pdf",
simple_pdf,
)
self.clash2 = make_test_document(
Document,
"clash",
"image/jpeg",
simple_jpg,
"clash.jpg",
simple_pdf,
)
self.clash3 = make_test_document(
Document,
"clash",
"image/png",
simple_png,
"clash.png",
simple_pdf,
)
self.clash4 = make_test_document(
Document,
"clash.png",
"application/pdf",
simple_pdf2,
"clash.png.pdf",
simple_pdf2,
)
self.assertEqual(archive_path_old(self.clash1), archive_path_old(self.clash2))
self.assertEqual(archive_path_old(self.clash1), archive_path_old(self.clash3))
self.assertNotEqual(
archive_path_old(self.clash1),
archive_path_old(self.clash4),
)
def testArchiveFilesMigrated(self):
Document = self.apps.get_model("documents", "Document")
for doc in Document.objects.all():
if doc.archive_checksum:
self.assertIsNotNone(doc.archive_filename)
self.assertIsFile(archive_path_new(doc))
else:
self.assertIsNone(doc.archive_filename)
with Path(source_path(doc)).open("rb") as f:
original_checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(original_checksum, doc.checksum)
if doc.archive_checksum:
self.assertIsFile(archive_path_new(doc))
with archive_path_new(doc).open("rb") as f:
archive_checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(archive_checksum, doc.archive_checksum)
self.assertEqual(
Document.objects.filter(archive_checksum__isnull=False).count(),
6,
)
def test_filenames(self):
Document = self.apps.get_model("documents", "Document")
self.assertEqual(
Document.objects.get(id=self.unrelated.id).archive_filename,
"unrelated.pdf",
)
self.assertEqual(
Document.objects.get(id=self.no_text.id).archive_filename,
"no-text.pdf",
)
self.assertEqual(
Document.objects.get(id=self.doc_no_archive.id).archive_filename,
None,
)
self.assertEqual(
Document.objects.get(id=self.clash1.id).archive_filename,
f"{self.clash1.id:07}.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash2.id).archive_filename,
f"{self.clash2.id:07}.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash3.id).archive_filename,
f"{self.clash3.id:07}.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash4.id).archive_filename,
"clash.png.pdf",
)
@override_settings(FILENAME_FORMAT="{correspondent}/{title}")
class TestMigrateArchiveFilesWithFilenameFormat(TestMigrateArchiveFiles):
def test_filenames(self):
Document = self.apps.get_model("documents", "Document")
self.assertEqual(
Document.objects.get(id=self.unrelated.id).archive_filename,
"unrelated.pdf",
)
self.assertEqual(
Document.objects.get(id=self.no_text.id).archive_filename,
"no-text.pdf",
)
self.assertEqual(
Document.objects.get(id=self.doc_no_archive.id).archive_filename,
None,
)
self.assertEqual(
Document.objects.get(id=self.clash1.id).archive_filename,
"none/clash.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash2.id).archive_filename,
"none/clash_01.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash3.id).archive_filename,
"none/clash_02.pdf",
)
self.assertEqual(
Document.objects.get(id=self.clash4.id).archive_filename,
"clash.png.pdf",
)
def fake_parse_wrapper(parser, path, mime_type, file_name):
parser.archive_path = None
parser.text = "the text"
@override_settings(FILENAME_FORMAT="")
class TestMigrateArchiveFilesErrors(DirectoriesMixin, TestMigrations):
migrate_from = "1006_auto_20201208_2209_squashed_1011_auto_20210101_2340"
migrate_to = "1012_fix_archive_files"
auto_migrate = False
@pytest.mark.skip(reason="Fails with migration tearDown util. Needs investigation.")
def test_archive_missing(self):
Document = self.apps.get_model("documents", "Document")
doc = make_test_document(
Document,
"clash",
"application/pdf",
simple_pdf,
"clash.pdf",
simple_pdf,
)
archive_path_old(doc).unlink()
self.assertRaisesMessage(
ValueError,
"does not exist at: ",
self.performMigration,
)
@pytest.mark.skip(reason="Fails with migration tearDown util. Needs investigation.")
def test_parser_missing(self):
Document = self.apps.get_model("documents", "Document")
make_test_document(
Document,
"document",
"invalid/typesss768",
simple_png,
"document.png",
simple_pdf,
)
make_test_document(
Document,
"document",
"invalid/typesss768",
simple_jpg,
"document.jpg",
simple_pdf,
)
self.assertRaisesMessage(
ValueError,
"no parsers are available",
self.performMigration,
)
@mock.patch(f"{__name__}.migration_1012_obj.parse_wrapper")
def test_parser_error(self, m):
m.side_effect = ParseError()
Document = self.apps.get_model("documents", "Document")
doc1 = make_test_document(
Document,
"document",
"image/png",
simple_png,
"document.png",
simple_pdf,
)
doc2 = make_test_document(
Document,
"document",
"application/pdf",
simple_jpg,
"document.jpg",
simple_pdf,
)
self.assertIsNotNone(doc1.archive_checksum)
self.assertIsNotNone(doc2.archive_checksum)
with self.assertLogs() as capture:
self.performMigration()
self.assertEqual(m.call_count, 6)
self.assertEqual(
len(
list(
filter(
lambda log: "Parse error, will try again in 5 seconds" in log,
capture.output,
),
),
),
4,
)
self.assertEqual(
len(
list(
filter(
lambda log: "Unable to regenerate archive document for ID:"
in log,
capture.output,
),
),
),
2,
)
Document = self.apps.get_model("documents", "Document")
doc1 = Document.objects.get(id=doc1.id)
doc2 = Document.objects.get(id=doc2.id)
self.assertIsNone(doc1.archive_checksum)
self.assertIsNone(doc2.archive_checksum)
self.assertIsNone(doc1.archive_filename)
self.assertIsNone(doc2.archive_filename)
@mock.patch(f"{__name__}.migration_1012_obj.parse_wrapper")
def test_parser_no_archive(self, m):
m.side_effect = fake_parse_wrapper
Document = self.apps.get_model("documents", "Document")
doc1 = make_test_document(
Document,
"document",
"image/png",
simple_png,
"document.png",
simple_pdf,
)
doc2 = make_test_document(
Document,
"document",
"application/pdf",
simple_jpg,
"document.jpg",
simple_pdf,
)
with self.assertLogs() as capture:
self.performMigration()
self.assertEqual(
len(
list(
filter(
lambda log: "Parser did not return an archive document for document"
in log,
capture.output,
),
),
),
2,
)
Document = self.apps.get_model("documents", "Document")
doc1 = Document.objects.get(id=doc1.id)
doc2 = Document.objects.get(id=doc2.id)
self.assertIsNone(doc1.archive_checksum)
self.assertIsNone(doc2.archive_checksum)
self.assertIsNone(doc1.archive_filename)
self.assertIsNone(doc2.archive_filename)
@override_settings(FILENAME_FORMAT="")
class TestMigrateArchiveFilesBackwards(
DirectoriesMixin,
FileSystemAssertsMixin,
TestMigrations,
):
migrate_from = "1012_fix_archive_files"
migrate_to = "1006_auto_20201208_2209_squashed_1011_auto_20210101_2340"
def setUpBeforeMigration(self, apps):
Document = apps.get_model("documents", "Document")
make_test_document(
Document,
"unrelated",
"application/pdf",
simple_pdf2,
"unrelated.txt",
simple_pdf2,
"unrelated.pdf",
)
make_test_document(
Document,
"no_archive",
"text/plain",
simple_txt,
"no_archive.txt",
)
make_test_document(
Document,
"clash",
"image/jpeg",
simple_jpg,
"clash.jpg",
simple_pdf,
"clash_02.pdf",
)
def testArchiveFilesReverted(self):
Document = self.apps.get_model("documents", "Document")
for doc in Document.objects.all():
if doc.archive_checksum:
self.assertIsFile(archive_path_old(doc))
with Path(source_path(doc)).open("rb") as f:
original_checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(original_checksum, doc.checksum)
if doc.archive_checksum:
self.assertIsFile(archive_path_old(doc))
with archive_path_old(doc).open("rb") as f:
archive_checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(archive_checksum, doc.archive_checksum)
self.assertEqual(
Document.objects.filter(archive_checksum__isnull=False).count(),
2,
)
@override_settings(FILENAME_FORMAT="{correspondent}/{title}")
class TestMigrateArchiveFilesBackwardsWithFilenameFormat(
TestMigrateArchiveFilesBackwards,
):
pass
@override_settings(FILENAME_FORMAT="")
class TestMigrateArchiveFilesBackwardsErrors(DirectoriesMixin, TestMigrations):
migrate_from = "1012_fix_archive_files"
migrate_to = "1006_auto_20201208_2209_squashed_1011_auto_20210101_2340"
auto_migrate = False
def test_filename_clash(self):
Document = self.apps.get_model("documents", "Document")
self.clashA = make_test_document(
Document,
"clash",
"application/pdf",
simple_pdf,
"clash.pdf",
simple_pdf,
"clash_02.pdf",
)
self.clashB = make_test_document(
Document,
"clash",
"image/jpeg",
simple_jpg,
"clash.jpg",
simple_pdf,
"clash_01.pdf",
)
self.assertRaisesMessage(
ValueError,
"would clash with another archive filename",
self.performMigration,
)
def test_filename_exists(self):
Document = self.apps.get_model("documents", "Document")
self.clashA = make_test_document(
Document,
"clash",
"application/pdf",
simple_pdf,
"clash.pdf",
simple_pdf,
"clash.pdf",
)
self.clashB = make_test_document(
Document,
"clash",
"image/jpeg",
simple_jpg,
"clash.jpg",
simple_pdf,
"clash_01.pdf",
)
self.assertRaisesMessage(
ValueError,
"file already exists.",
self.performMigration,
)

View File

@@ -1,50 +0,0 @@
from django.contrib.auth import get_user_model
from documents.tests.utils import TestMigrations
class TestMigrateConsumptionTemplate(TestMigrations):
migrate_from = "1038_sharelink"
migrate_to = "1039_consumptiontemplate"
def setUpBeforeMigration(self, apps):
User = get_user_model()
Group = apps.get_model("auth.Group")
self.Permission = apps.get_model("auth", "Permission")
self.user = User.objects.create(username="user1")
self.group = Group.objects.create(name="group1")
permission = self.Permission.objects.get(codename="add_document")
self.user.user_permissions.add(permission.id)
self.group.permissions.add(permission.id)
def test_users_with_add_documents_get_add_consumptiontemplate(self):
permission = self.Permission.objects.get(codename="add_consumptiontemplate")
self.assertTrue(self.user.has_perm(f"documents.{permission.codename}"))
self.assertTrue(permission in self.group.permissions.all())
class TestReverseMigrateConsumptionTemplate(TestMigrations):
migrate_from = "1039_consumptiontemplate"
migrate_to = "1038_sharelink"
def setUpBeforeMigration(self, apps):
User = get_user_model()
Group = apps.get_model("auth.Group")
self.Permission = apps.get_model("auth", "Permission")
self.user = User.objects.create(username="user1")
self.group = Group.objects.create(name="group1")
permission = self.Permission.objects.filter(
codename="add_consumptiontemplate",
).first()
if permission is not None:
self.user.user_permissions.add(permission.id)
self.group.permissions.add(permission.id)
def test_remove_consumptiontemplate_permissions(self):
permission = self.Permission.objects.filter(
codename="add_consumptiontemplate",
).first()
# can be None ? now that CTs removed
if permission is not None:
self.assertFalse(self.user.has_perm(f"documents.{permission.codename}"))
self.assertFalse(permission in self.group.permissions.all())

View File

@@ -1,33 +0,0 @@
from datetime import date
from datetime import datetime
from datetime import timedelta
from django.utils.timezone import make_aware
from pytz import UTC
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
class TestMigrateDocumentCreated(DirectoriesMixin, TestMigrations):
migrate_from = "1066_alter_workflowtrigger_schedule_offset_days"
migrate_to = "1067_alter_document_created"
def setUpBeforeMigration(self, apps):
# create 600 documents
for i in range(600):
Document = apps.get_model("documents", "Document")
naive = datetime(2023, 10, 1, 12, 0, 0) + timedelta(days=i)
Document.objects.create(
title=f"test{i}",
mime_type="application/pdf",
filename=f"file{i}.pdf",
created=make_aware(naive, timezone=UTC),
checksum=i,
)
def testDocumentCreatedMigrated(self):
Document = self.apps.get_model("documents", "Document")
doc = Document.objects.get(id=1)
self.assertEqual(doc.created, date(2023, 10, 1))

View File

@@ -1,87 +0,0 @@
from unittest.mock import ANY
from documents.tests.utils import TestMigrations
class TestMigrateCustomFieldSelects(TestMigrations):
migrate_from = "1059_workflowactionemail_workflowactionwebhook_and_more"
migrate_to = "1060_alter_customfieldinstance_value_select"
def setUpBeforeMigration(self, apps):
CustomField = apps.get_model("documents.CustomField")
self.old_format = CustomField.objects.create(
name="cf1",
data_type="select",
extra_data={"select_options": ["Option 1", "Option 2", "Option 3"]},
)
Document = apps.get_model("documents.Document")
doc = Document.objects.create(title="doc1")
CustomFieldInstance = apps.get_model("documents.CustomFieldInstance")
self.old_instance = CustomFieldInstance.objects.create(
field=self.old_format,
value_select=0,
document=doc,
)
def test_migrate_old_to_new_select_fields(self):
self.old_format.refresh_from_db()
self.old_instance.refresh_from_db()
self.assertEqual(
self.old_format.extra_data["select_options"],
[
{"label": "Option 1", "id": ANY},
{"label": "Option 2", "id": ANY},
{"label": "Option 3", "id": ANY},
],
)
self.assertEqual(
self.old_instance.value_select,
self.old_format.extra_data["select_options"][0]["id"],
)
class TestMigrationCustomFieldSelectsReverse(TestMigrations):
migrate_from = "1060_alter_customfieldinstance_value_select"
migrate_to = "1059_workflowactionemail_workflowactionwebhook_and_more"
def setUpBeforeMigration(self, apps):
CustomField = apps.get_model("documents.CustomField")
self.new_format = CustomField.objects.create(
name="cf1",
data_type="select",
extra_data={
"select_options": [
{"label": "Option 1", "id": "id1"},
{"label": "Option 2", "id": "id2"},
{"label": "Option 3", "id": "id3"},
],
},
)
Document = apps.get_model("documents.Document")
doc = Document.objects.create(title="doc1")
CustomFieldInstance = apps.get_model("documents.CustomFieldInstance")
self.new_instance = CustomFieldInstance.objects.create(
field=self.new_format,
value_select="id1",
document=doc,
)
def test_migrate_new_to_old_select_fields(self):
self.new_format.refresh_from_db()
self.new_instance.refresh_from_db()
self.assertEqual(
self.new_format.extra_data["select_options"],
[
"Option 1",
"Option 2",
"Option 3",
],
)
self.assertEqual(
self.new_instance.value_select,
0,
)

View File

@@ -1,43 +0,0 @@
from django.contrib.auth import get_user_model
from documents.tests.utils import TestMigrations
class TestMigrateCustomFields(TestMigrations):
migrate_from = "1039_consumptiontemplate"
migrate_to = "1040_customfield_customfieldinstance_and_more"
def setUpBeforeMigration(self, apps):
User = get_user_model()
Group = apps.get_model("auth.Group")
self.Permission = apps.get_model("auth", "Permission")
self.user = User.objects.create(username="user1")
self.group = Group.objects.create(name="group1")
permission = self.Permission.objects.get(codename="add_document")
self.user.user_permissions.add(permission.id)
self.group.permissions.add(permission.id)
def test_users_with_add_documents_get_add_customfields(self):
permission = self.Permission.objects.get(codename="add_customfield")
self.assertTrue(self.user.has_perm(f"documents.{permission.codename}"))
self.assertTrue(permission in self.group.permissions.all())
class TestReverseMigrateCustomFields(TestMigrations):
migrate_from = "1040_customfield_customfieldinstance_and_more"
migrate_to = "1039_consumptiontemplate"
def setUpBeforeMigration(self, apps):
User = get_user_model()
Group = apps.get_model("auth.Group")
self.Permission = apps.get_model("auth", "Permission")
self.user = User.objects.create(username="user1")
self.group = Group.objects.create(name="group1")
permission = self.Permission.objects.get(codename="add_customfield")
self.user.user_permissions.add(permission.id)
self.group.permissions.add(permission.id)
def test_remove_consumptiontemplate_permissions(self):
permission = self.Permission.objects.get(codename="add_customfield")
self.assertFalse(self.user.has_perm(f"documents.{permission.codename}"))
self.assertFalse(permission in self.group.permissions.all())

View File

@@ -1,59 +0,0 @@
import shutil
from pathlib import Path
from django.conf import settings
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
def source_path_before(self) -> Path:
if self.filename:
fname = str(self.filename)
return Path(settings.ORIGINALS_DIR) / fname
class TestMigrateDocumentPageCount(DirectoriesMixin, TestMigrations):
migrate_from = "1052_document_transaction_id"
migrate_to = "1053_document_page_count"
def setUpBeforeMigration(self, apps):
Document = apps.get_model("documents", "Document")
doc = Document.objects.create(
title="test1",
mime_type="application/pdf",
filename="file1.pdf",
)
self.doc_id = doc.id
shutil.copy(
Path(__file__).parent / "samples" / "simple.pdf",
source_path_before(doc),
)
def testDocumentPageCountMigrated(self):
Document = self.apps.get_model("documents", "Document")
doc = Document.objects.get(id=self.doc_id)
self.assertEqual(doc.page_count, 1)
class TestMigrateDocumentPageCountBackwards(TestMigrations):
migrate_from = "1053_document_page_count"
migrate_to = "1052_document_transaction_id"
def setUpBeforeMigration(self, apps):
Document = apps.get_model("documents", "Document")
doc = Document.objects.create(
title="test1",
mime_type="application/pdf",
filename="file1.pdf",
page_count=8,
)
self.doc_id = doc.id
def test_remove_number_of_pages_to_page_count(self):
Document = self.apps.get_model("documents", "Document")
self.assertFalse(
"page_count" in [field.name for field in Document._meta.get_fields()],
)

View File

@@ -1,283 +0,0 @@
import importlib
import shutil
import tempfile
from collections.abc import Callable
from collections.abc import Iterable
from pathlib import Path
from unittest import mock
from django.test import override_settings
from documents.tests.utils import TestMigrations
# https://github.com/python/cpython/issues/100950
migration_1037_obj = importlib.import_module(
"documents.migrations.1037_webp_encrypted_thumbnail_conversion",
)
@override_settings(PASSPHRASE="test")
@mock.patch(
f"{__name__}.migration_1037_obj.multiprocessing.pool.Pool.map",
)
@mock.patch(f"{__name__}.migration_1037_obj.run_convert")
class TestMigrateToEncrytpedWebPThumbnails(TestMigrations):
migrate_from = (
"1022_paperlesstask_squashed_1036_alter_savedviewfilterrule_rule_type"
)
migrate_to = "1037_webp_encrypted_thumbnail_conversion"
auto_migrate = False
def pretend_convert_output(self, *args, **kwargs):
"""
Pretends to do the conversion, by copying the input file
to the output file
"""
shutil.copy2(
Path(kwargs["input_file"].rstrip("[0]")),
Path(kwargs["output_file"]),
)
def pretend_map(self, func: Callable, iterable: Iterable):
"""
Pretends to be the map of a multiprocessing.Pool, but secretly does
everything in series
"""
for item in iterable:
func(item)
def create_dummy_thumbnails(
self,
thumb_dir: Path,
ext: str,
count: int,
start_count: int = 0,
):
"""
Helper to create a certain count of files of given extension in a given directory
"""
for idx in range(count):
(Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
# Triple check expected files exist
self.assert_file_count_by_extension(ext, thumb_dir, count)
def create_webp_thumbnail_files(
self,
thumb_dir: Path,
count: int,
start_count: int = 0,
):
"""
Creates a dummy WebP thumbnail file in the given directory, based on
the database Document
"""
self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)
def create_encrypted_webp_thumbnail_files(
self,
thumb_dir: Path,
count: int,
start_count: int = 0,
):
"""
Creates a dummy encrypted WebP thumbnail file in the given directory, based on
the database Document
"""
self.create_dummy_thumbnails(thumb_dir, "webp.gpg", count, start_count)
def create_png_thumbnail_files(
self,
thumb_dir: Path,
count: int,
start_count: int = 0,
):
"""
Creates a dummy PNG thumbnail file in the given directory, based on
the database Document
"""
self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)
def create_encrypted_png_thumbnail_files(
self,
thumb_dir: Path,
count: int,
start_count: int = 0,
):
"""
Creates a dummy encrypted PNG thumbnail file in the given directory, based on
the database Document
"""
self.create_dummy_thumbnails(thumb_dir, "png.gpg", count, start_count)
def assert_file_count_by_extension(
self,
ext: str,
dir: str | Path,
expected_count: int,
):
"""
Helper to assert a certain count of given extension files in given directory
"""
if not isinstance(dir, Path):
dir = Path(dir)
matching_files = list(dir.glob(f"*.{ext}"))
self.assertEqual(len(matching_files), expected_count)
def assert_encrypted_png_file_count(self, dir: Path, expected_count: int):
"""
Helper to assert a certain count of excrypted PNG extension files in given directory
"""
self.assert_file_count_by_extension("png.gpg", dir, expected_count)
def assert_encrypted_webp_file_count(self, dir: Path, expected_count: int):
"""
Helper to assert a certain count of encrypted WebP extension files in given directory
"""
self.assert_file_count_by_extension("webp.gpg", dir, expected_count)
def assert_webp_file_count(self, dir: Path, expected_count: int):
"""
Helper to assert a certain count of WebP extension files in given directory
"""
self.assert_file_count_by_extension("webp", dir, expected_count)
def assert_png_file_count(self, dir: Path, expected_count: int):
"""
Helper to assert a certain count of PNG extension files in given directory
"""
self.assert_file_count_by_extension("png", dir, expected_count)
def setUp(self):
self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()
return super().setUp()
def tearDown(self) -> None:
shutil.rmtree(self.thumbnail_dir)
return super().tearDown()
def test_do_nothing_if_converted(
self,
run_convert_mock: mock.MagicMock,
map_mock: mock.MagicMock,
):
"""
GIVEN:
- Encrypted document exists with existing encrypted WebP thumbnail path
WHEN:
- Migration is attempted
THEN:
- Nothing is converted
"""
map_mock.side_effect = self.pretend_map
with override_settings(
THUMBNAIL_DIR=self.thumbnail_dir,
):
self.create_encrypted_webp_thumbnail_files(self.thumbnail_dir, 3)
self.performMigration()
run_convert_mock.assert_not_called()
self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)
def test_convert_thumbnails(
self,
run_convert_mock: mock.MagicMock,
map_mock: mock.MagicMock,
):
"""
GIVEN:
- Encrypted documents exist with PNG thumbnail
WHEN:
- Migration is attempted
THEN:
- Thumbnails are converted to webp & re-encrypted
"""
map_mock.side_effect = self.pretend_map
run_convert_mock.side_effect = self.pretend_convert_output
with override_settings(
THUMBNAIL_DIR=self.thumbnail_dir,
):
self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)
self.performMigration()
run_convert_mock.assert_called()
self.assertEqual(run_convert_mock.call_count, 3)
self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)
def test_convert_errors_out(
self,
run_convert_mock: mock.MagicMock,
map_mock: mock.MagicMock,
):
"""
GIVEN:
- Encrypted document exists with PNG thumbnail
WHEN:
- Migration is attempted, but raises an exception
THEN:
- Single thumbnail is converted
"""
map_mock.side_effect = self.pretend_map
run_convert_mock.side_effect = OSError
with override_settings(
THUMBNAIL_DIR=self.thumbnail_dir,
):
self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)
self.performMigration()
run_convert_mock.assert_called()
self.assertEqual(run_convert_mock.call_count, 3)
self.assert_encrypted_png_file_count(self.thumbnail_dir, 3)
def test_convert_mixed(
self,
run_convert_mock: mock.MagicMock,
map_mock: mock.MagicMock,
):
"""
GIVEN:
- Documents exist with PNG, encrypted PNG and WebP thumbnails
WHEN:
- Migration is attempted
THEN:
- Only encrypted PNG thumbnails are converted
"""
map_mock.side_effect = self.pretend_map
run_convert_mock.side_effect = self.pretend_convert_output
with override_settings(
THUMBNAIL_DIR=self.thumbnail_dir,
):
self.create_png_thumbnail_files(self.thumbnail_dir, 3)
self.create_encrypted_png_thumbnail_files(
self.thumbnail_dir,
3,
start_count=3,
)
self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=6)
self.create_encrypted_webp_thumbnail_files(
self.thumbnail_dir,
3,
start_count=8,
)
self.performMigration()
run_convert_mock.assert_called()
self.assertEqual(run_convert_mock.call_count, 3)
self.assert_png_file_count(self.thumbnail_dir, 3)
self.assert_encrypted_webp_file_count(self.thumbnail_dir, 6)
self.assert_webp_file_count(self.thumbnail_dir, 2)
self.assert_encrypted_png_file_count(self.thumbnail_dir, 0)

View File

@@ -1,108 +0,0 @@
import shutil
from pathlib import Path
from django.conf import settings
from django.test import override_settings
from documents.parsers import get_default_file_extension
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
STORAGE_TYPE_UNENCRYPTED = "unencrypted"
STORAGE_TYPE_GPG = "gpg"
def source_path_before(self):
if self.filename:
fname = str(self.filename)
else:
fname = f"{self.pk:07}.{self.file_type}"
if self.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg"
return Path(settings.ORIGINALS_DIR) / fname
def file_type_after(self):
return get_default_file_extension(self.mime_type)
def source_path_after(doc):
if doc.filename:
fname = str(doc.filename)
else:
fname = f"{doc.pk:07}{file_type_after(doc)}"
if doc.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg" # pragma: no cover
return Path(settings.ORIGINALS_DIR) / fname
@override_settings(PASSPHRASE="test")
class TestMigrateMimeType(DirectoriesMixin, TestMigrations):
migrate_from = "1002_auto_20201111_1105"
migrate_to = "1003_mime_types"
def setUpBeforeMigration(self, apps):
Document = apps.get_model("documents", "Document")
doc = Document.objects.create(
title="test",
file_type="pdf",
filename="file1.pdf",
)
self.doc_id = doc.id
shutil.copy(
Path(__file__).parent / "samples" / "simple.pdf",
source_path_before(doc),
)
doc2 = Document.objects.create(
checksum="B",
file_type="pdf",
storage_type=STORAGE_TYPE_GPG,
)
self.doc2_id = doc2.id
shutil.copy(
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000004.pdf.gpg"
),
source_path_before(doc2),
)
def testMimeTypesMigrated(self):
Document = self.apps.get_model("documents", "Document")
doc = Document.objects.get(id=self.doc_id)
self.assertEqual(doc.mime_type, "application/pdf")
doc2 = Document.objects.get(id=self.doc2_id)
self.assertEqual(doc2.mime_type, "application/pdf")
@override_settings(PASSPHRASE="test")
class TestMigrateMimeTypeBackwards(DirectoriesMixin, TestMigrations):
migrate_from = "1003_mime_types"
migrate_to = "1002_auto_20201111_1105"
def setUpBeforeMigration(self, apps):
Document = apps.get_model("documents", "Document")
doc = Document.objects.create(
title="test",
mime_type="application/pdf",
filename="file1.pdf",
)
self.doc_id = doc.id
shutil.copy(
Path(__file__).parent / "samples" / "simple.pdf",
source_path_after(doc),
)
def testMimeTypesReverted(self):
Document = self.apps.get_model("documents", "Document")
doc = Document.objects.get(id=self.doc_id)
self.assertEqual(doc.file_type, "pdf")

View File

@@ -1,15 +0,0 @@
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
class TestMigrateNullCharacters(DirectoriesMixin, TestMigrations):
migrate_from = "1014_auto_20210228_1614"
migrate_to = "1015_remove_null_characters"
def setUpBeforeMigration(self, apps):
Document = apps.get_model("documents", "Document")
self.doc = Document.objects.create(content="aaa\0bbb")
def testMimeTypesMigrated(self):
Document = self.apps.get_model("documents", "Document")
self.assertNotIn("\0", Document.objects.get(id=self.doc.id).content)

View File

@@ -1,30 +0,0 @@
from documents.models import StoragePath
from documents.tests.utils import TestMigrations
class TestMigrateStoragePathToTemplate(TestMigrations):
migrate_from = "1054_customfieldinstance_value_monetary_amount_and_more"
migrate_to = "1055_alter_storagepath_path"
def setUpBeforeMigration(self, apps):
self.old_format = StoragePath.objects.create(
name="sp1",
path="Something/{title}",
)
self.new_format = StoragePath.objects.create(
name="sp2",
path="{{asn}}/{{title}}",
)
self.no_formatting = StoragePath.objects.create(
name="sp3",
path="Some/Fixed/Path",
)
def test_migrate_old_to_new_storage_path(self):
self.old_format.refresh_from_db()
self.new_format.refresh_from_db()
self.no_formatting.refresh_from_db()
self.assertEqual(self.old_format.path, "Something/{{ title }}")
self.assertEqual(self.new_format.path, "{{asn}}/{{title}}")
self.assertEqual(self.no_formatting.path, "Some/Fixed/Path")

View File

@@ -1,36 +0,0 @@
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
class TestMigrateTagColor(DirectoriesMixin, TestMigrations):
migrate_from = "1012_fix_archive_files"
migrate_to = "1013_migrate_tag_colour"
def setUpBeforeMigration(self, apps):
Tag = apps.get_model("documents", "Tag")
self.t1_id = Tag.objects.create(name="tag1").id
self.t2_id = Tag.objects.create(name="tag2", colour=1).id
self.t3_id = Tag.objects.create(name="tag3", colour=5).id
def testMimeTypesMigrated(self):
Tag = self.apps.get_model("documents", "Tag")
self.assertEqual(Tag.objects.get(id=self.t1_id).color, "#a6cee3")
self.assertEqual(Tag.objects.get(id=self.t2_id).color, "#a6cee3")
self.assertEqual(Tag.objects.get(id=self.t3_id).color, "#fb9a99")
class TestMigrateTagColorBackwards(DirectoriesMixin, TestMigrations):
migrate_from = "1013_migrate_tag_colour"
migrate_to = "1012_fix_archive_files"
def setUpBeforeMigration(self, apps):
Tag = apps.get_model("documents", "Tag")
self.t1_id = Tag.objects.create(name="tag1").id
self.t2_id = Tag.objects.create(name="tag2", color="#cab2d6").id
self.t3_id = Tag.objects.create(name="tag3", color="#123456").id
def testMimeTypesReverted(self):
Tag = self.apps.get_model("documents", "Tag")
self.assertEqual(Tag.objects.get(id=self.t1_id).colour, 1)
self.assertEqual(Tag.objects.get(id=self.t2_id).colour, 9)
self.assertEqual(Tag.objects.get(id=self.t3_id).colour, 1)

View File

@@ -1,230 +0,0 @@
import importlib
import shutil
import tempfile
from collections.abc import Callable
from collections.abc import Iterable
from pathlib import Path
from unittest import mock
from django.test import override_settings
from documents.tests.utils import TestMigrations
# https://github.com/python/cpython/issues/100950
migration_1021_obj = importlib.import_module(
"documents.migrations.1021_webp_thumbnail_conversion",
)
@mock.patch(
f"{__name__}.migration_1021_obj.multiprocessing.pool.Pool.map",
)
@mock.patch(f"{__name__}.migration_1021_obj.run_convert")
class TestMigrateWebPThumbnails(TestMigrations):
migrate_from = "1016_auto_20210317_1351_squashed_1020_merge_20220518_1839"
migrate_to = "1021_webp_thumbnail_conversion"
auto_migrate = False
def pretend_convert_output(self, *args, **kwargs):
"""
Pretends to do the conversion, by copying the input file
to the output file
"""
shutil.copy2(
Path(kwargs["input_file"].rstrip("[0]")),
Path(kwargs["output_file"]),
)
def pretend_map(self, func: Callable, iterable: Iterable):
"""
Pretends to be the map of a multiprocessing.Pool, but secretly does
everything in series
"""
for item in iterable:
func(item)
def create_dummy_thumbnails(
self,
thumb_dir: Path,
ext: str,
count: int,
start_count: int = 0,
):
"""
Helper to create a certain count of files of given extension in a given directory
"""
for idx in range(count):
(Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
# Triple check expected files exist
self.assert_file_count_by_extension(ext, thumb_dir, count)
def create_webp_thumbnail_files(
self,
thumb_dir: Path,
count: int,
start_count: int = 0,
):
"""
Creates a dummy WebP thumbnail file in the given directory, based on
the database Document
"""
self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)
def create_png_thumbnail_file(
self,
thumb_dir: Path,
count: int,
start_count: int = 0,
):
"""
Creates a dummy PNG thumbnail file in the given directory, based on
the database Document
"""
self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)
def assert_file_count_by_extension(
self,
ext: str,
dir: str | Path,
expected_count: int,
):
"""
Helper to assert a certain count of given extension files in given directory
"""
if not isinstance(dir, Path):
dir = Path(dir)
matching_files = list(dir.glob(f"*.{ext}"))
self.assertEqual(len(matching_files), expected_count)
def assert_png_file_count(self, dir: Path, expected_count: int):
"""
Helper to assert a certain count of PNG extension files in given directory
"""
self.assert_file_count_by_extension("png", dir, expected_count)
def assert_webp_file_count(self, dir: Path, expected_count: int):
"""
Helper to assert a certain count of WebP extension files in given directory
"""
self.assert_file_count_by_extension("webp", dir, expected_count)
def setUp(self):
self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()
return super().setUp()
def tearDown(self) -> None:
shutil.rmtree(self.thumbnail_dir)
return super().tearDown()
def test_do_nothing_if_converted(
self,
run_convert_mock: mock.MagicMock,
map_mock: mock.MagicMock,
):
"""
GIVEN:
- Document exists with default WebP thumbnail path
WHEN:
- Thumbnail conversion is attempted
THEN:
- Nothing is converted
"""
map_mock.side_effect = self.pretend_map
with override_settings(
THUMBNAIL_DIR=self.thumbnail_dir,
):
self.create_webp_thumbnail_files(self.thumbnail_dir, 3)
self.performMigration()
run_convert_mock.assert_not_called()
self.assert_webp_file_count(self.thumbnail_dir, 3)
def test_convert_single_thumbnail(
self,
run_convert_mock: mock.MagicMock,
map_mock: mock.MagicMock,
):
"""
GIVEN:
- Document exists with PNG thumbnail
WHEN:
- Thumbnail conversion is attempted
THEN:
- Single thumbnail is converted
"""
map_mock.side_effect = self.pretend_map
run_convert_mock.side_effect = self.pretend_convert_output
with override_settings(
THUMBNAIL_DIR=self.thumbnail_dir,
):
self.create_png_thumbnail_file(self.thumbnail_dir, 3)
self.performMigration()
run_convert_mock.assert_called()
self.assertEqual(run_convert_mock.call_count, 3)
self.assert_webp_file_count(self.thumbnail_dir, 3)
def test_convert_errors_out(
self,
run_convert_mock: mock.MagicMock,
map_mock: mock.MagicMock,
):
"""
GIVEN:
- Document exists with PNG thumbnail
WHEN:
- Thumbnail conversion is attempted, but raises an exception
THEN:
- Single thumbnail is converted
"""
map_mock.side_effect = self.pretend_map
run_convert_mock.side_effect = OSError
with override_settings(
THUMBNAIL_DIR=self.thumbnail_dir,
):
self.create_png_thumbnail_file(self.thumbnail_dir, 3)
self.performMigration()
run_convert_mock.assert_called()
self.assertEqual(run_convert_mock.call_count, 3)
self.assert_png_file_count(self.thumbnail_dir, 3)
def test_convert_mixed(
self,
run_convert_mock: mock.MagicMock,
map_mock: mock.MagicMock,
):
"""
GIVEN:
- Document exists with PNG thumbnail
WHEN:
- Thumbnail conversion is attempted, but raises an exception
THEN:
- Single thumbnail is converted
"""
map_mock.side_effect = self.pretend_map
run_convert_mock.side_effect = self.pretend_convert_output
with override_settings(
THUMBNAIL_DIR=self.thumbnail_dir,
):
self.create_png_thumbnail_file(self.thumbnail_dir, 3)
self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=3)
self.performMigration()
run_convert_mock.assert_called()
self.assertEqual(run_convert_mock.call_count, 3)
self.assert_png_file_count(self.thumbnail_dir, 0)
self.assert_webp_file_count(self.thumbnail_dir, 5)

View File

@@ -1,134 +0,0 @@
from documents.data_models import DocumentSource
from documents.tests.utils import TestMigrations
class TestMigrateWorkflow(TestMigrations):
migrate_from = "1043_alter_savedviewfilterrule_rule_type"
migrate_to = "1044_workflow_workflowaction_workflowtrigger_and_more"
dependencies = (
(
"paperless_mail",
"0029_mailrule_pdf_layout",
),
)
def setUpBeforeMigration(self, apps):
User = apps.get_model("auth", "User")
Group = apps.get_model("auth", "Group")
self.Permission = apps.get_model("auth", "Permission")
self.user = User.objects.create(username="user1")
self.group = Group.objects.create(name="group1")
permission = self.Permission.objects.get(codename="add_document")
self.user.user_permissions.add(permission.id)
self.group.permissions.add(permission.id)
# create a CT to migrate
c = apps.get_model("documents", "Correspondent").objects.create(
name="Correspondent Name",
)
dt = apps.get_model("documents", "DocumentType").objects.create(
name="DocType Name",
)
t1 = apps.get_model("documents", "Tag").objects.create(name="t1")
sp = apps.get_model("documents", "StoragePath").objects.create(path="/test/")
cf1 = apps.get_model("documents", "CustomField").objects.create(
name="Custom Field 1",
data_type="string",
)
ma = apps.get_model("paperless_mail", "MailAccount").objects.create(
name="MailAccount 1",
)
mr = apps.get_model("paperless_mail", "MailRule").objects.create(
name="MailRule 1",
order=0,
account=ma,
)
user2 = User.objects.create(username="user2")
user3 = User.objects.create(username="user3")
group2 = Group.objects.create(name="group2")
ConsumptionTemplate = apps.get_model("documents", "ConsumptionTemplate")
ct = ConsumptionTemplate.objects.create(
name="Template 1",
order=0,
sources=f"{DocumentSource.ApiUpload},{DocumentSource.ConsumeFolder},{DocumentSource.MailFetch}",
filter_filename="*simple*",
filter_path="*/samples/*",
filter_mailrule=mr,
assign_title="Doc from {correspondent}",
assign_correspondent=c,
assign_document_type=dt,
assign_storage_path=sp,
assign_owner=user2,
)
ct.assign_tags.add(t1)
ct.assign_view_users.add(user3)
ct.assign_view_groups.add(group2)
ct.assign_change_users.add(user3)
ct.assign_change_groups.add(group2)
ct.assign_custom_fields.add(cf1)
ct.save()
def test_users_with_add_documents_get_add_and_workflow_templates_get_migrated(self):
permission = self.Permission.objects.get(codename="add_workflow")
self.assertTrue(permission in self.user.user_permissions.all())
self.assertTrue(permission in self.group.permissions.all())
Workflow = self.apps.get_model("documents", "Workflow")
self.assertEqual(Workflow.objects.all().count(), 1)
class TestReverseMigrateWorkflow(TestMigrations):
migrate_from = "1044_workflow_workflowaction_workflowtrigger_and_more"
migrate_to = "1043_alter_savedviewfilterrule_rule_type"
def setUpBeforeMigration(self, apps):
User = apps.get_model("auth", "User")
Group = apps.get_model("auth", "Group")
self.Permission = apps.get_model("auth", "Permission")
self.user = User.objects.create(username="user1")
self.group = Group.objects.create(name="group1")
permission = self.Permission.objects.filter(
codename="add_workflow",
).first()
if permission is not None:
self.user.user_permissions.add(permission.id)
self.group.permissions.add(permission.id)
Workflow = apps.get_model("documents", "Workflow")
WorkflowTrigger = apps.get_model("documents", "WorkflowTrigger")
WorkflowAction = apps.get_model("documents", "WorkflowAction")
trigger = WorkflowTrigger.objects.create(
type=0,
sources=[str(DocumentSource.ConsumeFolder)],
filter_path="*/path/*",
filter_filename="*file*",
)
action = WorkflowAction.objects.create(
assign_title="assign title",
)
workflow = Workflow.objects.create(
name="workflow 1",
order=0,
)
workflow.triggers.set([trigger])
workflow.actions.set([action])
workflow.save()
def test_remove_workflow_permissions_and_migrate_workflows_to_consumption_templates(
self,
):
permission = self.Permission.objects.filter(
codename="add_workflow",
).first()
if permission is not None:
self.assertFalse(permission in self.user.user_permissions.all())
self.assertFalse(permission in self.group.permissions.all())
ConsumptionTemplate = self.apps.get_model("documents", "ConsumptionTemplate")
self.assertEqual(ConsumptionTemplate.objects.all().count(), 1)