No busy-poll in ME

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-07 03:25:26 +03:00
parent 0ff2e95e49
commit 93f58524d1
No known key found for this signature in database
3 changed files with 53 additions and 28 deletions

View File

@ -124,7 +124,7 @@ async fn check_family(
IpFamily::V6 => pool.proxy_map_v6.read().await, IpFamily::V6 => pool.proxy_map_v6.read().await,
}; };
for (dc, addrs) in map_guard.iter() { for (dc, addrs) in map_guard.iter() {
let entry = dc_endpoints.entry(*dc).or_default(); let entry = dc_endpoints.entry(dc.abs()).or_default();
for (ip, port) in addrs.iter().copied() { for (ip, port) in addrs.iter().copied() {
entry.push(SocketAddr::new(ip, port)); entry.push(SocketAddr::new(ip, port));
} }

View File

@ -792,33 +792,8 @@ impl MePool {
} }
pub(super) async fn resolve_dc_for_endpoint(&self, addr: SocketAddr) -> i32 { pub(super) async fn resolve_dc_for_endpoint(&self, addr: SocketAddr) -> i32 {
let map_guard = if addr.is_ipv4() { if let Some(cached) = self.endpoint_dc_map.read().await.get(&addr).copied()
self.proxy_map_v4.read().await && let Some(dc) = cached
} else {
self.proxy_map_v6.read().await
};
let mut matched_dc: Option<i32> = None;
let mut ambiguous = false;
for (dc, addrs) in map_guard.iter() {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
match matched_dc {
None => matched_dc = Some(*dc),
Some(prev_dc) if prev_dc == *dc => {}
Some(_) => {
ambiguous = true;
break;
}
}
}
}
drop(map_guard);
if !ambiguous
&& let Some(dc) = matched_dc
{ {
return dc; return dc;
} }
@ -835,4 +810,48 @@ impl MePool {
IpFamily::V6 => self.proxy_map_v6.read().await.clone(), IpFamily::V6 => self.proxy_map_v6.read().await.clone(),
} }
} }
fn merge_endpoint_dc(
endpoint_dc_map: &mut HashMap<SocketAddr, Option<i32>>,
dc: i32,
ip: IpAddr,
port: u16,
) {
let endpoint = SocketAddr::new(ip, port);
match endpoint_dc_map.get_mut(&endpoint) {
None => {
endpoint_dc_map.insert(endpoint, Some(dc));
}
Some(existing) => {
if existing.is_some_and(|existing_dc| existing_dc != dc) {
*existing = None;
}
}
}
}
fn build_endpoint_dc_map_from_maps(
map_v4: &HashMap<i32, Vec<(IpAddr, u16)>>,
map_v6: &HashMap<i32, Vec<(IpAddr, u16)>>,
) -> HashMap<SocketAddr, Option<i32>> {
let mut endpoint_dc_map = HashMap::<SocketAddr, Option<i32>>::new();
for (dc, endpoints) in map_v4 {
for (ip, port) in endpoints {
Self::merge_endpoint_dc(&mut endpoint_dc_map, *dc, *ip, *port);
}
}
for (dc, endpoints) in map_v6 {
for (ip, port) in endpoints {
Self::merge_endpoint_dc(&mut endpoint_dc_map, *dc, *ip, *port);
}
}
endpoint_dc_map
}
pub(super) async fn rebuild_endpoint_dc_map(&self) {
let map_v4 = self.proxy_map_v4.read().await.clone();
let map_v6 = self.proxy_map_v6.read().await.clone();
let rebuilt = Self::build_endpoint_dc_map_from_maps(&map_v4, &map_v6);
*self.endpoint_dc_map.write().await = rebuilt;
}
} }

View File

@ -54,6 +54,7 @@ impl MePool {
&& let Some(addrs) = guard.get(&k).cloned() && let Some(addrs) = guard.get(&k).cloned()
{ {
guard.insert(-k, addrs); guard.insert(-k, addrs);
changed = true;
} }
} }
} }
@ -65,9 +66,14 @@ impl MePool {
&& let Some(addrs) = guard.get(&k).cloned() && let Some(addrs) = guard.get(&k).cloned()
{ {
guard.insert(-k, addrs); guard.insert(-k, addrs);
changed = true;
} }
} }
} }
if changed {
self.rebuild_endpoint_dc_map().await;
self.writer_available.notify_waiters();
}
if changed { if changed {
SnapshotApplyOutcome::AppliedChanged SnapshotApplyOutcome::AppliedChanged
} else { } else {