fix(pool): enhance reconnect logic for single-endpoint data centers

This commit is contained in:
miniusercoder
2026-04-06 21:06:53 +03:00
parent 86be0d53fe
commit 2b8159a65e
3 changed files with 51 additions and 11 deletions

View File

@@ -67,10 +67,8 @@ struct FamilyReconnectOutcome {
key: (i32, IpFamily), key: (i32, IpFamily),
dc: i32, dc: i32,
family: IpFamily, family: IpFamily,
alive: usize,
required: usize, required: usize,
endpoint_count: usize, endpoint_count: usize,
restored: usize,
} }
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) { pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
@@ -638,6 +636,7 @@ async fn check_family(
break; break;
} }
} }
pool_for_reconnect.stats.increment_me_reconnect_attempt();
let res = tokio::time::timeout( let res = tokio::time::timeout(
pool_for_reconnect.reconnect_runtime.me_one_timeout, pool_for_reconnect.reconnect_runtime.me_one_timeout,
pool_for_reconnect.connect_endpoints_round_robin( pool_for_reconnect.connect_endpoints_round_robin(
@@ -653,11 +652,9 @@ async fn check_family(
pool_for_reconnect.stats.increment_me_reconnect_success(); pool_for_reconnect.stats.increment_me_reconnect_success();
} }
Ok(false) => { Ok(false) => {
pool_for_reconnect.stats.increment_me_reconnect_attempt();
debug!(dc = %dc, ?family, "ME round-robin reconnect failed") debug!(dc = %dc, ?family, "ME round-robin reconnect failed")
} }
Err(_) => { Err(_) => {
pool_for_reconnect.stats.increment_me_reconnect_attempt();
debug!(dc = %dc, ?family, "ME reconnect timed out"); debug!(dc = %dc, ?family, "ME reconnect timed out");
} }
} }
@@ -668,10 +665,8 @@ async fn check_family(
key, key,
dc, dc,
family, family,
alive,
required, required,
endpoint_count: endpoints_for_dc.len(), endpoint_count: endpoints_for_dc.len(),
restored,
} }
}); });
} }
@@ -685,7 +680,7 @@ async fn check_family(
} }
}; };
let now = Instant::now(); let now = Instant::now();
let now_alive = outcome.alive + outcome.restored; let now_alive = live_active_writers_for_dc_family(pool, outcome.dc, outcome.family).await;
if now_alive >= outcome.required { if now_alive >= outcome.required {
info!( info!(
dc = %outcome.dc, dc = %outcome.dc,
@@ -841,6 +836,33 @@ fn should_emit_rate_limited_warn(
false false
} }
/// Count the writers that are currently live for a given DC and IP family:
/// not draining, assigned to `dc`, on the Active contour, and whose address
/// matches the requested family.
///
/// Takes a read lock on `pool.writers` for the duration of the scan.
async fn live_active_writers_for_dc_family(pool: &Arc<MePool>, dc: i32, family: IpFamily) -> usize {
    use std::sync::atomic::Ordering::Relaxed;

    let guard = pool.writers.read().await;
    let mut live = 0usize;
    for w in guard.iter() {
        // Relaxed is sufficient: these flags are advisory health signals,
        // matching how the rest of the pool reads them.
        let draining = w.draining.load(Relaxed);
        let on_active_contour = matches!(
            super::pool::WriterContour::from_u8(w.contour.load(Relaxed)),
            super::pool::WriterContour::Active
        );
        let family_matches = match family {
            IpFamily::V4 => w.addr.is_ipv4(),
            IpFamily::V6 => w.addr.is_ipv6(),
        };
        if !draining && w.writer_dc == dc && on_active_contour && family_matches {
            live += 1;
        }
    }
    live
}
fn adaptive_floor_class_min( fn adaptive_floor_class_min(
pool: &Arc<MePool>, pool: &Arc<MePool>,
endpoint_count: usize, endpoint_count: usize,
@@ -1304,6 +1326,7 @@ async fn recover_single_endpoint_outage(
); );
return; return;
}; };
pool.stats.increment_me_reconnect_attempt();
pool.stats pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total(); .increment_me_single_endpoint_outage_reconnect_attempt_total();
@@ -1379,7 +1402,6 @@ async fn recover_single_endpoint_outage(
return; return;
} }
pool.stats.increment_me_reconnect_attempt();
let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms); let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms);
let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms); let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms);
outage_backoff.insert(key, next_ms); outage_backoff.insert(key, next_ms);

View File

@@ -1674,7 +1674,16 @@ impl MePool {
let active_for_dc = { let active_for_dc = {
let ws = self.writers.read().await; let ws = self.writers.read().await;
ws.iter() ws.iter()
.filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed) && w.writer_dc == writer_dc) .filter(|w| {
!w.draining.load(std::sync::atomic::Ordering::Relaxed)
&& w.writer_dc == writer_dc
&& matches!(
WriterContour::from_u8(
w.contour.load(std::sync::atomic::Ordering::Relaxed),
),
WriterContour::Active
)
})
.count() .count()
}; };
if active_for_dc < base_req { if active_for_dc < base_req {

View File

@@ -236,8 +236,18 @@ impl MePool {
let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1); let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1);
let mut total_attempts = 0u32; let mut total_attempts = 0u32;
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
let single_endpoint_dc = dc_endpoints.len() == 1 && dc_endpoints[0] == addr;
let bypass_quarantine_for_single_endpoint =
single_endpoint_dc && self.single_endpoint_outage_disable_quarantine();
if !same_endpoint_quarantined { if !same_endpoint_quarantined || bypass_quarantine_for_single_endpoint {
if same_endpoint_quarantined && bypass_quarantine_for_single_endpoint {
debug!(
%addr,
"Bypassing quarantine for immediate reconnect on single-endpoint DC"
);
}
for attempt in 0..fast_retries { for attempt in 0..fast_retries {
if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP { if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP {
break; break;
@@ -276,7 +286,6 @@ impl MePool {
); );
} }
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
if dc_endpoints.is_empty() { if dc_endpoints.is_empty() {
self.stats.increment_me_refill_failed_total(); self.stats.increment_me_refill_failed_total();
return false; return false;