diff --git a/src/documents/management/commands/document_archiver.py b/src/documents/management/commands/document_archiver.py
index 469fc5991..aba2ea693 100644
--- a/src/documents/management/commands/document_archiver.py
+++ b/src/documents/management/commands/document_archiver.py
@@ -5,39 +5,55 @@
 import logging
 import os
 import shutil
 import uuid
+import tqdm
 
 from django.conf import settings
 from django.core.management.base import BaseCommand
+from django.db import transaction
 from whoosh.writing import AsyncWriter
 
 from documents.models import Document
 
 from ... import index
+from ...file_handling import create_source_path_directory
 from ...mixins import Renderable
 from ...parsers import get_parser_class_for_mime_type
 
 
+logger = logging.getLogger(__name__)
+
+
 def handle_document(document):
     mime_type = document.mime_type
 
     parser_class = get_parser_class_for_mime_type(mime_type)
     parser = parser_class(logging_group=uuid.uuid4())
-    parser.parse(document.source_path, mime_type)
-    if parser.get_archive_path():
-        shutil.copy(parser.get_archive_path(), document.archive_path)
-        with document.archive_file as f:
-            document.archive_checksum = hashlib.md5(f.read()).hexdigest()
-    else:
-        logging.getLogger(__name__).warning(
-            f"Parser {parser} did not produce an archived document "
-            f"for {document.file_name}"
-        )
-    if parser.get_text():
-        document.content = parser.get_text()
-    document.save()
+    try:
+        parser.parse(document.source_path, mime_type)
 
-    parser.cleanup()
+        if parser.get_archive_path():
+            with transaction.atomic():
+                with open(parser.get_archive_path(), 'rb') as f:
+                    checksum = hashlib.md5(f.read()).hexdigest()
+                # Save to the database first so that, if the file move fails,
+                # the database is rolled back.
+                # We also don't use save() here, since that triggers the file
+                # handling logic, and the file is not yet in place.
+                Document.objects.filter(pk=document.pk).update(
+                    archive_checksum=checksum,
+                    content=parser.get_text()
+                )
+                create_source_path_directory(document.archive_path)
+                shutil.move(parser.get_archive_path(), document.archive_path)
+
+        with AsyncWriter(index.open_index()) as writer:
+            index.update_document(writer, document)
+
+    except Exception as e:
+        logger.error(f"Error while parsing document {document}: {str(e)}")
+    finally:
+        parser.cleanup()
 
 
 class Command(Renderable, BaseCommand):
@@ -61,6 +77,14 @@ class Command(Renderable, BaseCommand):
             help="Recreates the archived document for documents that already "
                  "have an archived version."
         )
+        parser.add_argument(
+            "-d", "--document",
+            default=None,
+            type=int,
+            required=False,
+            help="Specify the ID of a document, and this command will only "
+                 "run on this specific document."
+        )
 
     def handle(self, *args, **options):
 
@@ -68,22 +92,24 @@ class Command(Renderable, BaseCommand):
         overwrite = options["overwrite"]
 
-        documents = Document.objects.all()
+        if options['document']:
+            documents = Document.objects.filter(pk=options['document'])
+        else:
+            documents = Document.objects.all()
 
-        documents_to_process = filter(
-            lambda d: overwrite or not os.path.exists(d.archive_path),
+        documents_to_process = list(filter(
+            lambda d: overwrite or not d.archive_checksum,
             documents
-        )
+        ))
 
+        # Only let ERROR-level messages through to the root log handler, so
+        # that worker log output does not garble the progress bar.
+        logging.getLogger().handlers[0].level = logging.ERROR
         with multiprocessing.Pool(processes=settings.TASK_WORKERS) as pool:
-            list(
-                pool.imap(
+            list(tqdm.tqdm(
+                pool.imap_unordered(
                     handle_document,
-                    list(documents_to_process)
-                )
-            )
-
-            ix = index.open_index()
-            with AsyncWriter(ix) as writer:
-                for d in documents_to_process:
-                    index.update_document(writer, d)
+                    documents_to_process
+                ),
+                total=len(documents_to_process)
+            ))
 
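
Usage sketch for the new flag: assuming this management command is invoked
through Django's usual manage.py entry point. The -d/--document flag is taken
from the add_argument call in the diff above; the --overwrite spelling is an
assumption, since only its help text and the options["overwrite"] lookup are
visible here.

    # Regenerate the archived version of a single document:
    python3 manage.py document_archiver --document 42

    # Regenerate archived versions for all documents, including those
    # that already have one:
    python3 manage.py document_archiver --overwrite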
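The heart of the change is the write ordering in handle_document(): the row is
updated inside transaction.atomic() before the file is moved, so a failed move
rolls the database back and archive_checksum never points at a file that is
missing from disk. Below is a minimal self-contained sketch of that ordering,
written against sqlite3 instead of Django so it runs stand-alone; the table,
column, and function names are made up for the demo.

    import shutil
    import sqlite3

    def archive_row_then_file(conn, doc_id, checksum, tmp_path, final_path):
        # The connection context manager wraps the block in one transaction:
        # it commits on success and rolls back if anything raises.
        with conn:
            conn.execute(
                "UPDATE documents SET archive_checksum = ? WHERE id = ?",
                (checksum, doc_id),
            )
            # Still uncommitted at this point: if the move raises OSError,
            # the UPDATE above is rolled back, so the database never
            # references an archive file that does not exist.
            shutil.move(tmp_path, final_path)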