Limit the number of threads waiting for files to be ready during polling

This commit is contained in:
Trenton H 2023-03-20 11:07:33 -07:00
parent 6a34a35585
commit 4a5f21dd87

View File

@ -1,10 +1,10 @@
import logging import logging
import os import os
from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter from fnmatch import filter
from pathlib import Path from pathlib import Path
from pathlib import PurePath from pathlib import PurePath
from threading import Event from threading import Event
from threading import Thread
from time import monotonic from time import monotonic
from time import sleep from time import sleep
from typing import Final from typing import Final
@ -168,11 +168,15 @@ def _consume_wait_unmodified(file: str) -> None:
class Handler(FileSystemEventHandler): class Handler(FileSystemEventHandler):
def __init__(self, pool: ThreadPoolExecutor) -> None:
super().__init__()
self._pool = pool
def on_created(self, event): def on_created(self, event):
Thread(target=_consume_wait_unmodified, args=(event.src_path,)).start() self._pool.submit(_consume_wait_unmodified, event.src_path)
def on_moved(self, event): def on_moved(self, event):
Thread(target=_consume_wait_unmodified, args=(event.dest_path,)).start() self._pool.submit(_consume_wait_unmodified, event.dest_path)
class Command(BaseCommand): class Command(BaseCommand):
@ -246,17 +250,18 @@ class Command(BaseCommand):
timeout = self.testing_timeout_s timeout = self.testing_timeout_s
logger.debug(f"Configuring timeout to {timeout}s") logger.debug(f"Configuring timeout to {timeout}s")
observer = PollingObserver(timeout=settings.CONSUMER_POLLING) with ThreadPoolExecutor(max_workers=4) as pool:
observer.schedule(Handler(), directory, recursive=recursive) observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
observer.start() observer.schedule(Handler(pool), directory, recursive=recursive)
try: observer.start()
while observer.is_alive(): try:
observer.join(timeout) while observer.is_alive():
if self.stop_flag.is_set(): observer.join(timeout)
observer.stop() if self.stop_flag.is_set():
except KeyboardInterrupt: observer.stop()
observer.stop() except KeyboardInterrupt:
observer.join() observer.stop()
observer.join()
def handle_inotify(self, directory, recursive, is_testing: bool): def handle_inotify(self, directory, recursive, is_testing: bool):
logger.info(f"Using inotify to watch directory for changes: {directory}") logger.info(f"Using inotify to watch directory for changes: {directory}")