Creates url_validator to allow only http or https schemes for the webhook URL

This commit is contained in:
Trenton H 2025-02-16 10:58:05 -08:00
parent 360fb4dfa6
commit cedacc64e6
3 changed files with 68 additions and 31 deletions

View File

@ -58,6 +58,7 @@ from documents.permissions import set_permissions_for_object
from documents.templating.filepath import validate_filepath_template_and_render from documents.templating.filepath import validate_filepath_template_and_render
from documents.templating.utils import convert_format_str_to_template_format from documents.templating.utils import convert_format_str_to_template_format
from documents.validators import uri_validator from documents.validators import uri_validator
from documents.validators import url_validator
logger = logging.getLogger("paperless.serializers") logger = logging.getLogger("paperless.serializers")
@ -1950,7 +1951,7 @@ class WorkflowActionWebhookSerializer(serializers.ModelSerializer):
id = serializers.IntegerField(allow_null=True, required=False) id = serializers.IntegerField(allow_null=True, required=False)
def validate_url(self, url): def validate_url(self, url):
uri_validator(url) url_validator(url)
return url return url
class Meta: class Meta:

View File

@ -598,30 +598,35 @@ class TestApiWorkflows(DirectoriesMixin, APITestCase):
THEN: THEN:
- Correct HTTP response - Correct HTTP response
""" """
response = self.client.post(
self.ENDPOINT, for url, expected_resp_code in [
json.dumps( ("https://examplewithouttld:3000/path", status.HTTP_201_CREATED),
{ ("file:///etc/passwd/path", status.HTTP_400_BAD_REQUEST),
"name": "Workflow 2", ]:
"order": 1, response = self.client.post(
"triggers": [ self.ENDPOINT,
{ json.dumps(
"type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION, {
"sources": [DocumentSource.ApiUpload], "name": "Workflow 2",
"filter_filename": "*", "order": 1,
}, "triggers": [
], {
"actions": [ "type": WorkflowTrigger.WorkflowTriggerType.CONSUMPTION,
{ "sources": [DocumentSource.ApiUpload],
"type": WorkflowAction.WorkflowActionType.WEBHOOK, "filter_filename": "*",
"webhook": {
"url": "https://examplewithouttld:3000/path",
"include_document": False,
}, },
}, ],
], "actions": [
}, {
), "type": WorkflowAction.WorkflowActionType.WEBHOOK,
content_type="application/json", "webhook": {
) "url": url,
self.assertEqual(response.status_code, status.HTTP_201_CREATED) "include_document": False,
},
},
],
},
),
content_type="application/json",
)
self.assertEqual(response.status_code, expected_resp_code)

View File

@ -4,11 +4,18 @@ from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
def uri_validator(value) -> None: def uri_validator(value: str, allowed_schemes: set[str] | None = None) -> None:
""" """
Raises a ValidationError if the given value does not parse as an Validates that the given value parses as a URI with required components
URI looking thing, which we're defining as a scheme and either network and optionally restricts to specific schemes.
location or path value
Args:
value: The URI string to validate
allowed_schemes: Optional set/list of allowed schemes (e.g. {'http', 'https'}).
If None, all schemes are allowed.
Raises:
ValidationError: If the URI is invalid or uses a disallowed scheme
""" """
try: try:
parts = urlparse(value) parts = urlparse(value)
@ -22,8 +29,32 @@ def uri_validator(value) -> None:
_(f"Unable to parse URI {value}, missing net location or path"), _(f"Unable to parse URI {value}, missing net location or path"),
params={"value": value}, params={"value": value},
) )
if allowed_schemes and parts.scheme not in allowed_schemes:
raise ValidationError(
_(
f"URI scheme '{parts.scheme}' is not allowed. Allowed schemes: {', '.join(allowed_schemes)}",
),
params={"value": value, "scheme": parts.scheme},
)
except ValidationError:
raise
except Exception as e: except Exception as e:
raise ValidationError( raise ValidationError(
_(f"Unable to parse URI {value}"), _(f"Unable to parse URI {value}"),
params={"value": value}, params={"value": value},
) from e ) from e
def url_validator(value: str) -> None:
    """
    Validates that the given value is an absolute HTTP or HTTPS URL.

    The scheme restriction is delegated to uri_validator.  A host is required
    on top of that, because uri_validator is satisfied by a scheme plus either
    a net location or a path, so values such as "http:foo" or "https:///path"
    would otherwise be accepted even though they name no server to contact.

    Args:
        value: The URL string to validate

    Raises:
        ValidationError: If the URL is invalid, is not using the http/https
            scheme, or has no host
    """
    uri_validator(value, allowed_schemes={"http", "https"})

    # uri_validator has already parsed the value without error, so this second
    # parse is only used to inspect the host component.
    if not urlparse(value).hostname:
        raise ValidationError(
            # Plain literal with a named placeholder (no f-string) so the
            # message stays translatable and is filled in from ``params``.
            _("Unable to parse URL %(value)s, missing host"),
            params={"value": value},
        )