Enhancement: support webhook restrictions (#10555)

This commit is contained in:
shamoon
2025-08-11 10:15:30 -07:00
committed by GitHub
parent 1bee1495cf
commit 6730896894
5 changed files with 269 additions and 11 deletions

View File

@@ -1,8 +1,10 @@
import shutil
import socket
from datetime import timedelta
from typing import TYPE_CHECKING
from unittest import mock
import pytest
from django.contrib.auth.models import Group
from django.contrib.auth.models import User
from django.test import override_settings
@@ -10,6 +12,7 @@ from django.utils import timezone
from guardian.shortcuts import assign_perm
from guardian.shortcuts import get_groups_with_perms
from guardian.shortcuts import get_users_with_perms
from httpx import HTTPError
from httpx import HTTPStatusError
from pytest_httpx import HTTPXMock
from rest_framework.test import APITestCase
@@ -2825,6 +2828,8 @@ class TestWorkflows(
content="Test message",
headers={},
files=None,
follow_redirects=False,
timeout=5,
)
expected_str = "Webhook sent to http://paperless-ngx.com"
@@ -2842,6 +2847,8 @@ class TestWorkflows(
data={"message": "Test message"},
headers={},
files=None,
follow_redirects=False,
timeout=5,
)
@mock.patch("httpx.post")
@@ -2962,3 +2969,164 @@ class TestWebhookSend:
as_json=True,
)
assert httpx_mock.get_request().headers["Content-Type"] == "application/json"
@pytest.fixture
def resolve_to(monkeypatch):
"""
Force DNS resolution to a specific IP for any hostname.
"""
def _set(ip: str):
def fake_getaddrinfo(host, *_args, **_kwargs):
return [(socket.AF_INET, None, None, "", (ip, 0))]
monkeypatch.setattr(socket, "getaddrinfo", fake_getaddrinfo)
return _set
class TestWebhookSecurity:
def test_blocks_invalid_scheme_or_hostname(self, httpx_mock: HTTPXMock):
"""
GIVEN:
- Invalid URL schemes or hostnames
WHEN:
- send_webhook is called with such URLs
THEN:
- ValueError is raised
"""
with pytest.raises(ValueError):
send_webhook(
"ftp://example.com",
data="",
headers={},
files=None,
as_json=False,
)
with pytest.raises(ValueError):
send_webhook(
"http:///nohost",
data="",
headers={},
files=None,
as_json=False,
)
@override_settings(WEBHOOKS_ALLOWED_PORTS=[80, 443])
def test_blocks_disallowed_port(self, httpx_mock: HTTPXMock):
"""
GIVEN:
- URL with a disallowed port
WHEN:
- send_webhook is called with such URL
THEN:
- ValueError is raised
"""
with pytest.raises(ValueError):
send_webhook(
"http://paperless-ngx.com:8080",
data="",
headers={},
files=None,
as_json=False,
)
assert httpx_mock.get_request() is None
@override_settings(WEBHOOKS_ALLOW_INTERNAL_REQUESTS=False)
def test_blocks_private_loopback_linklocal(self, httpx_mock: HTTPXMock, resolve_to):
"""
GIVEN:
- URL with a private, loopback, or link-local IP address
- WEBHOOKS_ALLOW_INTERNAL_REQUESTS is False
WHEN:
- send_webhook is called with such URL
THEN:
- ValueError is raised
"""
resolve_to("127.0.0.1")
with pytest.raises(ValueError):
send_webhook(
"http://paperless-ngx.com",
data="",
headers={},
files=None,
as_json=False,
)
def test_allows_public_ip_and_sends(self, httpx_mock: HTTPXMock, resolve_to):
"""
GIVEN:
- URL with a public IP address
WHEN:
- send_webhook is called with such URL
THEN:
- Request is sent successfully
"""
resolve_to("52.207.186.75")
httpx_mock.add_response(content=b"ok")
send_webhook(
url="http://paperless-ngx.com",
data="hi",
headers={},
files=None,
as_json=False,
)
req = httpx_mock.get_request()
assert req.url.host == "paperless-ngx.com"
def test_follow_redirects_disabled(self, httpx_mock: HTTPXMock, resolve_to):
"""
GIVEN:
- A URL that redirects
WHEN:
- send_webhook is called with follow_redirects=False
THEN:
- Request is made to the original URL and does not follow the redirect
"""
resolve_to("52.207.186.75")
# Return a redirect and ensure we don't follow it (only one request recorded)
httpx_mock.add_response(
status_code=302,
headers={"location": "http://internal-service.local"},
content=b"",
)
with pytest.raises(HTTPError):
send_webhook(
"http://paperless-ngx.com",
data="",
headers={},
files=None,
as_json=False,
)
assert len(httpx_mock.get_requests()) == 1
def test_strips_user_supplied_host_header(self, httpx_mock: HTTPXMock, resolve_to):
"""
GIVEN:
- A URL with a user-supplied Host header
WHEN:
- send_webhook is called with a malicious Host header
THEN:
- The Host header is stripped and replaced with the resolved hostname
"""
resolve_to("52.207.186.75")
httpx_mock.add_response(content=b"ok")
send_webhook(
url="http://paperless-ngx.com",
data="ok",
headers={"Host": "evil.test"},
files=None,
as_json=False,
)
req = httpx_mock.get_request()
assert req.headers["Host"] == "paperless-ngx.com"
assert "evil.test" not in req.headers.get("Host", "")