From 4a5f21dd8769a6701df7be2ecb7df084d3bab359 Mon Sep 17 00:00:00 2001
From: Trenton H <797416+stumpylog@users.noreply.github.com>
Date: Mon, 20 Mar 2023 11:07:33 -0700
Subject: [PATCH] Limit the number of threads waiting for files to be ready during polling

---
 .../management/commands/document_consumer.py | 33 +++++++++++--------
 1 file changed, 19 insertions(+), 14 deletions(-)

diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py
index c3f6bbed4..d4ace3f1b 100644
--- a/src/documents/management/commands/document_consumer.py
+++ b/src/documents/management/commands/document_consumer.py
@@ -1,10 +1,10 @@
 import logging
 import os
+from concurrent.futures import ThreadPoolExecutor
 from fnmatch import filter
 from pathlib import Path
 from pathlib import PurePath
 from threading import Event
-from threading import Thread
 from time import monotonic
 from time import sleep
 from typing import Final
@@ -168,11 +168,15 @@ def _consume_wait_unmodified(file: str) -> None:
 
 
 class Handler(FileSystemEventHandler):
+    def __init__(self, pool: ThreadPoolExecutor) -> None:
+        super().__init__()
+        self._pool = pool
+
     def on_created(self, event):
-        Thread(target=_consume_wait_unmodified, args=(event.src_path,)).start()
+        self._pool.submit(_consume_wait_unmodified, event.src_path)
 
     def on_moved(self, event):
-        Thread(target=_consume_wait_unmodified, args=(event.dest_path,)).start()
+        self._pool.submit(_consume_wait_unmodified, event.dest_path)
 
 
 class Command(BaseCommand):
@@ -246,17 +250,18 @@ class Command(BaseCommand):
             timeout = self.testing_timeout_s
             logger.debug(f"Configuring timeout to {timeout}s")
 
-        observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
-        observer.schedule(Handler(), directory, recursive=recursive)
-        observer.start()
-        try:
-            while observer.is_alive():
-                observer.join(timeout)
-                if self.stop_flag.is_set():
-                    observer.stop()
-        except KeyboardInterrupt:
-            observer.stop()
-        observer.join()
+        with ThreadPoolExecutor(max_workers=4) as pool:
+            observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
+            observer.schedule(Handler(pool), directory, recursive=recursive)
+            observer.start()
+            try:
+                while observer.is_alive():
+                    observer.join(timeout)
+                    if self.stop_flag.is_set():
+                        observer.stop()
+            except KeyboardInterrupt:
+                observer.stop()
+            observer.join()
 
     def handle_inotify(self, directory, recursive, is_testing: bool):
         logger.info(f"Using inotify to watch directory for changes: {directory}")