"""
Background Scheduler for automated scan jobs
Uses APScheduler for periodic scan execution.
"""
import logging
import json
import os
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

try:
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.triggers.interval import IntervalTrigger
    from apscheduler.triggers.cron import CronTrigger
    HAS_APSCHEDULER = True
except ImportError:
    HAS_APSCHEDULER = False
    logger.warning("apscheduler not installed - automated scans disabled. Install with: pip install apscheduler")

from app.database import SessionLocal
from app.models.scan_schedule import ScanSchedule, ScheduleInterval
from app.models.asset import Asset
from app.models.scan import Scan, ScanType, ScanStatus
from app.models.setting import Setting
from app.models.vulnerability import Vulnerability, VulnerabilityStatus, VulnerabilitySeverity
from app.models.policy import Policy
from app.models.notification_log import NotificationLog, NotificationType, NotificationStatus
from app.models.user import User
from app.integrations.wazuh_client import WazuhClient

# Single process-wide scheduler; None when APScheduler is not installed.
scheduler = AsyncIOScheduler() if HAS_APSCHEDULER else None

# timedelta keyword arguments for each fixed-interval schedule type.
# ScheduleInterval.CRON is intentionally absent: cron schedules carry their
# own expression and are handled separately.
INTERVAL_MAP = {
    ScheduleInterval.EVERY_HOUR: {"hours": 1},
    ScheduleInterval.EVERY_6_HOURS: {"hours": 6},
    ScheduleInterval.EVERY_12_HOURS: {"hours": 12},
    ScheduleInterval.DAILY: {"days": 1},
    ScheduleInterval.WEEKLY: {"weeks": 1},
}


def _make_wazuh_client(cfg):
    """Build a WazuhClient from the decoded `wazuh_config` setting dict."""
    return WazuhClient(
        base_url=cfg.get("api_url"),
        username=cfg.get("username"),
        password=cfg.get("password"),
        indexer_url=cfg.get("indexer_url"),
        indexer_username=cfg.get("indexer_username"),
        indexer_password=cfg.get("indexer_password"),
        # NOTE(review): TLS verification is off unless the config enables it.
        verify_ssl=cfg.get("verify_ssl", False),
    )


def _compute_next_run(schedule):
    """Return the naive datetime at which *schedule* is expected to fire next.

    The live APScheduler job is asked first, because its ``next_run_time`` is
    authoritative for both interval and cron triggers. An INTERVAL_MAP lookup
    with the ``{"days": 1}`` default would record a fake +24h for cron
    schedules. The interval arithmetic is only the fallback when no job (or no
    scheduler) is available.
    """
    next_fire = None
    try:
        job = scheduler.get_job(f"scan_schedule_{schedule.id}") if scheduler else None
        next_fire = job.next_run_time if job else None
    except Exception:
        next_fire = None

    if next_fire is not None:
        # Strip tz so it matches the rest of the column (naive).
        return next_fire.replace(tzinfo=None)

    interval_delta = INTERVAL_MAP.get(schedule.interval, {"days": 1})
    return datetime.now() + timedelta(**interval_delta)


async def execute_scheduled_scan(schedule_id: int) -> None:
    """Execute one scheduled scan.

    Routes by ``ScanSchedule.scanner_type`` (default ``"wazuh"``) to either
    the Nessus sync service or the per-agent Wazuh vulnerability sync, then
    records ``last_run`` / ``next_run`` on the schedule row.

    Args:
        schedule_id: Primary key of the ScanSchedule row to run. Missing or
            disabled schedules are ignored.

    All exceptions are logged and swallowed so a failing scan never
    propagates into the scheduler.
    """
    db = SessionLocal()
    try:
        schedule = db.query(ScanSchedule).filter(ScanSchedule.id == schedule_id).first()
        if not schedule or not schedule.enabled:
            return

        scanner = (getattr(schedule, "scanner_type", None) or "wazuh").lower()
        logger.info(
            f"Scheduled scan '{schedule.name}' (ID: {schedule_id}, scanner={scanner}) started"
        )

        # ---- Nessus branch ----
        if scanner == "nessus":
            try:
                from app.services.nessus_sync import run_nessus_sync
                stats = run_nessus_sync(db)
                logger.info(f"Scheduled Nessus sync done: {stats}")
            except Exception as e:
                logger.error(f"Scheduled Nessus sync failed: {e}")

            schedule.last_run = datetime.now()
            schedule.next_run = _compute_next_run(schedule)
            db.commit()
            return

        # ---- Wazuh branch (default, existing behaviour) ----
        # Load the Wazuh config (transparently decrypted).
        from app.auth.setting_crypto import read_setting_value
        raw_wazuh = read_setting_value(db, "wazuh_config")
        if not raw_wazuh:
            logger.error("Scheduled scan aborted: Wazuh configuration missing")
            return

        config = json.loads(raw_wazuh)

        assets = db.query(Asset).filter(Asset.wazuh_agent_id.isnot(None)).all()
        if not assets:
            logger.info("No Wazuh assets found for scheduled scan")
            schedule.last_run = datetime.now()
            db.commit()
            return

        triggered = 0
        errors = []

        try:
            with _make_wazuh_client(config) as client:
                # Sync vulnerabilities for each agent
                from app.routers.vulnerabilities import sync_agent_vulnerabilities

                for asset in assets:
                    # Reset per asset: if Scan() itself fails, the handler
                    # below must not touch the previous asset's scan row.
                    scan = None
                    try:
                        scan = Scan(
                            asset_id=asset.id,
                            scan_type=ScanType.WAZUH,  # source-labelled (was FULL)
                            status=ScanStatus.RUNNING,
                            started_at=datetime.now()
                        )
                        db.add(scan)

                        sync_agent_vulnerabilities(db, client, asset.wazuh_agent_id, asset)

                        scan.status = ScanStatus.COMPLETED
                        scan.completed_at = datetime.now()
                        triggered += 1
                    except Exception as e:
                        if scan is not None:
                            scan.status = ScanStatus.FAILED
                            scan.error_message = str(e)
                        errors.append(f"{asset.hostname}: {e}")
                        logger.error(f"Scheduled scan error for {asset.hostname}: {e}")
        except Exception as e:
            logger.error(f"Wazuh connection error during scheduled scan: {e}")

        schedule.last_run = datetime.now()
        # Same derivation as the Nessus branch so cron schedules get their
        # real next fire time instead of a made-up +24h.
        schedule.next_run = _compute_next_run(schedule)
        db.commit()

        logger.info(f"Scheduled scan '{schedule.name}' completed: {triggered} assets scanned, {len(errors)} errors")

    except Exception as e:
        logger.error(f"Scheduled scan error: {e}")
        db.rollback()
    finally:
        db.close()


async def check_sla_breaches() -> None:
    """Hourly SLA-breach check.

    Honors the notification_mode setting:
      - 'digest': one summary mail per recipient
      - anything else: one mail per (vuln, recipient) (legacy single mode)
    The per-vuln 24h throttle applies in both modes.

    Master toggle: setting `sla_breach_enabled`. When its value is one of
    "false" / "0" / "no" / "off" the entire job is a no-op. This covers the
    case where the operator disabled all Security Policies but doesn't want
    SLA mails firing for assets that fall through to default_sla.
    """
    db = SessionLocal()
    try:
        from app.services.email_service import (
            get_smtp_config, get_notification_mode,
            get_email_template, render_template, send_email,
            send_sla_breach_digest,
        )

        # Master toggle — string "false" disables the whole check.
        sla_enabled_row = db.query(Setting).filter(
            Setting.key == "sla_breach_enabled"
        ).first()
        if sla_enabled_row and (sla_enabled_row.value or "").strip().lower() in ("false", "0", "no", "off"):
            logger.info("SLA breach check skipped — setting sla_breach_enabled is off")
            return

        if not get_smtp_config(db):
            return

        vulns = db.query(Vulnerability).join(Asset).filter(
            Vulnerability.status == VulnerabilityStatus.open,
            Vulnerability.notification_suppressed == False  # noqa: E712 (SQL expression)
        ).all()

        if not vulns:
            return

        from app.models.policy import PolicyStatus

        now = datetime.now()
        mode = get_notification_mode(db)
        dashboard_url = os.getenv("DASHBOARD_URL", "http://localhost:3000").rstrip("/") + "/vulnerabilities"

        # SLA in days per severity when no (usable) policy applies.
        default_sla = {
            VulnerabilitySeverity.critical: 2,
            VulnerabilitySeverity.high: 7,
            VulnerabilitySeverity.medium: 30,
            VulnerabilitySeverity.low: 90,
            VulnerabilitySeverity.none: 365,
        }

        # Bucket per recipient email so the digest stays one-mail-per-person.
        # buckets[email] = {user_id, username, items: [{vuln, hours_overdue, asset, assigned_name}, ...]}
        buckets: dict[str, dict] = {}

        # Policy rows looked up so far, keyed by policy_id (value may be None).
        # Avoids one identical query per vulnerability.
        policy_cache: dict = {}

        def _resolve_recipients(vuln):
            """Return de-duplicated (user_id, email, username) tuples for *vuln*.

            Precedence: vuln user, vuln group, asset user, asset groups.
            """
            recs = []
            seen = set()

            def _push(u):
                if u and u.email and u.email not in seen:
                    recs.append((u.id, u.email, u.username))
                    seen.add(u.email)

            if vuln.assigned_user_id:
                _push(db.query(User).filter(User.id == vuln.assigned_user_id).first())
            elif vuln.assigned_group_id:
                from app.models.group import Group
                g = db.query(Group).filter(Group.id == vuln.assigned_group_id).first()
                if g:
                    for u in g.users:
                        _push(u)
            elif vuln.asset and vuln.asset.assigned_user_id:
                _push(db.query(User).filter(User.id == vuln.asset.assigned_user_id).first())
            elif vuln.asset and vuln.asset.groups:
                # Asset uses M2M groups (no single assigned_group_id column)
                for g in vuln.asset.groups:
                    for u in g.users:
                        _push(u)
            return recs

        # Phase 1: classify breaches + de-duplicate / throttle
        for vuln in vulns:
            asset = vuln.asset

            # Without a detection timestamp no deadline can be computed.
            # Skip the row instead of letting a TypeError abort the whole run.
            if vuln.detected_at is None:
                continue

            policy = None
            if asset.policy_id:
                if asset.policy_id not in policy_cache:
                    policy_cache[asset.policy_id] = (
                        db.query(Policy).filter(Policy.id == asset.policy_id).first()
                    )
                policy = policy_cache[asset.policy_id]

            if policy:
                # Disabled policy = operator opted out of SLA tracking
                # for these assets. Skip breach evaluation entirely so
                # no notification is generated. Tester reported mails
                # kept arriving after disabling all policies.
                if policy.status == PolicyStatus.DISABLED:
                    continue
                sla_map = {
                    VulnerabilitySeverity.critical: policy.critical_sla_days,
                    VulnerabilitySeverity.high: policy.high_sla_days,
                    VulnerabilitySeverity.medium: policy.medium_sla_days,
                    VulnerabilitySeverity.low: policy.low_sla_days,
                    VulnerabilitySeverity.none: 365,
                }
                sla_days = sla_map.get(vuln.severity, 30)
                if sla_days is None:
                    # Policy column left empty: fall back to the default SLA
                    # rather than crashing in timedelta(days=None).
                    sla_days = default_sla.get(vuln.severity, 30)
            else:
                sla_days = default_sla.get(vuln.severity, 30)

            deadline = vuln.detected_at + timedelta(days=sla_days)
            if now <= deadline:
                continue

            hours_overdue = int((now - deadline).total_seconds() / 3600)

            # Per-vuln 24h throttle still applies in both modes — prevents
            # repeat-pinging on the same finding if the hourly scheduler keeps firing.
            # NOTE(review): FAILED log rows also count here, so a failed send
            # is not retried for 24h — confirm that is intended.
            recent = db.query(NotificationLog).filter(
                NotificationLog.vulnerability_id == vuln.id,
                NotificationLog.notification_type == NotificationType.SLA_BREACH,
                NotificationLog.sent_at >= now - timedelta(hours=24)
            ).first()
            if recent:
                continue

            recipients = _resolve_recipients(vuln)
            if not recipients:
                continue

            assigned_name = "Unassigned"
            if vuln.assigned_user:
                assigned_name = vuln.assigned_user.username
            elif vuln.assigned_group_id:
                assigned_name = "Group"
            elif asset.assigned_user:
                assigned_name = asset.assigned_user.username
            elif asset.groups:
                assigned_name = "Group"

            for r_uid, r_email, r_username in recipients:
                bucket = buckets.setdefault(r_email, {
                    "user_id": r_uid,
                    "username": r_username,
                    "items": [],
                })
                bucket["items"].append({
                    "vuln": vuln,
                    "asset": asset,
                    "hours_overdue": hours_overdue,
                    "assigned_name": assigned_name,
                })

        if not buckets:
            return

        notifications_sent = 0
        # NOTE(review): `now` comes from datetime.now() (local, naive) but the
        # text is labelled "UTC" — only accurate when the host runs in UTC.
        checked_at_str = now.strftime("%Y-%m-%d %H:%M UTC")

        # Phase 2: dispatch
        for email, bucket in buckets.items():
            items = bucket["items"]
            user_id = bucket["user_id"]
            username = bucket["username"]

            if mode == "digest":
                digest_items = [
                    {
                        "cve_id": it["vuln"].cve_id,
                        "severity": it["vuln"].severity.value if it["vuln"].severity else "none",
                        "asset_hostname": it["asset"].hostname,
                        "detected_at": it["vuln"].detected_at.strftime("%Y-%m-%d %H:%M") if it["vuln"].detected_at else "—",
                        "hours_overdue": it["hours_overdue"],
                    }
                    for it in items
                ]
                success, err = send_sla_breach_digest(
                    db,
                    to_email=email,
                    recipient_name=username,
                    items=digest_items,
                    checked_at=checked_at_str,
                    dashboard_url=dashboard_url,
                )
                # One NotificationLog row per vuln so the per-vuln 24h throttle
                # remains effective on the next hourly tick.
                for it in items:
                    db.add(NotificationLog(
                        vulnerability_id=it["vuln"].id,
                        asset_id=it["asset"].id,
                        user_id=user_id,
                        notification_type=NotificationType.SLA_BREACH,
                        sent_at=now,
                        subject=f"[VULNCHECK] {len(items)} SLA-breached vulnerabilities require action",
                        recipient_email=email,
                        status=NotificationStatus.SENT if success else NotificationStatus.FAILED,
                        message_body=f"SLA digest: {it['vuln'].cve_id} on {it['asset'].hostname} ({it['hours_overdue']}h overdue)",
                        error_message=None if success else err,
                    ))
                if success:
                    notifications_sent += 1
                else:
                    logger.warning("SLA digest mail failed for %s: %s", email, err)
            else:
                # legacy single-mail mode — one mail per vuln per recipient
                subject_template, body_template = get_email_template(db, "email_template_sla_breach")
                for it in items:
                    vuln = it["vuln"]
                    asset = it["asset"]
                    variables = {
                        "cve_id": vuln.cve_id,
                        "severity": vuln.severity.value if vuln.severity else "unknown",
                        "severity_upper": vuln.severity.value.upper() if vuln.severity else "UNKNOWN",
                        "cvss_score": str(vuln.cvss_score or "N/A"),
                        "asset_hostname": asset.hostname,
                        "hours_overdue": str(it["hours_overdue"]),
                        "assigned_user": it["assigned_name"],
                        "recipient_name": username,
                        "package_name": vuln.package_name or "N/A",
                        "title": vuln.title or "N/A",
                        "detected_at": vuln.detected_at.strftime("%Y-%m-%d %H:%M UTC") if vuln.detected_at else "N/A",
                        "dashboard_url": dashboard_url,
                    }
                    subject = render_template(subject_template, variables)
                    body = render_template(body_template, variables)
                    success, error_msg = send_email(db, email, subject, body)
                    db.add(NotificationLog(
                        vulnerability_id=vuln.id,
                        asset_id=asset.id,
                        user_id=user_id,
                        notification_type=NotificationType.SLA_BREACH,
                        sent_at=now,
                        subject=subject,
                        recipient_email=email,
                        status=NotificationStatus.SENT if success else NotificationStatus.FAILED,
                        message_body=body,
                        error_message=None if success else error_msg,
                    ))
                    if success:
                        notifications_sent += 1

        db.commit()

        if notifications_sent > 0:
            logger.info(
                f"SLA breach check ({mode} mode): {notifications_sent} mails sent, "
                f"{len(buckets)} recipients, "
                f"{sum(len(b['items']) for b in buckets.values())} breached findings"
            )

    except Exception as e:
        logger.error(f"SLA breach check error: {e}")
        db.rollback()
    finally:
        db.close()


async def refresh_threat_intel_enrichment() -> None:
    """Daily refresh of EPSS scores + CISA KEV catalog for all open vulnerabilities."""
    from app.services.enrichment_service import enrich_all_open_vulnerabilities

    db = SessionLocal()
    try:
        stats = enrich_all_open_vulnerabilities(db)
        logger.info(
            f"Threat intel refresh done: "
            f"{stats['total']} open vulns, epss_updated={stats['epss_updated']}, "
            f"kev_marked={stats['kev_marked']}, kev_cleared={stats['kev_cleared']}"
        )
    except Exception as e:
        logger.error(f"Threat intel refresh failed: {e}")
        db.rollback()
    finally:
        db.close()


async def exploit_intel_nightly() -> None:
    """Refresh public-exploit catalogs (Exploit-DB / PoC-in-GitHub / Metasploit).

    Scheduled for 03:45 — after Vulnrichment (03:00) and the audit prune
    (03:30), before the URS recompute (04:00) so the refreshed values feed the
    URS run. Times are in the scheduler's time zone; the cron triggers are
    created without an explicit timezone.
    """
    from app.services.exploit_intel_service import refresh_all_exploit_intel

    db = SessionLocal()
    try:
        stats = refresh_all_exploit_intel(db, only_open=True)
        logger.info("exploit-intel nightly done: %s", stats)
    except Exception as e:
        logger.error("exploit-intel nightly failed: %s", e)
        db.rollback()
    finally:
        db.close()


async def eol_check_nightly() -> None:
    """Run endoflife.date EOL detection for every Wazuh-linked asset.

    For each asset with a ``wazuh_agent_id`` the operating system is checked
    first, then every distinct (name, version) package reported by Wazuh whose
    name resolves to a known product slug. Anything that is EOL, EOL-soon or
    past end-of-active-support is upserted through
    ``eol_service.upsert_eol_vulnerability``.

    Scheduled for 03:15 — after the SCA refresh (02:00) and Vulnrichment
    (03:00), before the audit prune (03:30). Times are in the scheduler's
    time zone.
    """
    from app.services import eol_service
    from app.auth.setting_crypto import read_setting_value

    db = SessionLocal()
    try:
        raw = read_setting_value(db, "wazuh_config")
        if not raw:
            logger.info("EOL check skipped — wazuh_config not set")
            return
        try:
            cfg = json.loads(raw)
        except json.JSONDecodeError:
            logger.warning("EOL check skipped — invalid wazuh_config JSON")
            return
        if not all([cfg.get("api_url"), cfg.get("username"), cfg.get("password")]):
            logger.info("EOL check skipped — wazuh_config incomplete")
            return

        # NOTE(review): the client is never closed here, whereas
        # execute_scheduled_scan uses it as a context manager. Left as-is
        # because entering the context up front could change failure handling
        # for the OS-level checks, which do not need Wazuh — confirm.
        wazuh = _make_wazuh_client(cfg)

        assets = db.query(Asset).filter(Asset.wazuh_agent_id.isnot(None)).all()
        upserts = 0
        for asset in assets:
            # OS-level EOL first (independent of package fetch).
            try:
                os_status = eol_service.check_os_eol(
                    db, asset.operating_system or "", asset.os_version or ""
                )
                if os_status and (os_status.is_eol or os_status.is_eol_soon or os_status.is_eoas):
                    eol_service.upsert_eol_vulnerability(
                        db,
                        asset_id=asset.id,
                        product_name=(asset.operating_system or "Operating System").strip(),
                        installed_version=(asset.os_version or os_status.release_name or "unknown"),
                        status=os_status,
                    )
                    upserts += 1
            except Exception as e:
                logger.warning("EOL: OS check failed for %s: %s", asset.hostname, e)

            try:
                pkgs = wazuh.get_packages(asset.wazuh_agent_id) or []
            except Exception as e:
                logger.warning("EOL: package fetch failed for %s: %s", asset.hostname, e)
                continue

            # De-duplicate packages per asset on (lower-cased name, version).
            seen: set[tuple[str, str]] = set()
            for pkg in pkgs:
                name = (pkg.get("name") or "").strip()
                version = (pkg.get("version") or "").strip()
                if not name or not version:
                    continue
                key = (name.lower(), version)
                if key in seen:
                    continue
                seen.add(key)

                # Only products present in the mapping table are checked.
                if not eol_service.resolve_product_slug(name):
                    continue
                try:
                    status = eol_service.check_eol(db, name, version)
                except Exception as e:
                    # Best effort: one failing lookup must not stop the scan.
                    logger.debug("EOL: lookup failed for %s %s: %s", name, version, e)
                    continue
                if not status or not (status.is_eol or status.is_eol_soon or status.is_eoas):
                    continue
                try:
                    eol_service.upsert_eol_vulnerability(
                        db,
                        asset_id=asset.id,
                        product_name=name,
                        installed_version=version,
                        status=status,
                    )
                    upserts += 1
                except Exception as e:
                    logger.warning("EOL upsert failed on asset %s: %s", asset.id, e)

        db.commit()
        logger.info("EOL nightly: %d assets scanned, %d EOL upserts", len(assets), upserts)
    except Exception as e:
        logger.error("EOL nightly failed: %s", e)
        db.rollback()
    finally:
        db.close()


async def prune_audit_logs_nightly() -> None:
    """Prune audit_logs older than the `audit_log_retention_days` setting.

    The setting is an integer string. A missing, empty or unparsable value
    falls back to 1825 days (about 5 years); a value <= 0 means keep forever.
    Scheduled for 03:30, between Vulnrichment (03:00) and URS (04:00), in the
    scheduler's time zone.
    """
    from app.models.audit_log import AuditLog

    db = SessionLocal()
    try:
        row = db.query(Setting).filter(Setting.key == "audit_log_retention_days").first()
        try:
            days = int((row.value if row else "1825").strip())
        except (ValueError, AttributeError):
            # Non-numeric text, or row.value is None.
            days = 1825
        if days <= 0:
            logger.info("audit-log prune skipped (retention_days=0 → keep forever)")
            return

        cutoff = datetime.now() - timedelta(days=days)
        pruned = (
            db.query(AuditLog)
            .filter(AuditLog.timestamp < cutoff)
            .delete(synchronize_session=False)
        )
        db.commit()
        logger.info("audit-log prune: deleted %d entries older than %d days", pruned, days)
    except Exception as e:
        logger.error("audit-log prune failed: %s", e)
        db.rollback()
    finally:
        db.close()


async def reconcile_assets_nightly() -> None:
    """Soft-inactivate assets no source has reported within the window;
    revive recently-seen inactive ones. Scheduled for 04:15, after URS."""
    from app.services.asset_lifecycle import reconcile_asset_lifecycle

    db = SessionLocal()
    try:
        stats = reconcile_asset_lifecycle(db)
        logger.info("asset lifecycle nightly: %s", stats)
    except Exception as e:
        logger.error("asset lifecycle nightly failed: %s", e)
        db.rollback()
    finally:
        db.close()


async def refresh_exposure_nightly() -> None:
    """Refresh network-exposure scores from Wazuh syscollector ports.

    Scheduled for 02:30, after the SCA refresh (02:00). Skipped when the
    `wazuh_config` setting is missing or lacks api_url / username / password.
    """
    from app.auth.setting_crypto import read_setting_value
    from app.services.exposure_service import refresh_all_exposure

    db = SessionLocal()
    try:
        raw = read_setting_value(db, "wazuh_config")
        if not raw:
            logger.info("exposure nightly skipped — wazuh_config not set")
            return
        cfg = json.loads(raw)
        if not all([cfg.get("api_url"), cfg.get("username"), cfg.get("password")]):
            logger.info("exposure nightly skipped — wazuh_config incomplete")
            return

        # NOTE(review): client is not closed after use — confirm whether
        # WazuhClient holds a session that should be released.
        wazuh = _make_wazuh_client(cfg)
        stats = refresh_all_exposure(db, wazuh)
        logger.info("exposure nightly: %s", stats)
    except Exception as e:
        logger.error("exposure nightly failed: %s", e)
        db.rollback()
    finally:
        db.close()


async def recompute_urs_nightly() -> None:
    """Recompute the Unified Risk Score (URS) for every asset and snapshot.

    Scheduled for 04:00 — after the SCA refresh (02:00) and the Vulnrichment
    correction (03:00) so the score inputs are fresh. Afterwards prunes
    asset_risk_snapshots older than 90 days.
    """
    from app.models.compliance import AssetRiskSnapshot
    from app.services.urs_service import compute_urs_for_all

    db = SessionLocal()
    try:
        stats = compute_urs_for_all(db, avs_mode="hybrid")
        # Commit the recompute first so a failing prune cannot undo it.
        db.commit()

        cutoff = datetime.now() - timedelta(days=90)
        pruned = (
            db.query(AssetRiskSnapshot)
            .filter(AssetRiskSnapshot.snapshot_date < cutoff)
            .delete(synchronize_session=False)
        )
        db.commit()

        logger.info(
            "URS nightly recompute done: %d assets, %d with URS, %d errors, %d snapshots pruned",
            stats["assets"], stats["with_urs"], stats["errors"], pruned,
        )
    except Exception as e:
        logger.error("URS nightly recompute failed: %s", e)
        db.rollback()
    finally:
        db.close()


async def refresh_compliance_sca() -> None:
    """Nightly Wazuh SCA refresh.

    Delegates to ``refresh_all_compliance`` and logs the number of assets and
    policy results synced plus the error count.
    """
    from app.services.compliance_service import refresh_all_compliance

    db = SessionLocal()
    try:
        stats = refresh_all_compliance(db)
        logger.info(
            "Compliance SCA refresh done: %d assets, %d policy results, %d errors",
            stats["assets_synced"], stats["policies_synced"], len(stats["errors"]),
        )
    except Exception as e:
        logger.error("Compliance SCA refresh failed: %s", e)
        db.rollback()
    finally:
        db.close()


async def refresh_cisa_vulnrichment() -> None:
    """Nightly job — apply CISA Vulnrichment CVSS / SSVC corrections.

    Calls ``correct_vulnerability_scores(db, dry_run=False)`` and logs the
    checked / updated / not_found counters from its result.
    """
    from app.services.vuln_override_service import correct_vulnerability_scores

    db = SessionLocal()
    try:
        result = correct_vulnerability_scores(db, dry_run=False)
        logger.info(
            "CISA Vulnrichment nightly correction done: "
            "checked=%s, updated=%s, not_found=%s",
            result.get("checked", 0),
            result.get("updated", 0),
            result.get("not_found", 0),
        )
    except Exception as e:
        logger.error("CISA Vulnrichment nightly refresh failed: %s", e)
        db.rollback()
    finally:
        db.close()


def sync_schedules() -> None:
    """Synchronize enabled DB schedules with APScheduler jobs.

    Removes ``scan_schedule_*`` jobs whose schedule is gone or disabled and
    adds jobs for enabled schedules that have none yet. Jobs that already
    exist are left untouched, so a changed interval or cron expression only
    takes effect once the job is re-created (e.g. after disable/enable or an
    app restart).
    """
    if not HAS_APSCHEDULER or scheduler is None:
        return

    db = SessionLocal()
    try:
        schedules = db.query(ScanSchedule).filter(ScanSchedule.enabled == True).all()  # noqa: E712

        # Only jobs with the scan_schedule_ prefix are managed here.
        existing_jobs = {j.id for j in scheduler.get_jobs() if j.id.startswith("scan_schedule_")}
        active_ids = {f"scan_schedule_{s.id}" for s in schedules}

        # Remove jobs that are no longer active
        for job_id in existing_jobs - active_ids:
            scheduler.remove_job(job_id)
            logger.info(f"Scheduler job removed: {job_id}")

        # Add new jobs
        for schedule in schedules:
            job_id = f"scan_schedule_{schedule.id}"
            interval_kwargs = INTERVAL_MAP.get(schedule.interval, {"days": 1})

            # Deliberately no reschedule of existing jobs: this sync runs
            # every minute and would otherwise keep resetting the triggers.
            if scheduler.get_job(job_id) is not None:
                continue

            # New job, or first sync after an app restart.
            is_cron = schedule.interval == ScheduleInterval.CRON and schedule.cron_expression
            if is_cron:
                try:
                    trigger = CronTrigger.from_crontab(schedule.cron_expression)
                except Exception as e:
                    logger.error(f"Invalid cron expression for schedule {schedule.id}: {e}")
                    continue
            else:
                # If next_run lies in the past, start right away.
                start_date = schedule.next_run if (schedule.next_run and schedule.next_run > datetime.now()) else datetime.now()
                trigger = IntervalTrigger(start_date=start_date, **interval_kwargs)

            scheduler.add_job(
                execute_scheduled_scan,
                trigger=trigger,
                id=job_id,
                args=[schedule.id],
                name=f"{schedule.name} ({schedule.interval})",
                replace_existing=True
            )
            logger.info(f"Scheduler job added: {job_id} (Trigger: {trigger})")

            # Seed next_run when the row has none yet.
            if not schedule.next_run:
                if is_cron:
                    next_fire = trigger.get_next_fire_time(None, datetime.now())
                    # The trigger returns an aware datetime (or None when the
                    # expression never fires again); the column is naive.
                    schedule.next_run = next_fire.replace(tzinfo=None) if next_fire is not None else None
                else:
                    schedule.next_run = datetime.now() + timedelta(**interval_kwargs)

        db.commit()
        logger.info(f"Scheduler synchronized: {len(schedules)} active schedules")

    except Exception as e:
        logger.error(f"Scheduler sync error: {e}")
        db.rollback()
    finally:
        db.close()


def start_scheduler() -> None:
    """Register all background jobs and start the scheduler.

    No-op (with a warning) when APScheduler is not installed. Cron times
    below are in the scheduler's time zone, since no explicit timezone is
    passed to the triggers.
    """
    if not HAS_APSCHEDULER or scheduler is None:
        logger.warning("Scheduler not available - install apscheduler: pip install apscheduler")
        return

    try:
        sync_schedules()
    except Exception as e:
        logger.warning(f"Initial scheduler synchronization failed (DB unchecked?): {e}")

    # Periodic re-sync every 60 seconds to pick up DB changes
    scheduler.add_job(
        sync_schedules,
        trigger=IntervalTrigger(seconds=60),
        id="scheduler_sync",
        name="Scheduler DB Sync",
        replace_existing=True
    )

    # SLA breach checker every hour
    scheduler.add_job(
        check_sla_breaches,
        trigger=IntervalTrigger(hours=1),
        id="sla_breach_check",
        name="SLA Breach Checker",
        replace_existing=True
    )

    # Daily threat intel refresh (EPSS scores change daily, KEV updates frequently)
    scheduler.add_job(
        refresh_threat_intel_enrichment,
        trigger=IntervalTrigger(hours=24),
        id="threat_intel_refresh",
        name="Daily Threat Intel Refresh (EPSS + KEV)",
        replace_existing=True,
        next_run_time=datetime.now() + timedelta(minutes=5),  # First run 5 min after startup
    )

    # Nightly CISA Vulnrichment correction at 03:00 — CISA pushes several
    # commits per day so a daily snapshot keeps CVSS + SSVC fields fresh
    # without spamming the GitHub raw API per-CVE.
    scheduler.add_job(
        refresh_cisa_vulnrichment,
        trigger=CronTrigger(hour=3, minute=0),
        id="vulnrichment_nightly",
        name="Nightly CISA Vulnrichment CVSS / SSVC Correction",
        replace_existing=True,
    )

    # Nightly Wazuh SCA refresh at 02:00 — runs ahead of Vulnrichment so
    # compliance numbers on the dashboard are fresh when admins log in
    # the next morning. Cheap (one summary GET per agent).
    scheduler.add_job(
        refresh_compliance_sca,
        trigger=CronTrigger(hour=2, minute=0),
        id="compliance_sca_nightly",
        name="Nightly Wazuh SCA Compliance Refresh",
        replace_existing=True,
    )

    # Nightly URS recompute at 04:00 — must run AFTER SCA (02:00) and
    # Vulnrichment (03:00) so AVS/ASS inputs reflect today's data.
    # Also prunes asset_risk_snapshots older than 90 days.
    scheduler.add_job(
        recompute_urs_nightly,
        trigger=CronTrigger(hour=4, minute=0),
        id="urs_nightly",
        name="Nightly URS Recompute + Snapshot Prune",
        replace_existing=True,
    )

    # Nightly asset lifecycle reconcile at 04:15 — soft-inactivate
    # assets no source has reported within asset_inactive_after_days
    # (default 30), revive recently-seen ones. Runs after URS.
    scheduler.add_job(
        reconcile_assets_nightly,
        trigger=CronTrigger(hour=4, minute=15),
        id="asset_lifecycle_nightly",
        name="Nightly Asset Lifecycle Reconcile",
        replace_existing=True,
    )

    # Nightly network-exposure refresh at 02:30 — pull syscollector
    # ports, classify risky listeners (VNC/RDP/Telnet/...), store score.
    scheduler.add_job(
        refresh_exposure_nightly,
        trigger=CronTrigger(hour=2, minute=30),
        id="exposure_nightly",
        name="Nightly Network-Exposure Refresh",
        replace_existing=True,
    )

    # Nightly audit-log prune at 03:30 — between Vulnrichment (03:00)
    # and URS (04:00). Retention configurable via setting
    # `audit_log_retention_days` (default 1825 = ~5 years; 0 = keep
    # forever). Covers ISO 27001 / SOX / DSGVO Art.5 windows.
    scheduler.add_job(
        prune_audit_logs_nightly,
        trigger=CronTrigger(hour=3, minute=30),
        id="audit_log_prune_nightly",
        name="Nightly Audit-Log Retention Prune",
        replace_existing=True,
    )

    # Nightly EOL detection via endoflife.date at 03:15 — closes the
    # gap Wazuh has vs Nessus plugin 64784 (unsupported-version
    # detection). Creates pseudo-CVEs (cve_id starts with "EOL-").
    scheduler.add_job(
        eol_check_nightly,
        trigger=CronTrigger(hour=3, minute=15),
        id="eol_check_nightly",
        name="Nightly endoflife.date EOL Detection",
        replace_existing=True,
    )

    # Nightly exploit-intel refresh (Plan M) at 03:45 — pulls
    # Exploit-DB CSV + PoC-in-GitHub + Metasploit module index, writes
    # per-vuln counts + ref lists.
    scheduler.add_job(
        exploit_intel_nightly,
        trigger=CronTrigger(hour=3, minute=45),
        id="exploit_intel_nightly",
        name="Nightly Public-Exploit Catalog Refresh",
        replace_existing=True,
    )

    scheduler.start()
    logger.info(
        "Background scheduler started (SLA Breach Checker + Threat Intel Refresh + Vulnrichment Nightly + Compliance SCA Nightly + URS Nightly + Audit-Log Prune)"
    )


def stop_scheduler() -> None:
    """Stop the background scheduler without waiting for running jobs."""
    if not HAS_APSCHEDULER or scheduler is None:
        return
    if scheduler.running:
        scheduler.shutdown(wait=False)
        logger.info("Background scheduler stopped")