diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 192bf1b..55d8409 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -7,6 +7,7 @@ use std::time::{Duration, Instant}; use rand::Rng; use tracing::{debug, info, warn}; +use crate::config::MeFloorMode; use crate::crypto::SecureRandom; use crate::network::IpFamily; @@ -26,6 +27,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut outage_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new(); let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); loop { tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; pool.prune_closed_writers().await; @@ -40,6 +43,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut outage_next_attempt, &mut single_endpoint_outage, &mut shadow_rotate_deadline, + &mut adaptive_idle_since, + &mut adaptive_recover_until, ) .await; check_family( @@ -53,6 +58,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut outage_next_attempt, &mut single_endpoint_outage, &mut shadow_rotate_deadline, + &mut adaptive_idle_since, + &mut adaptive_recover_until, ) .await; } @@ -69,6 +76,8 @@ async fn check_family( outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, single_endpoint_outage: &mut HashSet<(i32, IpFamily)>, shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, + adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, + adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -95,6 +104,11 @@ async fn check_family( endpoints.dedup(); } + if pool.floor_mode() == MeFloorMode::Static { + adaptive_idle_since.clear(); + adaptive_recover_until.clear(); + } + let mut live_addr_counts = HashMap::::new(); let mut live_writer_ids_by_addr = HashMap::>::new(); for writer in pool.writers.read().await.iter().filter(|w| { @@ -111,12 +125,21 @@ async fn check_family( if endpoints.is_empty() { continue; } - let required = pool.required_writers_for_dc(endpoints.len()); + let key = (dc, family); + let reduce_for_idle = should_reduce_floor_for_idle( + pool, + key, + &endpoints, + &live_writer_ids_by_addr, + adaptive_idle_since, + adaptive_recover_until, + ) + .await; + let required = pool.required_writers_for_dc_with_floor_mode(endpoints.len(), reduce_for_idle); let alive = endpoints .iter() .map(|addr| *live_addr_counts.get(addr).unwrap_or(&0)) .sum::(); - let key = (dc, family); if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 { if single_endpoint_outage.insert(key) { @@ -148,6 +171,8 @@ async fn check_family( outage_backoff.remove(&key); outage_next_attempt.remove(&key); shadow_rotate_deadline.remove(&key); + adaptive_idle_since.remove(&key); + adaptive_recover_until.remove(&key); info!( dc = %dc, ?family, @@ -262,6 +287,54 @@ async fn check_family( } } +async fn should_reduce_floor_for_idle( + pool: &Arc, + key: (i32, IpFamily), + endpoints: &[SocketAddr], + live_writer_ids_by_addr: &HashMap>, + adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, + adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, +) -> bool { + if endpoints.len() != 1 || pool.floor_mode() != MeFloorMode::Adaptive { + adaptive_idle_since.remove(&key); + adaptive_recover_until.remove(&key); + return false; + } + + let now = Instant::now(); + let endpoint = endpoints[0]; + let writer_ids = live_writer_ids_by_addr + .get(&endpoint) + .map(Vec::as_slice) + .unwrap_or(&[]); + let has_bound_clients = has_bound_clients_on_endpoint(pool, writer_ids).await; + if has_bound_clients { + adaptive_idle_since.remove(&key); + adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration()); + return false; + } + + if let Some(recover_until) = adaptive_recover_until.get(&key) + && now < *recover_until + { + adaptive_idle_since.remove(&key); + return false; + } + adaptive_recover_until.remove(&key); + + let idle_since = adaptive_idle_since.entry(key).or_insert(now); + now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration() +} + +async fn has_bound_clients_on_endpoint(pool: &Arc, writer_ids: &[u64]) -> bool { + for writer_id in writer_ids { + if !pool.registry.is_writer_empty(*writer_id).await { + return true; + } + } + false +} + async fn recover_single_endpoint_outage( pool: &Arc, rng: &Arc,