Chore: Switch from os.path to pathlib.Path (#8325)

---------

Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com>
This commit is contained in:
Sebastian Steinbeißer
2025-01-06 21:12:27 +01:00
committed by GitHub
parent d06aac947d
commit 935d077836
11 changed files with 178 additions and 142 deletions

View File

@@ -1,4 +1,4 @@
import os
from pathlib import Path
from django.conf import settings
from django.core.management.base import BaseCommand
@@ -14,7 +14,7 @@ class Command(BaseCommand):
"state to an unencrypted one (or vice-versa)"
)
def add_arguments(self, parser):
def add_arguments(self, parser) -> None:
parser.add_argument(
"--passphrase",
help=(
@@ -23,7 +23,7 @@ class Command(BaseCommand):
),
)
def handle(self, *args, **options):
def handle(self, *args, **options) -> None:
try:
self.stdout.write(
self.style.WARNING(
@@ -52,7 +52,7 @@ class Command(BaseCommand):
self.__gpg_to_unencrypted(passphrase)
def __gpg_to_unencrypted(self, passphrase: str):
def __gpg_to_unencrypted(self, passphrase: str) -> None:
encrypted_files = Document.objects.filter(
storage_type=Document.STORAGE_TYPE_GPG,
)
@@ -69,7 +69,7 @@ class Command(BaseCommand):
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
ext = os.path.splitext(document.filename)[1]
ext: str = Path(document.filename).suffix
if not ext == ".gpg":
raise CommandError(
@@ -77,12 +77,12 @@ class Command(BaseCommand):
f"end with .gpg",
)
document.filename = os.path.splitext(document.filename)[0]
document.filename = Path(document.filename).stem
with open(document.source_path, "wb") as f:
with document.source_path.open("wb") as f:
f.write(raw_document)
with open(document.thumbnail_path, "wb") as f:
with document.thumbnail_path.open("wb") as f:
f.write(raw_thumb)
Document.objects.filter(id=document.id).update(
@@ -91,4 +91,4 @@ class Command(BaseCommand):
)
for path in old_paths:
os.unlink(path)
path.unlink()

View File

@@ -1,6 +1,7 @@
import json
import logging
import os
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
@@ -44,7 +45,7 @@ if settings.AUDIT_LOG_ENABLED:
@contextmanager
def disable_signal(sig, receiver, sender):
def disable_signal(sig, receiver, sender) -> Generator:
try:
sig.disconnect(receiver=receiver, sender=sender)
yield
@@ -58,7 +59,7 @@ class Command(CryptMixin, BaseCommand):
"documents it refers to."
)
def add_arguments(self, parser):
def add_arguments(self, parser) -> None:
parser.add_argument("source")
parser.add_argument(
@@ -90,7 +91,7 @@ class Command(CryptMixin, BaseCommand):
- Are there existing users or documents in the database?
"""
def pre_check_maybe_not_empty():
def pre_check_maybe_not_empty() -> None:
# Skip this check if operating only on the database
# We can expect data to exist in that case
if not self.data_only:
@@ -122,7 +123,7 @@ class Command(CryptMixin, BaseCommand):
),
)
def pre_check_manifest_exists():
def pre_check_manifest_exists() -> None:
if not (self.source / "manifest.json").exists():
raise CommandError(
"That directory doesn't appear to contain a manifest.json file.",
@@ -141,7 +142,7 @@ class Command(CryptMixin, BaseCommand):
"""
Loads manifest data from the various JSON files for parsing and loading the database
"""
main_manifest_path = self.source / "manifest.json"
main_manifest_path: Path = self.source / "manifest.json"
with main_manifest_path.open() as infile:
self.manifest = json.load(infile)
@@ -158,8 +159,8 @@ class Command(CryptMixin, BaseCommand):
Must account for the old style of export as well, with just version.json
"""
version_path = self.source / "version.json"
metadata_path = self.source / "metadata.json"
version_path: Path = self.source / "version.json"
metadata_path: Path = self.source / "metadata.json"
if not version_path.exists() and not metadata_path.exists():
self.stdout.write(
self.style.NOTICE("No version.json or metadata.json file located"),
@@ -221,7 +222,7 @@ class Command(CryptMixin, BaseCommand):
)
raise e
def handle(self, *args, **options):
def handle(self, *args, **options) -> None:
logging.getLogger().handlers[0].level = logging.ERROR
self.source = Path(options["source"]).resolve()
@@ -290,13 +291,13 @@ class Command(CryptMixin, BaseCommand):
no_progress_bar=self.no_progress_bar,
)
def check_manifest_validity(self):
def check_manifest_validity(self) -> None:
"""
Attempts to verify the manifest is valid. Namely checking the files
referred to exist and the files can be read from
"""
def check_document_validity(document_record: dict):
def check_document_validity(document_record: dict) -> None:
if EXPORTER_FILE_NAME not in document_record:
raise CommandError(
"The manifest file contains a record which does not "
@@ -341,7 +342,7 @@ class Command(CryptMixin, BaseCommand):
if not self.data_only and record["model"] == "documents.document":
check_document_validity(record)
def _import_files_from_manifest(self):
def _import_files_from_manifest(self) -> None:
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
@@ -356,24 +357,24 @@ class Command(CryptMixin, BaseCommand):
document = Document.objects.get(pk=record["pk"])
doc_file = record[EXPORTER_FILE_NAME]
document_path = os.path.join(self.source, doc_file)
document_path = self.source / doc_file
if EXPORTER_THUMBNAIL_NAME in record:
thumb_file = record[EXPORTER_THUMBNAIL_NAME]
thumbnail_path = Path(os.path.join(self.source, thumb_file)).resolve()
thumbnail_path = (self.source / thumb_file).resolve()
else:
thumbnail_path = None
if EXPORTER_ARCHIVE_NAME in record:
archive_file = record[EXPORTER_ARCHIVE_NAME]
archive_path = os.path.join(self.source, archive_file)
archive_path = self.source / archive_file
else:
archive_path = None
document.storage_type = Document.STORAGE_TYPE_UNENCRYPTED
with FileLock(settings.MEDIA_LOCK):
if os.path.isfile(document.source_path):
if Path(document.source_path).is_file():
raise FileExistsError(document.source_path)
create_source_path_directory(document.source_path)
@@ -418,8 +419,8 @@ class Command(CryptMixin, BaseCommand):
had_at_least_one_record = False
for crypt_config in self.CRYPT_FIELDS:
importer_model = crypt_config["model_name"]
crypt_fields = crypt_config["fields"]
importer_model: str = crypt_config["model_name"]
crypt_fields: str = crypt_config["fields"]
for record in filter(
lambda x: x["model"] == importer_model,
self.manifest,