From 61cd050e241984cdffe1f4698461f9802f120d0e Mon Sep 17 00:00:00 2001 From: Erik Arvstedt Date: Fri, 11 May 2018 14:01:16 +0200 Subject: [PATCH] Ensure docs have been unmodified for some time before consuming Previously, the second mtime check for new files usually happened right after the first one, which could have caused consumption of docs that were still being modified. We're now waiting for at least FILES_MIN_UNMODIFIED_DURATION (0.5s). This also cleans up the logic by eliminating the consumer.stats attribute and the weird double call to consumer.run(). Additionally, this fixes a memory leak in consumer.stats where paths could be added but never removed if the corresponding files disappeared from the consumer dir before being considered ready. --- src/documents/consumer.py | 62 +++++++++---------- .../management/commands/document_consumer.py | 5 +- 2 files changed, 30 insertions(+), 37 deletions(-) diff --git a/src/documents/consumer.py b/src/documents/consumer.py index d1d839e4d..514406646 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -3,8 +3,10 @@ import hashlib import logging import os import re +import time import uuid +from operator import itemgetter from django.conf import settings from django.utils import timezone from paperless.db import GnuPG @@ -32,13 +34,16 @@ class Consumer: 5. 
Delete the document and image(s) """ + # Files are considered ready for consumption if they have been unmodified + # for this duration + FILES_MIN_UNMODIFIED_DURATION = 0.5 + def __init__(self, consume=settings.CONSUMPTION_DIR, scratch=settings.SCRATCH_DIR): self.logger = logging.getLogger(__name__) self.logging_group = None - self.stats = {} self._ignore = [] self.consume = consume self.scratch = scratch @@ -70,27 +75,34 @@ class Consumer: "group": self.logging_group }) - def run(self): - docs = [entry.path for entry in os.scandir(self.consume)] - docs_old_to_new = sorted(docs, key=lambda doc: os.path.getmtime(doc)) + def consume_new_files(self): + """ + Find non-ignored files in consumption dir and consume them if they have + been unmodified for FILES_MIN_UNMODIFIED_DURATION. + """ + files = [] + for entry in os.scandir(self.consume): + if entry.is_file() and entry.path not in self._ignore: + files.append((entry.path, entry.stat().st_mtime)) - for doc in docs_old_to_new: - self.try_consume_file(doc) - - def try_consume_file(self, doc): - doc = os.path.join(self.consume, doc) - - if not os.path.isfile(doc): + if not files: return - if not re.match(FileInfo.REGEXES["title"], doc): - return + files_old_to_new = sorted(files, key=itemgetter(1)) - if doc in self._ignore: - return + time.sleep(self.FILES_MIN_UNMODIFIED_DURATION) - if not self._is_ready(doc): - return + for file, mtime in files_old_to_new: + if mtime == os.path.getmtime(file): + # File has not been modified and can be consumed + self.try_consume_file(file) + + def try_consume_file(self, file): + + if not re.match(FileInfo.REGEXES["title"], file): + return False + + doc = file if self._is_duplicate(doc): self.log( @@ -225,22 +237,6 @@ class Consumer: self.log("debug", "Deleting document {}".format(doc)) os.unlink(doc) - def _is_ready(self, doc): - """ - Detect whether ``doc`` is ready to consume or if it's still being - written to by the uploader. 
- """ - - t = os.stat(doc).st_mtime - - if self.stats.get(doc) == t: - del(self.stats[doc]) - return True - - self.stats[doc] = t - - return False - @staticmethod def _is_duplicate(doc): with open(doc, "rb") as f: diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index ae8ff7e35..4aec489b6 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -95,7 +95,4 @@ class Command(BaseCommand): self.first_iteration = False self.mail_fetcher.pull() - # Consume whatever files we can. - # We have to run twice as the first run checks for file readiness - for i in range(2): - self.file_consumer.run() + self.file_consumer.consume_new_files()