mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
Compare commits
7 Commits
0244b6b20b
...
eec38020a4
Author | SHA1 | Date | |
---|---|---|---|
![]() |
eec38020a4 | ||
![]() |
a0c1a19263 | ||
![]() |
1f5d1b6f26 | ||
![]() |
3b19a727b8 | ||
![]() |
7146a5f4fc | ||
![]() |
0c7765fe03 | ||
![]() |
6babc61ba2 |
@ -227,27 +227,9 @@ lint.per-file-ignores."src/documents/tests/test_consumer.py" = [
|
||||
lint.per-file-ignores."src/documents/tests/test_file_handling.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
lint.per-file-ignores."src/documents/tests/test_management.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
lint.per-file-ignores."src/documents/tests/test_management_consumer.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
lint.per-file-ignores."src/documents/tests/test_management_exporter.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
lint.per-file-ignores."src/documents/tests/test_migration_archive_files.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
lint.per-file-ignores."src/documents/tests/test_migration_document_pages_count.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
lint.per-file-ignores."src/documents/tests/test_migration_mime_type.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
lint.per-file-ignores."src/documents/tests/test_sanity_check.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
lint.per-file-ignores."src/documents/views.py" = [
|
||||
"PTH",
|
||||
] # TODO Enable & remove
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
11044
src-ui/src/locale/messages.et_EE.xlf
Normal file
11044
src-ui/src/locale/messages.et_EE.xlf
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -70,57 +70,59 @@ def set_permissions_for_object(permissions: list[str], object, *, merge: bool =
|
||||
|
||||
for action in permissions:
|
||||
permission = f"{action}_{object.__class__.__name__.lower()}"
|
||||
# users
|
||||
users_to_add = User.objects.filter(id__in=permissions[action]["users"])
|
||||
users_to_remove = (
|
||||
get_users_with_perms(
|
||||
object,
|
||||
only_with_perms_in=[permission],
|
||||
with_group_users=False,
|
||||
if "users" in permissions[action]:
|
||||
# users
|
||||
users_to_add = User.objects.filter(id__in=permissions[action]["users"])
|
||||
users_to_remove = (
|
||||
get_users_with_perms(
|
||||
object,
|
||||
only_with_perms_in=[permission],
|
||||
with_group_users=False,
|
||||
)
|
||||
if not merge
|
||||
else User.objects.none()
|
||||
)
|
||||
if not merge
|
||||
else User.objects.none()
|
||||
)
|
||||
if len(users_to_add) > 0 and len(users_to_remove) > 0:
|
||||
users_to_remove = users_to_remove.exclude(id__in=users_to_add)
|
||||
if len(users_to_remove) > 0:
|
||||
for user in users_to_remove:
|
||||
remove_perm(permission, user, object)
|
||||
if len(users_to_add) > 0:
|
||||
for user in users_to_add:
|
||||
assign_perm(permission, user, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
user,
|
||||
object,
|
||||
)
|
||||
# groups
|
||||
groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"])
|
||||
groups_to_remove = (
|
||||
get_groups_with_only_permission(
|
||||
object,
|
||||
permission,
|
||||
if len(users_to_add) > 0 and len(users_to_remove) > 0:
|
||||
users_to_remove = users_to_remove.exclude(id__in=users_to_add)
|
||||
if len(users_to_remove) > 0:
|
||||
for user in users_to_remove:
|
||||
remove_perm(permission, user, object)
|
||||
if len(users_to_add) > 0:
|
||||
for user in users_to_add:
|
||||
assign_perm(permission, user, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
user,
|
||||
object,
|
||||
)
|
||||
if "groups" in permissions[action]:
|
||||
# groups
|
||||
groups_to_add = Group.objects.filter(id__in=permissions[action]["groups"])
|
||||
groups_to_remove = (
|
||||
get_groups_with_only_permission(
|
||||
object,
|
||||
permission,
|
||||
)
|
||||
if not merge
|
||||
else Group.objects.none()
|
||||
)
|
||||
if not merge
|
||||
else Group.objects.none()
|
||||
)
|
||||
if len(groups_to_add) > 0 and len(groups_to_remove) > 0:
|
||||
groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add)
|
||||
if len(groups_to_remove) > 0:
|
||||
for group in groups_to_remove:
|
||||
remove_perm(permission, group, object)
|
||||
if len(groups_to_add) > 0:
|
||||
for group in groups_to_add:
|
||||
assign_perm(permission, group, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
group,
|
||||
object,
|
||||
)
|
||||
if len(groups_to_add) > 0 and len(groups_to_remove) > 0:
|
||||
groups_to_remove = groups_to_remove.exclude(id__in=groups_to_add)
|
||||
if len(groups_to_remove) > 0:
|
||||
for group in groups_to_remove:
|
||||
remove_perm(permission, group, object)
|
||||
if len(groups_to_add) > 0:
|
||||
for group in groups_to_add:
|
||||
assign_perm(permission, group, object)
|
||||
if action == "change":
|
||||
# change gives view too
|
||||
assign_perm(
|
||||
f"view_{object.__class__.__name__.lower()}",
|
||||
group,
|
||||
object,
|
||||
)
|
||||
|
||||
|
||||
def get_objects_for_user_owner_aware(user, perms, Model) -> QuerySet:
|
||||
|
@ -160,24 +160,24 @@ class SetPermissionsMixin:
|
||||
|
||||
def validate_set_permissions(self, set_permissions=None):
|
||||
permissions_dict = {
|
||||
"view": {
|
||||
"users": User.objects.none(),
|
||||
"groups": Group.objects.none(),
|
||||
},
|
||||
"change": {
|
||||
"users": User.objects.none(),
|
||||
"groups": Group.objects.none(),
|
||||
},
|
||||
"view": {},
|
||||
"change": {},
|
||||
}
|
||||
if set_permissions is not None:
|
||||
for action, _ in permissions_dict.items():
|
||||
for action in ["view", "change"]:
|
||||
if action in set_permissions:
|
||||
users = set_permissions[action]["users"]
|
||||
permissions_dict[action]["users"] = self._validate_user_ids(users)
|
||||
groups = set_permissions[action]["groups"]
|
||||
permissions_dict[action]["groups"] = self._validate_group_ids(
|
||||
groups,
|
||||
)
|
||||
if "users" in set_permissions[action]:
|
||||
users = set_permissions[action]["users"]
|
||||
permissions_dict[action]["users"] = self._validate_user_ids(
|
||||
users,
|
||||
)
|
||||
if "groups" in set_permissions[action]:
|
||||
groups = set_permissions[action]["groups"]
|
||||
permissions_dict[action]["groups"] = self._validate_group_ids(
|
||||
groups,
|
||||
)
|
||||
else:
|
||||
del permissions_dict[action]
|
||||
return permissions_dict
|
||||
|
||||
def _set_permissions(self, permissions, object):
|
||||
|
@ -1162,7 +1162,7 @@ def run_workflows(
|
||||
) as f:
|
||||
files = {
|
||||
"file": (
|
||||
document.original_filename,
|
||||
filename,
|
||||
f.read(),
|
||||
document.mime_type,
|
||||
),
|
||||
|
@ -395,6 +395,52 @@ class TestApiAuth(DirectoriesMixin, APITestCase):
|
||||
self.assertTrue(checker.has_perm("view_document", doc))
|
||||
self.assertIn("view_document", get_perms(group1, doc))
|
||||
|
||||
def test_patch_doesnt_remove_permissions(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- existing document with permissions set
|
||||
WHEN:
|
||||
- PATCH API request to update doc that is not json
|
||||
THEN:
|
||||
- Object permissions are not removed
|
||||
"""
|
||||
doc = Document.objects.create(
|
||||
title="test",
|
||||
mime_type="application/pdf",
|
||||
content="this is a document",
|
||||
)
|
||||
user1 = User.objects.create_superuser(username="user1")
|
||||
user2 = User.objects.create(username="user2")
|
||||
group1 = Group.objects.create(name="group1")
|
||||
doc.owner = user1
|
||||
doc.save()
|
||||
|
||||
assign_perm("view_document", user2, doc)
|
||||
assign_perm("change_document", user2, doc)
|
||||
assign_perm("view_document", group1, doc)
|
||||
assign_perm("change_document", group1, doc)
|
||||
|
||||
self.client.force_authenticate(user1)
|
||||
|
||||
response = self.client.patch(
|
||||
f"/api/documents/{doc.id}/",
|
||||
{
|
||||
"archive_serial_number": "123",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
doc = Document.objects.get(pk=doc.id)
|
||||
|
||||
self.assertEqual(doc.owner, user1)
|
||||
from guardian.core import ObjectPermissionChecker
|
||||
|
||||
checker = ObjectPermissionChecker(user2)
|
||||
self.assertTrue(checker.has_perm("view_document", doc))
|
||||
self.assertIn("view_document", get_perms(group1, doc))
|
||||
self.assertTrue(checker.has_perm("change_document", doc))
|
||||
self.assertIn("change_document", get_perms(group1, doc))
|
||||
|
||||
def test_dynamic_permissions_fields(self):
|
||||
user1 = User.objects.create_user(username="user1")
|
||||
user1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
|
||||
|
@ -1,6 +1,5 @@
|
||||
import filecmp
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from io import StringIO
|
||||
@ -19,7 +18,7 @@ from documents.tasks import update_document_content_maybe_archive_file
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from documents.tests.utils import FileSystemAssertsMixin
|
||||
|
||||
sample_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
|
||||
sample_file: Path = Path(__file__).parent / "samples" / "simple.pdf"
|
||||
|
||||
|
||||
@override_settings(FILENAME_FORMAT="{correspondent}/{title}")
|
||||
@ -34,19 +33,13 @@ class TestArchiver(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
|
||||
def test_archiver(self):
|
||||
doc = self.make_models()
|
||||
shutil.copy(
|
||||
sample_file,
|
||||
os.path.join(self.dirs.originals_dir, f"{doc.id:07}.pdf"),
|
||||
)
|
||||
shutil.copy(sample_file, Path(self.dirs.originals_dir) / f"{doc.id:07}.pdf")
|
||||
|
||||
call_command("document_archiver", "--processes", "1")
|
||||
|
||||
def test_handle_document(self):
|
||||
doc = self.make_models()
|
||||
shutil.copy(
|
||||
sample_file,
|
||||
os.path.join(self.dirs.originals_dir, f"{doc.id:07}.pdf"),
|
||||
)
|
||||
shutil.copy(sample_file, Path(self.dirs.originals_dir) / f"{doc.id:07}.pdf")
|
||||
|
||||
update_document_content_maybe_archive_file(doc.pk)
|
||||
|
||||
@ -90,11 +83,8 @@ class TestArchiver(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
|
||||
mime_type="application/pdf",
|
||||
filename="document_01.pdf",
|
||||
)
|
||||
shutil.copy(sample_file, os.path.join(self.dirs.originals_dir, "document.pdf"))
|
||||
shutil.copy(
|
||||
sample_file,
|
||||
os.path.join(self.dirs.originals_dir, "document_01.pdf"),
|
||||
)
|
||||
shutil.copy(sample_file, Path(self.dirs.originals_dir) / "document.pdf")
|
||||
shutil.copy(sample_file, Path(self.dirs.originals_dir) / "document_01.pdf")
|
||||
|
||||
update_document_content_maybe_archive_file(doc2.pk)
|
||||
update_document_content_maybe_archive_file(doc1.pk)
|
||||
@ -136,22 +126,22 @@ class TestDecryptDocuments(FileSystemAssertsMixin, TestCase):
|
||||
)
|
||||
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"originals",
|
||||
"0000004.pdf.gpg",
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000004.pdf.gpg"
|
||||
),
|
||||
originals_dir / "0000004.pdf.gpg",
|
||||
)
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"thumbnails",
|
||||
"0000004.webp.gpg",
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "thumbnails"
|
||||
/ "0000004.webp.gpg"
|
||||
),
|
||||
thumb_dir / f"{doc.id:07}.webp.gpg",
|
||||
)
|
||||
@ -162,13 +152,13 @@ class TestDecryptDocuments(FileSystemAssertsMixin, TestCase):
|
||||
|
||||
self.assertEqual(doc.storage_type, Document.STORAGE_TYPE_UNENCRYPTED)
|
||||
self.assertEqual(doc.filename, "0000004.pdf")
|
||||
self.assertIsFile(os.path.join(originals_dir, "0000004.pdf"))
|
||||
self.assertIsFile(Path(originals_dir) / "0000004.pdf")
|
||||
self.assertIsFile(doc.source_path)
|
||||
self.assertIsFile(os.path.join(thumb_dir, f"{doc.id:07}.webp"))
|
||||
self.assertIsFile(Path(thumb_dir) / f"{doc.id:07}.webp")
|
||||
self.assertIsFile(doc.thumbnail_path)
|
||||
|
||||
with doc.source_file as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
checksum: str = hashlib.md5(f.read()).hexdigest()
|
||||
self.assertEqual(checksum, doc.checksum)
|
||||
|
||||
|
||||
|
@ -1,5 +1,4 @@
|
||||
import filecmp
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from threading import Thread
|
||||
@ -94,13 +93,13 @@ class ConsumerThreadMixin(DocumentConsumeDelayMixin):
|
||||
print("Consumed a perfectly valid file.") # noqa: T201
|
||||
|
||||
def slow_write_file(self, target, *, incomplete=False):
|
||||
with open(self.sample_file, "rb") as f:
|
||||
with Path(self.sample_file).open("rb") as f:
|
||||
pdf_bytes = f.read()
|
||||
|
||||
if incomplete:
|
||||
pdf_bytes = pdf_bytes[: len(pdf_bytes) - 100]
|
||||
|
||||
with open(target, "wb") as f:
|
||||
with Path(target).open("wb") as f:
|
||||
# this will take 2 seconds, since the file is about 20k.
|
||||
print("Start writing file.") # noqa: T201
|
||||
for b in chunked(1000, pdf_bytes):
|
||||
@ -116,7 +115,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
def test_consume_file(self):
|
||||
self.t_start()
|
||||
|
||||
f = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
|
||||
f = Path(self.dirs.consumption_dir) / "my_file.pdf"
|
||||
shutil.copy(self.sample_file, f)
|
||||
|
||||
self.wait_for_task_mock_call()
|
||||
@ -130,7 +129,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
def test_consume_file_invalid_ext(self):
|
||||
self.t_start()
|
||||
|
||||
f = os.path.join(self.dirs.consumption_dir, "my_file.wow")
|
||||
f = Path(self.dirs.consumption_dir) / "my_file.wow"
|
||||
shutil.copy(self.sample_file, f)
|
||||
|
||||
self.wait_for_task_mock_call()
|
||||
@ -138,7 +137,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
self.consume_file_mock.assert_not_called()
|
||||
|
||||
def test_consume_existing_file(self):
|
||||
f = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
|
||||
f = Path(self.dirs.consumption_dir) / "my_file.pdf"
|
||||
shutil.copy(self.sample_file, f)
|
||||
|
||||
self.t_start()
|
||||
@ -154,7 +153,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
|
||||
self.t_start()
|
||||
|
||||
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
|
||||
fname = Path(self.dirs.consumption_dir) / "my_file.pdf"
|
||||
|
||||
self.slow_write_file(fname)
|
||||
|
||||
@ -174,8 +173,8 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
|
||||
self.t_start()
|
||||
|
||||
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.~df"))
|
||||
fname2 = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
|
||||
fname = Path(self.dirs.consumption_dir) / "my_file.~df"
|
||||
fname2 = Path(self.dirs.consumption_dir) / "my_file.pdf"
|
||||
|
||||
self.slow_write_file(fname)
|
||||
shutil.move(fname, fname2)
|
||||
@ -196,7 +195,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
|
||||
self.t_start()
|
||||
|
||||
fname = Path(os.path.join(self.dirs.consumption_dir, "my_file.pdf"))
|
||||
fname = Path(self.dirs.consumption_dir) / "my_file.pdf"
|
||||
self.slow_write_file(fname, incomplete=True)
|
||||
|
||||
self.wait_for_task_mock_call()
|
||||
@ -225,23 +224,23 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
|
||||
shutil.copy(
|
||||
self.sample_file,
|
||||
os.path.join(self.dirs.consumption_dir, ".DS_STORE"),
|
||||
Path(self.dirs.consumption_dir) / ".DS_STORE",
|
||||
)
|
||||
shutil.copy(
|
||||
self.sample_file,
|
||||
os.path.join(self.dirs.consumption_dir, "my_file.pdf"),
|
||||
Path(self.dirs.consumption_dir) / "my_file.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
self.sample_file,
|
||||
os.path.join(self.dirs.consumption_dir, "._my_file.pdf"),
|
||||
Path(self.dirs.consumption_dir) / "._my_file.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
self.sample_file,
|
||||
os.path.join(self.dirs.consumption_dir, "my_second_file.pdf"),
|
||||
Path(self.dirs.consumption_dir) / "my_second_file.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
self.sample_file,
|
||||
os.path.join(self.dirs.consumption_dir, "._my_second_file.pdf"),
|
||||
Path(self.dirs.consumption_dir) / "._my_second_file.pdf",
|
||||
)
|
||||
|
||||
sleep(5)
|
||||
@ -259,60 +258,66 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
def test_is_ignored(self):
|
||||
test_paths = [
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, "foo.pdf"),
|
||||
"path": (Path(self.dirs.consumption_dir) / "foo.pdf").as_posix(),
|
||||
"ignore": False,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, "foo", "bar.pdf"),
|
||||
"path": (
|
||||
Path(self.dirs.consumption_dir) / "foo" / "bar.pdf"
|
||||
).as_posix(),
|
||||
"ignore": False,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, ".DS_STORE"),
|
||||
"path": (Path(self.dirs.consumption_dir) / ".DS_STORE").as_posix(),
|
||||
"ignore": True,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, ".DS_Store"),
|
||||
"path": (Path(self.dirs.consumption_dir) / ".DS_Store").as_posix(),
|
||||
"ignore": True,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, ".stfolder", "foo.pdf"),
|
||||
"path": (
|
||||
Path(self.dirs.consumption_dir) / ".stfolder" / "foo.pdf"
|
||||
).as_posix(),
|
||||
"ignore": True,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, ".stfolder.pdf"),
|
||||
"path": (Path(self.dirs.consumption_dir) / ".stfolder.pdf").as_posix(),
|
||||
"ignore": False,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(
|
||||
self.dirs.consumption_dir,
|
||||
".stversions",
|
||||
"foo.pdf",
|
||||
),
|
||||
"path": (
|
||||
Path(self.dirs.consumption_dir) / ".stversions" / "foo.pdf"
|
||||
).as_posix(),
|
||||
"ignore": True,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, ".stversions.pdf"),
|
||||
"path": (
|
||||
Path(self.dirs.consumption_dir) / ".stversions.pdf"
|
||||
).as_posix(),
|
||||
"ignore": False,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, "._foo.pdf"),
|
||||
"path": (Path(self.dirs.consumption_dir) / "._foo.pdf").as_posix(),
|
||||
"ignore": True,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, "my_foo.pdf"),
|
||||
"path": (Path(self.dirs.consumption_dir) / "my_foo.pdf").as_posix(),
|
||||
"ignore": False,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(self.dirs.consumption_dir, "._foo", "bar.pdf"),
|
||||
"path": (
|
||||
Path(self.dirs.consumption_dir) / "._foo" / "bar.pdf"
|
||||
).as_posix(),
|
||||
"ignore": True,
|
||||
},
|
||||
{
|
||||
"path": os.path.join(
|
||||
self.dirs.consumption_dir,
|
||||
"@eaDir",
|
||||
"SYNO@.fileindexdb",
|
||||
"_1jk.fnm",
|
||||
),
|
||||
"path": (
|
||||
Path(self.dirs.consumption_dir)
|
||||
/ "@eaDir"
|
||||
/ "SYNO@.fileindexdb"
|
||||
/ "_1jk.fnm"
|
||||
).as_posix(),
|
||||
"ignore": True,
|
||||
},
|
||||
]
|
||||
@ -332,7 +337,7 @@ class TestConsumer(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCase):
|
||||
|
||||
self.t_start()
|
||||
|
||||
f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
|
||||
f = Path(self.dirs.consumption_dir) / "my_file.pdf"
|
||||
shutil.copy(self.sample_file, f)
|
||||
|
||||
self.wait_for_task_mock_call()
|
||||
@ -380,9 +385,9 @@ class TestConsumerTags(DirectoriesMixin, ConsumerThreadMixin, TransactionTestCas
|
||||
|
||||
self.t_start()
|
||||
|
||||
path = os.path.join(self.dirs.consumption_dir, *tag_names)
|
||||
os.makedirs(path, exist_ok=True)
|
||||
f = Path(os.path.join(path, "my_file.pdf"))
|
||||
path = Path(self.dirs.consumption_dir) / "/".join(tag_names)
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
f = path / "my_file.pdf"
|
||||
# Wait at least inotify read_delay for recursive watchers
|
||||
# to be created for the new directories
|
||||
sleep(1)
|
||||
|
@ -1,6 +1,5 @@
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
from io import StringIO
|
||||
@ -183,16 +182,16 @@ class TestExportImport(
|
||||
|
||||
call_command(*args)
|
||||
|
||||
with open(os.path.join(self.target, "manifest.json")) as f:
|
||||
with (self.target / "manifest.json").open() as f:
|
||||
manifest = json.load(f)
|
||||
|
||||
return manifest
|
||||
|
||||
def test_exporter(self, *, use_filename_format=False):
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
num_permission_objects = Permission.objects.count()
|
||||
@ -210,7 +209,7 @@ class TestExportImport(
|
||||
4,
|
||||
)
|
||||
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
|
||||
self.assertEqual(
|
||||
self._get_document_from_manifest(manifest, self.d1.id)["fields"]["title"],
|
||||
@ -231,19 +230,17 @@ class TestExportImport(
|
||||
|
||||
for element in manifest:
|
||||
if element["model"] == "documents.document":
|
||||
fname = os.path.join(
|
||||
self.target,
|
||||
element[document_exporter.EXPORTER_FILE_NAME],
|
||||
)
|
||||
fname = (
|
||||
self.target / element[document_exporter.EXPORTER_FILE_NAME]
|
||||
).as_posix()
|
||||
self.assertIsFile(fname)
|
||||
self.assertIsFile(
|
||||
os.path.join(
|
||||
self.target,
|
||||
element[document_exporter.EXPORTER_THUMBNAIL_NAME],
|
||||
),
|
||||
(
|
||||
self.target / element[document_exporter.EXPORTER_THUMBNAIL_NAME]
|
||||
).as_posix(),
|
||||
)
|
||||
|
||||
with open(fname, "rb") as f:
|
||||
with Path(fname).open("rb") as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
self.assertEqual(checksum, element["fields"]["checksum"])
|
||||
|
||||
@ -253,13 +250,12 @@ class TestExportImport(
|
||||
)
|
||||
|
||||
if document_exporter.EXPORTER_ARCHIVE_NAME in element:
|
||||
fname = os.path.join(
|
||||
self.target,
|
||||
element[document_exporter.EXPORTER_ARCHIVE_NAME],
|
||||
)
|
||||
fname = (
|
||||
self.target / element[document_exporter.EXPORTER_ARCHIVE_NAME]
|
||||
).as_posix()
|
||||
self.assertIsFile(fname)
|
||||
|
||||
with open(fname, "rb") as f:
|
||||
with Path(fname).open("rb") as f:
|
||||
checksum = hashlib.md5(f.read()).hexdigest()
|
||||
self.assertEqual(checksum, element["fields"]["archive_checksum"])
|
||||
|
||||
@ -297,10 +293,10 @@ class TestExportImport(
|
||||
self.assertEqual(len(messages), 0)
|
||||
|
||||
def test_exporter_with_filename_format(self):
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
with override_settings(
|
||||
@ -309,16 +305,16 @@ class TestExportImport(
|
||||
self.test_exporter(use_filename_format=True)
|
||||
|
||||
def test_update_export_changed_time(self):
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
self._do_export()
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
|
||||
st_mtime_1 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime
|
||||
st_mtime_1 = (self.target / "manifest.json").stat().st_mtime
|
||||
|
||||
with mock.patch(
|
||||
"documents.management.commands.document_exporter.copy_file_with_basic_stats",
|
||||
@ -326,8 +322,8 @@ class TestExportImport(
|
||||
self._do_export()
|
||||
m.assert_not_called()
|
||||
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
st_mtime_2 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
st_mtime_2 = (self.target / "manifest.json").stat().st_mtime
|
||||
|
||||
Path(self.d1.source_path).touch()
|
||||
|
||||
@ -337,26 +333,26 @@ class TestExportImport(
|
||||
self._do_export()
|
||||
self.assertEqual(m.call_count, 1)
|
||||
|
||||
st_mtime_3 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
st_mtime_3 = (self.target / "manifest.json").stat().st_mtime
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
|
||||
self.assertNotEqual(st_mtime_1, st_mtime_2)
|
||||
self.assertNotEqual(st_mtime_2, st_mtime_3)
|
||||
|
||||
self._do_export(compare_json=True)
|
||||
st_mtime_4 = os.stat(os.path.join(self.target, "manifest.json")).st_mtime
|
||||
st_mtime_4 = (self.target / "manifest.json").stat().st_mtime
|
||||
self.assertEqual(st_mtime_3, st_mtime_4)
|
||||
|
||||
def test_update_export_changed_checksum(self):
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
self._do_export()
|
||||
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
|
||||
with mock.patch(
|
||||
"documents.management.commands.document_exporter.copy_file_with_basic_stats",
|
||||
@ -364,7 +360,7 @@ class TestExportImport(
|
||||
self._do_export()
|
||||
m.assert_not_called()
|
||||
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
|
||||
self.d2.checksum = "asdfasdgf3"
|
||||
self.d2.save()
|
||||
@ -375,13 +371,13 @@ class TestExportImport(
|
||||
self._do_export(compare_checksums=True)
|
||||
self.assertEqual(m.call_count, 1)
|
||||
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
|
||||
def test_update_export_deleted_document(self):
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
manifest = self._do_export()
|
||||
@ -389,7 +385,7 @@ class TestExportImport(
|
||||
self.assertTrue(len(manifest), 7)
|
||||
doc_from_manifest = self._get_document_from_manifest(manifest, self.d3.id)
|
||||
self.assertIsFile(
|
||||
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]),
|
||||
(self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
|
||||
)
|
||||
self.d3.delete()
|
||||
|
||||
@ -401,39 +397,39 @@ class TestExportImport(
|
||||
self.d3.id,
|
||||
)
|
||||
self.assertIsFile(
|
||||
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]),
|
||||
(self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
|
||||
)
|
||||
|
||||
manifest = self._do_export(delete=True)
|
||||
self.assertIsNotFile(
|
||||
os.path.join(self.target, doc_from_manifest[EXPORTER_FILE_NAME]),
|
||||
(self.target / doc_from_manifest[EXPORTER_FILE_NAME]).as_posix(),
|
||||
)
|
||||
|
||||
self.assertTrue(len(manifest), 6)
|
||||
|
||||
@override_settings(FILENAME_FORMAT="{title}/{correspondent}")
|
||||
def test_update_export_changed_location(self):
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
self._do_export(use_filename_format=True)
|
||||
self.assertIsFile(os.path.join(self.target, "wow1", "c.pdf"))
|
||||
self.assertIsFile((self.target / "wow1" / "c.pdf").as_posix())
|
||||
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
|
||||
self.d1.title = "new_title"
|
||||
self.d1.save()
|
||||
self._do_export(use_filename_format=True, delete=True)
|
||||
self.assertIsNotFile(os.path.join(self.target, "wow1", "c.pdf"))
|
||||
self.assertIsNotDir(os.path.join(self.target, "wow1"))
|
||||
self.assertIsFile(os.path.join(self.target, "new_title", "c.pdf"))
|
||||
self.assertIsFile(os.path.join(self.target, "manifest.json"))
|
||||
self.assertIsFile(os.path.join(self.target, "wow2", "none.pdf"))
|
||||
self.assertIsNotFile((self.target / "wow1" / "c.pdf").as_posix())
|
||||
self.assertIsNotDir((self.target / "wow1").as_posix())
|
||||
self.assertIsFile((self.target / "new_title" / "c.pdf").as_posix())
|
||||
self.assertIsFile((self.target / "manifest.json").as_posix())
|
||||
self.assertIsFile((self.target / "wow2" / "none.pdf").as_posix())
|
||||
self.assertIsFile(
|
||||
os.path.join(self.target, "wow2", "none_01.pdf"),
|
||||
(self.target / "wow2" / "none_01.pdf").as_posix(),
|
||||
)
|
||||
|
||||
def test_export_missing_files(self):
|
||||
@ -458,20 +454,19 @@ class TestExportImport(
|
||||
- Zipfile is created
|
||||
- Zipfile contains exported files
|
||||
"""
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
args = ["document_exporter", self.target, "--zip"]
|
||||
|
||||
call_command(*args)
|
||||
|
||||
expected_file = os.path.join(
|
||||
self.target,
|
||||
f"export-{timezone.localdate().isoformat()}.zip",
|
||||
)
|
||||
expected_file = (
|
||||
self.target / f"export-{timezone.localdate().isoformat()}.zip"
|
||||
).as_posix()
|
||||
|
||||
self.assertIsFile(expected_file)
|
||||
|
||||
@ -492,10 +487,10 @@ class TestExportImport(
|
||||
- Zipfile is created
|
||||
- Zipfile contains exported files
|
||||
"""
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
args = ["document_exporter", self.target, "--zip", "--use-filename-format"]
|
||||
@ -505,10 +500,9 @@ class TestExportImport(
|
||||
):
|
||||
call_command(*args)
|
||||
|
||||
expected_file = os.path.join(
|
||||
self.target,
|
||||
f"export-{timezone.localdate().isoformat()}.zip",
|
||||
)
|
||||
expected_file = (
|
||||
self.target / f"export-{timezone.localdate().isoformat()}.zip"
|
||||
).as_posix()
|
||||
|
||||
self.assertIsFile(expected_file)
|
||||
|
||||
@ -533,10 +527,10 @@ class TestExportImport(
|
||||
- Zipfile contains exported files
|
||||
- The existing file and directory in target are removed
|
||||
"""
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
# Create stuff in target directory
|
||||
@ -552,10 +546,9 @@ class TestExportImport(
|
||||
|
||||
call_command(*args)
|
||||
|
||||
expected_file = os.path.join(
|
||||
self.target,
|
||||
f"export-{timezone.localdate().isoformat()}.zip",
|
||||
)
|
||||
expected_file = (
|
||||
self.target / f"export-{timezone.localdate().isoformat()}.zip"
|
||||
).as_posix()
|
||||
|
||||
self.assertIsFile(expected_file)
|
||||
self.assertIsNotFile(existing_file)
|
||||
@ -610,7 +603,7 @@ class TestExportImport(
|
||||
- Error is raised
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
os.chmod(tmp_dir, 0o000)
|
||||
Path(tmp_dir).chmod(0o000)
|
||||
|
||||
args = ["document_exporter", tmp_dir]
|
||||
|
||||
@ -629,10 +622,10 @@ class TestExportImport(
|
||||
- Manifest.json doesn't contain information about archive files
|
||||
- Documents can be imported again
|
||||
"""
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
manifest = self._do_export()
|
||||
@ -670,10 +663,10 @@ class TestExportImport(
|
||||
- Manifest.json doesn't contain information about thumbnails
|
||||
- Documents can be imported again
|
||||
"""
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
manifest = self._do_export()
|
||||
@ -713,10 +706,10 @@ class TestExportImport(
|
||||
- Main manifest.json file doesn't contain information about documents
|
||||
- Documents can be imported again
|
||||
"""
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
manifest = self._do_export(split_manifest=True)
|
||||
@ -744,10 +737,10 @@ class TestExportImport(
|
||||
THEN:
|
||||
- Documents can be imported again
|
||||
"""
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
self._do_export(use_folder_prefix=True)
|
||||
@ -769,10 +762,10 @@ class TestExportImport(
|
||||
- ContentType & Permission objects are not deleted, db transaction rolled back
|
||||
"""
|
||||
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
num_content_type_objects = ContentType.objects.count()
|
||||
@ -804,10 +797,10 @@ class TestExportImport(
|
||||
self.assertEqual(Permission.objects.count(), num_permission_objects + 1)
|
||||
|
||||
def test_exporter_with_auditlog_disabled(self):
|
||||
shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
|
||||
shutil.rmtree(Path(self.dirs.media_dir) / "documents")
|
||||
shutil.copytree(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "documents"),
|
||||
os.path.join(self.dirs.media_dir, "documents"),
|
||||
Path(__file__).parent / "samples" / "documents",
|
||||
Path(self.dirs.media_dir) / "documents",
|
||||
)
|
||||
|
||||
with override_settings(
|
||||
|
@ -1,4 +1,3 @@
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
@ -8,11 +7,11 @@ from documents.tests.utils import DirectoriesMixin
|
||||
from documents.tests.utils import TestMigrations
|
||||
|
||||
|
||||
def source_path_before(self):
|
||||
def source_path_before(self) -> Path:
|
||||
if self.filename:
|
||||
fname = str(self.filename)
|
||||
|
||||
return os.path.join(settings.ORIGINALS_DIR, fname)
|
||||
return Path(settings.ORIGINALS_DIR) / fname
|
||||
|
||||
|
||||
class TestMigrateDocumentPageCount(DirectoriesMixin, TestMigrations):
|
||||
|
@ -1,5 +1,5 @@
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from django.conf import settings
|
||||
from django.test import override_settings
|
||||
@ -20,7 +20,7 @@ def source_path_before(self):
|
||||
if self.storage_type == STORAGE_TYPE_GPG:
|
||||
fname += ".gpg"
|
||||
|
||||
return os.path.join(settings.ORIGINALS_DIR, fname)
|
||||
return (Path(settings.ORIGINALS_DIR) / fname).as_posix()
|
||||
|
||||
|
||||
def file_type_after(self):
|
||||
@ -35,7 +35,7 @@ def source_path_after(doc):
|
||||
if doc.storage_type == STORAGE_TYPE_GPG:
|
||||
fname += ".gpg" # pragma: no cover
|
||||
|
||||
return os.path.join(settings.ORIGINALS_DIR, fname)
|
||||
return (Path(settings.ORIGINALS_DIR) / fname).as_posix()
|
||||
|
||||
|
||||
@override_settings(PASSPHRASE="test")
|
||||
@ -52,7 +52,7 @@ class TestMigrateMimeType(DirectoriesMixin, TestMigrations):
|
||||
)
|
||||
self.doc_id = doc.id
|
||||
shutil.copy(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
|
||||
Path(__file__).parent / "samples" / "simple.pdf",
|
||||
source_path_before(doc),
|
||||
)
|
||||
|
||||
@ -63,12 +63,12 @@ class TestMigrateMimeType(DirectoriesMixin, TestMigrations):
|
||||
)
|
||||
self.doc2_id = doc2.id
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"originals",
|
||||
"0000004.pdf.gpg",
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000004.pdf.gpg"
|
||||
),
|
||||
source_path_before(doc2),
|
||||
)
|
||||
@ -97,7 +97,7 @@ class TestMigrateMimeTypeBackwards(DirectoriesMixin, TestMigrations):
|
||||
)
|
||||
self.doc_id = doc.id
|
||||
shutil.copy(
|
||||
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
|
||||
Path(__file__).parent / "samples" / "simple.pdf",
|
||||
source_path_after(doc),
|
||||
)
|
||||
|
||||
|
@ -1,5 +1,4 @@
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
@ -17,34 +16,34 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
|
||||
with filelock.FileLock(settings.MEDIA_LOCK):
|
||||
# just make sure that the lockfile is present.
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"originals",
|
||||
"0000001.pdf",
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "originals"
|
||||
/ "0000001.pdf"
|
||||
),
|
||||
os.path.join(self.dirs.originals_dir, "0000001.pdf"),
|
||||
Path(self.dirs.originals_dir) / "0000001.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"archive",
|
||||
"0000001.pdf",
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "archive"
|
||||
/ "0000001.pdf"
|
||||
),
|
||||
os.path.join(self.dirs.archive_dir, "0000001.pdf"),
|
||||
Path(self.dirs.archive_dir) / "0000001.pdf",
|
||||
)
|
||||
shutil.copy(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"samples",
|
||||
"documents",
|
||||
"thumbnails",
|
||||
"0000001.webp",
|
||||
(
|
||||
Path(__file__).parent
|
||||
/ "samples"
|
||||
/ "documents"
|
||||
/ "thumbnails"
|
||||
/ "0000001.webp"
|
||||
),
|
||||
os.path.join(self.dirs.thumbnail_dir, "0000001.webp"),
|
||||
Path(self.dirs.thumbnail_dir) / "0000001.webp",
|
||||
)
|
||||
|
||||
return Document.objects.create(
|
||||
@ -92,25 +91,25 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
|
||||
|
||||
def test_no_thumbnail(self):
|
||||
doc = self.make_test_data()
|
||||
os.remove(doc.thumbnail_path)
|
||||
Path(doc.thumbnail_path).unlink()
|
||||
self.assertSanityError(doc, "Thumbnail of document does not exist")
|
||||
|
||||
def test_thumbnail_no_access(self):
|
||||
doc = self.make_test_data()
|
||||
os.chmod(doc.thumbnail_path, 0o000)
|
||||
Path(doc.thumbnail_path).chmod(0o000)
|
||||
self.assertSanityError(doc, "Cannot read thumbnail file of document")
|
||||
os.chmod(doc.thumbnail_path, 0o777)
|
||||
Path(doc.thumbnail_path).chmod(0o777)
|
||||
|
||||
def test_no_original(self):
|
||||
doc = self.make_test_data()
|
||||
os.remove(doc.source_path)
|
||||
Path(doc.source_path).unlink()
|
||||
self.assertSanityError(doc, "Original of document does not exist.")
|
||||
|
||||
def test_original_no_access(self):
|
||||
doc = self.make_test_data()
|
||||
os.chmod(doc.source_path, 0o000)
|
||||
Path(doc.source_path).chmod(0o000)
|
||||
self.assertSanityError(doc, "Cannot read original file of document")
|
||||
os.chmod(doc.source_path, 0o777)
|
||||
Path(doc.source_path).chmod(0o777)
|
||||
|
||||
def test_original_checksum_mismatch(self):
|
||||
doc = self.make_test_data()
|
||||
@ -120,14 +119,14 @@ class TestSanityCheck(DirectoriesMixin, TestCase):
|
||||
|
||||
def test_no_archive(self):
|
||||
doc = self.make_test_data()
|
||||
os.remove(doc.archive_path)
|
||||
Path(doc.archive_path).unlink()
|
||||
self.assertSanityError(doc, "Archived version of document does not exist.")
|
||||
|
||||
def test_archive_no_access(self):
|
||||
doc = self.make_test_data()
|
||||
os.chmod(doc.archive_path, 0o000)
|
||||
Path(doc.archive_path).chmod(0o000)
|
||||
self.assertSanityError(doc, "Cannot read archive file of document")
|
||||
os.chmod(doc.archive_path, 0o777)
|
||||
Path(doc.archive_path).chmod(0o777)
|
||||
|
||||
def test_archive_checksum_mismatch(self):
|
||||
doc = self.make_test_data()
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
2021
src/locale/et_EE/LC_MESSAGES/django.po
Normal file
2021
src/locale/et_EE/LC_MESSAGES/django.po
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -10,6 +10,8 @@ from django.contrib.auth.models import User
|
||||
from django.forms import ValidationError
|
||||
from django.urls import reverse
|
||||
|
||||
from paperless.signals import handle_social_account_updated
|
||||
|
||||
logger = logging.getLogger("paperless.auth")
|
||||
|
||||
|
||||
@ -114,4 +116,5 @@ class CustomSocialAccountAdapter(DefaultSocialAccountAdapter):
|
||||
)
|
||||
user.groups.add(*groups)
|
||||
user.save()
|
||||
handle_social_account_updated(None, request, sociallogin)
|
||||
return user
|
||||
|
Loading…
x
Reference in New Issue
Block a user