From 2aa7ea51379f8cd66f4a950ff504419de701ce7d Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:46:13 +0300 Subject: [PATCH] ME Coverage Ratio in API + as Draining Factor --- src/api/model.rs | 2 + src/api/runtime_min.rs | 2 + src/api/runtime_stats.rs | 3 + src/transport/middle_proxy/pool_reinit.rs | 92 ++++++++++++++++++----- src/transport/middle_proxy/pool_status.rs | 16 ++++ 5 files changed, 96 insertions(+), 19 deletions(-) diff --git a/src/api/model.rs b/src/api/model.rs index 6b6fd72..ac4e297 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -237,6 +237,7 @@ pub(super) struct MeWritersSummary { pub(super) available_pct: f64, pub(super) required_writers: usize, pub(super) alive_writers: usize, + pub(super) coverage_ratio: f64, pub(super) coverage_pct: f64, pub(super) fresh_alive_writers: usize, pub(super) fresh_coverage_pct: f64, @@ -285,6 +286,7 @@ pub(super) struct DcStatus { pub(super) floor_max: usize, pub(super) floor_capped: bool, pub(super) alive_writers: usize, + pub(super) coverage_ratio: f64, pub(super) coverage_pct: f64, pub(super) fresh_alive_writers: usize, pub(super) fresh_coverage_pct: f64, diff --git a/src/api/runtime_min.rs b/src/api/runtime_min.rs index d3066a3..f334dd0 100644 --- a/src/api/runtime_min.rs +++ b/src/api/runtime_min.rs @@ -113,6 +113,7 @@ pub(super) struct RuntimeMeQualityDcRttData { pub(super) rtt_ema_ms: Option, pub(super) alive_writers: usize, pub(super) required_writers: usize, + pub(super) coverage_ratio: f64, pub(super) coverage_pct: f64, } @@ -388,6 +389,7 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime rtt_ema_ms: dc.rtt_ms, alive_writers: dc.alive_writers, required_writers: dc.required_writers, + coverage_ratio: dc.coverage_ratio, coverage_pct: dc.coverage_pct, }) .collect(), diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index 61c8a5a..f8948d1 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -315,6 +315,7 @@ async fn get_minimal_payload_cached( available_pct: status.available_pct, required_writers: status.required_writers, alive_writers: status.alive_writers, + coverage_ratio: status.coverage_ratio, coverage_pct: status.coverage_pct, fresh_alive_writers: status.fresh_alive_writers, fresh_coverage_pct: status.fresh_coverage_pct, @@ -372,6 +373,7 @@ async fn get_minimal_payload_cached( floor_max: entry.floor_max, floor_capped: entry.floor_capped, alive_writers: entry.alive_writers, + coverage_ratio: entry.coverage_ratio, coverage_pct: entry.coverage_pct, fresh_alive_writers: entry.fresh_alive_writers, fresh_coverage_pct: entry.fresh_coverage_pct, @@ -502,6 +504,7 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa available_pct: 0.0, required_writers: 0, alive_writers: 0, + coverage_ratio: 0.0, coverage_pct: 0.0, fresh_alive_writers: 0, fresh_coverage_pct: 0.0, diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index 3d9d679..0d5c6f4 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -70,10 +70,12 @@ impl MePool { let mut missing_dc = Vec::::new(); let mut covered = 0usize; + let mut total = 0usize; for (dc, endpoints) in desired_by_dc { if endpoints.is_empty() { continue; } + total += 1; if endpoints .iter() .any(|addr| active_writer_addrs.contains(&(*dc, *addr))) @@ -85,7 +87,9 @@ impl MePool { } missing_dc.sort_unstable(); - let total = desired_by_dc.len().max(1); + if total == 0 { + return (1.0, missing_dc); + } let ratio = (covered as f32) / (total as f32); (ratio, missing_dc) } @@ -399,29 +403,21 @@ impl MePool { } if hardswap { - let mut fresh_missing_dc = Vec::<(i32, usize, usize)>::new(); - for (dc, endpoints) in &desired_by_dc { - if endpoints.is_empty() { - continue; - } - let required = self.required_writers_for_dc(endpoints.len()); - let fresh_count = writers - .iter() - .filter(|w| !w.draining.load(Ordering::Relaxed)) - .filter(|w| w.generation == generation) - .filter(|w| w.writer_dc == *dc) - .filter(|w| endpoints.contains(&w.addr)) - .count(); - if fresh_count < required { - fresh_missing_dc.push((*dc, fresh_count, required)); - } - } + let fresh_writer_addrs: HashSet<(i32, SocketAddr)> = writers + .iter() + .filter(|w| !w.draining.load(Ordering::Relaxed)) + .filter(|w| w.generation == generation) + .map(|w| (w.writer_dc, w.addr)) + .collect(); + let (fresh_coverage_ratio, fresh_missing_dc) = + Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs); if !fresh_missing_dc.is_empty() { warn!( previous_generation, generation, + fresh_coverage_ratio = format_args!("{fresh_coverage_ratio:.3}"), missing_dc = ?fresh_missing_dc, - "ME hardswap pending: fresh generation coverage incomplete" + "ME hardswap pending: fresh generation DC coverage incomplete" ); return; } @@ -491,3 +487,61 @@ impl MePool { self.zero_downtime_reinit_after_map_change(rng).await; } } + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + use super::MePool; + + fn addr(octet: u8, port: u16) -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, octet)), port) + } + + #[test] + fn coverage_ratio_counts_dc_coverage_not_floor() { + let dc1 = addr(1, 2001); + let dc2 = addr(2, 2002); + + let mut desired_by_dc = HashMap::>::new(); + desired_by_dc.insert(1, HashSet::from([dc1])); + desired_by_dc.insert(2, HashSet::from([dc2])); + + let active_writer_addrs = HashSet::from([(1, dc1)]); + let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &active_writer_addrs); + + assert_eq!(ratio, 0.5); + assert_eq!(missing_dc, vec![2]); + } + + #[test] + fn coverage_ratio_ignores_empty_dc_groups() { + let dc1 = addr(1, 2001); + + let mut desired_by_dc = HashMap::>::new(); + desired_by_dc.insert(1, HashSet::from([dc1])); + desired_by_dc.insert(2, HashSet::new()); + + let active_writer_addrs = HashSet::from([(1, dc1)]); + let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &active_writer_addrs); + + assert_eq!(ratio, 1.0); + assert!(missing_dc.is_empty()); + } + + #[test] + fn coverage_ratio_reports_missing_dcs_sorted() { + let dc1 = addr(1, 2001); + let dc2 = addr(2, 2002); + + let mut desired_by_dc = HashMap::>::new(); + desired_by_dc.insert(2, HashSet::from([dc2])); + desired_by_dc.insert(1, HashSet::from([dc1])); + + let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &HashSet::new()); + + assert_eq!(ratio, 0.0); + assert_eq!(missing_dc, vec![1, 2]); + } +} diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index d32835c..214ee49 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -40,6 +40,7 @@ pub(crate) struct MeApiDcStatusSnapshot { pub floor_max: usize, pub floor_capped: bool, pub alive_writers: usize, + pub coverage_ratio: f64, pub coverage_pct: f64, pub fresh_alive_writers: usize, pub fresh_coverage_pct: f64, @@ -62,6 +63,7 @@ pub(crate) struct MeApiStatusSnapshot { pub available_pct: f64, pub required_writers: usize, pub alive_writers: usize, + pub coverage_ratio: f64, pub coverage_pct: f64, pub fresh_alive_writers: usize, pub fresh_coverage_pct: f64, @@ -342,6 +344,8 @@ impl MePool { let mut available_endpoints = 0usize; let mut alive_writers = 0usize; let mut fresh_alive_writers = 0usize; + let mut coverage_ratio_dcs_total = 0usize; + let mut coverage_ratio_dcs_covered = 0usize; let floor_mode = self.floor_mode(); let adaptive_cpu_cores = (self .me_adaptive_floor_cpu_cores_effective @@ -393,6 +397,12 @@ impl MePool { available_endpoints += dc_available_endpoints; alive_writers += dc_alive_writers; fresh_alive_writers += dc_fresh_alive_writers; + if endpoint_count > 0 { + coverage_ratio_dcs_total += 1; + if dc_alive_writers > 0 { + coverage_ratio_dcs_covered += 1; + } + } dcs.push(MeApiDcStatusSnapshot { dc, @@ -415,6 +425,11 @@ impl MePool { floor_max, floor_capped, alive_writers: dc_alive_writers, + coverage_ratio: if endpoint_count > 0 && dc_alive_writers > 0 { + 100.0 + } else { + 0.0 + }, coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers), fresh_alive_writers: dc_fresh_alive_writers, fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers), @@ -431,6 +446,7 @@ impl MePool { available_pct: ratio_pct(available_endpoints, configured_endpoints), required_writers, alive_writers, + coverage_ratio: ratio_pct(coverage_ratio_dcs_covered, coverage_ratio_dcs_total), coverage_pct: ratio_pct(alive_writers, required_writers), fresh_alive_writers, fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers),