Create tags from sub directories

The names of sub directories in the consumer directory will be added as
tags for the document to be consumed.
To enable this, set:
PAPERLESS_CONSUMER_RECURSIVE=1
PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS=1

Fixes #50
This commit is contained in:
jayme-github 2020-11-29 15:39:43 +01:00
parent dddd6f5503
commit fa9a5cc247
4 changed files with 115 additions and 24 deletions

View File

@ -38,7 +38,7 @@ scikit-learn="~=0.23.2"
whitenoise = "~=5.2.0"
watchdog = "*"
whoosh="~=2.7.4"
inotify-simple = "*"
inotifyrecursive = ">=0.3.4"
[dev-packages]
coveralls = "*"

View File

@ -1,31 +1,60 @@
import logging
import os
from pathlib import Path
from time import sleep
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.utils.text import slugify
from django_q.tasks import async_task
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from documents.models import Tag
try:
from inotify_simple import INotify, flags
from inotifyrecursive import INotify, flags
except ImportError:
INotify = flags = None
logger = logging.getLogger(__name__)
def _consume(file):
try:
if os.path.isfile(file):
async_task("documents.tasks.consume_file",
file,
task_name=os.path.basename(file)[:100])
else:
logger.debug(
f"Not consuming file {file}: File has moved.")
def _tags_from_path(filepath):
"""Walk up the directory tree from filepath to CONSUMPTION_DIr
and get or create Tag IDs for every directory.
"""
tag_ids = set()
path_parts = Path(filepath).relative_to(
settings.CONSUMPTION_DIR).parent.parts
for part in path_parts:
tag_ids.add(Tag.objects.get_or_create(
slug=slugify(part),
defaults={"name": part},
)[0].pk)
return tag_ids
def _consume(filepath):
if not os.path.isfile(filepath):
logger.debug(
f"Not consuming file {filepath}: File has moved.")
return
tag_ids = None
try:
if settings.CONSUMER_SUBDIRS_AS_TAGS:
tag_ids = _tags_from_path(filepath)
except Exception as e:
logger.error(
"Error creating tags from path: {}".format(e))
try:
async_task("documents.tasks.consume_file",
filepath,
override_tag_ids=tag_ids if tag_ids else None,
task_name=os.path.basename(filepath)[:100])
except Exception as e:
# Catch all so that the consumer won't crash.
# This is also what the test case is listening for to check for
@ -94,6 +123,7 @@ class Command(BaseCommand):
def handle(self, *args, **options):
directory = options["directory"]
recursive = settings.CONSUMER_RECURSIVE
if not directory:
raise CommandError(
@ -104,24 +134,30 @@ class Command(BaseCommand):
raise CommandError(
f"Consumption directory {directory} does not exist")
for entry in os.scandir(directory):
_consume(entry.path)
if recursive:
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
_consume(filepath)
else:
for entry in os.scandir(directory):
_consume(entry.path)
if options["oneshot"]:
return
if settings.CONSUMER_POLLING == 0 and INotify:
self.handle_inotify(directory)
self.handle_inotify(directory, recursive)
else:
self.handle_polling(directory)
self.handle_polling(directory, recursive)
logger.debug("Consumer exiting.")
def handle_polling(self, directory):
def handle_polling(self, directory, recursive):
logging.getLogger(__name__).info(
f"Polling directory for changes: {directory}")
self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
self.observer.schedule(Handler(), directory, recursive=False)
self.observer.schedule(Handler(), directory, recursive=recursive)
self.observer.start()
try:
while self.observer.is_alive():
@ -132,18 +168,26 @@ class Command(BaseCommand):
self.observer.stop()
self.observer.join()
def handle_inotify(self, directory):
def handle_inotify(self, directory, recursive):
logging.getLogger(__name__).info(
f"Using inotify to watch directory for changes: {directory}")
inotify = INotify()
descriptor = inotify.add_watch(
directory, flags.CLOSE_WRITE | flags.MOVED_TO)
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO
if recursive:
descriptor = inotify.add_watch_recursive(directory, inotify_flags)
else:
descriptor = inotify.add_watch(directory, inotify_flags)
try:
while not self.stop_flag:
for event in inotify.read(timeout=1000, read_delay=1000):
file = os.path.join(directory, event.name)
_consume(file)
if recursive:
path = inotify.get_path(event.wd)
else:
path = directory
filepath = os.path.join(path, event.name)
_consume(filepath)
except KeyboardInterrupt:
pass

View File

@ -7,8 +7,9 @@ from unittest import mock
from django.conf import settings
from django.core.management import call_command, CommandError
from django.test import override_settings, TestCase
from django.test import override_settings, TransactionTestCase
from documents.models import Tag
from documents.consumer import ConsumerError
from documents.management.commands import document_consumer
from documents.tests.utils import DirectoriesMixin
@ -33,7 +34,7 @@ def chunked(size, source):
yield source[i:i+size]
class TestConsumer(DirectoriesMixin, TestCase):
class TestConsumer(DirectoriesMixin, TransactionTestCase):
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
@ -126,6 +127,43 @@ class TestConsumer(DirectoriesMixin, TestCase):
def test_consume_existing_file_polling(self):
self.test_consume_existing_file()
@override_settings(CONSUMER_RECURSIVE=1)
@override_settings(CONSUMER_SUBDIRS_AS_TAGS=1)
def test_consume_file_with_path_tags(self):
tag_names = ("existingTag", "Space Tag")
# Create a Tag prior to consuming a file using it in path
tag_ids = [Tag.objects.create(name=tag_names[0]).pk,]
self.t_start()
path = os.path.join(self.dirs.consumption_dir, *tag_names)
os.makedirs(path, exist_ok=True)
f = os.path.join(path, "my_file.pdf")
# Wait at least inotify read_delay for recursive watchers
# to be created for the new directories
sleep(1)
shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call()
self.task_mock.assert_called_once()
# Add the pk of the Tag created by _consume()
tag_ids.append(Tag.objects.get(name=tag_names[1]).pk)
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)
# assertCountEqual has a bad name, but test that the first
# sequence contains the same elements as second, regardless of
# their order.
self.assertCountEqual(kwargs["override_tag_ids"], tag_ids)
@override_settings(CONSUMER_POLLING=1)
def test_consume_file_with_path_tags_polling(self):
self.test_consume_file_with_path_tags()
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_pdf(self, error_logger):

View File

@ -331,6 +331,15 @@ CONSUMER_POLLING = int(os.getenv("PAPERLESS_CONSUMER_POLLING", 0))
CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
# Consume from subdirectories of CONSUMPTION_DIR as well
CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
# Set the names of subdirectories as tags for consumed files.
# E.g. $CONSUMPTION_DIR/foo/bar/file.pdf will add the tags "foo" and "bar" to
# the consumed file.
# PAPERLESS_CONSUMER_RECURSIVE must be enabled for this to work.
CONSUMER_SUBDIRS_AS_TAGS = __get_boolean("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
OPTIMIZE_THUMBNAILS = __get_boolean("PAPERLESS_OPTIMIZE_THUMBNAILS", "true")
OCR_PAGES = int(os.getenv('PAPERLESS_OCR_PAGES', 0))