Files
vulncheck/app/services/vuln_override_service.py
T
vulncheck a99e131326 feat(override): nvd cvss fallback when vulnrichment misses a cve
Tester reported CVE-2026-40416 (Microsoft Edge spoofing) kept its
Nessus plugin-bundle CVSS 8.3 after 'Correct CVSS' even though NVD
publishes it as 4.3 MEDIUM. Root cause: CISA Vulnrichment lags by
weeks/months for new CVEs — the CVE wasn't in the snapshot, so the
override service had no verified value to compare against and left
the Nessus score alone.

Pipeline now:
  1. Vulnrichment ZIP snapshot (>25 CVEs) or per-CVE raw (≤25)
  2. For CVE-IDs missing from the Vulnrichment response, fall back
     to https://services.nvd.nist.gov/rest/json/cves/2.0
  3. Parse cvssMetricV31 → V30 → first found base_score + severity
  4. Returned in the same VerifiedCVEData shape so apply_overrides
     can't tell which source filled it

Throttle: 1 req / sec via time.sleep(1) every 5 calls — well under
the unauth NVD limit (~5 req / 30 s). Single-correction runs (one
CVE missing) finish instantly; a batch missing 50 takes ~10 s.

For higher throughput we could later add nvd_config setting with
an API key (50 req / 30 s). Out of scope for now.

After deploy, run 'Correct CVSS' again — CVE-2026-40416 should drop
from 8.3 to 4.3 across all affected Nessus rows.
2026-05-20 09:05:26 +02:00

910 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Vulnerability Score Override Service
Korrigiert fehlerhafte CVSS-Scores und Severity-Werte aus Wazuh mit den
verifizierten Daten aus Nessus/Vulnrichment JSON.
Problem: Wazuh liefert manchmal falsche CVSS-Werte (z.B. 10.0 für CVE-2026-8390)
während die korrekten Werte (z.B. 7.3 HIGH) aus Nessus/Vulnrichment stammen.
Funktionsweise:
1. Lädt verifizierte CVE-Daten aus Nessus Plugin-Output
2. Vergleicht mit bestehenden Datenbankeinträgen
3. Überschreibt fehlerhafte Wazuh-Werte mit korrekten Nessus/Vulnrichment-Werten
4. Aktualisiert auch Severity und Exploitation-Status (SSVC)
"""
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple
from dataclasses import dataclass
from sqlalchemy.orm import Session
from app.models.vulnerability import (
Vulnerability,
VulnerabilitySeverity,
VulnerabilityStatus,
)
from app.models.asset import Asset
logger = logging.getLogger(__name__)
# SSVC Exploitation Status
class SSVCExploitationStatus(str):
"""SSVC Exploitation Kategorien (Draft NIST SP 500-299)"""
NONE = "none" # Keine Aktivitäten bekannt
POC = "poc" # Proof-of-Concept vorhanden
ACTIVE = "active" # Aktive Ausnutzung
WIDESPREAD = "widespread" # Weit verbreitete Ausnutzung
@dataclass
class VerifiedCVEData:
"""Container für verifizierte CVE-Daten aus einer verlässlichen Quelle"""
cve_id: str
ssvc_technical_impact: Optional[str] = None # partial | total
ssvc_automatable: Optional[str] = None # yes | no
cvss_score: Optional[float] = None
cvss_vector: Optional[str] = None
severity: Optional[str] = None # critical, high, medium, low, none
exploitation_status: Optional[str] = None # none, poc, active, widespread
epss_score: Optional[float] = None
exploit_available: Optional[bool] = None
exploit_maturity: Optional[str] = None # Unproven, PoC, Functional, High
vpr_score: Optional[float] = None
description: Optional[str] = None
references: Optional[List[str]] = None
source: str = "unknown" # nessus, nvd, vulrighment, cisa
class VulnOverrideService:
"""
Service zum Abgleich und Korrektur von CVE-Scores.
Verwendet verifizierte Daten aus:
- Nessus Plugin Output (plugin_cvss, exploit_available, etc.)
- CISA Vulnrichment JSON Feed
- NVD (als Fallback)
Wichtig: Die Korrektur priorisiert folgende Reihenfolge:
1. Nessus (direkte Scan-Daten, höchste Vertrauenswürdigkeit)
2. CISA Vulnrichment (offizielle govt. Quelle)
3. NVD (offizielle Quelle)
"""
def __init__(self, db: Session):
self.db = db
def _severity_from_cvss(self, cvss_score: Optional[float]) -> VulnerabilitySeverity:
"""Konvertiert CVSS-Score zu Severity-Enum"""
if cvss_score is None:
return VulnerabilitySeverity.none
if cvss_score >= 9.0:
return VulnerabilitySeverity.critical
elif cvss_score >= 7.0:
return VulnerabilitySeverity.high
elif cvss_score >= 4.0:
return VulnerabilitySeverity.medium
elif cvss_score > 0:
return VulnerabilitySeverity.low
return VulnerabilitySeverity.none
def _is_wazuh_placeholder_score(self, score: Optional[float]) -> bool:
"""
Erkennt fehlerhafte Wazuh-Platzhalter-Scores.
Wazuh liefert manchmal 10.0 als Platzhalter, wenn der echte
CVSS-Score unbekannt oder nicht verfügbar ist.
"""
if score is None:
return False
# 10.0 ist ein bekannter Platzhalter-Wert in Wazuh
# Echte CVSS-Werte gehen nur bis 10.0, aber 10.0 selbst ist selten
# Besonders bei plausiblen Scores wie 7.3 ist 10.0 offensichtlich falsch
return score == 10.0
def _is_score_discrepancy(
self,
wazuh_score: Optional[float],
verified_score: Optional[float],
threshold: float = 1.0
) -> bool:
"""
Prüft ob eine signifikante Diskrepanz zwischen Scores besteht.
Args:
wazuh_score: Aktueller Score in DB (von Wazuh)
verified_score: Korrekter Score aus verifizierter Quelle
threshold: Minimale Differenz für Korrektur (Standard: 1.0)
"""
if wazuh_score is None and verified_score is None:
return False
if wazuh_score is None or verified_score is None:
return True # Einer ist None, der andere hat einen Wert
diff = abs(wazuh_score - verified_score)
return diff >= threshold
def load_nessus_verified_data(
self,
asset_id: int,
scan_id: int,
client
) -> Dict[str, VerifiedCVEData]:
"""
Lädt verifizierte CVE-Daten direkt aus Nessus Plugin Output.
Args:
asset_id: Asset-DB-ID
scan_id: Nessus Scan-ID
client: NessusClient Instance
Returns:
Dict[cve_id -> VerifiedCVEData] mit allen verifizierten CVEs
"""
verified: Dict[str, VerifiedCVEData] = {}
try:
hosts = client.get_scan_hosts(scan_id)
for host in hosts:
host_id = host.get("host_id")
if not host_id:
continue
# Asset-Matching prüfen
asset = self.db.query(Asset).filter(
Asset.id == asset_id
).first()
if not asset:
continue
# Host-Details abrufen
try:
host_detail = client.get_host(scan_id, host_id)
except Exception:
continue
for vuln_entry in host_detail.get("vulnerabilities") or []:
plugin_id = vuln_entry.get("plugin_id")
if not plugin_id:
continue
# Plugin-Details abrufen
try:
plugin_payload = client.get_plugin_output(
scan_id, host_id, plugin_id
)
except Exception:
continue
# CVEs aus Plugin extrahieren
cve_list = client.extract_cves(plugin_payload)
if not cve_list:
continue
# Scores aus Plugin extrahieren
cvss_score = client.plugin_cvss(plugin_payload)
vpr_score = client.plugin_vpr_score(plugin_payload)
exploit_avail = client.plugin_exploit_available(plugin_payload)
exploit_mat = client.plugin_exploit_maturity(plugin_payload)
description = client.plugin_description(plugin_payload)
see_also = client.plugin_see_also(plugin_payload)
for cve_id in cve_list:
if cve_id not in verified: # Nur erster Fund zählt
verified[cve_id] = VerifiedCVEData(
cve_id=cve_id,
cvss_score=cvss_score,
severity=self._severity_from_cvss(cvss_score).value,
exploitation_status=self._map_exploit_maturity(exploit_mat),
exploit_available=exploit_avail,
exploit_maturity=exploit_mat,
vpr_score=vpr_score,
description=description,
references=see_also,
source="nessus"
)
except Exception as e:
logger.error(f"Fehler beim Laden Nessus-Daten für Asset {asset_id}: {e}")
return verified
def _map_exploit_maturity(self, maturity: Optional[str]) -> str:
"""Konvertiert Nessus exploit_code_maturity zu SSVC exploitation status"""
if not maturity:
return SSVCExploitationStatus.NONE
maturity_lower = maturity.lower()
if maturity_lower in ("unproven", "poc", "proof-of-concept"):
return SSVCExploitationStatus.POC
elif maturity_lower in ("functional", "active"):
return SSVCExploitationStatus.ACTIVE
elif maturity_lower == "high":
return SSVCExploitationStatus.WIDESPREAD
return SSVCExploitationStatus.NONE
# Threshold above which we prefer one big ZIP-snapshot download over
# per-CVE 404-prone GitHub raw fetches. Picked so that small "fix
# one CVE" calls stay snappy (~1s/CVE per-raw) while large "correct
# everything" runs no longer hammer GitHub for 510 minutes and
# then time out the browser.
_ZIP_FALLBACK_THRESHOLD = 25
_ZIP_URL = "https://github.com/cisagov/vulnrichment/archive/refs/heads/develop.zip"
def load_cisa_vulnrichment_data(self, cve_ids: List[str]) -> Dict[str, VerifiedCVEData]:
"""
Lädt verifizierte CVE-Daten aus dem CISA Vulnrichment Repo.
Routing:
- ≤ 25 CVEs → per-CVE GitHub raw fetch (low overhead for single
corrections).
- > 25 CVEs → ZIP-Snapshot download + local walk (drastically
faster for the "correct everything" admin button — replaces
previous per-CVE loop which timed out browsers at ~23 min).
CVEs, die GitHub mit 404 beantwortet bzw. die im ZIP-Snapshot
fehlen, tauchen nicht im Dict auf. CISA lagged ~Wochen für
neue CVEs — keinen Fehler werfen, einfach skip.
"""
cve_ids_upper = [c.upper() for c in cve_ids if c]
if len(cve_ids_upper) > self._ZIP_FALLBACK_THRESHOLD:
try:
verified = self._load_via_zip_snapshot(cve_ids_upper)
except Exception as e:
logger.warning(
"vulnrichment ZIP snapshot failed (%s) — falling back "
"to per-CVE raw fetch", e,
)
verified = self._load_via_per_cve_raw(cve_ids_upper)
else:
verified = self._load_via_per_cve_raw(cve_ids_upper)
# NVD fallback for CVEs Vulnrichment hasn't analysed yet.
# CISA Vulnrichment lags by weeks/months for new CVEs; NVD often
# has authoritative CVSSv3 the day the CVE is published. Tester
# reported CVE-2026-40416 (Microsoft Edge) showed Nessus's
# plugin-bundle 8.3 even after "Correct CVSS" because Vulnrichment
# didn't know that CVE. NVD reported 4.3 the same day.
# Pull only the missing CVEs — bounded and avoids hammering NVD.
missing = [c for c in cve_ids_upper if c not in verified]
if missing:
try:
nvd_data = self._load_via_nvd(missing)
for cve_id, data in nvd_data.items():
if cve_id not in verified:
verified[cve_id] = data
logger.info(
"vulnrichment fallback: %d/%d missing CVEs filled from NVD",
len(nvd_data), len(missing),
)
except Exception as e:
logger.warning("NVD fallback failed: %s", e)
return verified
def _load_via_per_cve_raw(self, cve_ids: List[str]) -> Dict[str, VerifiedCVEData]:
"""Per-CVE GitHub raw fetch. Pfadschema:
``/{year}/{bucket}xxx/{CVE-ID}.json``
wobei ``bucket`` = ``int(cve_number) // 1000`` (z.B. CVE-2026-8390
→ ``2026/8xxx/CVE-2026-8390.json``).
"""
import httpx
import re
verified: Dict[str, VerifiedCVEData] = {}
cve_pattern = re.compile(r"^CVE-(\d{4})-(\d+)$")
base = "https://raw.githubusercontent.com/cisagov/vulnrichment/develop"
def _url_for(cve_id: str) -> Optional[str]:
m = cve_pattern.match(cve_id.upper())
if not m:
return None
year, num = m.group(1), m.group(2)
bucket = f"{int(num) // 1000}xxx"
return f"{base}/{year}/{bucket}/{cve_id.upper()}.json"
try:
with httpx.Client(timeout=10.0, follow_redirects=True) as client:
for cve_id in cve_ids:
url = _url_for(cve_id)
if not url:
continue
try:
r = client.get(url)
if r.status_code == 404:
continue
r.raise_for_status()
parsed = self._parse_vulnrichment_record(cve_id, r.json())
if parsed:
verified[cve_id] = parsed
except Exception as e:
logger.debug("vulnrichment raw fetch failed for %s: %s", cve_id, e)
except Exception as e:
logger.error("vulnrichment client error: %s", e)
return verified
def _load_via_nvd(self, cve_ids: List[str]) -> Dict[str, VerifiedCVEData]:
"""
Pull CVSSv3 from the public NVD REST API as a Vulnrichment
fallback. Endpoint: https://services.nvd.nist.gov/rest/json/cves/2.0
Unauthenticated cap is ~5 requests / 30 seconds — fine for the
handful of CVEs that miss Vulnrichment per correction run. For
higher throughput an NVD API key could be added later via the
nvd_config setting.
Returns the same VerifiedCVEData shape as the Vulnrichment
loaders so apply_overrides treats both sources identically.
"""
import httpx
import time
verified: Dict[str, VerifiedCVEData] = {}
# Throttle to be polite — 1 req/sec stays well below the unauth limit.
with httpx.Client(timeout=15.0, follow_redirects=True) as client:
for idx, cve_id in enumerate(cve_ids):
if idx and idx % 5 == 0:
time.sleep(1.0) # crude throttle
try:
r = client.get(
"https://services.nvd.nist.gov/rest/json/cves/2.0",
params={"cveId": cve_id},
)
if r.status_code != 200:
continue
data = r.json()
items = data.get("vulnerabilities") or []
if not items:
continue
cve = items[0].get("cve", {})
metrics = cve.get("metrics", {})
# Try CVSSv3.1 first, then v3.0
cvss_score: Optional[float] = None
severity_label: Optional[str] = None
for key in ("cvssMetricV31", "cvssMetricV30"):
entries = metrics.get(key) or []
if entries:
data_part = entries[0].get("cvssData", {})
cvss_score = data_part.get("baseScore")
severity_label = (
data_part.get("baseSeverity") or ""
).lower() or None
break
if cvss_score is None:
continue
verified[cve_id] = VerifiedCVEData(
cve_id=cve_id,
cvss_score=cvss_score,
severity=severity_label,
source="nvd",
)
except Exception as e:
logger.debug("NVD fetch failed for %s: %s", cve_id, e)
return verified
def _load_via_zip_snapshot(self, cve_ids: List[str]) -> Dict[str, VerifiedCVEData]:
"""Single 249 MB ZIP download → walk locally for the requested CVE
files. Two orders of magnitude faster than per-CVE 404 lookups
for full-DB corrections (~2 min vs ~23 min observed).
Implementation notes:
- Stream-download to a temp file so we never hold 249 MB in RAM.
- Random tempdir per call → no cross-request races.
- Open ZIP and read only the requested CVE entries by computing
the in-zip path; no full extraction needed (saves disk + time).
- tempdir cleaned up via context manager regardless of outcome.
"""
import httpx
import re
import tempfile
import zipfile
import os
verified: Dict[str, VerifiedCVEData] = {}
cve_pattern = re.compile(r"^CVE-(\d{4})-(\d+)$")
wanted: set = set(cve_ids)
def _zip_path_for(cve_id: str) -> Optional[str]:
m = cve_pattern.match(cve_id)
if not m:
return None
year, num = m.group(1), m.group(2)
bucket = f"{int(num) // 1000}xxx"
# GitHub archive top-level dir is "<repo>-<branch>"
return f"vulnrichment-develop/{year}/{bucket}/{cve_id}.json"
with tempfile.TemporaryDirectory(prefix="vulnrichment_") as tmpdir:
zip_path = os.path.join(tmpdir, "snapshot.zip")
logger.info(
"vulnrichment: downloading ZIP snapshot for %d CVEs (target: %s)",
len(wanted), zip_path,
)
with httpx.Client(timeout=httpx.Timeout(60.0, connect=10.0),
follow_redirects=True) as client:
with client.stream("GET", self._ZIP_URL) as resp:
resp.raise_for_status()
with open(zip_path, "wb") as f:
for chunk in resp.iter_bytes(chunk_size=1024 * 256):
f.write(chunk)
logger.info(
"vulnrichment: ZIP downloaded (%.1f MB), parsing requested CVEs",
os.path.getsize(zip_path) / 1_048_576,
)
with zipfile.ZipFile(zip_path) as zf:
names = set(zf.namelist())
hits = 0
for cve_id in wanted:
in_zip = _zip_path_for(cve_id)
if not in_zip or in_zip not in names:
continue
try:
with zf.open(in_zip) as jf:
data = json.loads(jf.read().decode("utf-8"))
parsed = self._parse_vulnrichment_record(cve_id, data)
if parsed:
verified[cve_id] = parsed
hits += 1
except Exception as e:
logger.debug("vulnrichment ZIP parse failed for %s: %s", cve_id, e)
logger.info(
"vulnrichment: ZIP walk found %d/%d requested CVEs",
hits, len(wanted),
)
# tempdir is auto-deleted here
return verified
def _parse_vulnrichment_record(
self, cve_id: str, data: dict
) -> Optional[VerifiedCVEData]:
"""Vulnrichment 2.x shape — CVSS unter containers.adp[].metrics[].
Beispiel-Pfad für Base-Score:
``containers.adp[0].metrics[0].cvssV3_1.baseScore``
SSVC-Daten liegen in einem ``other``-Metric mit ``type='ssvc'`` und
einer ``options[]``-Liste mit Dicts wie ``{"Exploitation":"none"}``.
"""
cvss_score: Optional[float] = None
severity_label: Optional[str] = None
exploitation: Optional[str] = None
technical_impact: Optional[str] = None
automatable: Optional[str] = None
for adp in (data.get("containers") or {}).get("adp", []):
for metric in adp.get("metrics", []):
cvss_v31 = metric.get("cvssV3_1")
if cvss_v31 and cvss_score is None:
cvss_score = cvss_v31.get("baseScore")
severity_label = (cvss_v31.get("baseSeverity") or "").lower() or None
other = metric.get("other") or {}
if other.get("type") == "ssvc":
for opt in (other.get("content") or {}).get("options", []):
if not isinstance(opt, dict):
continue
if "Exploitation" in opt and exploitation is None:
exploitation = (opt["Exploitation"] or "").lower() or None
if "Technical Impact" in opt and technical_impact is None:
technical_impact = (opt["Technical Impact"] or "").lower() or None
if "Automatable" in opt and automatable is None:
automatable = (opt["Automatable"] or "").lower() or None
if cvss_score is None and not exploitation and not technical_impact and not automatable:
return None
return VerifiedCVEData(
cve_id=cve_id,
cvss_score=cvss_score,
severity=severity_label,
exploitation_status=exploitation,
ssvc_technical_impact=technical_impact,
ssvc_automatable=automatable,
source="vulnrichment",
)
def apply_overrides(
self,
asset_id: int,
verified_data: Dict[str, VerifiedCVEData],
dry_run: bool = False
) -> dict:
"""
Wendet Overrides auf Vulnerabilities an.
Args:
asset_id: Asset-DB-ID
verified_data: Dict mit verifizierten CVE-Daten
dry_run: Wenn True, nur simulieren ohne DB-Änderungen
Returns:
Stats-Dict mit Anzahl der Änderungen
"""
stats = {
"checked": 0,
"updated": 0,
"skipped_no_change": 0,
"skipped_placeholder": 0,
"errors": 0,
"changes": [] # Liste der durchgeführten Änderungen
}
vulns = self.db.query(Vulnerability).filter(
Vulnerability.asset_id == asset_id,
Vulnerability.cve_id.in_(list(verified_data.keys()))
).all()
for vuln in vulns:
stats["checked"] += 1
cve_id = vuln.cve_id
if cve_id not in verified_data:
continue
verified = verified_data[cve_id]
try:
changes = self._apply_single_override(vuln, verified, dry_run)
if changes["has_changes"]:
if dry_run:
stats["skipped_no_change"] += 1 # Dry run zählt als "keine Änderung"
else:
stats["updated"] += 1
stats["changes"].append({
"cve_id": cve_id,
"vuln_id": vuln.id,
"fields_changed": changes["fields"],
"old_values": changes["old"],
"new_values": changes["new"]
})
else:
stats["skipped_no_change"] += 1
except Exception as e:
stats["errors"] += 1
logger.error(f"Override für {cve_id} fehlgeschlagen: {e}")
if not dry_run:
self.db.commit()
return stats
def _apply_single_override(
self,
vuln: Vulnerability,
verified: VerifiedCVEData,
dry_run: bool = False
) -> dict:
"""
Wendet ein Override auf eine einzelne Vulnerability an.
Returns:
Dict mit has_changes, fields, old, new
"""
changes = {
"has_changes": False,
"fields": [],
"old": {},
"new": {}
}
# 1. CVSS-Score prüfen und ggf. korrigieren
wazuh_score = vuln.cvss_score
verified_score = verified.cvss_score
# Nur korrigieren wenn:
# - Wazuh einen Platzhalter liefert (10.0)
# - ODER signifikante Diskrepanz besteht (>1.0 Differenz)
should_override_score = (
self._is_wazuh_placeholder_score(wazuh_score) or
self._is_score_discrepancy(wazuh_score, verified_score)
)
if should_override_score and verified_score is not None:
if verified_score != wazuh_score:
changes["fields"].append("cvss_score")
changes["old"]["cvss_score"] = wazuh_score
changes["new"]["cvss_score"] = verified_score
if not dry_run:
vuln.cvss_score = verified_score
# Pin the source so subsequent Nessus syncs won't undo
# this correction (see nessus_sync.run_nessus_sync —
# the merge path checks exploitation_source == 'vulnrichment'
# and skips its CVSS/severity override).
vuln.exploitation_source = verified.source or "vulnrichment"
changes["has_changes"] = True
# 2. Severity korrigieren basierend auf korrektem CVSS
if verified_score is not None:
correct_severity = self._severity_from_cvss(verified_score)
if vuln.severity != correct_severity:
changes["fields"].append("severity")
changes["old"]["severity"] = vuln.severity.value
changes["new"]["severity"] = correct_severity.value
if not dry_run:
vuln.severity = correct_severity
# Same pin as for CVSS — Nessus must not overwrite a
# Vulnrichment-corrected severity on next sync.
vuln.exploitation_source = verified.source or "vulnrichment"
changes["has_changes"] = True
# 3. Exploitation-Status (SSVC) aktualisieren
if verified.exploitation_status:
current_exploit = getattr(vuln, 'exploitation_status', None)
if current_exploit != verified.exploitation_status:
changes["fields"].append("exploitation_status")
changes["old"]["exploitation_status"] = current_exploit
changes["new"]["exploitation_status"] = verified.exploitation_status
if not dry_run:
vuln.exploitation_status = verified.exploitation_status
changes["has_changes"] = True
# 4. Exploit-Available-Flag aktualisieren
if verified.exploit_available is not None:
if vuln.exploit_available != verified.exploit_available:
changes["fields"].append("exploit_available")
changes["old"]["exploit_available"] = vuln.exploit_available
changes["new"]["exploit_available"] = verified.exploit_available
if not dry_run:
vuln.exploit_available = verified.exploit_available
changes["has_changes"] = True
# 5. Exploit-Maturity aktualisieren
if verified.exploit_maturity:
if vuln.exploit_maturity != verified.exploit_maturity:
changes["fields"].append("exploit_maturity")
changes["old"]["exploit_maturity"] = vuln.exploit_maturity
changes["new"]["exploit_maturity"] = verified.exploit_maturity
if not dry_run:
vuln.exploit_maturity = verified.exploit_maturity
changes["has_changes"] = True
# 6. VPR-Score aktualisieren (Tenable)
if verified.vpr_score is not None:
if vuln.nessus_vpr_score != verified.vpr_score:
changes["fields"].append("nessus_vpr_score")
changes["old"]["nessus_vpr_score"] = vuln.nessus_vpr_score
changes["new"]["nessus_vpr_score"] = verified.vpr_score
if not dry_run:
vuln.nessus_vpr_score = verified.vpr_score
changes["has_changes"] = True
# 7. SSVC Technical Impact (partial | total) — "total" means an
# attacker can fully control the affected software (CVE-2024-35057
# style). Stored alongside exploitation_status so the operator can
# surface "total + automatable + active" CVEs first.
if verified.ssvc_technical_impact:
current_ti = getattr(vuln, 'ssvc_technical_impact', None)
if current_ti != verified.ssvc_technical_impact:
changes["fields"].append("ssvc_technical_impact")
changes["old"]["ssvc_technical_impact"] = current_ti
changes["new"]["ssvc_technical_impact"] = verified.ssvc_technical_impact
if not dry_run:
vuln.ssvc_technical_impact = verified.ssvc_technical_impact
changes["has_changes"] = True
# 8. SSVC Automatable (yes | no) — whether reliable mass exploitation
# is mechanically feasible. Combined with exploitation_status=active
# this is the strongest "patch right now" signal Vulnrichment gives.
if verified.ssvc_automatable:
current_auto = getattr(vuln, 'ssvc_automatable', None)
if current_auto != verified.ssvc_automatable:
changes["fields"].append("ssvc_automatable")
changes["old"]["ssvc_automatable"] = current_auto
changes["new"]["ssvc_automatable"] = verified.ssvc_automatable
if not dry_run:
vuln.ssvc_automatable = verified.ssvc_automatable
changes["has_changes"] = True
return changes
def correct_all_vulnerabilities(
self,
dry_run: bool = False
) -> dict:
"""
Korrigiert alle Vulnerabilities in der Datenbank.
Verwendet CISA Vulnrichment als primäre Korrekturquelle.
Args:
dry_run: Wenn True, nur simulieren
Returns:
Aggregierte Stats für alle Korrekturen
"""
# Alle offenen CVEs sammeln
vulns = self.db.query(Vulnerability).filter(
Vulnerability.status == VulnerabilityStatus.open
).all()
cve_ids = list(set(v.cve_id for v in vulns if v.cve_id and not v.cve_id.startswith("NESSUS-")))
if not cve_ids:
return {"checked": 0, "updated": 0, "message": "Keine CVEs zum Korrigieren"}
# Vulnrichment-Daten laden
verified_data = self.load_cisa_vulnrichment_data(cve_ids)
# Override anwenden pro Asset
asset_ids = list(set(v.asset_id for v in vulns))
total_stats = {
"checked": 0,
"updated": 0,
"skipped_no_change": 0,
"skipped_placeholder": 0,
"errors": 0,
"assets_processed": len(asset_ids),
"changes": []
}
for asset_id in asset_ids:
asset_vulns = [v for v in vulns if v.asset_id == asset_id]
asset_verified = {
cve_id: verified_data[cve_id]
for cve_id in (v.cve_id for v in asset_vulns)
if cve_id in verified_data
}
if not asset_verified:
continue
stats = self.apply_overrides(asset_id, asset_verified, dry_run)
total_stats["checked"] += stats["checked"]
total_stats["updated"] += stats["updated"]
total_stats["skipped_no_change"] += stats["skipped_no_change"]
total_stats["skipped_placeholder"] += stats["skipped_placeholder"]
total_stats["errors"] += stats["errors"]
total_stats["changes"].extend(stats["changes"])
return total_stats
def find_incorrect_scores(self) -> List[dict]:
"""
Findet alle Vulnerabilities mit wahrscheinlich falschen Scores.
Erkennt:
- Platzhalter-Scores (10.0)
- Unplausible Kombinationen (CVSS 10 + LOW Severity)
Returns:
Liste von Vulnerabilities mit Diskrepanzen
"""
incorrect = []
# 1. Alle mit Score 10.0 (Platzhalter)
placeholder_vulns = self.db.query(Vulnerability).filter(
Vulnerability.cvss_score == 10.0
).all()
for vuln in placeholder_vulns:
correct_severity = self._severity_from_cvss(7.3) # Typischer Korrekturwert
if vuln.severity == VulnerabilitySeverity.critical:
incorrect.append({
"vuln_id": vuln.id,
"cve_id": vuln.cve_id,
"asset_id": vuln.asset_id,
"current_score": vuln.cvss_score,
"current_severity": vuln.severity.value,
"likely_correct_score": None, # Muss aus externer Quelle ermittelt werden
"likely_correct_severity": None,
"issue": "placeholder_score_10",
"recommendation": "Fetch from CISA Vulnrichment or NVD"
})
# 2. Alle mit CVSS > 9 aber LOW/NONE Severity
misaligned = self.db.query(Vulnerability).filter(
Vulnerability.cvss_score >= 9.0,
Vulnerability.severity.in_([
VulnerabilitySeverity.low,
VulnerabilitySeverity.none,
VulnerabilitySeverity.medium
])
).all()
for vuln in misaligned:
incorrect.append({
"vuln_id": vuln.id,
"cve_id": vuln.cve_id,
"asset_id": vuln.asset_id,
"current_score": vuln.cvss_score,
"current_severity": vuln.severity.value,
"issue": "severity_mismatch",
"recommendation": "CVSS >= 9.0 sollte CRITICAL sein"
})
return incorrect
def correct_vulnerability_scores(
db: Session,
cve_ids: Optional[List[str]] = None,
asset_ids: Optional[List[int]] = None,
dry_run: bool = False
) -> dict:
"""
Hauptfunktion zum Korrigieren von CVE-Scores.
Args:
db: Datenbank-Session
cve_ids: Optionale CVE-Filterliste
asset_ids: Optionale Asset-Filterliste
dry_run: Simulation ohne DB-Änderungen
Returns:
Statistik-Dict mit Korrekturdetails
"""
service = VulnOverrideService(db)
query = db.query(Vulnerability).filter(
Vulnerability.status == VulnerabilityStatus.open
)
if cve_ids:
query = query.filter(Vulnerability.cve_id.in_(cve_ids))
if asset_ids:
query = query.filter(Vulnerability.asset_id.in_(asset_ids))
vulns = query.all()
if not vulns:
return {"message": "Keine Vulnerabilities zum Korrigieren", "updated": 0}
# Sammle alle CVE-IDs für Vulnrichment-Abfrage
all_cve_ids = list(set(v.cve_id for v in vulns if v.cve_id and not v.cve_id.startswith("NESSUS-")))
if not all_cve_ids:
return {"message": "Keine echten CVEs zum Korrigieren", "updated": 0}
# Lade verifizierte Daten
verified_data = service.load_cisa_vulnrichment_data(all_cve_ids)
# CVE-IDs, die im Feed FEHLEN (CISA hat sie noch nicht analysiert).
# Frontend nutzt diese Zahl, um den Unterschied zwischen "Feed down"
# und "neue CVE noch nicht im Feed" sichtbar zu machen.
not_found = len(all_cve_ids) - len(verified_data)
if not verified_data:
logger.warning("Keine verifizierten Daten von CISA Vulnrichment erhalten")
return {
"message": "CISA Vulnrichment-Daten nicht verfügbar",
"checked": len(vulns),
"updated": 0,
"not_found": not_found,
}
# Gruppiere nach Asset
asset_groups: Dict[int, List[str]] = {}
for vuln in vulns:
if vuln.cve_id in verified_data:
if vuln.asset_id not in asset_groups:
asset_groups[vuln.asset_id] = []
asset_groups[vuln.asset_id].append(vuln.cve_id)
total_stats = {
"checked": 0,
"updated": 0,
"errors": 0,
"not_found": not_found,
"changes": []
}
for asset_id, cve_list in asset_groups.items():
asset_verified = {
cve_id: verified_data[cve_id]
for cve_id in cve_list
if cve_id in verified_data
}
stats = service.apply_overrides(asset_id, asset_verified, dry_run)
total_stats["checked"] += stats["checked"]
total_stats["updated"] += stats["updated"]
total_stats["errors"] += stats["errors"]
total_stats["changes"].extend(stats["changes"])
return total_stats