# ---- paperless.conf.example (excerpt) ---------------------------------------

# ignored.
PAPERLESS_EMAIL_SECRET=""

# Specify a filename format for the document (directories are supported,
# e.g. {correspondent}/{title})
# Use the following placeholders:
# * {correspondent}
# * {title}
# * {created}
# * {added}
# * {tags[KEY]} If your tags conform to key_value or key-value
# * {tags[INDEX]} If your tags are strings, select the tag by index
# Uniqueness of filenames is ensured, as the primary key of the document
# (zero-padded to 7 digits) and the file type are appended to each filename.
# If the format is unset or empty, only that key and file type are used.
#PAPERLESS_FILENAME_FORMAT=""

###############################################################################
####                               Security                                ####
###############################################################################


# ---- src/documents/consumer.py (excerpt) ------------------------------------
# The change adds two statements inside a method of ``Consumer`` whose
# signature is not part of this excerpt, directly after the original and the
# thumbnail have been written and before "Completed" is logged:
#
#         self._write(document, doc, document.source_path)
#         self._write(document, thumbnail, document.thumbnail_path)
#
#         document.set_filename(document.source_filename)
#         document.save()
#
#         self.log("info", "Completed")
#
#         return document


# ---- src/documents/management/commands/document_renamer.py ------------------
from django.core.management.base import BaseCommand

from documents.models import Document, Tag

from ...mixins import Renderable


class Command(Renderable, BaseCommand):
    """Management command that re-applies the filename format to all files."""

    help = """
        This will rename all documents to match the latest filename format.
    """.replace("    ", "")

    def __init__(self, *args, **kwargs):
        self.verbosity = 0
        BaseCommand.__init__(self, *args, **kwargs)

    def handle(self, *args, **options):
        """Re-save every document so that its file is renamed if necessary."""

        self.verbosity = options["verbosity"]

        # iterator() streams the rows instead of caching the whole queryset
        for document in Document.objects.all().iterator():
            # Saving the document again will generate a new filename and rename
            document.save()


# ---- src/documents/migrations/0023_document_current_filename.py -------------
# Generated by Django 2.0.10 on 2019-04-26 18:57

from django.db import migrations, models


def set_filename(apps, schema_editor):
    """Store the filename every existing document already has on disk.

    Existing files are named ``<7-digit pk>.<file_type>`` with an additional
    ``.gpg`` extension for encrypted documents.
    """
    Document = apps.get_model("documents", "Document")
    for doc in Document.objects.all().iterator():
        file_name = "{:07}.{}".format(doc.pk, doc.file_type)
        if doc.storage_type == "gpg":
            file_name += ".gpg"

        # Set filename
        doc.filename = file_name

        # Save document, touching only the column added by this migration
        doc.save(update_fields=["filename"])


class Migration(migrations.Migration):

    dependencies = [
        ('documents', '0022_auto_20181007_1420'),
    ]

    operations = [
        migrations.AddField(
            model_name='document',
            name='filename',
            field=models.FilePathField(default=None,
                                       null=True,
                                       editable=False,
                                       help_text='Current filename in storage',
                                       max_length=256),
        ),
        # Nothing has to be undone when migrating backwards, as the column is
        # dropped by the reverse of AddField. Without a reverse function the
        # migration could not be unapplied at all.
        migrations.RunPython(set_filename, migrations.RunPython.noop)
    ]


# ---- src/documents/models.py (excerpt: imports) -----------------------------
import uuid
from collections import OrderedDict
from collections import defaultdict

import dateutil.parser
from django.dispatch import receiver
from django.conf import settings
from django.db import models
from django.template.defaultfilters import slugify
from django.utils import timezone
from django.utils.text import slugify
from fuzzywuzzy import fuzz

from .managers import LogManager
# ---- src/documents/models.py (excerpt: filename handling of Document) -------
# Only the members touched by this change are reproduced below. The remaining
# fields and methods of Document (for example __str__ and thumbnail_url) and
# the models following it (Log, ...) are not part of this excerpt.


class Document(models.Model):

    added = models.DateTimeField(
        default=timezone.now, editable=False, db_index=True)

    # Name of the stored original, relative to MEDIA_ROOT/documents/originals
    # (see filename_to_path). None until a filename has been assigned.
    filename = models.FilePathField(
        max_length=256,
        editable=False,
        default=None,
        null=True,
        help_text="Current filename in storage"
    )

    class Meta:
        ordering = ("correspondent", "title")

    def find_renamed_document(self, subdirectory=""):
        """Search the originals directory for the file of this document.

        A file belongs to this document if its name ends with the zero-padded
        primary key followed by the file type (and ``.gpg`` for encrypted
        documents).

        :param subdirectory: directory to start at, relative to the originals
            directory. All directories below it are searched as well.
        :return: the filename relative to the originals directory (prefixed
            with ``subdirectory``) or None if no such file exists.
        """
        suffix = "%07i.%s" % (self.pk, self.file_type)

        # Append .gpg for encrypted files
        if self.storage_type == self.STORAGE_TYPE_GPG:
            suffix += ".gpg"

        root = os.path.normpath(Document.filename_to_path(subdirectory))

        # Walk the complete tree. Every directory has to be visited, giving
        # up after the first subdirectory would miss files stored elsewhere.
        for current_dir, _, filenames in os.walk(root):
            for filename in filenames:
                if filename.endswith(suffix):
                    found = os.path.relpath(
                        os.path.join(current_dir, filename), root)
                    return os.path.join(subdirectory, found)

        return None

    @property
    def source_filename(self):
        """Return the filename of the original, relative to the originals
        directory.

        Accessing the property has side effects: a filename is generated for
        documents without one, and if the stored filename does not point to a
        file anymore the originals directory is searched for it. If the file
        is found, ``self.filename`` is updated (but not saved) and empty
        directories are removed. Otherwise an error is logged and the stale
        filename is returned.
        """
        # Initial filename generation (for new documents)
        if self.filename is None:
            self.filename = self.generate_source_filename()

        # Check if document is still available under filename
        elif not os.path.isfile(Document.filename_to_path(self.filename)):
            logger = logging.getLogger(__name__)
            recovered_filename = self.find_renamed_document()

            # We have found the file, so update the filename
            if recovered_filename is not None:
                logger.warning(
                    "Filename of document %s has changed and was "
                    "successfully updated", self.id)
                self.filename = recovered_filename

                # Remove all empty subdirectories from MEDIA_ROOT
                Document.delete_all_empty_subdirectories(
                    Document.filename_to_path(""))
            else:
                logger.error(
                    "File of document %s has gone and could not be "
                    "recovered", self.id)

        return self.filename

    @staticmethod
    def many_to_dictionary(field):
        """Convert the entries of a many-to-many field to a dictionary.

        Every entry is stored under its position (an int) with its slugified
        name as value. Entries whose name contains ``_`` (or otherwise ``-``)
        are additionally split at the first delimiter and stored as
        ``slugify(key) -> slugify(value)``.

        :param field: related manager whose objects provide a ``name``
        :return: dict with int and str keys and slugified values
        """
        mydictionary = dict()

        for index, t in enumerate(field.all()):
            # Populate tag names by index
            mydictionary[index] = slugify(t.name)

            # Find delimiter, the underscore takes precedence over the dash.
            # str.find() signals "not found" with -1, which has to be
            # compared by value and not by identity.
            delimiter = t.name.find('_')

            if delimiter == -1:
                delimiter = t.name.find('-')

            if delimiter == -1:
                continue

            key = t.name[:delimiter]
            value = t.name[delimiter+1:]

            mydictionary[slugify(key)] = slugify(value)

        return mydictionary

    def generate_source_filename(self):
        """Build the filename this document should have right now.

        The name consists of settings.PAPERLESS_FILENAME_FORMAT filled with
        the slugified attributes of the document, followed by the zero-padded
        primary key and the file type (plus ``.gpg`` for encrypted
        documents). Without a format, or if the format can not be applied,
        only the primary key and the file type are used.

        :return: filename relative to the originals directory
        """
        path = ""
        filename_format = settings.PAPERLESS_FILENAME_FORMAT

        # Create filename based on configured format
        if filename_format is not None:
            # Unknown tag keys and indices render as slugify(None)
            tags = defaultdict(lambda: slugify(None),
                               self.many_to_dictionary(self.tags))
            try:
                path = filename_format.format(
                    correspondent=slugify(self.correspondent),
                    title=slugify(self.title),
                    created=slugify(self.created),
                    added=slugify(self.added),
                    tags=tags)
            except (KeyError, IndexError, ValueError):
                # A broken format (unknown placeholder, unbalanced braces)
                # must not prevent documents from being saved
                logging.getLogger(__name__).error(
                    "Invalid PAPERLESS_FILENAME_FORMAT %r, falling back to "
                    "the default filename", filename_format)
                path = ""

        # Always append the primary key to guarantee uniqueness of filename
        if path:
            filename = "%s-%07i.%s" % (path, self.pk, self.file_type)
        else:
            filename = "%07i.%s" % (self.pk, self.file_type)

        # Append .gpg for encrypted files
        if self.storage_type == self.STORAGE_TYPE_GPG:
            filename += ".gpg"

        return filename

    def create_source_directory(self):
        """Create the directory the generated filename is located in."""
        new_filename = self.generate_source_filename()

        # Determine the full "target" path
        dir_new = Document.filename_to_path(os.path.dirname(new_filename))

        # Create new path
        os.makedirs(dir_new, exist_ok=True)

    @property
    def source_path(self):
        """Full path of the original (see source_filename for side effects)."""
        return Document.filename_to_path(self.source_filename)

    @staticmethod
    def filename_to_path(filename):
        """Return the full path of ``filename`` in the originals directory.

        An empty filename yields the originals directory itself. As
        os.path.join() is used, an absolute ``filename`` is returned as is.
        """
        return os.path.join(
            settings.MEDIA_ROOT,
            "documents",
            "originals",
            filename
        )

    def set_filename(self, filename):
        """Set ``self.filename``, but only if such a file exists in the
        originals directory. The document is not saved."""
        if os.path.isfile(Document.filename_to_path(filename)):
            self.filename = filename

    @staticmethod
    def try_delete_empty_directories(directory):
        """Delete ``directory`` and its parents as long as they are empty.

        Deletion stops at the first directory that can not be removed or when
        the originals directory is reached, which is never deleted.

        :param directory: full path of the directory to start with
        """
        # Go up in the directory hierarchy and try to delete all directories
        directory = os.path.normpath(directory)
        root = os.path.normpath(Document.filename_to_path(""))

        while directory != root:
            # Try to delete the current directory
            try:
                os.rmdir(directory)
            except OSError:
                # Directory not empty, no need to go further up
                return

            # Cut off actual directory and go one level up
            parent = os.path.normpath(os.path.dirname(directory))
            if parent == directory:
                # Reached the top of the file system
                return
            directory = parent

    @staticmethod
    def delete_all_empty_subdirectories(directory):
        """Recursively delete all empty directories below ``directory``.

        :param directory: directory relative to the originals directory (an
            absolute path is used as is). The directory itself is kept.
        """
        # Go through all folders and try to delete all directories
        root = os.path.normpath(Document.filename_to_path(directory))

        for filename in os.listdir(root):
            fullname = os.path.join(directory, filename)

            if not os.path.isdir(Document.filename_to_path(fullname)):
                continue

            # Go into subdirectory to see, if there is more to delete
            Document.delete_all_empty_subdirectories(fullname)

            # Try to delete the directory
            try:
                os.rmdir(Document.filename_to_path(fullname))
            except OSError:
                # Directory not empty, keep it
                pass


@receiver(models.signals.m2m_changed, sender=Document.tags.through)
@receiver(models.signals.post_save, sender=Document)
def update_filename(sender, instance, **kwargs):
    """Rename the file of a document after the document or its tags changed.

    The file is moved to the name returned by generate_source_filename(),
    directories left empty are removed and the new filename is saved. If the
    file can not be moved, the stored filename is left untouched.
    """
    # m2m_changed is also sent when the relation is changed from the other
    # side. In that case the instance is not a Document and has no filename.
    if not isinstance(instance, Document):
        return

    # Skip if document has not been saved yet
    if instance.filename is None:
        return

    # Check if file exists and update filename otherwise
    if not os.path.isfile(Document.filename_to_path(instance.filename)):
        instance.filename = instance.source_filename

    # Build the new filename
    new_filename = instance.generate_source_filename()

    # If the filename is the same, then nothing needs to be done
    if instance.filename == new_filename:
        return

    # Determine the full "target" path
    path_new = instance.filename_to_path(new_filename)
    dir_new = instance.filename_to_path(os.path.dirname(new_filename))

    # Create new path
    instance.create_source_directory()

    # Determine the full "current" path
    path_current = instance.filename_to_path(instance.source_filename)

    # Move file
    try:
        os.rename(path_current, path_new)
    except PermissionError:
        # Do not update filename in object, but do not leave the directory
        # behind that has just been created for the file
        Document.try_delete_empty_directories(dir_new)
        return
    except FileNotFoundError:
        logger = logging.getLogger(__name__)
        logger.error(
            "Renaming of document %s failed as file %s was no longer present",
            instance.id, instance.filename)
        Document.try_delete_empty_directories(dir_new)
        return

    # Delete empty directory
    old_dir = os.path.dirname(instance.filename)
    old_path = instance.filename_to_path(old_dir)
    Document.try_delete_empty_directories(old_path)

    instance.filename = new_filename

    # Save instance
    # This will not cause a cascade of post_save signals, as next time
    # nothing needs to be renamed
    instance.save()


@receiver(models.signals.post_delete, sender=Document)
def delete_files(sender, instance, **kwargs):
    """Remove the file of a deleted document and directories left empty."""
    if instance.filename is None:
        return

    # Remove the document
    old_file = instance.filename_to_path(instance.filename)

    try:
        os.remove(old_file)
    except FileNotFoundError:
        logger = logging.getLogger(__name__)
        logger.warning(
            "Deleted document %s but file %s was no longer present",
            instance.id, old_file)

    # And remove the directory (if applicable)
    old_dir = os.path.dirname(instance.filename)
    old_path = instance.filename_to_path(old_dir)
    Document.try_delete_empty_directories(old_path)
# ---- src/documents/tests/test_file_handling.py ------------------------------
import datetime
import os
import shutil
import tempfile
from unittest import mock
from uuid import uuid4
from pathlib import Path
from shutil import rmtree

from dateutil import tz
from django.test import TestCase, override_settings

from django.utils.text import slugify
from ..models import Tag, Document, Correspondent
from django.conf import settings


class TestDate(TestCase):
    """Tests for the filename handling of Document.

    Every test runs with MEDIA_ROOT pointing to a fresh temporary directory.
    The expected filenames assume that the document created by a test gets
    the primary key 1.
    """

    def add_to_deletion_list(self, dirname):
        self.deletion_list.append(dirname)

    def setUp(self):
        # One list per test, a class attribute would be shared by all tests
        self.deletion_list = []

        folder = tempfile.mkdtemp(prefix="paperless-tests-")
        self.add_to_deletion_list(folder)
        os.makedirs(os.path.join(folder, "documents", "originals"))

        storage_override = override_settings(MEDIA_ROOT=folder)
        storage_override.enable()
        # Restore the original MEDIA_ROOT once the test is done, otherwise
        # the override leaks into all tests running afterwards
        self.addCleanup(storage_override.disable)

    def tearDown(self):
        for dirname in self.deletion_list:
            shutil.rmtree(dirname, ignore_errors=True)

    # Helpers

    @staticmethod
    def _originals(*parts):
        """Return the full path of a file or directory in the originals."""
        return os.path.join(
            settings.MEDIA_ROOT, "documents", "originals", *parts)

    def _make_working_directory(self):
        """Create a temporary directory that is removed after the test."""
        tmp = tempfile.mkdtemp(prefix="paperless-tests-")
        self.add_to_deletion_list(tmp)
        return tmp

    def _create_document(self,
                         storage_type=Document.STORAGE_TYPE_UNENCRYPTED):
        """Create and save a pdf document without a file."""
        document = Document()
        document.file_type = "pdf"
        document.storage_type = storage_type
        document.save()
        return document

    def _touch_source(self, document, expected_filename):
        """Assign the initial filename to the document, check the generated
        filename and create an empty file for the document."""
        # Accessing the property generates the initial filename
        document.source_filename
        self.assertEqual(document.generate_source_filename(),
                         expected_filename)
        document.create_source_directory()
        Path(document.source_path).touch()

    def _create_document_with_file(
            self, expected_filename,
            storage_type=Document.STORAGE_TYPE_UNENCRYPTED):
        document = self._create_document(storage_type)
        self._touch_source(document, expected_filename)
        return document

    def _set_correspondent(self, document, name):
        """Assign the correspondent ``name`` and save the document."""
        document.correspondent = Correspondent.objects.get_or_create(
            name=name)[0]
        document.save()

    def _check_tags(self, tag_names, expected_filename):
        """Tag a new document and check the filename generated for it."""
        document = self._create_document()

        # Add tags to document
        if tag_names:
            for tag_name in tag_names:
                document.tags.create(name=tag_name)
            document.save()

        # Ensure that filename is properly generated
        self._touch_source(document, expected_filename)

        document.delete()

    def _check_document_renamed(self, storage_type, extension):
        """Move the file behind the back of paperless and expect the
        document to find it again when it is saved the next time."""
        none_file = "none/none-0000001." + extension
        test_file = "test/test-0000001." + extension
        foo_file = "foo/foo-0000001." + extension

        # Ensure that filename is properly generated
        document = self._create_document_with_file(none_file, storage_type)

        # Test source_path
        self.assertEqual(document.source_path, self._originals(none_file))

        # Rename the document "illegally"
        os.makedirs(self._originals("test"))
        os.rename(self._originals(none_file), self._originals(test_file))
        self.assertTrue(os.path.isfile(self._originals(test_file)))
        self.assertFalse(os.path.isfile(self._originals(none_file)))

        # Set new correspondent and expect document to be saved properly
        self._set_correspondent(document, "foo")
        self.assertTrue(os.path.isfile(self._originals(foo_file)))

        # Check proper handling of files
        self.assertTrue(os.path.isdir(self._originals("foo")))
        self.assertFalse(os.path.isdir(self._originals("none")))
        self.assertFalse(os.path.isdir(self._originals("test")))
        self.assertEqual(document.generate_source_filename(), foo_file)

    # Tests

    @override_settings(PAPERLESS_FILENAME_FORMAT="")
    def test_source_filename(self):
        document = self._create_document()

        self.assertEqual(document.source_filename, "0000001.pdf")

        document.filename = "test.pdf"
        self.assertEqual(document.source_filename, "test.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="")
    def test_generate_source_filename(self):
        document = self._create_document()

        self.assertEqual(document.generate_source_filename(), "0000001.pdf")

        document.storage_type = Document.STORAGE_TYPE_GPG
        self.assertEqual(document.generate_source_filename(),
                         "0000001.pdf.gpg")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_file_renaming(self):
        # Ensure that filename is properly generated
        document = self._create_document_with_file("none/none-0000001.pdf")

        # Test source_path
        self.assertEqual(document.source_path,
                         self._originals("none/none-0000001.pdf"))

        # Enable encryption and check again
        document.storage_type = Document.STORAGE_TYPE_GPG
        document.source_filename
        self.assertEqual(document.generate_source_filename(),
                         "none/none-0000001.pdf.gpg")
        document.save()

        self.assertTrue(os.path.isdir(self._originals("none")))

        # Set a correspondent and save the document
        self._set_correspondent(document, "test")

        # Check proper handling of files
        self.assertTrue(os.path.isdir(self._originals("test")))
        self.assertFalse(os.path.isdir(self._originals("none")))
        self.assertTrue(os.path.isfile(
            self._originals("test/test-0000001.pdf.gpg")))
        self.assertEqual(document.generate_source_filename(),
                         "test/test-0000001.pdf.gpg")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_file_renaming_missing_permissions(self):
        # The superuser is not restricted by file permissions
        if hasattr(os, "geteuid") and os.geteuid() == 0:
            self.skipTest("Permissions are not enforced for root")

        # Ensure that filename is properly generated
        document = self._create_document_with_file("none/none-0000001.pdf")

        # Test source_path
        self.assertEqual(document.source_path,
                         self._originals("none/none-0000001.pdf"))

        # Make the folder read- and execute-only (no writing and no renaming)
        os.chmod(self._originals("none"), 0o555)

        try:
            # Set a correspondent and save the document
            self._set_correspondent(document, "test")

            # Check proper handling of files
            self.assertTrue(os.path.isfile(
                self._originals("none/none-0000001.pdf")))
            self.assertEqual(document.source_filename,
                             "none/none-0000001.pdf")
        finally:
            # Has to happen even if the test fails, otherwise the directory
            # can not be removed in tearDown
            os.chmod(self._originals("none"), 0o777)

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_document_delete(self):
        # Ensure that filename is properly generated
        document = self._create_document_with_file("none/none-0000001.pdf")

        # Ensure file deletion after delete
        document.delete()
        self.assertFalse(os.path.isfile(
            self._originals("none/none-0000001.pdf")))
        self.assertFalse(os.path.isdir(self._originals("none")))

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_document_delete_nofile(self):
        document = self._create_document()

        document.delete()

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_directory_not_empty(self):
        # Ensure that filename is properly generated
        document = self._create_document_with_file("none/none-0000001.pdf")
        Path(document.source_path + "test").touch()

        # Set a correspondent and save the document
        self._set_correspondent(document, "test")

        # Check proper handling of files
        self.assertTrue(os.path.isdir(self._originals("test")))
        self.assertTrue(os.path.isdir(self._originals("none")))

        # Cleanup
        os.remove(self._originals("none/none-0000001.pdftest"))
        os.rmdir(self._originals("none"))

    @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}")
    def test_tags_with_underscore(self):
        self._check_tags(["type_demo", "foo_bar"], "demo-0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}")
    def test_tags_with_dash(self):
        self._check_tags(["type-demo", "foo-bar"], "demo-0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}")
    def test_tags_malformed(self):
        self._check_tags(["type:demo", "foo:bar"], "none-0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[0]}")
    def test_tags_all(self):
        self._check_tags(["demo"], "demo-0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[0]}")
    def test_tags_out_of_bounds_0(self):
        self._check_tags([], "none-0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[10000000]}")
    def test_tags_out_of_bounds_10000000(self):
        self._check_tags([], "none-0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[99]}")
    def test_tags_out_of_bounds_99(self):
        self._check_tags([], "none-0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}/{correspondent}")
    def test_nested_directory_cleanup(self):
        # Ensure that filename is properly generated
        document = self._create_document_with_file(
            "none/none/none-0000001.pdf")

        # Check proper handling of files
        self.assertTrue(os.path.isdir(self._originals("none/none")))

        document.delete()

        self.assertFalse(os.path.isfile(
            self._originals("none/none/none-0000001.pdf")))
        self.assertFalse(os.path.isdir(self._originals("none/none")))
        self.assertFalse(os.path.isdir(self._originals("none")))
        self.assertTrue(os.path.isdir(self._originals()))

    @override_settings(PAPERLESS_FILENAME_FORMAT=None)
    def test_format_none(self):
        document = self._create_document()

        self.assertEqual(document.generate_source_filename(), "0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_document_renamed(self):
        self._check_document_renamed(
            Document.STORAGE_TYPE_UNENCRYPTED, "pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_document_renamed_encrypted(self):
        self._check_document_renamed(Document.STORAGE_TYPE_GPG, "pdf.gpg")

    def test_delete_all_empty_subdirectories(self):
        # Create our working directory
        tmp = self._make_working_directory()

        os.makedirs(os.path.join(tmp, "empty", "subdirectory"))

        os.makedirs(os.path.join(tmp, "notempty"))
        Path(os.path.join(tmp, "notempty", "file")).touch()

        Document.delete_all_empty_subdirectories(tmp)

        self.assertTrue(os.path.isdir(os.path.join(tmp, "notempty")))
        self.assertFalse(os.path.isdir(os.path.join(tmp, "empty")))
        self.assertTrue(os.path.isfile(
            os.path.join(tmp, "notempty", "file")))

    def test_try_delete_empty_directories(self):
        # Create our working directory
        tmp = self._make_working_directory()

        os.makedirs(os.path.join(tmp, "notempty"))
        Path(os.path.join(tmp, "notempty", "file")).touch()
        os.makedirs(os.path.join(tmp, "notempty", "empty"))

        Document.try_delete_empty_directories(
            os.path.join(tmp, "notempty", "empty"))
        self.assertTrue(os.path.isdir(os.path.join(tmp, "notempty")))
        self.assertTrue(os.path.isfile(
            os.path.join(tmp, "notempty", "file")))
        self.assertFalse(os.path.isdir(
            os.path.join(tmp, "notempty", "empty")))

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_document_accidentally_deleted(self):
        # Ensure that filename is properly generated
        document = self._create_document_with_file("none/none-0000001.pdf")

        # Test source_path
        self.assertEqual(document.source_path,
                         self._originals("none/none-0000001.pdf"))

        # Delete the document "illegally"
        os.remove(self._originals("none/none-0000001.pdf"))

        # Set new correspondent and expect document to be saved properly
        self._set_correspondent(document, "foo")

        # Check proper handling of files
        self.assertTrue(os.path.isdir(self._originals("none")))
        self.assertEqual(document.source_filename,
                         "none/none-0000001.pdf")

    @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
                       "{correspondent}")
    def test_set_filename(self):
        # Ensure that filename is properly generated
        document = self._create_document_with_file("none/none-0000001.pdf")

        # Set existing filename
        document.set_filename(document.source_filename)
        self.assertEqual(document.source_filename, "none/none-0000001.pdf")

        # Set non-existing filename
        document.set_filename("doesnotexist")
        self.assertEqual(document.source_filename, "none/none-0000001.pdf")


# ---- src/paperless/settings.py (excerpt: end of file) -----------------------
# well. Set to 0 to disable this filter.
PAPERLESS_RECENT_CORRESPONDENT_YEARS = int(os.getenv(
    "PAPERLESS_RECENT_CORRESPONDENT_YEARS", 0))

# Specify the filename format for stored files. None if the variable is not
# set, in which case files are named after their primary key only.
PAPERLESS_FILENAME_FORMAT = os.getenv("PAPERLESS_FILENAME_FORMAT")