2021-01-09 01:54:51 +01:00

204 lines
6.1 KiB
Python

import logging
import os
from pathlib import Path
from time import sleep
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.utils.text import slugify
from django_q.tasks import async_task
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
from documents.models import Tag
from documents.parsers import is_file_ext_supported
try:
from inotifyrecursive import INotify, flags
except ImportError:
INotify = flags = None
logger = logging.getLogger(__name__)
def _tags_from_path(filepath):
"""Walk up the directory tree from filepath to CONSUMPTION_DIr
and get or create Tag IDs for every directory.
"""
tag_ids = set()
path_parts = Path(filepath).relative_to(
settings.CONSUMPTION_DIR).parent.parts
for part in path_parts:
tag_ids.add(Tag.objects.get_or_create(name__iexact=part, defaults={
"name": part
})[0].pk)
return tag_ids
def _consume(filepath):
if os.path.isdir(filepath):
return
if not os.path.isfile(filepath):
logger.debug(
f"Not consuming file {filepath}: File has moved.")
return
if not is_file_ext_supported(os.path.splitext(filepath)[1]):
logger.warning(
f"Not consuming file {filepath}: Unknown file extension.")
return
tag_ids = None
try:
if settings.CONSUMER_SUBDIRS_AS_TAGS:
tag_ids = _tags_from_path(filepath)
except Exception as e:
logger.error(
"Error creating tags from path: {}".format(e))
try:
async_task("documents.tasks.consume_file",
filepath,
override_tag_ids=tag_ids if tag_ids else None,
task_name=os.path.basename(filepath)[:100])
except Exception as e:
# Catch all so that the consumer won't crash.
# This is also what the test case is listening for to check for
# errors.
logger.error(
"Error while consuming document: {}".format(e))
def _consume_wait_unmodified(file, num_tries=20, wait_time=1):
mtime = -1
current_try = 0
while current_try < num_tries:
try:
new_mtime = os.stat(file).st_mtime
except FileNotFoundError:
logger.debug(f"File {file} moved while waiting for it to remain "
f"unmodified.")
return
if new_mtime == mtime:
_consume(file)
return
mtime = new_mtime
sleep(wait_time)
current_try += 1
logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
class Handler(FileSystemEventHandler):
def on_created(self, event):
_consume_wait_unmodified(event.src_path)
def on_moved(self, event):
_consume_wait_unmodified(event.dest_path)
class Command(BaseCommand):
"""
On every iteration of an infinite loop, consume what we can from the
consumption directory.
"""
# This is here primarily for the tests and is irrelevant in production.
stop_flag = False
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
BaseCommand.__init__(self, *args, **kwargs)
self.observer = None
def add_arguments(self, parser):
parser.add_argument(
"directory",
default=settings.CONSUMPTION_DIR,
nargs="?",
help="The consumption directory."
)
parser.add_argument(
"--oneshot",
action="store_true",
help="Run only once."
)
def handle(self, *args, **options):
directory = options["directory"]
recursive = settings.CONSUMER_RECURSIVE
if not directory:
raise CommandError(
"CONSUMPTION_DIR does not appear to be set."
)
if not os.path.isdir(directory):
raise CommandError(
f"Consumption directory {directory} does not exist")
if recursive:
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
_consume(filepath)
else:
for entry in os.scandir(directory):
_consume(entry.path)
if options["oneshot"]:
return
if settings.CONSUMER_POLLING == 0 and INotify:
self.handle_inotify(directory, recursive)
else:
self.handle_polling(directory, recursive)
logger.debug("Consumer exiting.")
def handle_polling(self, directory, recursive):
logging.getLogger(__name__).info(
f"Polling directory for changes: {directory}")
self.observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
self.observer.schedule(Handler(), directory, recursive=recursive)
self.observer.start()
try:
while self.observer.is_alive():
self.observer.join(1)
if self.stop_flag:
self.observer.stop()
except KeyboardInterrupt:
self.observer.stop()
self.observer.join()
def handle_inotify(self, directory, recursive):
logging.getLogger(__name__).info(
f"Using inotify to watch directory for changes: {directory}")
inotify = INotify()
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO
if recursive:
descriptor = inotify.add_watch_recursive(directory, inotify_flags)
else:
descriptor = inotify.add_watch(directory, inotify_flags)
try:
while not self.stop_flag:
for event in inotify.read(timeout=1000):
if recursive:
path = inotify.get_path(event.wd)
else:
path = directory
filepath = os.path.join(path, event.name)
_consume(filepath)
except KeyboardInterrupt:
pass
inotify.rm_watch(descriptor)
inotify.close()