Chore: switch from os.path to pathlib.Path (#10397)

This commit is contained in:
Sebastian Steinbeißer
2025-08-06 19:50:42 +02:00
committed by GitHub
parent 54e2b916e6
commit 6dca4daea5
9 changed files with 201 additions and 181 deletions

View File

@@ -1,8 +1,8 @@
from __future__ import annotations
import logging
import os
import shutil
from pathlib import Path
from typing import TYPE_CHECKING
import httpx
@@ -51,8 +51,6 @@ from documents.permissions import set_permissions_for_object
from documents.templating.workflows import parse_w_workflow_placeholders
if TYPE_CHECKING:
from pathlib import Path
from documents.classifier import DocumentClassifier
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
@@ -329,15 +327,16 @@ def cleanup_document_deletion(sender, instance, **kwargs):
# Find a non-conflicting filename in case a document with the same
# name was moved to trash earlier
counter = 0
old_filename = os.path.split(instance.source_path)[1]
(old_filebase, old_fileext) = os.path.splitext(old_filename)
old_filename = Path(instance.source_path).name
old_filebase = Path(old_filename).stem
old_fileext = Path(old_filename).suffix
while True:
new_file_path = settings.EMPTY_TRASH_DIR / (
old_filebase + (f"_{counter:02}" if counter else "") + old_fileext
)
if os.path.exists(new_file_path):
if new_file_path.exists():
counter += 1
else:
break
@@ -361,26 +360,26 @@ def cleanup_document_deletion(sender, instance, **kwargs):
files += (instance.source_path,)
for filename in files:
if filename and os.path.isfile(filename):
if filename and filename.is_file():
try:
os.unlink(filename)
filename.unlink()
logger.debug(f"Deleted file {filename}.")
except OSError as e:
logger.warning(
f"While deleting document {instance!s}, the file "
f"{filename} could not be deleted: {e}",
)
elif filename and not os.path.isfile(filename):
elif filename and not filename.is_file():
logger.warning(f"Expected {filename} to exist, but it did not")
delete_empty_directories(
os.path.dirname(instance.source_path),
Path(instance.source_path).parent,
root=settings.ORIGINALS_DIR,
)
if instance.has_archive_version:
delete_empty_directories(
os.path.dirname(instance.archive_path),
Path(instance.archive_path).parent,
root=settings.ARCHIVE_DIR,
)
@@ -401,14 +400,14 @@ def update_filename_and_move_files(
if isinstance(instance, CustomFieldInstance):
instance = instance.document
def validate_move(instance, old_path, new_path):
if not os.path.isfile(old_path):
def validate_move(instance, old_path: Path, new_path: Path):
if not old_path.is_file():
# Can't do anything if the old file does not exist anymore.
msg = f"Document {instance!s}: File {old_path} doesn't exist."
logger.fatal(msg)
raise CannotMoveFilesException(msg)
if os.path.isfile(new_path):
if new_path.is_file():
# Can't do anything if the new file already exists. Skip updating file.
msg = f"Document {instance!s}: Cannot rename file since target path {new_path} already exists."
logger.warning(msg)
@@ -436,16 +435,20 @@ def update_filename_and_move_files(
old_filename = instance.filename
old_source_path = instance.source_path
instance.filename = generate_unique_filename(instance)
# Need to convert to string to be able to save it to the db
instance.filename = str(generate_unique_filename(instance))
move_original = old_filename != instance.filename
old_archive_filename = instance.archive_filename
old_archive_path = instance.archive_path
if instance.has_archive_version:
instance.archive_filename = generate_unique_filename(
instance,
archive_filename=True,
# Need to convert to string to be able to save it to the db
instance.archive_filename = str(
generate_unique_filename(
instance,
archive_filename=True,
),
)
move_archive = old_archive_filename != instance.archive_filename
@@ -487,11 +490,11 @@ def update_filename_and_move_files(
# Try to move files to their original location.
try:
if move_original and os.path.isfile(instance.source_path):
if move_original and instance.source_path.is_file():
logger.info("Restoring previous original path")
shutil.move(instance.source_path, old_source_path)
if move_archive and os.path.isfile(instance.archive_path):
if move_archive and instance.archive_path.is_file():
logger.info("Restoring previous archive path")
shutil.move(instance.archive_path, old_archive_path)
@@ -512,17 +515,15 @@ def update_filename_and_move_files(
# finally, remove any empty sub folders. This will do nothing if
# something has failed above.
if not os.path.isfile(old_source_path):
if not old_source_path.is_file():
delete_empty_directories(
os.path.dirname(old_source_path),
Path(old_source_path).parent,
root=settings.ORIGINALS_DIR,
)
if instance.has_archive_version and not os.path.isfile(
old_archive_path,
):
if instance.has_archive_version and not old_archive_path.is_file():
delete_empty_directories(
os.path.dirname(old_archive_path),
Path(old_archive_path).parent,
root=settings.ARCHIVE_DIR,
)
@@ -1219,10 +1220,7 @@ def run_workflows(
)
files = None
if action.webhook.include_document:
with open(
original_file,
"rb",
) as f:
with original_file.open("rb") as f:
files = {
"file": (
filename,