backend that supports asgi and status update sockets with channels
src/documents/consumer.py
@@ -5,6 +5,8 @@ import os
 import re
 import uuid
 
+from asgiref.sync import async_to_sync
+from channels.layers import get_channel_layer
 from django.conf import settings
 from django.db import transaction
 from django.utils import timezone
@@ -33,6 +35,17 @@ class Consumer:
     5. Delete the document and image(s)
     """
 
+    def _send_progress(self, filename, current_progress, max_progress, status, message, document_id=None):
+        payload = {
+            'filename': os.path.basename(filename),
+            'current_progress': current_progress,
+            'max_progress': max_progress,
+            'status': status,
+            'message': message,
+            'document_id': document_id
+        }
+        async_to_sync(self.channel_layer.group_send)("status_updates", {'type': 'status_update', 'data': payload})
+
     def __init__(self, consume=settings.CONSUMPTION_DIR,
                  scratch=settings.SCRATCH_DIR):
 
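For context (not part of the diff): channels delivers a group_send whose 'type' is 'status_update' by invoking the status_update() method on every consumer subscribed to the group, so any process that shares the channel layer can push an event to all connected clients. A minimal sketch of publishing such an event by hand, e.g. from a Django shell; the payload values are illustrative:

# Manual publish over the same channel layer; values are made up.
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)("status_updates", {
    'type': 'status_update',   # dispatched to StatusConsumer.status_update()
    'data': {
        'filename': 'example.pdf',
        'current_progress': 50,
        'max_progress': 100,
        'status': 'WORKING',
        'message': 'Getting text from document...',
        'document_id': None,
    },
})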
@@ -44,6 +57,8 @@ class Consumer:
 
         self.classifier = DocumentClassifier()
 
+        self.channel_layer = get_channel_layer()
+
         os.makedirs(self.scratch, exist_ok=True)
 
         self.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
@@ -60,7 +75,6 @@ class Consumer:
             raise ConsumerError(
                 "Consumption directory {} does not exist".format(self.consume))
 
-
     def log(self, level, message):
         getattr(self.logger, level)(message, extra={
             "group": self.logging_group
@@ -88,6 +102,7 @@ class Consumer:
 
         self.log("info", "Consuming {}".format(doc))
 
+
         parser_class = get_parser_class(doc)
         if not parser_class:
             self.log(
@@ -96,6 +111,7 @@ class Consumer:
         else:
             self.log("info", "Parser: {}".format(parser_class.__name__))
 
+        self._send_progress(file, 0, 100, 'WORKING', 'Consumption started')
 
         document_consumption_started.send(
             sender=self.__class__,
@@ -103,20 +119,37 @@ class Consumer:
             logging_group=self.logging_group
         )
 
-        document_parser = parser_class(doc, self.logging_group)
+        def progress_callback(current_progress, max_progress, message):
+            # recalculate progress to be within 20 and 80
+            p = int((current_progress / max_progress) * 60 + 20)
+            self._send_progress(file, p, 100, "WORKING", message)
+
+        document_parser = parser_class(doc, self.logging_group, progress_callback)
 
         try:
             self.log("info", "Generating thumbnail for {}...".format(doc))
+            self._send_progress(file, 10, 100, 'WORKING',
+                                'Generating thumbnail...')
             thumbnail = document_parser.get_optimised_thumbnail()
+            self._send_progress(file, 20, 100, 'WORKING',
+                                'Getting text from document...')
+            text = document_parser.get_text()
+            self._send_progress(file, 80, 100, 'WORKING',
+                                'Getting date from document...')
             date = document_parser.get_date()
+            self._send_progress(file, 85, 100, 'WORKING',
+                                'Storing the document...')
             document = self._store(
-                document_parser.get_text(),
+                text,
                 doc,
                 thumbnail,
                 date
             )
         except ParseError as e:
             self.log("fatal", "PARSE FAILURE for {}: {}".format(doc, e))
+            self._send_progress(file, 100, 100, 'FAILED',
+                                "Failed: {}".format(e))
+
             document_parser.cleanup()
             return False
         else:
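The nested progress_callback linearly rescales whatever range a parser reports into the 20-80 band; the consumer itself reports the fixed milestones around parsing (0, 10, 20, 80, 85, 90, 100). A quick check of the mapping:

# The parser-reported fraction is mapped linearly onto [20, 80]:
def rescale(current_progress, max_progress):
    return int((current_progress / max_progress) * 60 + 20)

assert rescale(0, 10) == 20    # parser just started
assert rescale(5, 10) == 50    # halfway through parsing
assert rescale(10, 10) == 80   # parser finished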
@@ -136,12 +169,17 @@ class Consumer:
             except (FileNotFoundError, IncompatibleClassifierVersionError) as e:
                 logging.getLogger(__name__).warning("Cannot classify documents: {}.".format(e))
 
+            self._send_progress(file, 90, 100, 'WORKING',
+                                'Performing post-consumption tasks...')
+
             document_consumption_finished.send(
                 sender=self.__class__,
                 document=document,
                 logging_group=self.logging_group,
                 classifier=classifier
             )
+            self._send_progress(file, 100, 100, 'SUCCESS',
+                                'Finished.', document.id)
             return True
 
     def _store(self, text, doc, thumbnail, date):
src/documents/parsers.py
@@ -106,11 +106,12 @@ class DocumentParser:
     `paperless_tesseract.parsers` for inspiration.
     """
 
-    def __init__(self, path, logging_group):
+    def __init__(self, path, logging_group, progress_callback):
        self.document_path = path
        self.tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
        self.logger = logging.getLogger(__name__)
        self.logging_group = logging_group
+       self.progress_callback = progress_callback
 
     def get_thumbnail(self):
         """
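Every parser now receives and stores the callback; subclasses report (current, max, message) on whatever scale is convenient, since the consumer rescales it as shown above. A hypothetical minimal subclass illustrating the contract (the class and its steps are invented for illustration, not part of this commit):

class TextDocumentParser(DocumentParser):
    """Invented example parser; reports progress on a 0..1 scale."""

    def get_text(self):
        self.progress_callback(0, 1, "Reading file.")
        with open(self.document_path, "r") as f:
            text = f.read()
        self.progress_callback(1, 1, "Done.")
        return text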
src/paperless/asgi.py (new file, 37 lines)
@@ -0,0 +1,37 @@
+import json
+import os
+
+from asgiref.sync import async_to_sync
+from channels.auth import AuthMiddlewareStack
+from channels.generic.websocket import WebsocketConsumer
+from channels.routing import ProtocolTypeRouter, URLRouter
+from django.core.asgi import get_asgi_application
+from django.urls import re_path
+
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'paperless.settings')
+
+
+class StatusConsumer(WebsocketConsumer):
+    def connect(self):
+        self.accept()
+        async_to_sync(self.channel_layer.group_add)('status_updates', self.channel_name)
+
+    def disconnect(self, close_code):
+        async_to_sync(self.channel_layer.group_discard)('status_updates', self.channel_name)
+
+    def status_update(self, event):
+        self.send(json.dumps(event['data']))
+
+
+websocket_urlpatterns = [
+    re_path(r'ws/status/$', StatusConsumer.as_asgi()),
+]
+
+application = ProtocolTypeRouter({
+    "http": get_asgi_application(),
+    "websocket": AuthMiddlewareStack(
+        URLRouter(
+            websocket_urlpatterns
+        )
+    ),
+})
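A sketch of exercising StatusConsumer end to end with channels' test utility (the test-runner wiring is assumed, a working channel layer is required, and the short sleep papers over the gap between the websocket accept and the group subscription):

import asyncio

from channels.layers import get_channel_layer
from channels.testing import WebsocketCommunicator

from paperless.asgi import application

async def test_status_update_reaches_client():
    communicator = WebsocketCommunicator(application, "/ws/status/")
    connected, _ = await communicator.connect()
    assert connected
    await asyncio.sleep(0.1)  # let connect() finish joining the group

    # Publish exactly what Consumer._send_progress publishes.
    await get_channel_layer().group_send("status_updates", {
        'type': 'status_update',
        'data': {'status': 'WORKING', 'message': 'test'},
    })

    assert 'WORKING' in await communicator.receive_from()
    await communicator.disconnect()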
src/paperless/settings.py
@@ -69,6 +69,8 @@ INSTALLED_APPS = [
     "rest_framework.authtoken",
     "django_filters",
 
+    "channels",
+
 ]
 
 REST_FRAMEWORK = {
@@ -98,6 +100,7 @@ LOGIN_URL = "admin:login"
 FORCE_SCRIPT_NAME = os.getenv("PAPERLESS_FORCE_SCRIPT_NAME")
 
 WSGI_APPLICATION = 'paperless.wsgi.application'
+ASGI_APPLICATION = "paperless.asgi.application"
 
 STATIC_URL = os.getenv("PAPERLESS_STATIC_URL", "/static/")
 
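Both entries can coexist: WSGI servers keep using paperless.wsgi, while websocket support requires serving the project through paperless.asgi with an ASGI server. Channels' daphne can do this, e.g. `daphne -b 0.0.0.0 -p 8000 paperless.asgi:application` run from the src/ directory (assuming daphne is installed; the bind address and port are illustrative).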
@@ -299,3 +302,12 @@ FILENAME_DATE_ORDER = os.getenv("PAPERLESS_FILENAME_DATE_ORDER")
 FILENAME_PARSE_TRANSFORMS = []
 for t in json.loads(os.getenv("PAPERLESS_FILENAME_PARSE_TRANSFORMS", "[]")):
     FILENAME_PARSE_TRANSFORMS.append((re.compile(t["pattern"]), t["repl"]))
+
+CHANNEL_LAYERS = {
+    "default": {
+        "BACKEND": "channels_redis.core.RedisChannelLayer",
+        "CONFIG": {
+            "hosts": [("127.0.0.1", 6379)],
+        },
+    },
+}
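The Redis channel layer is hard-coded to localhost:6379, so a Redis server must be reachable there for the status socket to work. For single-process development or tests, channels' built-in in-memory layer is a drop-in alternative, with the caveat that events published in one process (e.g. the document consumer) are invisible to another (the web server). A sketch, not part of this commit:

# Development/test alternative; single-process only (an assumption
# about deployment, not part of the commit).
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
}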
src/paperless_tesseract/parsers.py
@@ -27,8 +27,8 @@ class RasterisedDocumentParser(DocumentParser):
     image, whether it's a PDF, or other graphical format (JPEG, TIFF, etc.)
     """
 
-    def __init__(self, path, logging_group):
-        super().__init__(path, logging_group)
+    def __init__(self, path, logging_group, progress_callback):
+        super().__init__(path, logging_group, progress_callback)
         self._text = None
 
     def get_thumbnail(self):
@@ -91,6 +91,7 @@ class RasterisedDocumentParser(DocumentParser):
             self._text = get_text_from_pdf(self.document_path)
             return self._text
 
+        self.progress_callback(0,1,"Making greyscale images.")
        images = self._get_greyscale()
 
        if not images:
@@ -100,8 +101,10 @@ class RasterisedDocumentParser(DocumentParser):
 
         sample_page_index = int(len(images) / 2)
         self.log("info", "Attempting language detection on page {} of {}...".format(sample_page_index+1, len(images)))
+        self.progress_callback(0.4, 1, "Language Detection.")
         sample_page_text = self._ocr([images[sample_page_index]], settings.OCR_LANGUAGE)[0]
         guessed_language = self._guess_language(sample_page_text)
+        self.progress_callback(0.6, 1, "OCR all the pages.")
 
         if not guessed_language or guessed_language not in ISO639:
             self.log("warning", "Language detection failed.")
@@ -117,7 +120,7 @@ class RasterisedDocumentParser(DocumentParser):
 
         else:
             self.log("info", "Detected language: {}".format(guessed_language))
-            ocr_pages = self._ocr(images, ISO639[guessed_language])
+            ocr_pages = self._ocr(images, ISO639[guessed_language], report_progress=True)
 
         self.log("info", "OCR completed.")
         self._text = strip_excess_whitespace(" ".join(ocr_pages))
@@ -151,6 +154,8 @@ class RasterisedDocumentParser(DocumentParser):
 
         self.log("info", "Running unpaper on {} pages...".format(len(pnms)))
 
+        self.progress_callback(0.2,1, "Running unpaper on {} pages...".format(len(pnms)))
+
         # Run unpaper in parallel on converted images
         with Pool(processes=settings.OCR_THREADS) as pool:
             pnms = pool.map(run_unpaper, pnms)
@@ -165,11 +170,16 @@ class RasterisedDocumentParser(DocumentParser):
             self.log('debug', "Language detection failed with: {}".format(e))
             return None
 
-    def _ocr(self, imgs, lang):
+    def _ocr(self, imgs, lang, report_progress=False):
         self.log("info", "Performing OCR on {} page(s) with language {}".format(len(imgs), lang))
         r = []
         with Pool(processes=settings.OCR_THREADS) as pool:
-            r = pool.map(image_to_string, itertools.product(imgs, [lang]))
-            return r
+            # r = pool.map(image_to_string, itertools.product(imgs, [lang]))
+            for i, page in enumerate(pool.imap(image_to_string, itertools.product(imgs, [lang]))):
+                if report_progress:
+                    self.progress_callback(0.6 + (i / len(imgs)) * 0.4, 1, "OCR'ed {} pages".format(i+1))
+                r += [page]
+            return r
 
     def _complete_ocr_default_language(self, images, sample_page_index, sample_page):
         """
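Switching from pool.map to pool.imap is what enables per-page reporting: imap yields each OCR result as it completes, whereas map only returns once every page is done. The reported value maps page completion onto the top 40% of the parser-local scale, the first 60% having been consumed by greyscale conversion, unpaper and language detection. A quick check of the arithmetic:

# Progress values emitted for a 4-page document (parser-local scale 0..1):
imgs = 4
for i in range(imgs):
    print(0.6 + (i / imgs) * 0.4)   # -> 0.6, 0.7, 0.8, 0.9
# The consumer's progress_callback then rescales these onto the global
# 100-point bar: int(0.6 * 60 + 20) == 56 up to int(0.9 * 60 + 20) == 74.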
@@ -182,14 +192,13 @@ class RasterisedDocumentParser(DocumentParser):
         del images_copy[sample_page_index]
         if images_copy:
             self.log('info', 'Continuing ocr with default language.')
-            ocr_pages = self._ocr(images_copy, settings.OCR_LANGUAGE)
+            ocr_pages = self._ocr(images_copy, settings.OCR_LANGUAGE, report_progress=True)
             ocr_pages.insert(sample_page_index, sample_page)
             return ocr_pages
         else:
             return [sample_page]
 
 
-
 def strip_excess_whitespace(text):
     collapsed_spaces = re.sub(r"([^\S\r\n]+)", " ", text)
     no_leading_whitespace = re.sub(