Merge branch 'inotify' of git://github.com/erikarvstedt/paperless into erikarvstedt-inotify

Daniel Quinn
2018-05-28 13:03:06 +01:00
7 changed files with 178 additions and 123 deletions

View File

@@ -3,8 +3,10 @@ import hashlib
 import logging
 import os
 import re
+import time
 import uuid
+from operator import itemgetter

 from django.conf import settings
 from django.utils import timezone

 from paperless.db import GnuPG
@@ -32,21 +34,21 @@ class Consumer:
     5. Delete the document and image(s)
     """

+    # Files are considered ready for consumption if they have been unmodified
+    # for this duration
+    FILES_MIN_UNMODIFIED_DURATION = 0.5
+
     def __init__(self, consume=settings.CONSUMPTION_DIR,
                  scratch=settings.SCRATCH_DIR):

         self.logger = logging.getLogger(__name__)
         self.logging_group = None

-        self.stats = {}
         self._ignore = []
         self.consume = consume
         self.scratch = scratch

-        try:
-            os.makedirs(self.scratch)
-        except FileExistsError:
-            pass
+        os.makedirs(self.scratch, exist_ok=True)

         if not self.consume:
             raise ConsumerError(
@@ -73,83 +75,99 @@ class Consumer:
             "group": self.logging_group
         })

-    def run(self):
-
-        for doc in os.listdir(self.consume):
-
-            doc = os.path.join(self.consume, doc)
-
-            if not os.path.isfile(doc):
-                continue
-
-            if not re.match(FileInfo.REGEXES["title"], doc):
-                continue
-
-            if doc in self._ignore:
-                continue
-
-            if not self._is_ready(doc):
-                continue
-
-            if self._is_duplicate(doc):
-                self.log(
-                    "info",
-                    "Skipping {} as it appears to be a duplicate".format(doc)
-                )
-                self._ignore.append(doc)
-                continue
-
-            parser_class = self._get_parser_class(doc)
-            if not parser_class:
-                self.log(
-                    "error", "No parsers could be found for {}".format(doc))
-                self._ignore.append(doc)
-                continue
-
-            self.logging_group = uuid.uuid4()
-
-            self.log("info", "Consuming {}".format(doc))
-
-            document_consumption_started.send(
-                sender=self.__class__,
-                filename=doc,
-                logging_group=self.logging_group
-            )
-
-            parsed_document = parser_class(doc)
-
-            try:
-                thumbnail = parsed_document.get_thumbnail()
-                date = parsed_document.get_date()
-                document = self._store(
-                    parsed_document.get_text(),
-                    doc,
-                    thumbnail,
-                    date
-                )
-            except ParseError as e:
-                self._ignore.append(doc)
-                self.log("error", "PARSE FAILURE for {}: {}".format(doc, e))
-                parsed_document.cleanup()
-                continue
-            else:
-                parsed_document.cleanup()
-                self._cleanup_doc(doc)
-                self.log(
-                    "info",
-                    "Document {} consumption finished".format(document)
-                )
-                document_consumption_finished.send(
-                    sender=self.__class__,
-                    document=document,
-                    logging_group=self.logging_group
-                )
+    def consume_new_files(self):
+        """
+        Find non-ignored files in consumption dir and consume them if they have
+        been unmodified for FILES_MIN_UNMODIFIED_DURATION.
+        """
+        ignored_files = []
+        files = []
+        for entry in os.scandir(self.consume):
+            if entry.is_file():
+                file = (entry.path, entry.stat().st_mtime)
+                if file in self._ignore:
+                    ignored_files.append(file)
+                else:
+                    files.append(file)
+
+        if not files:
+            return
+
+        # Set _ignore to only include files that still exist.
+        # This keeps it from growing indefinitely.
+        self._ignore[:] = ignored_files
+
+        files_old_to_new = sorted(files, key=itemgetter(1))
+
+        time.sleep(self.FILES_MIN_UNMODIFIED_DURATION)
+
+        for file, mtime in files_old_to_new:
+            if mtime == os.path.getmtime(file):
+                # File has not been modified and can be consumed
+                if not self.try_consume_file(file):
+                    self._ignore.append((file, mtime))
+
+    def try_consume_file(self, file):
+        "Return True if file was consumed"
+
+        if not re.match(FileInfo.REGEXES["title"], file):
+            return False
+
+        doc = file
+
+        if self._is_duplicate(doc):
+            self.log(
+                "info",
+                "Skipping {} as it appears to be a duplicate".format(doc)
+            )
+            return False
+
+        parser_class = self._get_parser_class(doc)
+        if not parser_class:
+            self.log(
+                "error", "No parsers could be found for {}".format(doc))
+            return False
+
+        self.logging_group = uuid.uuid4()
+
+        self.log("info", "Consuming {}".format(doc))
+
+        document_consumption_started.send(
+            sender=self.__class__,
+            filename=doc,
+            logging_group=self.logging_group
+        )
+
+        parsed_document = parser_class(doc)
+
+        try:
+            thumbnail = parsed_document.get_thumbnail()
+            date = parsed_document.get_date()
+            document = self._store(
+                parsed_document.get_text(),
+                doc,
+                thumbnail,
+                date
+            )
+        except ParseError as e:
+            self.log("error", "PARSE FAILURE for {}: {}".format(doc, e))
+            parsed_document.cleanup()
+            return False
+        else:
+            parsed_document.cleanup()
+            self._cleanup_doc(doc)
+            self.log(
+                "info",
+                "Document {} consumption finished".format(document)
+            )
+            document_consumption_finished.send(
+                sender=self.__class__,
+                document=document,
+                logging_group=self.logging_group
+            )
+            return True

     def _get_parser_class(self, doc):
         """
@@ -224,22 +242,6 @@ class Consumer:
self.log("debug", "Deleting document {}".format(doc))
os.unlink(doc)
def _is_ready(self, doc):
"""
Detect whether ``doc`` is ready to consume or if it's still being
written to by the uploader.
"""
t = os.stat(doc).st_mtime
if self.stats.get(doc) == t:
del(self.stats[doc])
return True
self.stats[doc] = t
return False
@staticmethod
def _is_duplicate(doc):
with open(doc, "rb") as f:
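
The polling consumer above decides that a file is ready by snapshotting its mtime, sleeping for FILES_MIN_UNMODIFIED_DURATION, and re-checking. A minimal standalone sketch of that technique, not part of the commit (the directory path is illustrative):

import os
import time
from operator import itemgetter

FILES_MIN_UNMODIFIED_DURATION = 0.5  # seconds, mirroring the constant above

def stable_files(directory):
    """Yield files in `directory`, oldest first, whose mtime did not
    change while we slept -- i.e. files nobody is still writing to."""
    snapshot = [(entry.path, entry.stat().st_mtime)
                for entry in os.scandir(directory) if entry.is_file()]
    time.sleep(FILES_MIN_UNMODIFIED_DURATION)
    for path, mtime in sorted(snapshot, key=itemgetter(1)):
        if os.path.exists(path) and os.path.getmtime(path) == mtime:
            yield path

for path in stable_files("/tmp/consume"):  # hypothetical directory
    print("ready:", path)

Sorting the snapshot oldest-first means documents are handled in arrival order, which is what consume_new_files does with files_old_to_new.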

View File

@@ -20,7 +20,7 @@ class MailFetcherError(Exception):
     pass

-class InvalidMessageError(Exception):
+class InvalidMessageError(MailFetcherError):
     pass
@@ -75,6 +75,9 @@ class Message(Loggable):
                 continue

             dispositions = content_disposition.strip().split(";")
+            if len(dispositions) < 2:
+                continue
+
             if not dispositions[0].lower() == "attachment" and \
                     "filename" not in dispositions[1].lower():
                 continue
@@ -159,8 +162,10 @@ class MailFetcher(Loggable):
         self._inbox = os.getenv("PAPERLESS_CONSUME_MAIL_INBOX", "INBOX")

         self._enabled = bool(self._host)
+        if self._enabled and Message.SECRET is None:
+            raise MailFetcherError("No PAPERLESS_EMAIL_SECRET defined")

-        self.last_checked = datetime.datetime.now()
+        self.last_checked = time.time()
         self.consume = consume

     def pull(self):
@@ -187,7 +192,7 @@ class MailFetcher(Loggable):
                     f.write(message.attachment.data)
                     os.utime(file_name, times=(t, t))

-            self.last_checked = datetime.datetime.now()
+            self.last_checked = time.time()

     def _get_messages(self):
@@ -205,7 +210,7 @@ class MailFetcher(Loggable):
             self._connection.close()
             self._connection.logout()
-        except Exception as e:
+        except MailFetcherError as e:
            self.log("error", str(e))

        return r
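
Replacing the datetime-based last_checked with time.time() keeps all of the scheduling arithmetic in plain epoch seconds, which is what both the polling loop and the inotify read timeout work in. A rough sketch of the pattern, with illustrative names and an assumed ten-minute interval:

import time

MAIL_DELTA = 10 * 60  # seconds between mail checks (illustrative value)

last_checked = time.time()

def mail_is_due(now=None):
    # Plain float comparison; no datetime/timedelta conversions needed.
    now = time.time() if now is None else now
    return now > last_checked + MAIL_DELTA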

View File

@@ -1,6 +1,7 @@
 import datetime
 import logging
 import os
+import sys
 import time

 from django.conf import settings
@@ -9,6 +10,11 @@ from django.core.management.base import BaseCommand, CommandError
 from ...consumer import Consumer, ConsumerError
 from ...mail import MailFetcher, MailFetcherError

+try:
+    from inotify_simple import INotify, flags
+except ImportError:
+    pass
+

 class Command(BaseCommand):
     """
@@ -53,13 +59,20 @@ class Command(BaseCommand):
             action="store_true",
             help="Run only once."
         )
+        parser.add_argument(
+            "--no-inotify",
+            action="store_true",
+            help="Don't use inotify, even if it's available."
+        )

     def handle(self, *args, **options):

         self.verbosity = options["verbosity"]
         directory = options["directory"]
         loop_time = options["loop_time"]
-        mail_delta = datetime.timedelta(minutes=options["mail_delta"])
+        mail_delta = options["mail_delta"] * 60
+        use_inotify = (not options["no_inotify"]
+                       and "inotify_simple" in sys.modules)

         try:
             self.file_consumer = Consumer(consume=directory)
@@ -67,39 +80,68 @@ class Command(BaseCommand):
except (ConsumerError, MailFetcherError) as e:
raise CommandError(e)
for path in (self.ORIGINAL_DOCS, self.THUMB_DOCS):
try:
os.makedirs(path)
except FileExistsError:
pass
for d in (self.ORIGINAL_DOCS, self.THUMB_DOCS):
os.makedirs(d, exists_ok=True)
logging.getLogger(__name__).info(
"Starting document consumer at {}".format(directory)
"Starting document consumer at {}{}".format(
directory,
" with inotify" if use_inotify else ""
)
)
if options["oneshot"]:
self.loop(mail_delta=mail_delta)
self.loop_step(mail_delta)
else:
try:
while True:
self.loop(mail_delta=mail_delta)
time.sleep(loop_time)
if self.verbosity > 1:
print(".", int(time.time()))
if use_inotify:
self.loop_inotify(mail_delta)
else:
self.loop(loop_time, mail_delta)
except KeyboardInterrupt:
print("Exiting")
def loop(self, mail_delta):
def loop(self, loop_time, mail_delta):
while True:
start_time = time.time()
if self.verbosity > 1:
print(".", int(start_time))
self.loop_step(mail_delta, start_time)
# Sleep until the start of the next loop step
time.sleep(max(0, start_time + loop_time - time.time()))
def loop_step(self, mail_delta, time_now=None):
# Occasionally fetch mail and store it to be consumed on the next loop
# We fetch email when we first start up so that it is not necessary to
# wait for 10 minutes after making changes to the config file.
delta = self.mail_fetcher.last_checked + mail_delta
if self.first_iteration or delta < datetime.datetime.now():
next_mail_time = self.mail_fetcher.last_checked + mail_delta
if self.first_iteration or time_now > next_mail_time:
self.first_iteration = False
self.mail_fetcher.pull()
# Consume whatever files we can.
# We have to run twice as the first run checks for file readiness
for i in range(2):
self.file_consumer.run()
self.file_consumer.consume_new_files()
def loop_inotify(self, mail_delta):
directory = self.file_consumer.consume
inotify = INotify()
inotify.add_watch(directory, flags.CLOSE_WRITE | flags.MOVED_TO)
# Run initial mail fetch and consume all currently existing documents
self.loop_step(mail_delta)
next_mail_time = self.mail_fetcher.last_checked + mail_delta
while True:
# Consume documents until next_mail_time
while True:
delta = next_mail_time - time.time()
if delta > 0:
for event in inotify.read(timeout=delta):
file = os.path.join(directory, event.name)
if os.path.isfile(file):
self.file_consumer.try_consume_file(file)
else:
break
self.mail_fetcher.pull()
next_mail_time = self.mail_fetcher.last_checked + mail_delta
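
The CLOSE_WRITE | MOVED_TO mask means an event fires only when a writer closes a file it opened for writing, or when a finished file is moved into the directory, so half-written uploads are not picked up. Below is a self-contained sketch of the same inotify_simple loop, assuming the library is installed (the watched path is illustrative). One caveat worth noting: inotify_simple's read() timeout is in milliseconds, so passing delta in seconds as above only makes the loop wake early and re-check next_mail_time; the behaviour stays correct, just with more wakeups than necessary.

import os
from inotify_simple import INotify, flags

directory = "/tmp/consume"  # hypothetical consumption dir
inotify = INotify()
inotify.add_watch(directory, flags.CLOSE_WRITE | flags.MOVED_TO)

while True:
    # read() blocks until events arrive or the timeout (in ms) elapses.
    for event in inotify.read(timeout=10_000):
        path = os.path.join(directory, event.name)
        if os.path.isfile(path):
            print("new document:", path)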

View File

@@ -246,6 +246,8 @@ SCRATCH_DIR = os.getenv("PAPERLESS_SCRATCH_DIR", "/tmp/paperless")

 # This is where Paperless will look for PDFs to index
 CONSUMPTION_DIR = os.getenv("PAPERLESS_CONSUMPTION_DIR")

+# (This setting is ignored on Linux where inotify is used instead of a
+# polling loop.)
 # The number of seconds that Paperless will wait between checking
 # CONSUMPTION_DIR. If you tend to write documents to this directory very
 # slowly, you may want to use a higher value than the default.
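
Whether this polling interval matters therefore depends on whether the guarded inotify_simple import in the management command succeeded. A quick sketch, assuming the same import guard as above, for checking which mode a deployment will use:

import importlib.util

# Mirrors the command's fallback logic: inotify is used when the module
# is importable (Linux with inotify_simple installed) and --no-inotify
# was not passed.
use_inotify = importlib.util.find_spec("inotify_simple") is not None
print("consumer will use", "inotify" if use_inotify else "polling")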