From af1b634d6d648a70b9956217c7558c5ef2ed9663 Mon Sep 17 00:00:00 2001 From: Trenton Holmes Date: Tue, 31 May 2022 08:42:11 -0700 Subject: [PATCH] Updates to use pathlib instead of os.path --- src/documents/sanity_checker.py | 46 ++++++++++++++++----------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/src/documents/sanity_checker.py b/src/documents/sanity_checker.py index 90aea9866..bac163bfd 100644 --- a/src/documents/sanity_checker.py +++ b/src/documents/sanity_checker.py @@ -1,7 +1,8 @@ import hashlib import logging -import os from collections import defaultdict +from pathlib import Path +from typing import Final from django.conf import settings from documents.models import Document @@ -59,38 +60,37 @@ class SanityCheckFailedException(Exception): def check_sanity(progress=False) -> SanityCheckMessages: messages = SanityCheckMessages() - present_files = [] - for root, subdirs, files in os.walk(settings.MEDIA_ROOT): - for f in files: - present_files.append(os.path.normpath(os.path.join(root, f))) + present_files = { + x.resolve() for x in Path(settings.MEDIA_ROOT).glob("**/*") if not x.is_dir() + } - lockfile = os.path.normpath(settings.MEDIA_LOCK) + lockfile = Path(settings.MEDIA_LOCK).resolve() if lockfile in present_files: present_files.remove(lockfile) for doc in tqdm(Document.objects.all(), disable=not progress): # Check sanity of the thumbnail - if not os.path.isfile(doc.thumbnail_path): + thumbnail_path: Final[Path] = Path(doc.thumbnail_path).resolve() + if not thumbnail_path.exists() or not thumbnail_path.is_file(): messages.error(doc.pk, "Thumbnail of document does not exist.") else: - if os.path.normpath(doc.thumbnail_path) in present_files: - present_files.remove(os.path.normpath(doc.thumbnail_path)) + if thumbnail_path in present_files: + present_files.remove(thumbnail_path) try: - with doc.thumbnail_file as f: - f.read() + _ = thumbnail_path.read_bytes() except OSError as e: messages.error(doc.pk, f"Cannot read thumbnail file of document: {e}") # Check sanity of the original file # TODO: extract method - if not os.path.isfile(doc.source_path): + source_path: Final[Path] = Path(doc.source_path).resolve() + if not source_path.exists() or not source_path.is_file(): messages.error(doc.pk, "Original of document does not exist.") else: - if os.path.normpath(doc.source_path) in present_files: - present_files.remove(os.path.normpath(doc.source_path)) + if source_path in present_files: + present_files.remove(source_path) try: - with doc.source_file as f: - checksum = hashlib.md5(f.read()).hexdigest() + checksum = hashlib.md5(source_path.read_bytes()).hexdigest() except OSError as e: messages.error(doc.pk, f"Cannot read original file of document: {e}") else: @@ -102,25 +102,25 @@ def check_sanity(progress=False) -> SanityCheckMessages: ) # Check sanity of the archive file. - if doc.archive_checksum and not doc.archive_filename: + if doc.archive_checksum is not None and doc.archive_filename is None: messages.error( doc.pk, "Document has an archive file checksum, but no archive filename.", ) - elif not doc.archive_checksum and doc.archive_filename: + elif doc.archive_checksum is None and doc.archive_filename is not None: messages.error( doc.pk, "Document has an archive file, but its checksum is missing.", ) elif doc.has_archive_version: - if not os.path.isfile(doc.archive_path): + archive_path: Final[Path] = Path(doc.archive_path).resolve() + if not archive_path.exists() or not archive_path.is_file(): messages.error(doc.pk, "Archived version of document does not exist.") else: - if os.path.normpath(doc.archive_path) in present_files: - present_files.remove(os.path.normpath(doc.archive_path)) + if archive_path in present_files: + present_files.remove(archive_path) try: - with doc.archive_file as f: - checksum = hashlib.md5(f.read()).hexdigest() + checksum = hashlib.md5(archive_path.read_bytes()).hexdigest() except OSError as e: messages.error( doc.pk,