fix(wazuh): paginate indexer query past 5000-hit cap

Tester saw the log:
  Agent 083: Indexer returned 5000 hits (total: 10000)
  Agent 083: Has 10000 vulnerabilities but limit is 5000.
              Some vulnerabilities may be missing!

5000 of 10000 vulns silently dropped on agents with very large
finding sets. Affected every Plan B+ sync of a long-history agent.

query_vulnerabilities_from_indexer now paginates in PAGE_SIZE=5000
chunks via OpenSearch from+size until total_hits is drained or
MAX_TOTAL=50000 safety cap is reached. track_total_hits=true added
so total_hits is accurate beyond the default 10k cutoff. Per-page
log line shows accumulated count, so the operator can verify all
hits land.

50k cap is generous — Wazuh agents rarely exceed 10-15k findings;
the few that do (long-uptime servers with many packages) still cap
well below memory pressure. Beyond 50k would need scroll/search_after
API which adds complexity for a vanishingly small population.
This commit is contained in:
2026-05-20 09:04:06 +02:00
parent 346204ee0e
commit f6ee3ff2fa
+44 -27
View File
@@ -389,35 +389,52 @@ class WazuhClient:
except Exception as e:
logger.warning(f"Could not query solved CVEs from alerts: {e}")
# Step 2: Query active vulnerabilities from states index
query = {
"size": limit,
"from": offset,
"query": {
"bool": {
"must": [
{"term": {"agent.id": agent_id}}
]
}
}
}
# Step 2: Query active vulnerabilities from states index.
# Pagination: OpenSearch caps from+size at index.max_result_window
# (default 10000), so we loop in pages of `limit` (default 5000)
# until total_hits is drained or we hit MAX_TOTAL safety cap.
# Prior behaviour was a single 5000-hit fetch — silently dropped
# everything above 5000 on agents with very large finding sets
# (tester saw 10000 hits returning only 5000). 50k cap covers
# every real Wazuh fleet without unbounded memory growth.
MAX_TOTAL = 50_000
PAGE_SIZE = max(1, min(limit, 5000))
hits: List[Dict[str, Any]] = []
total_hits = 0
page_offset = offset
try:
response = self._indexer_request(
"POST",
"/wazuh-states-vulnerabilities-*/_search",
json_data=query
)
total_hits = response.get("hits", {}).get("total", {}).get("value", 0)
hits = response.get("hits", {}).get("hits", [])
logger.info(f"Agent {agent_id}: Indexer returned {len(hits)} hits (total: {total_hits})")
if total_hits > limit:
logger.warning(
f"Agent {agent_id}: Has {total_hits} vulnerabilities but limit is {limit}. "
f"Some vulnerabilities may be missing!"
while True:
query = {
"size": PAGE_SIZE,
"from": page_offset,
"query": {
"bool": {
"must": [{"term": {"agent.id": agent_id}}]
}
},
"track_total_hits": True,
}
response = self._indexer_request(
"POST",
"/wazuh-states-vulnerabilities-*/_search",
json_data=query,
)
page_hits = response.get("hits", {}).get("hits", []) or []
total_hits = response.get("hits", {}).get("total", {}).get("value", 0)
hits.extend(page_hits)
logger.info(
f"Agent {agent_id}: page from={page_offset} returned "
f"{len(page_hits)} hits (total: {total_hits}, accumulated: {len(hits)})"
)
if not page_hits or len(hits) >= total_hits:
break
if len(hits) >= MAX_TOTAL:
logger.warning(
f"Agent {agent_id}: hit MAX_TOTAL cap of {MAX_TOTAL}; "
f"{total_hits - len(hits)} vulnerabilities skipped"
)
break
page_offset += PAGE_SIZE
results = []
skipped_no_cve = 0