Merge pull request #1421 from paperless-ngx/feature-consumer-event-driven

Feature: Event driven consumer
This commit is contained in:
shamoon 2022-08-25 08:16:47 -07:00 committed by GitHub
commit bb951ad860
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 35 deletions

View File

@ -19,6 +19,7 @@ stderr_logfile_maxbytes=0
[program:consumer] [program:consumer]
command=python3 manage.py document_consumer command=python3 manage.py document_consumer
user=paperless user=paperless
stopsignal=INT
stdout_logfile=/dev/stdout stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0 stdout_logfile_maxbytes=0

View File

@ -2,6 +2,7 @@ import logging
import os import os
from pathlib import Path from pathlib import Path
from pathlib import PurePath from pathlib import PurePath
from threading import Event
from threading import Thread from threading import Thread
from time import monotonic from time import monotonic
from time import sleep from time import sleep
@ -148,9 +149,11 @@ class Command(BaseCommand):
""" """
# This is here primarily for the tests and is irrelevant in production. # This is here primarily for the tests and is irrelevant in production.
stop_flag = False stop_flag = Event()
# Also only for testing, configures in one place the timeout used before checking
observer = None # the stop flag
testing_timeout_s: Final[float] = 0.5
testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
def add_arguments(self, parser): def add_arguments(self, parser):
parser.add_argument( parser.add_argument(
@ -161,6 +164,16 @@ class Command(BaseCommand):
) )
parser.add_argument("--oneshot", action="store_true", help="Run only once.") parser.add_argument("--oneshot", action="store_true", help="Run only once.")
# Only use during unit testing, will configure a timeout
# Leaving it unset or false and the consumer will exit when it
# receives SIGINT
parser.add_argument(
"--testing",
action="store_true",
help="Flag used only for unit testing",
default=False,
)
def handle(self, *args, **options): def handle(self, *args, **options):
directory = options["directory"] directory = options["directory"]
recursive = settings.CONSUMER_RECURSIVE recursive = settings.CONSUMER_RECURSIVE
@ -186,29 +199,40 @@ class Command(BaseCommand):
return return
if settings.CONSUMER_POLLING == 0 and INotify: if settings.CONSUMER_POLLING == 0 and INotify:
self.handle_inotify(directory, recursive) self.handle_inotify(directory, recursive, options["testing"])
else: else:
self.handle_polling(directory, recursive) self.handle_polling(directory, recursive, options["testing"])
logger.debug("Consumer exiting.") logger.debug("Consumer exiting.")
def handle_polling(self, directory, recursive): def handle_polling(self, directory, recursive, is_testing: bool):
logger.info(f"Polling directory for changes: {directory}") logger.info(f"Polling directory for changes: {directory}")
self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
self.observer.schedule(Handler(), directory, recursive=recursive)
self.observer.start()
try:
while self.observer.is_alive():
self.observer.join(1)
if self.stop_flag:
self.observer.stop()
except KeyboardInterrupt:
self.observer.stop()
self.observer.join()
def handle_inotify(self, directory, recursive): timeout = None
if is_testing:
timeout = self.testing_timeout_s
logger.debug(f"Configuring timeout to {timeout}s")
observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
observer.schedule(Handler(), directory, recursive=recursive)
observer.start()
try:
while observer.is_alive():
observer.join(timeout)
if self.stop_flag.is_set():
observer.stop()
except KeyboardInterrupt:
observer.stop()
observer.join()
def handle_inotify(self, directory, recursive, is_testing: bool):
logger.info(f"Using inotify to watch directory for changes: {directory}") logger.info(f"Using inotify to watch directory for changes: {directory}")
timeout = None
if is_testing:
timeout = self.testing_timeout_ms
logger.debug(f"Configuring timeout to {timeout}ms")
inotify = INotify() inotify = INotify()
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO
if recursive: if recursive:
@ -216,14 +240,15 @@ class Command(BaseCommand):
else: else:
descriptor = inotify.add_watch(directory, inotify_flags) descriptor = inotify.add_watch(directory, inotify_flags)
try: inotify_debounce: Final[float] = settings.CONSUMER_INOTIFY_DELAY
inotify_debounce: Final[float] = settings.CONSUMER_INOTIFY_DELAY finished = False
notified_files = {}
while not self.stop_flag: notified_files = {}
for event in inotify.read(timeout=1000): while not finished:
try:
for event in inotify.read(timeout=timeout):
if recursive: if recursive:
path = inotify.get_path(event.wd) path = inotify.get_path(event.wd)
else: else:
@ -256,8 +281,22 @@ class Command(BaseCommand):
# These files are still waiting to hit the timeout # These files are still waiting to hit the timeout
notified_files = still_waiting notified_files = still_waiting
except KeyboardInterrupt: # If files are waiting, need to exit read() to check them
pass # Otherwise, go back to infinite sleep time, but only if not testing
if len(notified_files) > 0:
timeout = inotify_debounce
elif is_testing:
timeout = self.testing_timeout_ms
else:
timeout = None
if self.stop_flag.is_set():
logger.debug("Finishing because event is set")
finished = True
except KeyboardInterrupt:
logger.info("Received SIGINT, stopping inotify")
finished = True
inotify.rm_watch(descriptor) inotify.rm_watch(descriptor)
inotify.close() inotify.close()

View File

@ -20,13 +20,14 @@ class ConsumerThread(Thread):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.cmd = document_consumer.Command() self.cmd = document_consumer.Command()
self.cmd.stop_flag.clear()
def run(self) -> None: def run(self) -> None:
self.cmd.handle(directory=settings.CONSUMPTION_DIR, oneshot=False) self.cmd.handle(directory=settings.CONSUMPTION_DIR, oneshot=False, testing=True)
def stop(self): def stop(self):
# Consumer checks this every second. # Consumer checks this every second.
self.cmd.stop_flag = True self.cmd.stop_flag.set()
def chunked(size, source): def chunked(size, source):
@ -59,13 +60,14 @@ class ConsumerMixin:
self.t.stop() self.t.stop()
# wait for the consumer to exit. # wait for the consumer to exit.
self.t.join() self.t.join()
self.t = None
super().tearDown() super().tearDown()
def wait_for_task_mock_call(self, excpeted_call_count=1): def wait_for_task_mock_call(self, expected_call_count=1):
n = 0 n = 0
while n < 100: while n < 50:
if self.task_mock.call_count >= excpeted_call_count: if self.task_mock.call_count >= expected_call_count:
# give task_mock some time to finish and raise errors # give task_mock some time to finish and raise errors
sleep(1) sleep(1)
return return
@ -234,7 +236,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
sleep(5) sleep(5)
self.wait_for_task_mock_call(excpeted_call_count=2) self.wait_for_task_mock_call(expected_call_count=2)
self.assertEqual(2, self.task_mock.call_count) self.assertEqual(2, self.task_mock.call_count)
@ -281,7 +283,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
@override_settings( @override_settings(
CONSUMER_POLLING=1, CONSUMER_POLLING=1,
CONSUMER_POLLING_DELAY=3, CONSUMER_POLLING_DELAY=1,
CONSUMER_POLLING_RETRY_COUNT=20, CONSUMER_POLLING_RETRY_COUNT=20,
) )
class TestConsumerPolling(TestConsumer): class TestConsumerPolling(TestConsumer):
@ -298,7 +300,7 @@ class TestConsumerRecursive(TestConsumer):
@override_settings( @override_settings(
CONSUMER_RECURSIVE=True, CONSUMER_RECURSIVE=True,
CONSUMER_POLLING=1, CONSUMER_POLLING=1,
CONSUMER_POLLING_DELAY=3, CONSUMER_POLLING_DELAY=1,
CONSUMER_POLLING_RETRY_COUNT=20, CONSUMER_POLLING_RETRY_COUNT=20,
) )
class TestConsumerRecursivePolling(TestConsumer): class TestConsumerRecursivePolling(TestConsumer):
@ -307,8 +309,7 @@ class TestConsumerRecursivePolling(TestConsumer):
class TestConsumerTags(DirectoriesMixin, ConsumerMixin, TransactionTestCase): class TestConsumerTags(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
@override_settings(CONSUMER_RECURSIVE=True) @override_settings(CONSUMER_RECURSIVE=True, CONSUMER_SUBDIRS_AS_TAGS=True)
@override_settings(CONSUMER_SUBDIRS_AS_TAGS=True)
def test_consume_file_with_path_tags(self): def test_consume_file_with_path_tags(self):
tag_names = ("existingTag", "Space Tag") tag_names = ("existingTag", "Space Tag")