diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 257d8f3..9f6e1bb 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -82,8 +82,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new(); let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut drain_warn_next_allowed: HashMap = HashMap::new(); let mut degraded_interval = true; @@ -109,8 +107,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut single_endpoint_outage, &mut shadow_rotate_deadline, &mut idle_refresh_next_attempt, - &mut adaptive_idle_since, - &mut adaptive_recover_until, &mut floor_warn_next_allowed, ) .await; @@ -126,8 +122,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut single_endpoint_outage, &mut shadow_rotate_deadline, &mut idle_refresh_next_attempt, - &mut adaptive_idle_since, - &mut adaptive_recover_until, &mut floor_warn_next_allowed, ) .await; @@ -360,8 +354,6 @@ async fn check_family( single_endpoint_outage: &mut HashSet<(i32, IpFamily)>, shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, ) -> bool { let enabled = match family { @@ -394,8 +386,6 @@ async fn check_family( let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget)); if pool.floor_mode() == MeFloorMode::Static { - adaptive_idle_since.clear(); - adaptive_recover_until.clear(); } let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new(); @@ -435,8 +425,6 @@ async fn check_family( &live_addr_counts, &live_writer_ids_by_addr, &bound_clients_by_writer, - adaptive_idle_since, - adaptive_recover_until, ) .await; pool.set_adaptive_floor_runtime_caps( @@ -503,8 +491,6 @@ async fn check_family( outage_next_attempt.remove(&key); shadow_rotate_deadline.remove(&key); idle_refresh_next_attempt.remove(&key); - adaptive_idle_since.remove(&key); - adaptive_recover_until.remove(&key); info!( dc = %dc, ?family, @@ -632,21 +618,25 @@ async fn check_family( restored += 1; continue; } - pool_for_reconnect - .stats - .increment_me_floor_cap_block_total(); - pool_for_reconnect - .stats - .increment_me_floor_swap_idle_failed_total(); - debug!( - dc = %dc, - ?family, - alive, - required, - active_cap_effective_total, - "Adaptive floor cap reached, reconnect attempt blocked" - ); - break; + + let base_req = pool_for_reconnect.required_writers_for_dc_with_floor_mode(endpoints_for_dc.len(), false); + if alive + restored >= base_req { + pool_for_reconnect + .stats + .increment_me_floor_cap_block_total(); + pool_for_reconnect + .stats + .increment_me_floor_swap_idle_failed_total(); + debug!( + dc = %dc, + ?family, + alive, + required, + active_cap_effective_total, + "Adaptive floor cap reached, reconnect attempt blocked" + ); + break; + } } let res = tokio::time::timeout( pool_for_reconnect.reconnect_runtime.me_one_timeout, @@ -904,8 +894,6 @@ async fn build_family_floor_plan( live_addr_counts: &HashMap<(i32, SocketAddr), usize>, live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec>, bound_clients_by_writer: &HashMap, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, ) -> FamilyFloorPlan { let mut entries = Vec::::new(); let mut by_dc = HashMap::::new(); @@ -921,18 +909,7 @@ async fn build_family_floor_plan( if endpoints.is_empty() { continue; } - let key = (*dc, family); - let reduce_for_idle = should_reduce_floor_for_idle( - pool, - key, - *dc, - endpoints, - live_writer_ids_by_addr, - bound_clients_by_writer, - adaptive_idle_since, - adaptive_recover_until, - ) - .await; + let _key = (*dc, family); let base_required = pool.required_writers_for_dc(endpoints.len()).max(1); let min_required = if is_adaptive { adaptive_floor_class_min(pool, endpoints.len(), base_required) @@ -947,11 +924,11 @@ async fn build_family_floor_plan( if max_required < min_required { max_required = min_required; } - let desired_raw = if is_adaptive && reduce_for_idle { - min_required - } else { - base_required - }; + // We initialize target_required at base_required to prevent 0-writer blackouts + // caused by proactively dropping an idle DC to a single fragile connection. + // The Adaptive Floor constraint loop below will gracefully compress idle DCs + // (prioritized via has_bound_clients = false) to min_required only when global capacity is reached. + let desired_raw = base_required; let target_required = desired_raw.clamp(min_required, max_required); let alive = endpoints .iter() @@ -1278,43 +1255,6 @@ async fn maybe_refresh_idle_writer_for_dc( ); } -async fn should_reduce_floor_for_idle( - pool: &Arc, - key: (i32, IpFamily), - dc: i32, - endpoints: &[SocketAddr], - live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec>, - bound_clients_by_writer: &HashMap, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, -) -> bool { - if pool.floor_mode() != MeFloorMode::Adaptive { - adaptive_idle_since.remove(&key); - adaptive_recover_until.remove(&key); - return false; - } - - let now = Instant::now(); - let writer_ids = list_writer_ids_for_endpoints(dc, endpoints, live_writer_ids_by_addr); - let has_bound_clients = has_bound_clients_on_endpoint(&writer_ids, bound_clients_by_writer); - if has_bound_clients { - adaptive_idle_since.remove(&key); - adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration()); - return false; - } - - if let Some(recover_until) = adaptive_recover_until.get(&key) - && now < *recover_until - { - adaptive_idle_since.remove(&key); - return false; - } - adaptive_recover_until.remove(&key); - - let idle_since = adaptive_idle_since.entry(key).or_insert(now); - now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration() -} - fn has_bound_clients_on_endpoint( writer_ids: &[u64], bound_clients_by_writer: &HashMap, diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 249d387..7b1d3e8 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1422,22 +1422,6 @@ impl MePool { MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed)) } - pub(super) fn adaptive_floor_idle_duration(&self) -> Duration { - Duration::from_secs( - self.floor_runtime - .me_adaptive_floor_idle_secs - .load(Ordering::Relaxed), - ) - } - - pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration { - Duration::from_secs( - self.floor_runtime - .me_adaptive_floor_recover_grace_secs - .load(Ordering::Relaxed), - ) - } - pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize { (self .floor_runtime @@ -1659,6 +1643,7 @@ impl MePool { &self, contour: WriterContour, allow_coverage_override: bool, + writer_dc: i32, ) -> bool { let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await; match contour { @@ -1670,6 +1655,33 @@ impl MePool { if !allow_coverage_override { return false; } + + let mut endpoints_len = 0; + let now_epoch = Self::now_epoch_secs(); + if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch) { + if let Some(addrs) = self.proxy_map_v4.read().await.get(&writer_dc) { + endpoints_len += addrs.len(); + } + } + if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch) { + if let Some(addrs) = self.proxy_map_v6.read().await.get(&writer_dc) { + endpoints_len += addrs.len(); + } + } + + if endpoints_len > 0 { + let base_req = self.required_writers_for_dc_with_floor_mode(endpoints_len, false); + let active_for_dc = { + let ws = self.writers.read().await; + ws.iter() + .filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed) && w.writer_dc == writer_dc) + .count() + }; + if active_for_dc < base_req { + return true; + } + } + let coverage_required = self.active_coverage_required_total().await; active_writers < coverage_required } diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index fae68b9..52c8fae 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -342,7 +342,7 @@ impl MePool { allow_coverage_override: bool, ) -> Result<()> { if !self - .can_open_writer_for_contour(contour, allow_coverage_override) + .can_open_writer_for_contour(contour, allow_coverage_override, writer_dc) .await { return Err(ProxyError::Proxy(format!(