mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 03:16:10 -06:00 
			
		
		
		
	migration for #511
This commit is contained in:
		
							
								
								
									
										181
									
								
								src/documents/migrations/1012_fix_archive_files.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										181
									
								
								src/documents/migrations/1012_fix_archive_files.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,181 @@
 | 
				
			|||||||
 | 
					# Generated by Django 3.1.6 on 2021-02-07 22:26
 | 
				
			||||||
 | 
					import hashlib
 | 
				
			||||||
 | 
					import logging
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import shutil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from django.conf import settings
 | 
				
			||||||
 | 
					from django.db import migrations
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					logger = logging.getLogger("paperless.migrations")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def archive_name_from_filename_old(filename):
 | 
				
			||||||
 | 
					    return os.path.splitext(filename)[0] + ".pdf"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def archive_path_old(doc):
 | 
				
			||||||
 | 
					    if doc.filename:
 | 
				
			||||||
 | 
					        fname = archive_name_from_filename_old(doc.filename)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        fname = "{:07}.pdf".format(doc.pk)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return os.path.join(
 | 
				
			||||||
 | 
					        settings.ARCHIVE_DIR,
 | 
				
			||||||
 | 
					        fname
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def archive_name_from_filename_new(filename):
 | 
				
			||||||
 | 
					    name, ext = os.path.splitext(filename)
 | 
				
			||||||
 | 
					    if ext == ".pdf":
 | 
				
			||||||
 | 
					        return filename
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        return filename + ".pdf"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def archive_path_new(doc):
 | 
				
			||||||
 | 
					    if doc.filename:
 | 
				
			||||||
 | 
					        fname = archive_name_from_filename_new(doc.filename)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        fname = "{:07}.pdf".format(doc.pk)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return os.path.join(
 | 
				
			||||||
 | 
					        settings.ARCHIVE_DIR,
 | 
				
			||||||
 | 
					        fname
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					STORAGE_TYPE_GPG = "gpg"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def source_path(doc):
 | 
				
			||||||
 | 
					    if doc.filename:
 | 
				
			||||||
 | 
					        fname = str(doc.filename)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        fname = "{:07}{}".format(doc.pk, doc.file_type)
 | 
				
			||||||
 | 
					        if doc.storage_type == STORAGE_TYPE_GPG:
 | 
				
			||||||
 | 
					            fname += ".gpg"  # pragma: no cover
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return os.path.join(
 | 
				
			||||||
 | 
					        settings.ORIGINALS_DIR,
 | 
				
			||||||
 | 
					        fname
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
def move_old_to_new_locations(apps, schema_editor):
    """Forward migration: rename archive files to the new naming scheme.

    The old scheme replaced the original's extension with ".pdf", so two
    documents such as "x.pdf" and "x.jpg" could share one archive file.
    This pass (1) detects such clashes, (2) verifies the clashing
    documents can be re-parsed, (3) moves all archive files to their new
    names, and (4) regenerates the archive versions of clashing pairs.
    Raises before touching any file if an archive file is missing or a
    clashing document has no parser.
    """
    Document = apps.get_model("documents", "Document")

    # ids of documents whose old archive path clashed with another document's
    affected_document_ids = set()

    # maps old archive path -> id of the first document seen with that path
    old_archive_path_to_id = {}

    # check for documents that have incorrect archive versions
    for doc in Document.objects.filter(archive_checksum__isnull=False):
        old_path = archive_path_old(doc)

        if not os.path.isfile(old_path):
            raise ValueError(
                f"Archived document of {doc.filename} does not exist at: "
                f"{old_path}")

        if old_path in old_archive_path_to_id:
            # both members of a clashing pair must be regenerated
            affected_document_ids.add(doc.id)
            affected_document_ids.add(old_archive_path_to_id[old_path])
        else:
            old_archive_path_to_id[old_path] = doc.id

    # check that we can regenerate these archive versions
    # (deferred import: migrations should not import app code at module load)
    for doc_id in affected_document_ids:
        from documents.parsers import get_parser_class_for_mime_type

        doc = Document.objects.get(id=doc_id)
        parser_class = get_parser_class_for_mime_type(doc.mime_type)
        if not parser_class:
            raise Exception(
                f"document {doc.filename} has an invalid archived document, "
                f"but no parsers are available. Cannot migrate.")

    # move files
    for doc in Document.objects.filter(archive_checksum__isnull=False):
        old_path = archive_path_old(doc)
        new_path = archive_path_new(doc)

        # skip already-correct files and never overwrite an existing target
        if old_path != new_path and not os.path.isfile(new_path):
            logger.debug(
                f"Moving {old_path} to {new_path}"
            )
            shutil.move(old_path, new_path)

    # regenerate archive documents
    for doc_id in affected_document_ids:
        from documents.parsers import get_parser_class_for_mime_type, \
            DocumentParser, \
            ParseError

        doc = Document.objects.get(id=doc_id)
        logger.info(
            f"Regenerating archive document for {doc.filename}"
        )
        parser_class = get_parser_class_for_mime_type(doc.mime_type)
        parser: DocumentParser = parser_class(None, None)
        try:
            parser.parse(source_path(doc), doc.mime_type, os.path.basename(doc.filename))
            doc.content = parser.get_text()
            if parser.archive_path and os.path.isfile(parser.archive_path):
                with open(parser.archive_path, "rb") as f:
                    doc.archive_checksum = hashlib.md5(f.read()).hexdigest()
                shutil.copy2(parser.archive_path, archive_path_new(doc))
            else:
                # parser produced no archive version: drop the stale one
                doc.archive_checksum = None
                if os.path.isfile(archive_path_new(doc)):
                    os.unlink(archive_path_new(doc))
            doc.save()
        except ParseError:
            # best-effort: log and keep migrating the remaining documents
            logger.exception(
                f"Unable to regenerate archive document for {doc.filename}"
            )
        finally:
            parser.cleanup()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def move_new_to_old_locations(apps, schema_editor):
 | 
				
			||||||
 | 
					    Document = apps.get_model("documents", "Document")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    old_archive_paths = set()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for doc in Document.objects.filter(archive_checksum__isnull=False):
 | 
				
			||||||
 | 
					        new_archive_path = archive_path_new(doc)
 | 
				
			||||||
 | 
					        old_archive_path = archive_path_old(doc)
 | 
				
			||||||
 | 
					        if old_archive_path in old_archive_paths:
 | 
				
			||||||
 | 
					            raise ValueError(
 | 
				
			||||||
 | 
					                f"Cannot migrate: Archive file name {old_archive_path} of "
 | 
				
			||||||
 | 
					                f"document {doc.filename} would clash with another archive "
 | 
				
			||||||
 | 
					                f"filename.")
 | 
				
			||||||
 | 
					        old_archive_paths.add(old_archive_path)
 | 
				
			||||||
 | 
					        if new_archive_path != old_archive_path and os.path.isfile(old_archive_path):
 | 
				
			||||||
 | 
					            raise ValueError(
 | 
				
			||||||
 | 
					                f"Cannot migrate: Cannot move {new_archive_path} to "
 | 
				
			||||||
 | 
					                f"{old_archive_path}: file already exists."
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for doc in Document.objects.filter(archive_checksum__isnull=False):
 | 
				
			||||||
 | 
					        new_archive_path = archive_path_new(doc)
 | 
				
			||||||
 | 
					        old_archive_path = archive_path_old(doc)
 | 
				
			||||||
 | 
					        shutil.move(new_archive_path, old_archive_path)
 | 
				
			||||||
 | 
					        logger.debug(f"Moving {new_archive_path} to {old_archive_path}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Migration(migrations.Migration):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    dependencies = [
 | 
				
			||||||
 | 
					        ('documents', '1011_auto_20210101_2340'),
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    operations = [
 | 
				
			||||||
 | 
					        migrations.RunPython(
 | 
				
			||||||
 | 
					            move_old_to_new_locations,
 | 
				
			||||||
 | 
					            move_new_to_old_locations
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
							
								
								
									
										
											BIN
										
									
								
								src/documents/tests/samples/simple.jpg
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								src/documents/tests/samples/simple.jpg
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							| 
		 After Width: | Height: | Size: 17 KiB  | 
							
								
								
									
										1
									
								
								src/documents/tests/samples/simple.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								src/documents/tests/samples/simple.txt
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1 @@
 | 
				
			|||||||
 | 
					This is a test file.
 | 
				
			||||||
							
								
								
									
										175
									
								
								src/documents/tests/test_migration_archive_files.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										175
									
								
								src/documents/tests/test_migration_archive_files.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,175 @@
 | 
				
			|||||||
 | 
					import hashlib
 | 
				
			||||||
 | 
					import os
 | 
				
			||||||
 | 
					import shutil
 | 
				
			||||||
 | 
					from pathlib import Path
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from django.conf import settings
 | 
				
			||||||
 | 
					from django.test import override_settings
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from documents.sanity_checker import SanityFailedError
 | 
				
			||||||
 | 
					from documents.tasks import sanity_check
 | 
				
			||||||
 | 
					from documents.tests.utils import DirectoriesMixin, TestMigrations
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					STORAGE_TYPE_GPG = "gpg"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def archive_name_from_filename_old(filename):
 | 
				
			||||||
 | 
					    return os.path.splitext(filename)[0] + ".pdf"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def archive_path_old(self):
 | 
				
			||||||
 | 
					    if self.filename:
 | 
				
			||||||
 | 
					        fname = archive_name_from_filename_old(self.filename)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        fname = "{:07}.pdf".format(self.pk)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return os.path.join(
 | 
				
			||||||
 | 
					        settings.ARCHIVE_DIR,
 | 
				
			||||||
 | 
					        fname
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def archive_name_from_filename_new(filename):
 | 
				
			||||||
 | 
					    name, ext = os.path.splitext(filename)
 | 
				
			||||||
 | 
					    if ext == ".pdf":
 | 
				
			||||||
 | 
					        return filename
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        return filename + ".pdf"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def archive_path_new(self):
 | 
				
			||||||
 | 
					    if self.filename:
 | 
				
			||||||
 | 
					        fname = archive_name_from_filename_new(self.filename)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        fname = "{:07}.pdf".format(self.pk)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return os.path.join(
 | 
				
			||||||
 | 
					        settings.ARCHIVE_DIR,
 | 
				
			||||||
 | 
					        fname
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def source_path(doc):
 | 
				
			||||||
 | 
					    if doc.filename:
 | 
				
			||||||
 | 
					        fname = str(doc.filename)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        fname = "{:07}{}".format(doc.pk, doc.file_type)
 | 
				
			||||||
 | 
					        if doc.storage_type == STORAGE_TYPE_GPG:
 | 
				
			||||||
 | 
					            fname += ".gpg"  # pragma: no cover
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return os.path.join(
 | 
				
			||||||
 | 
					        settings.ORIGINALS_DIR,
 | 
				
			||||||
 | 
					        fname
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def thumbnail_path(doc):
 | 
				
			||||||
 | 
					    file_name = "{:07}.png".format(doc.pk)
 | 
				
			||||||
 | 
					    if doc.storage_type == STORAGE_TYPE_GPG:
 | 
				
			||||||
 | 
					        file_name += ".gpg"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return os.path.join(
 | 
				
			||||||
 | 
					        settings.THUMBNAIL_DIR,
 | 
				
			||||||
 | 
					        file_name
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def make_test_document(document_class, title: str, filename: str, mime_type: str, original: str, archive: str = None, new: bool = False):
 | 
				
			||||||
 | 
					    doc = document_class()
 | 
				
			||||||
 | 
					    doc.filename = filename
 | 
				
			||||||
 | 
					    doc.title = title
 | 
				
			||||||
 | 
					    doc.mime_type = mime_type
 | 
				
			||||||
 | 
					    doc.content = "the content, does not matter for this test"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    shutil.copy2(original, source_path(doc))
 | 
				
			||||||
 | 
					    with open(original, "rb") as f:
 | 
				
			||||||
 | 
					        doc.checksum = hashlib.md5(f.read()).hexdigest()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if archive:
 | 
				
			||||||
 | 
					        if new:
 | 
				
			||||||
 | 
					            shutil.copy2(archive, archive_path_new(doc))
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            shutil.copy2(archive, archive_path_old(doc))
 | 
				
			||||||
 | 
					        with open(archive, "rb") as f:
 | 
				
			||||||
 | 
					            doc.archive_checksum = hashlib.md5(f.read()).hexdigest()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    doc.save()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Path(thumbnail_path(doc)).touch()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return doc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
@override_settings(PAPERLESS_FILENAME_FORMAT="{title}")
class TestMigrateArchiveFiles(DirectoriesMixin, TestMigrations):
    """Forward test for migration 1012: documents with clashing old-scheme
    archive names must end up with distinct, checksum-consistent archive
    files after migrating.

    PAPERLESS_FILENAME_FORMAT="{title}" makes clashA/clashB share a title
    ("clash") so their old archive paths collide.
    """

    migrate_from = '1011_auto_20210101_2340'
    migrate_to = '1012_fix_archive_files'

    def setUpBeforeMigration(self, apps):
        # Fixture files shipped with the test suite.
        simple_jpg = os.path.join(os.path.dirname(__file__), "samples", "simple.jpg")
        simple_pdf = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
        simple_pdf2 = os.path.join(os.path.dirname(__file__), "samples", "documents", "originals", "0000002.pdf")
        simple_txt = os.path.join(os.path.dirname(__file__), "samples", "simple.txt")

        Document = apps.get_model("documents", "Document")

        # One unaffected document, one without an archive version, and a
        # clashing pair (clash.pdf / clash.jpg both map to clash.pdf).
        self.doc_unrelated = make_test_document(Document, "unrelated", "unrelated.txt", "application/pdf", simple_pdf2, simple_pdf2)
        self.doc_no_archive = make_test_document(Document, "no_archive", "no_archive.txt", "text/plain", simple_txt)
        self.clashA = make_test_document(Document, "clash", "clash.pdf", "application/pdf", simple_pdf, simple_pdf)
        self.clashB = make_test_document(Document, "clash", "clash.jpg", "image/jpeg", simple_jpg, simple_pdf)

        # Precondition: the pair clashes and the sanity checker sees it.
        self.assertEqual(archive_path_old(self.clashA), archive_path_old(self.clashB))
        self.assertRaises(SanityFailedError, sanity_check)

    def testArchiveFilesMigrated(self):
        Document = self.apps.get_model('documents', 'Document')

        for doc in Document.objects.all():
            # NOTE(review): this asserts on clashB inside the loop rather
            # than on the loop's doc — presumably to pin the regenerated
            # clash file's existence; confirm against the migration.
            self.assertTrue(os.path.isfile(archive_path_new(self.clashB)))
            with open(source_path(doc), "rb") as f:
                original_checksum = hashlib.md5(f.read()).hexdigest()
            self.assertEqual(original_checksum, doc.checksum)

            # Documents with an archive version must have a matching file
            # at the new-scheme path with a matching checksum.
            if doc.archive_checksum:
                self.assertTrue(os.path.isfile(archive_path_new(doc)))
                with open(archive_path_new(doc), "rb") as f:
                    archive_checksum = hashlib.md5(f.read()).hexdigest()
                self.assertEqual(archive_checksum, doc.archive_checksum)

        # this will raise errors when any inconsistencies remain after migration
        sanity_check()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
class TestMigrateArchiveFilesBackwards(DirectoriesMixin, TestMigrations):
    """Reverse test for migration 1012: archive files stored under the new
    scheme must be moved back to their old-scheme paths with checksums
    intact.  No clashing pair is created here, since the reverse
    migration refuses to run when old-scheme names would collide.
    """

    migrate_from = '1012_fix_archive_files'
    migrate_to = '1011_auto_20210101_2340'

    def setUpBeforeMigration(self, apps):
        # Fixture files shipped with the test suite.
        simple_jpg = os.path.join(os.path.dirname(__file__), "samples", "simple.jpg")
        simple_pdf = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
        simple_pdf2 = os.path.join(os.path.dirname(__file__), "samples", "documents", "originals", "0000002.pdf")
        simple_txt = os.path.join(os.path.dirname(__file__), "samples", "simple.txt")

        Document = apps.get_model("documents", "Document")

        # new=True places the archive copies at the new-scheme paths.
        self.doc_unrelated = make_test_document(Document, "unrelated", "unrelated.txt", "application/pdf", simple_pdf2, simple_pdf2, new=True)
        self.doc_no_archive = make_test_document(Document, "no_archive", "no_archive.txt", "text/plain", simple_txt, new=True)
        self.clashB = make_test_document(Document, "clash", "clash.jpg", "image/jpeg", simple_jpg, simple_pdf, new=True)

    def testArchiveFilesReverted(self):
        Document = self.apps.get_model('documents', 'Document')

        for doc in Document.objects.all():
            # NOTE(review): asserts on clashB inside the loop rather than
            # on the loop's doc — mirrors the forward test; confirm intent.
            self.assertTrue(os.path.isfile(archive_path_old(self.clashB)))
            with open(source_path(doc), "rb") as f:
                original_checksum = hashlib.md5(f.read()).hexdigest()
            self.assertEqual(original_checksum, doc.checksum)

            # Archive files must now live at the old-scheme paths with
            # unchanged checksums.
            if doc.archive_checksum:
                self.assertTrue(os.path.isfile(archive_path_old(doc)))
                with open(archive_path_old(doc), "rb") as f:
                    archive_checksum = hashlib.md5(f.read()).hexdigest()
                self.assertEqual(archive_checksum, doc.archive_checksum)
 | 
				
			||||||
		Reference in New Issue
	
	Block a user