diff --git a/src/documents/apps.py b/src/documents/apps.py index 83e671d07..6cf815122 100644 --- a/src/documents/apps.py +++ b/src/documents/apps.py @@ -14,7 +14,6 @@ class DocumentsConfig(AppConfig): add_inbox_tags, run_pre_consume_script, run_post_consume_script, - cleanup_document_deletion, set_log_entry, set_correspondent, set_document_type, @@ -33,6 +32,4 @@ class DocumentsConfig(AppConfig): document_consumption_finished.connect(add_to_index) document_consumption_finished.connect(run_post_consume_script) - post_delete.connect(cleanup_document_deletion) - AppConfig.ready(self) diff --git a/src/documents/consumer.py b/src/documents/consumer.py index 96aad7d49..2e8c5493f 100755 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -11,6 +11,7 @@ from django.utils import timezone from paperless.db import GnuPG from .classifier import DocumentClassifier, IncompatibleClassifierVersionError +from .file_handling import generate_filename, create_source_path_directory from .models import Document, FileInfo from .parsers import ParseError, get_parser_class from .signals import ( @@ -174,10 +175,15 @@ class Consumer: self.log("debug", "Tagging with {}".format(tag_names)) document.tags.add(*relevant_tags) + document.filename = generate_filename(document) + + create_source_path_directory(document.source_path) + self._write(document, doc, document.source_path) self._write(document, thumbnail, document.thumbnail_path) - #TODO: why do we need to save the document again? + # We need to save the document twice, since we need the PK of the + # document in order to create its filename above. 
document.save() return document diff --git a/src/documents/file_handling.py b/src/documents/file_handling.py new file mode 100644 index 000000000..cac317d4c --- /dev/null +++ b/src/documents/file_handling.py @@ -0,0 +1,92 @@ +import os +from collections import defaultdict + +from django.conf import settings +from django.template.defaultfilters import slugify + + +def create_source_path_directory(source_path): + os.makedirs(os.path.dirname(source_path), exist_ok=True) + + +def delete_empty_directories(directory): + # Go up in the directory hierarchy and try to delete all directories + directory = os.path.normpath(directory) + root = os.path.normpath(settings.ORIGINALS_DIR) + + if not directory.startswith(root + os.path.sep): + # don't do anything outside our originals folder. + + # append os.path.sep so that we avoid these cases: + # directory = /home/originals2/test + # root = /home/originals ("/" gets appended and startswith fails) + return + + while directory != root: + if not os.listdir(directory): + # it's empty + try: + os.rmdir(directory) + except OSError: + # whatever. empty directories aren't that bad anyway. + return + else: + # it's not empty. 
+ return + + # go one level up + directory = os.path.normpath(os.path.dirname(directory)) + + +def many_to_dictionary(field): + # Converts ManyToManyField to dictionary by assuming, that field + # entries contain an _ or - which will be used as a delimiter + mydictionary = dict() + + for index, t in enumerate(field.all()): + # Populate tag names by index + mydictionary[index] = slugify(t.name) + + # Find delimiter + delimiter = t.name.find('_') + + if delimiter == -1: + delimiter = t.name.find('-') + + if delimiter == -1: + continue + + key = t.name[:delimiter] + value = t.name[delimiter + 1:] + + mydictionary[slugify(key)] = slugify(value) + + return mydictionary + + +def generate_filename(document): + # Create filename based on configured format + if settings.PAPERLESS_FILENAME_FORMAT is not None: + tags = defaultdict(lambda: slugify(None), + many_to_dictionary(document.tags)) + path = settings.PAPERLESS_FILENAME_FORMAT.format( + correspondent=slugify(document.correspondent), + title=slugify(document.title), + created=document.created.date(), + added=slugify(document.added), + tags=tags, + ) + else: + path = "" + + # Always append the primary key to guarantee uniqueness of filename + if len(path) > 0: + filename = "%s-%07i.%s" % (path, document.pk, document.file_type) + else: + filename = "%07i.%s" % (document.pk, document.file_type) + + # Append .gpg for encrypted files + if document.storage_type == document.STORAGE_TYPE_GPG: + filename += ".gpg" + + return filename diff --git a/src/documents/management/commands/document_importer.py b/src/documents/management/commands/document_importer.py index ae5c1853f..ef3eaafc0 100644 --- a/src/documents/management/commands/document_importer.py +++ b/src/documents/management/commands/document_importer.py @@ -8,6 +8,7 @@ from django.core.management import call_command from documents.models import Document from paperless.db import GnuPG +from ...file_handling import generate_filename, create_source_path_directory from 
...mixins import Renderable @@ -82,6 +83,10 @@ class Command(Renderable, BaseCommand): def _import_files_from_manifest(self): + storage_type = Document.STORAGE_TYPE_UNENCRYPTED + if settings.PASSPHRASE: + storage_type = Document.STORAGE_TYPE_GPG + for record in self.manifest: if not record["model"] == "documents.document": @@ -94,6 +99,14 @@ class Command(Renderable, BaseCommand): document_path = os.path.join(self.source, doc_file) thumbnail_path = os.path.join(self.source, thumb_file) + document.storage_type = storage_type + document.filename = generate_filename(document) + + if os.path.isfile(document.source_path): + raise FileExistsError(document.source_path) + + create_source_path_directory(document.source_path) + if settings.PASSPHRASE: with open(document_path, "rb") as unencrypted: @@ -109,18 +122,8 @@ class Command(Renderable, BaseCommand): encrypted.write(GnuPG.encrypted(unencrypted)) else: - + print("Moving {} to {}".format(document_path, document.source_path)) shutil.copy(document_path, document.source_path) shutil.copy(thumbnail_path, document.thumbnail_path) - # Reset the storage type to whatever we've used while importing - - storage_type = Document.STORAGE_TYPE_UNENCRYPTED - if settings.PASSPHRASE: - storage_type = Document.STORAGE_TYPE_GPG - - Document.objects.filter( - pk__in=[r["pk"] for r in self.manifest] - ).update( - storage_type=storage_type - ) + document.save() diff --git a/src/documents/migrations/1002_auto_20201111_1105.py b/src/documents/migrations/1002_auto_20201111_1105.py new file mode 100644 index 000000000..7f6bae50b --- /dev/null +++ b/src/documents/migrations/1002_auto_20201111_1105.py @@ -0,0 +1,18 @@ +# Generated by Django 3.1.3 on 2020-11-11 11:05 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('documents', '1001_auto_20201109_1636'), + ] + + operations = [ + migrations.AlterField( + model_name='document', + name='filename', + 
field=models.FilePathField(default=None, editable=False, help_text='Current filename in storage', max_length=1024, null=True), + ), + ] diff --git a/src/documents/models.py b/src/documents/models.py index 88598b5f6..ab3262fb5 100755 --- a/src/documents/models.py +++ b/src/documents/models.py @@ -3,18 +3,15 @@ import logging import os import re -from collections import OrderedDict, defaultdict +from collections import OrderedDict import dateutil.parser from django.conf import settings from django.db import models -from django.dispatch import receiver -from django.template.defaultfilters import slugify from django.utils import timezone from django.utils.text import slugify - class MatchingModel(models.Model): MATCH_ANY = 1 @@ -192,7 +189,7 @@ class Document(models.Model): default=timezone.now, editable=False, db_index=True) filename = models.FilePathField( - max_length=256, + max_length=1024, editable=False, default=None, null=True, @@ -220,123 +217,18 @@ class Document(models.Model): return "{}: {}".format(created, self.correspondent or self.title) return str(created) - def find_renamed_document(self, subdirectory=""): - suffix = "%07i.%s" % (self.pk, self.file_type) - - # Append .gpg for encrypted files - if self.storage_type == self.STORAGE_TYPE_GPG: - suffix += ".gpg" - - # Go up in the directory hierarchy and try to delete all directories - root = os.path.normpath(Document.filename_to_path(subdirectory)) - - for filename in os.listdir(root): - if filename.endswith(suffix): - return os.path.join(subdirectory, filename) - - fullname = os.path.join(subdirectory, filename) - if os.path.isdir(Document.filename_to_path(fullname)): - return self.find_renamed_document(fullname) - - return None - - @property - def source_filename(self): - # Initial filename generation (for new documents) - if self.filename is None: - self.filename = self.generate_source_filename() - - # Check if document is still available under filename - elif not 
os.path.isfile(Document.filename_to_path(self.filename)): - recovered_filename = self.find_renamed_document() - - # If we have found the file so update the filename - if recovered_filename is not None: - logger = logging.getLogger(__name__) - logger.warning("Filename of document " + str(self.id) + - " has changed and was successfully updated") - self.filename = recovered_filename - - # Remove all empty subdirectories from MEDIA_ROOT - Document.delete_all_empty_subdirectories( - Document.filename_to_path("")) - else: - logger = logging.getLogger(__name__) - logger.error("File of document " + str(self.id) + " has " + - "gone and could not be recovered") - - return self.filename - - @staticmethod - def many_to_dictionary(field): - # Converts ManyToManyField to dictionary by assuming, that field - # entries contain an _ or - which will be used as a delimiter - mydictionary = dict() - - for index, t in enumerate(field.all()): - # Populate tag names by index - mydictionary[index] = slugify(t.name) - - # Find delimiter - delimiter = t.name.find('_') - - if delimiter == -1: - delimiter = t.name.find('-') - - if delimiter == -1: - continue - - key = t.name[:delimiter] - value = t.name[delimiter+1:] - - mydictionary[slugify(key)] = slugify(value) - - return mydictionary - - def generate_source_filename(self): - # Create filename based on configured format - if settings.PAPERLESS_FILENAME_FORMAT is not None: - tags = defaultdict(lambda: slugify(None), - self.many_to_dictionary(self.tags)) - path = settings.PAPERLESS_FILENAME_FORMAT.format( - correspondent=slugify(self.correspondent), - title=slugify(self.title), - created=slugify(self.created), - added=slugify(self.added), - tags=tags) - else: - path = "" - - # Always append the primary key to guarantee uniqueness of filename - if len(path) > 0: - filename = "%s-%07i.%s" % (path, self.pk, self.file_type) - else: - filename = "%07i.%s" % (self.pk, self.file_type) - - # Append .gpg for encrypted files - if self.storage_type == 
self.STORAGE_TYPE_GPG: - filename += ".gpg" - - return filename - - def create_source_directory(self): - new_filename = self.generate_source_filename() - - # Determine the full "target" path - dir_new = Document.filename_to_path(os.path.dirname(new_filename)) - - # Create new path - os.makedirs(dir_new, exist_ok=True) - @property def source_path(self): - return Document.filename_to_path(self.source_filename) + if self.filename: + fname = str(self.filename) + else: + fname = "{:07}.{}".format(self.pk, self.file_type) + if self.storage_type == self.STORAGE_TYPE_GPG: + fname += ".gpg" - @staticmethod - def filename_to_path(filename): return os.path.join( settings.ORIGINALS_DIR, - filename + fname ) @property @@ -362,125 +254,6 @@ class Document(models.Model): def thumbnail_file(self): return open(self.thumbnail_path, "rb") - def set_filename(self, filename): - if os.path.isfile(Document.filename_to_path(filename)): - self.filename = filename - - @staticmethod - def try_delete_empty_directories(directory): - # Go up in the directory hierarchy and try to delete all directories - directory = os.path.normpath(directory) - root = os.path.normpath(Document.filename_to_path("")) - - while directory != root: - # Try to delete the current directory - try: - os.rmdir(directory) - except os.error: - # Directory not empty, no need to go further up - return - - # Cut off actual directory and go one level up - directory, _ = os.path.split(directory) - directory = os.path.normpath(directory) - - @staticmethod - def delete_all_empty_subdirectories(directory): - # Go through all folders and try to delete all directories - root = os.path.normpath(Document.filename_to_path(directory)) - - for filename in os.listdir(root): - fullname = os.path.join(directory, filename) - - if not os.path.isdir(Document.filename_to_path(fullname)): - continue - - # Go into subdirectory to see, if there is more to delete - Document.delete_all_empty_subdirectories( - os.path.join(directory, filename)) - - # 
Try to delete the directory - try: - os.rmdir(Document.filename_to_path(fullname)) - continue - except os.error: - # Directory not empty, no need to go further up - continue - - -@receiver(models.signals.m2m_changed, sender=Document.tags.through) -@receiver(models.signals.post_save, sender=Document) -def update_filename(sender, instance, **kwargs): - # Skip if document has not been saved yet - if instance.filename is None: - return - - # Check is file exists and update filename otherwise - if not os.path.isfile(Document.filename_to_path(instance.filename)): - instance.filename = instance.source_filename - - # Build the new filename - new_filename = instance.generate_source_filename() - - # If the filename is the same, then nothing needs to be done - if instance.filename == new_filename: - return - - # Determine the full "target" path - path_new = instance.filename_to_path(new_filename) - dir_new = instance.filename_to_path(os.path.dirname(new_filename)) - - # Create new path - instance.create_source_directory() - - # Determine the full "current" path - path_current = instance.filename_to_path(instance.source_filename) - - # Move file - try: - os.rename(path_current, path_new) - except PermissionError: - # Do not update filename in object - return - except FileNotFoundError: - logger = logging.getLogger(__name__) - logger.error("Renaming of document " + str(instance.id) + " failed " + - "as file " + instance.filename + " was no longer present") - return - - # Delete empty directory - old_dir = os.path.dirname(instance.filename) - old_path = instance.filename_to_path(old_dir) - Document.try_delete_empty_directories(old_path) - - instance.filename = new_filename - - # Save instance - # This will not cause a cascade of post_save signals, as next time - # nothing needs to be renamed - instance.save() - - -@receiver(models.signals.post_delete, sender=Document) -def delete_files(sender, instance, **kwargs): - if instance.filename is None: - return - - # Remove the 
document - old_file = instance.filename_to_path(instance.filename) - - try: - os.remove(old_file) - except FileNotFoundError: - logger = logging.getLogger(__name__) - logger.warning("Deleted document " + str(instance.id) + " but file " + - old_file + " was no longer present") - - # And remove the directory (if applicable) - old_dir = os.path.dirname(instance.filename) - old_path = instance.filename_to_path(old_dir) - Document.try_delete_empty_directories(old_path) - class Log(models.Model): diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index cee1e042b..671cdb104 100755 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -6,9 +6,13 @@ from django.conf import settings from django.contrib.admin.models import ADDITION, LogEntry from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType +from django.db import models, DatabaseError +from django.dispatch import receiver from django.utils import timezone from .. import index, matching +from ..file_handling import delete_empty_directories, generate_filename, \ + create_source_path_directory from ..models import Document, Tag @@ -141,17 +145,65 @@ def run_post_consume_script(sender, document, **kwargs): )).wait() +@receiver(models.signals.post_delete, sender=Document) def cleanup_document_deletion(sender, instance, using, **kwargs): - - if not isinstance(instance, Document): - return - for f in (instance.source_path, instance.thumbnail_path): try: os.unlink(f) except FileNotFoundError: pass # The file's already gone, so we're cool with it. 
+ delete_empty_directories(os.path.dirname(instance.source_path)) + + +@receiver(models.signals.m2m_changed, sender=Document.tags.through) +@receiver(models.signals.post_save, sender=Document) +def update_filename_and_move_files(sender, instance, **kwargs): + + if not instance.filename: + # Can't update the filename if there is no filename to begin with. + # This happens after the consumer creates a new document. + # The PK needs to be set first by saving the document once. When this + # happens, the file is not yet in the ORIGINALS_DIR, and thus can't be + # renamed anyway. In all other cases, instance.filename will be set. + return + + old_filename = instance.filename + old_path = instance.source_path + new_filename = generate_filename(instance) + + if new_filename == instance.filename: + # Don't do anything if it's the same. + return + + new_path = os.path.join(settings.ORIGINALS_DIR, new_filename) + + if not os.path.isfile(old_path): + # Can't do anything if the old file does not exist anymore. + logging.getLogger(__name__).fatal('Document {}: File {} has gone.'.format(str(instance), old_path)) + return + + if os.path.isfile(new_path): + # Can't do anything if the new file already exists. Skip updating file. 
+ logging.getLogger(__name__).warning('Document {}: Cannot rename file since target path {} already exists.'.format(str(instance), new_path)) + return + + create_source_path_directory(new_path) + + try: + os.rename(old_path, new_path) + instance.filename = new_filename + instance.save() + + except OSError as e: + instance.filename = old_filename + except DatabaseError as e: + os.rename(new_path, old_path) + instance.filename = old_filename + + if not os.path.isfile(old_path): + delete_empty_directories(os.path.dirname(old_path)) + def set_log_entry(sender, document=None, logging_group=None, **kwargs): diff --git a/src/documents/tests/test_file_handling.py b/src/documents/tests/test_file_handling.py index 3b7c757d4..e228acabb 100644 --- a/src/documents/tests/test_file_handling.py +++ b/src/documents/tests/test_file_handling.py @@ -10,6 +10,8 @@ from dateutil import tz from django.test import TestCase, override_settings from django.utils.text import slugify + +from ..file_handling import generate_filename, create_source_path_directory, delete_empty_directories from ..models import Tag, Document, Correspondent from django.conf import settings @@ -31,18 +33,6 @@ class TestDate(TestCase): for dirname in self.deletion_list: shutil.rmtree(dirname, ignore_errors=True) - @override_settings(PAPERLESS_FILENAME_FORMAT="") - def test_source_filename(self): - document = Document() - document.file_type = "pdf" - document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED - document.save() - - self.assertEqual(document.source_filename, "0000001.pdf") - - document.filename = "test.pdf" - self.assertEqual(document.source_filename, "test.pdf") - @override_settings(PAPERLESS_FILENAME_FORMAT="") def test_generate_source_filename(self): document = Document() @@ -50,40 +40,40 @@ class TestDate(TestCase): document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED document.save() - self.assertEqual(document.generate_source_filename(), "0000001.pdf") + 
self.assertEqual(generate_filename(document), "{:07d}.pdf".format(document.pk)) document.storage_type = Document.STORAGE_TYPE_GPG - self.assertEqual(document.generate_source_filename(), - "0000001.pdf.gpg") + self.assertEqual(generate_filename(document), + "{:07d}.pdf.gpg".format(document.pk)) - @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + - "{correspondent}") + @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{correspondent}") def test_file_renaming(self): document = Document() document.file_type = "pdf" document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED document.save() - # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() + # Test default source_path + self.assertEqual(document.source_path, settings.ORIGINALS_DIR + "/{:07d}.pdf".format(document.pk)) - # Test source_path - self.assertEqual(document.source_path, settings.MEDIA_ROOT + - "/documents/originals/none/none-0000001.pdf") + document.filename = generate_filename(document) + + # Ensure that filename is properly generated + self.assertEqual(document.filename, "none/none-{:07d}.pdf".format(document.pk)) # Enable encryption and check again document.storage_type = Document.STORAGE_TYPE_GPG - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf.gpg") + document.filename = generate_filename(document) + self.assertEqual(document.filename, + "none/none-{:07d}.pdf.gpg".format(document.pk)) + document.save() - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none"), True) + # test that creating dirs for the source_path creates the correct directory + create_source_path_directory(document.source_path) + Path(document.source_path).touch() + self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR + + "/none"), True) # 
Set a correspondent and save the document document.correspondent = Correspondent.objects.get_or_create( @@ -91,14 +81,12 @@ class TestDate(TestCase): document.save() # Check proper handling of files - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/test"), True) - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none"), False) - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" + - "originals/test/test-0000001.pdf.gpg"), True) - self.assertEqual(document.generate_source_filename(), - "test/test-0000001.pdf.gpg") + self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR + + "/test"), True) + self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR + + "/none"), False) + self.assertEqual(os.path.isfile(settings.ORIGINALS_DIR + + "/test/test-{:07d}.pdf.gpg".format(document.pk)), True) @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + "{correspondent}") @@ -109,18 +97,18 @@ class TestDate(TestCase): document.save() # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf") - document.create_source_directory() + document.filename = generate_filename(document) + self.assertEqual(document.filename, + "none/none-{:07d}.pdf".format(document.pk)) + create_source_path_directory(document.source_path) Path(document.source_path).touch() # Test source_path - self.assertEqual(document.source_path, settings.MEDIA_ROOT + - "/documents/originals/none/none-0000001.pdf") + self.assertEqual(document.source_path, settings.ORIGINALS_DIR + + "/none/none-{:07d}.pdf".format(document.pk)) # Make the folder read- and execute-only (no writing and no renaming) - os.chmod(settings.MEDIA_ROOT + "/documents/originals/none", 0o555) + os.chmod(settings.ORIGINALS_DIR + "/none", 0o555) # Set a correspondent and save the document document.correspondent = Correspondent.objects.get_or_create( @@ -129,11 +117,12 @@ class 
TestDate(TestCase): # Check proper handling of files self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" + - "originals/none/none-0000001.pdf"), True) - self.assertEqual(document.source_filename, - "none/none-0000001.pdf") + "originals/none/none-{:07d}.pdf".format(document.pk)), True) + self.assertEqual(document.filename, + "none/none-{:07d}.pdf".format(document.pk)) + + os.chmod(settings.ORIGINALS_DIR + "/none", 0o777) - os.chmod(settings.MEDIA_ROOT + "/documents/originals/none", 0o777) @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + "{correspondent}") @@ -144,18 +133,20 @@ class TestDate(TestCase): document.save() # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf") - document.create_source_directory() + document.filename = generate_filename(document) + self.assertEqual(document.filename, + "none/none-{:07d}.pdf".format(document.pk)) + + create_source_path_directory(document.source_path) Path(document.source_path).touch() # Ensure file deletion after delete + pk = document.pk document.delete() - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + - "/documents/originals/none/none-0000001.pdf"), False) - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none"), False) + self.assertEqual(os.path.isfile(settings.ORIGINALS_DIR + + "/none/none-{:07d}.pdf".format(pk)), False) + self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR + + "/none"), False) @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + "{correspondent}") @@ -176,12 +167,15 @@ class TestDate(TestCase): document.save() # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf") - document.create_source_directory() + document.filename = generate_filename(document) + self.assertEqual(document.filename, + 
"none/none-{:07d}.pdf".format(document.pk)) + + create_source_path_directory(document.source_path) + Path(document.source_path).touch() - Path(document.source_path + "test").touch() + important_file = document.source_path + "test" + Path(important_file).touch() # Set a correspondent and save the document document.correspondent = Correspondent.objects.get_or_create( @@ -193,11 +187,8 @@ class TestDate(TestCase): "/documents/originals/test"), True) self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + "/documents/originals/none"), True) + self.assertTrue(os.path.isfile(important_file)) - # Cleanup - os.remove(settings.MEDIA_ROOT + - "/documents/originals/none/none-0000001.pdftest") - os.rmdir(settings.MEDIA_ROOT + "/documents/originals/none") @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}") def test_tags_with_underscore(self): @@ -212,13 +203,8 @@ class TestDate(TestCase): document.save() # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "demo-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() - - document.delete() + self.assertEqual(generate_filename(document), + "demo-{:07d}.pdf".format(document.pk)) @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}") def test_tags_with_dash(self): @@ -233,13 +219,8 @@ class TestDate(TestCase): document.save() # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "demo-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() - - document.delete() + self.assertEqual(generate_filename(document), + "demo-{:07d}.pdf".format(document.pk)) @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}") def test_tags_malformed(self): @@ -254,13 +235,8 @@ class TestDate(TestCase): document.save() # Ensure that filename is properly generated - tmp = document.source_filename - 
self.assertEqual(document.generate_source_filename(), - "none-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() - - document.delete() + self.assertEqual(generate_filename(document), + "none-{:07d}.pdf".format(document.pk)) @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[0]}") def test_tags_all(self): @@ -274,61 +250,24 @@ class TestDate(TestCase): document.save() # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "demo-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() + self.assertEqual(generate_filename(document), + "demo-{:07d}.pdf".format(document.pk)) - document.delete() - - @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[0]}") - def test_tags_out_of_bounds_0(self): + @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[1]}") + def test_tags_out_of_bounds(self): document = Document() document.file_type = "pdf" document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED document.save() - # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() - - document.delete() - - @override_settings(PAPERLESS_FILENAME_FORMAT="{tags[10000000]}") - def test_tags_out_of_bounds_10000000(self): - document = Document() - document.file_type = "pdf" - document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED + # Add tag to document + document.tags.create(name="demo") document.save() # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() + self.assertEqual(generate_filename(document), + "none-{:07d}.pdf".format(document.pk)) - document.delete() - - 
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[99]}") - def test_tags_out_of_bounds_99(self): - document = Document() - document.file_type = "pdf" - document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED - document.save() - - # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() - - document.delete() @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + "{correspondent}/{correspondent}") @@ -339,153 +278,40 @@ class TestDate(TestCase): document.save() # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none/none-0000001.pdf") - document.create_source_directory() + document.filename = generate_filename(document) + self.assertEqual(document.filename, + "none/none/none-{:07d}.pdf".format(document.pk)) + create_source_path_directory(document.source_path) Path(document.source_path).touch() # Check proper handling of files - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none/none"), True) + self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR + + "/none/none"), True) + pk = document.pk document.delete() - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + - "/documents/originals/none/none/none-0000001.pdf"), + self.assertEqual(os.path.isfile(settings.ORIGINALS_DIR + + "/none/none/none-{:07d}.pdf".format(pk)), False) - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none/none"), False) - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none"), False) - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals"), True) + self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR + + "/none/none"), False) + self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR + + "/none"), False) + 
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR), True) @override_settings(PAPERLESS_FILENAME_FORMAT=None) def test_format_none(self): document = Document() + document.pk = 1 document.file_type = "pdf" document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED - document.save() - self.assertEqual(document.generate_source_filename(), "0000001.pdf") - - @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + - "{correspondent}") - def test_document_renamed(self): - document = Document() - document.file_type = "pdf" - document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED - document.save() - - # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() - - # Test source_path - self.assertEqual(document.source_path, settings.MEDIA_ROOT + - "/documents/originals/none/none-0000001.pdf") - - # Rename the document "illegaly" - os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test") - os.rename(settings.MEDIA_ROOT + "/documents/originals/" + - "none/none-0000001.pdf", - settings.MEDIA_ROOT + "/documents/originals/" + - "test/test-0000001.pdf") - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" + - "originals/test/test-0000001.pdf"), True) - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" + - "originals/none/none-0000001.pdf"), False) - - # Set new correspondent and expect document to be saved properly - document.correspondent = Correspondent.objects.get_or_create( - name="foo")[0] - document.save() - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" + - "originals/foo/foo-0000001.pdf"), True) - - # Check proper handling of files - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/foo"), True) - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none"), False) - 
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/test"), False) - self.assertEqual(document.generate_source_filename(), - "foo/foo-0000001.pdf") - - @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + - "{correspondent}") - def test_document_renamed_encrypted(self): - document = Document() - document.file_type = "pdf" - document.storage_type = Document.STORAGE_TYPE_GPG - document.save() - - # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf.gpg") - document.create_source_directory() - Path(document.source_path).touch() - - # Test source_path - self.assertEqual(document.source_path, settings.MEDIA_ROOT + - "/documents/originals/none/none-0000001.pdf.gpg") - - # Rename the document "illegaly" - os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test") - os.rename(settings.MEDIA_ROOT + "/documents/originals/" + - "none/none-0000001.pdf.gpg", - settings.MEDIA_ROOT + "/documents/originals/" + - "test/test-0000001.pdf.gpg") - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" + - "originals/test/test-0000001.pdf.gpg"), True) - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" + - "originals/none/none-0000001.pdf"), False) - - # Set new correspondent and expect document to be saved properly - document.correspondent = Correspondent.objects.get_or_create( - name="foo")[0] - document.save() - self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" + - "originals/foo/foo-0000001.pdf.gpg"), True) - - # Check proper handling of files - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/foo"), True) - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none"), False) - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/test"), False) - self.assertEqual(document.generate_source_filename(), - 
"foo/foo-0000001.pdf.gpg") - - def test_delete_all_empty_subdirectories(self): - # Create our working directory - tmp = "/tmp/paperless-tests-{}".format(str(uuid4())[:8]) - os.makedirs(tmp) - self.add_to_deletion_list(tmp) - - os.makedirs(os.path.join(tmp, "empty")) - os.makedirs(os.path.join(tmp, "empty", "subdirectory")) - - os.makedirs(os.path.join(tmp, "notempty")) - Path(os.path.join(tmp, "notempty", "file")).touch() - - Document.delete_all_empty_subdirectories(tmp) - - self.assertEqual(os.path.isdir(os.path.join(tmp, "notempty")), True) - self.assertEqual(os.path.isdir(os.path.join(tmp, "empty")), False) - self.assertEqual(os.path.isfile( - os.path.join(tmp, "notempty", "file")), True) + self.assertEqual(generate_filename(document), "0000001.pdf") def test_try_delete_empty_directories(self): # Create our working directory - tmp = "/tmp/paperless-tests-{}".format(str(uuid4())[:8]) + tmp = os.path.join(settings.ORIGINALS_DIR, "test_delete_empty") os.makedirs(tmp) self.add_to_deletion_list(tmp) @@ -493,67 +319,10 @@ class TestDate(TestCase): Path(os.path.join(tmp, "notempty", "file")).touch() os.makedirs(os.path.join(tmp, "notempty", "empty")) - Document.try_delete_empty_directories( + delete_empty_directories( os.path.join(tmp, "notempty", "empty")) self.assertEqual(os.path.isdir(os.path.join(tmp, "notempty")), True) self.assertEqual(os.path.isfile( os.path.join(tmp, "notempty", "file")), True) self.assertEqual(os.path.isdir( os.path.join(tmp, "notempty", "empty")), False) - - @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + - "{correspondent}") - def test_document_accidentally_deleted(self): - document = Document() - document.file_type = "pdf" - document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED - document.save() - - # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf") - document.create_source_directory() - 
Path(document.source_path).touch() - - # Test source_path - self.assertEqual(document.source_path, settings.MEDIA_ROOT + - "/documents/originals/none/none-0000001.pdf") - - # Delete the document "illegaly" - os.remove(settings.MEDIA_ROOT + "/documents/originals/" + - "none/none-0000001.pdf") - - # Set new correspondent and expect document to be saved properly - document.correspondent = Correspondent.objects.get_or_create( - name="foo")[0] - document.save() - - # Check proper handling of files - self.assertEqual(os.path.isdir(settings.MEDIA_ROOT + - "/documents/originals/none"), True) - self.assertEqual(document.source_filename, - "none/none-0000001.pdf") - - @override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" + - "{correspondent}") - def test_set_filename(self): - document = Document() - document.file_type = "pdf" - document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED - document.save() - - # Ensure that filename is properly generated - tmp = document.source_filename - self.assertEqual(document.generate_source_filename(), - "none/none-0000001.pdf") - document.create_source_directory() - Path(document.source_path).touch() - - # Set existing filename - document.set_filename(tmp) - self.assertEqual(document.source_filename, "none/none-0000001.pdf") - - # Set non-existing filename - document.set_filename("doesnotexist") - self.assertEqual(document.source_filename, "none/none-0000001.pdf")