Security: Pass random 32 character string as OAuth request state and validate response

This commit is contained in:
Hannes Ortmeier 2025-01-07 11:02:37 +01:00
parent 485237caf1
commit 9eee6227e8
2 changed files with 31 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import logging
import os import os
import platform import platform
import re import re
import secrets
import tempfile import tempfile
import urllib import urllib
import zipfile import zipfile
@ -11,6 +12,7 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
from time import mktime from time import mktime
from unicodedata import normalize from unicodedata import normalize
from urllib.parse import parse_qs
from urllib.parse import quote from urllib.parse import quote
from urllib.parse import urlparse from urllib.parse import urlparse
@ -1714,13 +1716,34 @@ class UiSettingsView(GenericAPIView):
ui_settings["auditlog_enabled"] = settings.AUDIT_LOG_ENABLED ui_settings["auditlog_enabled"] = settings.AUDIT_LOG_ENABLED
if settings.GMAIL_OAUTH_ENABLED or settings.OUTLOOK_OAUTH_ENABLED: if settings.GMAIL_OAUTH_ENABLED or settings.OUTLOOK_OAUTH_ENABLED:
state = secrets.token_urlsafe(32)
manager = PaperlessMailOAuth2Manager() manager = PaperlessMailOAuth2Manager()
if settings.GMAIL_OAUTH_ENABLED: if settings.GMAIL_OAUTH_ENABLED:
ui_settings["gmail_oauth_url"] = manager.get_gmail_authorization_url() gmail_oauth_url = manager.get_gmail_authorization_url(state)
if settings.OUTLOOK_OAUTH_ENABLED: # Validate the URL
ui_settings["outlook_oauth_url"] = ( parsed_url_query = parse_qs(urlparse(gmail_oauth_url).query)
manager.get_outlook_authorization_url() if (
parsed_url_query.get("state")
and parsed_url_query.get("state")[0] != state
):
logger.error(
f"Invalid oauth callback {gmail_oauth_url} for state {state}",
) )
raise ValueError("State not found in authorization url")
ui_settings["gmail_oauth_url"] = gmail_oauth_url
if settings.OUTLOOK_OAUTH_ENABLED:
outlook_oauth_url = manager.get_outlook_authorization_url(state)
# Validate the URL
parsed_url_query = parse_qs(urlparse(outlook_oauth_url).query)
if (
parsed_url_query.get("state")
and parsed_url_query.get("state")[0] != state
):
logger.error(
f"Invalid oauth callback {outlook_oauth_url} for state {state}",
)
raise ValueError("State not found in authorization url")
ui_settings["outlook_oauth_url"] = outlook_oauth_url
ui_settings["email_enabled"] = settings.EMAIL_ENABLED ui_settings["email_enabled"] = settings.EMAIL_ENABLED

View File

@ -43,16 +43,17 @@ class PaperlessMailOAuth2Manager:
def oauth_redirect_url(self) -> str: def oauth_redirect_url(self) -> str:
return f"{'http://localhost:4200/' if settings.DEBUG else settings.BASE_URL}mail" # e.g. "http://localhost:4200/mail" or "/mail" return f"{'http://localhost:4200/' if settings.DEBUG else settings.BASE_URL}mail" # e.g. "http://localhost:4200/mail" or "/mail"
def get_gmail_authorization_url(self) -> str: def get_gmail_authorization_url(self, state: str) -> str:
return asyncio.run( return asyncio.run(
self.gmail_client.get_authorization_url( self.gmail_client.get_authorization_url(
redirect_uri=self.oauth_callback_url, redirect_uri=self.oauth_callback_url,
scope=["https://mail.google.com/"], scope=["https://mail.google.com/"],
extras_params={"prompt": "consent", "access_type": "offline"}, extras_params={"prompt": "consent", "access_type": "offline"},
state=state,
), ),
) )
def get_outlook_authorization_url(self) -> str: def get_outlook_authorization_url(self, state: str) -> str:
return asyncio.run( return asyncio.run(
self.outlook_client.get_authorization_url( self.outlook_client.get_authorization_url(
redirect_uri=self.oauth_callback_url, redirect_uri=self.oauth_callback_url,
@ -60,6 +61,7 @@ class PaperlessMailOAuth2Manager:
"offline_access", "offline_access",
"https://outlook.office.com/IMAP.AccessAsUser.All", "https://outlook.office.com/IMAP.AccessAsUser.All",
], ],
state=state,
), ),
) )