Files
vulncheck/app/routers/auth_oidc.py
T
vulncheck b8e8870b29 feat(auth): LDAPS + OIDC + SAML 2.0 strategies and SSO routers
Phase 2-4 of multi-provider authentication. All three providers slot
into the AuthOrchestrator from phase 1; no further changes to
/auth/login are needed.

LDAPS (app/auth/strategies/ldap_strategy.py):
- ldap3 with strict TLS cert validation (CERT_REQUIRED + CA bundle)
- Service-account search-then-bind flow (DN never exposed to caller)
- Filter chars escaped via ldap3.utils.conv.escape_filter_chars
- AD-flavored defaults: sAMAccountName / memberOf / objectGUID
- Refuses if filter returns >1 entry (anti-impersonation safety)
- Bind password encrypted at rest (Fernet), bootstrapped from env on
  first start then stored in settings table

OIDC (app/auth/strategies/oidc_strategy.py + app/routers/auth_oidc.py):
- Authorization Code + PKCE (S256)
- Strict ID-Token validation via Authlib: signature (JWKS w/ auto-refresh
  on rotation), iss (essential), aud (essential, must == client_id),
  exp (essential), nonce (replay protection)
- State/PKCE-verifier/nonce stored in signed itsdangerous cookie (no
  server-side session store needed)
- Discovery + JWKS cached in-process; JWKS auto-refetched on key miss
- Groups merged from both id_token claims and userinfo endpoint
- Hardened: prompt=select_account to defeat silent IdP reuse

SAML 2.0 (app/auth/strategies/saml_strategy.py + app/routers/auth_saml.py):
- python3-saml (OneLogin) with strict=true; xmlsec1 handles signature
  validation. XSW attacks mitigated via strict assertion/response
  signature position checks plus wantAssertionsSigned=true
- SP-initiated (/auth/saml/login) + IdP-initiated (POST /auth/saml/acs)
- /auth/saml/metadata serves signed SP descriptor
- RelayState same-origin check to prevent open redirect
- IdP metadata loaded from URL or file at startup

Wiring:
- app/main.py imports auth_oidc/auth_saml routers behind try/except so
  the app still starts when authlib or python3-saml aren't installed
- Frontend login page fetches /auth/providers and renders matching
  redirect buttons (Sign in with Entra ID / SAML SSO) plus the local
  credentials form. Adds MFA second-step screen with 6-digit OTP input
  when /auth/login returns mfa_required=true.

.env.example: full provider config blocks with worked examples for
Entra ID, Okta, Keycloak, Google. Each block is commented with the
exact format the corresponding admin needs.
2026-05-12 19:11:32 +02:00

323 lines
12 KiB
Python

"""
OIDC (OAuth 2.0 / OpenID Connect) routes.

Flow (Authorization Code + PKCE):
  1. GET /auth/oidc/login    → redirect to the IdP authorize endpoint with a PKCE challenge
  2. GET /auth/oidc/callback → the IdP redirects here; we exchange the code for tokens,
                               validate the id_token (signature/iss/aud/exp via Authlib,
                               nonce checked by this module), fetch userinfo, then hand
                               off to AuthOrchestrator for JIT / role-mapping, and
                               finally issue our own session cookies.

State + PKCE verifier + nonce are persisted in a signed itsdangerous cookie
(httpOnly, short-lived). No server-side session store is needed.
"""
from __future__ import annotations
import json
import logging
import os
import secrets
from datetime import datetime
from typing import Optional
from urllib.parse import urlencode
import httpx
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import JSONResponse, RedirectResponse
from itsdangerous import BadSignature, URLSafeTimedSerializer
from sqlalchemy.orm import Session
from app.auth.jwt_handler import (
JWT_ACCESS_TOKEN_EXPIRE_MINUTES,
JWT_REFRESH_TOKEN_EXPIRE_DAYS,
create_access_token,
create_refresh_token,
)
from app.auth.orchestrator import AuthOrchestrator
from app.auth.strategies.base import AuthError
from app.auth.strategies.oidc_strategy import OidcStrategy
from app.database import get_db
from app.models.audit_log import AuditEventType, AuditLog
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Every route declared in this module is served under the /auth/oidc prefix.
router = APIRouter(prefix="/auth/oidc", tags=["Authentication"])
# Name of the signed cookie that carries state, nonce and the PKCE verifier
# from /login to /callback.
OIDC_STATE_COOKIE = "oidc_state"
# Lifetime of that cookie in seconds. It is used both as the cookie max_age
# and as the maximum accepted signature age when the cookie is read back.
OIDC_STATE_TTL = 600 # 10 min — must be > user's redirect time at IdP
# SameSite attribute for cookies set by this module; read from
# AUTH_COOKIE_SAMESITE and defaulting to "lax".
COOKIE_SAMESITE = os.getenv("AUTH_COOKIE_SAMESITE", "lax")
# Secure flag for cookies set by this module: True unless AUTH_COOKIE_SECURE
# is set to something other than "true" (compared case-insensitively).
COOKIE_SECURE = os.getenv("AUTH_COOKIE_SECURE", "true").lower() == "true"
def _serializer() -> URLSafeTimedSerializer:
    """Build the timed serializer used to sign and verify the OIDC state cookie.

    The signing key is read from the JWT_SECRET_KEY environment variable on
    every call; the fixed salt "oidc-state" namespaces these signatures.

    Raises:
        RuntimeError: If JWT_SECRET_KEY is unset or empty.
    """
    signing_key = os.getenv("JWT_SECRET_KEY")
    if signing_key:
        return URLSafeTimedSerializer(secret_key=signing_key, salt="oidc-state")
    raise RuntimeError("JWT_SECRET_KEY required for OIDC state signing")
def _generate_pkce() -> tuple[str, str]:
    """Create a fresh PKCE pair for the S256 challenge method.

    Returns:
        ``(code_verifier, code_challenge)`` where the challenge is the
        URL-safe base64 encoding (padding stripped) of the SHA-256 digest of
        the verifier.
    """
    from base64 import urlsafe_b64encode
    from hashlib import sha256

    # Random URL-safe verifier, capped at 128 characters.
    code_verifier = secrets.token_urlsafe(64)[:128]
    digest = sha256(code_verifier.encode()).digest()
    code_challenge = urlsafe_b64encode(digest).rstrip(b"=").decode()
    return code_verifier, code_challenge
# Discovery documents cached in-process, keyed by the discovery URL they were
# fetched from.
_CACHED_DISCOVERY: dict = {}


def _fetch_discovery(strategy: OidcStrategy) -> dict:
    """Return the OIDC discovery document for ``strategy``, fetching it at most once per URL.

    The document is cached in-process under ``strategy.discovery_url`` (the
    same per-URL scheme ``_fetch_jwks`` uses), so a strategy pointing at a
    different discovery URL is never handed a document that was fetched for
    another provider. Entries are kept until the process restarts.

    Args:
        strategy: OIDC strategy; only its ``discovery_url`` attribute is read.

    Returns:
        The parsed discovery document.

    Raises:
        HTTPException: 500 if the response is not a JSON object or lacks one
            of ``authorization_endpoint``, ``token_endpoint``, ``issuer``,
            ``jwks_uri``.
        httpx.HTTPError: If the request fails or returns an error status.
    """
    url = strategy.discovery_url
    if url in _CACHED_DISCOVERY:
        return _CACHED_DISCOVERY[url]

    with httpx.Client(timeout=15) as client:
        r = client.get(url)
        r.raise_for_status()
        doc = r.json()

    required = {"authorization_endpoint", "token_endpoint", "issuer", "jwks_uri"}
    # Reject non-object bodies up front; callers index the result like a dict.
    if not isinstance(doc, dict) or not required.issubset(doc):
        raise HTTPException(500, "OIDC discovery document is incomplete")

    # Only validated documents are cached, so a bad response is retried later.
    _CACHED_DISCOVERY[url] = doc
    return doc
# JWKS documents cached in-process, keyed by JWKS URI.
_CACHED_JWKS: dict = {}


def _fetch_jwks(jwks_uri: str) -> dict:
    """Return the JWKS published at ``jwks_uri``, using the in-process cache.

    A cached entry is returned as-is and stays until the process exits or a
    caller removes it from ``_CACHED_JWKS`` to force a refetch.

    Raises:
        httpx.HTTPError: If the request fails or returns an error status.
    """
    try:
        return _CACHED_JWKS[jwks_uri]
    except KeyError:
        pass  # cache miss: fall through to the network fetch

    with httpx.Client(timeout=15) as http:
        resp = http.get(jwks_uri)
        resp.raise_for_status()
        key_set = resp.json()
    _CACHED_JWKS[jwks_uri] = key_set
    return key_set
def _validate_id_token(id_token: str, strategy: OidcStrategy, expected_nonce: str) -> dict:
    """Validate an ID token and return its claims as a plain dict.

    The token is decoded against the provider's JWKS with issuer, audience
    (must equal ``strategy.client_id``) and expiry marked essential, then
    ``claims.validate()`` runs Authlib's claim checks. Finally the ``nonce``
    claim is compared with the value stored at login time.

    If the first attempt fails, the cached JWKS is dropped, refetched once and
    the validation retried, which covers signing-key rotation at the IdP.

    Args:
        id_token: Compact-serialized ID token from the token endpoint.
        strategy: OIDC strategy supplying ``client_id`` and the discovery URL.
        expected_nonce: Nonce sent in the authorization request.

    Returns:
        The validated claims.

    Raises:
        HTTPException: 401 if validation still fails after the JWKS refresh,
            or if the nonce does not match.
    """
    from authlib.jose import JsonWebKey, JsonWebToken
    from authlib.jose.errors import JoseError

    discovery = _fetch_discovery(strategy)
    jwks_uri = discovery["jwks_uri"]
    jwt = JsonWebToken(["RS256", "RS384", "RS512", "ES256", "ES384"])
    claims_options = {
        "iss": {"essential": True, "value": discovery["issuer"]},
        "aud": {"essential": True, "value": strategy.client_id},
        "exp": {"essential": True},
    }

    def _decode_with(key_set):
        """Decode and validate the token against one imported key set."""
        decoded = jwt.decode(id_token, key=key_set, claims_options=claims_options)
        decoded.validate()
        return decoded

    # Authlib reports an unknown ``kid`` from the key-set lookup as ValueError,
    # not JoseError, so both must be caught for the rotation retry to trigger.
    retryable = (JoseError, ValueError)

    key_set = JsonWebKey.import_key_set(_fetch_jwks(jwks_uri))
    try:
        claims = _decode_with(key_set)
    except retryable:
        # Refresh JWKS once and retry (key rotation).
        _CACHED_JWKS.pop(jwks_uri, None)
        key_set = JsonWebKey.import_key_set(_fetch_jwks(jwks_uri))
        try:
            claims = _decode_with(key_set)
        except retryable as exc:
            raise HTTPException(401, f"ID token validation failed: {exc}") from exc

    # Nonce check: prevents replay of an ID token from another login attempt.
    if claims.get("nonce") != expected_nonce:
        raise HTTPException(401, "ID token nonce mismatch (possible replay)")
    return dict(claims)
# ============================================
# Endpoints
# ============================================
@router.get("/login")
async def oidc_login(request: Request):
    """Start an SP-initiated OIDC login.

    Generates a fresh state, nonce and PKCE verifier/challenge pair, stores
    state + nonce + verifier in a signed, httpOnly cookie, and answers with a
    302 to the IdP's authorization endpoint.

    Args:
        request: Incoming request (not otherwise used by this handler).

    Returns:
        A redirect response carrying the signed state cookie.

    Raises:
        HTTPException: 404 when the OIDC strategy is not enabled.
    """
    strategy = OidcStrategy()
    if not strategy.is_enabled:
        raise HTTPException(404, "OIDC is not configured")
    discovery = _fetch_discovery(strategy)

    state = secrets.token_urlsafe(32)
    nonce = secrets.token_urlsafe(32)
    code_verifier, code_challenge = _generate_pkce()
    params = {
        "response_type": "code",
        "client_id": strategy.client_id,
        "redirect_uri": strategy.redirect_uri,
        "scope": strategy.scopes,
        "state": state,
        "nonce": nonce,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        # Force account picker — avoids silently using last-used IdP session.
        "prompt": "select_account",
    }

    # An authorization endpoint may already carry a query component, which has
    # to be retained (RFC 6749 §3.1); extend it instead of adding a second "?".
    endpoint = discovery["authorization_endpoint"]
    separator = "&" if "?" in endpoint else "?"
    authorize_url = endpoint + separator + urlencode(params)

    response = RedirectResponse(authorize_url, status_code=302)
    signed = _serializer().dumps({
        "state": state,
        "nonce": nonce,
        "code_verifier": code_verifier,
    })

    # The callback is reached through a cross-site redirect from the IdP, and
    # browsers withhold SameSite=Strict cookies on such navigations. Cap this
    # one cookie at "lax" so the flow still works when the deployment sets
    # AUTH_COOKIE_SAMESITE=strict.
    state_samesite = COOKIE_SAMESITE
    if state_samesite.strip().lower() == "strict":
        state_samesite = "lax"

    response.set_cookie(
        OIDC_STATE_COOKIE,
        signed,
        max_age=OIDC_STATE_TTL,
        httponly=True,
        secure=COOKIE_SECURE,
        samesite=state_samesite,
        path="/",
    )
    return response
@router.get("/callback")
async def oidc_callback(
    request: Request,
    code: Optional[str] = None,
    state: Optional[str] = None,
    error: Optional[str] = None,
    error_description: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """Finish the OIDC login after the IdP redirects back with ``code`` and ``state``.

    Steps: verify ``state`` against the signed cookie, exchange the code
    (with the PKCE verifier) for tokens, validate the ID token, optionally
    fetch userinfo, hand the identity to ``AuthOrchestrator``, then set our
    own access/refresh cookies and redirect to the dashboard.

    Args:
        request: Incoming request; supplies cookies, client IP and user agent.
        code: Authorization code returned by the IdP.
        state: State value echoed by the IdP.
        error: Error code when the IdP reports a failure.
        error_description: Optional human-readable detail for ``error``.
        db: Database session.

    Returns:
        A 302 redirect that sets the session cookies and clears the state cookie.

    Raises:
        HTTPException: 404 if OIDC is disabled; 400 for missing/invalid state;
            401 for IdP errors, failed token exchange, invalid ID token or a
            rejected user; 403 for a deactivated account; 502 if the token
            endpoint cannot be reached.
    """
    strategy = OidcStrategy()
    if not strategy.is_enabled:
        raise HTTPException(404, "OIDC is not configured")
    if error:
        logger.warning("OIDC error from IdP: %s: %s", error, error_description)
        raise HTTPException(401, f"IdP returned error: {error}")
    if not code or not state:
        raise HTTPException(400, "Missing code/state")

    # 1) Recover state from signed cookie + verify
    cookie = request.cookies.get(OIDC_STATE_COOKIE)
    if not cookie:
        raise HTTPException(400, "OIDC state cookie missing or expired")
    try:
        stash = _serializer().loads(cookie, max_age=OIDC_STATE_TTL)
    except BadSignature:
        raise HTTPException(400, "OIDC state signature invalid")
    if state != stash.get("state"):
        raise HTTPException(400, "OIDC state mismatch (possible CSRF)")
    code_verifier = stash["code_verifier"]
    expected_nonce = stash["nonce"]

    # 2) Token exchange
    discovery = _fetch_discovery(strategy)
    try:
        async with httpx.AsyncClient(timeout=20) as client:
            token_resp = await client.post(
                discovery["token_endpoint"],
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": strategy.redirect_uri,
                    "client_id": strategy.client_id,
                    "client_secret": strategy.client_secret,
                    "code_verifier": code_verifier,
                },
                headers={"Accept": "application/json"},
            )
    except httpx.HTTPError as e:
        # Transport failure talking to the IdP: report it as a gateway error
        # rather than letting it surface as an unhandled exception.
        logger.warning("OIDC token endpoint request failed: %s", e)
        raise HTTPException(502, "Token exchange failed") from e
    if token_resp.status_code != 200:
        logger.warning("OIDC token endpoint returned %s: %s", token_resp.status_code, token_resp.text[:300])
        raise HTTPException(401, "Token exchange failed")
    try:
        token_data = token_resp.json()
    except ValueError as e:
        logger.warning("OIDC token endpoint returned a non-JSON body")
        raise HTTPException(401, "Token exchange failed") from e
    if not isinstance(token_data, dict):
        raise HTTPException(401, "Token exchange failed")
    id_token = token_data.get("id_token")
    access_token_oidc = token_data.get("access_token")
    if not id_token:
        raise HTTPException(401, "Token response missing id_token")

    # 3) Strict id_token validation (sig + iss + aud + exp + nonce)
    id_claims = _validate_id_token(id_token, strategy, expected_nonce)

    # 4) Userinfo (optional but recommended for groups). Best effort: any
    #    failure here leaves userinfo empty and the login proceeds on the
    #    ID-token claims alone.
    userinfo: dict = {}
    if discovery.get("userinfo_endpoint") and access_token_oidc:
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                u = await client.get(
                    discovery["userinfo_endpoint"],
                    headers={"Authorization": f"Bearer {access_token_oidc}"},
                )
            if u.status_code == 200:
                payload = u.json()
                if not isinstance(payload, dict):
                    logger.warning("OIDC userinfo response is not a JSON object; ignoring it")
                elif payload.get("sub") != id_claims.get("sub"):
                    # OIDC Core §5.3.2: userinfo whose sub differs from the
                    # ID token's sub MUST NOT be used.
                    logger.warning("OIDC userinfo sub does not match id_token sub; ignoring userinfo")
                else:
                    userinfo = payload
        except (httpx.HTTPError, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            logger.warning("OIDC userinfo fetch failed: %s", e)

    # 5) Parse to ExternalIdentity → orchestrator → JIT
    identity = strategy.parse_userinfo(userinfo, id_claims)
    orchestrator = AuthOrchestrator(db)
    try:
        result = orchestrator.complete_sso_login(
            identity,
            actor_ip=request.client.host if request.client else None,
            actor_user_agent=request.headers.get("user-agent"),
        )
    except AuthError as e:
        # Record the rejection with full detail; the client only gets the
        # sanitized message.
        db.add(
            AuditLog(
                event_type=AuditEventType.AUTH_PROVIDER_FAILED,
                event_description=f"OIDC orchestrator rejected user: {e.detail}",
                ip_address=request.client.host if request.client else None,
                timestamp=datetime.now(),
            )
        )
        db.commit()
        raise HTTPException(status_code=401, detail=e.safe_message)

    # 6) Issue our own session cookies + redirect to app root
    user = result.user
    if not user.is_active:
        raise HTTPException(status_code=403, detail="Account is deactivated")
    token_payload = {"sub": user.username, "role": user.role.value}
    access_jwt = create_access_token(token_payload)
    refresh_jwt = create_refresh_token(token_payload)
    # Normalize DASHBOARD_URL to end in exactly one trailing slash.
    redirect_to = os.getenv("DASHBOARD_URL", "/").rstrip("/") + "/"
    response = RedirectResponse(redirect_to, status_code=302)
    response.set_cookie(
        "access_token", access_jwt, httponly=True, secure=COOKIE_SECURE,
        samesite=COOKIE_SAMESITE, max_age=JWT_ACCESS_TOKEN_EXPIRE_MINUTES * 60, path="/",
    )
    response.set_cookie(
        "refresh_token", refresh_jwt, httponly=True, secure=COOKIE_SECURE,
        samesite=COOKIE_SAMESITE, max_age=JWT_REFRESH_TOKEN_EXPIRE_DAYS * 86400, path="/",
    )
    # Drop the OIDC state cookie now that we're done
    response.delete_cookie(OIDC_STATE_COOKIE, path="/")
    return response