From 2926b9f5c8cf18d50ec6703d69192be1a1df67ab Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 16:02:50 +0300 Subject: [PATCH] ME Concurrency Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 14 ++++++++++++++ src/transport/middle_proxy/pool.rs | 1 + 2 files changed, 15 insertions(+) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 79c92d6..d4d4a70 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -14,10 +14,12 @@ use super::MePool; const HEALTH_INTERVAL_SECS: u64 = 1; const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff +const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new(); loop { tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; check_family( @@ -26,6 +28,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &rng, &mut backoff, &mut next_attempt, + &mut inflight, ) .await; check_family( @@ -34,6 +37,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &rng, &mut backoff, &mut next_attempt, + &mut inflight, ) .await; } @@ -45,6 +49,7 @@ async fn check_family( rng: &Arc, backoff: &mut HashMap<(i32, IpFamily), u64>, next_attempt: &mut HashMap<(i32, IpFamily), Instant>, + inflight: &mut HashMap<(i32, IpFamily), usize>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -91,6 +96,12 @@ async fn check_family( } } + let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize; + if *inflight.get(&key).unwrap_or(&0) >= max_concurrent { + return; + } + *inflight.entry(key).or_insert(0) += 1; + let mut shuffled = dc_addrs.clone(); shuffled.shuffle(&mut rand::rng()); let mut success = false; @@ -126,5 +137,8 @@ async fn check_family( next_attempt.insert(key, now + wait); warn!(dc = %dc, backoff_ms = next_ms, ?family, "DC has no ME coverage, scheduled reconnect"); } + if let Some(v) = inflight.get_mut(&key) { + *v = v.saturating_sub(1); + } } } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 250b922..84c526f 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -420,6 +420,7 @@ impl MePool { let (tx, mut rx) = mpsc::channel::(4096); let tx_for_keepalive = tx.clone(); let keepalive_random = self.me_keepalive_payload_random; + let stats = self.stats.clone(); let mut rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key,