From 0ff2e95e49cff9a324618ff0425c3d59e2293fc6 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 7 Mar 2026 03:22:01 +0300 Subject: [PATCH] Event-driven Drafts Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 66 ++++++++++++++++++++--- src/transport/middle_proxy/pool.rs | 3 ++ src/transport/middle_proxy/pool_refill.rs | 4 +- src/transport/middle_proxy/send.rs | 36 +++++++------ 4 files changed, 84 insertions(+), 25 deletions(-) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index a594a01..4f8370c 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -22,6 +22,10 @@ const IDLE_REFRESH_TRIGGER_BASE_SECS: u64 = 45; const IDLE_REFRESH_TRIGGER_JITTER_SECS: u64 = 5; const IDLE_REFRESH_RETRY_SECS: u64 = 8; const IDLE_REFRESH_SUCCESS_GUARD_SECS: u64 = 5; +const HEALTH_RECONNECT_BUDGET_PER_CORE: usize = 2; +const HEALTH_RECONNECT_BUDGET_PER_DC: usize = 1; +const HEALTH_RECONNECT_BUDGET_MIN: usize = 4; +const HEALTH_RECONNECT_BUDGET_MAX: usize = 128; #[derive(Debug, Clone)] struct DcFloorPlanEntry { @@ -114,22 +118,23 @@ async fn check_family( return; } - let map = match family { - IpFamily::V4 => pool.proxy_map_v4.read().await.clone(), - IpFamily::V6 => pool.proxy_map_v6.read().await.clone(), - }; - let mut dc_endpoints = HashMap::>::new(); - for (dc, addrs) in map { - let entry = dc_endpoints.entry(dc).or_default(); - for (ip, port) in addrs { + let map_guard = match family { + IpFamily::V4 => pool.proxy_map_v4.read().await, + IpFamily::V6 => pool.proxy_map_v6.read().await, + }; + for (dc, addrs) in map_guard.iter() { + let entry = dc_endpoints.entry(*dc).or_default(); + for (ip, port) in addrs.iter().copied() { entry.push(SocketAddr::new(ip, port)); } } + drop(map_guard); for endpoints in dc_endpoints.values_mut() { endpoints.sort_unstable(); endpoints.dedup(); } + let mut reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len()); if pool.floor_mode() == MeFloorMode::Static { adaptive_idle_since.clear(); @@ -200,6 +205,7 @@ async fn check_family( required, outage_backoff, outage_next_attempt, + &mut reconnect_budget, ) .await; continue; @@ -256,6 +262,24 @@ async fn check_family( let missing = required - alive; let now = Instant::now(); + if reconnect_budget == 0 { + let base_ms = pool.me_reconnect_backoff_base.as_millis() as u64; + let next_ms = (*backoff.get(&key).unwrap_or(&base_ms)).max(base_ms); + let jitter = next_ms / JITTER_FRAC_NUM; + let wait = Duration::from_millis(next_ms) + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + next_attempt.insert(key, now + wait); + debug!( + dc = %dc, + ?family, + alive, + required, + endpoint_count = endpoints.len(), + reconnect_budget, + "Skipping reconnect due to per-tick health reconnect budget" + ); + continue; + } if let Some(ts) = next_attempt.get(&key) && now < *ts { @@ -281,6 +305,10 @@ async fn check_family( let mut restored = 0usize; for _ in 0..missing { + if reconnect_budget == 0 { + break; + } + reconnect_budget = reconnect_budget.saturating_sub(1); if pool.floor_mode() == MeFloorMode::Adaptive && pool.active_writer_count_total().await >= floor_plan.global_cap_effective_total { @@ -383,6 +411,15 @@ async fn check_family( } } +fn health_reconnect_budget(pool: &Arc, dc_groups: usize) -> usize { + let cpu_cores = pool.adaptive_floor_effective_cpu_cores().max(1); + let by_cpu = cpu_cores.saturating_mul(HEALTH_RECONNECT_BUDGET_PER_CORE); + let by_dc = dc_groups.saturating_mul(HEALTH_RECONNECT_BUDGET_PER_DC); + by_cpu + .saturating_add(by_dc) + .clamp(HEALTH_RECONNECT_BUDGET_MIN, HEALTH_RECONNECT_BUDGET_MAX) +} + fn adaptive_floor_class_min( pool: &Arc, endpoint_count: usize, @@ -816,6 +853,7 @@ async fn recover_single_endpoint_outage( required: usize, outage_backoff: &mut HashMap<(i32, IpFamily), u64>, outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, + reconnect_budget: &mut usize, ) { let now = Instant::now(); if let Some(ts) = outage_next_attempt.get(&key) @@ -825,6 +863,18 @@ async fn recover_single_endpoint_outage( } let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); + if *reconnect_budget == 0 { + outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250))); + debug!( + dc = %key.0, + family = ?key.1, + %endpoint, + required, + "Single-endpoint outage reconnect deferred by health reconnect budget" + ); + return; + } + *reconnect_budget = (*reconnect_budget).saturating_sub(1); pool.stats .increment_me_single_endpoint_outage_reconnect_attempt_total(); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 1145823..8c59b5b 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -124,6 +124,7 @@ pub struct MePool { pub(super) me_adaptive_floor_target_writers_total: AtomicU64, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, + pub(super) endpoint_dc_map: Arc>>>, pub(super) default_dc: AtomicI32, pub(super) next_writer_id: AtomicU64, pub(super) ping_tracker: Arc>>, @@ -254,6 +255,7 @@ impl MePool { me_route_inline_recovery_attempts: u32, me_route_inline_recovery_wait_ms: u64, ) -> Arc { + let endpoint_dc_map = Self::build_endpoint_dc_map_from_maps(&proxy_map_v4, &proxy_map_v6); let registry = Arc::new(ConnRegistry::new()); registry.update_route_backpressure_policy( me_route_backpressure_base_timeout_ms, @@ -355,6 +357,7 @@ impl MePool { pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), + endpoint_dc_map: Arc::new(RwLock::new(endpoint_dc_map)), default_dc: AtomicI32::new(default_dc.unwrap_or(2)), next_writer_id: AtomicU64::new(1), ping_tracker: Arc::new(Mutex::new(HashMap::new())), diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 87b87d5..7da6acc 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -172,7 +172,7 @@ impl MePool { let target_dc = self.resolve_dc_for_endpoint(addr).await; if self.decision.ipv4_me { - let map = self.proxy_map_v4.read().await.clone(); + let map = self.proxy_map_v4.read().await; if let Some(addrs) = map.get(&target_dc) { for (ip, port) in addrs { endpoints.insert(SocketAddr::new(*ip, *port)); @@ -181,7 +181,7 @@ impl MePool { } if self.decision.ipv6_me { - let map = self.proxy_map_v6.read().await.clone(); + let map = self.proxy_map_v6.read().await; if let Some(addrs) = map.get(&target_dc) { for (ip, port) in addrs { endpoints.insert(SocketAddr::new(*ip, *port)); diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index b9b1fd5..9ffcc8e 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -378,12 +378,16 @@ impl MePool { return self.has_candidate_for_target_dc(target_dc).await; } - let remaining = deadline.saturating_duration_since(now); - let sleep_for = remaining.min(Duration::from_millis(25)); let waiter = self.writer_available.notified(); - tokio::select! { - _ = waiter => {} - _ = tokio::time::sleep(sleep_for) => {} + if self.has_candidate_for_target_dc(target_dc).await { + return true; + } + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + return self.has_candidate_for_target_dc(target_dc).await; + } + if tokio::time::timeout(remaining, waiter).await.is_err() { + return self.has_candidate_for_target_dc(target_dc).await; } } } @@ -423,11 +427,11 @@ impl MePool { self.stats.increment_me_async_recovery_trigger_total(); let mut seen = HashSet::::new(); for family in self.family_order() { - let map = match family { - IpFamily::V4 => self.proxy_map_v4.read().await.clone(), - IpFamily::V6 => self.proxy_map_v6.read().await.clone(), + let map_guard = match family { + IpFamily::V4 => self.proxy_map_v4.read().await, + IpFamily::V6 => self.proxy_map_v6.read().await, }; - for addrs in map.values() { + for addrs in map_guard.values() { for (ip, port) in addrs { let addr = SocketAddr::new(*ip, *port); if seen.insert(addr) { @@ -448,13 +452,13 @@ impl MePool { let lookup_keys = self.dc_lookup_chain_for_target(key); for family in self.family_order() { - let map = match family { - IpFamily::V4 => self.proxy_map_v4.read().await.clone(), - IpFamily::V6 => self.proxy_map_v6.read().await.clone(), + let map_guard = match family { + IpFamily::V4 => self.proxy_map_v4.read().await, + IpFamily::V6 => self.proxy_map_v6.read().await, }; let mut family_selected = Vec::::new(); for lookup in lookup_keys.iter().copied() { - if let Some(addrs) = map.get(&lookup) { + if let Some(addrs) = map_guard.get(&lookup) { for (ip, port) in addrs { family_selected.push(SocketAddr::new(*ip, *port)); } @@ -557,7 +561,7 @@ impl MePool { include_warm: bool, ) -> Vec { let key = target_dc as i32; - let mut preferred = Vec::::new(); + let mut preferred = HashSet::::new(); let lookup_keys = self.dc_lookup_chain_for_target(key); for family in self.family_order() { @@ -574,7 +578,9 @@ impl MePool { break; } } - preferred.extend(family_selected); + for endpoint in family_selected { + preferred.insert(endpoint); + } drop(map_guard);