updated consumer: now using watchdog

This commit is contained in:
Jonas Winkler
2020-11-01 23:07:54 +01:00
parent e326bc4f51
commit 6fd73a04b8
4 changed files with 130 additions and 224 deletions

View File

@@ -1,12 +1,13 @@
import logging
import os
import time
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.management.base import BaseCommand
from ...consumer import Consumer, ConsumerError
from ...mail import MailFetcher, MailFetcherError
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from documents.consumer import Consumer
try:
from inotify_simple import INotify, flags
@@ -14,6 +15,15 @@ except ImportError:
INotify = flags = None
class Handler(FileSystemEventHandler):
def __init__(self, consumer):
self.consumer = consumer
def on_created(self, event):
self.consumer.try_consume_file(event.src_path)
class Command(BaseCommand):
"""
On every iteration of an infinite loop, consume what we can from the
@@ -29,6 +39,8 @@ class Command(BaseCommand):
self.mail_fetcher = None
self.first_iteration = True
self.consumer = Consumer()
BaseCommand.__init__(self, *args, **kwargs)
def add_arguments(self, parser):
@@ -38,111 +50,34 @@ class Command(BaseCommand):
nargs="?",
help="The consumption directory."
)
parser.add_argument(
"--loop-time",
default=settings.CONSUMER_LOOP_TIME,
type=int,
help="Wait time between each loop (in seconds)."
)
parser.add_argument(
"--mail-delta",
default=10,
type=int,
help="Wait time between each mail fetch (in minutes)."
)
parser.add_argument(
"--oneshot",
action="store_true",
help="Run only once."
)
parser.add_argument(
"--no-inotify",
action="store_true",
help="Don't use inotify, even if it's available.",
default=False
)
def handle(self, *args, **options):
self.verbosity = options["verbosity"]
directory = options["directory"]
loop_time = options["loop_time"]
mail_delta = options["mail_delta"] * 60
use_inotify = INotify is not None and options["no_inotify"] is False
try:
self.file_consumer = Consumer(consume=directory)
self.mail_fetcher = MailFetcher(consume=directory)
except (ConsumerError, MailFetcherError) as e:
raise CommandError(e)
for d in (settings.ORIGINALS_DIR, settings.THUMBNAIL_DIR):
os.makedirs(d, exist_ok=True)
logging.getLogger(__name__).info(
"Starting document consumer at {}{}".format(
directory,
" with inotify" if use_inotify else ""
"Starting document consumer at {}".format(
directory
)
)
if options["oneshot"]:
self.loop_step(mail_delta)
else:
try:
if use_inotify:
self.loop_inotify(mail_delta)
else:
self.loop(loop_time, mail_delta)
except KeyboardInterrupt:
print("Exiting")
# Consume all files as this is not done initially by the watchdog
for entry in os.scandir(directory):
if entry.is_file():
self.consumer.try_consume_file(entry.path)
def loop(self, loop_time, mail_delta):
while True:
start_time = time.time()
if self.verbosity > 1:
print(".", int(start_time))
self.loop_step(mail_delta, start_time)
# Sleep until the start of the next loop step
time.sleep(max(0, start_time + loop_time - time.time()))
def loop_step(self, mail_delta, time_now=None):
# Occasionally fetch mail and store it to be consumed on the next loop
# We fetch email when we first start up so that it is not necessary to
# wait for 10 minutes after making changes to the config file.
next_mail_time = self.mail_fetcher.last_checked + mail_delta
if self.first_iteration or time_now > next_mail_time:
self.first_iteration = False
self.mail_fetcher.pull()
self.file_consumer.consume_new_files()
def loop_inotify(self, mail_delta):
directory = self.file_consumer.consume
inotify = INotify()
inotify.add_watch(directory, flags.CLOSE_WRITE | flags.MOVED_TO)
# Run initial mail fetch and consume all currently existing documents
self.loop_step(mail_delta)
next_mail_time = self.mail_fetcher.last_checked + mail_delta
while True:
# Consume documents until next_mail_time
while True:
delta = next_mail_time - time.time()
if delta > 0:
for event in inotify.read(timeout=delta):
file = os.path.join(directory, event.name)
if os.path.isfile(file):
self.file_consumer.try_consume_file(file)
else:
self.logger.warning(
"Skipping %s as it is not a file",
file
)
else:
break
self.mail_fetcher.pull()
next_mail_time = self.mail_fetcher.last_checked + mail_delta
# Start the watchdog. Woof!
observer = Observer()
event_handler = Handler(self.consumer)
observer.schedule(event_handler, directory, recursive=True)
observer.start()
try:
while observer.is_alive():
observer.join(1)
except KeyboardInterrupt:
observer.stop()
observer.join()