security: harden auth, secrets, headers and email rendering

Closes 10 findings from the automated security scan (1 critical, 4 high,
5 medium). Operator action required before redeploy — see deploy notes
in chat or README.DEV.md.

Critical:
- TOTP/LDAP Fernet key (AUTH_PROVIDER_CRYPTO_KEY) is now env-only.
  Removed the DB fallback that co-located the key with the ciphertext
  it protects.

High:
- Rate limiter no longer trusts X-Forwarded-For from arbitrary peers.
  TRUSTED_PROXY_CIDRS gates which direct peers may rewrite the client
  IP, and ProxyHeadersMiddleware trusted_hosts is narrowed from "*"
  to FORWARDED_ALLOW_IPS.
- TOTP codes are single-use within their 90s validation window.
  In-memory replay cache keyed on (user_id, code).
- JWTs carry a jti claim; logout revokes both access and refresh JTIs,
  refresh rotates (revokes the presented token), and get_current_user
  rejects any revoked JTI. In-memory store with TTL = token exp.
- Sensitive setting values (wazuh_config, smtp_config, nessus_config)
  are encrypted at rest with an enc:v1: prefix. All read sites go
  through read_setting_value(); legacy plaintext rows still readable
  until next write. GET responses redact secret subfields so admins
  cannot accidentally exfiltrate stored credentials.

Medium:
- Email template rendering HTML-escapes all dynamic values. The "rows"
  variable is whitelisted as pre-escaped HTML. Severity CSS class is
  whitelisted to prevent attribute breakout via crafted package data.
- Request logging redacts sensitive query parameters (token, password,
  code, mfa_token, ...). Validation-error handler no longer logs or
  returns the offending request body.
- /health returns only {"status":"healthy"} — environment and version
  no longer leak to unauthenticated callers.
- SETUP_ADMIN_TOKEN comparison uses hmac.compare_digest.
- Settings PUT denylists auth_provider_crypto_key (env-only) and
  refuses to store the "***set***" redaction placeholder back into
  protected configs.
This commit is contained in:
2026-05-16 09:25:22 +02:00
parent b3d09b3b96
commit 03eef00f31
15 changed files with 485 additions and 124 deletions
+5
View File
@@ -11,6 +11,7 @@ from sqlalchemy.orm import Session
from app.database import get_db
from app.models.user import User, UserRole
from app.auth.jwt_handler import decode_token
from app.auth.token_revocation import is_revoked
# Security Scheme für Swagger UI
security = HTTPBearer(auto_error=False)
@@ -54,6 +55,10 @@ async def get_current_user(
if payload is None:
raise credentials_exception
jti = payload.get("jti")
if jti and is_revoked(jti):
raise credentials_exception
username: str = payload.get("sub")
if username is None:
raise credentials_exception
+5 -2
View File
@@ -4,6 +4,7 @@ JWT Token Handler
Sichere JWT-Token-Generierung und -Validierung nach OWASP-Best-Practices.
"""
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
import jwt
@@ -82,7 +83,8 @@ def create_access_token(
to_encode.update({
"exp": expire,
"iat": now
"iat": now,
"jti": uuid.uuid4().hex,
})
encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
@@ -106,7 +108,8 @@ def create_refresh_token(data: Dict[str, Any]) -> str:
to_encode.update({
"exp": expire,
"iat": now,
"type": "refresh" # Token-Type für Unterscheidung
"type": "refresh", # Token-Type für Unterscheidung
"jti": uuid.uuid4().hex,
})
encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
+82
View File
@@ -0,0 +1,82 @@
"""
Transparent at-rest encryption for sensitive setting values.
Setting rows whose `key` is in PROTECTED_SETTING_KEYS get their `value`
column encrypted with the AUTH_PROVIDER_CRYPTO_KEY Fernet key (same key
that protects TOTP secrets). Encrypted values are prefixed with
`ENC_PREFIX` so reads can detect ciphertext vs. legacy plaintext and
auto-migrate on the next write.
"""
from __future__ import annotations
import logging
from typing import Optional
from sqlalchemy.orm import Session
from app.auth.totp import _fernet # reuses AUTH_PROVIDER_CRYPTO_KEY
from app.models.setting import Setting
logger = logging.getLogger(__name__)
ENC_PREFIX = "enc:v1:"
# Settings whose value field contains secrets (API tokens, passwords).
# Values written under these keys are encrypted at rest; reads transparently
# decrypt. Pre-existing plaintext rows still readable until next write.
PROTECTED_SETTING_KEYS: frozenset[str] = frozenset({
"wazuh_config",
"smtp_config",
"nessus_config",
})
def is_protected(key: str) -> bool:
return key in PROTECTED_SETTING_KEYS
def encrypt_value(plaintext: str) -> str:
"""Return ENC_PREFIX + base64-ciphertext."""
token = _fernet().encrypt(plaintext.encode()).decode()
return f"{ENC_PREFIX}{token}"
def decrypt_value(stored: str) -> str:
"""Decrypt an ENC_PREFIX-tagged value. Passes plaintext through unchanged."""
if not stored or not stored.startswith(ENC_PREFIX):
return stored
ciphertext = stored[len(ENC_PREFIX):]
return _fernet().decrypt(ciphertext.encode()).decode()
def read_setting_value(db: Session, key: str) -> Optional[str]:
"""Return decrypted setting value (or None if not set)."""
row = db.query(Setting).filter(Setting.key == key).first()
if not row or row.value is None:
return None
if is_protected(key):
try:
return decrypt_value(row.value)
except Exception as e:
logger.error("Failed to decrypt setting %s: %s", key, e)
return None
return row.value
def write_setting_value(
db: Session,
key: str,
value: str,
description: str = "",
) -> Setting:
"""Upsert a setting, encrypting if the key is protected."""
stored = encrypt_value(value) if is_protected(key) else value
row = db.query(Setting).filter(Setting.key == key).first()
if row:
row.value = stored
else:
row = Setting(key=key, value=stored, description=description)
db.add(row)
db.commit()
db.refresh(row)
return row
+40
View File
@@ -0,0 +1,40 @@
"""
JWT revocation store.
In-memory blacklist of revoked token JTI claims with auto-expiry tied to
each token's own `exp` claim. Single-process by design — for multi-worker
deployments swap the backing dict for Redis (the REDIS_URL env var is
already plumbed into the rest of the stack).
"""
from __future__ import annotations
import threading
import time
from typing import Optional
# jti -> expiry epoch (seconds since epoch, matches JWT `exp`)
_revoked: dict[str, float] = {}
_lock = threading.Lock()
def revoke(jti: str, exp_epoch: Optional[float]) -> None:
"""Mark a token JTI as revoked until its natural expiry."""
if not jti:
return
expiry = exp_epoch if exp_epoch is not None else time.time() + 7 * 24 * 3600
with _lock:
_revoked[jti] = expiry
def is_revoked(jti: str) -> bool:
"""Return True if the JTI was revoked and the revocation has not expired."""
if not jti:
return False
now = time.time()
with _lock:
# Opportunistic purge of expired entries.
expired = [k for k, exp in _revoked.items() if exp <= now]
for k in expired:
_revoked.pop(k, None)
return jti in _revoked
+61 -40
View File
@@ -18,6 +18,8 @@ from __future__ import annotations
import logging
import os
import threading
import time
from typing import Optional
import pyotp
@@ -35,56 +37,61 @@ SETTINGS_KEY_CRYPTO = "auth_provider_crypto_key"
# Cache the resolved key for the process lifetime. Re-resolved on each restart.
_cached_key: Optional[bytes] = None
# Replay-protection cache: (user_id, code) -> expiry epoch. Single-worker
# in-memory store; for multi-worker deployments, swap this for Redis with
# the same TTL semantics. Window of (2*VALID_WINDOW+1)*INTERVAL = 90s.
_REPLAY_TTL_SEC = (2 * TOTP_VALID_WINDOW + 1) * TOTP_INTERVAL
_replay_cache: dict[tuple, float] = {}
_replay_lock = threading.Lock()
def _replay_seen(user_id, code: str) -> bool:
"""Return True if (user_id, code) was already accepted recently."""
now = time.monotonic()
with _replay_lock:
# Lazy purge of expired entries (bounded — TOTP cardinality is small).
expired = [k for k, exp in _replay_cache.items() if exp <= now]
for k in expired:
_replay_cache.pop(k, None)
return (user_id, code) in _replay_cache
def _replay_record(user_id, code: str) -> None:
with _replay_lock:
_replay_cache[(user_id, code)] = time.monotonic() + _REPLAY_TTL_SEC
def _resolve_key() -> bytes:
"""
Resolve the Fernet key. Order:
1. AUTH_PROVIDER_CRYPTO_KEY env var
2. settings.auth_provider_crypto_key (auto-bootstrapped on first call)
Resolve the Fernet key from AUTH_PROVIDER_CRYPTO_KEY env var.
Refuses to fall back to DB storage — co-locating the key with the
encrypted ciphertext defeats the purpose of encrypting at rest.
"""
global _cached_key
if _cached_key is not None:
return _cached_key
env_key = os.getenv("AUTH_PROVIDER_CRYPTO_KEY")
if env_key:
_cached_key = env_key.encode() if isinstance(env_key, str) else env_key
return _cached_key
if not env_key:
raise RuntimeError(
"AUTH_PROVIDER_CRYPTO_KEY environment variable is not set. "
"Generate one with: python -c 'from cryptography.fernet import "
"Fernet; print(Fernet.generate_key().decode())' and set it in "
"the environment. Required for TOTP secret encryption and "
"LDAP bind password encryption."
)
# Fall back to DB. Lazy import to keep this module dependency-free at import time.
from app.database import SessionLocal
from app.models.setting import Setting
db = SessionLocal()
key_bytes = env_key.encode() if isinstance(env_key, str) else env_key
try:
row = db.query(Setting).filter(Setting.key == SETTINGS_KEY_CRYPTO).first()
if row and row.value:
_cached_key = row.value.encode()
return _cached_key
Fernet(key_bytes)
except Exception as e:
raise RuntimeError(
f"AUTH_PROVIDER_CRYPTO_KEY is not a valid Fernet key: {e}"
)
# Bootstrap: generate, persist, warn loudly so admins know to move it.
new_key = Fernet.generate_key()
row = Setting(
key=SETTINGS_KEY_CRYPTO,
value=new_key.decode(),
description=(
"Auto-generated Fernet key for encrypting TOTP secrets and LDAP "
"bind passwords. Move to AUTH_PROVIDER_CRYPTO_KEY env var for "
"stronger separation. Deleting this row invalidates all stored "
"TOTP enrolments and the encrypted LDAP bind password."
),
)
db.add(row)
db.commit()
logger.warning(
"AUTH_PROVIDER_CRYPTO_KEY not set in env — auto-generated and stored "
"in settings table. For production, copy the key into the env var so "
"it lives outside the encrypted data store."
)
_cached_key = new_key
return _cached_key
finally:
db.close()
_cached_key = key_bytes
return _cached_key
def _fernet() -> Fernet:
@@ -129,11 +136,25 @@ def verify(plain_secret: str, code: str) -> bool:
def verify_for_user(user, code: str) -> bool:
"""Verify code for a User instance with encrypted totp_secret in DB."""
"""Verify code for a User instance with encrypted totp_secret in DB.
Rejects replayed codes within the 90s validation window so a sniffed
OTP cannot be reused.
"""
if not user.totp_enabled or not user.totp_secret:
return False
try:
secret = decrypt_secret(user.totp_secret)
except InvalidToken:
return False
return verify(secret, code)
cleaned = (code or "").strip().replace(" ", "")
if _replay_seen(user.id, cleaned):
logger.warning("TOTP replay rejected for user_id=%s", user.id)
return False
if not verify(secret, cleaned):
return False
_replay_record(user.id, cleaned)
return True
+45 -12
View File
@@ -129,10 +129,16 @@ app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
import logging
logger = logging.getLogger("app.main")
logger.error(f"Validation Error at {request.url.path}: {exc.errors()}")
# Log only the error structure, not the offending body — Pydantic
# echoes back submitted values which may include passwords / tokens.
logger.error(
"Validation Error at %s: %d field error(s)",
request.url.path,
len(exc.errors()),
)
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": exc.errors(), "body": str(exc)},
content={"detail": exc.errors()},
)
@@ -144,11 +150,41 @@ allowed_origins = os.getenv("CORS_ORIGINS", "http://localhost:3000,http://localh
# ============================================
if TRUST_PROXY_HEADERS:
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
# Narrow the set of peers allowed to set forwarded-for headers. Wildcard
# ("*") lets any upstream rewrite request.client.host, which defeats
# rate-limit IP keying. Operators must list reverse-proxy IPs via
# FORWARDED_ALLOW_IPS (uvicorn convention).
_trusted_hosts = os.getenv("FORWARDED_ALLOW_IPS", "127.0.0.1").split(",")
_trusted_hosts = [h.strip() for h in _trusted_hosts if h.strip()]
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=_trusted_hosts)
# Query parameters whose value must be redacted before being logged.
# Add new sensitive param names here as they emerge.
_SENSITIVE_QUERY_PARAMS = {
"token", "access_token", "refresh_token", "password", "api_key",
"key", "secret", "code", "mfa_token",
}
def _safe_query_string(query: str) -> str:
"""Redact values for sensitive query params; return the rest verbatim."""
if not query:
return ""
parts = []
for pair in query.split("&"):
if "=" in pair:
name, _, _ = pair.partition("=")
if name.lower() in _SENSITIVE_QUERY_PARAMS:
parts.append(f"{name}=***")
continue
parts.append(pair)
return "?" + "&".join(parts)
@app.middleware("http")
async def log_requests(request: Request, call_next):
logger.info(f"➡️ Incoming Request: {request.method} {request.url}")
safe_target = f"{request.url.path}{_safe_query_string(request.url.query)}"
logger.info(f"➡️ Incoming Request: {request.method} {safe_target}")
try:
response = await call_next(request)
logger.info(f"⬅️ Response: {response.status_code}")
@@ -249,16 +285,13 @@ async def global_exception_handler(request: Request, exc: Exception):
@app.get("/health", tags=["Health"])
async def health_check():
"""
Health-Check-Endpoint für Load-Balancer & Monitoring
Health-Check-Endpoint für Load-Balancer & Monitoring.
Returns:
Status und Version
Returns only liveness — version, environment, and other build/runtime
info are not exposed here to avoid handing reconnaissance data to
unauthenticated callers.
"""
return {
"status": "healthy",
"version": "1.0.0",
"environment": ENV
}
return {"status": "healthy"}
@app.get("/", tags=["Root"])
+41 -3
View File
@@ -1,22 +1,60 @@
import ipaddress
import logging
import os
from fastapi import Request
from slowapi import Limiter
logger = logging.getLogger(__name__)
TRUST_PROXY_HEADERS = os.getenv("TRUST_PROXY_HEADERS", "false").lower() == "true"
# Comma-separated list of CIDR ranges. Only requests whose direct peer is
# inside one of these ranges are allowed to override the client IP via
# X-Forwarded-For / X-Real-IP. Defaults to RFC1918 + loopback for safety —
# operators behind public-internet load balancers must set this explicitly.
_DEFAULT_TRUSTED_PROXIES = "127.0.0.1/32,::1/128,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16"
_trusted_proxy_cidrs: list = []
for raw in os.getenv("TRUSTED_PROXY_CIDRS", _DEFAULT_TRUSTED_PROXIES).split(","):
raw = raw.strip()
if not raw:
continue
try:
_trusted_proxy_cidrs.append(ipaddress.ip_network(raw, strict=False))
except ValueError:
logger.warning("Ignoring invalid TRUSTED_PROXY_CIDRS entry: %s", raw)
def _peer_is_trusted_proxy(peer_ip: str) -> bool:
try:
addr = ipaddress.ip_address(peer_ip)
except ValueError:
return False
return any(addr in net for net in _trusted_proxy_cidrs)
def get_client_ip(request: Request) -> str:
"""
Resolve client IP, optionally trusting reverse-proxy headers.
Resolve client IP. Honour reverse-proxy headers only when the direct
peer is itself a trusted proxy — otherwise the client could spoof
X-Forwarded-For to bypass rate limiting on /auth/login etc.
"""
if TRUST_PROXY_HEADERS:
peer = request.client.host if request.client else None
if TRUST_PROXY_HEADERS and peer and _peer_is_trusted_proxy(peer):
xff = request.headers.get("x-forwarded-for")
if xff:
# Rightmost untrusted hop = leftmost entry the trusted proxy
# received. Take the first non-trusted IP walking from the right.
for candidate in reversed([h.strip() for h in xff.split(",") if h.strip()]):
if not _peer_is_trusted_proxy(candidate):
return candidate
# All hops were trusted proxies — fall back to leftmost.
return xff.split(",")[0].strip()
xri = request.headers.get("x-real-ip")
if xri:
return xri.strip()
return request.client.host if request.client else "unknown"
return peer or "unknown"
limiter = Limiter(key_func=get_client_ip)
+12 -10
View File
@@ -449,13 +449,14 @@ async def sync_wazuh_assets(
Synchronizes assets from Wazuh Manager.
fetch active agents -> update/create local assets.
"""
# 1. Get Wazuh Config
config_setting = db.query(Setting).filter(Setting.key == "wazuh_config").first()
if not config_setting or not config_setting.value:
# 1. Get Wazuh Config (transparently decrypted)
from app.auth.setting_crypto import read_setting_value
raw_wazuh = read_setting_value(db, "wazuh_config")
if not raw_wazuh:
raise HTTPException(status_code=400, detail="Wazuh configuration not found.")
try:
config = json.loads(config_setting.value)
config = json.loads(raw_wazuh)
api_url = config.get("api_url")
username = config.get("username")
password = config.get("password")
@@ -569,12 +570,13 @@ async def rescan_asset(
if asset.source != AssetSource.WAZUH or not asset.wazuh_agent_id:
raise HTTPException(status_code=400, detail="Rescan only possible for Wazuh-managed assets")
# 1. Wazuh Config holen
config_setting = db.query(Setting).filter(Setting.key == "wazuh_config").first()
if not config_setting or not config_setting.value:
# 1. Wazuh Config holen (transparently decrypted)
from app.auth.setting_crypto import read_setting_value
raw_wazuh = read_setting_value(db, "wazuh_config")
if not raw_wazuh:
raise HTTPException(status_code=400, detail="Wazuh configuration missing")
config = json.loads(config_setting.value)
config = json.loads(raw_wazuh)
wazuh = WazuhClient(
base_url=config.get("api_url"),
username=config.get("username"),
+39 -4
View File
@@ -3,6 +3,7 @@
#
# Endpoints for Login, Logout, Token-Refresh and User-Management.
"""
import hmac
import os
from datetime import datetime, timedelta
from typing import Optional
@@ -29,6 +30,7 @@ from app.auth.dependencies import get_current_user, RequireAdmin
from app.auth.orchestrator import AuthOrchestrator
from app.auth.strategies.base import AuthError
from app.auth import totp as totp_helper
from app.auth.token_revocation import revoke as revoke_token
from app.models.user import AuthProvider
from app.rate_limiter import limiter
@@ -488,11 +490,32 @@ async def logout(
db: Session = Depends(get_db)
):
"""
User logout
User logout.
Note: JWT tokens are stateless and cannot be invalidated server-side.
In Production: Implement token blacklist in Redis.
Revokes the access token (cookie or Authorization header) and any
accompanying refresh token by JTI. Subsequent calls bearing those
tokens are rejected by the get_current_user / refresh dependencies.
"""
# Revoke access token JTI
bearer = request.headers.get("authorization")
access_token = None
if bearer and bearer.lower().startswith("bearer "):
access_token = bearer.split(" ", 1)[1].strip()
if not access_token:
access_token = request.cookies.get("access_token")
if access_token:
payload = decode_token(access_token)
if payload and payload.get("jti"):
revoke_token(payload["jti"], payload.get("exp"))
# Revoke refresh token JTI from cookie (the body-supplied refresh path
# passes the token through /auth/refresh, not /auth/logout).
refresh_cookie = request.cookies.get("refresh_token")
if refresh_cookie:
payload = decode_token(refresh_cookie)
if payload and payload.get("jti"):
revoke_token(payload["jti"], payload.get("exp"))
log_audit_event(
db,
AuditEventType.LOGOUT,
@@ -527,6 +550,18 @@ async def refresh_token(
detail="Invalid refresh token"
)
from app.auth.token_revocation import is_revoked as _is_revoked
jti = payload.get("jti")
if jti and _is_revoked(jti):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
# Rotate: revoke the presented refresh token so it cannot be reused.
if jti:
revoke_token(jti, payload.get("exp"))
username = payload.get("sub")
user = db.query(User).filter(User.username == username).first()
@@ -571,7 +606,7 @@ async def setup_admin(
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not found")
token = request.headers.get("X-Setup-Token")
if not token or token != SETUP_ADMIN_TOKEN:
if not token or not hmac.compare_digest(token, SETUP_ADMIN_TOKEN):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid setup token")
any_admin_exists = db.query(User).filter(User.role == UserRole.ADMIN).count() > 0
+6 -5
View File
@@ -212,13 +212,14 @@ async def trigger_autoscan(
"""
Trigger automated scans for ALL assets connected to Wazuh.
"""
# 1. Get Wazuh Configuration from DB
config_setting = db.query(Setting).filter(Setting.key == "wazuh_config").first()
if not config_setting or not config_setting.value:
# 1. Get Wazuh Configuration from DB (transparently decrypted)
from app.auth.setting_crypto import read_setting_value
raw_wazuh = read_setting_value(db, "wazuh_config")
if not raw_wazuh:
raise HTTPException(status_code=400, detail="Wazuh configuration not found. Please configure settings first.")
try:
config = json.loads(config_setting.value)
config = json.loads(raw_wazuh)
api_url = config.get("api_url")
username = config.get("username")
password = config.get("password")
+90 -12
View File
@@ -1,6 +1,7 @@
"""
Settings Router
"""
import json
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
@@ -10,9 +11,63 @@ from app.database import get_db
from app.models.setting import Setting
from app.models.user import User
from app.auth.dependencies import RequireAdmin
from app.auth.setting_crypto import (
PROTECTED_SETTING_KEYS,
decrypt_value,
encrypt_value,
is_protected,
)
router = APIRouter(prefix="/api/v1/settings", tags=["Settings"])
# Keys that may never be written through this API — owned by env vars or
# managed by the auth subsystem. Overwriting them would lock users out of
# MFA or compromise key isolation.
_DENYLISTED_KEYS = {"auth_provider_crypto_key"}
# Field names treated as secret-bearing inside JSON-encoded protected
# configs. Values for these subfields are redacted in GET responses; the
# admin must re-supply them on update to overwrite.
_SECRET_SUBFIELDS = {
"password", "secret", "secret_key", "access_key",
"api_key", "indexer_password", "bind_password",
"token", "smtp_password",
}
def _redact_protected_value(key: str, stored: Optional[str]) -> Optional[str]:
"""Decrypt + redact secret subfields for safe GET responses."""
if stored is None:
return None
try:
plaintext = decrypt_value(stored)
except Exception:
return "***encrypted***"
try:
data = json.loads(plaintext)
except (json.JSONDecodeError, TypeError):
return "***redacted***"
if isinstance(data, dict):
for field in list(data.keys()):
if field.lower() in _SECRET_SUBFIELDS and data[field]:
data[field] = "***set***"
return json.dumps(data)
return "***redacted***"
def _serialize(setting: Setting) -> dict:
value = setting.value
if is_protected(setting.key):
value = _redact_protected_value(setting.key, value)
return {
"key": setting.key,
"value": value,
"description": setting.description,
}
# --- Schemas ---
class SettingResponse(BaseModel):
key: str
@@ -32,9 +87,8 @@ async def list_settings(
db: Session = Depends(get_db),
current_user: User = Depends(RequireAdmin)
):
"""List all system settings"""
settings = db.query(Setting).all()
return settings
"""List all system settings (secret subfields redacted)."""
return [_serialize(s) for s in db.query(Setting).all()]
@router.get("/{key}", response_model=SettingResponse)
async def get_setting(
@@ -42,11 +96,11 @@ async def get_setting(
db: Session = Depends(get_db),
current_user: User = Depends(RequireAdmin)
):
"""Get a specific setting"""
"""Get a specific setting (secret subfields redacted for protected keys)."""
setting = db.query(Setting).filter(Setting.key == key).first()
if not setting:
raise HTTPException(status_code=404, detail="Setting not found")
return setting
return _serialize(setting)
@router.put("/{key}", response_model=SettingResponse)
async def update_setting(
@@ -55,16 +109,40 @@ async def update_setting(
db: Session = Depends(get_db),
current_user: User = Depends(RequireAdmin)
):
"""Update or Create a setting"""
"""Update or Create a setting. Protected keys are encrypted at rest."""
if key in _DENYLISTED_KEYS:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="This setting is managed outside the API and cannot be modified here",
)
value_to_store = update_data.value
if is_protected(key):
# If admin re-submitted a redacted value, refuse — they probably
# didn't intend to overwrite secrets with the literal "***set***".
try:
parsed = json.loads(update_data.value)
if isinstance(parsed, dict):
for f in parsed:
if f.lower() in _SECRET_SUBFIELDS and parsed[f] == "***set***":
raise HTTPException(
status_code=400,
detail=(
f"Refusing to store redaction placeholder in '{f}'. "
"Re-submit the actual secret value."
),
)
except json.JSONDecodeError:
pass
value_to_store = encrypt_value(update_data.value)
setting = db.query(Setting).filter(Setting.key == key).first()
if not setting:
# Create new if doesn't exist
setting = Setting(key=key, value=update_data.value)
setting = Setting(key=key, value=value_to_store)
db.add(setting)
else:
setting.value = update_data.value
setting.value = value_to_store
db.commit()
db.refresh(setting)
return setting
return _serialize(setting)
+16 -14
View File
@@ -793,13 +793,13 @@ def verify_patch_with_rescan(
with get_db_context() as db:
try:
# Wazuh Config laden
config_setting = db.query(Setting).filter(Setting.key == "wazuh_config").first()
if not config_setting or not config_setting.value:
from app.auth.setting_crypto import read_setting_value
raw_wazuh = read_setting_value(db, "wazuh_config")
if not raw_wazuh:
logger.error("Patch verification aborted: Wazuh configuration missing")
return
config = json.loads(config_setting.value)
config = json.loads(raw_wazuh)
wazuh = WazuhClient(
base_url=config.get("api_url"),
username=config.get("username"),
@@ -1187,13 +1187,14 @@ async def sync_vulnerabilities_from_wazuh(
def run_wazuh_vulnerability_sync(db: Session) -> dict:
"""Synchronous Wazuh vulnerability sync with statistics return"""
# 1. Get Wazuh Config from DB
config_setting = db.query(Setting).filter(Setting.key == "wazuh_config").first()
if not config_setting or not config_setting.value:
# 1. Get Wazuh Config from DB (transparently decrypted)
from app.auth.setting_crypto import read_setting_value
raw_wazuh = read_setting_value(db, "wazuh_config")
if not raw_wazuh:
raise HTTPException(status_code=400, detail="Wazuh configuration not found. Please configure Wazuh in Settings.")
try:
config = json.loads(config_setting.value)
config = json.loads(raw_wazuh)
api_url = config.get("api_url")
username = config.get("username")
password = config.get("password")
@@ -1732,16 +1733,17 @@ async def override_from_nessus(
- **scan_id**: Nessus Scan-ID (optional, wird aus Config ermittelt)
- **dry_run**: Wenn True, nur simulieren ohne Änderungen
"""
# Load Nessus config
config_setting = db.query(Setting).filter(Setting.key == SETTING_KEY).first()
if not config_setting or not config_setting.value:
# Load Nessus config (transparently decrypted)
from app.auth.setting_crypto import read_setting_value
raw_nessus = read_setting_value(db, SETTING_KEY)
if not raw_nessus:
raise HTTPException(
status_code=400,
detail="Nessus configuration not set. Please configure Nessus in Settings."
)
try:
config = json.loads(config_setting.value)
config = json.loads(raw_nessus)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid Nessus configuration format.")
+5 -4
View File
@@ -70,13 +70,14 @@ async def execute_scheduled_scan(schedule_id: int):
return
# ---- Wazuh branch (default, existing behaviour) ----
# Wazuh Config laden
config_setting = db.query(Setting).filter(Setting.key == "wazuh_config").first()
if not config_setting or not config_setting.value:
# Wazuh Config laden (transparently decrypted)
from app.auth.setting_crypto import read_setting_value
raw_wazuh = read_setting_value(db, "wazuh_config")
if not raw_wazuh:
logger.error("Scheduled scan aborted: Wazuh configuration missing")
return
config = json.loads(config_setting.value)
config = json.loads(raw_wazuh)
assets = db.query(Asset).filter(Asset.wazuh_agent_id.isnot(None)).all()
if not assets:
logger.info("No Wazuh assets found for scheduled scan")
+34 -15
View File
@@ -1,6 +1,7 @@
"""
Email Service für SMTP-Versand und Template-Rendering
"""
import html
import json
import logging
import os
@@ -223,17 +224,22 @@ td{padding:7px 6px;border-bottom:1px solid #f3f4f6}
def render_digest_rows(items: list) -> str:
"""Render the <tr> rows for the digest table. Items: list of dicts with
cve_id, severity, cvss_score, asset_hostname, package_name."""
cve_id, severity, cvss_score, asset_hostname, package_name. All dynamic
values HTML-escaped to prevent injection from compromised scanner data."""
rows = []
for it in items:
sev = (it.get("severity") or "none").lower()
raw_sev = (it.get("severity") or "none").lower()
# whitelist severity for CSS class — anything else falls back to 'none'
sev = raw_sev if raw_sev in {"critical", "high", "medium", "low", "none"} else "none"
cvss = it.get("cvss_score")
cvss_str = html.escape(str(cvss)) if cvss is not None else "-"
rows.append(
f"<tr>"
f"<td><strong>{it.get('cve_id', '')}</strong></td>"
f"<td><strong>{html.escape(str(it.get('cve_id', '')))}</strong></td>"
f"<td><span class='sev sev-{sev}'>{sev.upper()}</span></td>"
f"<td>{it.get('cvss_score') if it.get('cvss_score') is not None else '-'}</td>"
f"<td>{it.get('asset_hostname', '')}</td>"
f"<td>{(it.get('package_name') or '')[:60]}</td>"
f"<td>{cvss_str}</td>"
f"<td>{html.escape(str(it.get('asset_hostname', '')))}</td>"
f"<td>{html.escape(str(it.get('package_name') or '')[:60])}</td>"
f"</tr>"
)
return "".join(rows)
@@ -289,10 +295,12 @@ td{padding:7px 6px;border-bottom:1px solid #f3f4f6;vertical-align:top}
def render_sla_digest_rows(items: list) -> str:
"""Render <tr> rows for the SLA digest table. Items: list of dicts with
cve_id, severity, asset_hostname, detected_at, hours_overdue."""
cve_id, severity, asset_hostname, detected_at, hours_overdue. All
dynamic values HTML-escaped to prevent injection."""
rows = []
for it in items:
sev = (it.get("severity") or "none").lower()
raw_sev = (it.get("severity") or "none").lower()
sev = raw_sev if raw_sev in {"critical", "high", "medium", "low", "none"} else "none"
hours = int(it.get("hours_overdue") or 0)
if hours >= 48:
overdue_str = f"{hours // 24}d {hours % 24}h"
@@ -300,10 +308,10 @@ def render_sla_digest_rows(items: list) -> str:
overdue_str = f"{hours}h"
rows.append(
f"<tr>"
f"<td><strong>{it.get('cve_id', '')}</strong></td>"
f"<td><strong>{html.escape(str(it.get('cve_id', '')))}</strong></td>"
f"<td><span class='sev sev-{sev}'>{sev.upper()}</span></td>"
f"<td>{it.get('asset_hostname', '')}</td>"
f"<td>{it.get('detected_at', '')}</td>"
f"<td>{html.escape(str(it.get('asset_hostname', '')))}</td>"
f"<td>{html.escape(str(it.get('detected_at', '')))}</td>"
f"<td class='ovd'>{overdue_str}</td>"
f"</tr>"
)
@@ -389,11 +397,12 @@ def send_new_vulnerability_digest(
def get_smtp_config(db: Session) -> Optional[dict]:
setting = db.query(Setting).filter(Setting.key == "smtp_config").first()
if not setting or not setting.value:
from app.auth.setting_crypto import read_setting_value
raw = read_setting_value(db, "smtp_config")
if not raw:
return None
try:
return json.loads(setting.value)
return json.loads(raw)
except (json.JSONDecodeError, TypeError):
return None
@@ -423,10 +432,20 @@ def get_email_template(db: Session, template_key: str = "email_template_sla_brea
return default_subject, default_body
# Keys whose values are already trusted HTML (pre-rendered with escaping).
# Everything else is HTML-escaped at substitution time.
_SAFE_HTML_KEYS = {"rows"}
def render_template(template: str, variables: dict) -> str:
def replace_var(match):
key = match.group(1)
return str(variables.get(key, f"{{{{{key}}}}}"))
if key not in variables:
return f"{{{{{key}}}}}"
value = str(variables[key])
if key in _SAFE_HTML_KEYS:
return value
return html.escape(value)
return re.sub(r'\{\{(\w+)\}\}', replace_var, template)
+4 -3
View File
@@ -47,11 +47,12 @@ SOURCE_NAME = "nessus"
# Config loader
# ---------------------------------------------------------------
def load_nessus_config(db: Session) -> Optional[dict]:
row = db.query(Setting).filter(Setting.key == SETTING_KEY).first()
if not row or not row.value:
from app.auth.setting_crypto import read_setting_value
raw = read_setting_value(db, SETTING_KEY)
if not raw:
return None
try:
return json.loads(row.value)
return json.loads(raw)
except (json.JSONDecodeError, TypeError):
logger.warning("nessus_config is not valid JSON")
return None