Updates the exporter to use pathlib and add a few more tests for coverage

This commit is contained in:
Trenton H 2023-01-12 12:46:28 -08:00
parent 9e33344808
commit b25f083687
3 changed files with 106 additions and 48 deletions

View File

@ -19,7 +19,7 @@ from watchdog.observers.polling import PollingObserver
try: try:
from inotifyrecursive import INotify, flags from inotifyrecursive import INotify, flags
except ImportError: except ImportError: # pragma: nocover
INotify = flags = None INotify = flags = None
logger = logging.getLogger("paperless.management.consumer") logger = logging.getLogger("paperless.management.consumer")

View File

@ -4,6 +4,9 @@ import os
import shutil import shutil
import tempfile import tempfile
import time import time
from pathlib import Path
from typing import List
from typing import Set
import tqdm import tqdm
from django.conf import settings from django.conf import settings
@ -96,16 +99,16 @@ class Command(BaseCommand):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
BaseCommand.__init__(self, *args, **kwargs) BaseCommand.__init__(self, *args, **kwargs)
self.target = None self.target: Path = None
self.files_in_export_dir = [] self.files_in_export_dir: Set[Path] = set()
self.exported_files = [] self.exported_files: List[Path] = []
self.compare_checksums = False self.compare_checksums = False
self.use_filename_format = False self.use_filename_format = False
self.delete = False self.delete = False
def handle(self, *args, **options): def handle(self, *args, **options):
self.target = options["target"] self.target = Path(options["target"]).resolve()
self.compare_checksums = options["compare_checksums"] self.compare_checksums = options["compare_checksums"]
self.use_filename_format = options["use_filename_format"] self.use_filename_format = options["use_filename_format"]
self.delete = options["delete"] self.delete = options["delete"]
@ -121,11 +124,14 @@ class Command(BaseCommand):
dir=settings.SCRATCH_DIR, dir=settings.SCRATCH_DIR,
prefix="paperless-export", prefix="paperless-export",
) )
self.target = temp_dir.name self.target = Path(temp_dir.name).resolve()
if not os.path.exists(self.target): if not self.target.exists():
raise CommandError("That path doesn't exist") raise CommandError("That path doesn't exist")
if not self.target.is_dir():
raise CommandError("That path isn't a directory")
if not os.access(self.target, os.W_OK): if not os.access(self.target, os.W_OK):
raise CommandError("That path doesn't appear to be writable") raise CommandError("That path doesn't appear to be writable")
@ -152,10 +158,9 @@ class Command(BaseCommand):
def dump(self, progress_bar_disable=False): def dump(self, progress_bar_disable=False):
# 1. Take a snapshot of what files exist in the current export folder # 1. Take a snapshot of what files exist in the current export folder
for root, dirs, files in os.walk(self.target): for x in self.target.glob("**/*"):
self.files_in_export_dir.extend( if x.is_file():
map(lambda f: os.path.abspath(os.path.join(root, f)), files), self.files_in_export_dir.add(x.resolve())
)
# 2. Create manifest, containing all correspondents, types, tags, storage paths # 2. Create manifest, containing all correspondents, types, tags, storage paths
# comments, documents and ui_settings # comments, documents and ui_settings
@ -238,16 +243,16 @@ class Command(BaseCommand):
# 3.3. write filenames into manifest # 3.3. write filenames into manifest
original_name = base_name original_name = base_name
original_target = os.path.join(self.target, original_name) original_target = (self.target / Path(original_name)).resolve()
document_dict[EXPORTER_FILE_NAME] = original_name document_dict[EXPORTER_FILE_NAME] = original_name
thumbnail_name = base_name + "-thumbnail.webp" thumbnail_name = base_name + "-thumbnail.webp"
thumbnail_target = os.path.join(self.target, thumbnail_name) thumbnail_target = (self.target / Path(thumbnail_name)).resolve()
document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name
if document.has_archive_version: if document.has_archive_version:
archive_name = base_name + "-archive.pdf" archive_name = base_name + "-archive.pdf"
archive_target = os.path.join(self.target, archive_name) archive_target = (self.target / Path(archive_name)).resolve()
document_dict[EXPORTER_ARCHIVE_NAME] = archive_name document_dict[EXPORTER_ARCHIVE_NAME] = archive_name
else: else:
archive_target = None archive_target = None
@ -256,24 +261,21 @@ class Command(BaseCommand):
t = int(time.mktime(document.created.timetuple())) t = int(time.mktime(document.created.timetuple()))
if document.storage_type == Document.STORAGE_TYPE_GPG: if document.storage_type == Document.STORAGE_TYPE_GPG:
os.makedirs(os.path.dirname(original_target), exist_ok=True) original_target.parent.mkdir(parents=True, exist_ok=True)
with open(original_target, "wb") as f: with document.source_file as out_file:
with document.source_file as out_file: original_target.write_bytes(GnuPG.decrypted(out_file))
f.write(GnuPG.decrypted(out_file)) os.utime(original_target, times=(t, t))
os.utime(original_target, times=(t, t))
os.makedirs(os.path.dirname(thumbnail_target), exist_ok=True) thumbnail_target.parent.mkdir(parents=True, exist_ok=True)
with open(thumbnail_target, "wb") as f: with document.thumbnail_file as out_file:
with document.thumbnail_file as out_file: thumbnail_target.write_bytes(GnuPG.decrypted(out_file))
f.write(GnuPG.decrypted(out_file)) os.utime(thumbnail_target, times=(t, t))
os.utime(thumbnail_target, times=(t, t))
if archive_target: if archive_target:
os.makedirs(os.path.dirname(archive_target), exist_ok=True) archive_target.parent.mkdir(parents=True, exist_ok=True)
with open(archive_target, "wb") as f: with document.archive_path as out_file:
with document.archive_path as out_file: archive_target.write_bytes(GnuPG.decrypted(out_file))
f.write(GnuPG.decrypted(out_file)) os.utime(archive_target, times=(t, t))
os.utime(archive_target, times=(t, t))
else: else:
self.check_and_copy( self.check_and_copy(
document.source_path, document.source_path,
@ -291,16 +293,14 @@ class Command(BaseCommand):
) )
# 4.1 write manifest to target folder # 4.1 write manifest to target folder
manifest_path = os.path.abspath(os.path.join(self.target, "manifest.json")) manifest_path = (self.target / Path("manifest.json")).resolve()
manifest_path.write_text(json.dumps(manifest, indent=2))
with open(manifest_path, "w") as f:
json.dump(manifest, f, indent=2)
# 4.2 write version information to target folder # 4.2 write version information to target folder
version_path = os.path.abspath(os.path.join(self.target, "version.json")) version_path = (self.target / Path("version.json")).resolve()
version_path.write_text(
with open(version_path, "w") as f: json.dumps({"version": version.__full_version_str__}, indent=2),
json.dump({"version": version.__full_version_str__}, f, indent=2) )
if self.delete: if self.delete:
# 5. Remove files which we did not explicitly export in this run # 5. Remove files which we did not explicitly export in this run
@ -309,25 +309,24 @@ class Command(BaseCommand):
self.files_in_export_dir.remove(manifest_path) self.files_in_export_dir.remove(manifest_path)
for f in self.files_in_export_dir: for f in self.files_in_export_dir:
os.remove(f) f.unlink()
delete_empty_directories( delete_empty_directories(
os.path.abspath(os.path.dirname(f)), f.parent,
os.path.abspath(self.target), self.target,
) )
def check_and_copy(self, source, source_checksum, target): def check_and_copy(self, source, source_checksum, target: Path):
if os.path.abspath(target) in self.files_in_export_dir: if target in self.files_in_export_dir:
self.files_in_export_dir.remove(os.path.abspath(target)) self.files_in_export_dir.remove(target)
perform_copy = False perform_copy = False
if os.path.exists(target): if target.exists():
source_stat = os.stat(source) source_stat = os.stat(source)
target_stat = os.stat(target) target_stat = target.stat()
if self.compare_checksums and source_checksum: if self.compare_checksums and source_checksum:
with open(target, "rb") as f: target_checksum = hashlib.md5(target.read_bytes()).hexdigest()
target_checksum = hashlib.md5(f.read()).hexdigest()
perform_copy = target_checksum != source_checksum perform_copy = target_checksum != source_checksum
elif source_stat.st_mtime != target_stat.st_mtime: elif source_stat.st_mtime != target_stat.st_mtime:
perform_copy = True perform_copy = True
@ -338,5 +337,5 @@ class Command(BaseCommand):
perform_copy = True perform_copy = True
if perform_copy: if perform_copy:
os.makedirs(os.path.dirname(target), exist_ok=True) target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, target) shutil.copy2(source, target)

View File

@ -8,6 +8,7 @@ from unittest import mock
from zipfile import ZipFile from zipfile import ZipFile
from django.core.management import call_command from django.core.management import call_command
from django.core.management.base import CommandError
from django.test import override_settings from django.test import override_settings
from django.test import TestCase from django.test import TestCase
from django.utils import timezone from django.utils import timezone
@ -438,3 +439,61 @@ class TestExportImport(DirectoriesMixin, TestCase):
self.assertEqual(len(zip.namelist()), 14) self.assertEqual(len(zip.namelist()), 14)
self.assertIn("manifest.json", zip.namelist()) self.assertIn("manifest.json", zip.namelist())
self.assertIn("version.json", zip.namelist()) self.assertIn("version.json", zip.namelist())
def test_export_target_not_exists(self):
"""
GIVEN:
- Request to export documents to directory that doesn't exist
WHEN:
- Export command is called
THEN:
- Error is raised
"""
args = ["document_exporter", "/tmp/foo/bar"]
with self.assertRaises(CommandError) as e:
call_command(*args)
self.assertEqual("That path isn't a directory", str(e))
def test_export_target_exists_but_is_file(self):
"""
GIVEN:
- Request to export documents to file instead of directory
WHEN:
- Export command is called
THEN:
- Error is raised
"""
with tempfile.NamedTemporaryFile() as tmp_file:
args = ["document_exporter", tmp_file.name]
with self.assertRaises(CommandError) as e:
call_command(*args)
self.assertEqual("That path isn't a directory", str(e))
def test_export_target_not_writable(self):
"""
GIVEN:
- Request to export documents to directory that's not writeable
WHEN:
- Export command is called
THEN:
- Error is raised
"""
with tempfile.TemporaryDirectory() as tmp_dir:
os.chmod(tmp_dir, 0o000)
args = ["document_exporter", tmp_dir]
with self.assertRaises(CommandError) as e:
call_command(*args)
self.assertEqual("That path doesn't appear to be writable", str(e))