ME Coverage Ratio in API + as Draining Factor: merge pull request #478 from telemt/flow-api

ME Coverage Ratio in API + as Draining Factor
This commit is contained in:
Alexey 2026-03-18 11:56:01 +03:00 committed by GitHub
commit a3bdf64353
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 96 additions and 19 deletions

View File

@ -237,6 +237,7 @@ pub(super) struct MeWritersSummary {
pub(super) available_pct: f64,
pub(super) required_writers: usize,
pub(super) alive_writers: usize,
pub(super) coverage_ratio: f64,
pub(super) coverage_pct: f64,
pub(super) fresh_alive_writers: usize,
pub(super) fresh_coverage_pct: f64,
@ -285,6 +286,7 @@ pub(super) struct DcStatus {
pub(super) floor_max: usize,
pub(super) floor_capped: bool,
pub(super) alive_writers: usize,
pub(super) coverage_ratio: f64,
pub(super) coverage_pct: f64,
pub(super) fresh_alive_writers: usize,
pub(super) fresh_coverage_pct: f64,

View File

@ -113,6 +113,7 @@ pub(super) struct RuntimeMeQualityDcRttData {
pub(super) rtt_ema_ms: Option<f64>,
pub(super) alive_writers: usize,
pub(super) required_writers: usize,
pub(super) coverage_ratio: f64,
pub(super) coverage_pct: f64,
}
@ -388,6 +389,7 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
rtt_ema_ms: dc.rtt_ms,
alive_writers: dc.alive_writers,
required_writers: dc.required_writers,
coverage_ratio: dc.coverage_ratio,
coverage_pct: dc.coverage_pct,
})
.collect(),

View File

@ -315,6 +315,7 @@ async fn get_minimal_payload_cached(
available_pct: status.available_pct,
required_writers: status.required_writers,
alive_writers: status.alive_writers,
coverage_ratio: status.coverage_ratio,
coverage_pct: status.coverage_pct,
fresh_alive_writers: status.fresh_alive_writers,
fresh_coverage_pct: status.fresh_coverage_pct,
@ -372,6 +373,7 @@ async fn get_minimal_payload_cached(
floor_max: entry.floor_max,
floor_capped: entry.floor_capped,
alive_writers: entry.alive_writers,
coverage_ratio: entry.coverage_ratio,
coverage_pct: entry.coverage_pct,
fresh_alive_writers: entry.fresh_alive_writers,
fresh_coverage_pct: entry.fresh_coverage_pct,
@ -502,6 +504,7 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa
available_pct: 0.0,
required_writers: 0,
alive_writers: 0,
coverage_ratio: 0.0,
coverage_pct: 0.0,
fresh_alive_writers: 0,
fresh_coverage_pct: 0.0,

View File

@ -70,10 +70,12 @@ impl MePool {
let mut missing_dc = Vec::<i32>::new();
let mut covered = 0usize;
let mut total = 0usize;
for (dc, endpoints) in desired_by_dc {
if endpoints.is_empty() {
continue;
}
total += 1;
if endpoints
.iter()
.any(|addr| active_writer_addrs.contains(&(*dc, *addr)))
@ -85,7 +87,9 @@ impl MePool {
}
missing_dc.sort_unstable();
let total = desired_by_dc.len().max(1);
if total == 0 {
return (1.0, missing_dc);
}
let ratio = (covered as f32) / (total as f32);
(ratio, missing_dc)
}
@ -399,29 +403,21 @@ impl MePool {
}
if hardswap {
let mut fresh_missing_dc = Vec::<(i32, usize, usize)>::new();
for (dc, endpoints) in &desired_by_dc {
if endpoints.is_empty() {
continue;
}
let required = self.required_writers_for_dc(endpoints.len());
let fresh_count = writers
let fresh_writer_addrs: HashSet<(i32, SocketAddr)> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.filter(|w| w.writer_dc == *dc)
.filter(|w| endpoints.contains(&w.addr))
.count();
if fresh_count < required {
fresh_missing_dc.push((*dc, fresh_count, required));
}
}
.map(|w| (w.writer_dc, w.addr))
.collect();
let (fresh_coverage_ratio, fresh_missing_dc) =
Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
if !fresh_missing_dc.is_empty() {
warn!(
previous_generation,
generation,
fresh_coverage_ratio = format_args!("{fresh_coverage_ratio:.3}"),
missing_dc = ?fresh_missing_dc,
"ME hardswap pending: fresh generation coverage incomplete"
"ME hardswap pending: fresh generation DC coverage incomplete"
);
return;
}
@ -491,3 +487,61 @@ impl MePool {
self.zero_downtime_reinit_after_map_change(rng).await;
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::MePool;
fn addr(octet: u8, port: u16) -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, octet)), port)
}
#[test]
fn coverage_ratio_counts_dc_coverage_not_floor() {
let dc1 = addr(1, 2001);
let dc2 = addr(2, 2002);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(1, HashSet::from([dc1]));
desired_by_dc.insert(2, HashSet::from([dc2]));
let active_writer_addrs = HashSet::from([(1, dc1)]);
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &active_writer_addrs);
assert_eq!(ratio, 0.5);
assert_eq!(missing_dc, vec![2]);
}
#[test]
fn coverage_ratio_ignores_empty_dc_groups() {
let dc1 = addr(1, 2001);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(1, HashSet::from([dc1]));
desired_by_dc.insert(2, HashSet::new());
let active_writer_addrs = HashSet::from([(1, dc1)]);
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &active_writer_addrs);
assert_eq!(ratio, 1.0);
assert!(missing_dc.is_empty());
}
#[test]
fn coverage_ratio_reports_missing_dcs_sorted() {
let dc1 = addr(1, 2001);
let dc2 = addr(2, 2002);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(2, HashSet::from([dc2]));
desired_by_dc.insert(1, HashSet::from([dc1]));
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &HashSet::new());
assert_eq!(ratio, 0.0);
assert_eq!(missing_dc, vec![1, 2]);
}
}

View File

@ -40,6 +40,7 @@ pub(crate) struct MeApiDcStatusSnapshot {
pub floor_max: usize,
pub floor_capped: bool,
pub alive_writers: usize,
pub coverage_ratio: f64,
pub coverage_pct: f64,
pub fresh_alive_writers: usize,
pub fresh_coverage_pct: f64,
@ -62,6 +63,7 @@ pub(crate) struct MeApiStatusSnapshot {
pub available_pct: f64,
pub required_writers: usize,
pub alive_writers: usize,
pub coverage_ratio: f64,
pub coverage_pct: f64,
pub fresh_alive_writers: usize,
pub fresh_coverage_pct: f64,
@ -342,6 +344,8 @@ impl MePool {
let mut available_endpoints = 0usize;
let mut alive_writers = 0usize;
let mut fresh_alive_writers = 0usize;
let mut coverage_ratio_dcs_total = 0usize;
let mut coverage_ratio_dcs_covered = 0usize;
let floor_mode = self.floor_mode();
let adaptive_cpu_cores = (self
.me_adaptive_floor_cpu_cores_effective
@ -393,6 +397,12 @@ impl MePool {
available_endpoints += dc_available_endpoints;
alive_writers += dc_alive_writers;
fresh_alive_writers += dc_fresh_alive_writers;
if endpoint_count > 0 {
coverage_ratio_dcs_total += 1;
if dc_alive_writers > 0 {
coverage_ratio_dcs_covered += 1;
}
}
dcs.push(MeApiDcStatusSnapshot {
dc,
@ -415,6 +425,11 @@ impl MePool {
floor_max,
floor_capped,
alive_writers: dc_alive_writers,
coverage_ratio: if endpoint_count > 0 && dc_alive_writers > 0 {
100.0
} else {
0.0
},
coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers),
fresh_alive_writers: dc_fresh_alive_writers,
fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers),
@ -431,6 +446,7 @@ impl MePool {
available_pct: ratio_pct(available_endpoints, configured_endpoints),
required_writers,
alive_writers,
coverage_ratio: ratio_pct(coverage_ratio_dcs_covered, coverage_ratio_dcs_total),
coverage_pct: ratio_pct(alive_writers, required_writers),
fresh_alive_writers,
fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers),