mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	testing the updated migration
This commit is contained in:
		| @@ -6,21 +6,19 @@ from pathlib import Path | |||||||
| from django.conf import settings | from django.conf import settings | ||||||
| from django.test import override_settings | from django.test import override_settings | ||||||
|  |  | ||||||
| from documents.sanity_checker import SanityFailedError |  | ||||||
| from documents.tasks import sanity_check |  | ||||||
| from documents.tests.utils import DirectoriesMixin, TestMigrations | from documents.tests.utils import DirectoriesMixin, TestMigrations | ||||||
|  |  | ||||||
|  |  | ||||||
| STORAGE_TYPE_GPG = "gpg" | STORAGE_TYPE_GPG = "gpg" | ||||||
|  |  | ||||||
|  |  | ||||||
| def archive_name_from_filename_old(filename): | def archive_name_from_filename(filename): | ||||||
|     return os.path.splitext(filename)[0] + ".pdf" |     return os.path.splitext(filename)[0] + ".pdf" | ||||||
|  |  | ||||||
|  |  | ||||||
| def archive_path_old(self): | def archive_path_old(self): | ||||||
|     if self.filename: |     if self.filename: | ||||||
|         fname = archive_name_from_filename_old(self.filename) |         fname = archive_name_from_filename(self.filename) | ||||||
|     else: |     else: | ||||||
|         fname = "{:07}.pdf".format(self.pk) |         fname = "{:07}.pdf".format(self.pk) | ||||||
|  |  | ||||||
| @@ -30,24 +28,14 @@ def archive_path_old(self): | |||||||
|     ) |     ) | ||||||
|  |  | ||||||
|  |  | ||||||
| def archive_name_from_filename_new(filename): | def archive_path_new(doc): | ||||||
|     name, ext = os.path.splitext(filename) |         if doc.archive_filename is not None: | ||||||
|     if ext == ".pdf": |             return os.path.join( | ||||||
|         return filename |                 settings.ARCHIVE_DIR, | ||||||
|     else: |                 str(doc.archive_filename) | ||||||
|         return filename + ".pdf" |             ) | ||||||
|  |         else: | ||||||
|  |             return None | ||||||
| def archive_path_new(self): |  | ||||||
|     if self.filename: |  | ||||||
|         fname = archive_name_from_filename_new(self.filename) |  | ||||||
|     else: |  | ||||||
|         fname = "{:07}.pdf".format(self.pk) |  | ||||||
|  |  | ||||||
|     return os.path.join( |  | ||||||
|         settings.ARCHIVE_DIR, |  | ||||||
|         fname |  | ||||||
|     ) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def source_path(doc): | def source_path(doc): | ||||||
| @@ -75,22 +63,25 @@ def thumbnail_path(doc): | |||||||
|     ) |     ) | ||||||
|  |  | ||||||
|  |  | ||||||
| def make_test_document(document_class, title: str, filename: str, mime_type: str, original: str, archive: str = None, new: bool = False): | def make_test_document(document_class, title: str, mime_type: str, original: str, original_filename: str, archive: str = None, archive_filename: str = None): | ||||||
|     doc = document_class() |     doc = document_class() | ||||||
|     doc.filename = filename |     doc.filename = original_filename | ||||||
|     doc.title = title |     doc.title = title | ||||||
|     doc.mime_type = mime_type |     doc.mime_type = mime_type | ||||||
|     doc.content = "the content, does not matter for this test" |     doc.content = "the content, does not matter for this test" | ||||||
|  |     doc.save() | ||||||
|  |  | ||||||
|     shutil.copy2(original, source_path(doc)) |     shutil.copy2(original, source_path(doc)) | ||||||
|     with open(original, "rb") as f: |     with open(original, "rb") as f: | ||||||
|         doc.checksum = hashlib.md5(f.read()).hexdigest() |         doc.checksum = hashlib.md5(f.read()).hexdigest() | ||||||
|  |  | ||||||
|     if archive: |     if archive: | ||||||
|         if new: |         if archive_filename: | ||||||
|  |             doc.archive_filename = archive_filename | ||||||
|             shutil.copy2(archive, archive_path_new(doc)) |             shutil.copy2(archive, archive_path_new(doc)) | ||||||
|         else: |         else: | ||||||
|             shutil.copy2(archive, archive_path_old(doc)) |             shutil.copy2(archive, archive_path_old(doc)) | ||||||
|  |  | ||||||
|         with open(archive, "rb") as f: |         with open(archive, "rb") as f: | ||||||
|             doc.archive_checksum = hashlib.md5(f.read()).hexdigest() |             doc.archive_checksum = hashlib.md5(f.read()).hexdigest() | ||||||
|  |  | ||||||
| @@ -101,36 +92,42 @@ def make_test_document(document_class, title: str, filename: str, mime_type: str | |||||||
|     return doc |     return doc | ||||||
|  |  | ||||||
|  |  | ||||||
| @override_settings(PAPERLESS_FILENAME_FORMAT="{title}") | simple_jpg = os.path.join(os.path.dirname(__file__), "samples", "simple.jpg") | ||||||
|  | simple_pdf = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") | ||||||
|  | simple_pdf2 = os.path.join(os.path.dirname(__file__), "samples", "documents", "originals", "0000002.pdf") | ||||||
|  | simple_txt = os.path.join(os.path.dirname(__file__), "samples", "simple.txt") | ||||||
|  | simple_png = os.path.join(os.path.dirname(__file__), "samples", "simple-noalpha.png") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @override_settings(PAPERLESS_FILENAME_FORMAT="") | ||||||
| class TestMigrateArchiveFiles(DirectoriesMixin, TestMigrations): | class TestMigrateArchiveFiles(DirectoriesMixin, TestMigrations): | ||||||
|  |  | ||||||
|     migrate_from = '1011_auto_20210101_2340' |     migrate_from = '1011_auto_20210101_2340' | ||||||
|     migrate_to = '1012_fix_archive_files' |     migrate_to = '1012_fix_archive_files' | ||||||
|  |  | ||||||
|     def setUpBeforeMigration(self, apps): |     def setUpBeforeMigration(self, apps): | ||||||
|         simple_jpg = os.path.join(os.path.dirname(__file__), "samples", "simple.jpg") |  | ||||||
|         simple_pdf = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") |  | ||||||
|         simple_pdf2 = os.path.join(os.path.dirname(__file__), "samples", "documents", "originals", "0000002.pdf") |  | ||||||
|         simple_txt = os.path.join(os.path.dirname(__file__), "samples", "simple.txt") |  | ||||||
|         simple_png = os.path.join(os.path.dirname(__file__), "samples", "simple-noalpha.png") |  | ||||||
|  |  | ||||||
|         Document = apps.get_model("documents", "Document") |         Document = apps.get_model("documents", "Document") | ||||||
|  |  | ||||||
|         self.doc_unrelated = make_test_document(Document, "unrelated", "unrelated.txt", "application/pdf", simple_pdf2, simple_pdf2) |         doc_no_archive = make_test_document(Document, "no_archive", "text/plain", simple_txt, "no_archive.txt") | ||||||
|         self.doc_no_archive = make_test_document(Document, "no_archive", "no_archive.txt", "text/plain", simple_txt) |         clash1 = make_test_document(Document, "clash", "application/pdf", simple_pdf, "clash.pdf", simple_pdf) | ||||||
|         self.clashA = make_test_document(Document, "clash", "clash.pdf", "application/pdf", simple_pdf, simple_pdf) |         clash2 = make_test_document(Document, "clash", "image/jpeg", simple_jpg, "clash.jpg", simple_pdf) | ||||||
|         self.clashB = make_test_document(Document, "clash", "clash.jpg", "image/jpeg", simple_jpg, simple_pdf) |         clash3 = make_test_document(Document, "clash", "image/png", simple_png, "clash.png", simple_pdf) | ||||||
|         self.clashC = make_test_document(Document, "clash", "clash.png", "image/png", simple_png, simple_pdf) |         clash4 = make_test_document(Document, "clash.png", "application/pdf", simple_pdf2, "clash.png.pdf", simple_pdf2) | ||||||
|  |  | ||||||
|         self.assertEqual(archive_path_old(self.clashA), archive_path_old(self.clashB)) |         self.assertEqual(archive_path_old(clash1), archive_path_old(clash2)) | ||||||
|         self.assertEqual(archive_path_old(self.clashA), archive_path_old(self.clashC)) |         self.assertEqual(archive_path_old(clash1), archive_path_old(clash3)) | ||||||
|         self.assertRaises(SanityFailedError, sanity_check) |         self.assertNotEqual(archive_path_old(clash1), archive_path_old(clash4)) | ||||||
|  |  | ||||||
|     def testArchiveFilesMigrated(self): |     def testArchiveFilesMigrated(self): | ||||||
|         Document = self.apps.get_model('documents', 'Document') |         Document = self.apps.get_model('documents', 'Document') | ||||||
|  |  | ||||||
|         for doc in Document.objects.all(): |         for doc in Document.objects.all(): | ||||||
|             self.assertTrue(os.path.isfile(archive_path_new(self.clashB))) |             if doc.archive_checksum: | ||||||
|  |                 self.assertIsNotNone(doc.archive_filename) | ||||||
|  |                 self.assertTrue(os.path.isfile(archive_path_new(doc))) | ||||||
|  |             else: | ||||||
|  |                 self.assertIsNone(doc.archive_filename) | ||||||
|  |  | ||||||
|             with open(source_path(doc), "rb") as f: |             with open(source_path(doc), "rb") as f: | ||||||
|                 original_checksum = hashlib.md5(f.read()).hexdigest() |                 original_checksum = hashlib.md5(f.read()).hexdigest() | ||||||
|             self.assertEqual(original_checksum, doc.checksum) |             self.assertEqual(original_checksum, doc.checksum) | ||||||
| @@ -143,32 +140,32 @@ class TestMigrateArchiveFiles(DirectoriesMixin, TestMigrations): | |||||||
|  |  | ||||||
|         self.assertEqual(Document.objects.filter(archive_checksum__isnull=False).count(), 4) |         self.assertEqual(Document.objects.filter(archive_checksum__isnull=False).count(), 4) | ||||||
|  |  | ||||||
|         # this will raise errors when any inconsistencies remain after migration |  | ||||||
|         sanity_check() | @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{title}") | ||||||
|  | class TestMigrateArchiveFilesWithFilenameFormat(TestMigrateArchiveFiles): | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @override_settings(PAPERLESS_FILENAME_FORMAT="") | ||||||
| class TestMigrateArchiveFilesBackwards(DirectoriesMixin, TestMigrations): | class TestMigrateArchiveFilesBackwards(DirectoriesMixin, TestMigrations): | ||||||
|  |  | ||||||
|     migrate_from = '1012_fix_archive_files' |     migrate_from = '1012_fix_archive_files' | ||||||
|     migrate_to = '1011_auto_20210101_2340' |     migrate_to = '1011_auto_20210101_2340' | ||||||
|  |  | ||||||
|     def setUpBeforeMigration(self, apps): |     def setUpBeforeMigration(self, apps): | ||||||
|         simple_jpg = os.path.join(os.path.dirname(__file__), "samples", "simple.jpg") |  | ||||||
|         simple_pdf = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") |  | ||||||
|         simple_pdf2 = os.path.join(os.path.dirname(__file__), "samples", "documents", "originals", "0000002.pdf") |  | ||||||
|         simple_txt = os.path.join(os.path.dirname(__file__), "samples", "simple.txt") |  | ||||||
|  |  | ||||||
|         Document = apps.get_model("documents", "Document") |         Document = apps.get_model("documents", "Document") | ||||||
|  |  | ||||||
|         self.doc_unrelated = make_test_document(Document, "unrelated", "unrelated.txt", "application/pdf", simple_pdf2, simple_pdf2, new=True) |         doc_unrelated = make_test_document(Document, "unrelated", "application/pdf", simple_pdf2, "unrelated.txt", simple_pdf2, "unrelated.pdf") | ||||||
|         self.doc_no_archive = make_test_document(Document, "no_archive", "no_archive.txt", "text/plain", simple_txt, new=True) |         doc_no_archive = make_test_document(Document, "no_archive", "text/plain", simple_txt, "no_archive.txt") | ||||||
|         self.clashB = make_test_document(Document, "clash", "clash.jpg", "image/jpeg", simple_jpg, simple_pdf, new=True) |         clashB = make_test_document(Document, "clash", "image/jpeg", simple_jpg, "clash.jpg", simple_pdf, "clash_02.pdf") | ||||||
|  |  | ||||||
|     def testArchiveFilesReverted(self): |     def testArchiveFilesReverted(self): | ||||||
|         Document = self.apps.get_model('documents', 'Document') |         Document = self.apps.get_model('documents', 'Document') | ||||||
|  |  | ||||||
|         for doc in Document.objects.all(): |         for doc in Document.objects.all(): | ||||||
|             self.assertTrue(os.path.isfile(archive_path_old(self.clashB))) |             if doc.archive_checksum: | ||||||
|  |                 self.assertTrue(os.path.isfile(archive_path_old(doc))) | ||||||
|             with open(source_path(doc), "rb") as f: |             with open(source_path(doc), "rb") as f: | ||||||
|                 original_checksum = hashlib.md5(f.read()).hexdigest() |                 original_checksum = hashlib.md5(f.read()).hexdigest() | ||||||
|             self.assertEqual(original_checksum, doc.checksum) |             self.assertEqual(original_checksum, doc.checksum) | ||||||
| @@ -178,3 +175,36 @@ class TestMigrateArchiveFilesBackwards(DirectoriesMixin, TestMigrations): | |||||||
|                 with open(archive_path_old(doc), "rb") as f: |                 with open(archive_path_old(doc), "rb") as f: | ||||||
|                     archive_checksum = hashlib.md5(f.read()).hexdigest() |                     archive_checksum = hashlib.md5(f.read()).hexdigest() | ||||||
|                 self.assertEqual(archive_checksum, doc.archive_checksum) |                 self.assertEqual(archive_checksum, doc.archive_checksum) | ||||||
|  |  | ||||||
|  |         self.assertEqual(Document.objects.filter(archive_checksum__isnull=False).count(), 2) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{title}") | ||||||
|  | class TestMigrateArchiveFilesBackwardsWithFilenameFormat(TestMigrateArchiveFilesBackwards): | ||||||
|  |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
|  | @override_settings(PAPERLESS_FILENAME_FORMAT="") | ||||||
|  | class TestMigrateArchiveFilesBackwardsErrors(DirectoriesMixin, TestMigrations): | ||||||
|  |  | ||||||
|  |     migrate_from = '1012_fix_archive_files' | ||||||
|  |     migrate_to = '1011_auto_20210101_2340' | ||||||
|  |     auto_migrate = False | ||||||
|  |  | ||||||
|  |     def test_filename_clash(self): | ||||||
|  |  | ||||||
|  |         Document = self.apps.get_model("documents", "Document") | ||||||
|  |  | ||||||
|  |         self.clashA = make_test_document(Document, "clash", "application/pdf", simple_pdf, "clash.pdf", simple_pdf, "clash_02.pdf") | ||||||
|  |         self.clashB = make_test_document(Document, "clash", "image/jpeg", simple_jpg, "clash.jpg", simple_pdf, "clash_01.pdf") | ||||||
|  |  | ||||||
|  |         self.assertRaisesMessage(ValueError, "would clash with another archive filename", self.performMigration) | ||||||
|  |  | ||||||
|  |     def test_filename_exists(self): | ||||||
|  |  | ||||||
|  |         Document = self.apps.get_model("documents", "Document") | ||||||
|  |  | ||||||
|  |         self.clashA = make_test_document(Document, "clash", "application/pdf", simple_pdf, "clash.pdf", simple_pdf, "clash.pdf") | ||||||
|  |         self.clashB = make_test_document(Document, "clash", "image/jpeg", simple_jpg, "clash.jpg", simple_pdf, "clash_01.pdf") | ||||||
|  |  | ||||||
|  |         self.assertRaisesMessage(ValueError, "file already exists.", self.performMigration) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 jonaswinkler
					jonaswinkler