Merge pull request #1421 from paperless-ngx/feature-consumer-event-driven

Feature: Event driven consumer
This commit is contained in:
shamoon
2022-08-25 08:16:47 -07:00
committed by GitHub
3 changed files with 76 additions and 35 deletions

View File

@@ -2,6 +2,7 @@ import logging
import os
from pathlib import Path
from pathlib import PurePath
from threading import Event
from threading import Thread
from time import monotonic
from time import sleep
@@ -148,9 +149,11 @@ class Command(BaseCommand):
"""
# This is here primarily for the tests and is irrelevant in production.
stop_flag = False
observer = None
stop_flag = Event()
# Also only for testing: defines, in one place, the timeout used between checks of
# the stop flag
testing_timeout_s: Final[float] = 0.5
testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
def add_arguments(self, parser):
parser.add_argument(
@@ -161,6 +164,16 @@ class Command(BaseCommand):
)
parser.add_argument("--oneshot", action="store_true", help="Run only once.")
# Only use during unit testing; it configures a timeout.
# If it is left unset or false, the consumer will exit when it
# receives SIGINT.
parser.add_argument(
"--testing",
action="store_true",
help="Flag used only for unit testing",
default=False,
)
def handle(self, *args, **options):
directory = options["directory"]
recursive = settings.CONSUMER_RECURSIVE
@@ -186,29 +199,40 @@ class Command(BaseCommand):
return
if settings.CONSUMER_POLLING == 0 and INotify:
self.handle_inotify(directory, recursive)
self.handle_inotify(directory, recursive, options["testing"])
else:
self.handle_polling(directory, recursive)
self.handle_polling(directory, recursive, options["testing"])
logger.debug("Consumer exiting.")
def handle_polling(self, directory, recursive):
def handle_polling(self, directory, recursive, is_testing: bool):
logger.info(f"Polling directory for changes: {directory}")
self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
self.observer.schedule(Handler(), directory, recursive=recursive)
self.observer.start()
try:
while self.observer.is_alive():
self.observer.join(1)
if self.stop_flag:
self.observer.stop()
except KeyboardInterrupt:
self.observer.stop()
self.observer.join()
def handle_inotify(self, directory, recursive):
timeout = None
if is_testing:
timeout = self.testing_timeout_s
logger.debug(f"Configuring timeout to {timeout}s")
observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
observer.schedule(Handler(), directory, recursive=recursive)
observer.start()
try:
while observer.is_alive():
observer.join(timeout)
if self.stop_flag.is_set():
observer.stop()
except KeyboardInterrupt:
observer.stop()
observer.join()
def handle_inotify(self, directory, recursive, is_testing: bool):
logger.info(f"Using inotify to watch directory for changes: {directory}")
timeout = None
if is_testing:
timeout = self.testing_timeout_ms
logger.debug(f"Configuring timeout to {timeout}ms")
inotify = INotify()
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO
if recursive:
@@ -216,14 +240,15 @@ class Command(BaseCommand):
else:
descriptor = inotify.add_watch(directory, inotify_flags)
try:
inotify_debounce: Final[float] = settings.CONSUMER_INOTIFY_DELAY
inotify_debounce: Final[float] = settings.CONSUMER_INOTIFY_DELAY
notified_files = {}
finished = False
while not self.stop_flag:
notified_files = {}
for event in inotify.read(timeout=1000):
while not finished:
try:
for event in inotify.read(timeout=timeout):
if recursive:
path = inotify.get_path(event.wd)
else:
@@ -256,8 +281,22 @@ class Command(BaseCommand):
# These files are still waiting to hit the timeout
notified_files = still_waiting
except KeyboardInterrupt:
pass
# If files are waiting, read() needs a timeout so that it returns and they can be checked.
# Otherwise, go back to an infinite wait, but only if not testing.
if len(notified_files) > 0:
timeout = inotify_debounce
elif is_testing:
timeout = self.testing_timeout_ms
else:
timeout = None
if self.stop_flag.is_set():
logger.debug("Finishing because event is set")
finished = True
except KeyboardInterrupt:
logger.info("Received SIGINT, stopping inotify")
finished = True
inotify.rm_watch(descriptor)
inotify.close()