From f64b5bf737505c68ad6498f2aa4b9066961b2af4 Mon Sep 17 00:00:00 2001
From: Wolf-Bastian Poettner
Date: Fri, 10 Apr 2020 12:54:07 +0000
Subject: [PATCH] Recover from accidentally renamed files

---
 src/documents/models.py                   | 83 ++++++++++++++++++++
 src/documents/tests/test_file_handling.py | 94 +++++++++++++++++++++++
 2 files changed, 177 insertions(+)

diff --git a/src/documents/models.py b/src/documents/models.py
index b81b4d42d..ba5b861c6 100644
--- a/src/documents/models.py
+++ b/src/documents/models.py
@@ -276,11 +276,63 @@ class Document(models.Model):
             return "{}: {}".format(created, self.correspondent or self.title)
         return str(created)
 
+    def find_renamed_document(self, subdirectory=""):
+        """
+        Search for this document's file after it has been renamed.
+
+        The file is recognised by its name ending in the zero-padded primary
+        key and the file type (plus ".gpg" for encrypted documents).
+        Subdirectories are searched recursively.
+
+        :param subdirectory: directory to search, in the form accepted by
+            Document.filename_to_path(); "" is the default starting point
+        :return: the file name including subdirectory, or None if no
+            matching file was found
+        """
+        suffix = "%07i.%s" % (self.pk, self.file_type)
+
+        # Append .gpg for encrypted files
+        if self.storage_type == self.STORAGE_TYPE_GPG:
+            suffix += ".gpg"
+
+        root = os.path.normpath(Document.filename_to_path(subdirectory))
+
+        for filename in os.listdir(root):
+            fullname = os.path.join(subdirectory, filename)
+
+            if os.path.isdir(Document.filename_to_path(fullname)):
+                # Keep looking in the remaining entries if this directory
+                # does not contain the file
+                recovered = self.find_renamed_document(fullname)
+                if recovered is not None:
+                    return recovered
+            elif filename.endswith(suffix):
+                return fullname
+
+        return None
+
     @property
     def source_filename(self):
+        """
+        Return the stored file name, recovering it if the file was renamed.
+
+        Note that this may save the document and remove empty directories.
+        """
+        # Initial filename generation (for new documents)
         if self.filename is None:
             self.filename = self.generate_source_filename()
 
+        # Check if document is still available under filename
+        elif not os.path.isfile(Document.filename_to_path(self.filename)):
+            recovered_filename = self.find_renamed_document()
+
+            # If we have found the file, save filename and clean up empty dirs
+            if recovered_filename is not None:
+                self.filename = recovered_filename
+                self.save()
+
+                delete_all_empty_subdirectories(Document.filename_to_path(""))
+
         return self.filename
 
     @staticmethod
@@ -414,6 +466,37 @@ def try_delete_empty_directories(directory):
         directory = os.path.normpath(directory)
 
 
+def delete_all_empty_subdirectories(directory):
+    """
+    Recursively delete all empty directories below the given directory.
+
+    The given directory itself is never deleted.
+
+    :param directory: directory to clean up, in the form accepted by
+        Document.filename_to_path()
+    """
+    root = os.path.normpath(Document.filename_to_path(directory))
+
+    for filename in os.listdir(root):
+        fullname = os.path.join(directory, filename)
+        path = Document.filename_to_path(fullname)
+
+        # Do not follow symlinks, they may point outside of this directory
+        if os.path.islink(path) or not os.path.isdir(path):
+            continue
+
+        # Clean up the children first, so that a directory which only
+        # contained empty directories becomes empty itself
+        delete_all_empty_subdirectories(fullname)
+
+        # Try to delete the directory
+        try:
+            os.rmdir(path)
+        except os.error:
+            # Directory not empty, leave it in place
+            pass
+
+
 @receiver(models.signals.m2m_changed, sender=Document.tags.through)
 @receiver(models.signals.post_save, sender=Document)
 def update_filename(sender, instance, **kwargs):
diff --git a/src/documents/tests/test_file_handling.py b/src/documents/tests/test_file_handling.py
index 7af99adfc..0da2c27e7 100644
--- a/src/documents/tests/test_file_handling.py
+++ b/src/documents/tests/test_file_handling.py
@@ -331,3 +331,97 @@ class TestDate(TestCase):
         document.save()
 
         self.assertEqual(document.generate_source_filename(), "0000001.pdf")
+
+    @override_settings(MEDIA_ROOT="/tmp/paperless-tests-{}".
+                       format(str(uuid4())[:8]))
+    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
+                       "{correspondent}")
+    def test_document_renamed(self):
+        document = Document()
+        document.file_type = "pdf"
+        document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
+        document.save()
+
+        # Ensure that filename is properly generated
+        tmp = document.source_filename
+        self.assertEqual(document.generate_source_filename(),
+                         "none/none-0000001.pdf")
+        document.create_source_directory()
+        Path(document.source_path).touch()
+
+        # Test source_path
+        self.assertEqual(document.source_path, settings.MEDIA_ROOT +
+                         "/documents/originals/none/none-0000001.pdf")
+
+        # Rename the document "illegally"
+        os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test")
+        os.rename(settings.MEDIA_ROOT + "/documents/originals/" +
+                  "none/none-0000001.pdf",
+                  settings.MEDIA_ROOT + "/documents/originals/" +
+                  "test/test-0000001.pdf")
+        self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
+                         "originals/test/test-0000001.pdf"), True)
+
+        # Set new correspondent and expect document to be saved properly
+        document.correspondent = Correspondent.objects.get_or_create(
+                name="foo")[0]
+        document.save()
+        self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
+                         "originals/foo/foo-0000001.pdf"), True)
+
+        # Check proper handling of files
+        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
+                         "/documents/originals/foo"), True)
+        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
+                         "/documents/originals/none"), False)
+        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
+                         "/documents/originals/test"), False)
+        self.assertEqual(document.generate_source_filename(),
+                         "foo/foo-0000001.pdf")
+
+    @override_settings(MEDIA_ROOT="/tmp/paperless-tests-{}".
+                       format(str(uuid4())[:8]))
+    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
+                       "{correspondent}")
+    def test_document_renamed_encrypted(self):
+        document = Document()
+        document.file_type = "pdf"
+        document.storage_type = Document.STORAGE_TYPE_GPG
+        document.save()
+
+        # Ensure that filename is properly generated
+        tmp = document.source_filename
+        self.assertEqual(document.generate_source_filename(),
+                         "none/none-0000001.pdf.gpg")
+        document.create_source_directory()
+        Path(document.source_path).touch()
+
+        # Test source_path
+        self.assertEqual(document.source_path, settings.MEDIA_ROOT +
+                         "/documents/originals/none/none-0000001.pdf.gpg")
+
+        # Rename the document "illegally"
+        os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test")
+        os.rename(settings.MEDIA_ROOT + "/documents/originals/" +
+                  "none/none-0000001.pdf.gpg",
+                  settings.MEDIA_ROOT + "/documents/originals/" +
+                  "test/test-0000001.pdf.gpg")
+        self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
+                         "originals/test/test-0000001.pdf.gpg"), True)
+
+        # Set new correspondent and expect document to be saved properly
+        document.correspondent = Correspondent.objects.get_or_create(
+                name="foo")[0]
+        document.save()
+        self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
+                         "originals/foo/foo-0000001.pdf.gpg"), True)
+
+        # Check proper handling of files
+        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
+                         "/documents/originals/foo"), True)
+        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
+                         "/documents/originals/none"), False)
+        self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
+                         "/documents/originals/test"), False)
+        self.assertEqual(document.generate_source_filename(),
+                         "foo/foo-0000001.pdf.gpg")