Adds additional checking for both inotify and polling around document still being busy before consuming it

This commit is contained in:
Trenton Holmes 2022-03-21 16:02:02 -07:00
parent 8f98cb4860
commit 2414dad656
3 changed files with 64 additions and 6 deletions

View File

@ -5,7 +5,7 @@
repos: repos:
# General hooks # General hooks
- repo: https://github.com/pre-commit/pre-commit-hooks - repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0 rev: v4.2.0
hooks: hooks:
- id: check-docstring-first - id: check-docstring-first
- id: check-json - id: check-json
@ -27,7 +27,7 @@ repos:
- id: check-case-conflict - id: check-case-conflict
- id: detect-private-key - id: detect-private-key
- repo: https://github.com/pre-commit/mirrors-prettier - repo: https://github.com/pre-commit/mirrors-prettier
rev: "v2.6.1" rev: "v2.6.2"
hooks: hooks:
- id: prettier - id: prettier
types_or: types_or:
@ -47,7 +47,7 @@ repos:
- id: yesqa - id: yesqa
exclude: "(migrations)" exclude: "(migrations)"
- repo: https://github.com/asottile/add-trailing-comma - repo: https://github.com/asottile/add-trailing-comma
rev: "v2.2.1" rev: "v2.2.2"
hooks: hooks:
- id: add-trailing-comma - id: add-trailing-comma
exclude: "(migrations)" exclude: "(migrations)"

View File

@ -3,7 +3,9 @@ import os
from pathlib import Path from pathlib import Path
from pathlib import PurePath from pathlib import PurePath
from threading import Thread from threading import Thread
from time import monotonic
from time import sleep from time import sleep
from typing import Final
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
@ -53,6 +55,25 @@ def _consume(filepath):
logger.warning(f"Not consuming file {filepath}: Unknown file extension.") logger.warning(f"Not consuming file {filepath}: Unknown file extension.")
return return
# Total wait time: up to 500ms
os_error_retry_count: Final[int] = 50
os_error_retry_wait: Final[float] = 0.01
read_try_count = 0
file_open_ok = False
while (read_try_count < os_error_retry_count) and not file_open_ok:
try:
with open(filepath, "rb"):
file_open_ok = True
except OSError:
read_try_count += 1
sleep(os_error_retry_wait)
if read_try_count >= os_error_retry_count:
logger.warning(f"Not consuming file {filepath}: OS reports file as busy still")
return
tag_ids = None tag_ids = None
try: try:
if settings.CONSUMER_SUBDIRS_AS_TAGS: if settings.CONSUMER_SUBDIRS_AS_TAGS:
@ -81,19 +102,23 @@ def _consume_wait_unmodified(file):
logger.debug(f"Waiting for file {file} to remain unmodified") logger.debug(f"Waiting for file {file} to remain unmodified")
mtime = -1 mtime = -1
size = -1
current_try = 0 current_try = 0
while current_try < settings.CONSUMER_POLLING_RETRY_COUNT: while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
try: try:
new_mtime = os.stat(file).st_mtime stat_data = os.stat(file)
new_mtime = stat_data.st_mtime
new_size = stat_data.st_size
except FileNotFoundError: except FileNotFoundError:
logger.debug( logger.debug(
f"File {file} moved while waiting for it to remain " f"unmodified.", f"File {file} moved while waiting for it to remain " f"unmodified.",
) )
return return
if new_mtime == mtime: if new_mtime == mtime and new_size == size:
_consume(file) _consume(file)
return return
mtime = new_mtime mtime = new_mtime
size = new_size
sleep(settings.CONSUMER_POLLING_DELAY) sleep(settings.CONSUMER_POLLING_DELAY)
current_try += 1 current_try += 1
@ -182,14 +207,32 @@ class Command(BaseCommand):
descriptor = inotify.add_watch(directory, inotify_flags) descriptor = inotify.add_watch(directory, inotify_flags)
try: try:
inotify_debounce: Final[float] = 0.5
notified_files = {}
while not self.stop_flag: while not self.stop_flag:
for event in inotify.read(timeout=1000): for event in inotify.read(timeout=1000):
if recursive: if recursive:
path = inotify.get_path(event.wd) path = inotify.get_path(event.wd)
else: else:
path = directory path = directory
filepath = os.path.join(path, event.name) filepath = os.path.join(path, event.name)
notified_files[filepath] = monotonic()
# Check the files against the timeout
still_waiting = {}
for filepath in notified_files:
# Time of the last inotify event for this file
last_event_time = notified_files[filepath]
if (monotonic() - last_event_time) > inotify_debounce:
_consume(filepath) _consume(filepath)
else:
still_waiting[filepath] = last_event_time
# These files are still waiting to hit the timeout
notified_files = still_waiting
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@ -260,6 +260,21 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
f'_is_ignored("{file_path}") != {expected_ignored}', f'_is_ignored("{file_path}") != {expected_ignored}',
) )
@mock.patch("documents.management.commands.document_consumer.open")
def test_consume_file_busy(self, open_mock):
# Calling this mock always raises this
open_mock.side_effect = OSError
self.t_start()
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call()
self.task_mock.assert_not_called()
@override_settings( @override_settings(
CONSUMER_POLLING=1, CONSUMER_POLLING=1,