diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index c3f6bbed4..d4ace3f1b 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -1,10 +1,10 @@ import logging import os +from concurrent.futures import ThreadPoolExecutor from fnmatch import filter from pathlib import Path from pathlib import PurePath from threading import Event -from threading import Thread from time import monotonic from time import sleep from typing import Final @@ -168,11 +168,15 @@ def _consume_wait_unmodified(file: str) -> None: class Handler(FileSystemEventHandler): + def __init__(self, pool: ThreadPoolExecutor) -> None: + super().__init__() + self._pool = pool + def on_created(self, event): - Thread(target=_consume_wait_unmodified, args=(event.src_path,)).start() + self._pool.submit(_consume_wait_unmodified, event.src_path) def on_moved(self, event): - Thread(target=_consume_wait_unmodified, args=(event.dest_path,)).start() + self._pool.submit(_consume_wait_unmodified, event.dest_path) class Command(BaseCommand): @@ -246,17 +250,18 @@ class Command(BaseCommand): timeout = self.testing_timeout_s logger.debug(f"Configuring timeout to {timeout}s") - observer = PollingObserver(timeout=settings.CONSUMER_POLLING) - observer.schedule(Handler(), directory, recursive=recursive) - observer.start() - try: - while observer.is_alive(): - observer.join(timeout) - if self.stop_flag.is_set(): - observer.stop() - except KeyboardInterrupt: - observer.stop() - observer.join() + with ThreadPoolExecutor(max_workers=4) as pool: + observer = PollingObserver(timeout=settings.CONSUMER_POLLING) + observer.schedule(Handler(pool), directory, recursive=recursive) + observer.start() + try: + while observer.is_alive(): + observer.join(timeout) + if self.stop_flag.is_set(): + observer.stop() + except KeyboardInterrupt: + observer.stop() + observer.join() def handle_inotify(self, directory, recursive, is_testing: bool): logger.info(f"Using inotify to watch directory for changes: {directory}")