mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 03:16:10 -06:00 
			
		
		
		
	Recover from accidentally renamed files
This commit is contained in:
		@@ -276,11 +276,43 @@ class Document(models.Model):
 | 
			
		||||
            return "{}: {}".format(created, self.correspondent or self.title)
 | 
			
		||||
        return str(created)
 | 
			
		||||
 | 
			
		||||
    def find_renamed_document(self, subdirectory=""):
 | 
			
		||||
        suffix = "%07i.%s" % (self.pk, self.file_type)
 | 
			
		||||
 | 
			
		||||
        # Append .gpg for encrypted files
 | 
			
		||||
        if self.storage_type == self.STORAGE_TYPE_GPG:
 | 
			
		||||
            suffix += ".gpg"
 | 
			
		||||
 | 
			
		||||
        # Go up in the directory hierarchy and try to delete all directories
 | 
			
		||||
        root = os.path.normpath(Document.filename_to_path(subdirectory))
 | 
			
		||||
 | 
			
		||||
        for filename in os.listdir(root):
 | 
			
		||||
            if filename.endswith(suffix):
 | 
			
		||||
                return os.path.join(subdirectory, filename)
 | 
			
		||||
 | 
			
		||||
            fullname = os.path.join(subdirectory, filename)
 | 
			
		||||
            if os.path.isdir(Document.filename_to_path(fullname)):
 | 
			
		||||
                return self.find_renamed_document(fullname)
 | 
			
		||||
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def source_filename(self):
 | 
			
		||||
        # Initial filename generation (for new documents)
 | 
			
		||||
        if self.filename is None:
 | 
			
		||||
            self.filename = self.generate_source_filename()
 | 
			
		||||
 | 
			
		||||
        # Check if document is still available under filename
 | 
			
		||||
        elif not os.path.isfile(Document.filename_to_path(self.filename)):
 | 
			
		||||
            recovered_filename = self.find_renamed_document()
 | 
			
		||||
 | 
			
		||||
            # If we have found the file, save filename and clean up empty dirs
 | 
			
		||||
            if recovered_filename is not None:
 | 
			
		||||
                self.filename = recovered_filename
 | 
			
		||||
                self.save()
 | 
			
		||||
 | 
			
		||||
                delete_all_empty_subdirectories(Document.filename_to_path(""))
 | 
			
		||||
 | 
			
		||||
        return self.filename
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
@@ -414,6 +446,28 @@ def try_delete_empty_directories(directory):
 | 
			
		||||
        directory = os.path.normpath(directory)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def delete_all_empty_subdirectories(directory):
 | 
			
		||||
    # Go through all folders and try to delete all directories
 | 
			
		||||
    root = os.path.normpath(Document.filename_to_path(directory))
 | 
			
		||||
 | 
			
		||||
    for filename in os.listdir(root):
 | 
			
		||||
        fullname = os.path.join(directory, filename)
 | 
			
		||||
 | 
			
		||||
        if not os.path.isdir(Document.filename_to_path(fullname)):
 | 
			
		||||
            continue
 | 
			
		||||
 | 
			
		||||
        # Try to delete the directory
 | 
			
		||||
        try:
 | 
			
		||||
            os.rmdir(Document.filename_to_path(fullname))
 | 
			
		||||
            continue
 | 
			
		||||
        except os.error:
 | 
			
		||||
            # Directory not empty, no need to go further up
 | 
			
		||||
            continue
 | 
			
		||||
 | 
			
		||||
        # Go into subdirectory to see, if there is more to delete
 | 
			
		||||
        delete_all_empty_subdirectories(os.path.join(directory, filename))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@receiver(models.signals.m2m_changed, sender=Document.tags.through)
 | 
			
		||||
@receiver(models.signals.post_save, sender=Document)
 | 
			
		||||
def update_filename(sender, instance, **kwargs):
 | 
			
		||||
 
 | 
			
		||||
@@ -331,3 +331,97 @@ class TestDate(TestCase):
 | 
			
		||||
        document.save()
 | 
			
		||||
 | 
			
		||||
        self.assertEqual(document.generate_source_filename(), "0000001.pdf")
 | 
			
		||||
 | 
			
		||||
    @override_settings(MEDIA_ROOT="/tmp/paperless-tests-{}".
 | 
			
		||||
                       format(str(uuid4())[:8]))
 | 
			
		||||
    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
 | 
			
		||||
                       "{correspondent}")
 | 
			
		||||
    def test_document_renamed(self):
 | 
			
		||||
        document = Document()
 | 
			
		||||
        document.file_type = "pdf"
 | 
			
		||||
        document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
 | 
			
		||||
        document.save()
 | 
			
		||||
 | 
			
		||||
        # Ensure that filename is properly generated
 | 
			
		||||
        tmp = document.source_filename
 | 
			
		||||
        self.assertEqual(document.generate_source_filename(),
 | 
			
		||||
                         "none/none-0000001.pdf")
 | 
			
		||||
        document.create_source_directory()
 | 
			
		||||
        Path(document.source_path).touch()
 | 
			
		||||
 | 
			
		||||
        # Test source_path
 | 
			
		||||
        self.assertEqual(document.source_path, settings.MEDIA_ROOT +
 | 
			
		||||
                         "/documents/originals/none/none-0000001.pdf")
 | 
			
		||||
 | 
			
		||||
        # Rename the document "illegaly"
 | 
			
		||||
        os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test")
 | 
			
		||||
        os.rename(settings.MEDIA_ROOT + "/documents/originals/" +
 | 
			
		||||
                                        "none/none-0000001.pdf",
 | 
			
		||||
                  settings.MEDIA_ROOT + "/documents/originals/" +
 | 
			
		||||
                                        "test/test-0000001.pdf")
 | 
			
		||||
        self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
 | 
			
		||||
                         "originals/test/test-0000001.pdf"), True)
 | 
			
		||||
 | 
			
		||||
        # Set new correspondent and expect document to be saved properly
 | 
			
		||||
        document.correspondent = Correspondent.objects.get_or_create(
 | 
			
		||||
                name="foo")[0]
 | 
			
		||||
        document.save()
 | 
			
		||||
        self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
 | 
			
		||||
                         "originals/foo/foo-0000001.pdf"), True)
 | 
			
		||||
 | 
			
		||||
        # Check proper handling of files
 | 
			
		||||
        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
 | 
			
		||||
                         "/documents/originals/foo"), True)
 | 
			
		||||
        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
 | 
			
		||||
                         "/documents/originals/none"), False)
 | 
			
		||||
        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
 | 
			
		||||
                         "/documents/originals/test"), False)
 | 
			
		||||
        self.assertEqual(document.generate_source_filename(),
 | 
			
		||||
                         "foo/foo-0000001.pdf")
 | 
			
		||||
 | 
			
		||||
    @override_settings(MEDIA_ROOT="/tmp/paperless-tests-{}".
 | 
			
		||||
                       format(str(uuid4())[:8]))
 | 
			
		||||
    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
 | 
			
		||||
                       "{correspondent}")
 | 
			
		||||
    def test_document_renamed_encrypted(self):
 | 
			
		||||
        document = Document()
 | 
			
		||||
        document.file_type = "pdf"
 | 
			
		||||
        document.storage_type = Document.STORAGE_TYPE_GPG
 | 
			
		||||
        document.save()
 | 
			
		||||
 | 
			
		||||
        # Ensure that filename is properly generated
 | 
			
		||||
        tmp = document.source_filename
 | 
			
		||||
        self.assertEqual(document.generate_source_filename(),
 | 
			
		||||
                         "none/none-0000001.pdf.gpg")
 | 
			
		||||
        document.create_source_directory()
 | 
			
		||||
        Path(document.source_path).touch()
 | 
			
		||||
 | 
			
		||||
        # Test source_path
 | 
			
		||||
        self.assertEqual(document.source_path, settings.MEDIA_ROOT +
 | 
			
		||||
                         "/documents/originals/none/none-0000001.pdf.gpg")
 | 
			
		||||
 | 
			
		||||
        # Rename the document "illegaly"
 | 
			
		||||
        os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test")
 | 
			
		||||
        os.rename(settings.MEDIA_ROOT + "/documents/originals/" +
 | 
			
		||||
                                        "none/none-0000001.pdf.gpg",
 | 
			
		||||
                  settings.MEDIA_ROOT + "/documents/originals/" +
 | 
			
		||||
                                        "test/test-0000001.pdf.gpg")
 | 
			
		||||
        self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
 | 
			
		||||
                         "originals/test/test-0000001.pdf.gpg"), True)
 | 
			
		||||
 | 
			
		||||
        # Set new correspondent and expect document to be saved properly
 | 
			
		||||
        document.correspondent = Correspondent.objects.get_or_create(
 | 
			
		||||
                name="foo")[0]
 | 
			
		||||
        document.save()
 | 
			
		||||
        self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
 | 
			
		||||
                         "originals/foo/foo-0000001.pdf.gpg"), True)
 | 
			
		||||
 | 
			
		||||
        # Check proper handling of files
 | 
			
		||||
        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
 | 
			
		||||
                         "/documents/originals/foo"), True)
 | 
			
		||||
        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
 | 
			
		||||
                         "/documents/originals/none"), False)
 | 
			
		||||
        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
 | 
			
		||||
                         "/documents/originals/test"), False)
 | 
			
		||||
        self.assertEqual(document.generate_source_filename(),
 | 
			
		||||
                         "foo/foo-0000001.pdf.gpg")
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user