diff --git a/src/documents/consumer.py b/src/documents/consumer.py index d1d839e4d..514406646 100644 --- a/src/documents/consumer.py +++ b/src/documents/consumer.py @@ -3,8 +3,10 @@ import hashlib import logging import os import re +import time import uuid +from operator import itemgetter from django.conf import settings from django.utils import timezone from paperless.db import GnuPG @@ -32,13 +34,16 @@ class Consumer: 5. Delete the document and image(s) """ + # Files are considered ready for consumption if they have been unmodified + # for this duration + FILES_MIN_UNMODIFIED_DURATION = 0.5 + def __init__(self, consume=settings.CONSUMPTION_DIR, scratch=settings.SCRATCH_DIR): self.logger = logging.getLogger(__name__) self.logging_group = None - self.stats = {} self._ignore = [] self.consume = consume self.scratch = scratch @@ -70,27 +75,34 @@ class Consumer: "group": self.logging_group }) - def run(self): - docs = [entry.path for entry in os.scandir(self.consume)] - docs_old_to_new = sorted(docs, key=lambda doc: os.path.getmtime(doc)) + def consume_new_files(self): + """ + Find non-ignored files in consumption dir and consume them if they have + been unmodified for FILES_MIN_UNMODIFIED_DURATION. + """ + files = [] + for entry in os.scandir(self.consume): + if entry.is_file() and entry.path not in self._ignore: + files.append((entry.path, entry.stat().st_mtime)) - for doc in docs_old_to_new: - self.try_consume_file(doc) - - def try_consume_file(self, doc): - doc = os.path.join(self.consume, doc) - - if not os.path.isfile(doc): + if not files: return - if not re.match(FileInfo.REGEXES["title"], doc): - return + files_old_to_new = sorted(files, key=itemgetter(1)) - if doc in self._ignore: - return + time.sleep(self.FILES_MIN_UNMODIFIED_DURATION) - if not self._is_ready(doc): - return + for file, mtime in files_old_to_new: + if mtime == os.path.getmtime(file): + # File has not been modified and can be consumed + self.try_consume_file(file) + + def try_consume_file(self, file): + + if not re.match(FileInfo.REGEXES["title"], file): + return False + + doc = file if self._is_duplicate(doc): self.log( @@ -225,22 +237,6 @@ class Consumer: self.log("debug", "Deleting document {}".format(doc)) os.unlink(doc) - def _is_ready(self, doc): - """ - Detect whether ``doc`` is ready to consume or if it's still being - written to by the uploader. - """ - - t = os.stat(doc).st_mtime - - if self.stats.get(doc) == t: - del(self.stats[doc]) - return True - - self.stats[doc] = t - - return False - @staticmethod def _is_duplicate(doc): with open(doc, "rb") as f: diff --git a/src/documents/management/commands/document_consumer.py b/src/documents/management/commands/document_consumer.py index ae8ff7e35..4aec489b6 100644 --- a/src/documents/management/commands/document_consumer.py +++ b/src/documents/management/commands/document_consumer.py @@ -95,7 +95,4 @@ class Command(BaseCommand): self.first_iteration = False self.mail_fetcher.pull() - # Consume whatever files we can. - # We have to run twice as the first run checks for file readiness - for i in range(2): - self.file_consumer.run() + self.file_consumer.consume_new_files()