Merge pull request #69 from jayme-github/feature-directory-tags

Create tags from sub directories
This commit is contained in:
jonaswinkler 2020-11-30 22:53:52 +01:00 committed by GitHub
commit c5dbd7a6fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 115 additions and 24 deletions

View File

@ -38,7 +38,7 @@ scikit-learn="~=0.23.2"
whitenoise = "~=5.2.0" whitenoise = "~=5.2.0"
watchdog = "*" watchdog = "*"
whoosh="~=2.7.4" whoosh="~=2.7.4"
inotify-simple = "*" inotifyrecursive = ">=0.3.4"
[dev-packages] [dev-packages]
coveralls = "*" coveralls = "*"

View File

@ -1,31 +1,60 @@
import logging import logging
import os import os
from pathlib import Path
from time import sleep from time import sleep
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand, CommandError
from django.utils.text import slugify
from django_q.tasks import async_task from django_q.tasks import async_task
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver from watchdog.observers.polling import PollingObserver
from documents.models import Tag
try: try:
from inotify_simple import INotify, flags from inotifyrecursive import INotify, flags
except ImportError: except ImportError:
INotify = flags = None INotify = flags = None
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _consume(file): def _tags_from_path(filepath):
try: """Walk up the directory tree from filepath to CONSUMPTION_DIr
if os.path.isfile(file): and get or create Tag IDs for every directory.
async_task("documents.tasks.consume_file", """
file, tag_ids = set()
task_name=os.path.basename(file)[:100]) path_parts = Path(filepath).relative_to(
else: settings.CONSUMPTION_DIR).parent.parts
logger.debug( for part in path_parts:
f"Not consuming file {file}: File has moved.") tag_ids.add(Tag.objects.get_or_create(
slug=slugify(part),
defaults={"name": part},
)[0].pk)
return tag_ids
def _consume(filepath):
if not os.path.isfile(filepath):
logger.debug(
f"Not consuming file {filepath}: File has moved.")
return
tag_ids = None
try:
if settings.CONSUMER_SUBDIRS_AS_TAGS:
tag_ids = _tags_from_path(filepath)
except Exception as e:
logger.error(
"Error creating tags from path: {}".format(e))
try:
async_task("documents.tasks.consume_file",
filepath,
override_tag_ids=tag_ids if tag_ids else None,
task_name=os.path.basename(filepath)[:100])
except Exception as e: except Exception as e:
# Catch all so that the consumer won't crash. # Catch all so that the consumer won't crash.
# This is also what the test case is listening for to check for # This is also what the test case is listening for to check for
@ -94,6 +123,7 @@ class Command(BaseCommand):
def handle(self, *args, **options): def handle(self, *args, **options):
directory = options["directory"] directory = options["directory"]
recursive = settings.CONSUMER_RECURSIVE
if not directory: if not directory:
raise CommandError( raise CommandError(
@ -104,24 +134,30 @@ class Command(BaseCommand):
raise CommandError( raise CommandError(
f"Consumption directory {directory} does not exist") f"Consumption directory {directory} does not exist")
for entry in os.scandir(directory): if recursive:
_consume(entry.path) for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
_consume(filepath)
else:
for entry in os.scandir(directory):
_consume(entry.path)
if options["oneshot"]: if options["oneshot"]:
return return
if settings.CONSUMER_POLLING == 0 and INotify: if settings.CONSUMER_POLLING == 0 and INotify:
self.handle_inotify(directory) self.handle_inotify(directory, recursive)
else: else:
self.handle_polling(directory) self.handle_polling(directory, recursive)
logger.debug("Consumer exiting.") logger.debug("Consumer exiting.")
def handle_polling(self, directory): def handle_polling(self, directory, recursive):
logging.getLogger(__name__).info( logging.getLogger(__name__).info(
f"Polling directory for changes: {directory}") f"Polling directory for changes: {directory}")
self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING) self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
self.observer.schedule(Handler(), directory, recursive=False) self.observer.schedule(Handler(), directory, recursive=recursive)
self.observer.start() self.observer.start()
try: try:
while self.observer.is_alive(): while self.observer.is_alive():
@ -132,18 +168,26 @@ class Command(BaseCommand):
self.observer.stop() self.observer.stop()
self.observer.join() self.observer.join()
def handle_inotify(self, directory): def handle_inotify(self, directory, recursive):
logging.getLogger(__name__).info( logging.getLogger(__name__).info(
f"Using inotify to watch directory for changes: {directory}") f"Using inotify to watch directory for changes: {directory}")
inotify = INotify() inotify = INotify()
descriptor = inotify.add_watch( inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO
directory, flags.CLOSE_WRITE | flags.MOVED_TO) if recursive:
descriptor = inotify.add_watch_recursive(directory, inotify_flags)
else:
descriptor = inotify.add_watch(directory, inotify_flags)
try: try:
while not self.stop_flag: while not self.stop_flag:
for event in inotify.read(timeout=1000, read_delay=1000): for event in inotify.read(timeout=1000, read_delay=1000):
file = os.path.join(directory, event.name) if recursive:
_consume(file) path = inotify.get_path(event.wd)
else:
path = directory
filepath = os.path.join(path, event.name)
_consume(filepath)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@ -7,8 +7,9 @@ from unittest import mock
from django.conf import settings from django.conf import settings
from django.core.management import call_command, CommandError from django.core.management import call_command, CommandError
from django.test import override_settings, TestCase from django.test import override_settings, TransactionTestCase
from documents.models import Tag
from documents.consumer import ConsumerError from documents.consumer import ConsumerError
from documents.management.commands import document_consumer from documents.management.commands import document_consumer
from documents.tests.utils import DirectoriesMixin from documents.tests.utils import DirectoriesMixin
@ -33,7 +34,7 @@ def chunked(size, source):
yield source[i:i+size] yield source[i:i+size]
class TestConsumer(DirectoriesMixin, TestCase): class TestConsumer(DirectoriesMixin, TransactionTestCase):
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
@ -126,6 +127,43 @@ class TestConsumer(DirectoriesMixin, TestCase):
def test_consume_existing_file_polling(self): def test_consume_existing_file_polling(self):
self.test_consume_existing_file() self.test_consume_existing_file()
@override_settings(CONSUMER_RECURSIVE=1)
@override_settings(CONSUMER_SUBDIRS_AS_TAGS=1)
def test_consume_file_with_path_tags(self):
tag_names = ("existingTag", "Space Tag")
# Create a Tag prior to consuming a file using it in path
tag_ids = [Tag.objects.create(name=tag_names[0]).pk,]
self.t_start()
path = os.path.join(self.dirs.consumption_dir, *tag_names)
os.makedirs(path, exist_ok=True)
f = os.path.join(path, "my_file.pdf")
# Wait at least inotify read_delay for recursive watchers
# to be created for the new directories
sleep(1)
shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call()
self.task_mock.assert_called_once()
# Add the pk of the Tag created by _consume()
tag_ids.append(Tag.objects.get(name=tag_names[1]).pk)
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f)
# assertCountEqual has a bad name, but test that the first
# sequence contains the same elements as second, regardless of
# their order.
self.assertCountEqual(kwargs["override_tag_ids"], tag_ids)
@override_settings(CONSUMER_POLLING=1)
def test_consume_file_with_path_tags_polling(self):
self.test_consume_file_with_path_tags()
@mock.patch("documents.management.commands.document_consumer.logger.error") @mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_pdf(self, error_logger): def test_slow_write_pdf(self, error_logger):

View File

@ -331,6 +331,15 @@ CONSUMER_POLLING = int(os.getenv("PAPERLESS_CONSUMER_POLLING", 0))
CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES") CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
# Consume from subdirectories of CONSUMPTION_DIR as well
CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
# Set the names of subdirectories as tags for consumed files.
# E.g. $CONSUMPTION_DIR/foo/bar/file.pdf will add the tags "foo" and "bar" to
# the consumed file.
# PAPERLESS_CONSUMER_RECURSIVE must be enabled for this to work.
CONSUMER_SUBDIRS_AS_TAGS = __get_boolean("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
OPTIMIZE_THUMBNAILS = __get_boolean("PAPERLESS_OPTIMIZE_THUMBNAILS", "true") OPTIMIZE_THUMBNAILS = __get_boolean("PAPERLESS_OPTIMIZE_THUMBNAILS", "true")
OCR_PAGES = int(os.getenv('PAPERLESS_OCR_PAGES', 0)) OCR_PAGES = int(os.getenv('PAPERLESS_OCR_PAGES', 0))