fixed the file handling implementation. The feature is cool, but the original implementation had so many small flaws it wasn't even funny.

This commit is contained in:
Jonas Winkler 2020-11-11 14:21:33 +01:00
parent 02ef7cb038
commit 734da28b69
8 changed files with 287 additions and 577 deletions

View File

@ -14,7 +14,6 @@ class DocumentsConfig(AppConfig):
add_inbox_tags,
run_pre_consume_script,
run_post_consume_script,
cleanup_document_deletion,
set_log_entry,
set_correspondent,
set_document_type,
@ -33,6 +32,4 @@ class DocumentsConfig(AppConfig):
document_consumption_finished.connect(add_to_index)
document_consumption_finished.connect(run_post_consume_script)
post_delete.connect(cleanup_document_deletion)
AppConfig.ready(self)

View File

@ -11,6 +11,7 @@ from django.utils import timezone
from paperless.db import GnuPG
from .classifier import DocumentClassifier, IncompatibleClassifierVersionError
from .file_handling import generate_filename, create_source_path_directory
from .models import Document, FileInfo
from .parsers import ParseError, get_parser_class
from .signals import (
@ -174,10 +175,15 @@ class Consumer:
self.log("debug", "Tagging with {}".format(tag_names))
document.tags.add(*relevant_tags)
document.filename = generate_filename(document)
create_source_path_directory(document.source_path)
self._write(document, doc, document.source_path)
self._write(document, thumbnail, document.thumbnail_path)
#TODO: why do we need to save the document again?
# We need to save the document twice, since we need the PK of the
# document in order to create its filename above.
document.save()
return document

View File

@ -0,0 +1,92 @@
import os
from collections import defaultdict
from django.conf import settings
from django.template.defaultfilters import slugify
def create_source_path_directory(source_path):
os.makedirs(os.path.dirname(source_path), exist_ok=True)
def delete_empty_directories(directory):
# Go up in the directory hierarchy and try to delete all directories
directory = os.path.normpath(directory)
root = os.path.normpath(settings.ORIGINALS_DIR)
if not directory.startswith(root + os.path.sep):
# don't do anything outside our originals folder.
# append os.path.set so that we avoid these cases:
# directory = /home/originals2/test
# root = /home/originals ("/" gets appended and startswith fails)
return
while directory != root:
if not os.listdir(directory):
# it's empty
try:
os.rmdir(directory)
except OSError:
# whatever. empty directories aren't that bad anyway.
return
else:
# it's not empty.
return
# go one level up
directory = os.path.normpath(os.path.dirname(directory))
def many_to_dictionary(field):
# Converts ManyToManyField to dictionary by assuming, that field
# entries contain an _ or - which will be used as a delimiter
mydictionary = dict()
for index, t in enumerate(field.all()):
# Populate tag names by index
mydictionary[index] = slugify(t.name)
# Find delimiter
delimiter = t.name.find('_')
if delimiter == -1:
delimiter = t.name.find('-')
if delimiter == -1:
continue
key = t.name[:delimiter]
value = t.name[delimiter + 1:]
mydictionary[slugify(key)] = slugify(value)
return mydictionary
def generate_filename(document):
# Create filename based on configured format
if settings.PAPERLESS_FILENAME_FORMAT is not None:
tags = defaultdict(lambda: slugify(None),
many_to_dictionary(document.tags))
path = settings.PAPERLESS_FILENAME_FORMAT.format(
correspondent=slugify(document.correspondent),
title=slugify(document.title),
created=document.created.date(),
added=slugify(document.added),
tags=tags,
)
else:
path = ""
# Always append the primary key to guarantee uniqueness of filename
if len(path) > 0:
filename = "%s-%07i.%s" % (path, document.pk, document.file_type)
else:
filename = "%07i.%s" % (document.pk, document.file_type)
# Append .gpg for encrypted files
if document.storage_type == document.STORAGE_TYPE_GPG:
filename += ".gpg"
return filename

View File

@ -8,6 +8,7 @@ from django.core.management import call_command
from documents.models import Document
from paperless.db import GnuPG
from ...file_handling import generate_filename, create_source_path_directory
from ...mixins import Renderable
@ -82,6 +83,10 @@ class Command(Renderable, BaseCommand):
def _import_files_from_manifest(self):
storage_type = Document.STORAGE_TYPE_UNENCRYPTED
if settings.PASSPHRASE:
storage_type = Document.STORAGE_TYPE_GPG
for record in self.manifest:
if not record["model"] == "documents.document":
@ -94,6 +99,14 @@ class Command(Renderable, BaseCommand):
document_path = os.path.join(self.source, doc_file)
thumbnail_path = os.path.join(self.source, thumb_file)
document.storage_type = storage_type
document.filename = generate_filename(document)
if os.path.isfile(document.source_path):
raise FileExistsError(document.source_path)
create_source_path_directory(document.source_path)
if settings.PASSPHRASE:
with open(document_path, "rb") as unencrypted:
@ -109,18 +122,8 @@ class Command(Renderable, BaseCommand):
encrypted.write(GnuPG.encrypted(unencrypted))
else:
print("Moving {} to {}".format(document_path, document.source_path))
shutil.copy(document_path, document.source_path)
shutil.copy(thumbnail_path, document.thumbnail_path)
# Reset the storage type to whatever we've used while importing
storage_type = Document.STORAGE_TYPE_UNENCRYPTED
if settings.PASSPHRASE:
storage_type = Document.STORAGE_TYPE_GPG
Document.objects.filter(
pk__in=[r["pk"] for r in self.manifest]
).update(
storage_type=storage_type
)
document.save()

View File

@ -0,0 +1,18 @@
# Generated by Django 3.1.3 on 2020-11-11 11:05
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('documents', '1001_auto_20201109_1636'),
]
operations = [
migrations.AlterField(
model_name='document',
name='filename',
field=models.FilePathField(default=None, editable=False, help_text='Current filename in storage', max_length=1024, null=True),
),
]

View File

@ -3,18 +3,15 @@
import logging
import os
import re
from collections import OrderedDict, defaultdict
from collections import OrderedDict
import dateutil.parser
from django.conf import settings
from django.db import models
from django.dispatch import receiver
from django.template.defaultfilters import slugify
from django.utils import timezone
from django.utils.text import slugify
class MatchingModel(models.Model):
MATCH_ANY = 1
@ -192,7 +189,7 @@ class Document(models.Model):
default=timezone.now, editable=False, db_index=True)
filename = models.FilePathField(
max_length=256,
max_length=1024,
editable=False,
default=None,
null=True,
@ -220,123 +217,18 @@ class Document(models.Model):
return "{}: {}".format(created, self.correspondent or self.title)
return str(created)
def find_renamed_document(self, subdirectory=""):
suffix = "%07i.%s" % (self.pk, self.file_type)
# Append .gpg for encrypted files
if self.storage_type == self.STORAGE_TYPE_GPG:
suffix += ".gpg"
# Go up in the directory hierarchy and try to delete all directories
root = os.path.normpath(Document.filename_to_path(subdirectory))
for filename in os.listdir(root):
if filename.endswith(suffix):
return os.path.join(subdirectory, filename)
fullname = os.path.join(subdirectory, filename)
if os.path.isdir(Document.filename_to_path(fullname)):
return self.find_renamed_document(fullname)
return None
@property
def source_filename(self):
# Initial filename generation (for new documents)
if self.filename is None:
self.filename = self.generate_source_filename()
# Check if document is still available under filename
elif not os.path.isfile(Document.filename_to_path(self.filename)):
recovered_filename = self.find_renamed_document()
# If we have found the file so update the filename
if recovered_filename is not None:
logger = logging.getLogger(__name__)
logger.warning("Filename of document " + str(self.id) +
" has changed and was successfully updated")
self.filename = recovered_filename
# Remove all empty subdirectories from MEDIA_ROOT
Document.delete_all_empty_subdirectories(
Document.filename_to_path(""))
else:
logger = logging.getLogger(__name__)
logger.error("File of document " + str(self.id) + " has " +
"gone and could not be recovered")
return self.filename
@staticmethod
def many_to_dictionary(field):
# Converts ManyToManyField to dictionary by assuming, that field
# entries contain an _ or - which will be used as a delimiter
mydictionary = dict()
for index, t in enumerate(field.all()):
# Populate tag names by index
mydictionary[index] = slugify(t.name)
# Find delimiter
delimiter = t.name.find('_')
if delimiter == -1:
delimiter = t.name.find('-')
if delimiter == -1:
continue
key = t.name[:delimiter]
value = t.name[delimiter+1:]
mydictionary[slugify(key)] = slugify(value)
return mydictionary
def generate_source_filename(self):
# Create filename based on configured format
if settings.PAPERLESS_FILENAME_FORMAT is not None:
tags = defaultdict(lambda: slugify(None),
self.many_to_dictionary(self.tags))
path = settings.PAPERLESS_FILENAME_FORMAT.format(
correspondent=slugify(self.correspondent),
title=slugify(self.title),
created=slugify(self.created),
added=slugify(self.added),
tags=tags)
else:
path = ""
# Always append the primary key to guarantee uniqueness of filename
if len(path) > 0:
filename = "%s-%07i.%s" % (path, self.pk, self.file_type)
else:
filename = "%07i.%s" % (self.pk, self.file_type)
# Append .gpg for encrypted files
if self.storage_type == self.STORAGE_TYPE_GPG:
filename += ".gpg"
return filename
def create_source_directory(self):
new_filename = self.generate_source_filename()
# Determine the full "target" path
dir_new = Document.filename_to_path(os.path.dirname(new_filename))
# Create new path
os.makedirs(dir_new, exist_ok=True)
@property
def source_path(self):
return Document.filename_to_path(self.source_filename)
if self.filename:
fname = str(self.filename)
else:
fname = "{:07}.{}".format(self.pk, self.file_type)
if self.storage_type == self.STORAGE_TYPE_GPG:
fname += ".gpg"
@staticmethod
def filename_to_path(filename):
return os.path.join(
settings.ORIGINALS_DIR,
filename
fname
)
@property
@ -362,125 +254,6 @@ class Document(models.Model):
def thumbnail_file(self):
return open(self.thumbnail_path, "rb")
def set_filename(self, filename):
if os.path.isfile(Document.filename_to_path(filename)):
self.filename = filename
@staticmethod
def try_delete_empty_directories(directory):
# Go up in the directory hierarchy and try to delete all directories
directory = os.path.normpath(directory)
root = os.path.normpath(Document.filename_to_path(""))
while directory != root:
# Try to delete the current directory
try:
os.rmdir(directory)
except os.error:
# Directory not empty, no need to go further up
return
# Cut off actual directory and go one level up
directory, _ = os.path.split(directory)
directory = os.path.normpath(directory)
@staticmethod
def delete_all_empty_subdirectories(directory):
# Go through all folders and try to delete all directories
root = os.path.normpath(Document.filename_to_path(directory))
for filename in os.listdir(root):
fullname = os.path.join(directory, filename)
if not os.path.isdir(Document.filename_to_path(fullname)):
continue
# Go into subdirectory to see, if there is more to delete
Document.delete_all_empty_subdirectories(
os.path.join(directory, filename))
# Try to delete the directory
try:
os.rmdir(Document.filename_to_path(fullname))
continue
except os.error:
# Directory not empty, no need to go further up
continue
@receiver(models.signals.m2m_changed, sender=Document.tags.through)
@receiver(models.signals.post_save, sender=Document)
def update_filename(sender, instance, **kwargs):
# Skip if document has not been saved yet
if instance.filename is None:
return
# Check is file exists and update filename otherwise
if not os.path.isfile(Document.filename_to_path(instance.filename)):
instance.filename = instance.source_filename
# Build the new filename
new_filename = instance.generate_source_filename()
# If the filename is the same, then nothing needs to be done
if instance.filename == new_filename:
return
# Determine the full "target" path
path_new = instance.filename_to_path(new_filename)
dir_new = instance.filename_to_path(os.path.dirname(new_filename))
# Create new path
instance.create_source_directory()
# Determine the full "current" path
path_current = instance.filename_to_path(instance.source_filename)
# Move file
try:
os.rename(path_current, path_new)
except PermissionError:
# Do not update filename in object
return
except FileNotFoundError:
logger = logging.getLogger(__name__)
logger.error("Renaming of document " + str(instance.id) + " failed " +
"as file " + instance.filename + " was no longer present")
return
# Delete empty directory
old_dir = os.path.dirname(instance.filename)
old_path = instance.filename_to_path(old_dir)
Document.try_delete_empty_directories(old_path)
instance.filename = new_filename
# Save instance
# This will not cause a cascade of post_save signals, as next time
# nothing needs to be renamed
instance.save()
@receiver(models.signals.post_delete, sender=Document)
def delete_files(sender, instance, **kwargs):
if instance.filename is None:
return
# Remove the document
old_file = instance.filename_to_path(instance.filename)
try:
os.remove(old_file)
except FileNotFoundError:
logger = logging.getLogger(__name__)
logger.warning("Deleted document " + str(instance.id) + " but file " +
old_file + " was no longer present")
# And remove the directory (if applicable)
old_dir = os.path.dirname(instance.filename)
old_path = instance.filename_to_path(old_dir)
Document.try_delete_empty_directories(old_path)
class Log(models.Model):

View File

@ -6,9 +6,13 @@ from django.conf import settings
from django.contrib.admin.models import ADDITION, LogEntry
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.db import models, DatabaseError
from django.dispatch import receiver
from django.utils import timezone
from .. import index, matching
from ..file_handling import delete_empty_directories, generate_filename, \
create_source_path_directory
from ..models import Document, Tag
@ -141,17 +145,65 @@ def run_post_consume_script(sender, document, **kwargs):
)).wait()
@receiver(models.signals.post_delete, sender=Document)
def cleanup_document_deletion(sender, instance, using, **kwargs):
if not isinstance(instance, Document):
return
for f in (instance.source_path, instance.thumbnail_path):
try:
os.unlink(f)
except FileNotFoundError:
pass # The file's already gone, so we're cool with it.
delete_empty_directories(os.path.dirname(instance.source_path))
@receiver(models.signals.m2m_changed, sender=Document.tags.through)
@receiver(models.signals.post_save, sender=Document)
def update_filename_and_move_files(sender, instance, **kwargs):
if not instance.filename:
# Can't update the filename if there is not filename to begin with
# This happens after the consumer creates a new document.
# The PK needs to be set first by saving the document once. When this
# happens, the file is not yet in the ORIGINALS_DIR, and thus can't be
# renamed anyway. In all other cases, instance.filename will be set.
return
old_filename = instance.filename
old_path = instance.source_path
new_filename = generate_filename(instance)
if new_filename == instance.filename:
# Don't do anything if its the same.
return
new_path = os.path.join(settings.ORIGINALS_DIR, new_filename)
if not os.path.isfile(old_path):
# Can't do anything if the old file does not exist anymore.
logging.getLogger(__name__).fatal('Document {}: File {} has gone.'.format(str(instance), old_path))
return
if os.path.isfile(new_path):
# Can't do anything if the new file already exists. Skip updating file.
logging.getLogger(__name__).warning('Document {}: Cannot rename file since target path {} already exists.'.format(str(instance), new_path))
return
create_source_path_directory(new_path)
try:
os.rename(old_path, new_path)
instance.filename = new_filename
instance.save()
except OSError as e:
instance.filename = old_filename
except DatabaseError as e:
os.rename(new_path, old_path)
instance.filename = old_filename
if not os.path.isfile(old_path):
delete_empty_directories(os.path.dirname(old_path))
def set_log_entry(sender, document=None, logging_group=None, **kwargs):

View File

@ -10,6 +10,8 @@ from dateutil import tz
from django.test import TestCase, override_settings
from django.utils.text import slugify
from ..file_handling import generate_filename, create_source_path_directory, delete_empty_directories
from ..models import Tag, Document, Correspondent
from django.conf import settings
@ -31,18 +33,6 @@ class TestDate(TestCase):
for dirname in self.deletion_list:
shutil.rmtree(dirname, ignore_errors=True)
@override_settings(PAPERLESS_FILENAME_FORMAT="")
def test_source_filename(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
self.assertEqual(document.source_filename, "0000001.pdf")
document.filename = "test.pdf"
self.assertEqual(document.source_filename, "test.pdf")
@override_settings(PAPERLESS_FILENAME_FORMAT="")
def test_generate_source_filename(self):
document = Document()
@ -50,40 +40,40 @@ class TestDate(TestCase):
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
self.assertEqual(document.generate_source_filename(), "0000001.pdf")
self.assertEqual(generate_filename(document), "{:07d}.pdf".format(document.pk))
document.storage_type = Document.STORAGE_TYPE_GPG
self.assertEqual(document.generate_source_filename(),
"0000001.pdf.gpg")
self.assertEqual(generate_filename(document),
"{:07d}.pdf.gpg".format(document.pk))
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}")
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{correspondent}")
def test_file_renaming(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
# Test default source_path
self.assertEqual(document.source_path, settings.ORIGINALS_DIR + "/{:07d}.pdf".format(document.pk))
# Test source_path
self.assertEqual(document.source_path, settings.MEDIA_ROOT +
"/documents/originals/none/none-0000001.pdf")
document.filename = generate_filename(document)
# Ensure that filename is properly generated
self.assertEqual(document.filename, "none/none-{:07d}.pdf".format(document.pk))
# Enable encryption and check again
document.storage_type = Document.STORAGE_TYPE_GPG
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf.gpg")
document.filename = generate_filename(document)
self.assertEqual(document.filename,
"none/none-{:07d}.pdf.gpg".format(document.pk))
document.save()
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none"), True)
# test that creating dirs for the source_path creates the correct directory
create_source_path_directory(document.source_path)
Path(document.source_path).touch()
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR +
"/none"), True)
# Set a correspondent and save the document
document.correspondent = Correspondent.objects.get_or_create(
@ -91,14 +81,12 @@ class TestDate(TestCase):
document.save()
# Check proper handling of files
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/test"), True)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none"), False)
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
"originals/test/test-0000001.pdf.gpg"), True)
self.assertEqual(document.generate_source_filename(),
"test/test-0000001.pdf.gpg")
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR +
"/test"), True)
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR +
"/none"), False)
self.assertEqual(os.path.isfile(settings.ORIGINALS_DIR +
"/test/test-{:07d}.pdf.gpg".format(document.pk)), True)
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}")
@ -109,18 +97,18 @@ class TestDate(TestCase):
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf")
document.create_source_directory()
document.filename = generate_filename(document)
self.assertEqual(document.filename,
"none/none-{:07d}.pdf".format(document.pk))
create_source_path_directory(document.source_path)
Path(document.source_path).touch()
# Test source_path
self.assertEqual(document.source_path, settings.MEDIA_ROOT +
"/documents/originals/none/none-0000001.pdf")
self.assertEqual(document.source_path, settings.ORIGINALS_DIR +
"/none/none-{:07d}.pdf".format(document.pk))
# Make the folder read- and execute-only (no writing and no renaming)
os.chmod(settings.MEDIA_ROOT + "/documents/originals/none", 0o555)
os.chmod(settings.ORIGINALS_DIR + "/none", 0o555)
# Set a correspondent and save the document
document.correspondent = Correspondent.objects.get_or_create(
@ -129,11 +117,12 @@ class TestDate(TestCase):
# Check proper handling of files
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
"originals/none/none-0000001.pdf"), True)
self.assertEqual(document.source_filename,
"none/none-0000001.pdf")
"originals/none/none-{:07d}.pdf".format(document.pk)), True)
self.assertEqual(document.filename,
"none/none-{:07d}.pdf".format(document.pk))
os.chmod(settings.ORIGINALS_DIR + "/none", 0o777)
os.chmod(settings.MEDIA_ROOT + "/documents/originals/none", 0o777)
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}")
@ -144,18 +133,20 @@ class TestDate(TestCase):
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf")
document.create_source_directory()
document.filename = generate_filename(document)
self.assertEqual(document.filename,
"none/none-{:07d}.pdf".format(document.pk))
create_source_path_directory(document.source_path)
Path(document.source_path).touch()
# Ensure file deletion after delete
pk = document.pk
document.delete()
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT +
"/documents/originals/none/none-0000001.pdf"), False)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none"), False)
self.assertEqual(os.path.isfile(settings.ORIGINALS_DIR +
"/none/none-{:07d}.pdf".format(pk)), False)
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR +
"/none"), False)
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}")
@ -176,12 +167,15 @@ class TestDate(TestCase):
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf")
document.create_source_directory()
document.filename = generate_filename(document)
self.assertEqual(document.filename,
"none/none-{:07d}.pdf".format(document.pk))
create_source_path_directory(document.source_path)
Path(document.source_path).touch()
Path(document.source_path + "test").touch()
important_file = document.source_path + "test"
Path(important_file).touch()
# Set a correspondent and save the document
document.correspondent = Correspondent.objects.get_or_create(
@ -193,11 +187,8 @@ class TestDate(TestCase):
"/documents/originals/test"), True)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none"), True)
self.assertTrue(os.path.isfile(important_file))
# Cleanup
os.remove(settings.MEDIA_ROOT +
"/documents/originals/none/none-0000001.pdftest")
os.rmdir(settings.MEDIA_ROOT + "/documents/originals/none")
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}")
def test_tags_with_underscore(self):
@ -212,13 +203,8 @@ class TestDate(TestCase):
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"demo-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
document.delete()
self.assertEqual(generate_filename(document),
"demo-{:07d}.pdf".format(document.pk))
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}")
def test_tags_with_dash(self):
@ -233,13 +219,8 @@ class TestDate(TestCase):
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"demo-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
document.delete()
self.assertEqual(generate_filename(document),
"demo-{:07d}.pdf".format(document.pk))
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[type]}")
def test_tags_malformed(self):
@ -254,13 +235,8 @@ class TestDate(TestCase):
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
document.delete()
self.assertEqual(generate_filename(document),
"none-{:07d}.pdf".format(document.pk))
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[0]}")
def test_tags_all(self):
@ -274,61 +250,24 @@ class TestDate(TestCase):
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"demo-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
self.assertEqual(generate_filename(document),
"demo-{:07d}.pdf".format(document.pk))
document.delete()
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[0]}")
def test_tags_out_of_bounds_0(self):
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[1]}")
def test_tags_out_of_bounds(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
document.delete()
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[10000000]}")
def test_tags_out_of_bounds_10000000(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
# Add tag to document
document.tags.create(name="demo")
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
self.assertEqual(generate_filename(document),
"none-{:07d}.pdf".format(document.pk))
document.delete()
@override_settings(PAPERLESS_FILENAME_FORMAT="{tags[99]}")
def test_tags_out_of_bounds_99(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
document.delete()
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}/{correspondent}")
@ -339,153 +278,40 @@ class TestDate(TestCase):
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none/none-0000001.pdf")
document.create_source_directory()
document.filename = generate_filename(document)
self.assertEqual(document.filename,
"none/none/none-{:07d}.pdf".format(document.pk))
create_source_path_directory(document.source_path)
Path(document.source_path).touch()
# Check proper handling of files
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none/none"), True)
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR +
"/none/none"), True)
pk = document.pk
document.delete()
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT +
"/documents/originals/none/none/none-0000001.pdf"),
self.assertEqual(os.path.isfile(settings.ORIGINALS_DIR +
"/none/none/none-{:07d}.pdf".format(pk)),
False)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none/none"), False)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none"), False)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals"), True)
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR +
"/none/none"), False)
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR +
"/none"), False)
self.assertEqual(os.path.isdir(settings.ORIGINALS_DIR), True)
@override_settings(PAPERLESS_FILENAME_FORMAT=None)
def test_format_none(self):
document = Document()
document.pk = 1
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
self.assertEqual(document.generate_source_filename(), "0000001.pdf")
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}")
def test_document_renamed(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
# Test source_path
self.assertEqual(document.source_path, settings.MEDIA_ROOT +
"/documents/originals/none/none-0000001.pdf")
# Rename the document "illegaly"
os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test")
os.rename(settings.MEDIA_ROOT + "/documents/originals/" +
"none/none-0000001.pdf",
settings.MEDIA_ROOT + "/documents/originals/" +
"test/test-0000001.pdf")
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
"originals/test/test-0000001.pdf"), True)
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
"originals/none/none-0000001.pdf"), False)
# Set new correspondent and expect document to be saved properly
document.correspondent = Correspondent.objects.get_or_create(
name="foo")[0]
document.save()
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
"originals/foo/foo-0000001.pdf"), True)
# Check proper handling of files
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/foo"), True)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none"), False)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/test"), False)
self.assertEqual(document.generate_source_filename(),
"foo/foo-0000001.pdf")
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}")
def test_document_renamed_encrypted(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_GPG
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf.gpg")
document.create_source_directory()
Path(document.source_path).touch()
# Test source_path
self.assertEqual(document.source_path, settings.MEDIA_ROOT +
"/documents/originals/none/none-0000001.pdf.gpg")
# Rename the document "illegaly"
os.makedirs(settings.MEDIA_ROOT + "/documents/originals/test")
os.rename(settings.MEDIA_ROOT + "/documents/originals/" +
"none/none-0000001.pdf.gpg",
settings.MEDIA_ROOT + "/documents/originals/" +
"test/test-0000001.pdf.gpg")
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
"originals/test/test-0000001.pdf.gpg"), True)
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
"originals/none/none-0000001.pdf"), False)
# Set new correspondent and expect document to be saved properly
document.correspondent = Correspondent.objects.get_or_create(
name="foo")[0]
document.save()
self.assertEqual(os.path.isfile(settings.MEDIA_ROOT + "/documents/" +
"originals/foo/foo-0000001.pdf.gpg"), True)
# Check proper handling of files
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/foo"), True)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none"), False)
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/test"), False)
self.assertEqual(document.generate_source_filename(),
"foo/foo-0000001.pdf.gpg")
def test_delete_all_empty_subdirectories(self):
# Create our working directory
tmp = "/tmp/paperless-tests-{}".format(str(uuid4())[:8])
os.makedirs(tmp)
self.add_to_deletion_list(tmp)
os.makedirs(os.path.join(tmp, "empty"))
os.makedirs(os.path.join(tmp, "empty", "subdirectory"))
os.makedirs(os.path.join(tmp, "notempty"))
Path(os.path.join(tmp, "notempty", "file")).touch()
Document.delete_all_empty_subdirectories(tmp)
self.assertEqual(os.path.isdir(os.path.join(tmp, "notempty")), True)
self.assertEqual(os.path.isdir(os.path.join(tmp, "empty")), False)
self.assertEqual(os.path.isfile(
os.path.join(tmp, "notempty", "file")), True)
self.assertEqual(generate_filename(document), "0000001.pdf")
def test_try_delete_empty_directories(self):
# Create our working directory
tmp = "/tmp/paperless-tests-{}".format(str(uuid4())[:8])
tmp = os.path.join(settings.ORIGINALS_DIR, "test_delete_empty")
os.makedirs(tmp)
self.add_to_deletion_list(tmp)
@ -493,67 +319,10 @@ class TestDate(TestCase):
Path(os.path.join(tmp, "notempty", "file")).touch()
os.makedirs(os.path.join(tmp, "notempty", "empty"))
Document.try_delete_empty_directories(
delete_empty_directories(
os.path.join(tmp, "notempty", "empty"))
self.assertEqual(os.path.isdir(os.path.join(tmp, "notempty")), True)
self.assertEqual(os.path.isfile(
os.path.join(tmp, "notempty", "file")), True)
self.assertEqual(os.path.isdir(
os.path.join(tmp, "notempty", "empty")), False)
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}")
def test_document_accidentally_deleted(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
# Test source_path
self.assertEqual(document.source_path, settings.MEDIA_ROOT +
"/documents/originals/none/none-0000001.pdf")
# Delete the document "illegaly"
os.remove(settings.MEDIA_ROOT + "/documents/originals/" +
"none/none-0000001.pdf")
# Set new correspondent and expect document to be saved properly
document.correspondent = Correspondent.objects.get_or_create(
name="foo")[0]
document.save()
# Check proper handling of files
self.assertEqual(os.path.isdir(settings.MEDIA_ROOT +
"/documents/originals/none"), True)
self.assertEqual(document.source_filename,
"none/none-0000001.pdf")
@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/" +
"{correspondent}")
def test_set_filename(self):
document = Document()
document.file_type = "pdf"
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
document.save()
# Ensure that filename is properly generated
tmp = document.source_filename
self.assertEqual(document.generate_source_filename(),
"none/none-0000001.pdf")
document.create_source_directory()
Path(document.source_path).touch()
# Set existing filename
document.set_filename(tmp)
self.assertEqual(document.source_filename, "none/none-0000001.pdf")
# Set non-existing filename
document.set_filename("doesnotexist")
self.assertEqual(document.source_filename, "none/none-0000001.pdf")