Merge pull request #164 from telemt/flow

ME Pool V2 - Healthcheck + Pool rebuild
This commit is contained in:
Alexey 2026-02-19 14:33:23 +03:00 committed by GitHub
commit 9edbbb692e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 33 additions and 4 deletions

View File

@ -11,12 +11,16 @@ use crate::network::IpFamily;
use super::MePool; use super::MePool;
/// Seconds slept between health-monitor passes; also used as the fallback
/// reconnect backoff when no backoff value is recorded for a DC/family key.
const HEALTH_INTERVAL_SECS: u64 = 1;
/// Upper bound on the number of reconnect attempts made in one quick-retry burst.
const QUICK_RETRY_ATTEMPTS: u8 = 10;
/// Milliseconds slept after a quick-retry attempt in which no address connected.
const QUICK_RETRY_DELAY_MS: u64 = 2_500;
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) { pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new(); let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new();
loop { loop {
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
check_family( check_family(
IpFamily::V4, IpFamily::V4,
&pool, &pool,
@ -83,8 +87,33 @@ async fn check_family(
inflight_single.remove(&(dc, family)); inflight_single.remove(&(dc, family));
continue; continue;
} }
// Aggressive quick-retry burst: up to 10 attempts every 2.5s before falling back to exponential backoff.
let key = (dc, family); let key = (dc, family);
let delay = *backoff.get(&key).unwrap_or(&30); for attempt in 0..QUICK_RETRY_ATTEMPTS {
let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng());
let mut success = false;
for addr in &shuffled {
match pool.connect_one(*addr, rng.as_ref()).await {
Ok(()) => {
info!(%addr, dc = %dc, ?family, attempt, "ME reconnected (quick burst)");
backoff.insert(key, HEALTH_INTERVAL_SECS);
last_attempt.insert(key, Instant::now());
inflight_single.remove(&key);
success = true;
break;
}
Err(e) => debug!(%addr, dc = %dc, error = %e, attempt, ?family, "ME reconnect failed (quick)"),
}
}
if success {
continue;
}
tokio::time::sleep(Duration::from_millis(QUICK_RETRY_DELAY_MS)).await;
}
let delay = *backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS);
let now = Instant::now(); let now = Instant::now();
if let Some(last) = last_attempt.get(&key) { if let Some(last) = last_attempt.get(&key) {
if now.duration_since(*last).as_secs() < delay { if now.duration_since(*last).as_secs() < delay {
@ -149,7 +178,7 @@ async fn check_family(
continue; continue;
} }
warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting..."); warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting (backoff)...");
let mut shuffled = dc_addrs.clone(); let mut shuffled = dc_addrs.clone();
shuffled.shuffle(&mut rand::rng()); shuffled.shuffle(&mut rand::rng());
let mut reconnected = false; let mut reconnected = false;
@ -166,7 +195,7 @@ async fn check_family(
} }
} }
if !reconnected { if !reconnected {
let next = (*backoff.get(&key).unwrap_or(&30)).saturating_mul(2).min(300); let next = (*backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS)).saturating_mul(2).min(60);
backoff.insert(key, next); backoff.insert(key, next);
last_attempt.insert(key, now); last_attempt.insert(key, now);
} }