diff --git a/Pipfile b/Pipfile index ad60e0905..a6169a2ba 100644 --- a/Pipfile +++ b/Pipfile @@ -35,6 +35,7 @@ scikit-learn="~=0.23.2" whitenoise = "~=5.2.0" watchdog = "*" whoosh="~=2.7.4" +inotify-simple = "*" [dev-packages] coveralls = "*" diff --git a/Pipfile.lock b/Pipfile.lock index 6ecca3c34..b10c414ed 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "ae2643b9cf0cf5741ae149fb6bc0c480de41329ce48e773eb4b5d760bc5e2244" + "sha256": "e9792119f687757dd388e73827ddd4216910327d5b65a8b950d4b202679c36eb" }, "pipfile-spec": 6, "requires": {}, @@ -129,6 +129,14 @@ "index": "pypi", "version": "==0.32.0" }, + "inotify-simple": { + "hashes": [ + "sha256:8440ffe49c4ae81a8df57c1ae1eb4b6bfa7acb830099bfb3e305b383005cc128", + "sha256:854f9ac752cc1fcff6ca34e9d3d875c9a94c9b7d6eb377f63be2d481a566c6ee" + ], + "index": "pypi", + "version": "==1.3.5" + }, "joblib": { "hashes": [ "sha256:698c311779f347cf6b7e6b8a39bb682277b8ee4aba8cf9507bc0cf4cd4737b72", @@ -663,11 +671,11 @@ }, "faker": { "hashes": [ - "sha256:3f5d379e4b5ce92a8afe3c2ce59d7c43886370dd3bf9495a936b91888debfc81", - "sha256:8c0e8a06acef4b9312902e2ce18becabe62badd3a6632180bd0680c6ee111473" + "sha256:5398268e1d751ffdb3ed36b8a790ed98659200599b368eec38a02eed15bce997", + "sha256:d4183b8f57316de3be27cd6c3b40e9f9343d27c95c96179f027316c58c2c239e" ], "markers": "python_version >= '3.5'", - "version": "==4.17.0" + "version": "==4.17.1" }, "filelock": { "hashes": [ @@ -999,11 +1007,11 @@ }, "virtualenv": { "hashes": [ - "sha256:b0011228208944ce71052987437d3843e05690b2f23d1c7da4263fde104c97a2", - "sha256:b8d6110f493af256a40d65e29846c69340a947669eec8ce784fcf3dd3af28380" + "sha256:07cff122e9d343140366055f31be4dcd61fd598c69d11cd33a9d9c8df4546dd7", + "sha256:e0aac7525e880a429764cefd3aaaff54afb5d9f25c82627563603f5d7de5a6e5" ], "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", - "version": "==20.1.0" + "version": "==20.2.1" } } } diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index 05711ebd8..4bfd78e8f 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -1,11 +1,11 @@ import logging import os +from time import sleep from django.conf import settings from django.core.management.base import BaseCommand from django_q.tasks import async_task from watchdog.events import FileSystemEventHandler -from watchdog.observers import Observer from watchdog.observers.polling import PollingObserver try: @@ -13,25 +13,54 @@ try: except ImportError: INotify = flags = None +logger = logging.getLogger(__name__) + + +def _consume(file): + try: + if os.path.isfile(file): + async_task("documents.tasks.consume_file", + file, + task_name=os.path.basename(file)[:100]) + else: + logger.debug( + f"Not consuming file {file}: File has moved.") + + except Exception as e: + # Catch all so that the consumer won't crash. + # This is also what the test case is listening for to check for + # errors. + logger.error( + "Error while consuming document: {}".format(e)) + + +def _consume_wait_unmodified(file, num_tries=20, wait_time=1): + mtime = -1 + current_try = 0 + while current_try < num_tries: + try: + new_mtime = os.stat(file).st_mtime + except FileNotFoundError: + logger.debug(f"File {file} moved while waiting for it to remain " + f"unmodified.") + return + if new_mtime == mtime: + _consume(file) + return + mtime = new_mtime + sleep(wait_time) + current_try += 1 + + logger.error(f"Timeout while waiting on file {file} to remain unmodified.") + class Handler(FileSystemEventHandler): - def _consume(self, file): - if os.path.isfile(file): - try: - async_task("documents.tasks.consume_file", - file, - task_name=os.path.basename(file)[:100]) - except Exception as e: - # Catch all so that the consumer won't crash. - logging.getLogger(__name__).error( - "Error while consuming document: {}".format(e)) - def on_created(self, event): - self._consume(event.src_path) + _consume_wait_unmodified(event.src_path) def on_moved(self, event): - self._consume(event.src_path) + _consume_wait_unmodified(event.dest_path) class Command(BaseCommand): @@ -40,12 +69,15 @@ class Command(BaseCommand): consumption directory. """ + # This is here primarily for the tests and is irrelevant in production. + stop_flag = False + def __init__(self, *args, **kwargs): - self.verbosity = 0 self.logger = logging.getLogger(__name__) BaseCommand.__init__(self, *args, **kwargs) + self.observer = None def add_arguments(self, parser): parser.add_argument( @@ -54,38 +86,60 @@ class Command(BaseCommand): nargs="?", help="The consumption directory." ) + parser.add_argument( + "--oneshot", + action="store_true", + help="Run only once." + ) def handle(self, *args, **options): - - self.verbosity = options["verbosity"] directory = options["directory"] logging.getLogger(__name__).info( - "Starting document consumer at {}".format( - directory - ) - ) + f"Starting document consumer at {directory}") - # Consume all files as this is not done initially by the watchdog for entry in os.scandir(directory): if entry.is_file(): async_task("documents.tasks.consume_file", entry.path, task_name=os.path.basename(entry.path)[:100]) - # Start the watchdog. Woof! - if settings.CONSUMER_POLLING > 0: - logging.getLogger(__name__).info( - "Using polling instead of file system notifications.") - observer = PollingObserver(timeout=settings.CONSUMER_POLLING) + if options["oneshot"]: + return + + if settings.CONSUMER_POLLING == 0 and INotify: + self.handle_inotify(directory) else: - observer = Observer() - event_handler = Handler() - observer.schedule(event_handler, directory, recursive=True) - observer.start() + self.handle_polling(directory) + + logger.debug("Consumer exiting.") + + def handle_polling(self, directory): + logging.getLogger(__name__).info( + f"Polling directory for changes: {directory}") + self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING) + self.observer.schedule(Handler(), directory, recursive=False) + self.observer.start() try: - while observer.is_alive(): - observer.join(1) + while self.observer.is_alive(): + self.observer.join(1) + if self.stop_flag: + self.observer.stop() except KeyboardInterrupt: - observer.stop() - observer.join() + self.observer.stop() + self.observer.join() + + def handle_inotify(self, directory): + logging.getLogger(__name__).info( + f"Using inotify to watch directory for changes: {directory}") + + inotify = INotify() + inotify.add_watch(directory, flags.CLOSE_WRITE | flags.MOVED_TO) + try: + while not self.stop_flag: + for event in inotify.read(timeout=1000, read_delay=1000): + file = os.path.join(directory, event.name) + if os.path.isfile(file): + _consume(file) + except KeyboardInterrupt: + pass diff --git a/src/documents/tests/test_management_consumer.py b/src/documents/tests/test_management_consumer.py new file mode 100644 index 000000000..bfb7520ee --- /dev/null +++ b/src/documents/tests/test_management_consumer.py @@ -0,0 +1,188 @@ +import filecmp +import os +import shutil +import tempfile +from threading import Thread +from time import sleep +from unittest import mock + +from django.conf import settings +from django.test import TestCase, override_settings + +from documents.consumer import ConsumerError +from documents.management.commands import document_consumer + + +class ConsumerThread(Thread): + + def __init__(self): + super().__init__() + self.cmd = document_consumer.Command() + + def run(self) -> None: + self.cmd.handle(directory=settings.CONSUMPTION_DIR, oneshot=False) + + def stop(self): + # Consumer checks this every second. + self.cmd.stop_flag = True + + +def chunked(size, source): + for i in range(0, len(source), size): + yield source[i:i+size] + + +class TestConsumer(TestCase): + + sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") + + def setUp(self) -> None: + patcher = mock.patch("documents.management.commands.document_consumer.async_task") + self.task_mock = patcher.start() + self.addCleanup(patcher.stop) + + self.consume_dir = tempfile.mkdtemp() + + override_settings(CONSUMPTION_DIR=self.consume_dir).enable() + + def t_start(self): + self.t = ConsumerThread() + self.t.start() + # give the consumer some time to do initial work + sleep(1) + + def tearDown(self) -> None: + if self.t: + self.t.stop() + + def wait_for_task_mock_call(self): + n = 0 + while n < 100: + if self.task_mock.call_count > 0: + # give task_mock some time to finish and raise errors + sleep(1) + return + n += 1 + sleep(0.1) + self.fail("async_task was never called") + + # A bogus async_task that will simply check the file for + # completeness and raise an exception otherwise. + def bogus_task(self, func, filename, **kwargs): + eq = filecmp.cmp(filename, self.sample_file, shallow=False) + if not eq: + print("Consumed an INVALID file.") + raise ConsumerError("Incomplete File READ FAILED") + else: + print("Consumed a perfectly valid file.") + + def slow_write_file(self, target, incomplete=False): + with open(self.sample_file, 'rb') as f: + pdf_bytes = f.read() + + if incomplete: + pdf_bytes = pdf_bytes[:len(pdf_bytes) - 100] + + with open(target, 'wb') as f: + # this will take 2 seconds, since the file is about 20k. + print("Start writing file.") + for b in chunked(1000, pdf_bytes): + f.write(b) + sleep(0.1) + print("file completed.") + + def test_consume_file(self): + self.t_start() + + f = os.path.join(self.consume_dir, "my_file.pdf") + shutil.copy(self.sample_file, f) + + self.wait_for_task_mock_call() + + self.task_mock.assert_called_once() + self.assertEqual(self.task_mock.call_args.args[1], f) + + @override_settings(CONSUMER_POLLING=1) + def test_consume_file_polling(self): + self.test_consume_file() + + def test_consume_existing_file(self): + f = os.path.join(self.consume_dir, "my_file.pdf") + shutil.copy(self.sample_file, f) + + self.t_start() + self.task_mock.assert_called_once() + self.assertEqual(self.task_mock.call_args.args[1], f) + + @override_settings(CONSUMER_POLLING=1) + def test_consume_existing_file_polling(self): + self.test_consume_existing_file() + + @mock.patch("documents.management.commands.document_consumer.logger.error") + def test_slow_write_pdf(self, error_logger): + + self.task_mock.side_effect = self.bogus_task + + self.t_start() + + fname = os.path.join(self.consume_dir, "my_file.pdf") + + self.slow_write_file(fname) + + self.wait_for_task_mock_call() + + error_logger.assert_not_called() + + self.task_mock.assert_called_once() + + self.assertEqual(self.task_mock.call_args.args[1], fname) + + @override_settings(CONSUMER_POLLING=1) + def test_slow_write_pdf_polling(self): + self.test_slow_write_pdf() + + @mock.patch("documents.management.commands.document_consumer.logger.error") + def test_slow_write_and_move(self, error_logger): + + self.task_mock.side_effect = self.bogus_task + + self.t_start() + + fname = os.path.join(self.consume_dir, "my_file.~df") + fname2 = os.path.join(self.consume_dir, "my_file.pdf") + + self.slow_write_file(fname) + shutil.move(fname, fname2) + + self.wait_for_task_mock_call() + + self.task_mock.assert_called_once() + self.assertEqual(self.task_mock.call_args.args[1], fname2) + + error_logger.assert_not_called() + + @override_settings(CONSUMER_POLLING=1) + def test_slow_write_and_move_polling(self): + self.test_slow_write_and_move() + + @mock.patch("documents.management.commands.document_consumer.logger.error") + def test_slow_write_incomplete(self, error_logger): + + self.task_mock.side_effect = self.bogus_task + + self.t_start() + + fname = os.path.join(self.consume_dir, "my_file.pdf") + self.slow_write_file(fname, incomplete=True) + + self.wait_for_task_mock_call() + + self.task_mock.assert_called_once() + self.assertEqual(self.task_mock.call_args.args[1], fname) + + # assert that we have an error logged with this invalid file. + error_logger.assert_called_once() + + @override_settings(CONSUMER_POLLING=1) + def test_slow_write_incomplete_polling(self): + self.test_slow_write_incomplete()