mirror of https://github.com/telemt/telemt.git
ME Pool V2 - Aggressive Healthcheck and Pool Rebuild
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
35ae455e2b
commit
4be4670668
|
|
@ -11,12 +11,16 @@ use crate::network::IpFamily;
|
||||||
|
|
||||||
use super::MePool;
|
use super::MePool;
|
||||||
|
|
||||||
|
/// Seconds the health monitor loop sleeps between passes; also used as the
/// default and post-success value of the per-(DC, family) backoff delay.
const HEALTH_INTERVAL_SECS: u64 = 1;
|
||||||
|
/// Upper bound on quick-burst reconnect attempts made for a DC before the
/// code falls back to the exponential-backoff path.
const QUICK_RETRY_ATTEMPTS: u8 = 10;
|
||||||
|
/// Milliseconds slept after each failed quick-burst attempt.
const QUICK_RETRY_DELAY_MS: u64 = 2500;
|
||||||
|
|
||||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
||||||
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
||||||
let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new();
|
let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new();
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
||||||
check_family(
|
check_family(
|
||||||
IpFamily::V4,
|
IpFamily::V4,
|
||||||
&pool,
|
&pool,
|
||||||
|
|
@ -83,8 +87,33 @@ async fn check_family(
|
||||||
inflight_single.remove(&(dc, family));
|
inflight_single.remove(&(dc, family));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Aggressive quick-retry burst: up to 10 attempts every 2.5s before falling back to exponential backoff.
|
||||||
let key = (dc, family);
|
let key = (dc, family);
|
||||||
let delay = *backoff.get(&key).unwrap_or(&30);
|
for attempt in 0..QUICK_RETRY_ATTEMPTS {
|
||||||
|
let mut shuffled = dc_addrs.clone();
|
||||||
|
shuffled.shuffle(&mut rand::rng());
|
||||||
|
let mut success = false;
|
||||||
|
for addr in &shuffled {
|
||||||
|
match pool.connect_one(*addr, rng.as_ref()).await {
|
||||||
|
Ok(()) => {
|
||||||
|
info!(%addr, dc = %dc, ?family, attempt, "ME reconnected (quick burst)");
|
||||||
|
backoff.insert(key, HEALTH_INTERVAL_SECS);
|
||||||
|
last_attempt.insert(key, Instant::now());
|
||||||
|
inflight_single.remove(&key);
|
||||||
|
success = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => debug!(%addr, dc = %dc, error = %e, attempt, ?family, "ME reconnect failed (quick)"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if success {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(QUICK_RETRY_DELAY_MS)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let delay = *backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS);
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
if let Some(last) = last_attempt.get(&key) {
|
if let Some(last) = last_attempt.get(&key) {
|
||||||
if now.duration_since(*last).as_secs() < delay {
|
if now.duration_since(*last).as_secs() < delay {
|
||||||
|
|
@ -149,7 +178,7 @@ async fn check_family(
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting...");
|
warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting (backoff)...");
|
||||||
let mut shuffled = dc_addrs.clone();
|
let mut shuffled = dc_addrs.clone();
|
||||||
shuffled.shuffle(&mut rand::rng());
|
shuffled.shuffle(&mut rand::rng());
|
||||||
let mut reconnected = false;
|
let mut reconnected = false;
|
||||||
|
|
@ -166,7 +195,7 @@ async fn check_family(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !reconnected {
|
if !reconnected {
|
||||||
let next = (*backoff.get(&key).unwrap_or(&30)).saturating_mul(2).min(300);
|
let next = (*backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS)).saturating_mul(2).min(60);
|
||||||
backoff.insert(key, next);
|
backoff.insert(key, next);
|
||||||
last_attempt.insert(key, now);
|
last_attempt.insert(key, now);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue