Ensure docs have been unmodified for some time before consuming

Previously, the second mtime check for new files usually happened right
after the first one, which could cause consumption of docs that were
still being modified.

We now wait at least FILES_MIN_UNMODIFIED_DURATION (0.5 s) between the two checks.

This also cleans up the logic by eliminating the consumer.stats attribute
and the weird double call to consumer.run().

Additionally, this fixes a memory leak in consumer.stats where paths could be
added but never removed if the corresponding files disappeared from
the consumption dir before being considered ready.
Erik Arvstedt 2018-05-11 14:01:16 +02:00
parent f018e8e54f
commit 61cd050e24
2 changed files with 30 additions and 37 deletions
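
The fix boils down to a snapshot/sleep/re-check of each file's mtime. A minimal
standalone sketch of the pattern (find_stable_files and its signature are
illustrative, not the patch's actual API):

import os
import time

# Files are considered ready once their mtime has not changed for this long.
FILES_MIN_UNMODIFIED_DURATION = 0.5

def find_stable_files(directory):
    """Return paths whose mtime stayed constant while we waited."""
    # First pass: snapshot each regular file's mtime.
    snapshot = {
        entry.path: entry.stat().st_mtime
        for entry in os.scandir(directory)
        if entry.is_file()
    }

    # Wait out the minimum unmodified duration in one place, instead of
    # relying on two separate scan passes happening far enough apart.
    time.sleep(FILES_MIN_UNMODIFIED_DURATION)

    # Second pass: keep only files that still exist with an unchanged mtime.
    # The snapshot is a local variable, so entries for files that vanished
    # are discarded automatically: the leak the old consumer.stats had.
    return [
        path for path, mtime in snapshot.items()
        if os.path.exists(path) and os.path.getmtime(path) == mtime
    ]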

@@ -3,8 +3,10 @@ import hashlib
 import logging
 import os
 import re
+import time
 import uuid
+from operator import itemgetter

 from django.conf import settings
 from django.utils import timezone
 from paperless.db import GnuPG
@@ -32,13 +34,16 @@ class Consumer:
     5. Delete the document and image(s)
     """

+    # Files are considered ready for consumption if they have been unmodified
+    # for this duration
+    FILES_MIN_UNMODIFIED_DURATION = 0.5
+
     def __init__(self, consume=settings.CONSUMPTION_DIR,
                  scratch=settings.SCRATCH_DIR):

         self.logger = logging.getLogger(__name__)
         self.logging_group = None

-        self.stats = {}
         self._ignore = []
         self.consume = consume
         self.scratch = scratch
@@ -70,27 +75,34 @@ class Consumer:
             "group": self.logging_group
         })

-    def run(self):
-        docs = [entry.path for entry in os.scandir(self.consume)]
-        docs_old_to_new = sorted(docs, key=lambda doc: os.path.getmtime(doc))
+    def consume_new_files(self):
+        """
+        Find non-ignored files in consumption dir and consume them if they have
+        been unmodified for FILES_MIN_UNMODIFIED_DURATION.
+        """
+        files = []
+        for entry in os.scandir(self.consume):
+            if entry.is_file() and entry.path not in self._ignore:
+                files.append((entry.path, entry.stat().st_mtime))

-        for doc in docs_old_to_new:
-            self.try_consume_file(doc)
+        if not files:
+            return

-    def try_consume_file(self, doc):
-        doc = os.path.join(self.consume, doc)
+        files_old_to_new = sorted(files, key=itemgetter(1))

-        if not os.path.isfile(doc):
-            return
+        time.sleep(self.FILES_MIN_UNMODIFIED_DURATION)

-        if not re.match(FileInfo.REGEXES["title"], doc):
-            return
+        for file, mtime in files_old_to_new:
+            if mtime == os.path.getmtime(file):
+                # File has not been modified and can be consumed
+                self.try_consume_file(file)

-        if doc in self._ignore:
-            return
+    def try_consume_file(self, file):
+        if not re.match(FileInfo.REGEXES["title"], file):
+            return False

-        if not self._is_ready(doc):
-            return
+        doc = file

         if self._is_duplicate(doc):
             self.log(
@@ -225,22 +237,6 @@ class Consumer:
         self.log("debug", "Deleting document {}".format(doc))
         os.unlink(doc)

-    def _is_ready(self, doc):
-        """
-        Detect whether ``doc`` is ready to consume or if it's still being
-        written to by the uploader.
-        """
-
-        t = os.stat(doc).st_mtime
-
-        if self.stats.get(doc) == t:
-            del(self.stats[doc])
-            return True
-
-        self.stats[doc] = t
-
-        return False
-
     @staticmethod
     def _is_duplicate(doc):
         with open(doc, "rb") as f:
@@ -95,7 +95,4 @@ class Command(BaseCommand):
             self.first_iteration = False
             self.mail_fetcher.pull()

-        # Consume whatever files we can.
-        # We have to run twice as the first run checks for file readiness
-        for i in range(2):
-            self.file_consumer.run()
+        self.file_consumer.consume_new_files()
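
With readiness handled inside consume_new_files() itself, a caller only needs
one call per polling pass. A hedged sketch of the simplified loop shape
(poll_loop and the interval are illustrative, not the project's actual
command code):

import time

def poll_loop(consumer, interval=10):
    # The old code had to call run() twice per pass because the first call
    # merely recorded mtimes; now a single call both waits and consumes.
    while True:
        consumer.consume_new_files()
        time.sleep(interval)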