mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 03:16:10 -06:00 
			
		
		
		
	reading and displaying PDF metadata
This commit is contained in:
		@@ -1,4 +1,5 @@
 | 
			
		||||
import os
 | 
			
		||||
import shutil
 | 
			
		||||
import tempfile
 | 
			
		||||
from unittest import mock
 | 
			
		||||
 | 
			
		||||
@@ -493,3 +494,34 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
 | 
			
		||||
        self.assertEqual(response.status_code, 400)
 | 
			
		||||
 | 
			
		||||
        async_task.assert_not_called()
 | 
			
		||||
 | 
			
		||||
    def test_get_metadata(self):
        """Metadata endpoint for a document that has an archive version.

        The original is a PNG sample (registered with mime type image/png)
        and the archive version is the sample PDF. The test expects an empty
        metadata list for the original and a non-empty one for the archive.
        """
        samples_dir = os.path.join(os.path.dirname(__file__), "samples")
        png_sample = os.path.join(samples_dir, "documents", "thumbnails", "0000001.png")
        pdf_sample = os.path.join(samples_dir, "simple.pdf")

        doc = Document.objects.create(title="test", filename="file.pdf", mime_type="image/png")

        # Put the original and the archive version where the document
        # expects to find them.
        shutil.copy(png_sample, doc.source_path)
        shutil.copy(pdf_sample, doc.archive_path)

        response = self.client.get(f"/api/documents/{doc.pk}/metadata/")
        self.assertEqual(response.status_code, 200)

        meta = response.data

        self.assertEqual(meta['original_mime_type'], "image/png")
        self.assertTrue(meta['has_archive_version'])
        self.assertEqual(len(meta['original_metadata']), 0)
        self.assertGreater(len(meta['archive_metadata']), 0)
 | 
			
		||||
 | 
			
		||||
    def test_get_metadata_no_archive(self):
        """Metadata endpoint for a PDF document without an archive version.

        Only the original file is put in place, so the test expects
        has_archive_version to be false, archive_metadata to be None and
        original_metadata to be a non-empty list.
        """
        pdf_sample = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")

        doc = Document.objects.create(title="test", filename="file.pdf", mime_type="application/pdf")

        # Only the original is copied; no archive file is created.
        shutil.copy(pdf_sample, doc.source_path)

        response = self.client.get(f"/api/documents/{doc.pk}/metadata/")
        self.assertEqual(response.status_code, 200)

        meta = response.data

        self.assertEqual(meta['original_mime_type'], "application/pdf")
        self.assertFalse(meta['has_archive_version'])
        self.assertGreater(len(meta['original_metadata']), 0)
        self.assertIsNone(meta['archive_metadata'])
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,11 @@
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import re
 | 
			
		||||
import tempfile
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from time import mktime
 | 
			
		||||
 | 
			
		||||
import pikepdf
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from django.db.models import Count, Max
 | 
			
		||||
from django.http import HttpResponse, HttpResponseBadRequest, Http404
 | 
			
		||||
@@ -160,16 +163,49 @@ class DocumentViewSet(RetrieveModelMixin,
 | 
			
		||||
            disposition, filename)
 | 
			
		||||
        return response
 | 
			
		||||
 | 
			
		||||
    def get_metadata(self, file, type):
        """Collect the metadata entries of a document file.

        Only files whose mime type is ``application/pdf`` are inspected; for
        any other type an empty list is returned.

        :param file: path of the file to read.
        :param type: mime type of the file (the name shadows the builtin but
            is kept so existing callers keep working).
        :return: ``None`` if ``file`` does not exist; otherwise a list of
            dicts with the keys ``namespace``, ``prefix``, ``key`` and
            ``value`` (always a string), one per readable metadata entry.
        """
        if not os.path.isfile(file):
            return None

        result = []
        if type != 'application/pdf':
            return result

        # Metadata keys are expected to look like "{namespace}name".
        namespace_pattern = re.compile(r"\{(.*)\}(.*)")
        logger = logging.getLogger(__name__)

        # The context manager closes the PDF once all values have been
        # converted to strings, so no file handle outlives this call.
        with pikepdf.open(file) as pdf:
            meta = pdf.open_metadata()
            for key, value in meta.items():
                if isinstance(value, list):
                    value = " ".join([str(e) for e in value])
                value = str(value)
                try:
                    m = namespace_pattern.match(key)
                    result.append({
                        "namespace": m.group(1),
                        "prefix": meta.REVERSE_NS[m.group(1)],
                        "key": m.group(2),
                        "value": value
                    })
                except Exception as e:
                    # Best effort: an entry whose key does not match the
                    # pattern, or whose namespace is missing from
                    # REVERSE_NS, is logged and skipped rather than failing
                    # the whole request.
                    logger.warning(
                        f"Error while reading metadata {key}: {value}. Error: "
                        f"{e}"
                    )
        return result
 | 
			
		||||
 | 
			
		||||
    @action(methods=['get'], detail=True)
    def metadata(self, request, pk=None):
        """Return file-level metadata for a single document.

        The response contains the checksums of the original and archived
        file, the original mime type, the media filename, whether an archive
        file exists on disk, and the metadata entries read from the original
        and the archive file via ``get_metadata``.

        :param request: the incoming request (not inspected here).
        :param pk: primary key of the document.
        :raises Http404: if no document with this primary key exists.
        """
        # Keep the try body limited to the lookup that can raise
        # DoesNotExist.
        try:
            doc = Document.objects.get(pk=pk)
        except Document.DoesNotExist:
            raise Http404()

        return Response({
            "original_checksum": doc.checksum,
            "archived_checksum": doc.archive_checksum,
            "original_mime_type": doc.mime_type,
            "media_filename": doc.filename,
            "has_archive_version": os.path.isfile(doc.archive_path),
            "original_metadata": self.get_metadata(
                doc.source_path, doc.mime_type),
            # get_metadata returns None when the archive file is missing.
            "archive_metadata": self.get_metadata(
                doc.archive_path, "application/pdf")
        })
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user