356bf7b97f
EUVD (EU Vulnerability Database, ENISA) integration as second authoritative catalog alongside CISA KEV. EU-Compliance use cases benefit from a non-US source; CVEs confirmed by both catalogs get the highest priority via score stacking. Two ENISA endpoints are merged into one cached map (24h TTL): - /exploitedvulnerabilities (analogous to CISA KEV) - /criticalvulnerabilities (ENISA Critical flag) Priority-Score formula: - Exploit-Signal now triggered by KEV OR EUVD listing - Catalog-Bonus stacking: KEV +10, EUVD +10, KEV-ransomware +5, EU-Critical +3. A CVE in both catalogs adds +20 base. CPR Score (Cybersecurity Priority Risk = CVSS x EPSS x 10) added as separate metric next to Priority, per JacquesKruger/EPSS-Server convention. Calculated on-the-fly, no DB column needed. New API filters: euvd_only, eu_critical, in_any_catalog, in_both_catalogs. Setting toggle enrichment_euvd_enabled (default true). Frontend: new EUVD column (blue badge, EU-CRIT sub-badge), CPR column with mini-bar, four catalog filter checkboxes. Detail page splits threat intel into CISA KEV / ENISA EUVD / EPSS sections; breakdown shows EUVD bonus row and CPR score with both-catalogs hint.
99 lines
3.6 KiB
Python
99 lines
3.6 KiB
Python
import os
|
||
import logging
|
||
from sqlalchemy.orm import Session
|
||
from app.models.user import User, UserRole
|
||
from app.models.setting import Setting
|
||
from app.auth.jwt_handler import hash_password, get_password_strength
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _ensure_default_setting(db: Session, key: str, default_value: str, description: str) -> None:
|
||
"""Create a setting row only if it does not already exist."""
|
||
existing = db.query(Setting).filter(Setting.key == key).first()
|
||
if existing:
|
||
return
|
||
db.add(Setting(key=key, value=default_value, description=description))
|
||
|
||
|
||
def create_initial_data(db: Session):
|
||
"""
|
||
Creates initial data like the default admin user if it doesn't exist.
|
||
Also seeds default enrichment toggles.
|
||
"""
|
||
try:
|
||
# Seed enrichment defaults (no-op if already present)
|
||
try:
|
||
_ensure_default_setting(
|
||
db,
|
||
"enrichment_epss_enabled",
|
||
"true",
|
||
"Enable EPSS (FIRST.org) score enrichment"
|
||
)
|
||
_ensure_default_setting(
|
||
db,
|
||
"enrichment_kev_enabled",
|
||
"true",
|
||
"Enable CISA KEV (Known Exploited Vulnerabilities) enrichment"
|
||
)
|
||
_ensure_default_setting(
|
||
db,
|
||
"enrichment_euvd_enabled",
|
||
"true",
|
||
"Enable ENISA EUVD enrichment (EU exploited/critical vulnerabilities)"
|
||
)
|
||
db.commit()
|
||
except Exception as e:
|
||
logger.warning(f"Could not seed enrichment settings: {e}")
|
||
db.rollback()
|
||
# Check if any admin exists
|
||
admin_exists = db.query(User).filter(User.role == UserRole.ADMIN).count() > 0
|
||
if admin_exists:
|
||
logger.info("ℹ️ Admin user already exists. Skipping default admin creation.")
|
||
return
|
||
|
||
# Default admin (simple onboarding)
|
||
if os.getenv("DISABLE_DEFAULT_ADMIN", "false").lower() == "true":
|
||
logger.warning(
|
||
"⚠️ No admin user exists and default admin is disabled. "
|
||
"Set DEFAULT_ADMIN_* to initialize."
|
||
)
|
||
return
|
||
|
||
default_username = os.getenv("DEFAULT_ADMIN_USERNAME", "admin")
|
||
default_email = os.getenv("DEFAULT_ADMIN_EMAIL", "admin@vulnmanager.local")
|
||
default_password = os.getenv("DEFAULT_ADMIN_PASSWORD")
|
||
|
||
if not default_password:
|
||
logger.error(
|
||
"❌ No admin user exists and DEFAULT_ADMIN_PASSWORD is not set. "
|
||
"Refusing to create admin with insecure default. "
|
||
"Set DEFAULT_ADMIN_PASSWORD env var or use POST /auth/setup-admin "
|
||
"with SETUP_ADMIN_TOKEN to bootstrap."
|
||
)
|
||
return
|
||
|
||
password_check = get_password_strength(default_password)
|
||
if not password_check["is_valid"]:
|
||
logger.error(
|
||
"❌ DEFAULT_ADMIN_PASSWORD does not meet strength requirements: %s. "
|
||
"Refusing to create admin user.",
|
||
", ".join(password_check["feedback"])
|
||
)
|
||
return
|
||
|
||
admin_user = User(
|
||
username=default_username,
|
||
email=default_email,
|
||
password_hash=hash_password(default_password),
|
||
role=UserRole.ADMIN,
|
||
is_active=True,
|
||
is_verified=True,
|
||
)
|
||
db.add(admin_user)
|
||
db.commit()
|
||
logger.info("✅ Default admin user created: %s", default_username)
|
||
except Exception as e:
|
||
logger.error(f"❌ Error creating initial data: {e}")
|
||
db.rollback()
|