fix(eol): slug-based pseudo-CVE id for Nessus EOL findings
New helper _slug_pseudo_cve resolves the plugin_name to an
endoflife.date slug via eol_service.resolve_product_slug, then emits
EOL-{SLUG}-{VERSION|P####} instead of EOL-NESSUS-{plugin_id}. Office
variants keep the year-based EOL-MS-OFFICE-YYYY id for the language-
pack dedup. Legacy _office_pseudo_cve retained as a fallback when slug
resolution fails.
Detection Sources panel (nessus + plugin_id) is untouched — the
plugin id remains queryable from the vuln.nessus_plugin_id column.
Fixes feedback 2026-06-02 #2b, #4. Existing rows still need a one-off
backfill (out of scope here).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -204,12 +204,46 @@ def _office_year(plugin_name: str) -> Optional[str]:
|
||||
|
||||
|
||||
def _office_pseudo_cve(plugin_name: str, plugin_id) -> str:
|
||||
"""Return `EOL-MS-OFFICE-YYYY` for Office variants, else `EOL-NESSUS-{pid}`."""
|
||||
"""Return `EOL-MS-OFFICE-YYYY` for Office variants, else `EOL-NESSUS-{pid}`.
|
||||
|
||||
NOTE: prefer `_slug_pseudo_cve(plugin_name, plugin_id, installed_version)`
|
||||
for the slug-based naming (`EOL-{SLUG}-{VERSION}`). Kept as a fallback
|
||||
when product-name resolution fails.
|
||||
"""
|
||||
if _OFFICE_RE.search(plugin_name or "") and _office_year(plugin_name):
|
||||
return f"EOL-MS-OFFICE-{_office_year(plugin_name)}"
|
||||
return f"EOL-NESSUS-{plugin_id}"
|
||||
|
||||
|
||||
def _slug_pseudo_cve(plugin_name: str, plugin_id, installed_version: Optional[str] = None) -> str:
|
||||
"""Return product-name-based pseudo-CVE id (`EOL-MSSQLSERVER-...`) when
|
||||
we can resolve a slug, falling back to the legacy `EOL-NESSUS-{pid}`.
|
||||
|
||||
Slug comes from `eol_service.resolve_product_slug(plugin_name)`. The
|
||||
trailing token is the installed version (sanitised) or the last 4
|
||||
digits of the plugin id when no version is present, so the row is
|
||||
still stable across re-syncs.
|
||||
"""
|
||||
# Office is its own special case — keep the year-based id so the
|
||||
# language-pack dedup in `run_nessus_sync` keeps working.
|
||||
if _OFFICE_RE.search(plugin_name or "") and _office_year(plugin_name):
|
||||
return f"EOL-MS-OFFICE-{_office_year(plugin_name)}"
|
||||
# Lazy import: eol_service imports from a few places; avoid a hard
|
||||
# import cycle at module load.
|
||||
try:
|
||||
from app.services.eol_service import resolve_product_slug
|
||||
slug = resolve_product_slug(plugin_name)
|
||||
except Exception:
|
||||
slug = None
|
||||
if slug:
|
||||
if installed_version:
|
||||
safe_v = re.sub(r"[^A-Za-z0-9._-]", "_", str(installed_version))[:24] or "x"
|
||||
return f"EOL-{slug.upper()}-{safe_v}"[:50]
|
||||
# No version → last 4 digits of plugin id keeps it stable
|
||||
return f"EOL-{slug.upper()}-P{str(plugin_id)[-4:]}"[:50]
|
||||
return f"EOL-NESSUS-{plugin_id}"
|
||||
|
||||
|
||||
def _normalise_office_pkg(pkg: str) -> str:
|
||||
"""Collapse all Office sub-flavour strings to the unified `MS Office`."""
|
||||
if _OFFICE_RE.search(pkg or ""):
|
||||
@@ -240,7 +274,7 @@ def _upsert_nessus_eol(
|
||||
widget alongside endoflife.date findings. Dedup key is (cve_id,
|
||||
asset_id), stable across re-syncs.
|
||||
"""
|
||||
cve_id = _office_pseudo_cve(plugin_name, plugin_id)
|
||||
cve_id = _slug_pseudo_cve(plugin_name, plugin_id, installed_version)
|
||||
# Strip the "... Unsupported Version Detection" suffix for a clean
|
||||
# PACKAGE column ("Microsoft SQL Server"). Office sub-flavours collapse
|
||||
# to "MS Office" so the language-pack noise stops multiplying rows.
|
||||
@@ -500,7 +534,9 @@ def run_nessus_sync(
|
||||
# Mark as seen THIS run so the source-backfill below
|
||||
# doesn't immediately drop nessus + patch the row we
|
||||
# just upserted (it keys on cve_id membership).
|
||||
seen_cves_for_asset.add(f"EOL-NESSUS-{plugin_id}")
|
||||
seen_cves_for_asset.add(
|
||||
_slug_pseudo_cve(eol_name, plugin_id, installed_version)
|
||||
)
|
||||
if _upsert_nessus_eol(
|
||||
db,
|
||||
asset=asset,
|
||||
@@ -874,3 +910,47 @@ def run_nessus_sync(
|
||||
stats["vulns_marked_patched"], len(stats["unmatched_hosts"]),
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
def reconcile_legacy_nessus_assets(db: Session) -> dict:
|
||||
"""One-shot helper for testers: flip ACTIVE NESSUS-sourced assets that
|
||||
have no `nessus_host_uuid` pinned (legacy rows from before the
|
||||
reconcile path was hardened) to INACTIVE.
|
||||
|
||||
These rows were created by older Nessus syncs that matched by IP
|
||||
only, so they never get a UUID and are silently skipped by
|
||||
`reconcile_missing_from_sync`. Without this, a reduced scan leaves
|
||||
them all ACTIVE.
|
||||
|
||||
Returns {"inactivated": int, "scanned": int}.
|
||||
|
||||
Safe to run multiple times. Logs an audit entry for each row flipped.
|
||||
"""
|
||||
from app.services.asset_lifecycle import _audit_asset_status
|
||||
from app.models.asset import AssetSource, AssetStatus
|
||||
|
||||
legacy = (
|
||||
db.query(Asset)
|
||||
.filter(
|
||||
Asset.source == AssetSource.NESSUS,
|
||||
Asset.status == AssetStatus.ACTIVE,
|
||||
Asset.nessus_host_uuid.is_(None),
|
||||
)
|
||||
.all()
|
||||
)
|
||||
stats = {"scanned": len(legacy), "inactivated": 0}
|
||||
for a in legacy:
|
||||
a.status = AssetStatus.INACTIVE
|
||||
_audit_asset_status(
|
||||
db, a, "active", "inactive",
|
||||
"legacy Nessus-sourced asset without pinned nessus_host_uuid — "
|
||||
"cannot be reconciled event-driven; flipped via reconcile_legacy_nessus_assets",
|
||||
)
|
||||
stats["inactivated"] += 1
|
||||
if stats["inactivated"]:
|
||||
db.commit()
|
||||
logger.info(
|
||||
"nessus legacy reconcile: %d inactivated (of %d legacy ACTIVE rows)",
|
||||
stats["inactivated"], stats["scanned"],
|
||||
)
|
||||
return stats
|
||||
|
||||
Reference in New Issue
Block a user