Re-org where some of the new classes are found

This commit is contained in:
Michael Shamoon 2022-11-28 20:39:03 -08:00
parent 93fab8bb95
commit 5e5f56dc67
7 changed files with 584 additions and 534 deletions

View File

@ -28,8 +28,6 @@ from .models import UiSettings
from .models import PaperlessTask
from .parsers import is_mime_type_supported
from paperless_mail.models import MailAccount, MailRule
# https://www.django-rest-framework.org/api-guide/serializers/#example
class DynamicFieldsModelSerializer(serializers.ModelSerializer):
@ -690,107 +688,3 @@ class AcknowledgeTasksViewSerializer(serializers.Serializer):
def validate_tasks(self, tasks):
self._validate_task_id_list(tasks)
return tasks
class ObfuscatedPasswordField(serializers.Field):
"""
Sends *** string instead of password in the clear
"""
def to_representation(self, value):
return re.sub(".", "*", value)
def to_internal_value(self, data):
return data
class MailAccountSerializer(serializers.ModelSerializer):
password = ObfuscatedPasswordField()
class Meta:
model = MailAccount
depth = 1
fields = [
"id",
"name",
"imap_server",
"imap_port",
"imap_security",
"username",
"password",
"character_set",
]
def update(self, instance, validated_data):
if "password" in validated_data:
if len(validated_data.get("password").replace("*", "")) == 0:
validated_data.pop("password")
super().update(instance, validated_data)
return instance
def create(self, validated_data):
mail_account = MailAccount.objects.create(**validated_data)
return mail_account
class AccountField(serializers.PrimaryKeyRelatedField):
def get_queryset(self):
return MailAccount.objects.all().order_by("-id")
class MailRuleSerializer(serializers.ModelSerializer):
account = AccountField(required=True)
action_parameter = serializers.CharField(
allow_null=True,
required=False,
default="",
)
assign_correspondent = CorrespondentField(allow_null=True, required=False)
assign_tags = TagsField(many=True, allow_null=True, required=False)
assign_document_type = DocumentTypeField(allow_null=True, required=False)
order = serializers.IntegerField(required=False)
class Meta:
model = MailRule
depth = 1
fields = [
"id",
"name",
"account",
"folder",
"filter_from",
"filter_subject",
"filter_body",
"filter_attachment_filename",
"maximum_age",
"action",
"action_parameter",
"assign_title_from",
"assign_tags",
"assign_correspondent_from",
"assign_correspondent",
"assign_document_type",
"order",
"attachment_type",
]
def update(self, instance, validated_data):
super().update(instance, validated_data)
return instance
def create(self, validated_data):
if "assign_tags" in validated_data:
assign_tags = validated_data.pop("assign_tags")
mail_rule = MailRule.objects.create(**validated_data)
if assign_tags:
mail_rule.assign_tags.set(assign_tags)
return mail_rule
def validate(self, attrs):
if (
attrs["action"] == MailRule.MailAction.TAG
or attrs["action"] == MailRule.MailAction.MOVE
) and attrs["action_parameter"] is None:
raise serializers.ValidationError("An action parameter is required.")
return attrs

View File

@ -2,7 +2,6 @@ import datetime
import io
import json
import os
import re
import shutil
import tempfile
import urllib.request
@ -37,7 +36,6 @@ from documents.models import Comment
from documents.models import StoragePath
from documents.tests.utils import DirectoriesMixin
from paperless import version
from paperless_mail.models import MailAccount, MailRule
from rest_framework.test import APITestCase
from whoosh.writing import AsyncWriter
@ -2931,425 +2929,3 @@ class TestTasks(APITestCase):
returned_data = response.data[0]
self.assertEqual(returned_data["task_file_name"], "anothertest.pdf")
class TestAPIMailAccounts(APITestCase):
ENDPOINT = "/api/mail_accounts/"
def setUp(self):
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def test_get_mail_accounts(self):
"""
GIVEN:
- Configured mail accounts
WHEN:
- API call is made to get mail accounts
THEN:
- Configured mail accounts are provided
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["count"], 1)
returned_account1 = response.data["results"][0]
self.assertEqual(returned_account1["name"], account1.name)
self.assertEqual(returned_account1["username"], account1.username)
self.assertEqual(
returned_account1["password"],
"*" * len(account1.password),
)
self.assertEqual(returned_account1["imap_server"], account1.imap_server)
self.assertEqual(returned_account1["imap_port"], account1.imap_port)
self.assertEqual(returned_account1["imap_security"], account1.imap_security)
self.assertEqual(returned_account1["character_set"], account1.character_set)
def test_create_mail_account(self):
"""
WHEN:
- API request is made to add a mail account
THEN:
- A new mail account is created
"""
account1 = {
"name": "Email1",
"username": "username1",
"password": "password1",
"imap_server": "server.example.com",
"imap_port": 443,
"imap_security": MailAccount.ImapSecurity.SSL,
"character_set": "UTF-8",
}
response = self.client.post(
self.ENDPOINT,
data=account1,
)
self.assertEqual(response.status_code, 201)
returned_account1 = MailAccount.objects.get(name="Email1")
self.assertEqual(returned_account1.name, account1["name"])
self.assertEqual(returned_account1.username, account1["username"])
self.assertEqual(returned_account1.password, account1["password"])
self.assertEqual(returned_account1.imap_server, account1["imap_server"])
self.assertEqual(returned_account1.imap_port, account1["imap_port"])
self.assertEqual(returned_account1.imap_security, account1["imap_security"])
self.assertEqual(returned_account1.character_set, account1["character_set"])
def test_delete_mail_account(self):
"""
GIVEN:
- Existing mail account
WHEN:
- API request is made to delete a mail account
THEN:
- Account is deleted
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
response = self.client.delete(
f"{self.ENDPOINT}{account1.pk}/",
)
self.assertEqual(response.status_code, 204)
self.assertEqual(len(MailAccount.objects.all()), 0)
def test_update_mail_account(self):
"""
GIVEN:
- Existing mail accounts
WHEN:
- API request is made to update mail account
THEN:
- The mail account is updated, password only updated if not '****'
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
response = self.client.patch(
f"{self.ENDPOINT}{account1.pk}/",
data={
"name": "Updated Name 1",
"password": "******",
},
)
self.assertEqual(response.status_code, 200)
returned_account1 = MailAccount.objects.get(pk=account1.pk)
self.assertEqual(returned_account1.name, "Updated Name 1")
self.assertEqual(returned_account1.password, account1.password)
response = self.client.patch(
f"{self.ENDPOINT}{account1.pk}/",
data={
"name": "Updated Name 2",
"password": "123xyz",
},
)
self.assertEqual(response.status_code, 200)
returned_account2 = MailAccount.objects.get(pk=account1.pk)
self.assertEqual(returned_account2.name, "Updated Name 2")
self.assertEqual(returned_account2.password, "123xyz")
class TestAPIMailRules(APITestCase):
ENDPOINT = "/api/mail_rules/"
def setUp(self):
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def test_get_mail_rules(self):
"""
GIVEN:
- Configured mail accounts and rules
WHEN:
- API call is made to get mail rules
THEN:
- Configured mail rules are provided
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
rule1 = MailRule.objects.create(
name="Rule1",
account=account1,
folder="INBOX",
filter_from="from@example.com",
filter_subject="subject",
filter_body="body",
filter_attachment_filename="file.pdf",
maximum_age=30,
action=MailRule.MailAction.MARK_READ,
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
order=0,
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["count"], 1)
returned_rule1 = response.data["results"][0]
self.assertEqual(returned_rule1["name"], rule1.name)
self.assertEqual(returned_rule1["account"], account1.pk)
self.assertEqual(returned_rule1["folder"], rule1.folder)
self.assertEqual(returned_rule1["filter_from"], rule1.filter_from)
self.assertEqual(returned_rule1["filter_subject"], rule1.filter_subject)
self.assertEqual(returned_rule1["filter_body"], rule1.filter_body)
self.assertEqual(
returned_rule1["filter_attachment_filename"],
rule1.filter_attachment_filename,
)
self.assertEqual(returned_rule1["maximum_age"], rule1.maximum_age)
self.assertEqual(returned_rule1["action"], rule1.action)
self.assertEqual(returned_rule1["assign_title_from"], rule1.assign_title_from)
self.assertEqual(
returned_rule1["assign_correspondent_from"],
rule1.assign_correspondent_from,
)
self.assertEqual(returned_rule1["order"], rule1.order)
self.assertEqual(returned_rule1["attachment_type"], rule1.attachment_type)
def test_create_mail_rule(self):
"""
GIVEN:
- Configured mail account exists
WHEN:
- API request is made to add a mail rule
THEN:
- A new mail rule is created
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
tag = Tag.objects.create(
name="t",
)
correspondent = Correspondent.objects.create(
name="c",
)
document_type = DocumentType.objects.create(
name="dt",
)
rule1 = {
"name": "Rule1",
"account": account1.pk,
"folder": "INBOX",
"filter_from": "from@example.com",
"filter_subject": "subject",
"filter_body": "body",
"filter_attachment_filename": "file.pdf",
"maximum_age": 30,
"action": MailRule.MailAction.MARK_READ,
"assign_title_from": MailRule.TitleSource.FROM_SUBJECT,
"assign_correspondent_from": MailRule.CorrespondentSource.FROM_NOTHING,
"order": 0,
"attachment_type": MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
"action_parameter": "parameter",
"assign_tags": [tag.pk],
"assign_correspondent": correspondent.pk,
"assign_document_type": document_type.pk,
}
response = self.client.post(
self.ENDPOINT,
data=rule1,
)
self.assertEqual(response.status_code, 201)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["count"], 1)
returned_rule1 = response.data["results"][0]
self.assertEqual(returned_rule1["name"], rule1["name"])
self.assertEqual(returned_rule1["account"], account1.pk)
self.assertEqual(returned_rule1["folder"], rule1["folder"])
self.assertEqual(returned_rule1["filter_from"], rule1["filter_from"])
self.assertEqual(returned_rule1["filter_subject"], rule1["filter_subject"])
self.assertEqual(returned_rule1["filter_body"], rule1["filter_body"])
self.assertEqual(
returned_rule1["filter_attachment_filename"],
rule1["filter_attachment_filename"],
)
self.assertEqual(returned_rule1["maximum_age"], rule1["maximum_age"])
self.assertEqual(returned_rule1["action"], rule1["action"])
self.assertEqual(
returned_rule1["assign_title_from"],
rule1["assign_title_from"],
)
self.assertEqual(
returned_rule1["assign_correspondent_from"],
rule1["assign_correspondent_from"],
)
self.assertEqual(returned_rule1["order"], rule1["order"])
self.assertEqual(returned_rule1["attachment_type"], rule1["attachment_type"])
self.assertEqual(returned_rule1["action_parameter"], rule1["action_parameter"])
self.assertEqual(
returned_rule1["assign_correspondent"],
rule1["assign_correspondent"],
)
self.assertEqual(
returned_rule1["assign_document_type"],
rule1["assign_document_type"],
)
self.assertEqual(returned_rule1["assign_tags"], rule1["assign_tags"])
def test_delete_mail_rule(self):
"""
GIVEN:
- Existing mail rule
WHEN:
- API request is made to delete a mail rule
THEN:
- Rule is deleted
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
rule1 = MailRule.objects.create(
name="Rule1",
account=account1,
folder="INBOX",
filter_from="from@example.com",
filter_subject="subject",
filter_body="body",
filter_attachment_filename="file.pdf",
maximum_age=30,
action=MailRule.MailAction.MARK_READ,
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
order=0,
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
)
response = self.client.delete(
f"{self.ENDPOINT}{rule1.pk}/",
)
self.assertEqual(response.status_code, 204)
self.assertEqual(len(MailRule.objects.all()), 0)
def test_update_mail_rule(self):
"""
GIVEN:
- Existing mail rule
WHEN:
- API request is made to update mail rule
THEN:
- The mail rule is updated
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
rule1 = MailRule.objects.create(
name="Rule1",
account=account1,
folder="INBOX",
filter_from="from@example.com",
filter_subject="subject",
filter_body="body",
filter_attachment_filename="file.pdf",
maximum_age=30,
action=MailRule.MailAction.MARK_READ,
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
order=0,
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
)
response = self.client.patch(
f"{self.ENDPOINT}{rule1.pk}/",
data={
"name": "Updated Name 1",
"action": MailRule.MailAction.DELETE,
},
)
self.assertEqual(response.status_code, 200)
returned_rule1 = MailRule.objects.get(pk=rule1.pk)
self.assertEqual(returned_rule1.name, "Updated Name 1")
self.assertEqual(returned_rule1.action, MailRule.MailAction.DELETE)

View File

@ -35,6 +35,8 @@ from paperless.db import GnuPG
from paperless.views import StandardPagination
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.serialisers import MailAccountSerializer
from paperless_mail.serialisers import MailRuleSerializer
from rest_framework import parsers
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound
@ -83,8 +85,6 @@ from .serialisers import CorrespondentSerializer
from .serialisers import DocumentListSerializer
from .serialisers import DocumentSerializer
from .serialisers import DocumentTypeSerializer
from .serialisers import MailAccountSerializer
from .serialisers import MailRuleSerializer
from .serialisers import PostDocumentSerializer
from .serialisers import SavedViewSerializer
from .serialisers import StoragePathSerializer

View File

@ -14,8 +14,6 @@ from documents.views import CorrespondentViewSet
from documents.views import DocumentTypeViewSet
from documents.views import IndexView
from documents.views import LogViewSet
from documents.views import MailAccountViewSet
from documents.views import MailRuleViewSet
from documents.views import PostDocumentView
from documents.views import RemoteVersionView
from documents.views import SavedViewViewSet
@ -29,6 +27,8 @@ from documents.views import UiSettingsView
from documents.views import UnifiedSearchViewSet
from paperless.consumers import StatusConsumer
from paperless.views import FaviconView
from paperless_mail.views import MailAccountViewSet
from paperless_mail.views import MailRuleViewSet
from rest_framework.authtoken import views
from rest_framework.routers import DefaultRouter

View File

@ -0,0 +1,110 @@
from documents.serialisers import CorrespondentField
from documents.serialisers import DocumentTypeField
from documents.serialisers import TagsField
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from rest_framework import serializers
class ObfuscatedPasswordField(serializers.Field):
"""
Sends *** string instead of password in the clear
"""
def to_representation(self, value):
return "*" * len(value)
def to_internal_value(self, data):
return data
class MailAccountSerializer(serializers.ModelSerializer):
password = ObfuscatedPasswordField()
class Meta:
model = MailAccount
depth = 1
fields = [
"id",
"name",
"imap_server",
"imap_port",
"imap_security",
"username",
"password",
"character_set",
]
def update(self, instance, validated_data):
if "password" in validated_data:
if len(validated_data.get("password").replace("*", "")) == 0:
validated_data.pop("password")
super().update(instance, validated_data)
return instance
def create(self, validated_data):
mail_account = MailAccount.objects.create(**validated_data)
return mail_account
class AccountField(serializers.PrimaryKeyRelatedField):
def get_queryset(self):
return MailAccount.objects.all().order_by("-id")
class MailRuleSerializer(serializers.ModelSerializer):
account = AccountField(required=True)
action_parameter = serializers.CharField(
allow_null=True,
required=False,
default="",
)
assign_correspondent = CorrespondentField(allow_null=True, required=False)
assign_tags = TagsField(many=True, allow_null=True, required=False)
assign_document_type = DocumentTypeField(allow_null=True, required=False)
order = serializers.IntegerField(required=False)
class Meta:
model = MailRule
depth = 1
fields = [
"id",
"name",
"account",
"folder",
"filter_from",
"filter_subject",
"filter_body",
"filter_attachment_filename",
"maximum_age",
"action",
"action_parameter",
"assign_title_from",
"assign_tags",
"assign_correspondent_from",
"assign_correspondent",
"assign_document_type",
"order",
"attachment_type",
]
def update(self, instance, validated_data):
super().update(instance, validated_data)
return instance
def create(self, validated_data):
if "assign_tags" in validated_data:
assign_tags = validated_data.pop("assign_tags")
mail_rule = MailRule.objects.create(**validated_data)
if assign_tags:
mail_rule.assign_tags.set(assign_tags)
return mail_rule
def validate(self, attrs):
if (
attrs["action"] == MailRule.MailAction.TAG
or attrs["action"] == MailRule.MailAction.MOVE
) and attrs["action_parameter"] is None:
raise serializers.ValidationError("An action parameter is required.")
return attrs

View File

@ -0,0 +1,429 @@
from django.contrib.auth.models import User
from documents.models import Correspondent
from documents.models import DocumentType
from documents.models import Tag
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from rest_framework.test import APITestCase
class TestAPIMailAccounts(APITestCase):
ENDPOINT = "/api/mail_accounts/"
def setUp(self):
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def test_get_mail_accounts(self):
"""
GIVEN:
- Configured mail accounts
WHEN:
- API call is made to get mail accounts
THEN:
- Configured mail accounts are provided
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["count"], 1)
returned_account1 = response.data["results"][0]
self.assertEqual(returned_account1["name"], account1.name)
self.assertEqual(returned_account1["username"], account1.username)
self.assertEqual(
returned_account1["password"],
"*" * len(account1.password),
)
self.assertEqual(returned_account1["imap_server"], account1.imap_server)
self.assertEqual(returned_account1["imap_port"], account1.imap_port)
self.assertEqual(returned_account1["imap_security"], account1.imap_security)
self.assertEqual(returned_account1["character_set"], account1.character_set)
def test_create_mail_account(self):
"""
WHEN:
- API request is made to add a mail account
THEN:
- A new mail account is created
"""
account1 = {
"name": "Email1",
"username": "username1",
"password": "password1",
"imap_server": "server.example.com",
"imap_port": 443,
"imap_security": MailAccount.ImapSecurity.SSL,
"character_set": "UTF-8",
}
response = self.client.post(
self.ENDPOINT,
data=account1,
)
self.assertEqual(response.status_code, 201)
returned_account1 = MailAccount.objects.get(name="Email1")
self.assertEqual(returned_account1.name, account1["name"])
self.assertEqual(returned_account1.username, account1["username"])
self.assertEqual(returned_account1.password, account1["password"])
self.assertEqual(returned_account1.imap_server, account1["imap_server"])
self.assertEqual(returned_account1.imap_port, account1["imap_port"])
self.assertEqual(returned_account1.imap_security, account1["imap_security"])
self.assertEqual(returned_account1.character_set, account1["character_set"])
def test_delete_mail_account(self):
"""
GIVEN:
- Existing mail account
WHEN:
- API request is made to delete a mail account
THEN:
- Account is deleted
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
response = self.client.delete(
f"{self.ENDPOINT}{account1.pk}/",
)
self.assertEqual(response.status_code, 204)
self.assertEqual(len(MailAccount.objects.all()), 0)
def test_update_mail_account(self):
"""
GIVEN:
- Existing mail accounts
WHEN:
- API request is made to update mail account
THEN:
- The mail account is updated, password only updated if not '****'
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
response = self.client.patch(
f"{self.ENDPOINT}{account1.pk}/",
data={
"name": "Updated Name 1",
"password": "******",
},
)
self.assertEqual(response.status_code, 200)
returned_account1 = MailAccount.objects.get(pk=account1.pk)
self.assertEqual(returned_account1.name, "Updated Name 1")
self.assertEqual(returned_account1.password, account1.password)
response = self.client.patch(
f"{self.ENDPOINT}{account1.pk}/",
data={
"name": "Updated Name 2",
"password": "123xyz",
},
)
self.assertEqual(response.status_code, 200)
returned_account2 = MailAccount.objects.get(pk=account1.pk)
self.assertEqual(returned_account2.name, "Updated Name 2")
self.assertEqual(returned_account2.password, "123xyz")
class TestAPIMailRules(APITestCase):
ENDPOINT = "/api/mail_rules/"
def setUp(self):
super().setUp()
self.user = User.objects.create_superuser(username="temp_admin")
self.client.force_authenticate(user=self.user)
def test_get_mail_rules(self):
"""
GIVEN:
- Configured mail accounts and rules
WHEN:
- API call is made to get mail rules
THEN:
- Configured mail rules are provided
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
rule1 = MailRule.objects.create(
name="Rule1",
account=account1,
folder="INBOX",
filter_from="from@example.com",
filter_subject="subject",
filter_body="body",
filter_attachment_filename="file.pdf",
maximum_age=30,
action=MailRule.MailAction.MARK_READ,
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
order=0,
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["count"], 1)
returned_rule1 = response.data["results"][0]
self.assertEqual(returned_rule1["name"], rule1.name)
self.assertEqual(returned_rule1["account"], account1.pk)
self.assertEqual(returned_rule1["folder"], rule1.folder)
self.assertEqual(returned_rule1["filter_from"], rule1.filter_from)
self.assertEqual(returned_rule1["filter_subject"], rule1.filter_subject)
self.assertEqual(returned_rule1["filter_body"], rule1.filter_body)
self.assertEqual(
returned_rule1["filter_attachment_filename"],
rule1.filter_attachment_filename,
)
self.assertEqual(returned_rule1["maximum_age"], rule1.maximum_age)
self.assertEqual(returned_rule1["action"], rule1.action)
self.assertEqual(returned_rule1["assign_title_from"], rule1.assign_title_from)
self.assertEqual(
returned_rule1["assign_correspondent_from"],
rule1.assign_correspondent_from,
)
self.assertEqual(returned_rule1["order"], rule1.order)
self.assertEqual(returned_rule1["attachment_type"], rule1.attachment_type)
def test_create_mail_rule(self):
"""
GIVEN:
- Configured mail account exists
WHEN:
- API request is made to add a mail rule
THEN:
- A new mail rule is created
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
tag = Tag.objects.create(
name="t",
)
correspondent = Correspondent.objects.create(
name="c",
)
document_type = DocumentType.objects.create(
name="dt",
)
rule1 = {
"name": "Rule1",
"account": account1.pk,
"folder": "INBOX",
"filter_from": "from@example.com",
"filter_subject": "subject",
"filter_body": "body",
"filter_attachment_filename": "file.pdf",
"maximum_age": 30,
"action": MailRule.MailAction.MARK_READ,
"assign_title_from": MailRule.TitleSource.FROM_SUBJECT,
"assign_correspondent_from": MailRule.CorrespondentSource.FROM_NOTHING,
"order": 0,
"attachment_type": MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
"action_parameter": "parameter",
"assign_tags": [tag.pk],
"assign_correspondent": correspondent.pk,
"assign_document_type": document_type.pk,
}
response = self.client.post(
self.ENDPOINT,
data=rule1,
)
self.assertEqual(response.status_code, 201)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data["count"], 1)
returned_rule1 = response.data["results"][0]
self.assertEqual(returned_rule1["name"], rule1["name"])
self.assertEqual(returned_rule1["account"], account1.pk)
self.assertEqual(returned_rule1["folder"], rule1["folder"])
self.assertEqual(returned_rule1["filter_from"], rule1["filter_from"])
self.assertEqual(returned_rule1["filter_subject"], rule1["filter_subject"])
self.assertEqual(returned_rule1["filter_body"], rule1["filter_body"])
self.assertEqual(
returned_rule1["filter_attachment_filename"],
rule1["filter_attachment_filename"],
)
self.assertEqual(returned_rule1["maximum_age"], rule1["maximum_age"])
self.assertEqual(returned_rule1["action"], rule1["action"])
self.assertEqual(
returned_rule1["assign_title_from"],
rule1["assign_title_from"],
)
self.assertEqual(
returned_rule1["assign_correspondent_from"],
rule1["assign_correspondent_from"],
)
self.assertEqual(returned_rule1["order"], rule1["order"])
self.assertEqual(returned_rule1["attachment_type"], rule1["attachment_type"])
self.assertEqual(returned_rule1["action_parameter"], rule1["action_parameter"])
self.assertEqual(
returned_rule1["assign_correspondent"],
rule1["assign_correspondent"],
)
self.assertEqual(
returned_rule1["assign_document_type"],
rule1["assign_document_type"],
)
self.assertEqual(returned_rule1["assign_tags"], rule1["assign_tags"])
def test_delete_mail_rule(self):
"""
GIVEN:
- Existing mail rule
WHEN:
- API request is made to delete a mail rule
THEN:
- Rule is deleted
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
rule1 = MailRule.objects.create(
name="Rule1",
account=account1,
folder="INBOX",
filter_from="from@example.com",
filter_subject="subject",
filter_body="body",
filter_attachment_filename="file.pdf",
maximum_age=30,
action=MailRule.MailAction.MARK_READ,
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
order=0,
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
)
response = self.client.delete(
f"{self.ENDPOINT}{rule1.pk}/",
)
self.assertEqual(response.status_code, 204)
self.assertEqual(len(MailRule.objects.all()), 0)
def test_update_mail_rule(self):
"""
GIVEN:
- Existing mail rule
WHEN:
- API request is made to update mail rule
THEN:
- The mail rule is updated
"""
account1 = MailAccount.objects.create(
name="Email1",
username="username1",
password="password1",
imap_server="server.example.com",
imap_port=443,
imap_security=MailAccount.ImapSecurity.SSL,
character_set="UTF-8",
)
rule1 = MailRule.objects.create(
name="Rule1",
account=account1,
folder="INBOX",
filter_from="from@example.com",
filter_subject="subject",
filter_body="body",
filter_attachment_filename="file.pdf",
maximum_age=30,
action=MailRule.MailAction.MARK_READ,
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
order=0,
attachment_type=MailRule.AttachmentProcessing.ATTACHMENTS_ONLY,
)
response = self.client.patch(
f"{self.ENDPOINT}{rule1.pk}/",
data={
"name": "Updated Name 1",
"action": MailRule.MailAction.DELETE,
},
)
self.assertEqual(response.status_code, 200)
returned_rule1 = MailRule.objects.get(pk=rule1.pk)
self.assertEqual(returned_rule1.name, "Updated Name 1")
self.assertEqual(returned_rule1.action, MailRule.MailAction.DELETE)

View File

@ -0,0 +1,41 @@
from paperless.views import StandardPagination
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.serialisers import MailAccountSerializer
from paperless_mail.serialisers import MailRuleSerializer
from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import ModelViewSet
class MailAccountViewSet(ModelViewSet):
model = MailAccount
queryset = MailAccount.objects.all()
serializer_class = MailAccountSerializer
pagination_class = StandardPagination
permission_classes = (IsAuthenticated,)
# TODO: user-scoped
# def get_queryset(self):
# user = self.request.user
# return MailAccount.objects.filter(user=user)
# def perform_create(self, serializer):
# serializer.save(user=self.request.user)
class MailRuleViewSet(ModelViewSet):
model = MailRule
queryset = MailRule.objects.all()
serializer_class = MailRuleSerializer
pagination_class = StandardPagination
permission_classes = (IsAuthenticated,)
# TODO: user-scoped
# def get_queryset(self):
# user = self.request.user
# return MailRule.objects.filter(user=user)
# def perform_create(self, serializer):
# serializer.save(user=self.request.user)