mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-30 03:56:23 -05:00 
			
		
		
		
	first version of the new consumer.
This commit is contained in:
		| @@ -12,7 +12,7 @@ from django.utils import timezone | ||||
| from paperless.db import GnuPG | ||||
| from .classifier import DocumentClassifier, IncompatibleClassifierVersionError | ||||
| from .file_handling import generate_filename, create_source_path_directory | ||||
| from .models import Document, FileInfo | ||||
| from .models import Document, FileInfo, Correspondent, DocumentType, Tag | ||||
| from .parsers import ParseError, get_parser_class | ||||
| from .signals import ( | ||||
|     document_consumption_finished, | ||||
| @@ -25,130 +25,196 @@ class ConsumerError(Exception): | ||||
|  | ||||
|  | ||||
| class Consumer: | ||||
|     """ | ||||
|     Loop over every file found in CONSUMPTION_DIR and: | ||||
|       1. Convert it to a greyscale pnm | ||||
|       2. Use tesseract on the pnm | ||||
|       3. Store the document in the MEDIA_ROOT with optional encryption | ||||
|       4. Store the OCR'd text in the database | ||||
|       5. Delete the document and image(s) | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, consume=settings.CONSUMPTION_DIR, | ||||
|                  scratch=settings.SCRATCH_DIR): | ||||
|     def __init__(self): | ||||
|  | ||||
|         self.logger = logging.getLogger(__name__) | ||||
|         self.logging_group = None | ||||
|  | ||||
|         self.consume = consume | ||||
|         self.scratch = scratch | ||||
|  | ||||
|         self.classifier = DocumentClassifier() | ||||
|  | ||||
|         os.makedirs(self.scratch, exist_ok=True) | ||||
|  | ||||
|         self.storage_type = Document.STORAGE_TYPE_UNENCRYPTED | ||||
|         if settings.PASSPHRASE: | ||||
|             self.storage_type = Document.STORAGE_TYPE_GPG | ||||
|  | ||||
|         if not self.consume: | ||||
|     @staticmethod | ||||
|     def pre_check_file_exists(filename): | ||||
|         if not os.path.isfile(filename): | ||||
|             raise ConsumerError("Cannot consume {}: It is not a file".format( | ||||
|                 filename)) | ||||
|  | ||||
|     @staticmethod | ||||
|     def pre_check_consumption_dir(): | ||||
|         if not settings.CONSUMPTION_DIR: | ||||
|             raise ConsumerError( | ||||
|                 "The CONSUMPTION_DIR settings variable does not appear to be " | ||||
|                 "set." | ||||
|                 "set.") | ||||
|  | ||||
|         if not os.path.isdir(settings.CONSUMPTION_DIR): | ||||
|             raise ConsumerError( | ||||
|                 "Consumption directory {} does not exist".format( | ||||
|                     settings.CONSUMPTION_DIR)) | ||||
|  | ||||
|     @staticmethod | ||||
|     def pre_check_regex(filename): | ||||
|         if not re.match(FileInfo.REGEXES["title"], filename): | ||||
|             raise ConsumerError( | ||||
|                 "Filename {} does not seem to be safe to " | ||||
|                 "consume".format(filename)) | ||||
|  | ||||
|     @staticmethod | ||||
|     def pre_check_duplicate(filename): | ||||
|         with open(filename, "rb") as f: | ||||
|             checksum = hashlib.md5(f.read()).hexdigest() | ||||
|         if Document.objects.filter(checksum=checksum).exists(): | ||||
|             if settings.CONSUMER_DELETE_DUPLICATES: | ||||
|                 os.unlink(filename) | ||||
|             raise ConsumerError( | ||||
|                 "Not consuming {}: It is a duplicate.".format(filename) | ||||
|             ) | ||||
|  | ||||
|         if not os.path.exists(self.consume): | ||||
|             raise ConsumerError( | ||||
|                 "Consumption directory {} does not exist".format(self.consume)) | ||||
|     @staticmethod | ||||
|     def pre_check_scratch_fir(): | ||||
|         os.makedirs(settings.SCRATCH_DIR, exist_ok=True) | ||||
|  | ||||
|     def log(self, level, message): | ||||
|         getattr(self.logger, level)(message, extra={ | ||||
|             "group": self.logging_group | ||||
|         }) | ||||
|  | ||||
|     @transaction.atomic | ||||
|     def try_consume_file(self, file): | ||||
|     def try_consume_file(self, | ||||
|                          filename, | ||||
|                          original_filename=None, | ||||
|                          force_title=None, | ||||
|                          force_correspondent_id=None, | ||||
|                          force_document_type_id=None, | ||||
|                          force_tag_ids=None): | ||||
|         """ | ||||
|         Return True if file was consumed | ||||
|         Return the document object if it was successfully created. | ||||
|         """ | ||||
|  | ||||
|         # this is for grouping logging entries for this particular file | ||||
|         # together. | ||||
|  | ||||
|         self.logging_group = uuid.uuid4() | ||||
|  | ||||
|         if not re.match(FileInfo.REGEXES["title"], file): | ||||
|             return False | ||||
|         # Make sure that preconditions for consuming the file are met. | ||||
|  | ||||
|         doc = file | ||||
|         self.pre_check_file_exists(filename) | ||||
|         self.pre_check_consumption_dir() | ||||
|         self.pre_check_scratch_fir() | ||||
|         self.pre_check_regex(filename) | ||||
|         self.pre_check_duplicate(filename) | ||||
|  | ||||
|         if self._is_duplicate(doc): | ||||
|             self.log( | ||||
|                 "warning", | ||||
|                 "Skipping {} as it appears to be a duplicate".format(doc) | ||||
|             ) | ||||
|             if settings.CONSUMER_DELETE_DUPLICATES: | ||||
|                 self._cleanup_doc(doc) | ||||
|             return False | ||||
|         self.log("info", "Consuming {}".format(filename)) | ||||
|  | ||||
|         self.log("info", "Consuming {}".format(doc)) | ||||
|         # Determine the parser class. | ||||
|  | ||||
|         parser_class = get_parser_class(doc) | ||||
|         parser_class = get_parser_class(original_filename or filename) | ||||
|         if not parser_class: | ||||
|             self.log( | ||||
|                 "error", "No parsers could be found for {}".format(doc)) | ||||
|             return False | ||||
|             raise ConsumerError("No parsers abvailable for {}".format(filename)) | ||||
|         else: | ||||
|             self.log("info", "Parser: {}".format(parser_class.__name__)) | ||||
|             self.log("debug", "Parser: {}".format(parser_class.__name__)) | ||||
|  | ||||
|         # Notify all listeners that we're going to do some work. | ||||
|  | ||||
|         document_consumption_started.send( | ||||
|             sender=self.__class__, | ||||
|             filename=doc, | ||||
|             filename=filename, | ||||
|             logging_group=self.logging_group | ||||
|         ) | ||||
|  | ||||
|         document_parser = parser_class(doc, self.logging_group) | ||||
|         # This doesn't parse the document yet, but gives us a parser. | ||||
|  | ||||
|         document_parser = parser_class(filename, self.logging_group) | ||||
|  | ||||
|         # However, this already created working directories which we have to | ||||
|         # clean up. | ||||
|  | ||||
|         # Parse the document. This may take some time. | ||||
|  | ||||
|         try: | ||||
|             self.log("info", "Generating thumbnail for {}...".format(doc)) | ||||
|             self.log("debug", "Generating thumbnail for {}...".format(filename)) | ||||
|             thumbnail = document_parser.get_optimised_thumbnail() | ||||
|             self.log("debug", "Parsing {}...".format(filename)) | ||||
|             text = document_parser.get_text() | ||||
|             date = document_parser.get_date() | ||||
|             document = self._store( | ||||
|                 text, | ||||
|                 doc, | ||||
|                 thumbnail, | ||||
|                 date | ||||
|             ) | ||||
|         except ParseError as e: | ||||
|             self.log("fatal", "PARSE FAILURE for {}: {}".format(doc, e)) | ||||
|             document_parser.cleanup() | ||||
|             return False | ||||
|         else: | ||||
|             document_parser.cleanup() | ||||
|             self._cleanup_doc(doc) | ||||
|             raise ConsumerError(e) | ||||
|  | ||||
|             self.log( | ||||
|                 "info", | ||||
|                 "Document {} consumption finished".format(document) | ||||
|             ) | ||||
|         # Prepare the document classifier. | ||||
|  | ||||
|         # TODO: I don't really like to do this here, but this way we avoid | ||||
|         #   reloading the classifier multiple times, since there are multiple | ||||
|         #   post-consume hooks that all require the classifier. | ||||
|  | ||||
|         try: | ||||
|             classifier = DocumentClassifier() | ||||
|             classifier.reload() | ||||
|         except (FileNotFoundError, IncompatibleClassifierVersionError) as e: | ||||
|             logging.getLogger(__name__).warning( | ||||
|                 "Cannot classify documents: {}.".format(e)) | ||||
|             classifier = None | ||||
|  | ||||
|             try: | ||||
|                 self.classifier.reload() | ||||
|                 classifier = self.classifier | ||||
|             except (FileNotFoundError, IncompatibleClassifierVersionError) as e: | ||||
|                 logging.getLogger(__name__).warning("Cannot classify documents: {}.".format(e)) | ||||
|         # now that everything is done, we can start to store the document | ||||
|         # in the system. This will be a transaction and reasonably fast. | ||||
|         try: | ||||
|             with transaction.atomic(): | ||||
|  | ||||
|             document_consumption_finished.send( | ||||
|                 sender=self.__class__, | ||||
|                 document=document, | ||||
|                 logging_group=self.logging_group, | ||||
|                 classifier=classifier | ||||
|             ) | ||||
|             return True | ||||
|                 # store the document. | ||||
|                 document = self._store( | ||||
|                     text=text, | ||||
|                     doc=filename, | ||||
|                     thumbnail=thumbnail, | ||||
|                     date=date, | ||||
|                     original_filename=original_filename, | ||||
|                     force_title=force_title, | ||||
|                     force_correspondent_id=force_correspondent_id, | ||||
|                     force_document_type_id=force_document_type_id, | ||||
|                     force_tag_ids=force_tag_ids | ||||
|                 ) | ||||
|  | ||||
|     def _store(self, text, doc, thumbnail, date): | ||||
|                 # If we get here, it was successful. Proceed with post-consume | ||||
|                 # hooks. If they fail, nothing will get changed. | ||||
|  | ||||
|         file_info = FileInfo.from_path(doc) | ||||
|                 document_consumption_finished.send( | ||||
|                     sender=self.__class__, | ||||
|                     document=document, | ||||
|                     logging_group=self.logging_group, | ||||
|                     classifier=classifier | ||||
|                 ) | ||||
|  | ||||
|                 # After everything is in the database, copy the files into | ||||
|                 # place. If this fails, we'll also rollback the transaction. | ||||
|  | ||||
|                 create_source_path_directory(document.source_path) | ||||
|                 self._write(document, filename, document.source_path) | ||||
|                 self._write(document, thumbnail, document.thumbnail_path) | ||||
|  | ||||
|                 # Delete the file only if it was successfully consumed | ||||
|                 self.log("debug", "Deleting document {}".format(filename)) | ||||
|                 os.unlink(filename) | ||||
|         except Exception as e: | ||||
|             raise ConsumerError(e) | ||||
|         finally: | ||||
|             document_parser.cleanup() | ||||
|  | ||||
|         self.log( | ||||
|             "info", | ||||
|             "Document {} consumption finished".format(document) | ||||
|         ) | ||||
|  | ||||
|         return document | ||||
|  | ||||
|     def _store(self, text, doc, thumbnail, date, | ||||
|                original_filename=None, | ||||
|                force_title=None, | ||||
|                force_correspondent_id=None, | ||||
|                force_document_type_id=None, | ||||
|                force_tag_ids=None): | ||||
|  | ||||
|         # If someone gave us the original filename, use it instead of doc. | ||||
|  | ||||
|         file_info = FileInfo.from_path(original_filename or doc) | ||||
|  | ||||
|         stats = os.stat(doc) | ||||
|  | ||||
| @@ -175,13 +241,21 @@ class Consumer: | ||||
|             self.log("debug", "Tagging with {}".format(tag_names)) | ||||
|             document.tags.add(*relevant_tags) | ||||
|  | ||||
|         if force_title: | ||||
|             document.title = force_title | ||||
|  | ||||
|         if force_correspondent_id: | ||||
|             document.correspondent = Correspondent.objects.get(pk=force_correspondent_id) | ||||
|  | ||||
|         if force_document_type_id: | ||||
|             document.document_type = DocumentType.objects.get(pk=force_document_type_id) | ||||
|  | ||||
|         if force_tag_ids: | ||||
|             for tag_id in force_tag_ids: | ||||
|                 document.tags.add(Tag.objects.get(pk=tag_id)) | ||||
|  | ||||
|         document.filename = generate_filename(document) | ||||
|  | ||||
|         create_source_path_directory(document.source_path) | ||||
|  | ||||
|         self._write(document, doc, document.source_path) | ||||
|         self._write(document, thumbnail, document.thumbnail_path) | ||||
|  | ||||
|         # We need to save the document twice, since we need the PK of the | ||||
|         # document in order to create its filename above. | ||||
|         document.save() | ||||
| @@ -196,13 +270,3 @@ class Consumer: | ||||
|                     return | ||||
|                 self.log("debug", "Encrypting") | ||||
|                 write_file.write(GnuPG.encrypted(read_file)) | ||||
|  | ||||
|     def _cleanup_doc(self, doc): | ||||
|         self.log("debug", "Deleting document {}".format(doc)) | ||||
|         os.unlink(doc) | ||||
|  | ||||
|     @staticmethod | ||||
|     def _is_duplicate(doc): | ||||
|         with open(doc, "rb") as f: | ||||
|             checksum = hashlib.md5(f.read()).hexdigest() | ||||
|         return Document.objects.filter(checksum=checksum).exists() | ||||
|   | ||||
| @@ -1,9 +1,11 @@ | ||||
| import os | ||||
| import tempfile | ||||
| from datetime import datetime | ||||
| from time import mktime | ||||
|  | ||||
| from django import forms | ||||
| from django.conf import settings | ||||
| from django_q.tasks import async_task | ||||
| from pathvalidate import validate_filename, ValidationError | ||||
|  | ||||
|  | ||||
| @@ -18,15 +20,6 @@ class UploadForm(forms.Form): | ||||
|             raise forms.ValidationError("That filename is suspicious.") | ||||
|         return self.cleaned_data.get("document") | ||||
|  | ||||
|     def get_filename(self, i=None): | ||||
|         return os.path.join( | ||||
|             settings.CONSUMPTION_DIR, | ||||
|             "{}_{}".format( | ||||
|                 str(i), | ||||
|                 self.cleaned_data.get("document").name | ||||
|             ) if i else self.cleaned_data.get("document").name | ||||
|         ) | ||||
|  | ||||
|     def save(self): | ||||
|         """ | ||||
|         Since the consumer already does a lot of work, it's easier just to save | ||||
| @@ -35,15 +28,13 @@ class UploadForm(forms.Form): | ||||
|         """ | ||||
|  | ||||
|         document = self.cleaned_data.get("document").read() | ||||
|         original_filename = self.cleaned_data.get("document").name | ||||
|  | ||||
|         t = int(mktime(datetime.now().timetuple())) | ||||
|  | ||||
|         file_name = self.get_filename() | ||||
|         i = 0 | ||||
|         while os.path.exists(file_name): | ||||
|             i += 1 | ||||
|             file_name = self.get_filename(i) | ||||
|         with tempfile.NamedTemporaryFile(prefix="paperless-upload-", suffix=".pdf", dir=settings.SCRATCH_DIR, delete=False) as f: | ||||
|  | ||||
|         with open(file_name, "wb") as f: | ||||
|             f.write(document) | ||||
|             os.utime(file_name, times=(t, t)) | ||||
|             os.utime(f.name, times=(t, t)) | ||||
|  | ||||
|             async_task("documents.tasks.consume_file", f.name, original_filename, task_name=os.path.basename(original_filename)) | ||||
|   | ||||
| @@ -3,11 +3,10 @@ import os | ||||
|  | ||||
| from django.conf import settings | ||||
| from django.core.management.base import BaseCommand | ||||
| from django_q.tasks import async_task | ||||
| from watchdog.events import FileSystemEventHandler | ||||
| from watchdog.observers import Observer | ||||
|  | ||||
| from documents.consumer import Consumer | ||||
|  | ||||
| try: | ||||
|     from inotify_simple import INotify, flags | ||||
| except ImportError: | ||||
| @@ -16,13 +15,10 @@ except ImportError: | ||||
|  | ||||
| class Handler(FileSystemEventHandler): | ||||
|  | ||||
|     def __init__(self, consumer): | ||||
|         self.consumer = consumer | ||||
|  | ||||
|     def _consume(self, file): | ||||
|         if os.path.isfile(file): | ||||
|             try: | ||||
|                 self.consumer.try_consume_file(file) | ||||
|                 async_task("documents.tasks.consume_file", file, task_name=os.path.basename(file)) | ||||
|             except Exception as e: | ||||
|                 # Catch all so that the consumer won't crash. | ||||
|                 logging.getLogger(__name__).error("Error while consuming document: {}".format(e)) | ||||
| @@ -49,8 +45,6 @@ class Command(BaseCommand): | ||||
|         self.mail_fetcher = None | ||||
|         self.first_iteration = True | ||||
|  | ||||
|         self.consumer = Consumer() | ||||
|  | ||||
|         BaseCommand.__init__(self, *args, **kwargs) | ||||
|  | ||||
|     def add_arguments(self, parser): | ||||
| @@ -78,11 +72,11 @@ class Command(BaseCommand): | ||||
|         # Consume all files as this is not done initially by the watchdog | ||||
|         for entry in os.scandir(directory): | ||||
|             if entry.is_file(): | ||||
|                 self.consumer.try_consume_file(entry.path) | ||||
|                 async_task("documents.tasks.consume_file", entry.path, task_name=os.path.basename(entry.path)) | ||||
|  | ||||
|         # Start the watchdog. Woof! | ||||
|         observer = Observer() | ||||
|         event_handler = Handler(self.consumer) | ||||
|         event_handler = Handler() | ||||
|         observer.schedule(event_handler, directory, recursive=True) | ||||
|         observer.start() | ||||
|         try: | ||||
|   | ||||
| @@ -6,6 +6,7 @@ from whoosh.writing import AsyncWriter | ||||
| from documents import index | ||||
| from documents.classifier import DocumentClassifier, \ | ||||
|     IncompatibleClassifierVersionError | ||||
| from documents.consumer import Consumer, ConsumerError | ||||
| from documents.mail import MailFetcher | ||||
| from documents.models import Document | ||||
|  | ||||
| @@ -54,3 +55,27 @@ def train_classifier(): | ||||
|         logging.getLogger(__name__).error( | ||||
|             "Classifier error: " + str(e) | ||||
|         ) | ||||
|  | ||||
|  | ||||
| def consume_file(file, | ||||
|                  original_filename=None, | ||||
|                  force_title=None, | ||||
|                  force_correspondent_id=None, | ||||
|                  force_document_type_id=None, | ||||
|                  force_tag_ids=None): | ||||
|  | ||||
|     document = Consumer().try_consume_file( | ||||
|         file, | ||||
|         original_filename=original_filename, | ||||
|         force_title=force_title, | ||||
|         force_correspondent_id=force_correspondent_id, | ||||
|         force_document_type_id=force_document_type_id, | ||||
|         force_tag_ids=force_tag_ids) | ||||
|  | ||||
|     if document: | ||||
|         return "Success. New document id {} created".format( | ||||
|             document.pk | ||||
|         ) | ||||
|     else: | ||||
|         raise ConsumerError("Unknown error: Returned document was null, but " | ||||
|                             "no error message was given.") | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jonas Winkler
					Jonas Winkler