diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 7141f94..574ea73 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -11,12 +11,16 @@ use crate::network::IpFamily; use super::MePool; +const HEALTH_INTERVAL_SECS: u64 = 1; +const QUICK_RETRY_ATTEMPTS: u8 = 10; +const QUICK_RETRY_DELAY_MS: u64 = 2500; + pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new(); loop { - tokio::time::sleep(Duration::from_secs(30)).await; + tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; check_family( IpFamily::V4, &pool, @@ -83,8 +87,33 @@ async fn check_family( inflight_single.remove(&(dc, family)); continue; } + + // Aggressive quick-retry burst: up to 10 attempts every 2.5s before falling back to exponential backoff. let key = (dc, family); - let delay = *backoff.get(&key).unwrap_or(&30); + for attempt in 0..QUICK_RETRY_ATTEMPTS { + let mut shuffled = dc_addrs.clone(); + shuffled.shuffle(&mut rand::rng()); + let mut success = false; + for addr in &shuffled { + match pool.connect_one(*addr, rng.as_ref()).await { + Ok(()) => { + info!(%addr, dc = %dc, ?family, attempt, "ME reconnected (quick burst)"); + backoff.insert(key, HEALTH_INTERVAL_SECS); + last_attempt.insert(key, Instant::now()); + inflight_single.remove(&key); + success = true; + break; + } + Err(e) => debug!(%addr, dc = %dc, error = %e, attempt, ?family, "ME reconnect failed (quick)"), + } + } + if success { + continue; + } + tokio::time::sleep(Duration::from_millis(QUICK_RETRY_DELAY_MS)).await; + } + + let delay = *backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS); let now = Instant::now(); if let Some(last) = last_attempt.get(&key) { if now.duration_since(*last).as_secs() < delay { @@ -149,7 +178,7 @@ async fn check_family( continue; } - warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting..."); + warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting (backoff)..."); let mut shuffled = dc_addrs.clone(); shuffled.shuffle(&mut rand::rng()); let mut reconnected = false; @@ -166,7 +195,7 @@ async fn check_family( } } if !reconnected { - let next = (*backoff.get(&key).unwrap_or(&30)).saturating_mul(2).min(300); + let next = (*backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS)).saturating_mul(2).min(60); backoff.insert(key, next); last_attempt.insert(key, now); }