mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-10-24 03:26:11 -05:00
checking file types against parsers in the consumer.
This commit is contained in:
@@ -41,21 +41,6 @@ class Consumer(LoggingMixin):
|
|||||||
raise ConsumerError("Cannot consume {}: It is not a file".format(
|
raise ConsumerError("Cannot consume {}: It is not a file".format(
|
||||||
self.path))
|
self.path))
|
||||||
|
|
||||||
def pre_check_file_extension(self):
|
|
||||||
extensions = get_supported_file_extensions()
|
|
||||||
_, ext = os.path.splitext(self.filename)
|
|
||||||
|
|
||||||
if not ext:
|
|
||||||
raise ConsumerError(
|
|
||||||
f"Not consuming {self.filename}: File type unknown."
|
|
||||||
)
|
|
||||||
|
|
||||||
if ext not in extensions:
|
|
||||||
raise ConsumerError(
|
|
||||||
f"Not consuming {self.filename}: File extension {ext} does "
|
|
||||||
f"not map to any known file type ({str(extensions)})"
|
|
||||||
)
|
|
||||||
|
|
||||||
def pre_check_duplicate(self):
|
def pre_check_duplicate(self):
|
||||||
with open(self.path, "rb") as f:
|
with open(self.path, "rb") as f:
|
||||||
checksum = hashlib.md5(f.read()).hexdigest()
|
checksum = hashlib.md5(f.read()).hexdigest()
|
||||||
@@ -98,7 +83,6 @@ class Consumer(LoggingMixin):
|
|||||||
# Make sure that preconditions for consuming the file are met.
|
# Make sure that preconditions for consuming the file are met.
|
||||||
|
|
||||||
self.pre_check_file_exists()
|
self.pre_check_file_exists()
|
||||||
self.pre_check_file_extension()
|
|
||||||
self.pre_check_directories()
|
self.pre_check_directories()
|
||||||
self.pre_check_duplicate()
|
self.pre_check_duplicate()
|
||||||
|
|
||||||
|
@@ -11,6 +11,7 @@ from watchdog.events import FileSystemEventHandler
|
|||||||
from watchdog.observers.polling import PollingObserver
|
from watchdog.observers.polling import PollingObserver
|
||||||
|
|
||||||
from documents.models import Tag
|
from documents.models import Tag
|
||||||
|
from documents.parsers import is_file_ext_supported
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from inotifyrecursive import INotify, flags
|
from inotifyrecursive import INotify, flags
|
||||||
@@ -37,11 +38,19 @@ def _tags_from_path(filepath):
|
|||||||
|
|
||||||
|
|
||||||
def _consume(filepath):
|
def _consume(filepath):
|
||||||
|
if os.path.isdir(filepath):
|
||||||
|
return
|
||||||
|
|
||||||
if not os.path.isfile(filepath):
|
if not os.path.isfile(filepath):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Not consuming file {filepath}: File has moved.")
|
f"Not consuming file {filepath}: File has moved.")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if not is_file_ext_supported(os.path.splitext(filepath)[1]):
|
||||||
|
logger.debug(
|
||||||
|
f"Not consuming file {filepath}: Unknown file extension.")
|
||||||
|
return
|
||||||
|
|
||||||
tag_ids = None
|
tag_ids = None
|
||||||
try:
|
try:
|
||||||
if settings.CONSUMER_SUBDIRS_AS_TAGS:
|
if settings.CONSUMER_SUBDIRS_AS_TAGS:
|
||||||
@@ -181,7 +190,7 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
while not self.stop_flag:
|
while not self.stop_flag:
|
||||||
for event in inotify.read(timeout=1000, read_delay=1000):
|
for event in inotify.read(timeout=1000):
|
||||||
if recursive:
|
if recursive:
|
||||||
path = inotify.get_path(event.wd)
|
path = inotify.get_path(event.wd)
|
||||||
else:
|
else:
|
||||||
|
@@ -54,6 +54,13 @@ def get_default_file_extension(mime_type):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def is_file_ext_supported(ext):
|
||||||
|
if ext:
|
||||||
|
return ext.lower() in get_supported_file_extensions()
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def get_supported_file_extensions():
|
def get_supported_file_extensions():
|
||||||
extensions = set()
|
extensions = set()
|
||||||
for response in document_consumer_declaration.send(None):
|
for response in document_consumer_declaration.send(None):
|
||||||
|
@@ -552,7 +552,7 @@ class TestConsumer(DirectoriesMixin, TestCase):
|
|||||||
try:
|
try:
|
||||||
self.consumer.try_consume_file(self.get_test_file())
|
self.consumer.try_consume_file(self.get_test_file())
|
||||||
except ConsumerError as e:
|
except ConsumerError as e:
|
||||||
self.assertTrue("File extension .pdf does not map to any" in str(e))
|
self.assertTrue("No parsers abvailable for" in str(e))
|
||||||
return
|
return
|
||||||
|
|
||||||
self.fail("Should throw exception")
|
self.fail("Should throw exception")
|
||||||
|
@@ -34,12 +34,12 @@ def chunked(size, source):
|
|||||||
yield source[i:i+size]
|
yield source[i:i+size]
|
||||||
|
|
||||||
|
|
||||||
class TestConsumer(DirectoriesMixin, TransactionTestCase):
|
class ConsumerMixin:
|
||||||
|
|
||||||
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||||
|
|
||||||
def setUp(self) -> None:
|
def setUp(self) -> None:
|
||||||
super(TestConsumer, self).setUp()
|
super(ConsumerMixin, self).setUp()
|
||||||
self.t = None
|
self.t = None
|
||||||
patcher = mock.patch("documents.management.commands.document_consumer.async_task")
|
patcher = mock.patch("documents.management.commands.document_consumer.async_task")
|
||||||
self.task_mock = patcher.start()
|
self.task_mock = patcher.start()
|
||||||
@@ -58,7 +58,7 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
|
|||||||
# wait for the consumer to exit.
|
# wait for the consumer to exit.
|
||||||
self.t.join()
|
self.t.join()
|
||||||
|
|
||||||
super(TestConsumer, self).tearDown()
|
super(ConsumerMixin, self).tearDown()
|
||||||
|
|
||||||
def wait_for_task_mock_call(self):
|
def wait_for_task_mock_call(self):
|
||||||
n = 0
|
n = 0
|
||||||
@@ -69,7 +69,6 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
|
|||||||
return
|
return
|
||||||
n += 1
|
n += 1
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
self.fail("async_task was never called")
|
|
||||||
|
|
||||||
# A bogus async_task that will simply check the file for
|
# A bogus async_task that will simply check the file for
|
||||||
# completeness and raise an exception otherwise.
|
# completeness and raise an exception otherwise.
|
||||||
@@ -96,6 +95,9 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
|
|||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
print("file completed.")
|
print("file completed.")
|
||||||
|
|
||||||
|
|
||||||
|
class TestConsumer(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
|
||||||
|
|
||||||
def test_consume_file(self):
|
def test_consume_file(self):
|
||||||
self.t_start()
|
self.t_start()
|
||||||
|
|
||||||
@@ -109,9 +111,15 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
|
|||||||
args, kwargs = self.task_mock.call_args
|
args, kwargs = self.task_mock.call_args
|
||||||
self.assertEqual(args[1], f)
|
self.assertEqual(args[1], f)
|
||||||
|
|
||||||
@override_settings(CONSUMER_POLLING=1)
|
def test_consume_file_invalid_ext(self):
|
||||||
def test_consume_file_polling(self):
|
self.t_start()
|
||||||
self.test_consume_file()
|
|
||||||
|
f = os.path.join(self.dirs.consumption_dir, "my_file.wow")
|
||||||
|
shutil.copy(self.sample_file, f)
|
||||||
|
|
||||||
|
self.wait_for_task_mock_call()
|
||||||
|
|
||||||
|
self.task_mock.assert_not_called()
|
||||||
|
|
||||||
def test_consume_existing_file(self):
|
def test_consume_existing_file(self):
|
||||||
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||||
@@ -123,12 +131,101 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
|
|||||||
args, kwargs = self.task_mock.call_args
|
args, kwargs = self.task_mock.call_args
|
||||||
self.assertEqual(args[1], f)
|
self.assertEqual(args[1], f)
|
||||||
|
|
||||||
@override_settings(CONSUMER_POLLING=1)
|
@mock.patch("documents.management.commands.document_consumer.logger.error")
|
||||||
def test_consume_existing_file_polling(self):
|
def test_slow_write_pdf(self, error_logger):
|
||||||
self.test_consume_existing_file()
|
|
||||||
|
|
||||||
@override_settings(CONSUMER_RECURSIVE=1)
|
self.task_mock.side_effect = self.bogus_task
|
||||||
@override_settings(CONSUMER_SUBDIRS_AS_TAGS=1)
|
|
||||||
|
self.t_start()
|
||||||
|
|
||||||
|
fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||||
|
|
||||||
|
self.slow_write_file(fname)
|
||||||
|
|
||||||
|
self.wait_for_task_mock_call()
|
||||||
|
|
||||||
|
error_logger.assert_not_called()
|
||||||
|
|
||||||
|
self.task_mock.assert_called_once()
|
||||||
|
|
||||||
|
args, kwargs = self.task_mock.call_args
|
||||||
|
self.assertEqual(args[1], fname)
|
||||||
|
|
||||||
|
@mock.patch("documents.management.commands.document_consumer.logger.error")
|
||||||
|
def test_slow_write_and_move(self, error_logger):
|
||||||
|
|
||||||
|
self.task_mock.side_effect = self.bogus_task
|
||||||
|
|
||||||
|
self.t_start()
|
||||||
|
|
||||||
|
fname = os.path.join(self.dirs.consumption_dir, "my_file.~df")
|
||||||
|
fname2 = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||||
|
|
||||||
|
self.slow_write_file(fname)
|
||||||
|
shutil.move(fname, fname2)
|
||||||
|
|
||||||
|
self.wait_for_task_mock_call()
|
||||||
|
|
||||||
|
self.task_mock.assert_called_once()
|
||||||
|
|
||||||
|
args, kwargs = self.task_mock.call_args
|
||||||
|
self.assertEqual(args[1], fname2)
|
||||||
|
|
||||||
|
error_logger.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch("documents.management.commands.document_consumer.logger.error")
|
||||||
|
def test_slow_write_incomplete(self, error_logger):
|
||||||
|
|
||||||
|
self.task_mock.side_effect = self.bogus_task
|
||||||
|
|
||||||
|
self.t_start()
|
||||||
|
|
||||||
|
fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||||
|
self.slow_write_file(fname, incomplete=True)
|
||||||
|
|
||||||
|
self.wait_for_task_mock_call()
|
||||||
|
|
||||||
|
self.task_mock.assert_called_once()
|
||||||
|
args, kwargs = self.task_mock.call_args
|
||||||
|
self.assertEqual(args[1], fname)
|
||||||
|
|
||||||
|
# assert that we have an error logged with this invalid file.
|
||||||
|
error_logger.assert_called_once()
|
||||||
|
|
||||||
|
@override_settings(CONSUMPTION_DIR="does_not_exist")
|
||||||
|
def test_consumption_directory_invalid(self):
|
||||||
|
|
||||||
|
self.assertRaises(CommandError, call_command, 'document_consumer', '--oneshot')
|
||||||
|
|
||||||
|
@override_settings(CONSUMPTION_DIR="")
|
||||||
|
def test_consumption_directory_unset(self):
|
||||||
|
|
||||||
|
self.assertRaises(CommandError, call_command, 'document_consumer', '--oneshot')
|
||||||
|
|
||||||
|
|
||||||
|
@override_settings(CONSUMER_POLLING=1)
|
||||||
|
class TestConsumerPolling(TestConsumer):
|
||||||
|
# just do all the tests with polling
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@override_settings(CONSUMER_RECURSIVE=True)
|
||||||
|
class TestConsumerRecursive(TestConsumer):
|
||||||
|
# just do all the tests with recursive
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@override_settings(CONSUMER_RECURSIVE=True)
|
||||||
|
@override_settings(CONSUMER_POLLING=1)
|
||||||
|
class TestConsumerRecursivePolling(TestConsumer):
|
||||||
|
# just do all the tests with polling and recursive
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TestConsumerTags(DirectoriesMixin, ConsumerMixin, TransactionTestCase):
|
||||||
|
|
||||||
|
@override_settings(CONSUMER_RECURSIVE=True)
|
||||||
|
@override_settings(CONSUMER_SUBDIRS_AS_TAGS=True)
|
||||||
def test_consume_file_with_path_tags(self):
|
def test_consume_file_with_path_tags(self):
|
||||||
|
|
||||||
tag_names = ("existingTag", "Space Tag")
|
tag_names = ("existingTag", "Space Tag")
|
||||||
@@ -163,86 +260,3 @@ class TestConsumer(DirectoriesMixin, TransactionTestCase):
|
|||||||
@override_settings(CONSUMER_POLLING=1)
|
@override_settings(CONSUMER_POLLING=1)
|
||||||
def test_consume_file_with_path_tags_polling(self):
|
def test_consume_file_with_path_tags_polling(self):
|
||||||
self.test_consume_file_with_path_tags()
|
self.test_consume_file_with_path_tags()
|
||||||
|
|
||||||
@mock.patch("documents.management.commands.document_consumer.logger.error")
|
|
||||||
def test_slow_write_pdf(self, error_logger):
|
|
||||||
|
|
||||||
self.task_mock.side_effect = self.bogus_task
|
|
||||||
|
|
||||||
self.t_start()
|
|
||||||
|
|
||||||
fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
|
||||||
|
|
||||||
self.slow_write_file(fname)
|
|
||||||
|
|
||||||
self.wait_for_task_mock_call()
|
|
||||||
|
|
||||||
error_logger.assert_not_called()
|
|
||||||
|
|
||||||
self.task_mock.assert_called_once()
|
|
||||||
|
|
||||||
args, kwargs = self.task_mock.call_args
|
|
||||||
self.assertEqual(args[1], fname)
|
|
||||||
|
|
||||||
@override_settings(CONSUMER_POLLING=1)
|
|
||||||
def test_slow_write_pdf_polling(self):
|
|
||||||
self.test_slow_write_pdf()
|
|
||||||
|
|
||||||
@mock.patch("documents.management.commands.document_consumer.logger.error")
|
|
||||||
def test_slow_write_and_move(self, error_logger):
|
|
||||||
|
|
||||||
self.task_mock.side_effect = self.bogus_task
|
|
||||||
|
|
||||||
self.t_start()
|
|
||||||
|
|
||||||
fname = os.path.join(self.dirs.consumption_dir, "my_file.~df")
|
|
||||||
fname2 = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
|
||||||
|
|
||||||
self.slow_write_file(fname)
|
|
||||||
shutil.move(fname, fname2)
|
|
||||||
|
|
||||||
self.wait_for_task_mock_call()
|
|
||||||
|
|
||||||
self.task_mock.assert_called_once()
|
|
||||||
|
|
||||||
args, kwargs = self.task_mock.call_args
|
|
||||||
self.assertEqual(args[1], fname2)
|
|
||||||
|
|
||||||
error_logger.assert_not_called()
|
|
||||||
|
|
||||||
@override_settings(CONSUMER_POLLING=1)
|
|
||||||
def test_slow_write_and_move_polling(self):
|
|
||||||
self.test_slow_write_and_move()
|
|
||||||
|
|
||||||
@mock.patch("documents.management.commands.document_consumer.logger.error")
|
|
||||||
def test_slow_write_incomplete(self, error_logger):
|
|
||||||
|
|
||||||
self.task_mock.side_effect = self.bogus_task
|
|
||||||
|
|
||||||
self.t_start()
|
|
||||||
|
|
||||||
fname = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
|
||||||
self.slow_write_file(fname, incomplete=True)
|
|
||||||
|
|
||||||
self.wait_for_task_mock_call()
|
|
||||||
|
|
||||||
self.task_mock.assert_called_once()
|
|
||||||
args, kwargs = self.task_mock.call_args
|
|
||||||
self.assertEqual(args[1], fname)
|
|
||||||
|
|
||||||
# assert that we have an error logged with this invalid file.
|
|
||||||
error_logger.assert_called_once()
|
|
||||||
|
|
||||||
@override_settings(CONSUMER_POLLING=1)
|
|
||||||
def test_slow_write_incomplete_polling(self):
|
|
||||||
self.test_slow_write_incomplete()
|
|
||||||
|
|
||||||
@override_settings(CONSUMPTION_DIR="does_not_exist")
|
|
||||||
def test_consumption_directory_invalid(self):
|
|
||||||
|
|
||||||
self.assertRaises(CommandError, call_command, 'document_consumer', '--oneshot')
|
|
||||||
|
|
||||||
@override_settings(CONSUMPTION_DIR="")
|
|
||||||
def test_consumption_directory_unset(self):
|
|
||||||
|
|
||||||
self.assertRaises(CommandError, call_command, 'document_consumer', '--oneshot')
|
|
||||||
|
Reference in New Issue
Block a user