From 2414dad6566674770ef1efd3cdc3ad22fdf0d1eb Mon Sep 17 00:00:00 2001
From: Trenton Holmes <holmes.trenton@gmail.com>
Date: Mon, 21 Mar 2022 16:02:02 -0700
Subject: [PATCH] Adds additional checking for both inotify and polling around
 document still being busy before consuming it

---
 .pre-commit-config.yaml                       |  6 +--
 .../management/commands/document_consumer.py  | 49 +++++++++++++++++--
 .../tests/test_management_consumer.py         | 15 ++++++
 3 files changed, 64 insertions(+), 6 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index cdc98a35b..65ecc7980 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -5,7 +5,7 @@
 repos:
   # General hooks
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.1.0
+    rev: v4.2.0
     hooks:
       - id: check-docstring-first
       - id: check-json
@@ -27,7 +27,7 @@ repos:
       - id: check-case-conflict
       - id: detect-private-key
   - repo: https://github.com/pre-commit/mirrors-prettier
-    rev: "v2.6.1"
+    rev: "v2.6.2"
     hooks:
       - id: prettier
         types_or:
@@ -47,7 +47,7 @@ repos:
       - id: yesqa
         exclude: "(migrations)"
   - repo: https://github.com/asottile/add-trailing-comma
-    rev: "v2.2.1"
+    rev: "v2.2.2"
     hooks:
       - id: add-trailing-comma
         exclude: "(migrations)"
diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py
index 89467c94a..eb6f0a405 100644
--- a/src/documents/management/commands/document_consumer.py
+++ b/src/documents/management/commands/document_consumer.py
@@ -3,7 +3,9 @@ import os
 from pathlib import Path
 from pathlib import PurePath
 from threading import Thread
+from time import monotonic
 from time import sleep
+from typing import Final
 
 from django.conf import settings
 from django.core.management.base import BaseCommand
@@ -53,6 +55,25 @@ def _consume(filepath):
         logger.warning(f"Not consuming file {filepath}: Unknown file extension.")
         return
 
+    # Total wait time: up to 500ms
+    os_error_retry_count: Final[int] = 50
+    os_error_retry_wait: Final[float] = 0.01
+
+    read_try_count = 0
+    file_open_ok = False
+
+    while (read_try_count < os_error_retry_count) and not file_open_ok:
+        try:
+            with open(filepath, "rb"):
+                file_open_ok = True
+        except OSError:
+            read_try_count += 1
+            sleep(os_error_retry_wait)
+
+    if read_try_count >= os_error_retry_count:
+        logger.warning(f"Not consuming file {filepath}: OS reports file as busy still")
+        return
+
     tag_ids = None
     try:
         if settings.CONSUMER_SUBDIRS_AS_TAGS:
@@ -81,19 +102,23 @@ def _consume_wait_unmodified(file):
 
     logger.debug(f"Waiting for file {file} to remain unmodified")
     mtime = -1
+    size = -1
     current_try = 0
     while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
         try:
-            new_mtime = os.stat(file).st_mtime
+            stat_data = os.stat(file)
+            new_mtime = stat_data.st_mtime
+            new_size = stat_data.st_size
         except FileNotFoundError:
             logger.debug(
                 f"File {file} moved while waiting for it to remain " f"unmodified.",
             )
             return
-        if new_mtime == mtime:
+        if new_mtime == mtime and new_size == size:
             _consume(file)
             return
         mtime = new_mtime
+        size = new_size
         sleep(settings.CONSUMER_POLLING_DELAY)
         current_try += 1
 
@@ -182,14 +207,32 @@ class Command(BaseCommand):
             descriptor = inotify.add_watch(directory, inotify_flags)
 
         try:
+
+            inotify_debounce: Final[float] = 0.5
+            notified_files = {}
+
             while not self.stop_flag:
+
                 for event in inotify.read(timeout=1000):
                     if recursive:
                         path = inotify.get_path(event.wd)
                     else:
                         path = directory
                     filepath = os.path.join(path, event.name)
-                    _consume(filepath)
+                    notified_files[filepath] = monotonic()
+
+                # Check the files against the timeout
+                still_waiting = {}
+                for filepath in notified_files:
+                    # Time of the last inotify event for this file
+                    last_event_time = notified_files[filepath]
+                    if (monotonic() - last_event_time) > inotify_debounce:
+                        _consume(filepath)
+                    else:
+                        still_waiting[filepath] = last_event_time
+                # These files are still waiting to hit the timeout
+                notified_files = still_waiting
+
         except KeyboardInterrupt:
             pass
 
diff --git a/src/documents/tests/test_management_consumer.py b/src/documents/tests/test_management_consumer.py
index e4d772730..d99b01e66 100644
--- a/src/documents/tests/test_management_consumer.py
+++ b/src/documents/tests/test_management_consumer.py
@@ -260,6 +260,21 @@ class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
                 f'_is_ignored("{file_path}") != {expected_ignored}',
             )
 
+    @mock.patch("documents.management.commands.document_consumer.open")
+    def test_consume_file_busy(self, open_mock):
+
+        # Calling this mock always raises this
+        open_mock.side_effect = OSError
+
+        self.t_start()
+
+        f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
+        shutil.copy(self.sample_file, f)
+
+        self.wait_for_task_mock_call()
+
+        self.task_mock.assert_not_called()
+
 
 @override_settings(
     CONSUMER_POLLING=1,