mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-07-28 18:24:38 -05:00
Switch src/documents/bulk*.py from os.path to pathlib.Path (#7862)
Also: ensure that the ruff PTH check remains enabled for these files and all files added in the future, and add some type annotations.
This commit is contained in:

committed by
GitHub

parent
d1f255a22e
commit
74d0c9fda5
@@ -1,14 +1,21 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import NoReturn
|
||||
from zipfile import ZipFile
|
||||
|
||||
from documents.models import Document
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Callable
|
||||
|
||||
|
||||
class BulkArchiveStrategy:
|
||||
def __init__(self, zipf: ZipFile, follow_formatting: bool = False):
|
||||
self.zipf = zipf
|
||||
def __init__(self, zipf: ZipFile, follow_formatting: bool = False) -> None:
|
||||
self.zipf: ZipFile = zipf
|
||||
if follow_formatting:
|
||||
self.make_unique_filename = self._formatted_filepath
|
||||
self.make_unique_filename: Callable[..., Path | str] = (
|
||||
self._formatted_filepath
|
||||
)
|
||||
else:
|
||||
self.make_unique_filename = self._filename_only
|
||||
|
||||
@@ -17,7 +24,7 @@ class BulkArchiveStrategy:
|
||||
doc: Document,
|
||||
archive: bool = False,
|
||||
folder: str = "",
|
||||
):
|
||||
) -> str:
|
||||
"""
|
||||
Constructs a unique name for the given document to be used inside the
|
||||
zip file.
|
||||
@@ -26,7 +33,7 @@ class BulkArchiveStrategy:
|
||||
"""
|
||||
counter = 0
|
||||
while True:
|
||||
filename = folder + doc.get_public_filename(archive, counter)
|
||||
filename: str = folder + doc.get_public_filename(archive, counter)
|
||||
if filename in self.zipf.namelist():
|
||||
counter += 1
|
||||
else:
|
||||
@@ -37,7 +44,7 @@ class BulkArchiveStrategy:
|
||||
doc: Document,
|
||||
archive: bool = False,
|
||||
folder: str = "",
|
||||
):
|
||||
) -> Path:
|
||||
"""
|
||||
Constructs a full file path for the given document to be used inside
|
||||
the zipfile.
|
||||
@@ -45,24 +52,30 @@ class BulkArchiveStrategy:
|
||||
The path is already unique, as handled when a document is consumed or updated
|
||||
"""
|
||||
if archive and doc.has_archive_version:
|
||||
in_archive_path = os.path.join(folder, doc.archive_filename)
|
||||
if TYPE_CHECKING:
|
||||
assert doc.archive_filename is not None
|
||||
in_archive_path: Path = Path(folder) / doc.archive_filename
|
||||
else:
|
||||
in_archive_path = os.path.join(folder, doc.filename)
|
||||
if TYPE_CHECKING:
|
||||
assert doc.filename is not None
|
||||
in_archive_path = Path(folder) / doc.filename
|
||||
|
||||
return in_archive_path
|
||||
|
||||
def add_document(self, doc: Document):
|
||||
def add_document(self, doc: Document) -> NoReturn:
|
||||
raise NotImplementedError # pragma: no cover
|
||||
|
||||
|
||||
class OriginalsOnlyStrategy(BulkArchiveStrategy):
|
||||
def add_document(self, doc: Document):
|
||||
def add_document(self, doc: Document) -> None:
|
||||
self.zipf.write(doc.source_path, self.make_unique_filename(doc))
|
||||
|
||||
|
||||
class ArchiveOnlyStrategy(BulkArchiveStrategy):
|
||||
def add_document(self, doc: Document):
|
||||
def add_document(self, doc: Document) -> None:
|
||||
if doc.has_archive_version:
|
||||
if TYPE_CHECKING:
|
||||
assert doc.archive_path is not None
|
||||
self.zipf.write(
|
||||
doc.archive_path,
|
||||
self.make_unique_filename(doc, archive=True),
|
||||
@@ -72,8 +85,10 @@ class ArchiveOnlyStrategy(BulkArchiveStrategy):
|
||||
|
||||
|
||||
class OriginalAndArchiveStrategy(BulkArchiveStrategy):
|
||||
def add_document(self, doc: Document):
|
||||
def add_document(self, doc: Document) -> None:
|
||||
if doc.has_archive_version:
|
||||
if TYPE_CHECKING:
|
||||
assert doc.archive_path is not None
|
||||
self.zipf.write(
|
||||
doc.archive_path,
|
||||
self.make_unique_filename(doc, archive=True, folder="archive/"),
|
||||
|
@@ -1,8 +1,9 @@
|
||||
import hashlib
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Literal
|
||||
|
||||
from celery import chain
|
||||
from celery import chord
|
||||
@@ -25,10 +26,13 @@ from documents.tasks import bulk_update_documents
|
||||
from documents.tasks import consume_file
|
||||
from documents.tasks import update_document_archive_file
|
||||
|
||||
logger = logging.getLogger("paperless.bulk_edit")
|
||||
logger: logging.Logger = logging.getLogger("paperless.bulk_edit")
|
||||
|
||||
|
||||
def set_correspondent(doc_ids: list[int], correspondent):
|
||||
def set_correspondent(
|
||||
doc_ids: list[int],
|
||||
correspondent: Correspondent,
|
||||
) -> Literal["OK"]:
|
||||
if correspondent:
|
||||
correspondent = Correspondent.objects.only("pk").get(id=correspondent)
|
||||
|
||||
@@ -45,7 +49,7 @@ def set_correspondent(doc_ids: list[int], correspondent):
|
||||
return "OK"
|
||||
|
||||
|
||||
def set_storage_path(doc_ids: list[int], storage_path):
|
||||
def set_storage_path(doc_ids: list[int], storage_path: StoragePath) -> Literal["OK"]:
|
||||
if storage_path:
|
||||
storage_path = StoragePath.objects.only("pk").get(id=storage_path)
|
||||
|
||||
@@ -66,7 +70,7 @@ def set_storage_path(doc_ids: list[int], storage_path):
|
||||
return "OK"
|
||||
|
||||
|
||||
def set_document_type(doc_ids: list[int], document_type):
|
||||
def set_document_type(doc_ids: list[int], document_type: DocumentType) -> Literal["OK"]:
|
||||
if document_type:
|
||||
document_type = DocumentType.objects.only("pk").get(id=document_type)
|
||||
|
||||
@@ -83,7 +87,7 @@ def set_document_type(doc_ids: list[int], document_type):
|
||||
return "OK"
|
||||
|
||||
|
||||
def add_tag(doc_ids: list[int], tag: int):
|
||||
def add_tag(doc_ids: list[int], tag: int) -> Literal["OK"]:
|
||||
qs = Document.objects.filter(Q(id__in=doc_ids) & ~Q(tags__id=tag)).only("pk")
|
||||
affected_docs = list(qs.values_list("pk", flat=True))
|
||||
|
||||
@@ -98,7 +102,7 @@ def add_tag(doc_ids: list[int], tag: int):
|
||||
return "OK"
|
||||
|
||||
|
||||
def remove_tag(doc_ids: list[int], tag: int):
|
||||
def remove_tag(doc_ids: list[int], tag: int) -> Literal["OK"]:
|
||||
qs = Document.objects.filter(Q(id__in=doc_ids) & Q(tags__id=tag)).only("pk")
|
||||
affected_docs = list(qs.values_list("pk", flat=True))
|
||||
|
||||
@@ -113,7 +117,11 @@ def remove_tag(doc_ids: list[int], tag: int):
|
||||
return "OK"
|
||||
|
||||
|
||||
def modify_tags(doc_ids: list[int], add_tags: list[int], remove_tags: list[int]):
|
||||
def modify_tags(
|
||||
doc_ids: list[int],
|
||||
add_tags: list[int],
|
||||
remove_tags: list[int],
|
||||
) -> Literal["OK"]:
|
||||
qs = Document.objects.filter(id__in=doc_ids).only("pk")
|
||||
affected_docs = list(qs.values_list("pk", flat=True))
|
||||
|
||||
@@ -137,7 +145,11 @@ def modify_tags(doc_ids: list[int], add_tags: list[int], remove_tags: list[int])
|
||||
return "OK"
|
||||
|
||||
|
||||
def modify_custom_fields(doc_ids: list[int], add_custom_fields, remove_custom_fields):
|
||||
def modify_custom_fields(
|
||||
doc_ids: list[int],
|
||||
add_custom_fields,
|
||||
remove_custom_fields,
|
||||
) -> Literal["OK"]:
|
||||
qs = Document.objects.filter(id__in=doc_ids).only("pk")
|
||||
affected_docs = list(qs.values_list("pk", flat=True))
|
||||
|
||||
@@ -158,7 +170,7 @@ def modify_custom_fields(doc_ids: list[int], add_custom_fields, remove_custom_fi
|
||||
|
||||
|
||||
@shared_task
|
||||
def delete(doc_ids: list[int]):
|
||||
def delete(doc_ids: list[int]) -> Literal["OK"]:
|
||||
try:
|
||||
Document.objects.filter(id__in=doc_ids).delete()
|
||||
|
||||
@@ -177,7 +189,7 @@ def delete(doc_ids: list[int]):
|
||||
return "OK"
|
||||
|
||||
|
||||
def reprocess(doc_ids: list[int]):
|
||||
def reprocess(doc_ids: list[int]) -> Literal["OK"]:
|
||||
for document_id in doc_ids:
|
||||
update_document_archive_file.delay(
|
||||
document_id=document_id,
|
||||
@@ -186,7 +198,12 @@ def reprocess(doc_ids: list[int]):
|
||||
return "OK"
|
||||
|
||||
|
||||
def set_permissions(doc_ids: list[int], set_permissions, owner=None, merge=False):
|
||||
def set_permissions(
|
||||
doc_ids: list[int],
|
||||
set_permissions,
|
||||
owner=None,
|
||||
merge=False,
|
||||
) -> Literal["OK"]:
|
||||
qs = Document.objects.filter(id__in=doc_ids).select_related("owner")
|
||||
|
||||
if merge:
|
||||
@@ -205,12 +222,12 @@ def set_permissions(doc_ids: list[int], set_permissions, owner=None, merge=False
|
||||
return "OK"
|
||||
|
||||
|
||||
def rotate(doc_ids: list[int], degrees: int):
|
||||
def rotate(doc_ids: list[int], degrees: int) -> Literal["OK"]:
|
||||
logger.info(
|
||||
f"Attempting to rotate {len(doc_ids)} documents by {degrees} degrees.",
|
||||
)
|
||||
qs = Document.objects.filter(id__in=doc_ids)
|
||||
affected_docs = []
|
||||
affected_docs: list[int] = []
|
||||
import pikepdf
|
||||
|
||||
rotate_tasks = []
|
||||
@@ -250,17 +267,17 @@ def merge(
|
||||
doc_ids: list[int],
|
||||
metadata_document_id: int | None = None,
|
||||
delete_originals: bool = False,
|
||||
user: User = None,
|
||||
):
|
||||
user: User | None = None,
|
||||
) -> Literal["OK"]:
|
||||
logger.info(
|
||||
f"Attempting to merge {len(doc_ids)} documents into a single document.",
|
||||
)
|
||||
qs = Document.objects.filter(id__in=doc_ids)
|
||||
affected_docs = []
|
||||
affected_docs: list[int] = []
|
||||
import pikepdf
|
||||
|
||||
merged_pdf = pikepdf.new()
|
||||
version = merged_pdf.pdf_version
|
||||
version: str = merged_pdf.pdf_version
|
||||
# use doc_ids to preserve order
|
||||
for doc_id in doc_ids:
|
||||
doc = qs.get(id=doc_id)
|
||||
@@ -277,9 +294,11 @@ def merge(
|
||||
logger.warning("No documents were merged")
|
||||
return "OK"
|
||||
|
||||
filepath = os.path.join(
|
||||
tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
|
||||
f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf",
|
||||
filepath = (
|
||||
Path(
|
||||
tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
|
||||
)
|
||||
/ f"{'_'.join([str(doc_id) for doc_id in doc_ids])[:100]}_merged.pdf"
|
||||
)
|
||||
merged_pdf.remove_unreferenced_resources()
|
||||
merged_pdf.save(filepath, min_version=version)
|
||||
@@ -288,8 +307,12 @@ def merge(
|
||||
if metadata_document_id:
|
||||
metadata_document = qs.get(id=metadata_document_id)
|
||||
if metadata_document is not None:
|
||||
overrides = DocumentMetadataOverrides.from_document(metadata_document)
|
||||
overrides: DocumentMetadataOverrides = (
|
||||
DocumentMetadataOverrides.from_document(metadata_document)
|
||||
)
|
||||
overrides.title = metadata_document.title + " (merged)"
|
||||
else:
|
||||
overrides = DocumentMetadataOverrides()
|
||||
else:
|
||||
overrides = DocumentMetadataOverrides()
|
||||
|
||||
@@ -321,8 +344,8 @@ def split(
|
||||
doc_ids: list[int],
|
||||
pages: list[list[int]],
|
||||
delete_originals: bool = False,
|
||||
user: User = None,
|
||||
):
|
||||
user: User | None = None,
|
||||
) -> Literal["OK"]:
|
||||
logger.info(
|
||||
f"Attempting to split document {doc_ids[0]} into {len(pages)} documents",
|
||||
)
|
||||
@@ -334,18 +357,22 @@ def split(
|
||||
try:
|
||||
with pikepdf.open(doc.source_path) as pdf:
|
||||
for idx, split_doc in enumerate(pages):
|
||||
dst = pikepdf.new()
|
||||
dst: pikepdf.Pdf = pikepdf.new()
|
||||
for page in split_doc:
|
||||
dst.pages.append(pdf.pages[page - 1])
|
||||
filepath = os.path.join(
|
||||
tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
|
||||
f"{doc.id}_{split_doc[0]}-{split_doc[-1]}.pdf",
|
||||
filepath: Path = (
|
||||
Path(
|
||||
tempfile.mkdtemp(dir=settings.SCRATCH_DIR),
|
||||
)
|
||||
/ f"{doc.id}_{split_doc[0]}-{split_doc[-1]}.pdf"
|
||||
)
|
||||
dst.remove_unreferenced_resources()
|
||||
dst.save(filepath)
|
||||
dst.close()
|
||||
|
||||
overrides = DocumentMetadataOverrides().from_document(doc)
|
||||
overrides: DocumentMetadataOverrides = (
|
||||
DocumentMetadataOverrides().from_document(doc)
|
||||
)
|
||||
overrides.title = f"{doc.title} (split {idx + 1})"
|
||||
if user is not None:
|
||||
overrides.owner_id = user.id
|
||||
@@ -376,7 +403,7 @@ def split(
|
||||
return "OK"
|
||||
|
||||
|
||||
def delete_pages(doc_ids: list[int], pages: list[int]):
|
||||
def delete_pages(doc_ids: list[int], pages: list[int]) -> Literal["OK"]:
|
||||
logger.info(
|
||||
f"Attempting to delete pages {pages} from {len(doc_ids)} documents",
|
||||
)
|
||||
|
Reference in New Issue
Block a user