feat(compliance): wazuh client — get_sca_policies + get_sca_checks

Two new methods on WazuhClient covering the Security Configuration
Assessment endpoints. Compliance service (next commit) uses these to
populate the new compliance_results / compliance_checks tables.

- get_sca_policies(agent_id) → list of per-policy summaries
  (pass/fail/invalid totals, score, end_scan). Maps 1:1 to a
  compliance_results upsert.
- get_sca_checks(agent_id, policy_id, limit=500) → per-check rows
  for the optional deep-dive view. limit=500 covers the largest CIS
  benchmarks (~250-400 checks per policy).

Both follow the same _request → data.affected_items unwrap pattern
as the existing vuln + os + package methods. No behavioural change
elsewhere in WazuhClient.
This commit is contained in:
2026-05-19 08:13:02 +02:00
parent 89e74daae1
commit 4c0508629a
+44
View File
@@ -551,6 +551,50 @@ class WazuhClient:
items = response.get("data", {}).get("affected_items", [])
return items[0] if items else {}
# ============================================
# SCA — Security Configuration Assessment
# ============================================
def get_sca_policies(self, agent_id: str) -> List[Dict[str, Any]]:
"""
List all SCA policies an agent has been evaluated against.
Wazuh API: ``GET /sca/{agent_id}``
Returns per-policy summary including pass/fail/invalid counts +
score. Example item:
{
"policy_id": "cis_win11",
"name": "CIS Microsoft Windows 11 Enterprise Benchmark v1.0.0",
"description": "...",
"pass": 142, "fail": 38, "invalid": 0, "total_checks": 180,
"score": 78, "end_scan": "2026-05-19T01:00:00Z", ...
}
"""
response = self._request("GET", f"/sca/{agent_id}")
return response.get("data", {}).get("affected_items", []) or []
def get_sca_checks(
self,
agent_id: str,
policy_id: str,
limit: int = 500,
) -> List[Dict[str, Any]]:
"""
Per-check detail for one (agent, policy).
Wazuh API: ``GET /sca/{agent_id}/checks/{policy_id}``
Used by the optional deep-dive endpoint — not called by the
default compliance refresh which only persists per-policy
summaries. Returns up to ``limit`` checks; the default 500 is
enough for the largest CIS benchmarks (~250-400 checks).
"""
response = self._request(
"GET",
f"/sca/{agent_id}/checks/{policy_id}",
params={"limit": limit},
)
return response.get("data", {}).get("affected_items", []) or []
# ============================================
# Helper-Funktionen
# ============================================