mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-04-02 13:45:10 -05:00
websocket testing
This commit is contained in:
parent
8711a206a0
commit
866c8fc848
@ -1,23 +1,31 @@
|
||||
from unittest import mock
|
||||
|
||||
from channels.layers import get_channel_layer
|
||||
from channels.testing import WebsocketCommunicator
|
||||
from django.test import TestCase
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
from paperless.asgi import application
|
||||
|
||||
|
||||
TEST_CHANNEL_LAYERS = {
|
||||
'default': {
|
||||
'BACKEND': 'channels.layers.InMemoryChannelLayer',
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestWebSockets(TestCase):
|
||||
|
||||
@mock.patch("paperless.consumers.async_to_sync")
|
||||
async def test_no_auth(self, async_to_sync):
|
||||
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
|
||||
async def test_no_auth(self):
|
||||
communicator = WebsocketCommunicator(application, "/ws/status/")
|
||||
connected, subprotocol = await communicator.connect()
|
||||
self.assertFalse(connected)
|
||||
await communicator.disconnect()
|
||||
|
||||
@mock.patch("paperless.consumers.async_to_sync")
|
||||
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
|
||||
@mock.patch("paperless.consumers.StatusConsumer._authenticated")
|
||||
async def test_auth(self, _authenticated, async_to_sync):
|
||||
async def test_auth(self, _authenticated):
|
||||
_authenticated.return_value = True
|
||||
|
||||
communicator = WebsocketCommunicator(application, "/ws/status/")
|
||||
@ -25,3 +33,28 @@ class TestWebSockets(TestCase):
|
||||
self.assertTrue(connected)
|
||||
|
||||
await communicator.disconnect()
|
||||
|
||||
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
|
||||
@mock.patch("paperless.consumers.StatusConsumer._authenticated")
|
||||
async def test_receive(self, _authenticated):
|
||||
_authenticated.return_value = True
|
||||
|
||||
communicator = WebsocketCommunicator(application, "/ws/status/")
|
||||
connected, subprotocol = await communicator.connect()
|
||||
self.assertTrue(connected)
|
||||
|
||||
message = {
|
||||
"task_id": "test"
|
||||
}
|
||||
|
||||
channel_layer = get_channel_layer()
|
||||
await channel_layer.group_send("status_updates", {
|
||||
"type": "status_update",
|
||||
"data": message
|
||||
})
|
||||
|
||||
response = await communicator.receive_json_from()
|
||||
|
||||
self.assertEqual(response, message)
|
||||
|
||||
await communicator.disconnect()
|
||||
|
Loading…
x
Reference in New Issue
Block a user