From 2b8159a65ea45e12808df3ac3f7a3e8bd0fe8d79 Mon Sep 17 00:00:00 2001 From: miniusercoder Date: Mon, 6 Apr 2026 21:06:53 +0300 Subject: [PATCH] fix(pool): enhance reconnect logic for single-endpoint data centers --- src/transport/middle_proxy/health.rs | 38 ++++++++++++++++++----- src/transport/middle_proxy/pool.rs | 11 ++++++- src/transport/middle_proxy/pool_refill.rs | 13 ++++++-- 3 files changed, 51 insertions(+), 11 deletions(-) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 9f6e1bb..e56d001 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -67,10 +67,8 @@ struct FamilyReconnectOutcome { key: (i32, IpFamily), dc: i32, family: IpFamily, - alive: usize, required: usize, endpoint_count: usize, - restored: usize, } pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { @@ -638,6 +636,7 @@ async fn check_family( break; } } + pool_for_reconnect.stats.increment_me_reconnect_attempt(); let res = tokio::time::timeout( pool_for_reconnect.reconnect_runtime.me_one_timeout, pool_for_reconnect.connect_endpoints_round_robin( @@ -653,11 +652,9 @@ async fn check_family( pool_for_reconnect.stats.increment_me_reconnect_success(); } Ok(false) => { - pool_for_reconnect.stats.increment_me_reconnect_attempt(); debug!(dc = %dc, ?family, "ME round-robin reconnect failed") } Err(_) => { - pool_for_reconnect.stats.increment_me_reconnect_attempt(); debug!(dc = %dc, ?family, "ME reconnect timed out"); } } @@ -668,10 +665,8 @@ async fn check_family( key, dc, family, - alive, required, endpoint_count: endpoints_for_dc.len(), - restored, } }); } @@ -685,7 +680,7 @@ async fn check_family( } }; let now = Instant::now(); - let now_alive = outcome.alive + outcome.restored; + let now_alive = live_active_writers_for_dc_family(pool, outcome.dc, outcome.family).await; if now_alive >= outcome.required { info!( dc = %outcome.dc, @@ -841,6 +836,33 @@ fn should_emit_rate_limited_warn( false } +async fn live_active_writers_for_dc_family(pool: &Arc, dc: i32, family: IpFamily) -> usize { + let writers = pool.writers.read().await; + writers + .iter() + .filter(|writer| { + if writer.draining.load(std::sync::atomic::Ordering::Relaxed) { + return false; + } + if writer.writer_dc != dc { + return false; + } + if !matches!( + super::pool::WriterContour::from_u8( + writer.contour.load(std::sync::atomic::Ordering::Relaxed), + ), + super::pool::WriterContour::Active + ) { + return false; + } + match family { + IpFamily::V4 => writer.addr.is_ipv4(), + IpFamily::V6 => writer.addr.is_ipv6(), + } + }) + .count() +} + fn adaptive_floor_class_min( pool: &Arc, endpoint_count: usize, @@ -1304,6 +1326,7 @@ async fn recover_single_endpoint_outage( ); return; }; + pool.stats.increment_me_reconnect_attempt(); pool.stats .increment_me_single_endpoint_outage_reconnect_attempt_total(); @@ -1379,7 +1402,6 @@ async fn recover_single_endpoint_outage( return; } - pool.stats.increment_me_reconnect_attempt(); let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms); let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms); outage_backoff.insert(key, next_ms); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 7b1d3e8..315b6fc 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1674,7 +1674,16 @@ impl MePool { let active_for_dc = { let ws = self.writers.read().await; ws.iter() - .filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed) && w.writer_dc == writer_dc) + .filter(|w| { + !w.draining.load(std::sync::atomic::Ordering::Relaxed) + && w.writer_dc == writer_dc + && matches!( + WriterContour::from_u8( + w.contour.load(std::sync::atomic::Ordering::Relaxed), + ), + WriterContour::Active + ) + }) .count() }; if active_for_dc < base_req { diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 69d8aa0..f43ec3e 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -236,8 +236,18 @@ impl MePool { let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1); let mut total_attempts = 0u32; let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; + let dc_endpoints = self.endpoints_for_dc(writer_dc).await; + let single_endpoint_dc = dc_endpoints.len() == 1 && dc_endpoints[0] == addr; + let bypass_quarantine_for_single_endpoint = + single_endpoint_dc && self.single_endpoint_outage_disable_quarantine(); - if !same_endpoint_quarantined { + if !same_endpoint_quarantined || bypass_quarantine_for_single_endpoint { + if same_endpoint_quarantined && bypass_quarantine_for_single_endpoint { + debug!( + %addr, + "Bypassing quarantine for immediate reconnect on single-endpoint DC" + ); + } for attempt in 0..fast_retries { if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP { break; @@ -276,7 +286,6 @@ impl MePool { ); } - let dc_endpoints = self.endpoints_for_dc(writer_dc).await; if dc_endpoints.is_empty() { self.stats.increment_me_refill_failed_total(); return false;