mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 03:16:10 -06:00 
			
		
		
		
	Create tags from sub directories
The names of subdirectories in the consumption directory will be added as tags to the documents consumed from them. To enable this, set both PAPERLESS_CONSUMER_RECURSIVE=1 and PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS=1. Fixes #50
This commit is contained in:
		
							
								
								
									
										2
									
								
								Pipfile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Pipfile
									
									
									
									
									
								
							@@ -38,7 +38,7 @@ scikit-learn="~=0.23.2"
 | 
			
		||||
whitenoise = "~=5.2.0"
 | 
			
		||||
watchdog = "*"
 | 
			
		||||
whoosh="~=2.7.4"
 | 
			
		||||
inotify-simple = "*"
 | 
			
		||||
inotifyrecursive = ">=0.3.4"
 | 
			
		||||
 | 
			
		||||
[dev-packages]
 | 
			
		||||
coveralls = "*"
 | 
			
		||||
 
 | 
			
		||||
@@ -1,31 +1,60 @@
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
from time import sleep
 | 
			
		||||
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from django.core.management.base import BaseCommand, CommandError
 | 
			
		||||
from django.utils.text import slugify
 | 
			
		||||
from django_q.tasks import async_task
 | 
			
		||||
from watchdog.events import FileSystemEventHandler
 | 
			
		||||
from watchdog.observers.polling import PollingObserver
 | 
			
		||||
 | 
			
		||||
from documents.models import Tag
 | 
			
		||||
 | 
			
		||||
try:
 | 
			
		||||
    from inotify_simple import INotify, flags
 | 
			
		||||
    from inotifyrecursive import INotify, flags
 | 
			
		||||
except ImportError:
 | 
			
		||||
    INotify = flags = None
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _consume(file):
 | 
			
		||||
    try:
 | 
			
		||||
        if os.path.isfile(file):
 | 
			
		||||
            async_task("documents.tasks.consume_file",
 | 
			
		||||
                       file,
 | 
			
		||||
                       task_name=os.path.basename(file)[:100])
 | 
			
		||||
        else:
 | 
			
		||||
            logger.debug(
 | 
			
		||||
                f"Not consuming file {file}: File has moved.")
 | 
			
		||||
def _tags_from_path(filepath):
    """Return the primary keys of the tags named by a file's sub directories.

    The directory components between settings.CONSUMPTION_DIR and the file
    are each looked up as a Tag by the slugified directory name. A Tag that
    does not exist yet is created with the directory name as its name.

    Raises ValueError (from Path.relative_to) when filepath does not lie
    inside settings.CONSUMPTION_DIR.
    """
    tag_ids = set()
    # Only the directories below the consumption dir matter, so the file
    # name itself is dropped with ``.parent`` before splitting into parts.
    relative_dir = Path(filepath).relative_to(settings.CONSUMPTION_DIR).parent

    for dirname in relative_dir.parts:
        tag, _created = Tag.objects.get_or_create(
            slug=slugify(dirname),
            defaults={"name": dirname},
        )
        tag_ids.add(tag.pk)

    return tag_ids
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _consume(filepath):
 | 
			
		||||
    if not os.path.isfile(filepath):
 | 
			
		||||
        logger.debug(
 | 
			
		||||
            f"Not consuming file {filepath}: File has moved.")
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    tag_ids = None
 | 
			
		||||
    try:
 | 
			
		||||
        if settings.CONSUMER_SUBDIRS_AS_TAGS:
 | 
			
		||||
            tag_ids = _tags_from_path(filepath)
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        logger.error(
 | 
			
		||||
            "Error creating tags from path: {}".format(e))
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        async_task("documents.tasks.consume_file",
 | 
			
		||||
                   filepath,
 | 
			
		||||
                   override_tag_ids=tag_ids if tag_ids else None,
 | 
			
		||||
                   task_name=os.path.basename(filepath)[:100])
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        # Catch all so that the consumer won't crash.
 | 
			
		||||
        # This is also what the test case is listening for to check for
 | 
			
		||||
@@ -94,6 +123,7 @@ class Command(BaseCommand):
 | 
			
		||||
 | 
			
		||||
    def handle(self, *args, **options):
 | 
			
		||||
        directory = options["directory"]
 | 
			
		||||
        recursive = settings.CONSUMER_RECURSIVE
 | 
			
		||||
 | 
			
		||||
        if not directory:
 | 
			
		||||
            raise CommandError(
 | 
			
		||||
@@ -104,24 +134,30 @@ class Command(BaseCommand):
 | 
			
		||||
            raise CommandError(
 | 
			
		||||
                f"Consumption directory {directory} does not exist")
 | 
			
		||||
 | 
			
		||||
        for entry in os.scandir(directory):
 | 
			
		||||
            _consume(entry.path)
 | 
			
		||||
        if recursive:
 | 
			
		||||
            for dirpath, _, filenames in os.walk(directory):
 | 
			
		||||
                for filename in filenames:
 | 
			
		||||
                    filepath = os.path.join(dirpath, filename)
 | 
			
		||||
                    _consume(filepath)
 | 
			
		||||
        else:
 | 
			
		||||
            for entry in os.scandir(directory):
 | 
			
		||||
                _consume(entry.path)
 | 
			
		||||
 | 
			
		||||
        if options["oneshot"]:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if settings.CONSUMER_POLLING == 0 and INotify:
 | 
			
		||||
            self.handle_inotify(directory)
 | 
			
		||||
            self.handle_inotify(directory, recursive)
 | 
			
		||||
        else:
 | 
			
		||||
            self.handle_polling(directory)
 | 
			
		||||
            self.handle_polling(directory, recursive)
 | 
			
		||||
 | 
			
		||||
        logger.debug("Consumer exiting.")
 | 
			
		||||
 | 
			
		||||
    def handle_polling(self, directory):
 | 
			
		||||
    def handle_polling(self, directory, recursive):
 | 
			
		||||
        logging.getLogger(__name__).info(
 | 
			
		||||
            f"Polling directory for changes: {directory}")
 | 
			
		||||
        self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
 | 
			
		||||
        self.observer.schedule(Handler(), directory, recursive=False)
 | 
			
		||||
        self.observer.schedule(Handler(), directory, recursive=recursive)
 | 
			
		||||
        self.observer.start()
 | 
			
		||||
        try:
 | 
			
		||||
            while self.observer.is_alive():
 | 
			
		||||
@@ -132,18 +168,26 @@ class Command(BaseCommand):
 | 
			
		||||
            self.observer.stop()
 | 
			
		||||
        self.observer.join()
 | 
			
		||||
 | 
			
		||||
    def handle_inotify(self, directory):
 | 
			
		||||
    def handle_inotify(self, directory, recursive):
 | 
			
		||||
        logging.getLogger(__name__).info(
 | 
			
		||||
            f"Using inotify to watch directory for changes: {directory}")
 | 
			
		||||
 | 
			
		||||
        inotify = INotify()
 | 
			
		||||
        descriptor = inotify.add_watch(
 | 
			
		||||
            directory, flags.CLOSE_WRITE | flags.MOVED_TO)
 | 
			
		||||
        inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO
 | 
			
		||||
        if recursive:
 | 
			
		||||
            descriptor = inotify.add_watch_recursive(directory, inotify_flags)
 | 
			
		||||
        else:
 | 
			
		||||
            descriptor = inotify.add_watch(directory, inotify_flags)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            while not self.stop_flag:
 | 
			
		||||
                for event in inotify.read(timeout=1000, read_delay=1000):
 | 
			
		||||
                    file = os.path.join(directory, event.name)
 | 
			
		||||
                    _consume(file)
 | 
			
		||||
                    if recursive:
 | 
			
		||||
                        path = inotify.get_path(event.wd)
 | 
			
		||||
                    else:
 | 
			
		||||
                        path = directory
 | 
			
		||||
                    filepath = os.path.join(path, event.name)
 | 
			
		||||
                    _consume(filepath)
 | 
			
		||||
        except KeyboardInterrupt:
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -7,8 +7,9 @@ from unittest import mock
 | 
			
		||||
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from django.core.management import call_command, CommandError
 | 
			
		||||
from django.test import override_settings, TestCase
 | 
			
		||||
from django.test import override_settings, TransactionTestCase
 | 
			
		||||
 | 
			
		||||
from documents.models import Tag
 | 
			
		||||
from documents.consumer import ConsumerError
 | 
			
		||||
from documents.management.commands import document_consumer
 | 
			
		||||
from documents.tests.utils import DirectoriesMixin
 | 
			
		||||
@@ -33,7 +34,7 @@ def chunked(size, source):
 | 
			
		||||
        yield source[i:i+size]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestConsumer(DirectoriesMixin, TestCase):
 | 
			
		||||
class TestConsumer(DirectoriesMixin, TransactionTestCase):
 | 
			
		||||
 | 
			
		||||
    sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
 | 
			
		||||
 | 
			
		||||
@@ -126,6 +127,43 @@ class TestConsumer(DirectoriesMixin, TestCase):
 | 
			
		||||
    def test_consume_existing_file_polling(self):
 | 
			
		||||
        self.test_consume_existing_file()
 | 
			
		||||
 | 
			
		||||
    @override_settings(CONSUMER_RECURSIVE=1)
    @override_settings(CONSUMER_SUBDIRS_AS_TAGS=1)
    def test_consume_file_with_path_tags(self):
        """Sub directory names must reach the consume task as tag ids.

        One tag exists before the file is dropped, the other one has to be
        created while the file is being picked up.
        """
        existing_name, new_name = "existingTag", "Space Tag"
        # This tag is present before any file using it in its path shows up.
        expected_ids = [Tag.objects.create(name=existing_name).pk]

        self.t_start()

        target_dir = os.path.join(
            self.dirs.consumption_dir, existing_name, new_name)
        os.makedirs(target_dir, exist_ok=True)
        target_file = os.path.join(target_dir, "my_file.pdf")
        # Wait at least the inotify read_delay so that the recursive
        # watchers for the freshly created directories are in place.
        sleep(1)
        shutil.copy(self.sample_file, target_file)

        self.wait_for_task_mock_call()
        self.task_mock.assert_called_once()

        # The second tag did not exist up front; _consume() created it.
        expected_ids.append(Tag.objects.get(name=new_name).pk)

        call_args, call_kwargs = self.task_mock.call_args
        self.assertEqual(call_args[1], target_file)

        # Despite its name, assertCountEqual checks that both sequences
        # hold the same elements, ignoring their order.
        self.assertCountEqual(call_kwargs["override_tag_ids"], expected_ids)
 | 
			
		||||
 | 
			
		||||
    @override_settings(CONSUMER_POLLING=1)
    def test_consume_file_with_path_tags_polling(self):
        """Repeat the path-tags test with CONSUMER_POLLING set to 1."""
        self.test_consume_file_with_path_tags()
 | 
			
		||||
 | 
			
		||||
    @mock.patch("documents.management.commands.document_consumer.logger.error")
 | 
			
		||||
    def test_slow_write_pdf(self, error_logger):
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -331,6 +331,15 @@ CONSUMER_POLLING = int(os.getenv("PAPERLESS_CONSUMER_POLLING", 0))
 | 
			
		||||
 | 
			
		||||
CONSUMER_DELETE_DUPLICATES = __get_boolean("PAPERLESS_CONSUMER_DELETE_DUPLICATES")
 | 
			
		||||
 | 
			
		||||
# Consume from subdirectories of CONSUMPTION_DIR as well
 | 
			
		||||
CONSUMER_RECURSIVE = __get_boolean("PAPERLESS_CONSUMER_RECURSIVE")
 | 
			
		||||
 | 
			
		||||
# Set the names of subdirectories as tags for consumed files.
 | 
			
		||||
# E.g. $CONSUMPTION_DIR/foo/bar/file.pdf will add the tags "foo" and "bar" to
 | 
			
		||||
# the consumed file.
 | 
			
		||||
# PAPERLESS_CONSUMER_RECURSIVE must be enabled for this to work.
 | 
			
		||||
CONSUMER_SUBDIRS_AS_TAGS = __get_boolean("PAPERLESS_CONSUMER_SUBDIRS_AS_TAGS")
 | 
			
		||||
 | 
			
		||||
OPTIMIZE_THUMBNAILS = __get_boolean("PAPERLESS_OPTIMIZE_THUMBNAILS", "true")
 | 
			
		||||
 | 
			
		||||
OCR_PAGES = int(os.getenv('PAPERLESS_OCR_PAGES', 0))
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user