feat(urs): api endpoints + asset criticality/frameworks update

Three new endpoints on the compliance router:

  GET  /api/v1/compliance/urs/{asset_id}    one asset (writes snapshot)
  GET  /api/v1/compliance/urs               list all assets sorted by URS
  POST /api/v1/compliance/urs/recompute-all force full recompute

All accept ?avs_mode=hybrid|avg|max — operator can A/B the score
policy without code changes.

Asset PUT endpoint extended:
- criticality (low|normal|high|critical) feeds URS multiplier
- compliance_frameworks (List[str]) stored as JSON TEXT for the
  per-framework URS breakdown in future commits

Validation: criticality is normalised to lowercase + bounded to the
4 valid values. frameworks list serialised to JSON before persisting.

Single-asset GET also returns trend_7d (URS delta vs 7 days ago, from
asset_risk_snapshots) so the frontend trend arrow has data without an
extra round-trip.
This commit is contained in:
2026-05-19 14:44:33 +02:00
parent 3b482b3560
commit d9059dabc1
2 changed files with 100 additions and 0 deletions
+11
View File
@@ -89,6 +89,9 @@ class AssetUpdateRequest(BaseModel):
location: Optional[str] = None
owner: Optional[str] = None
policy_id: Optional[int] = None
# Plan E — URS inputs (criticality multiplier + framework list)
criticality: Optional[str] = None # low | normal | high | critical
compliance_frameworks: Optional[List[str]] = None
class BulkAssetUpdateRequest(BaseModel):
@@ -302,7 +305,15 @@ async def update_asset(
)
# Update nur gesetzte Felder
import json as _json
for field, value in update_data.model_dump(exclude_unset=True).items():
if field == "compliance_frameworks":
# Stored as TEXT JSON list
value = _json.dumps(value or []) if value is not None else None
if field == "criticality" and value is not None:
value = value.lower()
if value not in ("low", "normal", "high", "critical"):
raise HTTPException(400, "criticality must be one of: low, normal, high, critical")
setattr(asset, field, value)
db.commit()
+89
View File
@@ -35,6 +35,9 @@ from app.services.compliance_service import (
refresh_asset_compliance,
)
from app.services.compliance_impact_import import import_impact_csv_files
from app.services.urs_service import (
compute_urs, compute_urs_for_all, get_urs_trend, urs_severity,
)
router = APIRouter(prefix="/api/v1/compliance", tags=["Compliance"])
@@ -323,6 +326,92 @@ def get_impact_stats(
]
# ---------- URS endpoints ----------
class URSResponse(BaseModel):
asset_id: int
hostname: Optional[str] = None
avs: Optional[float] = None
ass: Optional[float] = None
urs: Optional[float] = None
severity: str = "NONE"
criticality: Optional[str] = None
criticality_factor: Optional[float] = None
policy_count: Optional[int] = None
trend_7d: Optional[float] = None
@router.get("/urs/{asset_id}", response_model=URSResponse)
def get_asset_urs(
asset_id: int,
avs_mode: str = Query("hybrid", regex="^(hybrid|avg|max)$"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Compute and persist today's URS snapshot for one asset.
avs_mode picks how AVS is derived from CPR scores:
hybrid (default) — 0.7 × avg + 0.3 × max
avg — mean of CPR
max — single highest CPR
"""
result = compute_urs(db, asset_id, avs_mode=avs_mode, persist_snapshot=True)
asset = db.query(Asset).filter(Asset.id == asset_id).first()
trend = get_urs_trend(db, asset_id, days=7)
return URSResponse(
asset_id=asset_id,
hostname=asset.hostname if asset else None,
avs=result["avs"],
ass=result["ass"],
urs=result["urs"],
severity=result["severity"],
criticality=result.get("criticality"),
criticality_factor=result.get("criticality_factor"),
policy_count=result.get("policy_count"),
trend_7d=trend,
)
@router.get("/urs", response_model=List[URSResponse])
def list_urs(
limit: int = Query(200, le=1000),
avs_mode: str = Query("hybrid", regex="^(hybrid|avg|max)$"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""URS for every asset, sorted highest-risk first."""
assets = db.query(Asset).all()
rows: List[URSResponse] = []
for a in assets:
r = compute_urs(db, a.id, avs_mode=avs_mode, persist_snapshot=False)
rows.append(URSResponse(
asset_id=a.id,
hostname=a.hostname,
avs=r["avs"],
ass=r["ass"],
urs=r["urs"],
severity=r["severity"],
criticality=r.get("criticality"),
criticality_factor=r.get("criticality_factor"),
policy_count=r.get("policy_count"),
))
rows.sort(key=lambda x: (x.urs is None, -(x.urs or 0)))
return rows[:limit]
@router.post("/urs/recompute-all")
def recompute_all_urs(
avs_mode: str = Query("hybrid", regex="^(hybrid|avg|max)$"),
db: Session = Depends(get_db),
current_user: User = Depends(RequireEditor),
):
"""Force a URS recompute across every asset and snapshot today."""
stats = compute_urs_for_all(db, avs_mode=avs_mode)
db.commit()
return stats
@router.post("/refresh")
def refresh_everything(
db: Session = Depends(get_db),