Compare commits

...

7 Commits

Author SHA1 Message Date
Sebastian Steinbeißer
eec38020a4
Merge 0c7765fe03ecea11adce00667b7478d566f8548a into a0c1a19263be3c0abd1922fdd3b09c285a36c5b6 2025-03-14 21:05:44 +01:00
github-actions[bot]
a0c1a19263
New Crowdin translations by GitHub Action (#9401) 2025-03-14 10:30:45 -07:00
github-actions[bot]
1f5d1b6f26
New Crowdin translations by GitHub Action (#8929)
Co-authored-by: Crowdin Bot <support+bot@crowdin.com>
2025-03-14 17:22:24 +00:00
shamoon
3b19a727b8
Change: better handle permissions in patch requests (#9393) 2025-03-14 08:53:00 -07:00
shamoon
7146a5f4fc
Fix: use correct filename with webhook (#9392) 2025-03-14 07:44:40 -07:00
Sebastian Steinbeißer
0c7765fe03
Chore: switch from os.path to pathlib.Path 2025-03-13 18:49:05 +01:00
shamoon
6babc61ba2
Change: sync OIDC groups on first login too (#9387) 2025-03-13 07:19:34 -07:00
94 changed files with 90328 additions and 59803 deletions

View File

@ -227,27 +227,9 @@ lint.per-file-ignores."src/documents/tests/test_consumer.py" = [
lint.per-file-ignores."src/documents/tests/test_file_handling.py" = [ lint.per-file-ignores."src/documents/tests/test_file_handling.py" = [
"PTH", "PTH",
] # TODO Enable & remove ] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_management.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_management_consumer.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_management_exporter.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_migration_archive_files.py" = [ lint.per-file-ignores."src/documents/tests/test_migration_archive_files.py" = [
"PTH", "PTH",
] # TODO Enable & remove ] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_migration_document_pages_count.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_migration_mime_type.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/tests/test_sanity_check.py" = [
"PTH",
] # TODO Enable & remove
lint.per-file-ignores."src/documents/views.py" = [ lint.per-file-ignores."src/documents/views.py" = [
"PTH", "PTH",
] # TODO Enable & remove ] # TODO Enable & remove

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -70,57 +70,59 @@ def set_permissions_for_object(permissions: list[str], object, *, merge: bool =
for action in permissions: for action in permissions:
permission = f"{action}_{object.__class__.__name__.lower()}" permission = f"{action}_{object.__class__.__name__.lower()}"
# users if "users" in permissions[action]:
users_to_add = User.objects.filter(id__in=permissions[action]["users"]) # users
users_to_remove = ( users_to_add = User.objects.filter(id__in=permissions[action]["users"])
get_users_with_perms( users_to_remove = (
object, get_users_with_perms(
only_with_perms_in=[permission], object,
with_group_users=False, only_with_perms_in=[permission],
with_group_users=False,
)
if not merge
else User.objects.none()
) )
if not merge if len(users_to_add) > 0 and len(users_to_remove) > 0:
else User.objects.none() users_to_remove = users_to_remove.exclude(id__in=users_to_add)
) if len(users_to_remove) > 0:
if len(users_to_add) > 0 and len(users_to_remove) > 0: for user in users_to_remove:
users_to_remove = users_to_remove.exclude(id__in=users_to_add) remove_perm(permission, user, object)
if len(users_to_remove) > 0: if len(users_to_add) > 0:
for user in users_to_remove: for user in users_to_add:
remove_perm(permission, user, object) assign_perm(permission, user, object)
if len(users_to_add) > 0: if action == "change":
for user in users_to_add: # change gives view too
assign_perm(permission, user, object) assign_perm(
if action == "change": f"view_{object.__class__.__name__.lower()}",
# change gives view too user,
assign_perm( object,
f"view_{object.__class__.__name__.lower()}", )
user, if "groups" in permissions[action]:
object, # groups
) groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"])
# groups groups_to_remove = (
groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"]) get_groups_with_only_permission(
groups_to_remove = ( object,
get_groups_with_only_permission( permission,
object, )
permission, if not merge
else Group.objects.none()
) )
if not merge if len(groups_to_add) > 0 and len(groups_to_remove) > 0:
else Group.objects.none() groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add)
) if len(groups_to_remove) > 0:
if len(groups_to_add) > 0 and len(groups_to_remove) > 0: for group in groups_to_remove:
groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add) remove_perm(permission, group, object)
if len(groups_to_remove) > 0: if len(groups_to_add) > 0:
for group in groups_to_remove: for group in groups_to_add:
remove_perm(permission, group, object) assign_perm(permission, group, object)
if len(groups_to_add) > 0: if action == "change":
for group in groups_to_add: # change gives view too
assign_perm(permission, group, object) assign_perm(
if action == "change": f"view_{object.__class__.__name__.lower()}",
# change gives view too group,
assign_perm( object,
f"view_{object.__class__.__name__.lower()}", )
group,
object,
)
def get_objects_for_user_owner_aware(user, perms, Model) -> QuerySet: def get_objects_for_user_owner_aware(user, perms, Model) -> QuerySet:

View File

@ -160,24 +160,24 @@ class SetPermissionsMixin:
def validate_set_permissions(self, set_permissions=None): def validate_set_permissions(self, set_permissions=None):
permissions_dict = { permissions_dict = {
"view": { "view": {},
"users": User.objects.none(), "change": {},
"groups": Group.objects.none(),
},
"change": {
"users": User.objects.none(),
"groups": Group.objects.none(),
},
} }
if set_permissions is not None: if set_permissions is not None:
for action, _ in permissions_dict.items(): for action in ["view", "change"]:
if action in set_permissions: if action in set_permissions:
users = set_permissions[action]["users"] if "users" in set_permissions[action]:
permissions_dict[action]["users"] = self._validate_user_ids(users) users = set_permissions[action]["users"]
groups = set_permissions[action]["groups"] permissions_dict[action]["users"] = self._validate_user_ids(
permissions_dict[action]["groups"] = self._validate_group_ids( users,
groups, )
) if "groups" in set_permissions[action]:
groups = set_permissions[action]["groups"]
permissions_dict[action]["groups"] = self._validate_group_ids(
groups,
)
else:
del permissions_dict[action]
return permissions_dict return permissions_dict
def _set_permissions(self, permissions, object): def _set_permissions(self, permissions, object):

View File

@ -1162,7 +1162,7 @@ def run_workflows(
) as f: ) as f:
files = { files = {
"file": ( "file": (
document.original_filename, filename,
f.read(), f.read(),
document.mime_type, document.mime_type,
), ),

View File

@ -395,6 +395,52 @@ class TestApiAuth(DirectoriesMixin, APITestCase):
self.assertTrue(checker.has_perm("view_document", doc)) self.assertTrue(checker.has_perm("view_document", doc))
self.assertIn("view_document", get_perms(group1, doc)) self.assertIn("view_document", get_perms(group1, doc))
def test_patch_doesnt_remove_permissions(self):
"""
GIVEN:
- existing document with permissions set
WHEN:
- PATCH API request to update doc that is not json
THEN:
- Object permissions are not removed
"""
doc = Document.objects.create(
title="test",
mime_type="application/pdf",
content="this is a document",
)
user1 = User.objects.create_superuser(username="user1")
user2 = User.objects.create(username="user2")
group1 = Group.objects.create(name="group1")
doc.owner = user1
doc.save()
assign_perm("view_document", user2, doc)
assign_perm("change_document", user2, doc)
assign_perm("view_document", group1, doc)
assign_perm("change_document", group1, doc)
self.client.force_authenticate(user1)
response = self.client.patch(
f"/api/documents/{doc.id}/",
{
"archive_serial_number": "123",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
doc = Document.objects.get(pk=doc.id)
self.assertEqual(doc.owner, user1)
from guardian.core import ObjectPermissionChecker
checker = ObjectPermissionChecker(user2)
self.assertTrue(checker.has_perm("view_document", doc))
self.assertIn("view_document", get_perms(group1, doc))
self.assertTrue(checker.has_perm("change_document", doc))
self.assertIn("change_document", get_perms(group1, doc))
def test_dynamic_permissions_fields(self): def test_dynamic_permissions_fields(self):
user1 = User.objects.create_user(username="user1") user1 = User.objects.create_user(username="user1")
user1.user_permissions.add(*Permission.objects.filter(codename="view_document")) user1.user_permissions.add(*Permission.objects.filter(codename="view_document"))

View File

@ -1,6 +1,5 @@
import filecmp import filecmp
import hashlib import hashlib
import os
import shutil import shutil
import tempfile import tempfile
from io import StringIO from io import StringIO
@ -19,7 +18,7 @@ from documents.tasks import update_document_content_maybe_archive_file
from documents.tests.utils import DirectoriesMixin from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin from documents.tests.utils import FileSystemAssertsMixin
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf") sample_file: Path = Path(__file__).parent / "samples" / "simple.pdf"
@override_settings(FILENAME_FORMAT="{correspondent}/{title}") @override_settings(FILENAME_FORMAT="{correspondent}/{title}")
@ -34,19 +33,13 @@ class TestArchiver(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def test_archiver(self): def test_archiver(self):
doc = self.make_models() doc = self.make_models()
shutil.copy( shutil.copy(sample_file, Path(self.dirs.originals_dir) / f"{doc.id:07}.pdf")
sample_file,
os.path.join(self.dirs.originals_dir, f"{doc.id:07}.pdf"),
)
call_command("document_archiver", "--processes", "1") call_command("document_archiver", "--processes", "1")
def test_handle_document(self): def test_handle_document(self):
doc = self.make_models() doc = self.make_models()
shutil.copy( shutil.copy(sample_file, Path(self.dirs.originals_dir) / f"{doc.id:07}.pdf")
sample_file,
os.path.join(self.dirs.originals_dir, f"{doc.id:07}.pdf"),
)
update_document_content_maybe_archive_file(doc.pk) update_document_content_maybe_archive_file(doc.pk)
@ -90,11 +83,8 @@ class TestArchiver(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
mime_type="application/pdf", mime_type="application/pdf",
filename="document_01.pdf", filename="document_01.pdf",
) )
shutil.copy(sample_file, os.path.join(self.dirs.originals_dir, "document.pdf")) shutil.copy(sample_file, Path(self.dirs.originals_dir) / "document.pdf")
shutil.copy( shutil.copy(sample_file, Path(self.dirs.originals_dir) / "document_01.pdf")
sample_file,
os.path.join(self.dirs.originals_dir, "document_01.pdf"),
)
update_document_content_maybe_archive_file(doc2.pk) update_document_content_maybe_archive_file(doc2.pk)
update_document_content_maybe_archive_file(doc1.pk) update_document_content_maybe_archive_file(doc1.pk)
@ -136,22 +126,22 @@ class TestDecryptDocuments(FileSystemAssertsMixin, TestCase):
) )
shutil.copy( shutil.copy(
os.path.join( (
os.path.dirname(__file__), Path(__file__).parent
"samples", / "samples"
"documents", / "documents"
"originals", / "originals"
"0000004.pdf.gpg", / "0000004.pdf.gpg"
), ),
originals_dir / "0000004.pdf.gpg", originals_dir / "0000004.pdf.gpg",
) )
shutil.copy( shutil.copy(
os.path.join( (
os.path.dirname(__file__), Path(__file__).parent
"samples", / "samples"
"documents", / "documents"
"thumbnails", / "thumbnails"
"0000004.webp.gpg", / "0000004.webp.gpg"
), ),
thumb_dir / f"{doc.id:07}.webp.gpg", thumb_dir / f"{doc.id:07}.webp.gpg",
) )
@ -162,13 +152,13 @@ class TestDecryptDocuments(FileSystemAssertsMixin, TestCase):
self.assertEqual(doc.storage_type, Document.STORAGE_TYPE_UNENCRYPTED) self.assertEqual(doc.storage_type, Document.STORAGE_TYPE_UNENCRYPTED)
self.assertEqual(doc.filename, "0000004.pdf") self.assertEqual(doc.filename, "0000004.pdf")
self.assertIsFile(os.path.join(originals_dir, "0000004.pdf")) self.assertIsFile(Path(originals_dir) / "0000004.pdf")
self.assertIsFile(doc.source_path) self.assertIsFile(doc.source_path)
self.assertIsFile(os.path.join(thumb_dir, f"{doc.id:07}.webp")) self.assertIsFile(Path(thumb_dir) / f"{doc.id:07}.webp")
self.assertIsFile(doc.thumbnail_path) self.assertIsFile(doc.thumbnail_path)
with doc.source_file as f: with doc.source_file as f:
checksum = hashlib.md5(f.read()).hexdigest() checksum: str = hashlib.md5(f.read()).hexdigest()
self.assertEqual(checksum, doc.checksum) self.assertEqual(checksum, doc.checksum)

View File

@ -1,5 +1,4 @@
import filecmp import filecmp
import os
import shutil import shutil
from pathlib import Path from pathlib import Path
from threading import Thread from threading import Thread
@ -94,13 +93,13 @@ class ConsumerThreadMixin(DocumentConsumeDelayMixin):
print("Consumed a perfectly valid file.") # noqa: T201 print("Consumed a perfectly valid file.") # noqa: T201
def slow_write_file(self, target, *, incomplete=False): def slow_write_file(self, target, *, incomplete=False):
with open(self.sample_file, "rb") as f: with Path(self.sample_file).open("rb") as f:
pdf_bytes = f.read() pdf_bytes = f.read()
if incomplete: if incomplete:
pdf_bytes = pdf_bytes[: len(pdf_bytes) - 100] pdf_bytes = pdf_bytes[: len(pdf_bytes) - 100]
with open(target, "wb") as f: with Path(target).open("wb") as f:
# this will take 2 seconds, since the file is about 20k. # this will take 2 seconds, since the file is about 20k.
print("Start writing file.") # noqa: T201 print("Start writing file.") # noqa: T201
for b in chunked(1000, pdf_bytes): for b in chunked(1000, pdf_bytes):
@ -116,7 +115,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
def test_consume_file(self): def test_consume_file(self):
self.t_start() self.t_start()
f = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf")) f = Path(self.dirs.consumption_dir) / "my_file.pdf"
shutil.copy(self.sample_file, f) shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call() self.wait_for_task_mock_call()
@ -130,7 +129,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
def test_consume_file_invalid_ext(self): def test_consume_file_invalid_ext(self):
self.t_start() self.t_start()
f = os.path.join(self.dirs.consumption_dir, "my_file.wow") f = Path(self.dirs.consumption_dir) / "my_file.wow"
shutil.copy(self.sample_file, f) shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call() self.wait_for_task_mock_call()
@ -138,7 +137,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.consume_file_mock.assert_not_called() self.consume_file_mock.assert_not_called()
def test_consume_existing_file(self): def test_consume_existing_file(self):
f = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf")) f = Path(self.dirs.consumption_dir) / "my_file.pdf"
shutil.copy(self.sample_file, f) shutil.copy(self.sample_file, f)
self.t_start() self.t_start()
@ -154,7 +153,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start() self.t_start()
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf")) fname = Path(self.dirs.consumption_dir) / "my_file.pdf"
self.slow_write_file(fname) self.slow_write_file(fname)
@ -174,8 +173,8 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start() self.t_start()
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.~df")) fname = Path(self.dirs.consumption_dir) / "my_file.~df"
fname2 = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf")) fname2 = Path(self.dirs.consumption_dir) / "my_file.pdf"
self.slow_write_file(fname) self.slow_write_file(fname)
shutil.move(fname, fname2) shutil.move(fname, fname2)
@ -196,7 +195,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start() self.t_start()
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf")) fname = Path(self.dirs.consumption_dir) / "my_file.pdf"
self.slow_write_file(fname, incomplete=True) self.slow_write_file(fname, incomplete=True)
self.wait_for_task_mock_call() self.wait_for_task_mock_call()
@ -225,23 +224,23 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
shutil.copy( shutil.copy(
self.sample_file, self.sample_file,
os.path.join(self.dirs.consumption_dir, ".DS_STORE"), Path(self.dirs.consumption_dir) / ".DS_STORE",
) )
shutil.copy( shutil.copy(
self.sample_file, self.sample_file,
os.path.join(self.dirs.consumption_dir, "my_file.pdf"), Path(self.dirs.consumption_dir) / "my_file.pdf",
) )
shutil.copy( shutil.copy(
self.sample_file, self.sample_file,
os.path.join(self.dirs.consumption_dir, "._my_file.pdf"), Path(self.dirs.consumption_dir) / "._my_file.pdf",
) )
shutil.copy( shutil.copy(
self.sample_file, self.sample_file,
os.path.join(self.dirs.consumption_dir, "my_second_file.pdf"), Path(self.dirs.consumption_dir) / "my_second_file.pdf",
) )
shutil.copy( shutil.copy(
self.sample_file, self.sample_file,
os.path.join(self.dirs.consumption_dir, "._my_second_file.pdf"), Path(self.dirs.consumption_dir) / "._my_second_file.pdf",
) )
sleep(5) sleep(5)
@ -259,60 +258,66 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
def test_is_ignored(self): def test_is_ignored(self):
test_paths = [ test_paths = [
{ {
"path": os.path.join(self.dirs.consumption_dir, "foo.pdf"), "path": (Path(self.dirs.consumption_dir) / "foo.pdf").as_posix(),
"ignore": False, "ignore": False,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, "foo", "bar.pdf"), "path": (
Path(self.dirs.consumption_dir) / "foo" / "bar.pdf"
).as_posix(),
"ignore": False, "ignore": False,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, ".DS_STORE"), "path": (Path(self.dirs.consumption_dir) / ".DS_STORE").as_posix(),
"ignore": True, "ignore": True,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, ".DS_Store"), "path": (Path(self.dirs.consumption_dir) / ".DS_Store").as_posix(),
"ignore": True, "ignore": True,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, ".stfolder", "foo.pdf"), "path": (
Path(self.dirs.consumption_dir) / ".stfolder" / "foo.pdf"
).as_posix(),
"ignore": True, "ignore": True,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, ".stfolder.pdf"), "path": (Path(self.dirs.consumption_dir) / ".stfolder.pdf").as_posix(),
"ignore": False, "ignore": False,
}, },
{ {
"path": os.path.join( "path": (
self.dirs.consumption_dir, Path(self.dirs.consumption_dir) / ".stversions" / "foo.pdf"
".stversions", ).as_posix(),
"foo.pdf",
),
"ignore": True, "ignore": True,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, ".stversions.pdf"), "path": (
Path(self.dirs.consumption_dir) / ".stversions.pdf"
).as_posix(),
"ignore": False, "ignore": False,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, "._foo.pdf"), "path": (Path(self.dirs.consumption_dir) / "._foo.pdf").as_posix(),
"ignore": True, "ignore": True,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, "my_foo.pdf"), "path": (Path(self.dirs.consumption_dir) / "my_foo.pdf").as_posix(),
"ignore": False, "ignore": False,
}, },
{ {
"path": os.path.join(self.dirs.consumption_dir, "._foo", "bar.pdf"), "path": (
Path(self.dirs.consumption_dir) / "._foo" / "bar.pdf"
).as_posix(),
"ignore": True, "ignore": True,
}, },
{ {
"path": os.path.join( "path": (
self.dirs.consumption_dir, Path(self.dirs.consumption_dir)
"@eaDir", / "@eaDir"
"SYNO@.fileindexdb", / "SYNO@.fileindexdb"
"_1jk.fnm", / "_1jk.fnm"
), ).as_posix(),
"ignore": True, "ignore": True,
}, },
] ]
@ -332,7 +337,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
self.t_start() self.t_start()
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf") f = Path(self.dirs.consumption_dir) / "my_file.pdf"
shutil.copy(self.sample_file, f) shutil.copy(self.sample_file, f)
self.wait_for_task_mock_call() self.wait_for_task_mock_call()
@ -380,9 +385,9 @@ class TestConsumerTags(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCas
self.t_start() self.t_start()
path = os.path.join(self.dirs.consumption_dir, *tag_names) path = Path(self.dirs.consumption_dir) / "/".join(tag_names)
os.makedirs(path, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
f = Path(os.path.join(path, "my_file.pdf")) f = path / "my_file.pdf"
# Wait at least inotify read_delay for recursive watchers # Wait at least inotify read_delay for recursive watchers
# to be created for the new directories # to be created for the new directories
sleep(1) sleep(1)

View File

@ -1,6 +1,5 @@
import hashlib import hashlib
import json import json
import os
import shutil import shutil
import tempfile import tempfile
from io import StringIO from io import StringIO
@ -183,16 +182,16 @@ class TestExportImport(
call_command(*args) call_command(*args)
with open(os.path.join(self.target, "manifest.json")) as f: with (self.target / "manifest.json").open() as f:
manifest = json.load(f) manifest = json.load(f)
return manifest return manifest
def test_exporter(self, *, use_filename_format=False): def test_exporter(self, *, use_filename_format=False):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
num_permission_objects = Permission.objects.count() num_permission_objects = Permission.objects.count()
@ -210,7 +209,7 @@ class TestExportImport(
4, 4,
) )
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
self.assertEqual( self.assertEqual(
self._get_document_from_manifest(manifest, self.d1.id)["fields"]["title"], self._get_document_from_manifest(manifest, self.d1.id)["fields"]["title"],
@ -231,19 +230,17 @@ class TestExportImport(
for element in manifest: for element in manifest:
if element["model"] == "documents.document": if element["model"] == "documents.document":
fname = os.path.join( fname = (
self.target, self.target / element[document_exporter.EXPORTER_FILE_NAME]
element[document_exporter.EXPORTER_FILE_NAME], ).as_posix()
)
self.assertIsFile(fname) self.assertIsFile(fname)
self.assertIsFile( self.assertIsFile(
os.path.join( (
self.target, self.target / element[document_exporter.EXPORTER_THUMBNAIL_NAME]
element[document_exporter.EXPORTER_THUMBNAIL_NAME], ).as_posix(),
),
) )
with open(fname, "rb") as f: with Path(fname).open("rb") as f:
checksum = hashlib.md5(f.read()).hexdigest() checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(checksum, element["fields"]["checksum"]) self.assertEqual(checksum, element["fields"]["checksum"])
@ -253,13 +250,12 @@ class TestExportImport(
) )
if document_exporter.EXPORTER_ARCHIVE_NAME in element: if document_exporter.EXPORTER_ARCHIVE_NAME in element:
fname = os.path.join( fname = (
self.target, self.target / element[document_exporter.EXPORTER_ARCHIVE_NAME]
element[document_exporter.EXPORTER_ARCHIVE_NAME], ).as_posix()
)
self.assertIsFile(fname) self.assertIsFile(fname)
with open(fname, "rb") as f: with Path(fname).open("rb") as f:
checksum = hashlib.md5(f.read()).hexdigest() checksum = hashlib.md5(f.read()).hexdigest()
self.assertEqual(checksum, element["fields"]["archive_checksum"]) self.assertEqual(checksum, element["fields"]["archive_checksum"])
@ -297,10 +293,10 @@ class TestExportImport(
self.assertEqual(len(messages), 0) self.assertEqual(len(messages), 0)
def test_exporter_with_filename_format(self): def test_exporter_with_filename_format(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
with override_settings( with override_settings(
@ -309,16 +305,16 @@ class TestExportImport(
self.test_exporter(use_filename_format=True) self.test_exporter(use_filename_format=True)
def test_update_export_changed_time(self): def test_update_export_changed_time(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
self._do_export() self._do_export()
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
st_mtime_1 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime st_mtime_1 = (self.target / "manifest.json").stat().st_mtime
with mock.patch( with mock.patch(
"documents.management.commands.document_exporter.copy_file_with_basic_stats", "documents.management.commands.document_exporter.copy_file_with_basic_stats",
@ -326,8 +322,8 @@ class TestExportImport(
self._do_export() self._do_export()
m.assert_not_called() m.assert_not_called()
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
st_mtime_2 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime st_mtime_2 = (self.target / "manifest.json").stat().st_mtime
Path(self.d1.source_path).touch() Path(self.d1.source_path).touch()
@ -337,26 +333,26 @@ class TestExportImport(
self._do_export() self._do_export()
self.assertEqual(m.call_count, 1) self.assertEqual(m.call_count, 1)
st_mtime_3 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime st_mtime_3 = (self.target / "manifest.json").stat().st_mtime
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
self.assertNotEqual(st_mtime_1, st_mtime_2) self.assertNotEqual(st_mtime_1, st_mtime_2)
self.assertNotEqual(st_mtime_2, st_mtime_3) self.assertNotEqual(st_mtime_2, st_mtime_3)
self._do_export(compare_json=True) self._do_export(compare_json=True)
st_mtime_4 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime st_mtime_4 = (self.target / "manifest.json").stat().st_mtime
self.assertEqual(st_mtime_3, st_mtime_4) self.assertEqual(st_mtime_3, st_mtime_4)
def test_update_export_changed_checksum(self): def test_update_export_changed_checksum(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
self._do_export() self._do_export()
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
with mock.patch( with mock.patch(
"documents.management.commands.document_exporter.copy_file_with_basic_stats", "documents.management.commands.document_exporter.copy_file_with_basic_stats",
@ -364,7 +360,7 @@ class TestExportImport(
self._do_export() self._do_export()
m.assert_not_called() m.assert_not_called()
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
self.d2.checksum = "asdfasdgf3" self.d2.checksum = "asdfasdgf3"
self.d2.save() self.d2.save()
@ -375,13 +371,13 @@ class TestExportImport(
self._do_export(compare_checksums=True) self._do_export(compare_checksums=True)
self.assertEqual(m.call_count, 1) self.assertEqual(m.call_count, 1)
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
def test_update_export_deleted_document(self): def test_update_export_deleted_document(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
manifest = self._do_export() manifest = self._do_export()
@ -389,7 +385,7 @@ class TestExportImport(
self.assertTrue(len(manifest), 7) self.assertTrue(len(manifest), 7)
doc_from_manifest = self._get_document_from_manifest(manifest, self.d3.id) doc_from_manifest = self._get_document_from_manifest(manifest, self.d3.id)
self.assertIsFile( self.assertIsFile(
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]), (self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
) )
self.d3.delete() self.d3.delete()
@ -401,39 +397,39 @@ class TestExportImport(
self.d3.id, self.d3.id,
) )
self.assertIsFile( self.assertIsFile(
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]), (self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
) )
manifest = self._do_export(delete=True) manifest = self._do_export(delete=True)
self.assertIsNotFile( self.assertIsNotFile(
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]), (self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
) )
self.assertTrue(len(manifest), 6) self.assertTrue(len(manifest), 6)
@override_settings(FILENAME_FORMAT="{title}/{correspondent}") @override_settings(FILENAME_FORMAT="{title}/{correspondent}")
def test_update_export_changed_location(self): def test_update_export_changed_location(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
self._do_export(use_filename_format=True) self._do_export(use_filename_format=True)
self.assertIsFile(os.path.join(self.target, "wow1", "c.pdf")) self.assertIsFile((self.target / "wow1" / "c.pdf").as_posix())
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
self.d1.title = "new_title" self.d1.title = "new_title"
self.d1.save() self.d1.save()
self._do_export(use_filename_format=True, delete=True) self._do_export(use_filename_format=True, delete=True)
self.assertIsNotFile(os.path.join(self.target, "wow1", "c.pdf")) self.assertIsNotFile((self.target / "wow1" / "c.pdf").as_posix())
self.assertIsNotDir(os.path.join(self.target, "wow1")) self.assertIsNotDir((self.target / "wow1").as_posix())
self.assertIsFile(os.path.join(self.target, "new_title", "c.pdf")) self.assertIsFile((self.target / "new_title" / "c.pdf").as_posix())
self.assertIsFile(os.path.join(self.target, "manifest.json")) self.assertIsFile((self.target / "manifest.json").as_posix())
self.assertIsFile(os.path.join(self.target, "wow2", "none.pdf")) self.assertIsFile((self.target / "wow2" / "none.pdf").as_posix())
self.assertIsFile( self.assertIsFile(
os.path.join(self.target, "wow2", "none_01.pdf"), (self.target / "wow2" / "none_01.pdf").as_posix(),
) )
def test_export_missing_files(self): def test_export_missing_files(self):
@ -458,20 +454,19 @@ class TestExportImport(
- Zipfile is created - Zipfile is created
- Zipfile contains exported files - Zipfile contains exported files
""" """
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
args = ["document_exporter", self.target, "--zip"] args = ["document_exporter", self.target, "--zip"]
call_command(*args) call_command(*args)
expected_file = os.path.join( expected_file = (
self.target, self.target / f"export-{timezone.localdate().isoformat()}.zip"
f"export-{timezone.localdate().isoformat()}.zip", ).as_posix()
)
self.assertIsFile(expected_file) self.assertIsFile(expected_file)
@ -492,10 +487,10 @@ class TestExportImport(
- Zipfile is created - Zipfile is created
- Zipfile contains exported files - Zipfile contains exported files
""" """
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
args = ["document_exporter", self.target, "--zip", "--use-filename-format"] args = ["document_exporter", self.target, "--zip", "--use-filename-format"]
@ -505,10 +500,9 @@ class TestExportImport(
): ):
call_command(*args) call_command(*args)
expected_file = os.path.join( expected_file = (
self.target, self.target / f"export-{timezone.localdate().isoformat()}.zip"
f"export-{timezone.localdate().isoformat()}.zip", ).as_posix()
)
self.assertIsFile(expected_file) self.assertIsFile(expected_file)
@ -533,10 +527,10 @@ class TestExportImport(
- Zipfile contains exported files - Zipfile contains exported files
- The existing file and directory in target are removed - The existing file and directory in target are removed
""" """
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
# Create stuff in target directory # Create stuff in target directory
@ -552,10 +546,9 @@ class TestExportImport(
call_command(*args) call_command(*args)
expected_file = os.path.join( expected_file = (
self.target, self.target / f"export-{timezone.localdate().isoformat()}.zip"
f"export-{timezone.localdate().isoformat()}.zip", ).as_posix()
)
self.assertIsFile(expected_file) self.assertIsFile(expected_file)
self.assertIsNotFile(existing_file) self.assertIsNotFile(existing_file)
@ -610,7 +603,7 @@ class TestExportImport(
- Error is raised - Error is raised
""" """
with tempfile.TemporaryDirectory() as tmp_dir: with tempfile.TemporaryDirectory() as tmp_dir:
os.chmod(tmp_dir, 0o000) Path(tmp_dir).chmod(0o000)
args = ["document_exporter", tmp_dir] args = ["document_exporter", tmp_dir]
@ -629,10 +622,10 @@ class TestExportImport(
- Manifest.json doesn't contain information about archive files - Manifest.json doesn't contain information about archive files
- Documents can be imported again - Documents can be imported again
""" """
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
manifest = self._do_export() manifest = self._do_export()
@ -670,10 +663,10 @@ class TestExportImport(
- Manifest.json doesn't contain information about thumbnails - Manifest.json doesn't contain information about thumbnails
- Documents can be imported again - Documents can be imported again
""" """
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
manifest = self._do_export() manifest = self._do_export()
@ -713,10 +706,10 @@ class TestExportImport(
- Main manifest.json file doesn't contain information about documents - Main manifest.json file doesn't contain information about documents
- Documents can be imported again - Documents can be imported again
""" """
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
manifest = self._do_export(split_manifest=True) manifest = self._do_export(split_manifest=True)
@ -744,10 +737,10 @@ class TestExportImport(
THEN: THEN:
- Documents can be imported again - Documents can be imported again
""" """
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
self._do_export(use_folder_prefix=True) self._do_export(use_folder_prefix=True)
@ -769,10 +762,10 @@ class TestExportImport(
- ContentType & Permission objects are not deleted, db transaction rolled back - ContentType & Permission objects are not deleted, db transaction rolled back
""" """
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
num_content_type_objects = ContentType.objects.count() num_content_type_objects = ContentType.objects.count()
@ -804,10 +797,10 @@ class TestExportImport(
self.assertEqual(Permission.objects.count(), num_permission_objects + 1) self.assertEqual(Permission.objects.count(), num_permission_objects + 1)
def test_exporter_with_auditlog_disabled(self): def test_exporter_with_auditlog_disabled(self):
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) shutil.rmtree(Path(self.dirs.media_dir) / "documents")
shutil.copytree( shutil.copytree(
os.path.join(os.path.dirname(__file__), "samples", "documents"), Path(__file__).parent / "samples" / "documents",
os.path.join(self.dirs.media_dir, "documents"), Path(self.dirs.media_dir) / "documents",
) )
with override_settings( with override_settings(

View File

@ -1,4 +1,3 @@
import os
import shutil import shutil
from pathlib import Path from pathlib import Path
@ -8,11 +7,11 @@ from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import TestMigrations from documents.tests.utils import TestMigrations
def source_path_before(self): def source_path_before(self) -> Path:
if self.filename: if self.filename:
fname = str(self.filename) fname = str(self.filename)
return os.path.join(settings.ORIGINALS_DIR, fname) return Path(settings.ORIGINALS_DIR) / fname
class TestMigrateDocumentPageCount(DirectoriesMixin, TestMigrations): class TestMigrateDocumentPageCount(DirectoriesMixin, TestMigrations):

View File

@ -1,5 +1,5 @@
import os
import shutil import shutil
from pathlib import Path
from django.conf import settings from django.conf import settings
from django.test import override_settings from django.test import override_settings
@ -20,7 +20,7 @@ def source_path_before(self):
if self.storage_type == STORAGE_TYPE_GPG: if self.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg" fname += ".gpg"
return os.path.join(settings.ORIGINALS_DIR, fname) return (Path(settings.ORIGINALS_DIR) / fname).as_posix()
def file_type_after(self): def file_type_after(self):
@ -35,7 +35,7 @@ def source_path_after(doc):
if doc.storage_type == STORAGE_TYPE_GPG: if doc.storage_type == STORAGE_TYPE_GPG:
fname += ".gpg" # pragma: no cover fname += ".gpg" # pragma: no cover
return os.path.join(settings.ORIGINALS_DIR, fname) return (Path(settings.ORIGINALS_DIR) / fname).as_posix()
@override_settings(PASSPHRASE="test") @override_settings(PASSPHRASE="test")
@ -52,7 +52,7 @@ class TestMigrateMimeType(DirectoriesMixin, TestMigrations):
) )
self.doc_id = doc.id self.doc_id = doc.id
shutil.copy( shutil.copy(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"), Path(__file__).parent / "samples" / "simple.pdf",
source_path_before(doc), source_path_before(doc),
) )
@ -63,12 +63,12 @@ class TestMigrateMimeType(DirectoriesMixin, TestMigrations):
) )
self.doc2_id = doc2.id self.doc2_id = doc2.id
shutil.copy( shutil.copy(
os.path.join( (
os.path.dirname(__file__), Path(__file__).parent
"samples", / "samples"
"documents", / "documents"
"originals", / "originals"
"0000004.pdf.gpg", / "0000004.pdf.gpg"
), ),
source_path_before(doc2), source_path_before(doc2),
) )
@ -97,7 +97,7 @@ class TestMigrateMimeTypeBackwards(DirectoriesMixin, TestMigrations):
) )
self.doc_id = doc.id self.doc_id = doc.id
shutil.copy( shutil.copy(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"), Path(__file__).parent / "samples" / "simple.pdf",
source_path_after(doc), source_path_after(doc),
) )

View File

@ -1,5 +1,4 @@
import logging import logging
import os
import shutil import shutil
from pathlib import Path from pathlib import Path
@ -17,34 +16,34 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
with filelock.FileLock(settings.MEDIA_LOCK): with filelock.FileLock(settings.MEDIA_LOCK):
# just make sure that the lockfile is present. # just make sure that the lockfile is present.
shutil.copy( shutil.copy(
os.path.join( (
os.path.dirname(__file__), Path(__file__).parent
"samples", / "samples"
"documents", / "documents"
"originals", / "originals"
"0000001.pdf", / "0000001.pdf"
), ),
os.path.join(self.dirs.originals_dir, "0000001.pdf"), Path(self.dirs.originals_dir) / "0000001.pdf",
) )
shutil.copy( shutil.copy(
os.path.join( (
os.path.dirname(__file__), Path(__file__).parent
"samples", / "samples"
"documents", / "documents"
"archive", / "archive"
"0000001.pdf", / "0000001.pdf"
), ),
os.path.join(self.dirs.archive_dir, "0000001.pdf"), Path(self.dirs.archive_dir) / "0000001.pdf",
) )
shutil.copy( shutil.copy(
os.path.join( (
os.path.dirname(__file__), Path(__file__).parent
"samples", / "samples"
"documents", / "documents"
"thumbnails", / "thumbnails"
"0000001.webp", / "0000001.webp"
), ),
os.path.join(self.dirs.thumbnail_dir, "0000001.webp"), Path(self.dirs.thumbnail_dir) / "0000001.webp",
) )
return Document.objects.create( return Document.objects.create(
@ -92,25 +91,25 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
def test_no_thumbnail(self): def test_no_thumbnail(self):
doc = self.make_test_data() doc = self.make_test_data()
os.remove(doc.thumbnail_path) Path(doc.thumbnail_path).unlink()
self.assertSanityError(doc, "Thumbnail of document does not exist") self.assertSanityError(doc, "Thumbnail of document does not exist")
def test_thumbnail_no_access(self): def test_thumbnail_no_access(self):
doc = self.make_test_data() doc = self.make_test_data()
os.chmod(doc.thumbnail_path, 0o000) Path(doc.thumbnail_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read thumbnail file of document") self.assertSanityError(doc, "Cannot read thumbnail file of document")
os.chmod(doc.thumbnail_path, 0o777) Path(doc.thumbnail_path).chmod(0o777)
def test_no_original(self): def test_no_original(self):
doc = self.make_test_data() doc = self.make_test_data()
os.remove(doc.source_path) Path(doc.source_path).unlink()
self.assertSanityError(doc, "Original of document does not exist.") self.assertSanityError(doc, "Original of document does not exist.")
def test_original_no_access(self): def test_original_no_access(self):
doc = self.make_test_data() doc = self.make_test_data()
os.chmod(doc.source_path, 0o000) Path(doc.source_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read original file of document") self.assertSanityError(doc, "Cannot read original file of document")
os.chmod(doc.source_path, 0o777) Path(doc.source_path).chmod(0o777)
def test_original_checksum_mismatch(self): def test_original_checksum_mismatch(self):
doc = self.make_test_data() doc = self.make_test_data()
@ -120,14 +119,14 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
def test_no_archive(self): def test_no_archive(self):
doc = self.make_test_data() doc = self.make_test_data()
os.remove(doc.archive_path) Path(doc.archive_path).unlink()
self.assertSanityError(doc, "Archived version of document does not exist.") self.assertSanityError(doc, "Archived version of document does not exist.")
def test_archive_no_access(self): def test_archive_no_access(self):
doc = self.make_test_data() doc = self.make_test_data()
os.chmod(doc.archive_path, 0o000) Path(doc.archive_path).chmod(0o000)
self.assertSanityError(doc, "Cannot read archive file of document") self.assertSanityError(doc, "Cannot read archive file of document")
os.chmod(doc.archive_path, 0o777) Path(doc.archive_path).chmod(0o777)
def test_archive_checksum_mismatch(self): def test_archive_checksum_mismatch(self):
doc = self.make_test_data() doc = self.make_test_data()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -10,6 +10,8 @@ from django.contrib.auth.models import User
from django.forms import ValidationError from django.forms import ValidationError
from django.urls import reverse from django.urls import reverse
from paperless.signals import handle_social_account_updated
logger = logging.getLogger("paperless.auth") logger = logging.getLogger("paperless.auth")
@ -114,4 +116,5 @@ class CustomSocialAccountAdapter(DefaultSocialAccountAdapter):
) )
user.groups.add(*groups) user.groups.add(*groups)
user.save() user.save()
handle_social_account_updated(None, request, sociallogin)
return user return user