diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 257d8f3..000bca0 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -67,10 +67,8 @@ struct FamilyReconnectOutcome { key: (i32, IpFamily), dc: i32, family: IpFamily, - alive: usize, required: usize, endpoint_count: usize, - restored: usize, } pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { @@ -82,8 +80,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new(); let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut drain_warn_next_allowed: HashMap = HashMap::new(); let mut degraded_interval = true; @@ -109,8 +105,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut single_endpoint_outage, &mut shadow_rotate_deadline, &mut idle_refresh_next_attempt, - &mut adaptive_idle_since, - &mut adaptive_recover_until, &mut floor_warn_next_allowed, ) .await; @@ -126,8 +120,6 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut single_endpoint_outage, &mut shadow_rotate_deadline, &mut idle_refresh_next_attempt, - &mut adaptive_idle_since, - &mut adaptive_recover_until, &mut floor_warn_next_allowed, ) .await; @@ -360,8 +352,6 @@ async fn check_family( single_endpoint_outage: &mut HashSet<(i32, IpFamily)>, shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, ) -> bool { let enabled = match family { @@ -393,10 +383,7 @@ async fn check_family( let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len()); let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget)); - if pool.floor_mode() == MeFloorMode::Static { - adaptive_idle_since.clear(); - adaptive_recover_until.clear(); - } + if pool.floor_mode() == MeFloorMode::Static {} let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new(); let mut live_writer_ids_by_addr = HashMap::<(i32, SocketAddr), Vec>::new(); @@ -435,8 +422,6 @@ async fn check_family( &live_addr_counts, &live_writer_ids_by_addr, &bound_clients_by_writer, - adaptive_idle_since, - adaptive_recover_until, ) .await; pool.set_adaptive_floor_runtime_caps( @@ -503,8 +488,6 @@ async fn check_family( outage_next_attempt.remove(&key); shadow_rotate_deadline.remove(&key); idle_refresh_next_attempt.remove(&key); - adaptive_idle_since.remove(&key); - adaptive_recover_until.remove(&key); info!( dc = %dc, ?family, @@ -632,22 +615,28 @@ async fn check_family( restored += 1; continue; } - pool_for_reconnect - .stats - .increment_me_floor_cap_block_total(); - pool_for_reconnect - .stats - .increment_me_floor_swap_idle_failed_total(); - debug!( - dc = %dc, - ?family, - alive, - required, - active_cap_effective_total, - "Adaptive floor cap reached, reconnect attempt blocked" - ); - break; + + let base_req = pool_for_reconnect + .required_writers_for_dc_with_floor_mode(endpoints_for_dc.len(), false); + if alive + restored >= base_req { + pool_for_reconnect + .stats + .increment_me_floor_cap_block_total(); + pool_for_reconnect + .stats + .increment_me_floor_swap_idle_failed_total(); + debug!( + dc = %dc, + ?family, + alive, + required, + active_cap_effective_total, + "Adaptive floor cap reached, reconnect attempt blocked" + ); + break; + } } + pool_for_reconnect.stats.increment_me_reconnect_attempt(); let res = tokio::time::timeout( pool_for_reconnect.reconnect_runtime.me_one_timeout, pool_for_reconnect.connect_endpoints_round_robin( @@ -663,11 +652,9 @@ async fn check_family( pool_for_reconnect.stats.increment_me_reconnect_success(); } Ok(false) => { - pool_for_reconnect.stats.increment_me_reconnect_attempt(); debug!(dc = %dc, ?family, "ME round-robin reconnect failed") } Err(_) => { - pool_for_reconnect.stats.increment_me_reconnect_attempt(); debug!(dc = %dc, ?family, "ME reconnect timed out"); } } @@ -678,10 +665,8 @@ async fn check_family( key, dc, family, - alive, required, endpoint_count: endpoints_for_dc.len(), - restored, } }); } @@ -695,7 +680,7 @@ async fn check_family( } }; let now = Instant::now(); - let now_alive = outcome.alive + outcome.restored; + let now_alive = live_active_writers_for_dc_family(pool, outcome.dc, outcome.family).await; if now_alive >= outcome.required { info!( dc = %outcome.dc, @@ -851,6 +836,33 @@ fn should_emit_rate_limited_warn( false } +async fn live_active_writers_for_dc_family(pool: &Arc, dc: i32, family: IpFamily) -> usize { + let writers = pool.writers.read().await; + writers + .iter() + .filter(|writer| { + if writer.draining.load(std::sync::atomic::Ordering::Relaxed) { + return false; + } + if writer.writer_dc != dc { + return false; + } + if !matches!( + super::pool::WriterContour::from_u8( + writer.contour.load(std::sync::atomic::Ordering::Relaxed), + ), + super::pool::WriterContour::Active + ) { + return false; + } + match family { + IpFamily::V4 => writer.addr.is_ipv4(), + IpFamily::V6 => writer.addr.is_ipv6(), + } + }) + .count() +} + fn adaptive_floor_class_min( pool: &Arc, endpoint_count: usize, @@ -904,8 +916,6 @@ async fn build_family_floor_plan( live_addr_counts: &HashMap<(i32, SocketAddr), usize>, live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec>, bound_clients_by_writer: &HashMap, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, ) -> FamilyFloorPlan { let mut entries = Vec::::new(); let mut by_dc = HashMap::::new(); @@ -921,18 +931,7 @@ async fn build_family_floor_plan( if endpoints.is_empty() { continue; } - let key = (*dc, family); - let reduce_for_idle = should_reduce_floor_for_idle( - pool, - key, - *dc, - endpoints, - live_writer_ids_by_addr, - bound_clients_by_writer, - adaptive_idle_since, - adaptive_recover_until, - ) - .await; + let _key = (*dc, family); let base_required = pool.required_writers_for_dc(endpoints.len()).max(1); let min_required = if is_adaptive { adaptive_floor_class_min(pool, endpoints.len(), base_required) @@ -947,11 +946,11 @@ async fn build_family_floor_plan( if max_required < min_required { max_required = min_required; } - let desired_raw = if is_adaptive && reduce_for_idle { - min_required - } else { - base_required - }; + // We initialize target_required at base_required to prevent 0-writer blackouts + // caused by proactively dropping an idle DC to a single fragile connection. + // The Adaptive Floor constraint loop below will gracefully compress idle DCs + // (prioritized via has_bound_clients = false) to min_required only when global capacity is reached. + let desired_raw = base_required; let target_required = desired_raw.clamp(min_required, max_required); let alive = endpoints .iter() @@ -1278,43 +1277,6 @@ async fn maybe_refresh_idle_writer_for_dc( ); } -async fn should_reduce_floor_for_idle( - pool: &Arc, - key: (i32, IpFamily), - dc: i32, - endpoints: &[SocketAddr], - live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec>, - bound_clients_by_writer: &HashMap, - adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, - adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, -) -> bool { - if pool.floor_mode() != MeFloorMode::Adaptive { - adaptive_idle_since.remove(&key); - adaptive_recover_until.remove(&key); - return false; - } - - let now = Instant::now(); - let writer_ids = list_writer_ids_for_endpoints(dc, endpoints, live_writer_ids_by_addr); - let has_bound_clients = has_bound_clients_on_endpoint(&writer_ids, bound_clients_by_writer); - if has_bound_clients { - adaptive_idle_since.remove(&key); - adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration()); - return false; - } - - if let Some(recover_until) = adaptive_recover_until.get(&key) - && now < *recover_until - { - adaptive_idle_since.remove(&key); - return false; - } - adaptive_recover_until.remove(&key); - - let idle_since = adaptive_idle_since.entry(key).or_insert(now); - now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration() -} - fn has_bound_clients_on_endpoint( writer_ids: &[u64], bound_clients_by_writer: &HashMap, @@ -1364,6 +1326,7 @@ async fn recover_single_endpoint_outage( ); return; }; + pool.stats.increment_me_reconnect_attempt(); pool.stats .increment_me_single_endpoint_outage_reconnect_attempt_total(); @@ -1439,7 +1402,6 @@ async fn recover_single_endpoint_outage( return; } - pool.stats.increment_me_reconnect_attempt(); let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms); let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms); outage_backoff.insert(key, next_ms); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 249d387..b89a844 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1422,22 +1422,6 @@ impl MePool { MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed)) } - pub(super) fn adaptive_floor_idle_duration(&self) -> Duration { - Duration::from_secs( - self.floor_runtime - .me_adaptive_floor_idle_secs - .load(Ordering::Relaxed), - ) - } - - pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration { - Duration::from_secs( - self.floor_runtime - .me_adaptive_floor_recover_grace_secs - .load(Ordering::Relaxed), - ) - } - pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize { (self .floor_runtime @@ -1659,6 +1643,7 @@ impl MePool { &self, contour: WriterContour, allow_coverage_override: bool, + writer_dc: i32, ) -> bool { let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await; match contour { @@ -1670,6 +1655,43 @@ impl MePool { if !allow_coverage_override { return false; } + + let mut endpoints_len = 0; + let now_epoch = Self::now_epoch_secs(); + if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch) { + if let Some(addrs) = self.proxy_map_v4.read().await.get(&writer_dc) { + endpoints_len += addrs.len(); + } + } + if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch) { + if let Some(addrs) = self.proxy_map_v6.read().await.get(&writer_dc) { + endpoints_len += addrs.len(); + } + } + + if endpoints_len > 0 { + let base_req = + self.required_writers_for_dc_with_floor_mode(endpoints_len, false); + let active_for_dc = { + let ws = self.writers.read().await; + ws.iter() + .filter(|w| { + !w.draining.load(std::sync::atomic::Ordering::Relaxed) + && w.writer_dc == writer_dc + && matches!( + WriterContour::from_u8( + w.contour.load(std::sync::atomic::Ordering::Relaxed), + ), + WriterContour::Active + ) + }) + .count() + }; + if active_for_dc < base_req { + return true; + } + } + let coverage_required = self.active_coverage_required_total().await; active_writers < coverage_required } diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 69d8aa0..bb62604 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -77,6 +77,12 @@ impl MePool { return Vec::new(); } + if endpoints.len() == 1 && self.single_endpoint_outage_disable_quarantine() { + let mut guard = self.endpoint_quarantine.lock().await; + guard.retain(|_, expiry| *expiry > Instant::now()); + return endpoints.to_vec(); + } + let mut guard = self.endpoint_quarantine.lock().await; let now = Instant::now(); guard.retain(|_, expiry| *expiry > now); @@ -236,8 +242,18 @@ impl MePool { let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1); let mut total_attempts = 0u32; let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; + let dc_endpoints = self.endpoints_for_dc(writer_dc).await; + let single_endpoint_dc = dc_endpoints.len() == 1 && dc_endpoints[0] == addr; + let bypass_quarantine_for_single_endpoint = + single_endpoint_dc && self.single_endpoint_outage_disable_quarantine(); - if !same_endpoint_quarantined { + if !same_endpoint_quarantined || bypass_quarantine_for_single_endpoint { + if same_endpoint_quarantined && bypass_quarantine_for_single_endpoint { + debug!( + %addr, + "Bypassing quarantine for immediate reconnect on single-endpoint DC" + ); + } for attempt in 0..fast_retries { if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP { break; @@ -276,7 +292,6 @@ impl MePool { ); } - let dc_endpoints = self.endpoints_for_dc(writer_dc).await; if dc_endpoints.is_empty() { self.stats.increment_me_refill_failed_total(); return false; diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index fae68b9..52c8fae 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -342,7 +342,7 @@ impl MePool { allow_coverage_override: bool, ) -> Result<()> { if !self - .can_open_writer_for_contour(contour, allow_coverage_override) + .can_open_writer_for_contour(contour, allow_coverage_override, writer_dc) .await { return Err(ProxyError::Proxy(format!(