diff --git a/docker/supervisord.conf b/docker/supervisord.conf index c1681b7b3..21bbdd68d 100644 --- a/docker/supervisord.conf +++ b/docker/supervisord.conf @@ -19,6 +19,7 @@ stderr_logfile_maxbytes=0 [program:consumer] command=python3 manage.py document_consumer user=paperless +stopsignal=INT stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index baa14f166..1b232c072 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -2,6 +2,7 @@ import logging import os from pathlib import Path from pathlib import PurePath +from threading import Event from threading import Thread from time import monotonic from time import sleep @@ -148,9 +149,11 @@ class Command(BaseCommand): """ # This is here primarily for the tests and is irrelevant in production. - stop_flag = False - - observer = None + stop_flag = Event() + # Also only for testing, configures in one place the timeout used before checking + # the stop flag + testing_timeout_s: Final[float] = 0.5 + testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0 def add_arguments(self, parser): parser.add_argument( @@ -161,6 +164,16 @@ class Command(BaseCommand): ) parser.add_argument("--oneshot", action="store_true", help="Run only once.") + # Only use during unit testing, will configure a timeout + # Leaving it unset or false and the consumer will exit when it + # receives SIGINT + parser.add_argument( + "--testing", + action="store_true", + help="Flag used only for unit testing", + default=False, + ) + def handle(self, *args, **options): directory = options["directory"] recursive = settings.CONSUMER_RECURSIVE @@ -186,29 +199,40 @@ class Command(BaseCommand): return if settings.CONSUMER_POLLING == 0 and INotify: - self.handle_inotify(directory, recursive) + self.handle_inotify(directory, recursive, options["testing"]) else: - self.handle_polling(directory, recursive) + self.handle_polling(directory, recursive, options["testing"]) logger.debug("Consumer exiting.") - def handle_polling(self, directory, recursive): + def handle_polling(self, directory, recursive, is_testing: bool): logger.info(f"Polling directory for changes: {directory}") - self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING) - self.observer.schedule(Handler(), directory, recursive=recursive) - self.observer.start() - try: - while self.observer.is_alive(): - self.observer.join(1) - if self.stop_flag: - self.observer.stop() - except KeyboardInterrupt: - self.observer.stop() - self.observer.join() - def handle_inotify(self, directory, recursive): + timeout = None + if is_testing: + timeout = self.testing_timeout_s + logger.debug(f"Configuring timeout to {timeout}s") + + observer = PollingObserver(timeout=settings.CONSUMER_POLLING) + observer.schedule(Handler(), directory, recursive=recursive) + observer.start() + try: + while observer.is_alive(): + observer.join(timeout) + if self.stop_flag.is_set(): + observer.stop() + except KeyboardInterrupt: + observer.stop() + observer.join() + + def handle_inotify(self, directory, recursive, is_testing: bool): logger.info(f"Using inotify to watch directory for changes: {directory}") + timeout = None + if is_testing: + timeout = self.testing_timeout_ms + logger.debug(f"Configuring timeout to {timeout}ms") + inotify = INotify() inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO if recursive: @@ -216,14 +240,15 @@ class Command(BaseCommand): else: descriptor = inotify.add_watch(directory, inotify_flags) - try: + inotify_debounce: Final[float] = settings.CONSUMER_INOTIFY_DELAY - inotify_debounce: Final[float] = settings.CONSUMER_INOTIFY_DELAY - notified_files = {} + finished = False - while not self.stop_flag: + notified_files = {} - for event in inotify.read(timeout=1000): + while not finished: + try: + for event in inotify.read(timeout): if recursive: path = inotify.get_path(event.wd) else: @@ -256,8 +281,13 @@ class Command(BaseCommand): # These files are still waiting to hit the timeout notified_files = still_waiting - except KeyboardInterrupt: - pass + if self.stop_flag.is_set(): + logger.debug("Finishing because event is set") + finished = True + + except KeyboardInterrupt: + logger.info("Received SIGINT, stopping inotify") + finished = True inotify.rm_watch(descriptor) inotify.close() diff --git a/src/documents/tests/test_management_consumer.py b/src/documents/tests/test_management_consumer.py index 32b04282b..71cc97be1 100644 --- a/src/documents/tests/test_management_consumer.py +++ b/src/documents/tests/test_management_consumer.py @@ -2,6 +2,7 @@ import filecmp import os import shutil from threading import Thread +from time import monotonic from time import sleep from unittest import mock @@ -20,13 +21,14 @@ class ConsumerThread(Thread): def __init__(self): super().__init__() self.cmd = document_consumer.Command() + self.cmd.stop_flag.clear() def run(self) -> None: - self.cmd.handle(directory=settings.CONSUMPTION_DIR, oneshot=False) + self.cmd.handle(directory=settings.CONSUMPTION_DIR, oneshot=False, testing=True) def stop(self): # Consumer checks this every second. - self.cmd.stop_flag = True + self.cmd.stop_flag.set() def chunked(size, source): @@ -59,13 +61,14 @@ class ConsumerMixin: self.t.stop() # wait for the consumer to exit. self.t.join() + self.t = None super().tearDown() - def wait_for_task_mock_call(self, excpeted_call_count=1): + def wait_for_task_mock_call(self, expected_call_count=1): n = 0 - while n < 100: - if self.task_mock.call_count >= excpeted_call_count: + while n < 50: + if self.task_mock.call_count >= expected_call_count: # give task_mock some time to finish and raise errors sleep(1) return @@ -234,7 +237,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase): sleep(5) - self.wait_for_task_mock_call(excpeted_call_count=2) + self.wait_for_task_mock_call(expected_call_count=2) self.assertEqual(2, self.task_mock.call_count) @@ -281,7 +284,7 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase): @override_settings( CONSUMER_POLLING=1, - CONSUMER_POLLING_DELAY=3, + CONSUMER_POLLING_DELAY=1, CONSUMER_POLLING_RETRY_COUNT=20, ) class TestConsumerPolling(TestConsumer): @@ -298,7 +301,7 @@ class TestConsumerRecursive(TestConsumer): @override_settings( CONSUMER_RECURSIVE=True, CONSUMER_POLLING=1, - CONSUMER_POLLING_DELAY=3, + CONSUMER_POLLING_DELAY=1, CONSUMER_POLLING_RETRY_COUNT=20, ) class TestConsumerRecursivePolling(TestConsumer): @@ -307,8 +310,7 @@ class TestConsumerRecursivePolling(TestConsumer): class TestConsumerTags(DirectoriesMixin, ConsumerMixin, TransactionTestCase): - @override_settings(CONSUMER_RECURSIVE=True) - @override_settings(CONSUMER_SUBDIRS_AS_TAGS=True) + @override_settings(CONSUMER_RECURSIVE=True, CONSUMER_SUBDIRS_AS_TAGS=True) def test_consume_file_with_path_tags(self): tag_names = ("existingTag", "Space Tag")