mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-08-14 00:26:21 +00:00
added the filename handling back into the code
This commit is contained in:
@@ -3,11 +3,12 @@
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from collections import OrderedDict
|
||||
from collections import OrderedDict, defaultdict
|
||||
|
||||
import dateutil.parser
|
||||
from django.conf import settings
|
||||
from django.db import models
|
||||
from django.dispatch import receiver
|
||||
from django.template.defaultfilters import slugify
|
||||
from django.utils import timezone
|
||||
from django.utils.text import slugify
|
||||
@@ -190,6 +191,14 @@ class Document(models.Model):
|
||||
added = models.DateTimeField(
|
||||
default=timezone.now, editable=False, db_index=True)
|
||||
|
||||
filename = models.FilePathField(
|
||||
max_length=256,
|
||||
editable=False,
|
||||
default=None,
|
||||
null=True,
|
||||
help_text="Current filename in storage"
|
||||
)
|
||||
|
||||
archive_serial_number = models.IntegerField(
|
||||
blank=True,
|
||||
null=True,
|
||||
@@ -211,15 +220,123 @@ class Document(models.Model):
|
||||
return "{}: {}".format(created, self.correspondent or self.title)
|
||||
return str(created)
|
||||
|
||||
def find_renamed_document(self, subdirectory=""):
|
||||
suffix = "%07i.%s" % (self.pk, self.file_type)
|
||||
|
||||
# Append .gpg for encrypted files
|
||||
if self.storage_type == self.STORAGE_TYPE_GPG:
|
||||
suffix += ".gpg"
|
||||
|
||||
# Go up in the directory hierarchy and try to delete all directories
|
||||
root = os.path.normpath(Document.filename_to_path(subdirectory))
|
||||
|
||||
for filename in os.listdir(root):
|
||||
if filename.endswith(suffix):
|
||||
return os.path.join(subdirectory, filename)
|
||||
|
||||
fullname = os.path.join(subdirectory, filename)
|
||||
if os.path.isdir(Document.filename_to_path(fullname)):
|
||||
return self.find_renamed_document(fullname)
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
def source_filename(self):
|
||||
# Initial filename generation (for new documents)
|
||||
if self.filename is None:
|
||||
self.filename = self.generate_source_filename()
|
||||
|
||||
# Check if document is still available under filename
|
||||
elif not os.path.isfile(Document.filename_to_path(self.filename)):
|
||||
recovered_filename = self.find_renamed_document()
|
||||
|
||||
# If we have found the file so update the filename
|
||||
if recovered_filename is not None:
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.warning("Filename of document " + str(self.id) +
|
||||
" has changed and was successfully updated")
|
||||
self.filename = recovered_filename
|
||||
|
||||
# Remove all empty subdirectories from MEDIA_ROOT
|
||||
Document.delete_all_empty_subdirectories(
|
||||
Document.filename_to_path(""))
|
||||
else:
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.error("File of document " + str(self.id) + " has " +
|
||||
"gone and could not be recovered")
|
||||
|
||||
return self.filename
|
||||
|
||||
@staticmethod
|
||||
def many_to_dictionary(field):
|
||||
# Converts ManyToManyField to dictionary by assuming, that field
|
||||
# entries contain an _ or - which will be used as a delimiter
|
||||
mydictionary = dict()
|
||||
|
||||
for index, t in enumerate(field.all()):
|
||||
# Populate tag names by index
|
||||
mydictionary[index] = slugify(t.name)
|
||||
|
||||
# Find delimiter
|
||||
delimiter = t.name.find('_')
|
||||
|
||||
if delimiter == -1:
|
||||
delimiter = t.name.find('-')
|
||||
|
||||
if delimiter == -1:
|
||||
continue
|
||||
|
||||
key = t.name[:delimiter]
|
||||
value = t.name[delimiter+1:]
|
||||
|
||||
mydictionary[slugify(key)] = slugify(value)
|
||||
|
||||
return mydictionary
|
||||
|
||||
def generate_source_filename(self):
|
||||
# Create filename based on configured format
|
||||
if settings.PAPERLESS_FILENAME_FORMAT is not None:
|
||||
tags = defaultdict(lambda: slugify(None),
|
||||
self.many_to_dictionary(self.tags))
|
||||
path = settings.PAPERLESS_FILENAME_FORMAT.format(
|
||||
correspondent=slugify(self.correspondent),
|
||||
title=slugify(self.title),
|
||||
created=slugify(self.created),
|
||||
added=slugify(self.added),
|
||||
tags=tags)
|
||||
else:
|
||||
path = ""
|
||||
|
||||
# Always append the primary key to guarantee uniqueness of filename
|
||||
if len(path) > 0:
|
||||
filename = "%s-%07i.%s" % (path, self.pk, self.file_type)
|
||||
else:
|
||||
filename = "%07i.%s" % (self.pk, self.file_type)
|
||||
|
||||
# Append .gpg for encrypted files
|
||||
if self.storage_type == self.STORAGE_TYPE_GPG:
|
||||
filename += ".gpg"
|
||||
|
||||
return filename
|
||||
|
||||
def create_source_directory(self):
|
||||
new_filename = self.generate_source_filename()
|
||||
|
||||
# Determine the full "target" path
|
||||
dir_new = Document.filename_to_path(os.path.dirname(new_filename))
|
||||
|
||||
# Create new path
|
||||
os.makedirs(dir_new, exist_ok=True)
|
||||
|
||||
@property
|
||||
def source_path(self):
|
||||
file_name = "{:07}.{}".format(self.pk, self.file_type)
|
||||
if self.storage_type == self.STORAGE_TYPE_GPG:
|
||||
file_name += ".gpg"
|
||||
return Document.filename_to_path(self.source_filename)
|
||||
|
||||
@staticmethod
|
||||
def filename_to_path(filename):
|
||||
return os.path.join(
|
||||
settings.ORIGINALS_DIR,
|
||||
file_name
|
||||
filename
|
||||
)
|
||||
|
||||
@property
|
||||
@@ -245,6 +362,125 @@ class Document(models.Model):
|
||||
def thumbnail_file(self):
|
||||
return open(self.thumbnail_path, "rb")
|
||||
|
||||
def set_filename(self, filename):
|
||||
if os.path.isfile(Document.filename_to_path(filename)):
|
||||
self.filename = filename
|
||||
|
||||
@staticmethod
|
||||
def try_delete_empty_directories(directory):
|
||||
# Go up in the directory hierarchy and try to delete all directories
|
||||
directory = os.path.normpath(directory)
|
||||
root = os.path.normpath(Document.filename_to_path(""))
|
||||
|
||||
while directory != root:
|
||||
# Try to delete the current directory
|
||||
try:
|
||||
os.rmdir(directory)
|
||||
except os.error:
|
||||
# Directory not empty, no need to go further up
|
||||
return
|
||||
|
||||
# Cut off actual directory and go one level up
|
||||
directory, _ = os.path.split(directory)
|
||||
directory = os.path.normpath(directory)
|
||||
|
||||
@staticmethod
|
||||
def delete_all_empty_subdirectories(directory):
|
||||
# Go through all folders and try to delete all directories
|
||||
root = os.path.normpath(Document.filename_to_path(directory))
|
||||
|
||||
for filename in os.listdir(root):
|
||||
fullname = os.path.join(directory, filename)
|
||||
|
||||
if not os.path.isdir(Document.filename_to_path(fullname)):
|
||||
continue
|
||||
|
||||
# Go into subdirectory to see, if there is more to delete
|
||||
Document.delete_all_empty_subdirectories(
|
||||
os.path.join(directory, filename))
|
||||
|
||||
# Try to delete the directory
|
||||
try:
|
||||
os.rmdir(Document.filename_to_path(fullname))
|
||||
continue
|
||||
except os.error:
|
||||
# Directory not empty, no need to go further up
|
||||
continue
|
||||
|
||||
|
||||
@receiver(models.signals.m2m_changed, sender=Document.tags.through)
|
||||
@receiver(models.signals.post_save, sender=Document)
|
||||
def update_filename(sender, instance, **kwargs):
|
||||
# Skip if document has not been saved yet
|
||||
if instance.filename is None:
|
||||
return
|
||||
|
||||
# Check is file exists and update filename otherwise
|
||||
if not os.path.isfile(Document.filename_to_path(instance.filename)):
|
||||
instance.filename = instance.source_filename
|
||||
|
||||
# Build the new filename
|
||||
new_filename = instance.generate_source_filename()
|
||||
|
||||
# If the filename is the same, then nothing needs to be done
|
||||
if instance.filename == new_filename:
|
||||
return
|
||||
|
||||
# Determine the full "target" path
|
||||
path_new = instance.filename_to_path(new_filename)
|
||||
dir_new = instance.filename_to_path(os.path.dirname(new_filename))
|
||||
|
||||
# Create new path
|
||||
instance.create_source_directory()
|
||||
|
||||
# Determine the full "current" path
|
||||
path_current = instance.filename_to_path(instance.source_filename)
|
||||
|
||||
# Move file
|
||||
try:
|
||||
os.rename(path_current, path_new)
|
||||
except PermissionError:
|
||||
# Do not update filename in object
|
||||
return
|
||||
except FileNotFoundError:
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.error("Renaming of document " + str(instance.id) + " failed " +
|
||||
"as file " + instance.filename + " was no longer present")
|
||||
return
|
||||
|
||||
# Delete empty directory
|
||||
old_dir = os.path.dirname(instance.filename)
|
||||
old_path = instance.filename_to_path(old_dir)
|
||||
Document.try_delete_empty_directories(old_path)
|
||||
|
||||
instance.filename = new_filename
|
||||
|
||||
# Save instance
|
||||
# This will not cause a cascade of post_save signals, as next time
|
||||
# nothing needs to be renamed
|
||||
instance.save()
|
||||
|
||||
|
||||
@receiver(models.signals.post_delete, sender=Document)
|
||||
def delete_files(sender, instance, **kwargs):
|
||||
if instance.filename is None:
|
||||
return
|
||||
|
||||
# Remove the document
|
||||
old_file = instance.filename_to_path(instance.filename)
|
||||
|
||||
try:
|
||||
os.remove(old_file)
|
||||
except FileNotFoundError:
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.warning("Deleted document " + str(instance.id) + " but file " +
|
||||
old_file + " was no longer present")
|
||||
|
||||
# And remove the directory (if applicable)
|
||||
old_dir = os.path.dirname(instance.filename)
|
||||
old_path = instance.filename_to_path(old_dir)
|
||||
Document.try_delete_empty_directories(old_path)
|
||||
|
||||
|
||||
class Log(models.Model):
|
||||
|
||||
|
Reference in New Issue
Block a user