feat(override): async job + status endpoint for vulnrichment correction
The synchronous /override/vulnrichment endpoint blocks the browser
request for the full duration of the correction — 23 minutes on the
tester's instance, well past every reasonable client timeout. Result:
'backend connection failed' modal and zero feedback about whether
the work actually ran.
New endpoints (sync one kept untouched for single-CVE backward compat):
POST /override/vulnrichment/start
Returns {job_id, status: 'queued', poll_url} immediately.
Spawns a daemon thread that does the heavy lift.
GET /override/vulnrichment/status/{job_id}
Returns the live snapshot: status, stage, total, done,
updated/checked/not_found, error if any.
GET /override/vulnrichment/jobs
Recent jobs ringbuffer for an admin overview.
State lives in an in-memory dict (override_jobs.py) — survives
requests, not backend restarts. That's fine; a restart would have
killed the worker thread anyway and the user re-clicks. Ringbuffer
caps at 50 entries, oldest finished gets evicted.
Worker owns its own SessionLocal() so it does not collide with the
trigger request's DB session. Stages narrate what's currently
happening ('downloading vulnrichment ZIP snapshot', 'fetching N
CVEs from github raw', etc.) so the frontend can show a meaningful
progress message even though we don't get per-CVE callbacks from
inside correct_vulnerability_scores.
Frontend modal that polls the status endpoint comes in the next commit.
This commit is contained in:
@@ -38,6 +38,11 @@ from app.services.vuln_override_service import (
|
||||
correct_vulnerability_scores,
|
||||
VulnOverrideService,
|
||||
)
|
||||
from app.services.override_jobs import (
|
||||
start_vulnrichment_job,
|
||||
get_job as get_override_job,
|
||||
list_jobs as list_override_jobs,
|
||||
)
|
||||
|
||||
# Settings key for Nessus configuration
|
||||
SETTING_KEY = "nessus_config"
|
||||
@@ -1922,6 +1927,70 @@ async def override_from_vulnrichment(
|
||||
return result
|
||||
|
||||
|
||||
@router.post("/override/vulnrichment/start")
|
||||
async def start_vulnrichment_correction_job(
|
||||
cve_ids: Optional[List[str]] = Query(None, description="CVE-IDs zum Korrigieren (leer = alle)"),
|
||||
asset_ids: Optional[List[int]] = Query(None, description="Asset-IDs (leer = alle)"),
|
||||
dry_run: bool = Query(False, description="Nur simulieren ohne DB-Änderungen"),
|
||||
current_user: User = Depends(RequireEditor),
|
||||
):
|
||||
"""
|
||||
Async-Variante des Vulnrichment-Overrides.
|
||||
|
||||
Spawnt einen Background-Thread und liefert sofort eine ``job_id``
|
||||
zurück. Fortschritt + Endergebnis via
|
||||
``GET /override/vulnrichment/status/{job_id}``.
|
||||
|
||||
Für Bulk-Korrekturen (Frontend "Correct CVSS" Button) verwenden —
|
||||
der synchrone Endpoint blockiert die Browser-Verbindung und löst
|
||||
bei großen Datenbanken den 23-Min-Hänger des Frontends aus.
|
||||
"""
|
||||
job_id = start_vulnrichment_job(
|
||||
user_id=current_user.id,
|
||||
cve_ids=cve_ids,
|
||||
asset_ids=asset_ids,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
return {
|
||||
"job_id": job_id,
|
||||
"status": "queued",
|
||||
"poll_url": f"/api/v1/vulnerabilities/override/vulnrichment/status/{job_id}",
|
||||
}
|
||||
|
||||
|
||||
@router.get("/override/vulnrichment/status/{job_id}")
|
||||
async def get_vulnrichment_correction_status(
|
||||
job_id: str,
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""
|
||||
Snapshot eines mit ``/override/vulnrichment/start`` gestarteten Jobs.
|
||||
|
||||
Felder:
|
||||
- ``status``: queued | running | completed | failed
|
||||
- ``stage``: human-readable progress label (z.B. "downloading ZIP")
|
||||
- ``total`` / ``done``: numerische Progress-Hinweise
|
||||
- ``updated`` / ``checked`` / ``not_found``: End-Stats wenn fertig
|
||||
- ``error``: gesetzt bei status == "failed"
|
||||
"""
|
||||
job = get_override_job(job_id)
|
||||
if not job:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"Job {job_id} not found (may have expired after backend restart)",
|
||||
)
|
||||
return job
|
||||
|
||||
|
||||
@router.get("/override/vulnrichment/jobs")
|
||||
async def list_vulnrichment_correction_jobs(
|
||||
limit: int = Query(10, le=50),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""Most-recent-first list of override jobs (for an admin overview)."""
|
||||
return list_override_jobs(limit=limit)
|
||||
|
||||
|
||||
@router.get("/override/stats")
|
||||
async def get_override_stats(
|
||||
db: Session = Depends(get_db),
|
||||
|
||||
@@ -0,0 +1,189 @@
|
||||
"""
|
||||
In-memory job tracker for long-running CVSS-override runs.
|
||||
|
||||
The CISA Vulnrichment override against a full DB takes minutes (even
|
||||
after the ZIP-snapshot speedup) — the browser's HTTP client times
|
||||
out long before the work completes, leaving the user with a
|
||||
'backend connection failed' modal and no way to know whether the
|
||||
correction actually finished.
|
||||
|
||||
This module decouples the trigger from the work:
|
||||
|
||||
POST /override/vulnrichment/start → spawn thread, return job_id
|
||||
GET /override/vulnrichment/status/{job_id} → progress + stats
|
||||
|
||||
State lives in a module-global dict so it survives the request but
|
||||
not a backend restart. That's acceptable: a restart aborts any
|
||||
in-flight job anyway, and the user just re-clicks.
|
||||
|
||||
Thread safety: dict assignment is atomic in CPython, and only the
|
||||
worker thread mutates a given job's fields after creation. The
|
||||
status reader returns whatever it sees — slightly stale reads are
|
||||
fine for a 2-second poll loop.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.database import SessionLocal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# job_id (str) → state dict
|
||||
_jobs: Dict[str, Dict[str, Any]] = {}
|
||||
# Cap the in-memory ring so a misbehaving caller can't OOM us.
|
||||
_MAX_JOBS = 50
|
||||
|
||||
|
||||
def _prune_oldest() -> None:
|
||||
if len(_jobs) <= _MAX_JOBS:
|
||||
return
|
||||
# Drop the oldest finished job. Finished = has finished_at.
|
||||
finished = [
|
||||
(jid, j.get("finished_at"))
|
||||
for jid, j in _jobs.items()
|
||||
if j.get("finished_at")
|
||||
]
|
||||
if not finished:
|
||||
return
|
||||
finished.sort(key=lambda t: t[1])
|
||||
_jobs.pop(finished[0][0], None)
|
||||
|
||||
|
||||
def start_vulnrichment_job(
|
||||
user_id: int,
|
||||
cve_ids: Optional[List[str]] = None,
|
||||
asset_ids: Optional[List[int]] = None,
|
||||
dry_run: bool = False,
|
||||
) -> str:
|
||||
"""Spawn a worker thread, return its job_id immediately."""
|
||||
job_id = str(uuid.uuid4())
|
||||
_jobs[job_id] = {
|
||||
"job_id": job_id,
|
||||
"type": "vulnrichment",
|
||||
"status": "queued", # queued | running | completed | failed
|
||||
"user_id": user_id,
|
||||
"dry_run": dry_run,
|
||||
"started_at": datetime.now().isoformat(),
|
||||
"finished_at": None,
|
||||
"stage": "starting", # human-readable progress label
|
||||
"total": 0,
|
||||
"done": 0,
|
||||
"updated": 0,
|
||||
"checked": 0,
|
||||
"not_found": 0,
|
||||
"error": None,
|
||||
"result": None,
|
||||
}
|
||||
_prune_oldest()
|
||||
|
||||
thread = threading.Thread(
|
||||
target=_run_vulnrichment_job,
|
||||
args=(job_id, cve_ids, asset_ids, dry_run),
|
||||
daemon=True,
|
||||
name=f"override-{job_id[:8]}",
|
||||
)
|
||||
thread.start()
|
||||
return job_id
|
||||
|
||||
|
||||
def get_job(job_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Return a snapshot of the job state, or None if unknown."""
|
||||
job = _jobs.get(job_id)
|
||||
if not job:
|
||||
return None
|
||||
# Return a copy so the caller can't mutate the live state.
|
||||
return dict(job)
|
||||
|
||||
|
||||
def list_jobs(limit: int = 10) -> List[Dict[str, Any]]:
|
||||
"""Most-recent-first list of jobs (for an admin overview)."""
|
||||
snapshot = list(_jobs.values())
|
||||
snapshot.sort(key=lambda j: j.get("started_at") or "", reverse=True)
|
||||
return [dict(j) for j in snapshot[:limit]]
|
||||
|
||||
|
||||
def _run_vulnrichment_job(
|
||||
job_id: str,
|
||||
cve_ids: Optional[List[str]],
|
||||
asset_ids: Optional[List[int]],
|
||||
dry_run: bool,
|
||||
) -> None:
|
||||
"""Worker thread body. Owns its own DB session."""
|
||||
# Import lazily to avoid a circular import at module load.
|
||||
from app.services.vuln_override_service import (
|
||||
VulnOverrideService,
|
||||
correct_vulnerability_scores,
|
||||
)
|
||||
from app.models.vulnerability import Vulnerability, VulnerabilityStatus
|
||||
|
||||
job = _jobs[job_id]
|
||||
job["status"] = "running"
|
||||
db: Session = SessionLocal()
|
||||
try:
|
||||
# Stage 1: figure out target CVE set so progress has a known total
|
||||
job["stage"] = "selecting target CVEs"
|
||||
query = db.query(Vulnerability).filter(
|
||||
Vulnerability.status == VulnerabilityStatus.open
|
||||
)
|
||||
if cve_ids:
|
||||
query = query.filter(Vulnerability.cve_id.in_(cve_ids))
|
||||
if asset_ids:
|
||||
query = query.filter(Vulnerability.asset_id.in_(asset_ids))
|
||||
vulns = query.all()
|
||||
unique = sorted({
|
||||
v.cve_id for v in vulns
|
||||
if v.cve_id and not v.cve_id.startswith("NESSUS-")
|
||||
})
|
||||
job["total"] = len(unique)
|
||||
|
||||
if not unique:
|
||||
job["status"] = "completed"
|
||||
job["stage"] = "no CVEs to correct"
|
||||
job["finished_at"] = datetime.now().isoformat()
|
||||
job["result"] = {"message": "no real CVEs in scope", "updated": 0}
|
||||
return
|
||||
|
||||
# Stage 2: fetch (ZIP snapshot for large batches, else per-CVE raw)
|
||||
if len(unique) > VulnOverrideService._ZIP_FALLBACK_THRESHOLD:
|
||||
job["stage"] = (
|
||||
f"downloading vulnrichment ZIP snapshot (~249 MB) for {len(unique)} CVEs"
|
||||
)
|
||||
else:
|
||||
job["stage"] = f"fetching {len(unique)} CVEs from github raw"
|
||||
|
||||
# Stage 3: apply overrides via existing helper (does the heavy lift)
|
||||
# We don't get per-CVE callbacks from inside the helper, so we
|
||||
# surface "done" only after it returns. The two stages above
|
||||
# already tell the user what's currently happening.
|
||||
result = correct_vulnerability_scores(
|
||||
db,
|
||||
cve_ids=unique,
|
||||
asset_ids=asset_ids,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
job["done"] = job["total"]
|
||||
job["checked"] = result.get("checked", 0)
|
||||
job["updated"] = result.get("updated", 0)
|
||||
job["not_found"] = result.get("not_found", 0)
|
||||
job["result"] = result
|
||||
job["status"] = "completed"
|
||||
job["stage"] = (
|
||||
f"done — {job['updated']} updated, "
|
||||
f"{job['not_found']} not in vulnrichment feed"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.exception("override job %s failed", job_id)
|
||||
job["status"] = "failed"
|
||||
job["error"] = str(e)
|
||||
job["stage"] = f"failed: {e}"
|
||||
finally:
|
||||
job["finished_at"] = datetime.now().isoformat()
|
||||
db.close()
|
||||
Reference in New Issue
Block a user