ME Coverage Ratio in API + as Draining Factor

This commit is contained in:
Alexey
2026-03-18 11:46:13 +03:00
parent cb87b2eac3
commit 2aa7ea5137
5 changed files with 96 additions and 19 deletions

View File

@@ -70,10 +70,12 @@ impl MePool {
let mut missing_dc = Vec::<i32>::new();
let mut covered = 0usize;
let mut total = 0usize;
for (dc, endpoints) in desired_by_dc {
if endpoints.is_empty() {
continue;
}
total += 1;
if endpoints
.iter()
.any(|addr| active_writer_addrs.contains(&(*dc, *addr)))
@@ -85,7 +87,9 @@ impl MePool {
}
missing_dc.sort_unstable();
let total = desired_by_dc.len().max(1);
if total == 0 {
return (1.0, missing_dc);
}
let ratio = (covered as f32) / (total as f32);
(ratio, missing_dc)
}
@@ -399,29 +403,21 @@ impl MePool {
}
if hardswap {
let mut fresh_missing_dc = Vec::<(i32, usize, usize)>::new();
for (dc, endpoints) in &desired_by_dc {
if endpoints.is_empty() {
continue;
}
let required = self.required_writers_for_dc(endpoints.len());
let fresh_count = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.filter(|w| w.writer_dc == *dc)
.filter(|w| endpoints.contains(&w.addr))
.count();
if fresh_count < required {
fresh_missing_dc.push((*dc, fresh_count, required));
}
}
let fresh_writer_addrs: HashSet<(i32, SocketAddr)> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.map(|w| (w.writer_dc, w.addr))
.collect();
let (fresh_coverage_ratio, fresh_missing_dc) =
Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
if !fresh_missing_dc.is_empty() {
warn!(
previous_generation,
generation,
fresh_coverage_ratio = format_args!("{fresh_coverage_ratio:.3}"),
missing_dc = ?fresh_missing_dc,
"ME hardswap pending: fresh generation coverage incomplete"
"ME hardswap pending: fresh generation DC coverage incomplete"
);
return;
}
@@ -491,3 +487,61 @@ impl MePool {
self.zero_downtime_reinit_after_map_change(rng).await;
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::MePool;
fn addr(octet: u8, port: u16) -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, octet)), port)
}
#[test]
fn coverage_ratio_counts_dc_coverage_not_floor() {
let dc1 = addr(1, 2001);
let dc2 = addr(2, 2002);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(1, HashSet::from([dc1]));
desired_by_dc.insert(2, HashSet::from([dc2]));
let active_writer_addrs = HashSet::from([(1, dc1)]);
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &active_writer_addrs);
assert_eq!(ratio, 0.5);
assert_eq!(missing_dc, vec![2]);
}
#[test]
fn coverage_ratio_ignores_empty_dc_groups() {
let dc1 = addr(1, 2001);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(1, HashSet::from([dc1]));
desired_by_dc.insert(2, HashSet::new());
let active_writer_addrs = HashSet::from([(1, dc1)]);
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &active_writer_addrs);
assert_eq!(ratio, 1.0);
assert!(missing_dc.is_empty());
}
#[test]
fn coverage_ratio_reports_missing_dcs_sorted() {
let dc1 = addr(1, 2001);
let dc2 = addr(2, 2002);
let mut desired_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
desired_by_dc.insert(2, HashSet::from([dc2]));
desired_by_dc.insert(1, HashSet::from([dc1]));
let (ratio, missing_dc) = MePool::coverage_ratio(&desired_by_dc, &HashSet::new());
assert_eq!(ratio, 0.0);
assert_eq!(missing_dc, vec![1, 2]);
}
}