Compare commits

...

7 Commits

Author SHA1 Message Date
Sebastian Steinbeißer
eec38020a4
Merge 0c7765fe03ecea11adce00667b7478d566f8548a into a0c1a19263be3c0abd1922fdd3b09c285a36c5b6 2025-03-14 21:05:44 +01:00
github-actions[bot]
a0c1a19263
New Crowdin translations by GitHub Action (#9401) 2025-03-14 10:30:45 -07:00
github-actions[bot]
1f5d1b6f26
New Crowdin translations by GitHub Action (#8929)
Co-authored-by: Crowdin Bot <support+bot@crowdin.com>
2025-03-14 17:22:24 +00:00
shamoon
3b19a727b8
Change: better handle permissions in patch requests (#9393) 2025-03-14 08:53:00 -07:00
shamoon
7146a5f4fc
Fix: use correct filename with webhook (#9392) 2025-03-14 07:44:40 -07:00
Sebastian Steinbeißer
0c7765fe03
Chore: switch from os.path to pathlib.Path 2025-03-13 18:49:05 +01:00
shamoon
6babc61ba2
Change: sync OIDC groups on first login too (#9387) 2025-03-13 07:19:34 -07:00
94 changed files with 90328 additions and 59803 deletions

View File

@@ -227,27 +227,9 @@ lint.per-file-ignores."src/documents/tests/test_consumer.py" = [
lint.per-file-ignores."src/documents/tests/test_file_handling.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_management.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_management_consumer.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_management_exporter.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_migration_archive_files.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_migration_document_pages_count.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_migration_mime_type.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_sanity_check.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/views.py" = [
"PTH",
] # TODO Enable & remove
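
The entries removed above were per-file exemptions from ruff's flake8-use-pathlib (PTH) rules; deleting them means the named test files now pass those checks. A minimal before/after sketch of the conversion pattern applied throughout this changeset (the path is illustrative, not taken from the diff):

from pathlib import Path

# Before (flagged by the PTH rules):
#   import os
#   sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
#   with open(sample_file, "rb") as f:
#       data = f.read()

# After (pathlib style, as adopted below):
sample_file = Path(__file__).parent / "samples" / "simple.pdf"
if sample_file.exists():  # guard so the sketch runs even without the sample file
    data = sample_file.read_bytes()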

41 file diffs suppressed because they are too large

View File

@@ -70,57 +70,59 @@ def set_permissions_for_object(permissions: list[str], object, *, merge: bool =
for action in permissions:
permission = f"{action}_{object.__class__.__name__.lower()}"
# users
users_to_add = User.objects.filter(id__in=permissions[action]["users"])
users_to_remove = (
get_users_with_perms(
object,
only_with_perms_in=[permission],
with_group_users=False,
if "users" in permissions[action]:
# users
users_to_add = User.objects.filter(id__in=permissions[action]["users"])
users_to_remove = (
get_users_with_perms(
object,
only_with_perms_in=[permission],
with_group_users=False,
)
if not merge
else User.objects.none()
)
if not merge
else User.objects.none()
)
if len(users_to_add) > 0 and len(users_to_remove) > 0:
users_to_remove = users_to_remove.exclude(id__in=users_to_add)
if len(users_to_remove) > 0:
for user in users_to_remove:
remove_perm(permission, user, object)
if len(users_to_add) > 0:
for user in users_to_add:
assign_perm(permission, user, object)
if action == "change":
# change gives view too
assign_perm(
f"view_{object.__class__.__name__.lower()}",
user,
object,
)
# groups
groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"])
groups_to_remove = (
get_groups_with_only_permission(
object,
permission,
if len(users_to_add) > 0 and len(users_to_remove) > 0:
users_to_remove = users_to_remove.exclude(id__in=users_to_add)
if len(users_to_remove) > 0:
for user in users_to_remove:
remove_perm(permission, user, object)
if len(users_to_add) > 0:
for user in users_to_add:
assign_perm(permission, user, object)
if action == "change":
# change gives view too
assign_perm(
f"view_{object.__class__.__name__.lower()}",
user,
object,
)
if "groups" in permissions[action]:
# groups
groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"])
groups_to_remove = (
get_groups_with_only_permission(
object,
permission,
)
if not merge
else Group.objects.none()
)
if not merge
else Group.objects.none()
)
if len(groups_to_add) > 0 and len(groups_to_remove) > 0:
groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add)
if len(groups_to_remove) > 0:
for group in groups_to_remove:
remove_perm(permission, group, object)
if len(groups_to_add) > 0:
for group in groups_to_add:
assign_perm(permission, group, object)
if action == "change":
# change gives view too
assign_perm(
f"view_{object.__class__.__name__.lower()}",
group,
object,
)
if len(groups_to_add) > 0 and len(groups_to_remove) > 0:
groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add)
if len(groups_to_remove) > 0:
for group in groups_to_remove:
remove_perm(permission, group, object)
if len(groups_to_add) > 0:
for group in groups_to_add:
assign_perm(permission, group, object)
if action == "change":
# change gives view too
assign_perm(
f"view_{object.__class__.__name__.lower()}",
group,
object,
)
def get_objects_for_user_owner_aware(user, perms, Model) -> QuerySet:
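
The restructuring above wraps the user and group handling in presence checks, so a partial set_permissions payload only touches the actions and principal types it actually names. A condensed sketch of the resulting control flow (simplified, not the full function body):

def set_permissions_for_object(permissions, obj, *, merge=False):
    for action in permissions:
        permission = f"{action}_{obj.__class__.__name__.lower()}"
        if "users" in permissions[action]:
            ...  # add/remove user permissions for `permission`, as before
        if "groups" in permissions[action]:
            ...  # add/remove group permissions for `permission`, as before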

View File

@@ -160,24 +160,24 @@ class SetPermissionsMixin:
def validate_set_permissions(self, set_permissions=None):
permissions_dict = {
"view": {
"users": User.objects.none(),
"groups": Group.objects.none(),
},
"change": {
"users": User.objects.none(),
"groups": Group.objects.none(),
},
"view": {},
"change": {},
}
if set_permissions is not None:
for action, _ in permissions_dict.items():
for action in ["view", "change"]:
if action in set_permissions:
users = set_permissions[action]["users"]
permissions_dict[action]["users"] = self._validate_user_ids(users)
groups = set_permissions[action]["groups"]
permissions_dict[action]["groups"] = self._validate_group_ids(
groups,
)
if "users" in set_permissions[action]:
users = set_permissions[action]["users"]
permissions_dict[action]["users"] = self._validate_user_ids(
users,
)
if "groups" in set_permissions[action]:
groups = set_permissions[action]["groups"]
permissions_dict[action]["groups"] = self._validate_group_ids(
groups,
)
else:
del permissions_dict[action]
return permissions_dict
def _set_permissions(self, permissions, object):
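
Correspondingly, the serializer now starts from empty dicts, keeps only the keys that were actually submitted, and drops actions that were omitted entirely; iterating over the literal ["view", "change"] instead of permissions_dict.items() is what makes the mid-loop del safe. A standalone sketch with the user/group ID validation elided:

def validate_set_permissions(set_permissions=None):
    permissions_dict = {"view": {}, "change": {}}
    if set_permissions is not None:
        for action in ["view", "change"]:
            if action in set_permissions:
                for key in ("users", "groups"):
                    if key in set_permissions[action]:
                        # the real code validates the IDs here
                        permissions_dict[action][key] = set_permissions[action][key]
            else:
                del permissions_dict[action]
    return permissions_dict

# Omitted keys and actions are simply absent, so later code leaves them untouched:
assert validate_set_permissions({"view": {"users": [2]}}) == {"view": {"users": [2]}}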

View File

@@ -1162,7 +1162,7 @@ def run_workflows(
) as f:
files = {
"file": (
document.original_filename,
filename,
f.read(),
document.mime_type,
),

View File

@@ -395,6 +395,52 @@ class TestApiAuth(DirectoriesMixin, APITestCase):
self.assertTrue(checker.has_perm("view_document", doc))
self.assertIn("view_document", get_perms(group1, doc))
def test_patch_doesnt_remove_permissions(self):
"""
GIVEN:
- existing document with permissions set
WHEN:
- a non-JSON PATCH API request that updates the doc
THEN:
- Object permissions are not removed
"""
doc = Document.objects.create(
title="test",
mime_type="application/pdf",
content="this is a document",
)
user1 = User.objects.create_superuser(username="user1")
user2 = User.objects.create(username="user2")
group1 = Group.objects.create(name="group1")
doc.owner = user1
doc.save()
assign_perm("view_document", user2, doc)
assign_perm("change_document", user2, doc)
assign_perm("view_document", group1, doc)
assign_perm("change_document", group1, doc)
self.client.force_authenticate(user1)
response = self.client.patch(
f"/api/documents/{doc.id}/",
{
"archive_serial_number": "123",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
doc = Document.objects.get(pk=doc.id)
self.assertEqual(doc.owner, user1)
from guardian.core import ObjectPermissionChecker
checker = ObjectPermissionChecker(user2)
self.assertTrue(checker.has_perm("view_document", doc))
self.assertIn("view_document", get_perms(group1, doc))
self.assertTrue(checker.has_perm("change_document", doc))
self.assertIn("change_document", get_perms(group1, doc))
def test_dynamic_permissions_fields(self):
user1 = User.objects.create_user(username="user1")
user1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
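
The assertions above rely on django-guardian's ObjectPermissionChecker, which prefetches and caches one user's object-level permissions. A minimal usage sketch, assuming a configured Django project with guardian installed and `user` and `doc` objects like those created in the test:

from guardian.core import ObjectPermissionChecker
from guardian.shortcuts import assign_perm

assign_perm("view_document", user, doc)
checker = ObjectPermissionChecker(user)  # caches this user's perms per object
assert checker.has_perm("view_document", doc)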

View File

@@ -1,6 +1,5 @@
import filecmp
import hashlib
import os
import shutil
import tempfile
from io import StringIO
@@ -19,7 +18,7 @@ from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
sample_file: Path = Path(__file__).parent / "samples" / "simple.pdf"
@override_settings(FILENAME_FORMAT="{correspondent}/{title}")
@@ -34,19 +33,13 @@ class TestArchiver(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_archiver(self):
doc = self.make_models()
shutil.copy(
sample_file,
os.path.join(self.dirs.originals_dir, f"{doc.id:07}.pdf"),
)
shutil.copy(sample_file, Path(self.dirs.originals_dir) / f"{doc.id:07}.pdf")
call_command("document_archiver", "--processes", "1")
def test_handle_document(self):
doc = self.make_models()
shutil.copy(
sample_file,
os.path.join(self.dirs.originals_dir, f"{doc.id:07}.pdf"),
)
shutil.copy(sample_file, Path(self.dirs.originals_dir) / f"{doc.id:07}.pdf")
update_document_content_maybe_archive_file(doc.pk)
@@ -90,11 +83,8 @@ class TestArchiver(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
mime_type="application/pdf",
filename="document_01.pdf",
)
shutil.copy(sample_file, os.path.join(self.dirs.originals_dir, "document.pdf"))
shutil.copy(
sample_file,
os.path.join(self.dirs.originals_dir, "document_01.pdf"),
)
shutil.copy(sample_file, Path(self.dirs.originals_dir) / "document.pdf")
shutil.copy(sample_file, Path(self.dirs.originals_dir) / "document_01.pdf")
update_document_content_maybe_archive_file(doc2.pk)
update_document_content_maybe_archive_file(doc1.pk)
@@ -136,22 +126,22 @@ class TestDecryptDocuments(FileSystemAssertsMixin, TestCase):
)
shutil.copy(
os.path.join(
os.path.dirname(__file__),
"samples",
"documents",
"originals",
"0000004.pdf.gpg",
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000004.pdf.gpg"
),
originals_dir / "0000004.pdf.gpg",
)
shutil.copy(
os.path.join(
os.path.dirname(__file__),
"samples",
"documents",
"thumbnails",
"0000004.webp.gpg",
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "thumbnails"
/ "0000004.webp.gpg"
),
thumb_dir / f"{doc.id:07}.webp.gpg",
)
@@ -162,13 +152,13 @@ class TestDecryptDocuments(FileSystemAssertsMixin, TestCase):
self.assertEqual(doc.storage_type, Document.STORAGE_TYPE_UNENCRYPTED)
self.assertEqual(doc.filename, "0000004.pdf")
self.assertIsFile(os.path.join(originals_dir, "0000004.pdf"))
self.assertIsFile(Path(originals_dir) / "0000004.pdf")
self.assertIsFile(doc.source_path)
self.assertIsFile(os.path.join(thumb_dir, f"{doc.id:07}.webp"))
self.assertIsFile(Path(thumb_dir) / f"{doc.id:07}.webp")
self.assertIsFile(doc.thumbnail_path)
with doc.source_file as f:
checksum = hashlib.md5(f.read()).hexdigest()
checksum: str = hashlib.md5(f.read()).hexdigest()
self.assertEqual(checksum, doc.checksum)

View File

@@ -1,5 +1,4 @@
import filecmp
import os
import shutil
from pathlib import Path
from threading import Thread
@@ -94,13 +93,13 @@ class ConsumerThreadMixin(DocumentConsumeDelayMixin):
print("Consumed a perfectly valid file.") # noqa: T201
def slow_write_file(self, target, *, incomplete=False):
with open(self.sample_file, "rb") as f:
with Path(self.sample_file).open("rb") as f:
pdf_bytes = f.read()
if incomplete:
pdf_bytes = pdf_bytes[: len(pdf_bytes) - 100]
with open(target, "wb") as f:
with Path(target).open("wb") as f:
# this will take 2 seconds, since the file is about 20k.
print("Start writing file.") # noqa: T201
for b in chunked(1000, pdf_bytes):
@@ -116,7 +115,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
def test_consume_file(self):
self.t_start()
f = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
f = Path(self.dirs.consumption_dir) / "my_file.pdf"
shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call()
@@ -130,7 +129,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
def test_consume_file_invalid_ext(self):
self.t_start()
f = os.path.join(self.dirs.consumption_dir, "my_file.wow")
f = Path(self.dirs.consumption_dir) / "my_file.wow"
shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call()
@@ -138,7 +137,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.consume_file_mock.assert_not_called()
def test_consume_existing_file(self):
f = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
f = Path(self.dirs.consumption_dir) / "my_file.pdf"
shutil.copy(self.sample_file, f)
self.t_start()
@@ -154,7 +153,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start()
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
fname = Path(self.dirs.consumption_dir) / "my_file.pdf"
self.slow_write_file(fname)
@@ -174,8 +173,8 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start()
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.~df"))
fname2 = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
fname = Path(self.dirs.consumption_dir) / "my_file.~df"
fname2 = Path(self.dirs.consumption_dir) / "my_file.pdf"
self.slow_write_file(fname)
shutil.move(fname, fname2)
@@ -196,7 +195,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start()
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
fname = Path(self.dirs.consumption_dir) / "my_file.pdf"
self.slow_write_file(fname, incomplete=True)
self.wait_for_task_mock_call()
@@ -225,23 +224,23 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
shutil.copy(
self.sample_file,
os.path.join(self.dirs.consumption_dir, ".DS_STORE"),
Path(self.dirs.consumption_dir) / ".DS_STORE",
)
shutil.copy(
self.sample_file,
os.path.join(self.dirs.consumption_dir, "my_file.pdf"),
Path(self.dirs.consumption_dir) / "my_file.pdf",
)
shutil.copy(
self.sample_file,
os.path.join(self.dirs.consumption_dir, "._my_file.pdf"),
Path(self.dirs.consumption_dir) / "._my_file.pdf",
)
shutil.copy(
self.sample_file,
os.path.join(self.dirs.consumption_dir, "my_second_file.pdf"),
Path(self.dirs.consumption_dir) / "my_second_file.pdf",
)
shutil.copy(
self.sample_file,
os.path.join(self.dirs.consumption_dir, "._my_second_file.pdf"),
Path(self.dirs.consumption_dir) / "._my_second_file.pdf",
)
sleep(5)
@@ -259,60 +258,66 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
def test_is_ignored(self):
test_paths = [
{
"path": os.path.join(self.dirs.consumption_dir, "foo.pdf"),
"path": (Path(self.dirs.consumption_dir) / "foo.pdf").as_posix(),
"ignore": False,
},
{
"path": os.path.join(self.dirs.consumption_dir, "foo", "bar.pdf"),
"path": (
Path(self.dirs.consumption_dir) / "foo" / "bar.pdf"
).as_posix(),
"ignore": False,
},
{
"path": os.path.join(self.dirs.consumption_dir, ".DS_STORE"),
"path": (Path(self.dirs.consumption_dir) / ".DS_STORE").as_posix(),
"ignore": True,
},
{
"path": os.path.join(self.dirs.consumption_dir, ".DS_Store"),
"path": (Path(self.dirs.consumption_dir) / ".DS_Store").as_posix(),
"ignore": True,
},
{
"path": os.path.join(self.dirs.consumption_dir, ".stfolder", "foo.pdf"),
"path": (
Path(self.dirs.consumption_dir) / ".stfolder" / "foo.pdf"
).as_posix(),
"ignore": True,
},
{
"path": os.path.join(self.dirs.consumption_dir, ".stfolder.pdf"),
"path": (Path(self.dirs.consumption_dir) / ".stfolder.pdf").as_posix(),
"ignore": False,
},
{
"path": os.path.join(
self.dirs.consumption_dir,
".stversions",
"foo.pdf",
),
"path": (
Path(self.dirs.consumption_dir) / ".stversions" / "foo.pdf"
).as_posix(),
"ignore": True,
},
{
"path": os.path.join(self.dirs.consumption_dir, ".stversions.pdf"),
"path": (
Path(self.dirs.consumption_dir) / ".stversions.pdf"
).as_posix(),
"ignore": False,
},
{
"path": os.path.join(self.dirs.consumption_dir, "._foo.pdf"),
"path": (Path(self.dirs.consumption_dir) / "._foo.pdf").as_posix(),
"ignore": True,
},
{
"path": os.path.join(self.dirs.consumption_dir, "my_foo.pdf"),
"path": (Path(self.dirs.consumption_dir) / "my_foo.pdf").as_posix(),
"ignore": False,
},
{
"path": os.path.join(self.dirs.consumption_dir, "._foo", "bar.pdf"),
"path": (
Path(self.dirs.consumption_dir) / "._foo" / "bar.pdf"
).as_posix(),
"ignore": True,
},
{
"path": os.path.join(
self.dirs.consumption_dir,
"@eaDir",
"SYNO@.fileindexdb",
"_1jk.fnm",
),
"path": (
Path(self.dirs.consumption_dir)
/ "@eaDir"
/ "SYNO@.fileindexdb"
/ "_1jk.fnm"
).as_posix(),
"ignore": True,
},
]
@@ -332,7 +337,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start()
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
f = Path(self.dirs.consumption_dir) / "my_file.pdf"
shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call()
@@ -380,9 +385,9 @@ class TestConsumerTags(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start()
path = os.path.join(self.dirs.consumption_dir, *tag_names)
os.makedirs(path, exist_ok=True)
f = Path(os.path.join(path, "my_file.pdf"))
path = Path(self.dirs.consumption_dir) / "/".join(tag_names)
path.mkdir(parents=True, exist_ok=True)
f = path / "my_file.pdf"
# Wait at least inotify read_delay for recursive watchers
# to be created for the new directories
sleep(1)
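
The test cases above exercise the consumer's ignore logic for macOS metadata (.DS_Store, ._* resource forks), Syncthing folders (.stfolder, .stversions), and Synology @eaDir indexes. A hedged sketch of fnmatch-style matching against POSIX-normalized paths that reproduces the expectations in this table; the real implementation reads its patterns from PAPERLESS_CONSUMER_IGNORE_PATTERNS and may differ in detail:

from fnmatch import fnmatch
from pathlib import Path

# Illustrative pattern list, not the project's authoritative defaults.
IGNORE_PATTERNS = [".DS_STORE", ".DS_Store", "._*", ".stfolder/*", ".stversions/*", "@eaDir/*"]

def is_ignored(path: Path, consumption_dir: Path) -> bool:
    relative = path.relative_to(consumption_dir).as_posix()
    # fnmatch's "*" also crosses "/" here, so "._*" catches "._foo/bar.pdf",
    # while ".stfolder/*" still requires the literal directory prefix.
    return any(fnmatch(relative, pattern) for pattern in IGNORE_PATTERNS)

assert is_ignored(Path("/consume/._foo/bar.pdf"), Path("/consume"))
assert not is_ignored(Path("/consume/.stfolder.pdf"), Path("/consume"))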

View File

@@ -1,6 +1,5 @@
import hashlib
import json
import os
import shutil
import tempfile
from io import StringIO
@@ -183,16 +182,16 @@ class TestExportImport(
call_command(*args)
with open(os.path.join(self.target, "manifest.json")) as f:
with (self.target / "manifest.json").open() as f:
manifest = json.load(f)
return manifest
def test_exporter(self, *, use_filename_format=False):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
num_permission_objects = Permission.objects.count()
@@ -210,7 +209,7 @@ class TestExportImport(
4,
)
self.assertIsFile(os.path.join(self.target, "manifest.json"))
self.assertIsFile((self.target / "manifest.json").as_posix())
self.assertEqual(
self._get_document_from_manifest(manifest, self.d1.id)["fields"]["title"],
@@ -231,19 +230,17 @@ class TestExportImport(
for element in manifest:
if element["model"] == "documents.document":
fname = os.path.join(
self.target,
element[document_exporter.EXPORTER_FILE_NAME],
)
fname = (
self.target / element[document_exporter.EXPORTER_FILE_NAME]
).as_posix()
self.assertIsFile(fname)
self.assertIsFile(
os.path.join(
self.target,
element[document_exporter.EXPORTER_THUMBNAIL_NAME],
),
(
self.target / element[document_exporter.EXPORTER_THUMBNAIL_NAME]
).as_posix(),
)
with open(fname, "rb") as f:
with Path(fname).open("rb") as f:
checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(checksum, element["fields"]["checksum"])
@@ -253,13 +250,12 @@ class TestExportImport(
)
if document_exporter.EXPORTER_ARCHIVE_NAME in element:
fname = os.path.join(
self.target,
element[document_exporter.EXPORTER_ARCHIVE_NAME],
)
fname = (
self.target / element[document_exporter.EXPORTER_ARCHIVE_NAME]
).as_posix()
self.assertIsFile(fname)
with open(fname, "rb") as f:
with Path(fname).open("rb") as f:
checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(checksum, element["fields"]["archive_checksum"])
@@ -297,10 +293,10 @@ class TestExportImport(
self.assertEqual(len(messages), 0)
def test_exporter_with_filename_format(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
with override_settings(
@@ -309,16 +305,16 @@ class TestExportImport(
self.test_exporter(use_filename_format=True)
def test_update_export_changed_time(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
self._do_export()
self.assertIsFile(os.path.join(self.target, "manifest.json"))
self.assertIsFile((self.target / "manifest.json").as_posix())
st_mtime_1 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime
st_mtime_1 = (self.target / "manifest.json").stat().st_mtime
with mock.patch(
"documents.management.commands.document_exporter.copy_file_with_basic_stats",
@@ -326,8 +322,8 @@ class TestExportImport(
self._do_export()
m.assert_not_called()
self.assertIsFile(os.path.join(self.target, "manifest.json"))
st_mtime_2 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime
self.assertIsFile((self.target / "manifest.json").as_posix())
st_mtime_2 = (self.target / "manifest.json").stat().st_mtime
Path(self.d1.source_path).touch()
@@ -337,26 +333,26 @@ class TestExportImport(
self._do_export()
self.assertEqual(m.call_count, 1)
st_mtime_3 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime
self.assertIsFile(os.path.join(self.target, "manifest.json"))
st_mtime_3 = (self.target / "manifest.json").stat().st_mtime
self.assertIsFile((self.target / "manifest.json").as_posix())
self.assertNotEqual(st_mtime_1, st_mtime_2)
self.assertNotEqual(st_mtime_2, st_mtime_3)
self._do_export(compare_json=True)
st_mtime_4 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime
st_mtime_4 = (self.target / "manifest.json").stat().st_mtime
self.assertEqual(st_mtime_3, st_mtime_4)
def test_update_export_changed_checksum(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
self._do_export()
self.assertIsFile(os.path.join(self.target, "manifest.json"))
self.assertIsFile((self.target / "manifest.json").as_posix())
with mock.patch(
"documents.management.commands.document_exporter.copy_file_with_basic_stats",
@@ -364,7 +360,7 @@ class TestExportImport(
self._do_export()
m.assert_not_called()
self.assertIsFile(os.path.join(self.target, "manifest.json"))
self.assertIsFile((self.target / "manifest.json").as_posix())
self.d2.checksum = "asdfasdgf3"
self.d2.save()
@@ -375,13 +371,13 @@ class TestExportImport(
self._do_export(compare_checksums=True)
self.assertEqual(m.call_count, 1)
self.assertIsFile(os.path.join(self.target, "manifest.json"))
self.assertIsFile((self.target / "manifest.json").as_posix())
def test_update_export_deleted_document(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
manifest = self._do_export()
@@ -389,7 +385,7 @@ class TestExportImport(
self.assertTrue(len(manifest), 7)
doc_from_manifest = self._get_document_from_manifest(manifest, self.d3.id)
self.assertIsFile(
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]),
(self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
)
self.d3.delete()
@@ -401,39 +397,39 @@ class TestExportImport(
self.d3.id,
)
self.assertIsFile(
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]),
(self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
)
manifest = self._do_export(delete=True)
self.assertIsNotFile(
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]),
(self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
)
self.assertTrue(len(manifest), 6)
@override_settings(FILENAME_FORMAT="{title}/{correspondent}")
def test_update_export_changed_location(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
self._do_export(use_filename_format=True)
self.assertIsFile(os.path.join(self.target, "wow1", "c.pdf"))
self.assertIsFile((self.target / "wow1" / "c.pdf").as_posix())
self.assertIsFile(os.path.join(self.target, "manifest.json"))
self.assertIsFile((self.target / "manifest.json").as_posix())
self.d1.title = "new_title"
self.d1.save()
self._do_export(use_filename_format=True, delete=True)
self.assertIsNotFile(os.path.join(self.target, "wow1", "c.pdf"))
self.assertIsNotDir(os.path.join(self.target, "wow1"))
self.assertIsFile(os.path.join(self.target, "new_title", "c.pdf"))
self.assertIsFile(os.path.join(self.target, "manifest.json"))
self.assertIsFile(os.path.join(self.target, "wow2", "none.pdf"))
self.assertIsNotFile((self.target / "wow1" / "c.pdf").as_posix())
self.assertIsNotDir((self.target / "wow1").as_posix())
self.assertIsFile((self.target / "new_title" / "c.pdf").as_posix())
self.assertIsFile((self.target / "manifest.json").as_posix())
self.assertIsFile((self.target / "wow2" / "none.pdf").as_posix())
self.assertIsFile(
os.path.join(self.target, "wow2", "none_01.pdf"),
(self.target / "wow2" / "none_01.pdf").as_posix(),
)
def test_export_missing_files(self):
@@ -458,20 +454,19 @@ class TestExportImport(
- Zipfile is created
- Zipfile contains exported files
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
args = ["document_exporter", self.target, "--zip"]
call_command(*args)
expected_file = os.path.join(
self.target,
f"export-{timezone.localdate().isoformat()}.zip",
)
expected_file = (
self.target / f"export-{timezone.localdate().isoformat()}.zip"
).as_posix()
self.assertIsFile(expected_file)
@@ -492,10 +487,10 @@ class TestExportImport(
- Zipfile is created
- Zipfile contains exported files
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
args = ["document_exporter", self.target, "--zip", "--use-filename-format"]
@@ -505,10 +500,9 @@ class TestExportImport(
):
call_command(*args)
expected_file = os.path.join(
self.target,
f"export-{timezone.localdate().isoformat()}.zip",
)
expected_file = (
self.target / f"export-{timezone.localdate().isoformat()}.zip"
).as_posix()
self.assertIsFile(expected_file)
@@ -533,10 +527,10 @@ class TestExportImport(
- Zipfile contains exported files
- The existing file and directory in target are removed
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
# Create stuff in target directory
@@ -552,10 +546,9 @@ class TestExportImport(
call_command(*args)
expected_file = os.path.join(
self.target,
f"export-{timezone.localdate().isoformat()}.zip",
)
expected_file = (
self.target / f"export-{timezone.localdate().isoformat()}.zip"
).as_posix()
self.assertIsFile(expected_file)
self.assertIsNotFile(existing_file)
@@ -610,7 +603,7 @@ class TestExportImport(
- Error is raised
"""
with tempfile.TemporaryDirectory() as tmp_dir:
os.chmod(tmp_dir, 0o000)
Path(tmp_dir).chmod(0o000)
args = ["document_exporter", tmp_dir]
@@ -629,10 +622,10 @@ class TestExportImport(
- Manifest.json doesn't contain information about archive files
- Documents can be imported again
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
manifest = self._do_export()
@@ -670,10 +663,10 @@ class TestExportImport(
- Manifest.json doesn't contain information about thumbnails
- Documents can be imported again
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
manifest = self._do_export()
@@ -713,10 +706,10 @@ class TestExportImport(
- Main manifest.json file doesn't contain information about documents
- Documents can be imported again
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
manifest = self._do_export(split_manifest=True)
@@ -744,10 +737,10 @@ class TestExportImport(
THEN:
- Documents can be imported again
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
self._do_export(use_folder_prefix=True)
@@ -769,10 +762,10 @@ class TestExportImport(
- ContentType & Permission objects are not deleted, db transaction rolled back
"""
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
num_content_type_objects = ContentType.objects.count()
@@ -804,10 +797,10 @@ class TestExportImport(
self.assertEqual(Permission.objects.count(), num_permission_objects + 1)
def test_exporter_with_auditlog_disabled(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"),
os.path.join(self.dirs.media_dir, "documents"),
Path(__file__).parent / "samples" / "documents",
Path(self.dirs.media_dir) / "documents",
)
with override_settings(

View File

@@ -1,4 +1,3 @@
import os
import shutil
from pathlib import Path
@@ -8,11 +7,11 @@ from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations
def source_path_before(self):
def source_path_before(self) -> Path:
if self.filename:
fname = str(self.filename)
return os.path.join(settings.ORIGINALS_DIR, fname)
return Path(settings.ORIGINALS_DIR) / fname
class TestMigrateDocumentPageCount(DirectoriesMixin, TestMigrations):

View File

@@ -1,5 +1,5 @@
import os
import shutil
from pathlib import Path
from django.conf import settings
from django.test import override_settings
@@ -20,7 +20,7 @@ def source_path_before(self):
if self.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg"
return os.path.join(settings.ORIGINALS_DIR, fname)
return (Path(settings.ORIGINALS_DIR) / fname).as_posix()
def file_type_after(self):
@@ -35,7 +35,7 @@ def source_path_after(doc):
if doc.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg" # pragma: no cover
return os.path.join(settings.ORIGINALS_DIR, fname)
return (Path(settings.ORIGINALS_DIR) / fname).as_posix()
@override_settings(PASSPHRASE="test")
@@ -52,7 +52,7 @@ class TestMigrateMimeType(DirectoriesMixin, TestMigrations):
)
self.doc_id = doc.id
shutil.copy(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
Path(__file__).parent / "samples" / "simple.pdf",
source_path_before(doc),
)
@@ -63,12 +63,12 @@ class TestMigrateMimeType(DirectoriesMixin, TestMigrations):
)
self.doc2_id = doc2.id
shutil.copy(
os.path.join(
os.path.dirname(__file__),
"samples",
"documents",
"originals",
"0000004.pdf.gpg",
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000004.pdf.gpg"
),
source_path_before(doc2),
)
@@ -97,7 +97,7 @@ class TestMigrateMimeTypeBackwards(DirectoriesMixin, TestMigrations):
)
self.doc_id = doc.id
shutil.copy(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
Path(__file__).parent / "samples" / "simple.pdf",
source_path_after(doc),
)

View File

@@ -1,5 +1,4 @@
import logging
import os
import shutil
from pathlib import Path
@@ -17,34 +16,34 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
with filelock.FileLock(settings.MEDIA_LOCK):
# just make sure that the lockfile is present.
shutil.copy(
os.path.join(
os.path.dirname(__file__),
"samples",
"documents",
"originals",
"0000001.pdf",
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "originals"
/ "0000001.pdf"
),
os.path.join(self.dirs.originals_dir, "0000001.pdf"),
Path(self.dirs.originals_dir) / "0000001.pdf",
)
shutil.copy(
os.path.join(
os.path.dirname(__file__),
"samples",
"documents",
"archive",
"0000001.pdf",
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "archive"
/ "0000001.pdf"
),
os.path.join(self.dirs.archive_dir, "0000001.pdf"),
Path(self.dirs.archive_dir) / "0000001.pdf",
)
shutil.copy(
os.path.join(
os.path.dirname(__file__),
"samples",
"documents",
"thumbnails",
"0000001.webp",
(
Path(__file__).parent
/ "samples"
/ "documents"
/ "thumbnails"
/ "0000001.webp"
),
os.path.join(self.dirs.thumbnail_dir, "0000001.webp"),
Path(self.dirs.thumbnail_dir) / "0000001.webp",
)
return Document.objects.create(
@@ -92,25 +91,25 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
def test_no_thumbnail(self):
doc = self.make_test_data()
os.remove(doc.thumbnail_path)
Path(doc.thumbnail_path).unlink()
self.assertSanityError(doc, "Thumbnail of document does not exist")
def test_thumbnail_no_access(self):
doc = self.make_test_data()
os.chmod(doc.thumbnail_path, 0o000)
Path(doc.thumbnail_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read thumbnail file of document")
os.chmod(doc.thumbnail_path, 0o777)
Path(doc.thumbnail_path).chmod(0o777)
def test_no_original(self):
doc = self.make_test_data()
os.remove(doc.source_path)
Path(doc.source_path).unlink()
self.assertSanityError(doc, "Original of document does not exist.")
def test_original_no_access(self):
doc = self.make_test_data()
os.chmod(doc.source_path, 0o000)
Path(doc.source_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read original file of document")
os.chmod(doc.source_path, 0o777)
Path(doc.source_path).chmod(0o777)
def test_original_checksum_mismatch(self):
doc = self.make_test_data()
@@ -120,14 +119,14 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
def test_no_archive(self):
doc = self.make_test_data()
os.remove(doc.archive_path)
Path(doc.archive_path).unlink()
self.assertSanityError(doc, "Archived version of document does not exist.")
def test_archive_no_access(self):
doc = self.make_test_data()
os.chmod(doc.archive_path, 0o000)
Path(doc.archive_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read archive file of document")
os.chmod(doc.archive_path, 0o777)
Path(doc.archive_path).chmod(0o777)
def test_archive_checksum_mismatch(self):
doc = self.make_test_data()
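
The substitutions in this file map one-to-one onto pathlib methods. A small self-contained reference for the equivalences used in these tests:

from pathlib import Path

p = Path("example.txt")
p.write_text("data")

p.chmod(0o000)              # was: os.chmod(p, 0o000)
p.chmod(0o777)              # restore access, as the tests do
mtime = p.stat().st_mtime   # was: os.stat(p).st_mtime
p.unlink()                  # was: os.remove(p)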

41 file diffs suppressed because they are too large

View File

@@ -10,6 +10,8 @@ from django.contrib.auth.models import User
from django.forms import ValidationError
from django.urls import reverse
from paperless.signals import handle_social_account_updated
logger = logging.getLogger("paperless.auth")
@@ -114,4 +116,5 @@ class CustomSocialAccountAdapter(DefaultSocialAccountAdapter):
)
user.groups.add(*groups)
user.save()
handle_social_account_updated(None, request, sociallogin)
return user
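
The added handle_social_account_updated call re-runs the social-account signal handling inside save_user, so group synchronization (commit #9387 above) happens on a user's first OIDC login rather than only on later ones. A loose sketch of what such a handler can look like, assuming allauth's sociallogin object and a "groups" claim in extra_data; this is an illustration, not the verbatim paperless handler:

from django.contrib.auth.models import Group

def handle_social_account_updated(sender, request, sociallogin, **kwargs):
    # Mirror the identity provider's group claim onto the Django user.
    user = sociallogin.user
    group_names = sociallogin.account.extra_data.get("groups", [])
    user.groups.set(Group.objects.filter(name__in=group_names))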