updated consumer: now using watchdog

This commit is contained in:
Jonas Winkler 2020-11-01 23:07:54 +01:00
parent 8f4ddb30c1
commit 9f29dc2863
4 changed files with 130 additions and 224 deletions

19
Pipfile
View File

@ -4,29 +4,28 @@ verify_ssl = true
name = "pypi" name = "pypi"
[packages] [packages]
django = "*" django = "~=3.1"
pillow = "*" pillow = "*"
dateparser = "*" dateparser = "~=0.7"
django-cors-headers = "*" django-cors-headers = "*"
djangorestframework = "*" djangorestframework = "~=3.12"
inotify-simple = "*"
python-gnupg = "*" python-gnupg = "*"
python-dotenv = "*" python-dotenv = "*"
filemagic = "*" filemagic = "*"
pyocr = "*" pyocr = "~=0.7"
langdetect = "*" langdetect = "*"
pdftotext = "*" pdftotext = "*"
django-filter = "*" django-filter = "~=2.4"
python-dateutil = "*" python-dateutil = "*"
psycopg2-binary = "*" psycopg2-binary = "*"
scikit-learn="*" scikit-learn="~=0.23"
whoosh="*" whoosh="~=2.7"
gunicorn = "*" gunicorn = "*"
whitenoise = "*" whitenoise = "*"
fuzzywuzzy = "*" fuzzywuzzy = "*"
python-Levenshtein = "*" python-Levenshtein = "*"
django-extensions = ""
django-extensions = "*" watchdog = "*"
[dev-packages] [dev-packages]
coveralls = "*" coveralls = "*"

160
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "48343a032c1becd5f1a3ae46c2ade70c14c251591c5f9cb49dd2cab26b0e0bea" "sha256": "2c1558fe7df0aee1ee20b095c2102f802470bf4a4ae09a7749ac487f8bfab8b6"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": {}, "requires": {},
@ -52,6 +52,7 @@
"sha256:dc663652ac9460fd06580a973576820430c6d428720e874ae46b041fa63e0efa" "sha256:dc663652ac9460fd06580a973576820430c6d428720e874ae46b041fa63e0efa"
], ],
"index": "pypi", "index": "pypi",
"markers": "python_version >= '3.5'",
"version": "==3.0.9" "version": "==3.0.9"
}, },
"django-filter": { "django-filter": {
@ -93,13 +94,6 @@
"index": "pypi", "index": "pypi",
"version": "==20.0.4" "version": "==20.0.4"
}, },
"inotify-simple": {
"hashes": [
"sha256:8440ffe49c4ae81a8df57c1ae1eb4b6bfa7acb830099bfb3e305b383005cc128"
],
"index": "pypi",
"version": "==1.3.5"
},
"joblib": { "joblib": {
"hashes": [ "hashes": [
"sha256:698c311779f347cf6b7e6b8a39bb682277b8ee4aba8cf9507bc0cf4cd4737b72", "sha256:698c311779f347cf6b7e6b8a39bb682277b8ee4aba8cf9507bc0cf4cd4737b72",
@ -118,35 +112,49 @@
}, },
"numpy": { "numpy": {
"hashes": [ "hashes": [
"sha256:04c7d4ebc5ff93d9822075ddb1751ff392a4375e5885299445fcebf877f179d5", "sha256:0ee77786eebbfa37f2141fd106b549d37c89207a0d01d8852fde1c82e9bfc0e7",
"sha256:0bfd85053d1e9f60234f28f63d4a5147ada7f432943c113a11afcf3e65d9d4c8", "sha256:199bebc296bd8a5fc31c16f256ac873dd4d5b4928dfd50e6c4995570fc71a8f3",
"sha256:0c66da1d202c52051625e55a249da35b31f65a81cb56e4c69af0dfb8fb0125bf", "sha256:1a307bdd3dd444b1d0daa356b5f4c7de2e24d63bdc33ea13ff718b8ec4c6a268",
"sha256:0d310730e1e793527065ad7dde736197b705d0e4c9999775f212b03c44a8484c", "sha256:1ea7e859f16e72ab81ef20aae69216cfea870676347510da9244805ff9670170",
"sha256:1669ec8e42f169ff715a904c9b2105b6640f3f2a4c4c2cb4920ae8b2785dac65", "sha256:271139653e8b7a046d11a78c0d33bafbddd5c443a5b9119618d0652a4eb3a09f",
"sha256:2117536e968abb7357d34d754e3733b0d7113d4c9f1d921f21a3d96dec5ff716", "sha256:35bf5316af8dc7c7db1ad45bec603e5fb28671beb98ebd1d65e8059efcfd3b72",
"sha256:3733640466733441295b0d6d3dcbf8e1ffa7e897d4d82903169529fd3386919a", "sha256:463792a249a81b9eb2b63676347f996d3f0082c2666fd0604f4180d2e5445996",
"sha256:4339741994c775396e1a274dba3609c69ab0f16056c1077f18979bec2a2c2e6e", "sha256:50d3513469acf5b2c0406e822d3f314d7ac5788c2b438c24e5dd54d5a81ef522",
"sha256:51ee93e1fac3fe08ef54ff1c7f329db64d8a9c5557e6c8e908be9497ac76374b", "sha256:50f68ebc439821b826823a8da6caa79cd080dee2a6d5ab9f1163465a060495ed",
"sha256:54045b198aebf41bf6bf4088012777c1d11703bf74461d70cd350c0af2182e45", "sha256:51e8d2ae7c7e985c7bebf218e56f72fa93c900ad0c8a7d9fbbbf362f45710f69",
"sha256:58d66a6b3b55178a1f8a5fe98df26ace76260a70de694d99577ddeab7eaa9a9d", "sha256:522053b731e11329dd52d258ddf7de5288cae7418b55e4b7d32f0b7e31787e9d",
"sha256:59f3d687faea7a4f7f93bd9665e5b102f32f3fa28514f15b126f099b7997203d", "sha256:5ea4401ada0d3988c263df85feb33818dc995abc85b8125f6ccb762009e7bc68",
"sha256:62139af94728d22350a571b7c82795b9d59be77fc162414ada6c8b6a10ef5d02", "sha256:604d2e5a31482a3ad2c88206efd43d6fcf666ada1f3188fd779b4917e49b7a98",
"sha256:7118f0a9f2f617f921ec7d278d981244ba83c85eea197be7c5a4f84af80a9c3c", "sha256:6ff88bcf1872b79002569c63fe26cd2cda614e573c553c4d5b814fb5eb3d2822",
"sha256:7c6646314291d8f5ea900a7ea9c4261f834b5b62159ba2abe3836f4fa6705526", "sha256:7197ee0a25629ed782c7bd01871ee40702ffeef35bc48004bc2fdcc71e29ba9d",
"sha256:967c92435f0b3ba37a4257c48b8715b76741410467e2bdb1097e8391fccfae15", "sha256:741d95eb2b505bb7a99fbf4be05fa69f466e240c2b4f2d3ddead4f1b5f82a5a5",
"sha256:9a3001248b9231ed73894c773142658bab914645261275f675d86c290c37f66d", "sha256:83af653bb92d1e248ccf5fdb05ccc934c14b936bcfe9b917dc180d3f00250ac6",
"sha256:aba1d5daf1144b956bc87ffb87966791f5e9f3e1f6fab3d7f581db1f5b598f7a", "sha256:8802d23e4895e0c65e418abe67cdf518aa5cbb976d97f42fd591f921d6dffad0",
"sha256:addaa551b298052c16885fc70408d3848d4e2e7352de4e7a1e13e691abc734c1", "sha256:8edc4d687a74d0a5f8b9b26532e860f4f85f56c400b3a98899fc44acb5e27add",
"sha256:b594f76771bc7fc8a044c5ba303427ee67c17a09b36e1fa32bde82f5c419d17a", "sha256:942d2cdcb362739908c26ce8dd88db6e139d3fa829dd7452dd9ff02cba6b58b2",
"sha256:c35a01777f81e7333bcf276b605f39c872e28295441c265cd0c860f4b40148c1", "sha256:9a0669787ba8c9d3bb5de5d9429208882fb47764aa79123af25c5edc4f5966b9",
"sha256:cebd4f4e64cfe87f2039e4725781f6326a61f095bc77b3716502bed812b385a9", "sha256:9d08d84bb4128abb9fbd9f073e5c69f70e5dab991a9c42e5b4081ea5b01b5db0",
"sha256:d526fa58ae4aead839161535d59ea9565863bb0b0bdb3cc63214613fb16aced4", "sha256:9f7f56b5e85b08774939622b7d45a5d00ff511466522c44fc0756ac7692c00f2",
"sha256:d7ac33585e1f09e7345aa902c281bd777fdb792432d27fca857f39b70e5dd31c", "sha256:a2daea1cba83210c620e359de2861316f49cc7aea8e9a6979d6cb2ddab6dda8c",
"sha256:e6ddbdc5113628f15de7e4911c02aed74a4ccff531842c583e5032f6e5a179bd", "sha256:b9074d062d30c2779d8af587924f178a539edde5285d961d2dfbecbac9c4c931",
"sha256:eb25c381d168daf351147713f49c626030dcff7a393d5caa62515d415a6071d8" "sha256:c4aa79993f5d856765819a3651117520e41ac3f89c3fc1cb6dee11aa562df6da",
"sha256:d78294f1c20f366cde8a75167f822538a7252b6e8b9d6dbfb3bdab34e7c1929e",
"sha256:dfdc8b53aa9838b9d44ed785431ca47aa3efaa51d0d5dd9c412ab5247151a7c4",
"sha256:dffed17848e8b968d8d3692604e61881aa6ef1f8074c99e81647ac84f6038535",
"sha256:e080087148fd70469aade2abfeadee194357defd759f9b59b349c6192aba994c",
"sha256:e983cbabe10a8989333684c98fdc5dd2f28b236216981e0c26ed359aaa676772",
"sha256:ea6171d2d8d648dee717457d0f75db49ad8c2f13100680e284d7becf3dc311a6",
"sha256:eefc13863bf01583a85e8c1121a901cc7cb8f059b960c4eba30901e2e6aba95f",
"sha256:efd656893171bbf1331beca4ec9f2e74358fc732a2084f664fd149cc4b3441d2"
], ],
"markers": "python_version >= '3.6'", "markers": "python_version >= '3.6'",
"version": "==1.19.2" "version": "==1.19.3"
},
"pathtools": {
"hashes": [
"sha256:7c35c5421a39bb82e58018febd90e3b6e5db34c5443aaaf742b3f33d4655f1c0"
],
"version": "==0.1.2"
}, },
"pdftotext": { "pdftotext": {
"hashes": [ "hashes": [
@ -245,11 +253,11 @@
}, },
"python-dotenv": { "python-dotenv": {
"hashes": [ "hashes": [
"sha256:8c10c99a1b25d9a68058a1ad6f90381a62ba68230ca93966882a4dbc3bc9c33d", "sha256:0c8d1b80d1a1e91717ea7d526178e3882732420b03f08afea0406db6402e220e",
"sha256:c10863aee750ad720f4f43436565e4c1698798d763b63234fb5021b6c616e423" "sha256:587825ed60b1711daea4832cf37524dfd404325b7db5e25ebe88c495c9f807a0"
], ],
"index": "pypi", "index": "pypi",
"version": "==0.14.0" "version": "==0.15.0"
}, },
"python-gnupg": { "python-gnupg": {
"hashes": [ "hashes": [
@ -275,35 +283,35 @@
}, },
"regex": { "regex": {
"hashes": [ "hashes": [
"sha256:0cb23ed0e327c18fb7eac61ebbb3180ebafed5b9b86ca2e15438201e5903b5dd", "sha256:03855ee22980c3e4863dc84c42d6d2901133362db5daf4c36b710dd895d78f0a",
"sha256:1a065e7a6a1b4aa851a0efa1a2579eabc765246b8b3a5fd74000aaa3134b8b4e", "sha256:06b52815d4ad38d6524666e0d50fe9173533c9cc145a5779b89733284e6f688f",
"sha256:1a511470db3aa97432ac8c1bf014fcc6c9fbfd0f4b1313024d342549cf86bcd6", "sha256:11116d424734fe356d8777f89d625f0df783251ada95d6261b4c36ad27a394bb",
"sha256:1c447b0d108cddc69036b1b3910fac159f2b51fdeec7f13872e059b7bc932be1", "sha256:119e0355dbdd4cf593b17f2fc5dbd4aec2b8899d0057e4957ba92f941f704bf5",
"sha256:2278453c6a76280b38855a263198961938108ea2333ee145c5168c36b8e2b376", "sha256:1ec66700a10e3c75f1f92cbde36cca0d3aaee4c73dfa26699495a3a30b09093c",
"sha256:240509721a663836b611fa13ca1843079fc52d0b91ef3f92d9bba8da12e768a0", "sha256:2dc522e25e57e88b4980d2bdd334825dbf6fa55f28a922fc3bfa60cc09e5ef53",
"sha256:4e21340c07090ddc8c16deebfd82eb9c9e1ec5e62f57bb86194a2595fd7b46e0", "sha256:3a5f08039eee9ea195a89e180c5762bfb55258bfb9abb61a20d3abee3b37fd12",
"sha256:570e916a44a361d4e85f355aacd90e9113319c78ce3c2d098d2ddf9631b34505", "sha256:49461446b783945597c4076aea3f49aee4b4ce922bd241e4fcf62a3e7c61794c",
"sha256:59d5c6302d22c16d59611a9fd53556554010db1d47e9df5df37be05007bebe75", "sha256:4afa350f162551cf402bfa3cd8302165c8e03e689c897d185f16a167328cc6dd",
"sha256:6a46eba253cedcbe8a6469f881f014f0a98819d99d341461630885139850e281", "sha256:4b5a9bcb56cc146c3932c648603b24514447eafa6ce9295234767bf92f69b504",
"sha256:6f567df0601e9c7434958143aebea47a9c4b45434ea0ae0286a4ec19e9877169", "sha256:625116aca6c4b57c56ea3d70369cacc4d62fead4930f8329d242e4fe7a58ce4b",
"sha256:781906e45ef1d10a0ed9ec8ab83a09b5e0d742de70e627b20d61ccb1b1d3964d", "sha256:654c1635f2313d0843028487db2191530bca45af61ca85d0b16555c399625b0e",
"sha256:8469377a437dbc31e480993399fd1fd15fe26f382dc04c51c9cb73e42965cc06", "sha256:8092a5a06ad9a7a247f2a76ace121183dc4e1a84c259cf9c2ce3bbb69fac3582",
"sha256:8cd0d587aaac74194ad3e68029124c06245acaeddaae14cb45844e5c9bebeea4", "sha256:832339223b9ce56b7b15168e691ae654d345ac1635eeb367ade9ecfe0e66bee0",
"sha256:97a023f97cddf00831ba04886d1596ef10f59b93df7f855856f037190936e868", "sha256:8ca9dca965bd86ea3631b975d63b0693566d3cc347e55786d5514988b6f5b84c",
"sha256:a973d5a7a324e2a5230ad7c43f5e1383cac51ef4903bf274936a5634b724b531", "sha256:a62162be05edf64f819925ea88d09d18b09bebf20971b363ce0c24e8b4aa14c0",
"sha256:af360e62a9790e0a96bc9ac845d87bfa0e4ee0ee68547ae8b5a9c1030517dbef", "sha256:b88fa3b8a3469f22b4f13d045d9bd3eda797aa4e406fde0a2644bc92bbdd4bdd",
"sha256:b706c70070eea03411b1761fff3a2675da28d042a1ab7d0863b3efe1faa125c9", "sha256:c13d311a4c4a8d671f5860317eb5f09591fbe8259676b86a85769423b544451e",
"sha256:bfd7a9fddd11d116a58b62ee6c502fd24cfe22a4792261f258f886aa41c2a899", "sha256:c2c6c56ee97485a127555c9595c069201b5161de9d05495fbe2132b5ac104786",
"sha256:c30d8766a055c22e39dd7e1a4f98f6266169f2de05db737efe509c2fb9c8a3c8", "sha256:c3466a84fce42c2016113101018a9981804097bacbab029c2d5b4fcb224b89de",
"sha256:c53dc8ee3bb7b7e28ee9feb996a0c999137be6c1d3b02cb6b3c4cba4f9e5ed09", "sha256:c8a2b7ccff330ae4c460aff36626f911f918555660cc28163417cb84ffb25789",
"sha256:c95d514093b80e5309bdca5dd99e51bcf82c44043b57c34594d9d7556bd04d05", "sha256:cb905f3d2e290a8b8f1579d3984f2cfa7c3a29cc7cba608540ceeed18513f520",
"sha256:d43cf21df524283daa80ecad551c306b7f52881c8d0fe4e3e76a96b626b6d8d8", "sha256:cfcf28ed4ce9ced47b9b9670a4f0d3d3c0e4d4779ad4dadb1ad468b097f808aa",
"sha256:d62205f00f461fe8b24ade07499454a3b7adf3def1225e258b994e2215fd15c5", "sha256:dd3e6547ecf842a29cf25123fbf8d2461c53c8d37aa20d87ecee130c89b7079b",
"sha256:e289a857dca3b35d3615c3a6a438622e20d1bf0abcb82c57d866c8d0be3f44c4", "sha256:ea37320877d56a7f0a1e6a625d892cf963aa7f570013499f5b8d5ab8402b5625",
"sha256:e5f6aa56dda92472e9d6f7b1e6331f4e2d51a67caafff4d4c5121cadac03941e", "sha256:f1fce1e4929157b2afeb4bb7069204d4370bab9f4fc03ca1fbec8bd601f8c87d",
"sha256:f4b1c65ee86bfbf7d0c3dfd90592a9e3d6e9ecd36c367c884094c050d4c35d04" "sha256:f43109822df2d3faac7aad79613f5f02e4eab0fc8ad7932d2e70e2a83bd49c26"
], ],
"version": "==2020.10.23" "version": "==2020.10.28"
}, },
"scikit-learn": { "scikit-learn": {
"hashes": [ "hashes": [
@ -383,6 +391,13 @@
], ],
"version": "==2.1" "version": "==2.1"
}, },
"watchdog": {
"hashes": [
"sha256:4214e1379d128b0588021880ccaf40317ee156d4603ac388b9adcf29165e0c04"
],
"index": "pypi",
"version": "==0.10.3"
},
"whitenoise": { "whitenoise": {
"hashes": [ "hashes": [
"sha256:05ce0be39ad85740a78750c86a93485c40f08ad8c62a6006de0233765996e5c7", "sha256:05ce0be39ad85740a78750c86a93485c40f08ad8c62a6006de0233765996e5c7",
@ -674,11 +689,11 @@
}, },
"pytest": { "pytest": {
"hashes": [ "hashes": [
"sha256:7a8190790c17d79a11f847fba0b004ee9a8122582ebff4729a082c109e81a4c9", "sha256:4288fed0d9153d9646bfcdf0c0428197dba1ecb27a33bb6e031d002fa88653fe",
"sha256:8f593023c1a0f916110285b6efd7f99db07d59546e3d8c36fc60e2ab05d3be92" "sha256:c0a7e94a8cdbc5422a51ccdad8e6f1024795939cc89159a0ae7f0b316ad3823e"
], ],
"index": "pypi", "index": "pypi",
"version": "==6.1.1" "version": "==6.1.2"
}, },
"pytest-cov": { "pytest-cov": {
"hashes": [ "hashes": [
@ -835,10 +850,11 @@
}, },
"toml": { "toml": {
"hashes": [ "hashes": [
"sha256:926b612be1e5ce0634a2ca03470f95169cf16f939018233a670519cb4ac58b0f", "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b",
"sha256:bda89d5935c2eac546d648028b9901107a595863cb36bae0c73ac804a9b4ce88" "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"
], ],
"version": "==0.10.1" "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==0.10.2"
}, },
"tox": { "tox": {
"hashes": [ "hashes": [

View File

@ -4,10 +4,8 @@ import hashlib
import logging import logging
import os import os
import re import re
import time
import uuid import uuid
from operator import itemgetter
from django.conf import settings from django.conf import settings
from django.utils import timezone from django.utils import timezone
from paperless.db import GnuPG from paperless.db import GnuPG
@ -36,17 +34,12 @@ class Consumer:
5. Delete the document and image(s) 5. Delete the document and image(s)
""" """
# Files are considered ready for consumption if they have been unmodified
# for this duration
FILES_MIN_UNMODIFIED_DURATION = 0.5
def __init__(self, consume=settings.CONSUMPTION_DIR, def __init__(self, consume=settings.CONSUMPTION_DIR,
scratch=settings.SCRATCH_DIR): scratch=settings.SCRATCH_DIR):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.logging_group = None self.logging_group = None
self._ignore = []
self.consume = consume self.consume = consume
self.scratch = scratch self.scratch = scratch
@ -83,43 +76,6 @@ class Consumer:
"group": self.logging_group "group": self.logging_group
}) })
def consume_new_files(self):
"""
Find non-ignored files in consumption dir and consume them if they have
been unmodified for FILES_MIN_UNMODIFIED_DURATION.
"""
ignored_files = []
files = []
for entry in os.scandir(self.consume):
if entry.is_file():
file = (entry.path, entry.stat().st_mtime)
if file in self._ignore:
ignored_files.append(file)
else:
files.append(file)
else:
self.logger.warning(
"Skipping %s as it is not a file",
entry.path
)
if not files:
return
# Set _ignore to only include files that still exist.
# This keeps it from growing indefinitely.
self._ignore[:] = ignored_files
files_old_to_new = sorted(files, key=itemgetter(1))
time.sleep(self.FILES_MIN_UNMODIFIED_DURATION)
for file, mtime in files_old_to_new:
if mtime == os.path.getmtime(file):
# File has not been modified and can be consumed
if not self.try_consume_file(file):
self._ignore.append((file, mtime))
@transaction.atomic @transaction.atomic
def try_consume_file(self, file): def try_consume_file(self, file):
""" """

View File

@ -1,12 +1,13 @@
import logging import logging
import os import os
import time
from django.conf import settings from django.conf import settings
from django.core.management.base import BaseCommand, CommandError from django.core.management.base import BaseCommand
from ...consumer import Consumer, ConsumerError from watchdog.observers import Observer
from ...mail import MailFetcher, MailFetcherError from watchdog.events import FileSystemEventHandler
from documents.consumer import Consumer
try: try:
from inotify_simple import INotify, flags from inotify_simple import INotify, flags
@ -14,6 +15,15 @@ except ImportError:
INotify = flags = None INotify = flags = None
class Handler(FileSystemEventHandler):
def __init__(self, consumer):
self.consumer = consumer
def on_created(self, event):
self.consumer.try_consume_file(event.src_path)
class Command(BaseCommand): class Command(BaseCommand):
""" """
On every iteration of an infinite loop, consume what we can from the On every iteration of an infinite loop, consume what we can from the
@ -29,6 +39,8 @@ class Command(BaseCommand):
self.mail_fetcher = None self.mail_fetcher = None
self.first_iteration = True self.first_iteration = True
self.consumer = Consumer()
BaseCommand.__init__(self, *args, **kwargs) BaseCommand.__init__(self, *args, **kwargs)
def add_arguments(self, parser): def add_arguments(self, parser):
@ -38,111 +50,34 @@ class Command(BaseCommand):
nargs="?", nargs="?",
help="The consumption directory." help="The consumption directory."
) )
parser.add_argument(
"--loop-time",
default=settings.CONSUMER_LOOP_TIME,
type=int,
help="Wait time between each loop (in seconds)."
)
parser.add_argument(
"--mail-delta",
default=10,
type=int,
help="Wait time between each mail fetch (in minutes)."
)
parser.add_argument(
"--oneshot",
action="store_true",
help="Run only once."
)
parser.add_argument(
"--no-inotify",
action="store_true",
help="Don't use inotify, even if it's available.",
default=False
)
def handle(self, *args, **options): def handle(self, *args, **options):
self.verbosity = options["verbosity"] self.verbosity = options["verbosity"]
directory = options["directory"] directory = options["directory"]
loop_time = options["loop_time"]
mail_delta = options["mail_delta"] * 60
use_inotify = INotify is not None and options["no_inotify"] is False
try:
self.file_consumer = Consumer(consume=directory)
self.mail_fetcher = MailFetcher(consume=directory)
except (ConsumerError, MailFetcherError) as e:
raise CommandError(e)
for d in (settings.ORIGINALS_DIR, settings.THUMBNAIL_DIR): for d in (settings.ORIGINALS_DIR, settings.THUMBNAIL_DIR):
os.makedirs(d, exist_ok=True) os.makedirs(d, exist_ok=True)
logging.getLogger(__name__).info( logging.getLogger(__name__).info(
"Starting document consumer at {}{}".format( "Starting document consumer at {}".format(
directory, directory
" with inotify" if use_inotify else ""
) )
) )
if options["oneshot"]: # Consume all files as this is not done initially by the watchdog
self.loop_step(mail_delta) for entry in os.scandir(directory):
else: if entry.is_file():
try: self.consumer.try_consume_file(entry.path)
if use_inotify:
self.loop_inotify(mail_delta)
else:
self.loop(loop_time, mail_delta)
except KeyboardInterrupt:
print("Exiting")
def loop(self, loop_time, mail_delta): # Start the watchdog. Woof!
while True: observer = Observer()
start_time = time.time() event_handler = Handler(self.consumer)
if self.verbosity > 1: observer.schedule(event_handler, directory, recursive=True)
print(".", int(start_time)) observer.start()
self.loop_step(mail_delta, start_time) try:
# Sleep until the start of the next loop step while observer.is_alive():
time.sleep(max(0, start_time + loop_time - time.time())) observer.join(1)
except KeyboardInterrupt:
def loop_step(self, mail_delta, time_now=None): observer.stop()
observer.join()
# Occasionally fetch mail and store it to be consumed on the next loop
# We fetch email when we first start up so that it is not necessary to
# wait for 10 minutes after making changes to the config file.
next_mail_time = self.mail_fetcher.last_checked + mail_delta
if self.first_iteration or time_now > next_mail_time:
self.first_iteration = False
self.mail_fetcher.pull()
self.file_consumer.consume_new_files()
def loop_inotify(self, mail_delta):
directory = self.file_consumer.consume
inotify = INotify()
inotify.add_watch(directory, flags.CLOSE_WRITE | flags.MOVED_TO)
# Run initial mail fetch and consume all currently existing documents
self.loop_step(mail_delta)
next_mail_time = self.mail_fetcher.last_checked + mail_delta
while True:
# Consume documents until next_mail_time
while True:
delta = next_mail_time - time.time()
if delta > 0:
for event in inotify.read(timeout=delta):
file = os.path.join(directory, event.name)
if os.path.isfile(file):
self.file_consumer.try_consume_file(file)
else:
self.logger.warning(
"Skipping %s as it is not a file",
file
)
else:
break
self.mail_fetcher.pull()
next_mail_time = self.mail_fetcher.last_checked + mail_delta