Apparently there was a very good reason to use inotify. fixes #46 complete with test cases for inotify and polling.

This commit is contained in:
jonaswinkler
2020-11-26 17:41:50 +01:00
parent 43b473dc53
commit 75390693b9
4 changed files with 293 additions and 42 deletions

View File

@@ -1,11 +1,11 @@
import logging
import os
from time import sleep
from django.conf import settings
from django.core.management.base import BaseCommand
from django_q.tasks import async_task
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
try:
@@ -13,25 +13,54 @@ try:
except ImportError:
INotify = flags = None
logger = logging.getLogger(__name__)
def _consume(file):
try:
if os.path.isfile(file):
async_task("documents.tasks.consume_file",
file,
task_name=os.path.basename(file)[:100])
else:
logger.debug(
f"Not consuming file {file}: File has moved.")
except Exception as e:
# Catch all so that the consumer won't crash.
# This is also what the test case is listening for to check for
# errors.
logger.error(
"Error while consuming document: {}".format(e))
def _consume_wait_unmodified(file, num_tries=20, wait_time=1):
mtime = -1
current_try = 0
while current_try < num_tries:
try:
new_mtime = os.stat(file).st_mtime
except FileNotFoundError:
logger.debug(f"File {file} moved while waiting for it to remain "
f"unmodified.")
return
if new_mtime == mtime:
_consume(file)
return
mtime = new_mtime
sleep(wait_time)
current_try += 1
logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
class Handler(FileSystemEventHandler):
def _consume(self, file):
if os.path.isfile(file):
try:
async_task("documents.tasks.consume_file",
file,
task_name=os.path.basename(file)[:100])
except Exception as e:
# Catch all so that the consumer won't crash.
logging.getLogger(__name__).error(
"Error while consuming document: {}".format(e))
def on_created(self, event):
self._consume(event.src_path)
_consume_wait_unmodified(event.src_path)
def on_moved(self, event):
self._consume(event.src_path)
_consume_wait_unmodified(event.dest_path)
class Command(BaseCommand):
@@ -40,12 +69,15 @@ class Command(BaseCommand):
consumption directory.
"""
# This is here primarily for the tests and is irrelevant in production.
stop_flag = False
def __init__(self, *args, **kwargs):
self.verbosity = 0
self.logger = logging.getLogger(__name__)
BaseCommand.__init__(self, *args, **kwargs)
self.observer = None
def add_arguments(self, parser):
parser.add_argument(
@@ -54,38 +86,60 @@ class Command(BaseCommand):
nargs="?",
help="The consumption directory."
)
parser.add_argument(
"--oneshot",
action="store_true",
help="Run only once."
)
def handle(self, *args, **options):
self.verbosity = options["verbosity"]
directory = options["directory"]
logging.getLogger(__name__).info(
"Starting document consumer at {}".format(
directory
)
)
f"Starting document consumer at {directory}")
# Consume all files as this is not done initially by the watchdog
for entry in os.scandir(directory):
if entry.is_file():
async_task("documents.tasks.consume_file",
entry.path,
task_name=os.path.basename(entry.path)[:100])
# Start the watchdog. Woof!
if settings.CONSUMER_POLLING > 0:
logging.getLogger(__name__).info(
"Using polling instead of file system notifications.")
observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
if options["oneshot"]:
return
if settings.CONSUMER_POLLING == 0 and INotify:
self.handle_inotify(directory)
else:
observer = Observer()
event_handler = Handler()
observer.schedule(event_handler, directory, recursive=True)
observer.start()
self.handle_polling(directory)
logger.debug("Consumer exiting.")
def handle_polling(self, directory):
logging.getLogger(__name__).info(
f"Polling directory for changes: {directory}")
self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
self.observer.schedule(Handler(), directory, recursive=False)
self.observer.start()
try:
while observer.is_alive():
observer.join(1)
while self.observer.is_alive():
self.observer.join(1)
if self.stop_flag:
self.observer.stop()
except KeyboardInterrupt:
observer.stop()
observer.join()
self.observer.stop()
self.observer.join()
def handle_inotify(self, directory):
logging.getLogger(__name__).info(
f"Using inotify to watch directory for changes: {directory}")
inotify = INotify()
inotify.add_watch(directory, flags.CLOSE_WRITE | flags.MOVED_TO)
try:
while not self.stop_flag:
for event in inotify.read(timeout=1000, read_delay=1000):
file = os.path.join(directory, event.name)
if os.path.isfile(file):
_consume(file)
except KeyboardInterrupt:
pass

View File

@@ -0,0 +1,188 @@
import filecmp
import os
import shutil
import tempfile
from threading import Thread
from time import sleep
from unittest import mock
from django.conf import settings
from django.test import TestCase, override_settings
from documents.consumer import ConsumerError
from documents.management.commands import document_consumer
class ConsumerThread(Thread):
def __init__(self):
super().__init__()
self.cmd = document_consumer.Command()
def run(self) -> None:
self.cmd.handle(directory=settings.CONSUMPTION_DIR, oneshot=False)
def stop(self):
# Consumer checks this every second.
self.cmd.stop_flag = True
def chunked(size, source):
for i in range(0, len(source), size):
yield source[i:i+size]
class TestConsumer(TestCase):
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
def setUp(self) -> None:
patcher = mock.patch("documents.management.commands.document_consumer.async_task")
self.task_mock = patcher.start()
self.addCleanup(patcher.stop)
self.consume_dir = tempfile.mkdtemp()
override_settings(CONSUMPTION_DIR=self.consume_dir).enable()
def t_start(self):
self.t = ConsumerThread()
self.t.start()
# give the consumer some time to do initial work
sleep(1)
def tearDown(self) -> None:
if self.t:
self.t.stop()
def wait_for_task_mock_call(self):
n = 0
while n < 100:
if self.task_mock.call_count > 0:
# give task_mock some time to finish and raise errors
sleep(1)
return
n += 1
sleep(0.1)
self.fail("async_task was never called")
# A bogus async_task that will simply check the file for
# completeness and raise an exception otherwise.
def bogus_task(self, func, filename, **kwargs):
eq = filecmp.cmp(filename, self.sample_file, shallow=False)
if not eq:
print("Consumed an INVALID file.")
raise ConsumerError("Incomplete File READ FAILED")
else:
print("Consumed a perfectly valid file.")
def slow_write_file(self, target, incomplete=False):
with open(self.sample_file, 'rb') as f:
pdf_bytes = f.read()
if incomplete:
pdf_bytes = pdf_bytes[:len(pdf_bytes) - 100]
with open(target, 'wb') as f:
# this will take 2 seconds, since the file is about 20k.
print("Start writing file.")
for b in chunked(1000, pdf_bytes):
f.write(b)
sleep(0.1)
print("file completed.")
def test_consume_file(self):
self.t_start()
f = os.path.join(self.consume_dir, "my_file.pdf")
shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call()
self.task_mock.assert_called_once()
self.assertEqual(self.task_mock.call_args.args[1], f)
@override_settings(CONSUMER_POLLING=1)
def test_consume_file_polling(self):
self.test_consume_file()
def test_consume_existing_file(self):
f = os.path.join(self.consume_dir, "my_file.pdf")
shutil.copy(self.sample_file, f)
self.t_start()
self.task_mock.assert_called_once()
self.assertEqual(self.task_mock.call_args.args[1], f)
@override_settings(CONSUMER_POLLING=1)
def test_consume_existing_file_polling(self):
self.test_consume_existing_file()
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_pdf(self, error_logger):
self.task_mock.side_effect = self.bogus_task
self.t_start()
fname = os.path.join(self.consume_dir, "my_file.pdf")
self.slow_write_file(fname)
self.wait_for_task_mock_call()
error_logger.assert_not_called()
self.task_mock.assert_called_once()
self.assertEqual(self.task_mock.call_args.args[1], fname)
@override_settings(CONSUMER_POLLING=1)
def test_slow_write_pdf_polling(self):
self.test_slow_write_pdf()
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_and_move(self, error_logger):
self.task_mock.side_effect = self.bogus_task
self.t_start()
fname = os.path.join(self.consume_dir, "my_file.~df")
fname2 = os.path.join(self.consume_dir, "my_file.pdf")
self.slow_write_file(fname)
shutil.move(fname, fname2)
self.wait_for_task_mock_call()
self.task_mock.assert_called_once()
self.assertEqual(self.task_mock.call_args.args[1], fname2)
error_logger.assert_not_called()
@override_settings(CONSUMER_POLLING=1)
def test_slow_write_and_move_polling(self):
self.test_slow_write_and_move()
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_incomplete(self, error_logger):
self.task_mock.side_effect = self.bogus_task
self.t_start()
fname = os.path.join(self.consume_dir, "my_file.pdf")
self.slow_write_file(fname, incomplete=True)
self.wait_for_task_mock_call()
self.task_mock.assert_called_once()
self.assertEqual(self.task_mock.call_args.args[1], fname)
# assert that we have an error logged with this invalid file.
error_logger.assert_called_once()
@override_settings(CONSUMER_POLLING=1)
def test_slow_write_incomplete_polling(self):
self.test_slow_write_incomplete()