proper document archiver with progress bar.

This commit is contained in:
jonaswinkler 2020-12-03 01:04:52 +01:00
parent e22769ca63
commit 72a4ff0fca

View File

@ -5,39 +5,56 @@ import logging
import os
import shutil
import uuid
from time import sleep
import tqdm
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import transaction
from whoosh.writing import AsyncWriter
from documents.models import Document
from ... import index
from ...file_handling import create_source_path_directory
from ...mixins import Renderable
from ...parsers import get_parser_class_for_mime_type
logger = logging.getLogger(__name__)
def handle_document(document):
mime_type = document.mime_type
parser_class = get_parser_class_for_mime_type(mime_type)
parser = parser_class(logging_group=uuid.uuid4())
parser.parse(document.source_path, mime_type)
if parser.get_archive_path():
shutil.copy(parser.get_archive_path(), document.archive_path)
with document.archive_file as f:
document.archive_checksum = hashlib.md5(f.read()).hexdigest()
else:
logging.getLogger(__name__).warning(
f"Parser {parser} did not produce an archived document "
f"for {document.file_name}"
)
if parser.get_text():
document.content = parser.get_text()
document.save()
try:
parser.parse(document.source_path, mime_type)
parser.cleanup()
if parser.get_archive_path():
with transaction.atomic():
with open(parser.get_archive_path(), 'rb') as f:
checksum = hashlib.md5(f.read()).hexdigest()
# i'm going to save first so that in case the file move
# fails, the database is rolled back.
# we also don't use save() since that triggers the filehandling
# logic, and we don't want that yet (file not yet in place)
Document.objects.filter(pk=document.pk).update(
archive_checksum=checksum,
content=parser.get_text()
)
create_source_path_directory(document.archive_path)
shutil.move(parser.get_archive_path(), document.archive_path)
with AsyncWriter(index.open_index()) as writer:
index.update_document(writer, document)
except Exception as e:
logger.error(f"Error while parsing document {document}: {str(e)}")
finally:
parser.cleanup()
class Command(Renderable, BaseCommand):
@ -61,6 +78,14 @@ class Command(Renderable, BaseCommand):
help="Recreates the archived document for documents that already "
"have an archived version."
)
parser.add_argument(
"-d", "--document",
default=None,
type=int,
required=False,
help="Specify the ID of a document, and this command will only "
"run on this specific document."
)
def handle(self, *args, **options):
@ -68,22 +93,22 @@ class Command(Renderable, BaseCommand):
overwrite = options["overwrite"]
documents = Document.objects.all()
if options['document']:
documents = Document.objects.filter(pk=options['document'])
else:
documents = Document.objects.all()
documents_to_process = filter(
lambda d: overwrite or not os.path.exists(d.archive_path),
documents_to_process = list(filter(
lambda d: overwrite or not d.archive_checksum,
documents
)
))
logging.getLogger().handlers[0].level = logging.ERROR
with multiprocessing.Pool(processes=settings.TASK_WORKERS) as pool:
list(
pool.imap(
list(tqdm.tqdm(
pool.imap_unordered(
handle_document,
list(documents_to_process)
)
)
ix = index.open_index()
with AsyncWriter(ix) as writer:
for d in documents_to_process:
index.update_document(writer, d)
documents_to_process
),
total=len(documents_to_process)
))