mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
Recover from accidentally renamed files
This commit is contained in:
parent
4fc1e67e9b
commit
f64b5bf737
@ -276,11 +276,43 @@ class Document(models.Model):
|
|||||||
return "{}: {}".format(created, self.correspondent or self.title)
|
return "{}: {}".format(created, self.correspondent or self.title)
|
||||||
return str(created)
|
return str(created)
|
||||||
|
|
||||||
|
def find_renamed_document(self, subdirectory=""):
|
||||||
|
suffix = "%07i.%s" % (self.pk, self.file_type)
|
||||||
|
|
||||||
|
# Append .gpg for encrypted files
|
||||||
|
if self.storage_type == self.STORAGE_TYPE_GPG:
|
||||||
|
suffix += ".gpg"
|
||||||
|
|
||||||
|
# Go up in the directory hierarchy and try to delete all directories
|
||||||
|
root = os.path.normpath(Document.filename_to_path(subdirectory))
|
||||||
|
|
||||||
|
for filename in os.listdir(root):
|
||||||
|
if filename.endswith(suffix):
|
||||||
|
return os.path.join(subdirectory, filename)
|
||||||
|
|
||||||
|
fullname = os.path.join(subdirectory, filename)
|
||||||
|
if os.path.isdir(Document.filename_to_path(fullname)):
|
||||||
|
return self.find_renamed_document(fullname)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def source_filename(self):
|
def source_filename(self):
|
||||||
|
# Initial filename generation (for new documents)
|
||||||
if self.filename is None:
|
if self.filename is None:
|
||||||
self.filename = self.generate_source_filename()
|
self.filename = self.generate_source_filename()
|
||||||
|
|
||||||
|
# Check if document is still available under filename
|
||||||
|
elif not os.path.isfile(Document.filename_to_path(self.filename)):
|
||||||
|
recovered_filename = self.find_renamed_document()
|
||||||
|
|
||||||
|
# If we have found the file, save filename and clean up empty dirs
|
||||||
|
if recovered_filename is not None:
|
||||||
|
self.filename = recovered_filename
|
||||||
|
self.save()
|
||||||
|
|
||||||
|
delete_all_empty_subdirectories(Document.filename_to_path(""))
|
||||||
|
|
||||||
return self.filename
|
return self.filename
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -414,6 +446,28 @@ def try_delete_empty_directories(directory):
|
|||||||
directory = os.path.normpath(directory)
|
directory = os.path.normpath(directory)
|
||||||
|
|
||||||
|
|
||||||
|
def delete_all_empty_subdirectories(directory):
|
||||||
|
# Go through all folders and try to delete all directories
|
||||||
|
root = os.path.normpath(Document.filename_to_path(directory))
|
||||||
|
|
||||||
|
for filename in os.listdir(root):
|
||||||
|
fullname = os.path.join(directory, filename)
|
||||||
|
|
||||||
|
if not os.path.isdir(Document.filename_to_path(fullname)):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Try to delete the directory
|
||||||
|
try:
|
||||||
|
os.rmdir(Document.filename_to_path(fullname))
|
||||||
|
continue
|
||||||
|
except os.error:
|
||||||
|
# Directory not empty, no need to go further up
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Go into subdirectory to see, if there is more to delete
|
||||||
|
delete_all_empty_subdirectories(os.path.join(directory, filename))
|
||||||
|
|
||||||
|
|
||||||
@receiver(models.signals.m2m_changed, sender=Document.tags.through)
|
@receiver(models.signals.m2m_changed, sender=Document.tags.through)
|
||||||
@receiver(models.signals.post_save, sender=Document)
|
@receiver(models.signals.post_save, sender=Document)
|
||||||
def update_filename(sender, instance, **kwargs):
|
def update_filename(sender, instance, **kwargs):
|
||||||
|
@ -331,3 +331,97 @@ class TestDate(TestCase):
|
|||||||
document.save()
|
document.save()
|
||||||
|
|
||||||
self.assertEqual(document.generate_source_filename(), "0000001.pdf")
|
self.assertEqual(document.generate_source_filename(), "0000001.pdf")
|
||||||
|
|
||||||
|
@override_settings(MEDIA_ROOT="/tmp/paperless-tests-{}".
|
||||||
|
format(str(uuid4())[:8]))
|
||||||
|
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
|
||||||
|
"{correspondent}")
|
||||||
|
def test_document_renamed(self):
|
||||||
|
document = Document()
|
||||||
|
document.file_type = "pdf"
|
||||||
|
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
|
||||||
|
document.save()
|
||||||
|
|
||||||
|
# Ensure that filename is properly generated
|
||||||
|
tmp = document.source_filename
|
||||||
|
self.assertEqual(document.generate_source_filename(),
|
||||||
|
"none/none-0000001.pdf")
|
||||||
|
document.create_source_directory()
|
||||||
|
Path(document.source_path).touch()
|
||||||
|
|
||||||
|
# Test source_path
|
||||||
|
self.assertEqual(document.source_path, settings.MEDIA_ROOT +
|
||||||
|
"/documents/originals/none/none-0000001.pdf")
|
||||||
|
|
||||||
|
# Rename the document "illegaly"
|
||||||
|
os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test")
|
||||||
|
os.rename(settings.MEDIA_ROOT + "/documents/originals/" +
|
||||||
|
"none/none-0000001.pdf",
|
||||||
|
settings.MEDIA_ROOT + "/documents/originals/" +
|
||||||
|
"test/test-0000001.pdf")
|
||||||
|
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
|
||||||
|
"originals/test/test-0000001.pdf"), True)
|
||||||
|
|
||||||
|
# Set new correspondent and expect document to be saved properly
|
||||||
|
document.correspondent = Correspondent.objects.get_or_create(
|
||||||
|
name="foo")[0]
|
||||||
|
document.save()
|
||||||
|
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
|
||||||
|
"originals/foo/foo-0000001.pdf"), True)
|
||||||
|
|
||||||
|
# Check proper handling of files
|
||||||
|
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
|
||||||
|
"/documents/originals/foo"), True)
|
||||||
|
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
|
||||||
|
"/documents/originals/none"), False)
|
||||||
|
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
|
||||||
|
"/documents/originals/test"), False)
|
||||||
|
self.assertEqual(document.generate_source_filename(),
|
||||||
|
"foo/foo-0000001.pdf")
|
||||||
|
|
||||||
|
@override_settings(MEDIA_ROOT="/tmp/paperless-tests-{}".
|
||||||
|
format(str(uuid4())[:8]))
|
||||||
|
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
|
||||||
|
"{correspondent}")
|
||||||
|
def test_document_renamed_encrypted(self):
|
||||||
|
document = Document()
|
||||||
|
document.file_type = "pdf"
|
||||||
|
document.storage_type = Document.STORAGE_TYPE_GPG
|
||||||
|
document.save()
|
||||||
|
|
||||||
|
# Ensure that filename is properly generated
|
||||||
|
tmp = document.source_filename
|
||||||
|
self.assertEqual(document.generate_source_filename(),
|
||||||
|
"none/none-0000001.pdf.gpg")
|
||||||
|
document.create_source_directory()
|
||||||
|
Path(document.source_path).touch()
|
||||||
|
|
||||||
|
# Test source_path
|
||||||
|
self.assertEqual(document.source_path, settings.MEDIA_ROOT +
|
||||||
|
"/documents/originals/none/none-0000001.pdf.gpg")
|
||||||
|
|
||||||
|
# Rename the document "illegaly"
|
||||||
|
os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test")
|
||||||
|
os.rename(settings.MEDIA_ROOT + "/documents/originals/" +
|
||||||
|
"none/none-0000001.pdf.gpg",
|
||||||
|
settings.MEDIA_ROOT + "/documents/originals/" +
|
||||||
|
"test/test-0000001.pdf.gpg")
|
||||||
|
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
|
||||||
|
"originals/test/test-0000001.pdf.gpg"), True)
|
||||||
|
|
||||||
|
# Set new correspondent and expect document to be saved properly
|
||||||
|
document.correspondent = Correspondent.objects.get_or_create(
|
||||||
|
name="foo")[0]
|
||||||
|
document.save()
|
||||||
|
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
|
||||||
|
"originals/foo/foo-0000001.pdf.gpg"), True)
|
||||||
|
|
||||||
|
# Check proper handling of files
|
||||||
|
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
|
||||||
|
"/documents/originals/foo"), True)
|
||||||
|
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
|
||||||
|
"/documents/originals/none"), False)
|
||||||
|
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
|
||||||
|
"/documents/originals/test"), False)
|
||||||
|
self.assertEqual(document.generate_source_filename(),
|
||||||
|
"foo/foo-0000001.pdf.gpg")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user