diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cdc98a35b..65ecc7980 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,7 +5,7 @@ repos: # General hooks - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.1.0 + rev: v4.2.0 hooks: - id: check-docstring-first - id: check-json @@ -27,7 +27,7 @@ repos: - id: check-case-conflict - id: detect-private-key - repo: https://github.com/pre-commit/mirrors-prettier - rev: "v2.6.1" + rev: "v2.6.2" hooks: - id: prettier types_or: @@ -47,7 +47,7 @@ repos: - id: yesqa exclude: "(migrations)" - repo: https://github.com/asottile/add-trailing-comma - rev: "v2.2.1" + rev: "v2.2.2" hooks: - id: add-trailing-comma exclude: "(migrations)" diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index 89467c94a..eb6f0a405 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -3,7 +3,9 @@ import os from pathlib import Path from pathlib import PurePath from threading import Thread +from time import monotonic from time import sleep +from typing import Final from django.conf import settings from django.core.management.base import BaseCommand @@ -53,6 +55,25 @@ def _consume(filepath): logger.warning(f"Not consuming file {filepath}: Unknown file extension.") return + # Total wait time: up to 500ms + os_error_retry_count: Final[int] = 50 + os_error_retry_wait: Final[float] = 0.01 + + read_try_count = 0 + file_open_ok = False + + while (read_try_count < os_error_retry_count) and not file_open_ok: + try: + with open(filepath, "rb"): + file_open_ok = True + except OSError: + read_try_count += 1 + sleep(os_error_retry_wait) + + if read_try_count >= os_error_retry_count: + logger.warning(f"Not consuming file {filepath}: OS reports file as busy still") + return + tag_ids = None try: if settings.CONSUMER_SUBDIRS_AS_TAGS: @@ -81,19 +102,23 @@ def _consume_wait_unmodified(file): logger.debug(f"Waiting for file {file} to remain unmodified") mtime = -1 + size = -1 current_try = 0 while current_try < settings.CONSUMER_POLLING_RETRY_COUNT: try: - new_mtime = os.stat(file).st_mtime + stat_data = os.stat(file) + new_mtime = stat_data.st_mtime + new_size = stat_data.st_size except FileNotFoundError: logger.debug( f"File {file} moved while waiting for it to remain " f"unmodified.", ) return - if new_mtime == mtime: + if new_mtime == mtime and new_size == size: _consume(file) return mtime = new_mtime + size = new_size sleep(settings.CONSUMER_POLLING_DELAY) current_try += 1 @@ -182,14 +207,32 @@ class Command(BaseCommand): descriptor = inotify.add_watch(directory, inotify_flags) try: + + inotify_debounce: Final[float] = 0.5 + notified_files = {} + while not self.stop_flag: + for event in inotify.read(timeout=1000): if recursive: path = inotify.get_path(event.wd) else: path = directory filepath = os.path.join(path, event.name) - _consume(filepath) + notified_files[filepath] = monotonic() + + # Check the files against the timeout + still_waiting = {} + for filepath in notified_files: + # Time of the last inotify event for this file + last_event_time = notified_files[filepath] + if (monotonic() - last_event_time) > inotify_debounce: + _consume(filepath) + else: + still_waiting[filepath] = last_event_time + # These files are still waiting to hit the timeout + notified_files = still_waiting + except KeyboardInterrupt: pass diff --git a/src/documents/tests/test_management_consumer.py b/src/documents/tests/test_management_consumer.py index e4d772730..d99b01e66 100644 --- a/src/documents/tests/test_management_consumer.py +++ b/src/documents/tests/test_management_consumer.py @@ -260,6 +260,21 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase): f'_is_ignored("{file_path}") != {expected_ignored}', ) + @mock.patch("documents.management.commands.document_consumer.open") + def test_consume_file_busy(self, open_mock): + + # Calling this mock always raises this + open_mock.side_effect = OSError + + self.t_start() + + f = os.path.join(self.dirs.consumption_dir, "my_file.pdf") + shutil.copy(self.sample_file, f) + + self.wait_for_task_mock_call() + + self.task_mock.assert_not_called() + @override_settings( CONSUMER_POLLING=1,