From 93f58524d12c9b9315b7bdcad64a2e413d6e4d4c Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 7 Mar 2026 03:25:26 +0300 Subject: [PATCH] No busy-poll in ME Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 2 +- src/transport/middle_proxy/pool.rs | 73 ++++++++++++++--------- src/transport/middle_proxy/pool_config.rs | 6 ++ 3 files changed, 53 insertions(+), 28 deletions(-) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 4f8370c..8f4ad95 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -124,7 +124,7 @@ async fn check_family( IpFamily::V6 => pool.proxy_map_v6.read().await, }; for (dc, addrs) in map_guard.iter() { - let entry = dc_endpoints.entry(*dc).or_default(); + let entry = dc_endpoints.entry(dc.abs()).or_default(); for (ip, port) in addrs.iter().copied() { entry.push(SocketAddr::new(ip, port)); } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 8c59b5b..b3d8dc6 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -792,33 +792,8 @@ impl MePool { } pub(super) async fn resolve_dc_for_endpoint(&self, addr: SocketAddr) -> i32 { - let map_guard = if addr.is_ipv4() { - self.proxy_map_v4.read().await - } else { - self.proxy_map_v6.read().await - }; - - let mut matched_dc: Option = None; - let mut ambiguous = false; - for (dc, addrs) in map_guard.iter() { - if addrs - .iter() - .any(|(ip, port)| SocketAddr::new(*ip, *port) == addr) - { - match matched_dc { - None => matched_dc = Some(*dc), - Some(prev_dc) if prev_dc == *dc => {} - Some(_) => { - ambiguous = true; - break; - } - } - } - } - drop(map_guard); - - if !ambiguous - && let Some(dc) = matched_dc + if let Some(cached) = self.endpoint_dc_map.read().await.get(&addr).copied() + && let Some(dc) = cached { return dc; } @@ -835,4 +810,48 @@ impl MePool { IpFamily::V6 => self.proxy_map_v6.read().await.clone(), } } + + fn merge_endpoint_dc( + endpoint_dc_map: &mut HashMap>, + dc: i32, + ip: IpAddr, + port: u16, + ) { + let endpoint = SocketAddr::new(ip, port); + match endpoint_dc_map.get_mut(&endpoint) { + None => { + endpoint_dc_map.insert(endpoint, Some(dc)); + } + Some(existing) => { + if existing.is_some_and(|existing_dc| existing_dc != dc) { + *existing = None; + } + } + } + } + + fn build_endpoint_dc_map_from_maps( + map_v4: &HashMap>, + map_v6: &HashMap>, + ) -> HashMap> { + let mut endpoint_dc_map = HashMap::>::new(); + for (dc, endpoints) in map_v4 { + for (ip, port) in endpoints { + Self::merge_endpoint_dc(&mut endpoint_dc_map, *dc, *ip, *port); + } + } + for (dc, endpoints) in map_v6 { + for (ip, port) in endpoints { + Self::merge_endpoint_dc(&mut endpoint_dc_map, *dc, *ip, *port); + } + } + endpoint_dc_map + } + + pub(super) async fn rebuild_endpoint_dc_map(&self) { + let map_v4 = self.proxy_map_v4.read().await.clone(); + let map_v6 = self.proxy_map_v6.read().await.clone(); + let rebuilt = Self::build_endpoint_dc_map_from_maps(&map_v4, &map_v6); + *self.endpoint_dc_map.write().await = rebuilt; + } } diff --git a/src/transport/middle_proxy/pool_config.rs b/src/transport/middle_proxy/pool_config.rs index 04e3bb5..a43f9bf 100644 --- a/src/transport/middle_proxy/pool_config.rs +++ b/src/transport/middle_proxy/pool_config.rs @@ -54,6 +54,7 @@ impl MePool { && let Some(addrs) = guard.get(&k).cloned() { guard.insert(-k, addrs); + changed = true; } } } @@ -65,9 +66,14 @@ impl MePool { && let Some(addrs) = guard.get(&k).cloned() { guard.insert(-k, addrs); + changed = true; } } } + if changed { + self.rebuild_endpoint_dc_map().await; + self.writer_available.notify_waiters(); + } if changed { SnapshotApplyOutcome::AppliedChanged } else {