checking file types against parsers in the consumer.

This commit is contained in:
jonaswinkler 2020-12-01 15:26:05 +01:00
parent b0c8ade241
commit 834352130c
5 changed files with 127 additions and 113 deletions

View File

@ -41,21 +41,6 @@ class Consumer(LoggingMixin):
raise ConsumerError("Cannot consume {}: It is not a file".format( raise ConsumerError("Cannot consume {}: It is not a file".format(
self.path)) self.path))
def pre_check_file_extension(self):
    """Reject files whose extension does not map to a supported file type.

    The extension is taken from ``self.filename`` and compared
    case-insensitively against the set returned by
    ``get_supported_file_extensions()``, matching the lower-casing done by
    ``is_file_ext_supported``.

    Raises:
        ConsumerError: if the filename has no extension, or if the
            extension is not among the supported ones.
    """
    _, ext = os.path.splitext(self.filename)
    if not ext:
        # No extension at all: fail before doing the extension lookup.
        raise ConsumerError(
            f"Not consuming {self.filename}: File type unknown."
        )

    extensions = get_supported_file_extensions()
    # Compare lower-cased so "FILE.PDF" is accepted when ".pdf" is supported;
    # the message still reports the extension exactly as it was found.
    if ext.lower() not in extensions:
        raise ConsumerError(
            f"Not consuming {self.filename}: File extension {ext} does "
            f"not map to any known file type ({str(extensions)})"
        )
def pre_check_duplicate(self): def pre_check_duplicate(self):
with open(self.path, "rb") as f: with open(self.path, "rb") as f:
checksum = hashlib.md5(f.read()).hexdigest() checksum = hashlib.md5(f.read()).hexdigest()
@ -98,7 +83,6 @@ class Consumer(LoggingMixin):
# Make sure that preconditions for consuming the file are met. # Make sure that preconditions for consuming the file are met.
self.pre_check_file_exists() self.pre_check_file_exists()
self.pre_check_file_extension()
self.pre_check_directories() self.pre_check_directories()
self.pre_check_duplicate() self.pre_check_duplicate()

View File

@ -11,6 +11,7 @@ from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver from watchdog.observers.polling import PollingObserver
from documents.models import Tag from documents.models import Tag
from documents.parsers import is_file_ext_supported
try: try:
from inotifyrecursive import INotify, flags from inotifyrecursive import INotify, flags
@ -37,11 +38,19 @@ def _tags_from_path(filepath):
def _consume(filepath): def _consume(filepath):
if os.path.isdir(filepath):
return
if not os.path.isfile(filepath): if not os.path.isfile(filepath):
logger.debug( logger.debug(
f"Not consuming file {filepath}: File has moved.") f"Not consuming file {filepath}: File has moved.")
return return
if not is_file_ext_supported(os.path.splitext(filepath)[1]):
logger.debug(
f"Not consuming file {filepath}: Unknown file extension.")
return
tag_ids = None tag_ids = None
try: try:
if settings.CONSUMER_SUBDIRS_AS_TAGS: if settings.CONSUMER_SUBDIRS_AS_TAGS:
@ -181,7 +190,7 @@ class Command(BaseCommand):
try: try:
while not self.stop_flag: while not self.stop_flag:
for event in inotify.read(timeout=1000, read_delay=1000): for event in inotify.read(timeout=1000):
if recursive: if recursive:
path = inotify.get_path(event.wd) path = inotify.get_path(event.wd)
else: else:

View File

@ -54,6 +54,13 @@ def get_default_file_extension(mime_type):
return None return None
def is_file_ext_supported(ext):
    """Tell whether ``ext``, lower-cased, is a supported file extension.

    A falsy extension (empty string or ``None``) is never supported and
    returns ``False`` without consulting ``get_supported_file_extensions()``.
    """
    if not ext:
        return False
    return ext.lower() in get_supported_file_extensions()
def get_supported_file_extensions(): def get_supported_file_extensions():
extensions = set() extensions = set()
for response in document_consumer_declaration.send(None): for response in document_consumer_declaration.send(None):

View File

@ -552,7 +552,7 @@ class TestConsumer(DirectoriesMixin, TestCase):
try: try:
self.consumer.try_consume_file(self.get_test_file()) self.consumer.try_consume_file(self.get_test_file())
except ConsumerError as e: except ConsumerError as e:
self.assertTrue("File extension .pdf does not map to any" in str(e)) self.assertTrue("No parsers abvailable for" in str(e))
return return
self.fail("Should throw exception") self.fail("Should throw exception")

View File

@ -34,12 +34,12 @@ def chunked(size, source):
yield source[i:i+size] yield source[i:i+size]
class TestConsumer(DirectoriesMixin, TransactionTestCase): class ConsumerMixin:
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
def setUp(self) -> None: def setUp(self) -> None:
super(TestConsumer, self).setUp() super(ConsumerMixin, self).setUp()
self.t = None self.t = None
patcher = mock.patch("documents.management.commands.document_consumer.async_task") patcher = mock.patch("documents.management.commands.document_consumer.async_task")
self.task_mock = patcher.start() self.task_mock = patcher.start()
@ -58,7 +58,7 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
# wait for the consumer to exit. # wait for the consumer to exit.
self.t.join() self.t.join()
super(TestConsumer, self).tearDown() super(ConsumerMixin, self).tearDown()
def wait_for_task_mock_call(self): def wait_for_task_mock_call(self):
n = 0 n = 0
@ -69,7 +69,6 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
return return
n += 1 n += 1
sleep(0.1) sleep(0.1)
self.fail("async_task was never called")
# A bogus async_task that will simply check the file for # A bogus async_task that will simply check the file for
# completeness and raise an exception otherwise. # completeness and raise an exception otherwise.
@ -96,6 +95,9 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
sleep(0.1) sleep(0.1)
print("file completed.") print("file completed.")
class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
def test_consume_file(self): def test_consume_file(self):
self.t_start() self.t_start()
@ -109,9 +111,15 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
args, kwargs = self.task_mock.call_args args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f) self.assertEqual(args[1], f)
@override_settings(CONSUMER_POLLING=1) def test_consume_file_invalid_ext(self):
def test_consume_file_polling(self): self.t_start()
self.test_consume_file()
f = os.path.join(self.dirs.consumption_dir, "my_file.wow")
shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call()
self.task_mock.assert_not_called()
def test_consume_existing_file(self): def test_consume_existing_file(self):
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf") f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
@ -123,12 +131,101 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
args, kwargs = self.task_mock.call_args args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], f) self.assertEqual(args[1], f)
@override_settings(CONSUMER_POLLING=1) @mock.patch("documents.management.commands.document_consumer.logger.error")
def test_consume_existing_file_polling(self): def test_slow_write_pdf(self, error_logger):
self.test_consume_existing_file()
@override_settings(CONSUMER_RECURSIVE=1) self.task_mock.side_effect = self.bogus_task
@override_settings(CONSUMER_SUBDIRS_AS_TAGS=1)
self.t_start()
fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
self.slow_write_file(fname)
self.wait_for_task_mock_call()
error_logger.assert_not_called()
self.task_mock.assert_called_once()
args, kwargs = self.task_mock.call_args
self.assertEqual(args[1], fname)
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_and_move(self, error_logger):
    # Scenario: a file is written slowly under a ".~df" name and then
    # moved to its ".pdf" name while the consumer is running.
    self.task_mock.side_effect = self.bogus_task

    self.t_start()

    consume_dir = self.dirs.consumption_dir
    partial_path = os.path.join(consume_dir, "my_file.~df")
    final_path = os.path.join(consume_dir, "my_file.pdf")

    self.slow_write_file(partial_path)
    shutil.move(partial_path, final_path)

    self.wait_for_task_mock_call()

    # Exactly one task call is expected, and it must carry the final path.
    self.task_mock.assert_called_once()
    positional, _ = self.task_mock.call_args
    self.assertEqual(positional[1], final_path)

    error_logger.assert_not_called()
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_incomplete(self, error_logger):
    # Scenario: a ".pdf" file is written slowly with incomplete=True.
    self.task_mock.side_effect = self.bogus_task

    self.t_start()

    target = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
    self.slow_write_file(target, incomplete=True)

    self.wait_for_task_mock_call()

    # The task is still invoked once for this path ...
    self.task_mock.assert_called_once()
    positional, _ = self.task_mock.call_args
    self.assertEqual(positional[1], target)

    # ... and exactly one error is logged for the invalid file.
    error_logger.assert_called_once()
@override_settings(CONSUMPTION_DIR="does_not_exist")
def test_consumption_directory_invalid(self):
    # With CONSUMPTION_DIR overridden to "does_not_exist", the one-shot
    # consumer command must raise CommandError.
    with self.assertRaises(CommandError):
        call_command('document_consumer', '--oneshot')
@override_settings(CONSUMPTION_DIR="")
def test_consumption_directory_unset(self):
    # With CONSUMPTION_DIR overridden to the empty string, the one-shot
    # consumer command must raise CommandError.
    with self.assertRaises(CommandError):
        call_command('document_consumer', '--oneshot')
@override_settings(CONSUMER_POLLING=1)
class TestConsumerPolling(TestConsumer):
    """Run every test inherited from TestConsumer with CONSUMER_POLLING=1."""
@override_settings(CONSUMER_RECURSIVE=True)
class TestConsumerRecursive(TestConsumer):
    """Run every test inherited from TestConsumer with CONSUMER_RECURSIVE=True."""
@override_settings(CONSUMER_RECURSIVE=True, CONSUMER_POLLING=1)
class TestConsumerRecursivePolling(TestConsumer):
    """Run every test inherited from TestConsumer with both
    CONSUMER_RECURSIVE=True and CONSUMER_POLLING=1."""
class TestConsumerTags(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
@override_settings(CONSUMER_RECURSIVE=True)
@override_settings(CONSUMER_SUBDIRS_AS_TAGS=True)
def test_consume_file_with_path_tags(self): def test_consume_file_with_path_tags(self):
tag_names = ("existingTag", "Space Tag") tag_names = ("existingTag", "Space Tag")
@ -163,86 +260,3 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
@override_settings(CONSUMER_POLLING=1) @override_settings(CONSUMER_POLLING=1)
def test_consume_file_with_path_tags_polling(self): def test_consume_file_with_path_tags_polling(self):
self.test_consume_file_with_path_tags() self.test_consume_file_with_path_tags()
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_pdf(self, error_logger):
    # Scenario: a ".pdf" file is written slowly while the consumer runs.
    self.task_mock.side_effect = self.bogus_task

    self.t_start()

    target = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
    self.slow_write_file(target)

    self.wait_for_task_mock_call()

    # No error may be logged, and the task must be called once for this path.
    error_logger.assert_not_called()
    self.task_mock.assert_called_once()

    positional, _ = self.task_mock.call_args
    self.assertEqual(positional[1], target)
@override_settings(CONSUMER_POLLING=1)
def test_slow_write_pdf_polling(self):
    # Same scenario as test_slow_write_pdf, with CONSUMER_POLLING=1.
    self.test_slow_write_pdf()
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_and_move(self, error_logger):
    # Scenario: a file is written slowly under a ".~df" name and then
    # moved to its ".pdf" name while the consumer is running.
    self.task_mock.side_effect = self.bogus_task

    self.t_start()

    consume_dir = self.dirs.consumption_dir
    partial_path = os.path.join(consume_dir, "my_file.~df")
    final_path = os.path.join(consume_dir, "my_file.pdf")

    self.slow_write_file(partial_path)
    shutil.move(partial_path, final_path)

    self.wait_for_task_mock_call()

    # Exactly one task call is expected, and it must carry the final path.
    self.task_mock.assert_called_once()
    positional, _ = self.task_mock.call_args
    self.assertEqual(positional[1], final_path)

    error_logger.assert_not_called()
@override_settings(CONSUMER_POLLING=1)
def test_slow_write_and_move_polling(self):
    # Same scenario as test_slow_write_and_move, with CONSUMER_POLLING=1.
    self.test_slow_write_and_move()
@mock.patch("documents.management.commands.document_consumer.logger.error")
def test_slow_write_incomplete(self, error_logger):
    # Scenario: a ".pdf" file is written slowly with incomplete=True.
    self.task_mock.side_effect = self.bogus_task

    self.t_start()

    target = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
    self.slow_write_file(target, incomplete=True)

    self.wait_for_task_mock_call()

    # The task is still invoked once for this path ...
    self.task_mock.assert_called_once()
    positional, _ = self.task_mock.call_args
    self.assertEqual(positional[1], target)

    # ... and exactly one error is logged for the invalid file.
    error_logger.assert_called_once()
@override_settings(CONSUMER_POLLING=1)
def test_slow_write_incomplete_polling(self):
    # Same scenario as test_slow_write_incomplete, with CONSUMER_POLLING=1.
    self.test_slow_write_incomplete()
@override_settings(CONSUMPTION_DIR="does_not_exist")
def test_consumption_directory_invalid(self):
    # With CONSUMPTION_DIR overridden to "does_not_exist", the one-shot
    # consumer command must raise CommandError.
    with self.assertRaises(CommandError):
        call_command('document_consumer', '--oneshot')
@override_settings(CONSUMPTION_DIR="")
def test_consumption_directory_unset(self):
    # With CONSUMPTION_DIR overridden to the empty string, the one-shot
    # consumer command must raise CommandError.
    with self.assertRaises(CommandError):
        call_command('document_consumer', '--oneshot')