Chore: reorganize api tests (#4935)

* Move permissions-related API tests

* Move bulk-edit-related API tests

* Move bulk-download-related API tests

* Move uisettings-related API tests

* Move remoteversion-related API tests

* Move tasks API tests

* Move object-related API tests

* Move consumption-template-related API tests

* Rename pared-down documents API test file

Co-Authored-By: Trenton H <797416+stumpylog@users.noreply.github.com>
This commit is contained in:
shamoon 2023-12-11 20:08:51 -08:00 committed by GitHub
parent 85f824f032
commit e2d25a7a09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 4997 additions and 4907 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,337 @@
import datetime
import io
import json
import os
import shutil
import zipfile
from django.contrib.auth.models import User
from django.test import override_settings
from django.utils import timezone
from rest_framework import status
from rest_framework.test import APITestCase
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.tests.utils import DirectoriesMixin
class TestBulkDownload(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/documents/bulk_download/"
def setUp(self):
super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
self.doc1 = Document.objects.create(title="unrelated", checksum="A")
self.doc2 = Document.objects.create(
title="document A",
filename="docA.pdf",
mime_type="application/pdf",
checksum="B",
created=timezone.make_aware(datetime.datetime(2021, 1, 1)),
)
self.doc2b = Document.objects.create(
title="document A",
filename="docA2.pdf",
mime_type="application/pdf",
checksum="D",
created=timezone.make_aware(datetime.datetime(2021, 1, 1)),
)
self.doc3 = Document.objects.create(
title="document B",
filename="docB.jpg",
mime_type="image/jpeg",
checksum="C",
created=timezone.make_aware(datetime.datetime(2020, 3, 21)),
archive_filename="docB.pdf",
archive_checksum="D",
)
shutil.copy(
os.path.join(os.path.dirname(__file__), "samples", "simple.pdf"),
self.doc2.source_path,
)
shutil.copy(
os.path.join(os.path.dirname(__file__), "samples", "simple.png"),
self.doc2b.source_path,
)
shutil.copy(
os.path.join(os.path.dirname(__file__), "samples", "simple.jpg"),
self.doc3.source_path,
)
shutil.copy(
os.path.join(os.path.dirname(__file__), "samples", "test_with_bom.pdf"),
self.doc3.archive_path,
)
def test_download_originals(self):
response = self.client.post(
self.ENDPOINT,
json.dumps(
{"documents": [self.doc2.id, self.doc3.id], "content": "originals"},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response["Content-Type"], "application/zip")
with zipfile.ZipFile(io.BytesIO(response.content)) as zipf:
self.assertEqual(len(zipf.filelist), 2)
self.assertIn("2021-01-01 document A.pdf", zipf.namelist())
self.assertIn("2020-03-21 document B.jpg", zipf.namelist())
with self.doc2.source_file as f:
self.assertEqual(f.read(), zipf.read("2021-01-01 document A.pdf"))
with self.doc3.source_file as f:
self.assertEqual(f.read(), zipf.read("2020-03-21 document B.jpg"))
def test_download_default(self):
response = self.client.post(
self.ENDPOINT,
json.dumps({"documents": [self.doc2.id, self.doc3.id]}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response["Content-Type"], "application/zip")
with zipfile.ZipFile(io.BytesIO(response.content)) as zipf:
self.assertEqual(len(zipf.filelist), 2)
self.assertIn("2021-01-01 document A.pdf", zipf.namelist())
self.assertIn("2020-03-21 document B.pdf", zipf.namelist())
with self.doc2.source_file as f:
self.assertEqual(f.read(), zipf.read("2021-01-01 document A.pdf"))
with self.doc3.archive_file as f:
self.assertEqual(f.read(), zipf.read("2020-03-21 document B.pdf"))
def test_download_both(self):
response = self.client.post(
self.ENDPOINT,
json.dumps({"documents": [self.doc2.id, self.doc3.id], "content": "both"}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response["Content-Type"], "application/zip")
with zipfile.ZipFile(io.BytesIO(response.content)) as zipf:
self.assertEqual(len(zipf.filelist), 3)
self.assertIn("originals/2021-01-01 document A.pdf", zipf.namelist())
self.assertIn("archive/2020-03-21 document B.pdf", zipf.namelist())
self.assertIn("originals/2020-03-21 document B.jpg", zipf.namelist())
with self.doc2.source_file as f:
self.assertEqual(
f.read(),
zipf.read("originals/2021-01-01 document A.pdf"),
)
with self.doc3.archive_file as f:
self.assertEqual(
f.read(),
zipf.read("archive/2020-03-21 document B.pdf"),
)
with self.doc3.source_file as f:
self.assertEqual(
f.read(),
zipf.read("originals/2020-03-21 document B.jpg"),
)
def test_filename_clashes(self):
response = self.client.post(
self.ENDPOINT,
json.dumps({"documents": [self.doc2.id, self.doc2b.id]}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response["Content-Type"], "application/zip")
with zipfile.ZipFile(io.BytesIO(response.content)) as zipf:
self.assertEqual(len(zipf.filelist), 2)
self.assertIn("2021-01-01 document A.pdf", zipf.namelist())
self.assertIn("2021-01-01 document A_01.pdf", zipf.namelist())
with self.doc2.source_file as f:
self.assertEqual(f.read(), zipf.read("2021-01-01 document A.pdf"))
with self.doc2b.source_file as f:
self.assertEqual(f.read(), zipf.read("2021-01-01 document A_01.pdf"))
def test_compression(self):
self.client.post(
self.ENDPOINT,
json.dumps(
{"documents": [self.doc2.id, self.doc2b.id], "compression": "lzma"},
),
content_type="application/json",
)
@override_settings(FILENAME_FORMAT="{correspondent}/{title}")
def test_formatted_download_originals(self):
"""
GIVEN:
- Defined file naming format
WHEN:
- Bulk download request for original documents
- Bulk download request requests to follow format
THEN:
- Files in resulting zipfile are formatted
"""
c = Correspondent.objects.create(name="test")
c2 = Correspondent.objects.create(name="a space name")
self.doc2.correspondent = c
self.doc2.title = "This is Doc 2"
self.doc2.save()
self.doc3.correspondent = c2
self.doc3.title = "Title 2 - Doc 3"
self.doc3.save()
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc2.id, self.doc3.id],
"content": "originals",
"follow_formatting": True,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response["Content-Type"], "application/zip")
with zipfile.ZipFile(io.BytesIO(response.content)) as zipf:
self.assertEqual(len(zipf.filelist), 2)
self.assertIn("a space name/Title 2 - Doc 3.jpg", zipf.namelist())
self.assertIn("test/This is Doc 2.pdf", zipf.namelist())
with self.doc2.source_file as f:
self.assertEqual(f.read(), zipf.read("test/This is Doc 2.pdf"))
with self.doc3.source_file as f:
self.assertEqual(
f.read(),
zipf.read("a space name/Title 2 - Doc 3.jpg"),
)
@override_settings(FILENAME_FORMAT="somewhere/{title}")
def test_formatted_download_archive(self):
"""
GIVEN:
- Defined file naming format
WHEN:
- Bulk download request for archive documents
- Bulk download request requests to follow format
THEN:
- Files in resulting zipfile are formatted
"""
self.doc2.title = "This is Doc 2"
self.doc2.save()
self.doc3.title = "Title 2 - Doc 3"
self.doc3.save()
print(self.doc3.archive_path)
print(self.doc3.archive_filename)
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc2.id, self.doc3.id],
"follow_formatting": True,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response["Content-Type"], "application/zip")
with zipfile.ZipFile(io.BytesIO(response.content)) as zipf:
self.assertEqual(len(zipf.filelist), 2)
self.assertIn("somewhere/This is Doc 2.pdf", zipf.namelist())
self.assertIn("somewhere/Title 2 - Doc 3.pdf", zipf.namelist())
with self.doc2.source_file as f:
self.assertEqual(f.read(), zipf.read("somewhere/This is Doc 2.pdf"))
with self.doc3.archive_file as f:
self.assertEqual(f.read(), zipf.read("somewhere/Title 2 - Doc 3.pdf"))
@override_settings(FILENAME_FORMAT="{document_type}/{title}")
def test_formatted_download_both(self):
"""
GIVEN:
- Defined file naming format
WHEN:
- Bulk download request for original documents and archive documents
- Bulk download request requests to follow format
THEN:
- Files defined in resulting zipfile are formatted
"""
dc1 = DocumentType.objects.create(name="bill")
dc2 = DocumentType.objects.create(name="statement")
self.doc2.document_type = dc1
self.doc2.title = "This is Doc 2"
self.doc2.save()
self.doc3.document_type = dc2
self.doc3.title = "Title 2 - Doc 3"
self.doc3.save()
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"documents": [self.doc2.id, self.doc3.id],
"content": "both",
"follow_formatting": True,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response["Content-Type"], "application/zip")
with zipfile.ZipFile(io.BytesIO(response.content)) as zipf:
self.assertEqual(len(zipf.filelist), 3)
self.assertIn("originals/bill/This is Doc 2.pdf", zipf.namelist())
self.assertIn("archive/statement/Title 2 - Doc 3.pdf", zipf.namelist())
self.assertIn("originals/statement/Title 2 - Doc 3.jpg", zipf.namelist())
with self.doc2.source_file as f:
self.assertEqual(
f.read(),
zipf.read("originals/bill/This is Doc 2.pdf"),
)
with self.doc3.archive_file as f:
self.assertEqual(
f.read(),
zipf.read("archive/statement/Title 2 - Doc 3.pdf"),
)
with self.doc3.source_file as f:
self.assertEqual(
f.read(),
zipf.read("originals/statement/Title 2 - Doc 3.jpg"),
)

View File

@ -0,0 +1,870 @@
import json
from unittest import mock
from django.contrib.auth.models import User
from guardian.shortcuts import assign_perm
from rest_framework import status
from rest_framework.test import APITestCase
from documents import bulk_edit
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
from documents.tests.utils import DirectoriesMixin
class TestBulkEdit(DirectoriesMixin, APITestCase):
def setUp(self):
super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
patcher = mock.patch("documents.bulk_edit.bulk_update_documents.delay")
self.async_task = patcher.start()
self.addCleanup(patcher.stop)
self.c1 = Correspondent.objects.create(name="c1")
self.c2 = Correspondent.objects.create(name="c2")
self.dt1 = DocumentType.objects.create(name="dt1")
self.dt2 = DocumentType.objects.create(name="dt2")
self.t1 = Tag.objects.create(name="t1")
self.t2 = Tag.objects.create(name="t2")
self.doc1 = Document.objects.create(checksum="A", title="A")
self.doc2 = Document.objects.create(
checksum="B",
title="B",
correspondent=self.c1,
document_type=self.dt1,
)
self.doc3 = Document.objects.create(
checksum="C",
title="C",
correspondent=self.c2,
document_type=self.dt2,
)
self.doc4 = Document.objects.create(checksum="D", title="D")
self.doc5 = Document.objects.create(checksum="E", title="E")
self.doc2.tags.add(self.t1)
self.doc3.tags.add(self.t2)
self.doc4.tags.add(self.t1, self.t2)
self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{checksum}")
def test_set_correspondent(self):
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1)
bulk_edit.set_correspondent(
[self.doc1.id, self.doc2.id, self.doc3.id],
self.c2.id,
)
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 3)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
def test_unset_correspondent(self):
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 1)
bulk_edit.set_correspondent([self.doc1.id, self.doc2.id, self.doc3.id], None)
self.assertEqual(Document.objects.filter(correspondent=self.c2).count(), 0)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
def test_set_document_type(self):
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1)
bulk_edit.set_document_type(
[self.doc1.id, self.doc2.id, self.doc3.id],
self.dt2.id,
)
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 3)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc2.id])
def test_unset_document_type(self):
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 1)
bulk_edit.set_document_type([self.doc1.id, self.doc2.id, self.doc3.id], None)
self.assertEqual(Document.objects.filter(document_type=self.dt2).count(), 0)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
def test_set_document_storage_path(self):
"""
GIVEN:
- 5 documents without defined storage path
WHEN:
- Bulk edit called to add storage path to 1 document
THEN:
- Single document storage path update
"""
self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
bulk_edit.set_storage_path(
[self.doc1.id],
self.sp1.id,
)
self.assertEqual(Document.objects.filter(storage_path=None).count(), 4)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id])
def test_unset_document_storage_path(self):
"""
GIVEN:
- 4 documents without defined storage path
- 1 document with a defined storage
WHEN:
- Bulk edit called to remove storage path from 1 document
THEN:
- Single document storage path removed
"""
self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
bulk_edit.set_storage_path(
[self.doc1.id],
self.sp1.id,
)
self.assertEqual(Document.objects.filter(storage_path=None).count(), 4)
bulk_edit.set_storage_path(
[self.doc1.id],
None,
)
self.assertEqual(Document.objects.filter(storage_path=None).count(), 5)
self.async_task.assert_called()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id])
def test_add_tag(self):
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2)
bulk_edit.add_tag(
[self.doc1.id, self.doc2.id, self.doc3.id, self.doc4.id],
self.t1.id,
)
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 4)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs["document_ids"], [self.doc1.id, self.doc3.id])
def test_remove_tag(self):
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 2)
bulk_edit.remove_tag([self.doc1.id, self.doc3.id, self.doc4.id], self.t1.id)
self.assertEqual(Document.objects.filter(tags__id=self.t1.id).count(), 1)
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
self.assertCountEqual(kwargs["document_ids"], [self.doc4.id])
def test_modify_tags(self):
tag_unrelated = Tag.objects.create(name="unrelated")
self.doc2.tags.add(tag_unrelated)
self.doc3.tags.add(tag_unrelated)
bulk_edit.modify_tags(
[self.doc2.id, self.doc3.id],
add_tags=[self.t2.id],
remove_tags=[self.t1.id],
)
self.assertCountEqual(list(self.doc2.tags.all()), [self.t2, tag_unrelated])
self.assertCountEqual(list(self.doc3.tags.all()), [self.t2, tag_unrelated])
self.async_task.assert_called_once()
args, kwargs = self.async_task.call_args
# TODO: doc3 should not be affected, but the query for that is rather complicated
self.assertCountEqual(kwargs["document_ids"], [self.doc2.id, self.doc3.id])
def test_delete(self):
self.assertEqual(Document.objects.count(), 5)
bulk_edit.delete([self.doc1.id, self.doc2.id])
self.assertEqual(Document.objects.count(), 3)
self.assertCountEqual(
[doc.id for doc in Document.objects.all()],
[self.doc3.id, self.doc4.id, self.doc5.id],
)
@mock.patch("documents.serialisers.bulk_edit.set_correspondent")
def test_api_set_correspondent(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "set_correspondent",
"parameters": {"correspondent": self.c1.id},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertEqual(args[0], [self.doc1.id])
self.assertEqual(kwargs["correspondent"], self.c1.id)
@mock.patch("documents.serialisers.bulk_edit.set_correspondent")
def test_api_unset_correspondent(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "set_correspondent",
"parameters": {"correspondent": None},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertEqual(args[0], [self.doc1.id])
self.assertIsNone(kwargs["correspondent"])
@mock.patch("documents.serialisers.bulk_edit.set_document_type")
def test_api_set_type(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "set_document_type",
"parameters": {"document_type": self.dt1.id},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertEqual(args[0], [self.doc1.id])
self.assertEqual(kwargs["document_type"], self.dt1.id)
@mock.patch("documents.serialisers.bulk_edit.set_document_type")
def test_api_unset_type(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "set_document_type",
"parameters": {"document_type": None},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertEqual(args[0], [self.doc1.id])
self.assertIsNone(kwargs["document_type"])
@mock.patch("documents.serialisers.bulk_edit.add_tag")
def test_api_add_tag(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "add_tag",
"parameters": {"tag": self.t1.id},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertEqual(args[0], [self.doc1.id])
self.assertEqual(kwargs["tag"], self.t1.id)
@mock.patch("documents.serialisers.bulk_edit.remove_tag")
def test_api_remove_tag(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "remove_tag",
"parameters": {"tag": self.t1.id},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertEqual(args[0], [self.doc1.id])
self.assertEqual(kwargs["tag"], self.t1.id)
@mock.patch("documents.serialisers.bulk_edit.modify_tags")
def test_api_modify_tags(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id, self.doc3.id],
"method": "modify_tags",
"parameters": {
"add_tags": [self.t1.id],
"remove_tags": [self.t2.id],
},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertListEqual(args[0], [self.doc1.id, self.doc3.id])
self.assertEqual(kwargs["add_tags"], [self.t1.id])
self.assertEqual(kwargs["remove_tags"], [self.t2.id])
@mock.patch("documents.serialisers.bulk_edit.modify_tags")
def test_api_modify_tags_not_provided(self, m):
"""
GIVEN:
- API data to modify tags is missing modify_tags field
WHEN:
- API to edit tags is called
THEN:
- API returns HTTP 400
- modify_tags is not called
"""
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id, self.doc3.id],
"method": "modify_tags",
"parameters": {
"add_tags": [self.t1.id],
},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
m.assert_not_called()
@mock.patch("documents.serialisers.bulk_edit.delete")
def test_api_delete(self, m):
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{"documents": [self.doc1.id], "method": "delete", "parameters": {}},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertEqual(args[0], [self.doc1.id])
self.assertEqual(len(kwargs), 0)
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
def test_api_set_storage_path(self, m):
"""
GIVEN:
- API data to set the storage path of a document
WHEN:
- API is called
THEN:
- set_storage_path is called with correct document IDs and storage_path ID
"""
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "set_storage_path",
"parameters": {"storage_path": self.sp1.id},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertListEqual(args[0], [self.doc1.id])
self.assertEqual(kwargs["storage_path"], self.sp1.id)
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
def test_api_unset_storage_path(self, m):
"""
GIVEN:
- API data to clear/unset the storage path of a document
WHEN:
- API is called
THEN:
- set_storage_path is called with correct document IDs and None storage_path
"""
m.return_value = "OK"
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "set_storage_path",
"parameters": {"storage_path": None},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertListEqual(args[0], [self.doc1.id])
self.assertEqual(kwargs["storage_path"], None)
def test_api_invalid_storage_path(self):
"""
GIVEN:
- API data to set the storage path of a document
- Given storage_path ID isn't valid
WHEN:
- API is called
THEN:
- set_storage_path is called with correct document IDs and storage_path ID
"""
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "set_storage_path",
"parameters": {"storage_path": self.sp1.id + 10},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.async_task.assert_not_called()
def test_api_set_storage_path_not_provided(self):
"""
GIVEN:
- API data to set the storage path of a document
- API data is missing storage path ID
WHEN:
- API is called
THEN:
- set_storage_path is called with correct document IDs and storage_path ID
"""
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id],
"method": "set_storage_path",
"parameters": {},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.async_task.assert_not_called()
def test_api_invalid_doc(self):
self.assertEqual(Document.objects.count(), 5)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps({"documents": [-235], "method": "delete", "parameters": {}}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(Document.objects.count(), 5)
def test_api_invalid_method(self):
self.assertEqual(Document.objects.count(), 5)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "exterminate",
"parameters": {},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(Document.objects.count(), 5)
def test_api_invalid_correspondent(self):
self.assertEqual(self.doc2.correspondent, self.c1)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "set_correspondent",
"parameters": {"correspondent": 345657},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
doc2 = Document.objects.get(id=self.doc2.id)
self.assertEqual(doc2.correspondent, self.c1)
def test_api_no_correspondent(self):
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "set_correspondent",
"parameters": {},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_api_invalid_document_type(self):
self.assertEqual(self.doc2.document_type, self.dt1)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "set_document_type",
"parameters": {"document_type": 345657},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
doc2 = Document.objects.get(id=self.doc2.id)
self.assertEqual(doc2.document_type, self.dt1)
def test_api_no_document_type(self):
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "set_document_type",
"parameters": {},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_api_add_invalid_tag(self):
self.assertEqual(list(self.doc2.tags.all()), [self.t1])
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "add_tag",
"parameters": {"tag": 345657},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(list(self.doc2.tags.all()), [self.t1])
def test_api_add_tag_no_tag(self):
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{"documents": [self.doc2.id], "method": "add_tag", "parameters": {}},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_api_delete_invalid_tag(self):
self.assertEqual(list(self.doc2.tags.all()), [self.t1])
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "remove_tag",
"parameters": {"tag": 345657},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(list(self.doc2.tags.all()), [self.t1])
def test_api_delete_tag_no_tag(self):
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{"documents": [self.doc2.id], "method": "remove_tag", "parameters": {}},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_api_modify_invalid_tags(self):
self.assertEqual(list(self.doc2.tags.all()), [self.t1])
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "modify_tags",
"parameters": {
"add_tags": [self.t2.id, 1657],
"remove_tags": [1123123],
},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_api_modify_tags_no_tags(self):
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "modify_tags",
"parameters": {"remove_tags": [1123123]},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id],
"method": "modify_tags",
"parameters": {"add_tags": [self.t2.id, 1657]},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_api_selection_data_empty(self):
response = self.client.post(
"/api/documents/selection_data/",
json.dumps({"documents": []}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
for field, Entity in [
("selected_correspondents", Correspondent),
("selected_tags", Tag),
("selected_document_types", DocumentType),
]:
self.assertEqual(len(response.data[field]), Entity.objects.count())
for correspondent in response.data[field]:
self.assertEqual(correspondent["document_count"], 0)
self.assertCountEqual(
map(lambda c: c["id"], response.data[field]),
map(lambda c: c["id"], Entity.objects.values("id")),
)
def test_api_selection_data(self):
response = self.client.post(
"/api/documents/selection_data/",
json.dumps(
{"documents": [self.doc1.id, self.doc2.id, self.doc4.id, self.doc5.id]},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertCountEqual(
response.data["selected_correspondents"],
[
{"id": self.c1.id, "document_count": 1},
{"id": self.c2.id, "document_count": 0},
],
)
self.assertCountEqual(
response.data["selected_tags"],
[
{"id": self.t1.id, "document_count": 2},
{"id": self.t2.id, "document_count": 1},
],
)
self.assertCountEqual(
response.data["selected_document_types"],
[
{"id": self.c1.id, "document_count": 1},
{"id": self.c2.id, "document_count": 0},
],
)
@mock.patch("documents.serialisers.bulk_edit.set_permissions")
def test_set_permissions(self, m):
m.return_value = "OK"
user1 = User.objects.create(username="user1")
user2 = User.objects.create(username="user2")
permissions = {
"view": {
"users": [user1.id, user2.id],
"groups": None,
},
"change": {
"users": [user1.id],
"groups": None,
},
}
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id, self.doc3.id],
"method": "set_permissions",
"parameters": {"set_permissions": permissions},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
args, kwargs = m.call_args
self.assertCountEqual(args[0], [self.doc2.id, self.doc3.id])
self.assertEqual(len(kwargs["set_permissions"]["view"]["users"]), 2)
@mock.patch("documents.serialisers.bulk_edit.set_permissions")
def test_insufficient_permissions_ownership(self, m):
"""
GIVEN:
- Documents owned by user other than logged in user
WHEN:
- set_permissions bulk edit API endpoint is called
THEN:
- User is not able to change permissions
"""
m.return_value = "OK"
self.doc1.owner = User.objects.get(username="temp_admin")
self.doc1.save()
user1 = User.objects.create(username="user1")
self.client.force_authenticate(user=user1)
permissions = {
"owner": user1.id,
}
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id, self.doc2.id, self.doc3.id],
"method": "set_permissions",
"parameters": {"set_permissions": permissions},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
m.assert_not_called()
self.assertEqual(response.content, b"Insufficient permissions")
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc2.id, self.doc3.id],
"method": "set_permissions",
"parameters": {"set_permissions": permissions},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()
@mock.patch("documents.serialisers.bulk_edit.set_storage_path")
def test_insufficient_permissions_edit(self, m):
"""
GIVEN:
- Documents for which current user only has view permissions
WHEN:
- API is called
THEN:
- set_storage_path only called if user can edit all docs
"""
m.return_value = "OK"
self.doc1.owner = User.objects.get(username="temp_admin")
self.doc1.save()
user1 = User.objects.create(username="user1")
assign_perm("view_document", user1, self.doc1)
self.client.force_authenticate(user=user1)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id, self.doc2.id, self.doc3.id],
"method": "set_storage_path",
"parameters": {"storage_path": self.sp1.id},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
m.assert_not_called()
self.assertEqual(response.content, b"Insufficient permissions")
assign_perm("change_document", user1, self.doc1)
response = self.client.post(
"/api/documents/bulk_edit/",
json.dumps(
{
"documents": [self.doc1.id, self.doc2.id, self.doc3.id],
"method": "set_storage_path",
"parameters": {"storage_path": self.sp1.id},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
m.assert_called_once()

View File

@ -0,0 +1,236 @@
import json
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from rest_framework import status
from rest_framework.test import APITestCase
from documents.data_models import DocumentSource
from documents.models import ConsumptionTemplate
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
from documents.tests.utils import DirectoriesMixin
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
class TestApiConsumptionTemplates(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/consumption_templates/"
def setUp(self) -> None:
super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
self.user2 = User.objects.create(username="user2")
self.user3 = User.objects.create(username="user3")
self.group1 = Group.objects.create(name="group1")
self.c = Correspondent.objects.create(name="Correspondent Name")
self.c2 = Correspondent.objects.create(name="Correspondent Name 2")
self.dt = DocumentType.objects.create(name="DocType Name")
self.t1 = Tag.objects.create(name="t1")
self.t2 = Tag.objects.create(name="t2")
self.t3 = Tag.objects.create(name="t3")
self.sp = StoragePath.objects.create(path="/test/")
self.cf1 = CustomField.objects.create(name="Custom Field 1", data_type="string")
self.cf2 = CustomField.objects.create(
name="Custom Field 2",
data_type="integer",
)
self.ct = ConsumptionTemplate.objects.create(
name="Template 1",
order=0,
sources=f"{int(DocumentSource.ApiUpload)},{int(DocumentSource.ConsumeFolder)},{int(DocumentSource.MailFetch)}",
filter_filename="*simple*",
filter_path="*/samples/*",
assign_title="Doc from {correspondent}",
assign_correspondent=self.c,
assign_document_type=self.dt,
assign_storage_path=self.sp,
assign_owner=self.user2,
)
self.ct.assign_tags.add(self.t1)
self.ct.assign_tags.add(self.t2)
self.ct.assign_tags.add(self.t3)
self.ct.assign_view_users.add(self.user3.pk)
self.ct.assign_view_groups.add(self.group1.pk)
self.ct.assign_change_users.add(self.user3.pk)
self.ct.assign_change_groups.add(self.group1.pk)
self.ct.assign_custom_fields.add(self.cf1.pk)
self.ct.assign_custom_fields.add(self.cf2.pk)
self.ct.save()
def test_api_get_consumption_template(self):
"""
GIVEN:
- API request to get all consumption template
WHEN:
- API is called
THEN:
- Existing consumption templates are returned
"""
response = self.client.get(self.ENDPOINT, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 1)
resp_consumption_template = response.data["results"][0]
self.assertEqual(resp_consumption_template["id"], self.ct.id)
self.assertEqual(
resp_consumption_template["assign_correspondent"],
self.ct.assign_correspondent.pk,
)
def test_api_create_consumption_template(self):
"""
GIVEN:
- API request to create a consumption template
WHEN:
- API is called
THEN:
- Correct HTTP response
- New template is created
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"name": "Template 2",
"order": 1,
"sources": [DocumentSource.ApiUpload],
"filter_filename": "*test*",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(ConsumptionTemplate.objects.count(), 2)
def test_api_create_invalid_consumption_template(self):
"""
GIVEN:
- API request to create a consumption template
- Neither file name nor path filter are specified
WHEN:
- API is called
THEN:
- Correct HTTP 400 response
- No template is created
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"name": "Template 2",
"order": 1,
"sources": [DocumentSource.ApiUpload],
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(ConsumptionTemplate.objects.count(), 1)
def test_api_create_consumption_template_empty_fields(self):
"""
GIVEN:
- API request to create a consumption template
- Path or filename filter or assign title are empty string
WHEN:
- API is called
THEN:
- Template is created but filter or title assignment is not set if ""
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"name": "Template 2",
"order": 1,
"sources": [DocumentSource.ApiUpload],
"filter_filename": "*test*",
"filter_path": "",
"assign_title": "",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
ct = ConsumptionTemplate.objects.get(name="Template 2")
self.assertEqual(ct.filter_filename, "*test*")
self.assertIsNone(ct.filter_path)
self.assertIsNone(ct.assign_title)
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"name": "Template 3",
"order": 1,
"sources": [DocumentSource.ApiUpload],
"filter_filename": "",
"filter_path": "*/test/*",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
ct2 = ConsumptionTemplate.objects.get(name="Template 3")
self.assertEqual(ct2.filter_path, "*/test/*")
self.assertIsNone(ct2.filter_filename)
def test_api_create_consumption_template_with_mailrule(self):
"""
GIVEN:
- API request to create a consumption template with a mail rule but no MailFetch source
WHEN:
- API is called
THEN:
- New template is created with MailFetch as source
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
rule1 = MailRule.objects.create(
name="Rule1",
account=account1,
folder="INBOX",
filter_from="from@example.com",
filter_to="someone@somewhere.com",
filter_subject="subject",
filter_body="body",
filter_attachment_filename_include="file.pdf",
maximum_age=30,
action=MailRule.MailAction.MARK_READ,
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
order=0,
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
)
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"name": "Template 2",
"order": 1,
"sources": [DocumentSource.ApiUpload],
"filter_mailrule": rule1.pk,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(ConsumptionTemplate.objects.count(), 2)
ct = ConsumptionTemplate.objects.get(name="Template 2")
self.assertEqual(ct.sources, [int(DocumentSource.MailFetch).__str__()])

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,224 @@
import json
from unittest import mock
from django.contrib.auth.models import User
from rest_framework import status
from rest_framework.test import APITestCase
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import StoragePath
from documents.models import Tag
from documents.tests.utils import DirectoriesMixin
class TestApiObjects(DirectoriesMixin, APITestCase):
def setUp(self) -> None:
super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
self.tag1 = Tag.objects.create(name="t1", is_inbox_tag=True)
self.tag2 = Tag.objects.create(name="t2")
self.tag3 = Tag.objects.create(name="t3")
self.c1 = Correspondent.objects.create(name="c1")
self.c2 = Correspondent.objects.create(name="c2")
self.c3 = Correspondent.objects.create(name="c3")
self.dt1 = DocumentType.objects.create(name="dt1")
self.dt2 = DocumentType.objects.create(name="dt2")
self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{title}")
self.sp2 = StoragePath.objects.create(name="sp2", path="Something2/{title}")
def test_object_filters(self):
response = self.client.get(
f"/api/tags/?id={self.tag2.id}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 1)
response = self.client.get(
f"/api/tags/?id__in={self.tag1.id},{self.tag3.id}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 2)
response = self.client.get(
f"/api/correspondents/?id={self.c2.id}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 1)
response = self.client.get(
f"/api/correspondents/?id__in={self.c1.id},{self.c3.id}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 2)
response = self.client.get(
f"/api/document_types/?id={self.dt1.id}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 1)
response = self.client.get(
f"/api/document_types/?id__in={self.dt1.id},{self.dt2.id}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 2)
response = self.client.get(
f"/api/storage_paths/?id={self.sp1.id}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 1)
response = self.client.get(
f"/api/storage_paths/?id__in={self.sp1.id},{self.sp2.id}",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
results = response.data["results"]
self.assertEqual(len(results), 2)
class TestApiStoragePaths(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/storage_paths/"
def setUp(self) -> None:
super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
self.sp1 = StoragePath.objects.create(name="sp1", path="Something/{checksum}")
def test_api_get_storage_path(self):
"""
GIVEN:
- API request to get all storage paths
WHEN:
- API is called
THEN:
- Existing storage paths are returned
"""
response = self.client.get(self.ENDPOINT, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 1)
resp_storage_path = response.data["results"][0]
self.assertEqual(resp_storage_path["id"], self.sp1.id)
self.assertEqual(resp_storage_path["path"], self.sp1.path)
def test_api_create_storage_path(self):
"""
GIVEN:
- API request to create a storage paths
WHEN:
- API is called
THEN:
- Correct HTTP response
- New storage path is created
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"name": "A storage path",
"path": "Somewhere/{asn}",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(StoragePath.objects.count(), 2)
def test_api_create_invalid_storage_path(self):
"""
GIVEN:
- API request to create a storage paths
- Storage path format is incorrect
WHEN:
- API is called
THEN:
- Correct HTTP 400 response
- No storage path is created
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"name": "Another storage path",
"path": "Somewhere/{correspdent}",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(StoragePath.objects.count(), 1)
def test_api_storage_path_placeholders(self):
"""
GIVEN:
- API request to create a storage path with placeholders
- Storage path is valid
WHEN:
- API is called
THEN:
- Correct HTTP response
- New storage path is created
"""
response = self.client.post(
self.ENDPOINT,
json.dumps(
{
"name": "Storage path with placeholders",
"path": "{title}/{correspondent}/{document_type}/{created}/{created_year}"
"/{created_year_short}/{created_month}/{created_month_name}"
"/{created_month_name_short}/{created_day}/{added}/{added_year}"
"/{added_year_short}/{added_month}/{added_month_name}"
"/{added_month_name_short}/{added_day}/{asn}/{tags}"
"/{tag_list}/{owner_username}/{original_name}/{doc_pk}/",
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(StoragePath.objects.count(), 2)
@mock.patch("documents.bulk_edit.bulk_update_documents.delay")
def test_api_update_storage_path(self, bulk_update_mock):
"""
GIVEN:
- API request to get all storage paths
WHEN:
- API is called
THEN:
- Existing storage paths are returned
"""
document = Document.objects.create(
mime_type="application/pdf",
storage_path=self.sp1,
)
response = self.client.patch(
f"{self.ENDPOINT}{self.sp1.pk}/",
data={
"path": "somewhere/{created} - {title}",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
bulk_update_mock.assert_called_once()
args, _ = bulk_update_mock.call_args
self.assertCountEqual([document.pk], args[0])

View File

@ -0,0 +1,910 @@
import json
from django.contrib.auth.models import Group
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
from guardian.shortcuts import assign_perm
from guardian.shortcuts import get_perms
from guardian.shortcuts import get_users_with_perms
from rest_framework import status
from rest_framework.test import APITestCase
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import MatchingModel
from documents.models import StoragePath
from documents.models import Tag
from documents.tests.utils import DirectoriesMixin
class TestApiAuth(DirectoriesMixin, APITestCase):
def test_auth_required(self):
d = Document.objects.create(title="Test")
self.assertEqual(
self.client.get("/api/documents/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get(f"/api/documents/{d.id}/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get(f"/api/documents/{d.id}/download/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get(f"/api/documents/{d.id}/preview/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get(f"/api/documents/{d.id}/thumb/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/tags/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/correspondents/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/document_types/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/logs/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/saved_views/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/search/autocomplete/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/documents/bulk_edit/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/documents/bulk_download/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
self.assertEqual(
self.client.get("/api/documents/selection_data/").status_code,
status.HTTP_401_UNAUTHORIZED,
)
def test_api_version_no_auth(self):
response = self.client.get("/api/")
self.assertNotIn("X-Api-Version", response)
self.assertNotIn("X-Version", response)
def test_api_version_with_auth(self):
user = User.objects.create_superuser(username="test")
self.client.force_authenticate(user)
response = self.client.get("/api/")
self.assertIn("X-Api-Version", response)
self.assertIn("X-Version", response)
def test_api_insufficient_permissions(self):
user = User.objects.create_user(username="test")
self.client.force_authenticate(user)
Document.objects.create(title="Test")
self.assertEqual(
self.client.get("/api/documents/").status_code,
status.HTTP_403_FORBIDDEN,
)
self.assertEqual(
self.client.get("/api/tags/").status_code,
status.HTTP_403_FORBIDDEN,
)
self.assertEqual(
self.client.get("/api/correspondents/").status_code,
status.HTTP_403_FORBIDDEN,
)
self.assertEqual(
self.client.get("/api/document_types/").status_code,
status.HTTP_403_FORBIDDEN,
)
self.assertEqual(
self.client.get("/api/logs/").status_code,
status.HTTP_403_FORBIDDEN,
)
self.assertEqual(
self.client.get("/api/saved_views/").status_code,
status.HTTP_403_FORBIDDEN,
)
def test_api_sufficient_permissions(self):
user = User.objects.create_user(username="test")
user.user_permissions.add(*Permission.objects.all())
self.client.force_authenticate(user)
Document.objects.create(title="Test")
self.assertEqual(
self.client.get("/api/documents/").status_code,
status.HTTP_200_OK,
)
self.assertEqual(self.client.get("/api/tags/").status_code, status.HTTP_200_OK)
self.assertEqual(
self.client.get("/api/correspondents/").status_code,
status.HTTP_200_OK,
)
self.assertEqual(
self.client.get("/api/document_types/").status_code,
status.HTTP_200_OK,
)
self.assertEqual(self.client.get("/api/logs/").status_code, status.HTTP_200_OK)
self.assertEqual(
self.client.get("/api/saved_views/").status_code,
status.HTTP_200_OK,
)
def test_api_get_object_permissions(self):
user1 = User.objects.create_user(username="test1")
user2 = User.objects.create_user(username="test2")
user1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
self.client.force_authenticate(user1)
self.assertEqual(
self.client.get("/api/documents/").status_code,
status.HTTP_200_OK,
)
d = Document.objects.create(title="Test", content="the content 1", checksum="1")
# no owner
self.assertEqual(
self.client.get(f"/api/documents/{d.id}/").status_code,
status.HTTP_200_OK,
)
d2 = Document.objects.create(
title="Test 2",
content="the content 2",
checksum="2",
owner=user2,
)
self.assertEqual(
self.client.get(f"/api/documents/{d2.id}/").status_code,
status.HTTP_404_NOT_FOUND,
)
def test_api_default_owner(self):
"""
GIVEN:
- API request to create an object (Tag)
WHEN:
- owner is not set at all
THEN:
- Object created with current user as owner
"""
user1 = User.objects.create_superuser(username="user1")
self.client.force_authenticate(user1)
response = self.client.post(
"/api/tags/",
json.dumps(
{
"name": "test1",
"matching_algorithm": MatchingModel.MATCH_AUTO,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
tag1 = Tag.objects.filter(name="test1").first()
self.assertEqual(tag1.owner, user1)
def test_api_set_no_owner(self):
"""
GIVEN:
- API request to create an object (Tag)
WHEN:
- owner is passed as None
THEN:
- Object created with no owner
"""
user1 = User.objects.create_superuser(username="user1")
self.client.force_authenticate(user1)
response = self.client.post(
"/api/tags/",
json.dumps(
{
"name": "test1",
"matching_algorithm": MatchingModel.MATCH_AUTO,
"owner": None,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
tag1 = Tag.objects.filter(name="test1").first()
self.assertEqual(tag1.owner, None)
def test_api_set_owner_w_permissions(self):
"""
GIVEN:
- API request to create an object (Tag) that supplies set_permissions object
WHEN:
- owner is passed as user id
- view > users is set & view > groups is set
THEN:
- Object permissions are set appropriately
"""
user1 = User.objects.create_superuser(username="user1")
user2 = User.objects.create(username="user2")
group1 = Group.objects.create(name="group1")
self.client.force_authenticate(user1)
response = self.client.post(
"/api/tags/",
json.dumps(
{
"name": "test1",
"matching_algorithm": MatchingModel.MATCH_AUTO,
"owner": user1.id,
"set_permissions": {
"view": {
"users": [user2.id],
"groups": [group1.id],
},
"change": {
"users": None,
"groups": None,
},
},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
tag1 = Tag.objects.filter(name="test1").first()
from guardian.core import ObjectPermissionChecker
checker = ObjectPermissionChecker(user2)
self.assertEqual(checker.has_perm("view_tag", tag1), True)
self.assertIn("view_tag", get_perms(group1, tag1))
def test_api_set_other_owner_w_permissions(self):
"""
GIVEN:
- API request to create an object (Tag)
WHEN:
- a different owner than is logged in is set
- view > groups is set
THEN:
- Object permissions are set appropriately
"""
user1 = User.objects.create_superuser(username="user1")
user2 = User.objects.create(username="user2")
group1 = Group.objects.create(name="group1")
self.client.force_authenticate(user1)
response = self.client.post(
"/api/tags/",
json.dumps(
{
"name": "test1",
"matching_algorithm": MatchingModel.MATCH_AUTO,
"owner": user2.id,
"set_permissions": {
"view": {
"users": None,
"groups": [group1.id],
},
"change": {
"users": None,
"groups": None,
},
},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
tag1 = Tag.objects.filter(name="test1").first()
self.assertEqual(tag1.owner, user2)
self.assertIn("view_tag", get_perms(group1, tag1))
def test_api_set_doc_permissions(self):
"""
GIVEN:
- API request to update doc permissions and owner
WHEN:
- owner is set
- view > users is set & view > groups is set
THEN:
- Object permissions are set appropriately
"""
doc = Document.objects.create(
title="test",
mime_type="application/pdf",
content="this is a document",
)
user1 = User.objects.create_superuser(username="user1")
user2 = User.objects.create(username="user2")
group1 = Group.objects.create(name="group1")
self.client.force_authenticate(user1)
response = self.client.patch(
f"/api/documents/{doc.id}/",
json.dumps(
{
"owner": user1.id,
"set_permissions": {
"view": {
"users": [user2.id],
"groups": [group1.id],
},
"change": {
"users": None,
"groups": None,
},
},
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
doc = Document.objects.get(pk=doc.id)
self.assertEqual(doc.owner, user1)
from guardian.core import ObjectPermissionChecker
checker = ObjectPermissionChecker(user2)
self.assertTrue(checker.has_perm("view_document", doc))
self.assertIn("view_document", get_perms(group1, doc))
def test_dynamic_permissions_fields(self):
user1 = User.objects.create_user(username="user1")
user1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
user2 = User.objects.create_user(username="user2")
Document.objects.create(title="Test", content="content 1", checksum="1")
doc2 = Document.objects.create(
title="Test2",
content="content 2",
checksum="2",
owner=user2,
)
doc3 = Document.objects.create(
title="Test3",
content="content 3",
checksum="3",
owner=user2,
)
assign_perm("view_document", user1, doc2)
assign_perm("view_document", user1, doc3)
assign_perm("change_document", user1, doc3)
self.client.force_authenticate(user1)
response = self.client.get(
"/api/documents/",
format="json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
resp_data = response.json()
self.assertNotIn("permissions", resp_data["results"][0])
self.assertIn("user_can_change", resp_data["results"][0])
self.assertEqual(resp_data["results"][0]["user_can_change"], True) # doc1
self.assertEqual(resp_data["results"][1]["user_can_change"], False) # doc2
self.assertEqual(resp_data["results"][2]["user_can_change"], True) # doc3
response = self.client.get(
"/api/documents/?full_perms=true",
format="json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
resp_data = response.json()
self.assertIn("permissions", resp_data["results"][0])
self.assertNotIn("user_can_change", resp_data["results"][0])
class TestApiUser(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/users/"
def setUp(self):
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def test_get_users(self):
"""
GIVEN:
- Configured users
WHEN:
- API call is made to get users
THEN:
- Configured users are provided
"""
user1 = User.objects.create(
username="testuser",
password="test",
first_name="Test",
last_name="User",
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 2)
returned_user2 = response.data["results"][1]
self.assertEqual(returned_user2["username"], user1.username)
self.assertEqual(returned_user2["password"], "**********")
self.assertEqual(returned_user2["first_name"], user1.first_name)
self.assertEqual(returned_user2["last_name"], user1.last_name)
def test_create_user(self):
"""
WHEN:
- API request is made to add a user account
THEN:
- A new user account is created
"""
user1 = {
"username": "testuser",
"password": "test",
"first_name": "Test",
"last_name": "User",
}
response = self.client.post(
self.ENDPOINT,
data=user1,
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
returned_user1 = User.objects.get(username="testuser")
self.assertEqual(returned_user1.username, user1["username"])
self.assertEqual(returned_user1.first_name, user1["first_name"])
self.assertEqual(returned_user1.last_name, user1["last_name"])
def test_delete_user(self):
"""
GIVEN:
- Existing user account
WHEN:
- API request is made to delete a user account
THEN:
- Account is deleted
"""
user1 = User.objects.create(
username="testuser",
password="test",
first_name="Test",
last_name="User",
)
nUsers = User.objects.count()
response = self.client.delete(
f"{self.ENDPOINT}{user1.pk}/",
)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
self.assertEqual(User.objects.count(), nUsers - 1)
def test_update_user(self):
"""
GIVEN:
- Existing user accounts
WHEN:
- API request is made to update user account
THEN:
- The user account is updated, password only updated if not '****'
"""
user1 = User.objects.create(
username="testuser",
password="test",
first_name="Test",
last_name="User",
)
initial_password = user1.password
response = self.client.patch(
f"{self.ENDPOINT}{user1.pk}/",
data={
"first_name": "Updated Name 1",
"password": "******",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
returned_user1 = User.objects.get(pk=user1.pk)
self.assertEqual(returned_user1.first_name, "Updated Name 1")
self.assertEqual(returned_user1.password, initial_password)
response = self.client.patch(
f"{self.ENDPOINT}{user1.pk}/",
data={
"first_name": "Updated Name 2",
"password": "123xyz",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
returned_user2 = User.objects.get(pk=user1.pk)
self.assertEqual(returned_user2.first_name, "Updated Name 2")
self.assertNotEqual(returned_user2.password, initial_password)
class TestApiGroup(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/groups/"
def setUp(self):
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def test_get_groups(self):
"""
GIVEN:
- Configured groups
WHEN:
- API call is made to get groups
THEN:
- Configured groups are provided
"""
group1 = Group.objects.create(
name="Test Group",
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data["count"], 1)
returned_group1 = response.data["results"][0]
self.assertEqual(returned_group1["name"], group1.name)
def test_create_group(self):
"""
WHEN:
- API request is made to add a group
THEN:
- A new group is created
"""
group1 = {
"name": "Test Group",
}
response = self.client.post(
self.ENDPOINT,
data=group1,
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
returned_group1 = Group.objects.get(name="Test Group")
self.assertEqual(returned_group1.name, group1["name"])
def test_delete_group(self):
"""
GIVEN:
- Existing group
WHEN:
- API request is made to delete a group
THEN:
- Group is deleted
"""
group1 = Group.objects.create(
name="Test Group",
)
response = self.client.delete(
f"{self.ENDPOINT}{group1.pk}/",
)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
self.assertEqual(len(Group.objects.all()), 0)
def test_update_group(self):
"""
GIVEN:
- Existing groups
WHEN:
- API request is made to update group
THEN:
- The group is updated
"""
group1 = Group.objects.create(
name="Test Group",
)
response = self.client.patch(
f"{self.ENDPOINT}{group1.pk}/",
data={
"name": "Updated Name 1",
},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
returned_group1 = Group.objects.get(pk=group1.pk)
self.assertEqual(returned_group1.name, "Updated Name 1")
class TestBulkEditObjectPermissions(APITestCase):
def setUp(self):
super().setUp()
user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=user)
self.t1 = Tag.objects.create(name="t1")
self.t2 = Tag.objects.create(name="t2")
self.c1 = Correspondent.objects.create(name="c1")
self.dt1 = DocumentType.objects.create(name="dt1")
self.sp1 = StoragePath.objects.create(name="sp1")
self.user1 = User.objects.create(username="user1")
self.user2 = User.objects.create(username="user2")
self.user3 = User.objects.create(username="user3")
def test_bulk_object_set_permissions(self):
"""
GIVEN:
- Existing objects
WHEN:
- bulk_edit_object_perms API endpoint is called
THEN:
- Permissions and / or owner are changed
"""
permissions = {
"view": {
"users": [self.user1.id, self.user2.id],
"groups": [],
},
"change": {
"users": [self.user1.id],
"groups": [],
},
}
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.t1.id, self.t2.id],
"object_type": "tags",
"permissions": permissions,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn(self.user1, get_users_with_perms(self.t1))
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.c1.id],
"object_type": "correspondents",
"permissions": permissions,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn(self.user1, get_users_with_perms(self.c1))
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.dt1.id],
"object_type": "document_types",
"permissions": permissions,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn(self.user1, get_users_with_perms(self.dt1))
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.sp1.id],
"object_type": "storage_paths",
"permissions": permissions,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn(self.user1, get_users_with_perms(self.sp1))
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.t1.id, self.t2.id],
"object_type": "tags",
"owner": self.user3.id,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(Tag.objects.get(pk=self.t2.id).owner, self.user3)
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.sp1.id],
"object_type": "storage_paths",
"owner": self.user3.id,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(StoragePath.objects.get(pk=self.sp1.id).owner, self.user3)
def test_bulk_edit_object_permissions_insufficient_perms(self):
"""
GIVEN:
- Objects owned by user other than logged in user
WHEN:
- bulk_edit_object_perms API endpoint is called
THEN:
- User is not able to change permissions
"""
self.t1.owner = User.objects.get(username="temp_admin")
self.t1.save()
self.client.force_authenticate(user=self.user1)
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.t1.id, self.t2.id],
"object_type": "tags",
"owner": self.user1.id,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(response.content, b"Insufficient permissions")
def test_bulk_edit_object_permissions_validation(self):
"""
GIVEN:
- Existing objects
WHEN:
- bulk_edit_object_perms API endpoint is called with invalid params
THEN:
- Validation fails
"""
# not a list
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": self.t1.id,
"object_type": "tags",
"owner": self.user1.id,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# not a list of ints
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": ["one"],
"object_type": "tags",
"owner": self.user1.id,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# duplicates
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [self.t1.id, self.t2.id, self.t1.id],
"object_type": "tags",
"owner": self.user1.id,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
# not a valid object type
response = self.client.post(
"/api/bulk_edit_object_perms/",
json.dumps(
{
"objects": [1],
"object_type": "madeup",
"owner": self.user1.id,
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

View File

@ -0,0 +1,123 @@
import json
import urllib.request
from unittest import mock
from unittest.mock import MagicMock
from rest_framework import status
from rest_framework.test import APITestCase
from documents.tests.utils import DirectoriesMixin
from paperless import version
class TestApiRemoteVersion(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/remote_version/"
def setUp(self):
super().setUp()
@mock.patch("urllib.request.urlopen")
def test_remote_version_enabled_no_update_prefix(self, urlopen_mock):
cm = MagicMock()
cm.getcode.return_value = status.HTTP_200_OK
cm.read.return_value = json.dumps({"tag_name": "ngx-1.6.0"}).encode()
cm.__enter__.return_value = cm
urlopen_mock.return_value = cm
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(
response.data,
{
"version": "1.6.0",
"update_available": False,
},
)
@mock.patch("urllib.request.urlopen")
def test_remote_version_enabled_no_update_no_prefix(self, urlopen_mock):
cm = MagicMock()
cm.getcode.return_value = status.HTTP_200_OK
cm.read.return_value = json.dumps(
{"tag_name": version.__full_version_str__},
).encode()
cm.__enter__.return_value = cm
urlopen_mock.return_value = cm
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(
response.data,
{
"version": version.__full_version_str__,
"update_available": False,
},
)
@mock.patch("urllib.request.urlopen")
def test_remote_version_enabled_update(self, urlopen_mock):
new_version = (
version.__version__[0],
version.__version__[1],
version.__version__[2] + 1,
)
new_version_str = ".".join(map(str, new_version))
cm = MagicMock()
cm.getcode.return_value = status.HTTP_200_OK
cm.read.return_value = json.dumps(
{"tag_name": new_version_str},
).encode()
cm.__enter__.return_value = cm
urlopen_mock.return_value = cm
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(
response.data,
{
"version": new_version_str,
"update_available": True,
},
)
@mock.patch("urllib.request.urlopen")
def test_remote_version_bad_json(self, urlopen_mock):
cm = MagicMock()
cm.getcode.return_value = status.HTTP_200_OK
cm.read.return_value = b'{ "blah":'
cm.__enter__.return_value = cm
urlopen_mock.return_value = cm
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(
response.data,
{
"version": "0.0.0",
"update_available": False,
},
)
@mock.patch("urllib.request.urlopen")
def test_remote_version_exception(self, urlopen_mock):
cm = MagicMock()
cm.getcode.return_value = status.HTTP_200_OK
cm.read.side_effect = urllib.error.URLError("an error")
cm.__enter__.return_value = cm
urlopen_mock.return_value = cm
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(
response.data,
{
"version": "0.0.0",
"update_available": False,
},
)

View File

@ -0,0 +1,240 @@
import uuid
import celery
from django.contrib.auth.models import User
from rest_framework import status
from rest_framework.test import APITestCase
from documents.models import PaperlessTask
from documents.tests.utils import DirectoriesMixin
class TestTasks(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/tasks/"
ENDPOINT_ACKNOWLEDGE = "/api/acknowledge_tasks/"
def setUp(self):
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def test_get_tasks(self):
"""
GIVEN:
- Attempted celery tasks
WHEN:
- API call is made to get tasks
THEN:
- Attempting and pending tasks are serialized and provided
"""
task1 = PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_one.pdf",
)
task2 = PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_two.pdf",
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 2)
returned_task1 = response.data[1]
returned_task2 = response.data[0]
self.assertEqual(returned_task1["task_id"], task1.task_id)
self.assertEqual(returned_task1["status"], celery.states.PENDING)
self.assertEqual(returned_task1["task_file_name"], task1.task_file_name)
self.assertEqual(returned_task2["task_id"], task2.task_id)
self.assertEqual(returned_task2["status"], celery.states.PENDING)
self.assertEqual(returned_task2["task_file_name"], task2.task_file_name)
def test_get_single_task_status(self):
"""
GIVEN
- Query parameter for a valid task ID
WHEN:
- API call is made to get task status
THEN:
- Single task data is returned
"""
id1 = str(uuid.uuid4())
task1 = PaperlessTask.objects.create(
task_id=id1,
task_file_name="task_one.pdf",
)
_ = PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_two.pdf",
)
response = self.client.get(self.ENDPOINT + f"?task_id={id1}")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 1)
returned_task1 = response.data[0]
self.assertEqual(returned_task1["task_id"], task1.task_id)
def test_get_single_task_status_not_valid(self):
"""
GIVEN
- Query parameter for a non-existent task ID
WHEN:
- API call is made to get task status
THEN:
- No task data is returned
"""
PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_one.pdf",
)
_ = PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_two.pdf",
)
response = self.client.get(self.ENDPOINT + "?task_id=bad-task-id")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 0)
def test_acknowledge_tasks(self):
"""
GIVEN:
- Attempted celery tasks
WHEN:
- API call is made to get mark task as acknowledged
THEN:
- Task is marked as acknowledged
"""
task = PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_one.pdf",
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(len(response.data), 1)
response = self.client.post(
self.ENDPOINT_ACKNOWLEDGE,
{"tasks": [task.id]},
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
response = self.client.get(self.ENDPOINT)
self.assertEqual(len(response.data), 0)
def test_task_result_no_error(self):
"""
GIVEN:
- A celery task completed without error
WHEN:
- API call is made to get tasks
THEN:
- The returned data includes the task result
"""
PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_one.pdf",
status=celery.states.SUCCESS,
result="Success. New document id 1 created",
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 1)
returned_data = response.data[0]
self.assertEqual(returned_data["result"], "Success. New document id 1 created")
self.assertEqual(returned_data["related_document"], "1")
def test_task_result_with_error(self):
"""
GIVEN:
- A celery task completed with an exception
WHEN:
- API call is made to get tasks
THEN:
- The returned result is the exception info
"""
PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="task_one.pdf",
status=celery.states.FAILURE,
result="test.pdf: Not consuming test.pdf: It is a duplicate.",
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 1)
returned_data = response.data[0]
self.assertEqual(
returned_data["result"],
"test.pdf: Not consuming test.pdf: It is a duplicate.",
)
def test_task_name_webui(self):
"""
GIVEN:
- Attempted celery task
- Task was created through the webui
WHEN:
- API call is made to get tasks
THEN:
- Returned data include the filename
"""
PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="test.pdf",
task_name="documents.tasks.some_task",
status=celery.states.SUCCESS,
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 1)
returned_data = response.data[0]
self.assertEqual(returned_data["task_file_name"], "test.pdf")
def test_task_name_consume_folder(self):
"""
GIVEN:
- Attempted celery task
- Task was created through the consume folder
WHEN:
- API call is made to get tasks
THEN:
- Returned data include the filename
"""
PaperlessTask.objects.create(
task_id=str(uuid.uuid4()),
task_file_name="anothertest.pdf",
task_name="documents.tasks.some_task",
status=celery.states.SUCCESS,
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data), 1)
returned_data = response.data[0]
self.assertEqual(returned_data["task_file_name"], "anothertest.pdf")

View File

@ -0,0 +1,65 @@
import json
from django.contrib.auth.models import User
from rest_framework import status
from rest_framework.test import APITestCase
from documents.tests.utils import DirectoriesMixin
class TestApiUiSettings(DirectoriesMixin, APITestCase):
ENDPOINT = "/api/ui_settings/"
def setUp(self):
super().setUp()
self.test_user = User.objects.create_superuser(username="test")
self.test_user.first_name = "Test"
self.test_user.last_name = "User"
self.test_user.save()
self.client.force_authenticate(user=self.test_user)
def test_api_get_ui_settings(self):
response = self.client.get(self.ENDPOINT, format="json")
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertDictEqual(
response.data["user"],
{
"id": self.test_user.id,
"username": self.test_user.username,
"is_superuser": True,
"groups": [],
"first_name": self.test_user.first_name,
"last_name": self.test_user.last_name,
},
)
self.assertDictEqual(
response.data["settings"],
{
"update_checking": {
"backend_setting": "default",
},
},
)
def test_api_set_ui_settings(self):
settings = {
"settings": {
"dark_mode": {
"enabled": True,
},
},
}
response = self.client.post(
self.ENDPOINT,
json.dumps(settings),
content_type="application/json",
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
ui_settings = self.test_user.ui_settings
self.assertDictEqual(
ui_settings.settings,
settings["settings"],
)