Session by Target-DC-ID

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-06 19:59:23 +03:00
parent e9f8c79498
commit 24df865503
No known key found for this signature in database
5 changed files with 56 additions and 143 deletions

View File

@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet}; use std::collections::HashSet;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
@ -27,20 +27,14 @@ impl MePool {
for family in family_order { for family in family_order {
let map = self.proxy_map_for_family(family).await; let map = self.proxy_map_for_family(family).await;
let mut grouped_dc_addrs: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new(); let mut dc_addrs: Vec<(i32, Vec<(IpAddr, u16)>)> = map
for (dc, addrs) in map {
if addrs.is_empty() {
continue;
}
grouped_dc_addrs.entry(dc.abs()).or_default().extend(addrs);
}
let mut dc_addrs: Vec<(i32, Vec<(IpAddr, u16)>)> = grouped_dc_addrs
.into_iter() .into_iter()
.map(|(dc, mut addrs)| { .map(|(dc, mut addrs)| {
addrs.sort_unstable(); addrs.sort_unstable();
addrs.dedup(); addrs.dedup();
(dc, addrs) (dc, addrs)
}) })
.filter(|(_, addrs)| !addrs.is_empty())
.collect(); .collect();
dc_addrs.sort_unstable_by_key(|(dc, _)| *dc); dc_addrs.sort_unstable_by_key(|(dc, _)| *dc);
dc_addrs.sort_by_key(|(_, addrs)| (addrs.len() != 1, addrs.len())); dc_addrs.sort_by_key(|(_, addrs)| (addrs.len() != 1, addrs.len()));

View File

@ -108,19 +108,10 @@ impl MePool {
} else { } else {
IpFamily::V6 IpFamily::V6
}; };
let map = self.proxy_map_for_family(family).await; Some(RefillDcKey {
for (dc, endpoints) in map { dc: self.resolve_dc_for_endpoint(addr).await,
if endpoints
.into_iter()
.any(|(ip, port)| SocketAddr::new(ip, port) == addr)
{
return Some(RefillDcKey {
dc: dc.abs(),
family, family,
}); })
}
}
None
} }
async fn resolve_refill_dc_keys_for_endpoints( async fn resolve_refill_dc_keys_for_endpoints(
@ -177,50 +168,26 @@ impl MePool {
} }
async fn endpoints_for_same_dc(&self, addr: SocketAddr) -> Vec<SocketAddr> { async fn endpoints_for_same_dc(&self, addr: SocketAddr) -> Vec<SocketAddr> {
let mut target_dc = HashSet::<i32>::new();
let mut endpoints = HashSet::<SocketAddr>::new(); let mut endpoints = HashSet::<SocketAddr>::new();
let target_dc = self.resolve_dc_for_endpoint(addr).await;
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in &map { if let Some(addrs) = map.get(&target_dc) {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
target_dc.insert(dc.abs());
}
}
for dc in &target_dc {
for key in [*dc, -*dc] {
if let Some(addrs) = map.get(&key) {
for (ip, port) in addrs { for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port)); endpoints.insert(SocketAddr::new(*ip, *port));
} }
} }
} }
}
}
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in &map { if let Some(addrs) = map.get(&target_dc) {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
target_dc.insert(dc.abs());
}
}
for dc in &target_dc {
for key in [*dc, -*dc] {
if let Some(addrs) = map.get(&key) {
for (ip, port) in addrs { for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port)); endpoints.insert(SocketAddr::new(*ip, *port));
} }
} }
} }
}
}
let mut sorted: Vec<SocketAddr> = endpoints.into_iter().collect(); let mut sorted: Vec<SocketAddr> = endpoints.into_iter().collect();
sorted.sort_unstable(); sorted.sort_unstable();

View File

@ -128,7 +128,7 @@ impl MePool {
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map_v4 = self.proxy_map_v4.read().await.clone(); let map_v4 = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map_v4 { for (dc, addrs) in map_v4 {
let entry = out.entry(dc.abs()).or_default(); let entry = out.entry(dc).or_default();
for (ip, port) in addrs { for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port)); entry.insert(SocketAddr::new(ip, port));
} }
@ -138,7 +138,7 @@ impl MePool {
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map_v6 = self.proxy_map_v6.read().await.clone(); let map_v6 = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map_v6 { for (dc, addrs) in map_v6 {
let entry = out.entry(dc.abs()).or_default(); let entry = out.entry(dc).or_default();
for (ip, port) in addrs { for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port)); entry.insert(SocketAddr::new(ip, port));
} }

View File

@ -1,5 +1,5 @@
use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::net::SocketAddr; use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::time::Instant; use std::time::Instant;
@ -104,35 +104,11 @@ impl MePool {
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new(); let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if endpoints_by_dc.is_empty() { if endpoints_by_dc.is_empty() {
@ -166,35 +142,11 @@ impl MePool {
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new(); let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if endpoints_by_dc.is_empty() { if endpoints_by_dc.is_empty() {
@ -234,41 +186,17 @@ impl MePool {
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new(); let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone(); let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
if self.decision.ipv6_me { if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone(); let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map { extend_signed_endpoints(&mut endpoints_by_dc, map);
let abs_dc = dc.abs();
if abs_dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(abs_dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
} }
let mut endpoint_to_dc = HashMap::<SocketAddr, i16>::new(); let mut endpoint_to_dc = HashMap::<SocketAddr, BTreeSet<i16>>::new();
for (dc, endpoints) in &endpoints_by_dc { for (dc, endpoints) in &endpoints_by_dc {
for endpoint in endpoints { for endpoint in endpoints {
endpoint_to_dc.entry(*endpoint).or_insert(*dc); endpoint_to_dc.entry(*endpoint).or_default().insert(*dc);
} }
} }
@ -292,7 +220,13 @@ impl MePool {
for writer in writers { for writer in writers {
let endpoint = writer.addr; let endpoint = writer.addr;
let dc = endpoint_to_dc.get(&endpoint).copied(); let dc = endpoint_to_dc.get(&endpoint).and_then(|dcs| {
if dcs.len() == 1 {
dcs.iter().next().copied()
} else {
None
}
});
let draining = writer.draining.load(Ordering::Relaxed); let draining = writer.draining.load(Ordering::Relaxed);
let degraded = writer.degraded.load(Ordering::Relaxed); let degraded = writer.degraded.load(Ordering::Relaxed);
let bound_clients = activity let bound_clients = activity
@ -499,6 +433,24 @@ fn ratio_pct(part: usize, total: usize) -> f64 {
pct.clamp(0.0, 100.0) pct.clamp(0.0, 100.0)
} }
fn extend_signed_endpoints(
endpoints_by_dc: &mut BTreeMap<i16, BTreeSet<SocketAddr>>,
map: HashMap<i32, Vec<(IpAddr, u16)>>,
) {
for (dc, addrs) in map {
if dc == 0 {
continue;
}
let Ok(dc_idx) = i16::try_from(dc) else {
continue;
};
let entry = endpoints_by_dc.entry(dc_idx).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
fn floor_mode_label(mode: MeFloorMode) -> &'static str { fn floor_mode_label(mode: MeFloorMode) -> &'static str {
match mode { match mode {
MeFloorMode::Static => "static", MeFloorMode::Static => "static",

View File

@ -273,13 +273,12 @@ impl ConnRegistry {
bound_clients_by_writer.insert(*writer_id, conn_ids.len()); bound_clients_by_writer.insert(*writer_id, conn_ids.len());
} }
for conn_meta in inner.meta.values() { for conn_meta in inner.meta.values() {
let dc_u16 = conn_meta.target_dc.unsigned_abs(); if conn_meta.target_dc == 0 {
if dc_u16 == 0 {
continue; continue;
} }
if let Ok(dc) = i16::try_from(dc_u16) { *active_sessions_by_target_dc
*active_sessions_by_target_dc.entry(dc).or_insert(0) += 1; .entry(conn_meta.target_dc)
} .or_insert(0) += 1;
} }
WriterActivitySnapshot { WriterActivitySnapshot {
@ -402,7 +401,8 @@ mod tests {
let snapshot = registry.writer_activity_snapshot().await; let snapshot = registry.writer_activity_snapshot().await;
assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2)); assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2));
assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1)); assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1));
assert_eq!(snapshot.active_sessions_by_target_dc.get(&2), Some(&2)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&2), Some(&1));
assert_eq!(snapshot.active_sessions_by_target_dc.get(&-2), Some(&1));
assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1));
} }
} }