From e9f8c7949876e6c64a67788f080fb481db6f860e Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 19:58:57 +0300 Subject: [PATCH 1/4] ME Pool w/ Strict-Index --- src/transport/middle_proxy/pool.rs | 54 +++++++++++++- src/transport/middle_proxy/send.rs | 109 +++++++++++------------------ 2 files changed, 92 insertions(+), 71 deletions(-) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 22f40b5..b0ae394 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -320,7 +320,7 @@ impl MePool { pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), - default_dc: AtomicI32::new(default_dc.unwrap_or(0)), + default_dc: AtomicI32::new(default_dc.unwrap_or(2)), next_writer_id: AtomicU64::new(1), ping_tracker: Arc::new(Mutex::new(HashMap::new())), rtt_stats: Arc::new(Mutex::new(HashMap::new())), @@ -625,6 +625,58 @@ impl MePool { order } + pub(super) fn default_dc_for_routing(&self) -> i32 { + let dc = self.default_dc.load(Ordering::Relaxed); + if dc == 0 { 2 } else { dc } + } + + pub(super) fn dc_lookup_chain_for_target(&self, target_dc: i32) -> Vec { + let mut out = Vec::with_capacity(1); + if target_dc != 0 { + out.push(target_dc); + } else { + // Use default DC only when target DC is unknown and pinning is not established. + let fallback_dc = self.default_dc_for_routing(); + out.push(fallback_dc); + } + out + } + + pub(super) async fn resolve_dc_for_endpoint(&self, addr: SocketAddr) -> i32 { + let map_guard = if addr.is_ipv4() { + self.proxy_map_v4.read().await + } else { + self.proxy_map_v6.read().await + }; + + let mut matched_dc: Option = None; + let mut ambiguous = false; + for (dc, addrs) in map_guard.iter() { + if addrs + .iter() + .any(|(ip, port)| SocketAddr::new(*ip, *port) == addr) + { + match matched_dc { + None => matched_dc = Some(*dc), + Some(prev_dc) if prev_dc == *dc => {} + Some(_) => { + ambiguous = true; + break; + } + } + } + } + drop(map_guard); + + if !ambiguous + && let Some(dc) = matched_dc + { + return dc; + } + + self.default_dc_for_routing() + } + pub(super) async fn proxy_map_for_family( &self, family: IpFamily, diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index b442a8a..b9b1fd5 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -195,38 +195,25 @@ impl MePool { return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } emergency_attempts += 1; - for family in self.family_order() { - let map_guard = match family { - IpFamily::V4 => self.proxy_map_v4.read().await, - IpFamily::V6 => self.proxy_map_v6.read().await, - }; - if let Some(addrs) = map_guard.get(&(target_dc as i32)) { - let mut shuffled = addrs.clone(); - shuffled.shuffle(&mut rand::rng()); - drop(map_guard); - for (ip, port) in shuffled { - let addr = SocketAddr::new(ip, port); - if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { - break; - } - } - tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await; - let ws2 = self.writers.read().await; - writers_snapshot = ws2.clone(); - drop(ws2); - candidate_indices = self - .candidate_indices_for_dc(&writers_snapshot, target_dc, false) - .await; - if candidate_indices.is_empty() { - candidate_indices = self - .candidate_indices_for_dc(&writers_snapshot, target_dc, true) - .await; - } - if !candidate_indices.is_empty() { - break; - } + let mut endpoints = self.endpoint_candidates_for_target_dc(target_dc).await; + endpoints.shuffle(&mut rand::rng()); + for addr in endpoints { + if self.connect_one(addr, self.rng.as_ref()).await.is_ok() { + break; } } + tokio::time::sleep(Duration::from_millis(100 * emergency_attempts as u64)).await; + let ws2 = self.writers.read().await; + writers_snapshot = ws2.clone(); + drop(ws2); + candidate_indices = self + .candidate_indices_for_dc(&writers_snapshot, target_dc, false) + .await; + if candidate_indices.is_empty() { + candidate_indices = self + .candidate_indices_for_dc(&writers_snapshot, target_dc, true) + .await; + } if candidate_indices.is_empty() { return Err(ProxyError::Proxy("No ME writers available for target DC".into())); } @@ -458,26 +445,28 @@ impl MePool { let key = target_dc as i32; let mut preferred = Vec::::new(); let mut seen = HashSet::::new(); + let lookup_keys = self.dc_lookup_chain_for_target(key); for family in self.family_order() { let map = match family { IpFamily::V4 => self.proxy_map_v4.read().await.clone(), IpFamily::V6 => self.proxy_map_v6.read().await.clone(), }; - let mut lookup_keys = vec![key, key.abs(), -key.abs()]; - let def = self.default_dc.load(Ordering::Relaxed); - if def != 0 { - lookup_keys.push(def); - } - for lookup in lookup_keys { + let mut family_selected = Vec::::new(); + for lookup in lookup_keys.iter().copied() { if let Some(addrs) = map.get(&lookup) { for (ip, port) in addrs { - let addr = SocketAddr::new(*ip, *port); - if seen.insert(addr) { - preferred.push(addr); - } + family_selected.push(SocketAddr::new(*ip, *port)); } } + if !family_selected.is_empty() { + break; + } + } + for addr in family_selected { + if seen.insert(addr) { + preferred.push(addr); + } } if !preferred.is_empty() && !self.decision.effective_multipath { break; @@ -569,36 +558,23 @@ impl MePool { ) -> Vec { let key = target_dc as i32; let mut preferred = Vec::::new(); + let lookup_keys = self.dc_lookup_chain_for_target(key); for family in self.family_order() { let map_guard = match family { IpFamily::V4 => self.proxy_map_v4.read().await, IpFamily::V6 => self.proxy_map_v6.read().await, }; - - if let Some(v) = map_guard.get(&key) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } - if preferred.is_empty() { - let abs = key.abs(); - if let Some(v) = map_guard.get(&abs) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } - } - if preferred.is_empty() { - let abs = key.abs(); - if let Some(v) = map_guard.get(&-abs) { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } - } - if preferred.is_empty() { - let def = self.default_dc.load(Ordering::Relaxed); - if def != 0 - && let Some(v) = map_guard.get(&def) - { - preferred.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + let mut family_selected = Vec::::new(); + for lookup in lookup_keys.iter().copied() { + if let Some(v) = map_guard.get(&lookup) { + family_selected.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); + } + if !family_selected.is_empty() { + break; } } + preferred.extend(family_selected); drop(map_guard); @@ -608,9 +584,7 @@ impl MePool { } if preferred.is_empty() { - return (0..writers.len()) - .filter(|i| self.writer_eligible_for_selection(&writers[*i], include_warm)) - .collect(); + return Vec::new(); } let mut out = Vec::new(); @@ -622,11 +596,6 @@ impl MePool { out.push(idx); } } - if out.is_empty() { - return (0..writers.len()) - .filter(|i| self.writer_eligible_for_selection(&writers[*i], include_warm)) - .collect(); - } out } From 24df8655035c5401cfecf7891adc2e7a507371d3 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 19:59:23 +0300 Subject: [PATCH 2/4] Session by Target-DC-ID Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/pool_init.rs | 12 +-- src/transport/middle_proxy/pool_refill.rs | 55 ++-------- src/transport/middle_proxy/pool_reinit.rs | 4 +- src/transport/middle_proxy/pool_status.rs | 116 +++++++--------------- src/transport/middle_proxy/registry.rs | 12 +-- 5 files changed, 56 insertions(+), 143 deletions(-) diff --git a/src/transport/middle_proxy/pool_init.rs b/src/transport/middle_proxy/pool_init.rs index fbb5c64..668cfda 100644 --- a/src/transport/middle_proxy/pool_init.rs +++ b/src/transport/middle_proxy/pool_init.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; @@ -27,20 +27,14 @@ impl MePool { for family in family_order { let map = self.proxy_map_for_family(family).await; - let mut grouped_dc_addrs: HashMap> = HashMap::new(); - for (dc, addrs) in map { - if addrs.is_empty() { - continue; - } - grouped_dc_addrs.entry(dc.abs()).or_default().extend(addrs); - } - let mut dc_addrs: Vec<(i32, Vec<(IpAddr, u16)>)> = grouped_dc_addrs + let mut dc_addrs: Vec<(i32, Vec<(IpAddr, u16)>)> = map .into_iter() .map(|(dc, mut addrs)| { addrs.sort_unstable(); addrs.dedup(); (dc, addrs) }) + .filter(|(_, addrs)| !addrs.is_empty()) .collect(); dc_addrs.sort_unstable_by_key(|(dc, _)| *dc); dc_addrs.sort_by_key(|(_, addrs)| (addrs.len() != 1, addrs.len())); diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 6e14617..87b87d5 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -108,19 +108,10 @@ impl MePool { } else { IpFamily::V6 }; - let map = self.proxy_map_for_family(family).await; - for (dc, endpoints) in map { - if endpoints - .into_iter() - .any(|(ip, port)| SocketAddr::new(ip, port) == addr) - { - return Some(RefillDcKey { - dc: dc.abs(), - family, - }); - } - } - None + Some(RefillDcKey { + dc: self.resolve_dc_for_endpoint(addr).await, + family, + }) } async fn resolve_refill_dc_keys_for_endpoints( @@ -177,47 +168,23 @@ impl MePool { } async fn endpoints_for_same_dc(&self, addr: SocketAddr) -> Vec { - let mut target_dc = HashSet::::new(); let mut endpoints = HashSet::::new(); + let target_dc = self.resolve_dc_for_endpoint(addr).await; if self.decision.ipv4_me { let map = self.proxy_map_v4.read().await.clone(); - for (dc, addrs) in &map { - if addrs - .iter() - .any(|(ip, port)| SocketAddr::new(*ip, *port) == addr) - { - target_dc.insert(dc.abs()); - } - } - for dc in &target_dc { - for key in [*dc, -*dc] { - if let Some(addrs) = map.get(&key) { - for (ip, port) in addrs { - endpoints.insert(SocketAddr::new(*ip, *port)); - } - } + if let Some(addrs) = map.get(&target_dc) { + for (ip, port) in addrs { + endpoints.insert(SocketAddr::new(*ip, *port)); } } } if self.decision.ipv6_me { let map = self.proxy_map_v6.read().await.clone(); - for (dc, addrs) in &map { - if addrs - .iter() - .any(|(ip, port)| SocketAddr::new(*ip, *port) == addr) - { - target_dc.insert(dc.abs()); - } - } - for dc in &target_dc { - for key in [*dc, -*dc] { - if let Some(addrs) = map.get(&key) { - for (ip, port) in addrs { - endpoints.insert(SocketAddr::new(*ip, *port)); - } - } + if let Some(addrs) = map.get(&target_dc) { + for (ip, port) in addrs { + endpoints.insert(SocketAddr::new(*ip, *port)); } } } diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index d5242b7..39944ba 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -128,7 +128,7 @@ impl MePool { if self.decision.ipv4_me { let map_v4 = self.proxy_map_v4.read().await.clone(); for (dc, addrs) in map_v4 { - let entry = out.entry(dc.abs()).or_default(); + let entry = out.entry(dc).or_default(); for (ip, port) in addrs { entry.insert(SocketAddr::new(ip, port)); } @@ -138,7 +138,7 @@ impl MePool { if self.decision.ipv6_me { let map_v6 = self.proxy_map_v6.read().await.clone(); for (dc, addrs) in map_v6 { - let entry = out.entry(dc.abs()).or_default(); + let entry = out.entry(dc).or_default(); for (ip, port) in addrs { entry.insert(SocketAddr::new(ip, port)); } diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 17a418c..d9898b1 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -1,5 +1,5 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::Ordering; use std::time::Instant; @@ -104,35 +104,11 @@ impl MePool { let mut endpoints_by_dc = BTreeMap::>::new(); if self.decision.ipv4_me { let map = self.proxy_map_v4.read().await.clone(); - for (dc, addrs) in map { - let abs_dc = dc.abs(); - if abs_dc == 0 { - continue; - } - let Ok(dc_idx) = i16::try_from(abs_dc) else { - continue; - }; - let entry = endpoints_by_dc.entry(dc_idx).or_default(); - for (ip, port) in addrs { - entry.insert(SocketAddr::new(ip, port)); - } - } + extend_signed_endpoints(&mut endpoints_by_dc, map); } if self.decision.ipv6_me { let map = self.proxy_map_v6.read().await.clone(); - for (dc, addrs) in map { - let abs_dc = dc.abs(); - if abs_dc == 0 { - continue; - } - let Ok(dc_idx) = i16::try_from(abs_dc) else { - continue; - }; - let entry = endpoints_by_dc.entry(dc_idx).or_default(); - for (ip, port) in addrs { - entry.insert(SocketAddr::new(ip, port)); - } - } + extend_signed_endpoints(&mut endpoints_by_dc, map); } if endpoints_by_dc.is_empty() { @@ -166,35 +142,11 @@ impl MePool { let mut endpoints_by_dc = BTreeMap::>::new(); if self.decision.ipv4_me { let map = self.proxy_map_v4.read().await.clone(); - for (dc, addrs) in map { - let abs_dc = dc.abs(); - if abs_dc == 0 { - continue; - } - let Ok(dc_idx) = i16::try_from(abs_dc) else { - continue; - }; - let entry = endpoints_by_dc.entry(dc_idx).or_default(); - for (ip, port) in addrs { - entry.insert(SocketAddr::new(ip, port)); - } - } + extend_signed_endpoints(&mut endpoints_by_dc, map); } if self.decision.ipv6_me { let map = self.proxy_map_v6.read().await.clone(); - for (dc, addrs) in map { - let abs_dc = dc.abs(); - if abs_dc == 0 { - continue; - } - let Ok(dc_idx) = i16::try_from(abs_dc) else { - continue; - }; - let entry = endpoints_by_dc.entry(dc_idx).or_default(); - for (ip, port) in addrs { - entry.insert(SocketAddr::new(ip, port)); - } - } + extend_signed_endpoints(&mut endpoints_by_dc, map); } if endpoints_by_dc.is_empty() { @@ -234,41 +186,17 @@ impl MePool { let mut endpoints_by_dc = BTreeMap::>::new(); if self.decision.ipv4_me { let map = self.proxy_map_v4.read().await.clone(); - for (dc, addrs) in map { - let abs_dc = dc.abs(); - if abs_dc == 0 { - continue; - } - let Ok(dc_idx) = i16::try_from(abs_dc) else { - continue; - }; - let entry = endpoints_by_dc.entry(dc_idx).or_default(); - for (ip, port) in addrs { - entry.insert(SocketAddr::new(ip, port)); - } - } + extend_signed_endpoints(&mut endpoints_by_dc, map); } if self.decision.ipv6_me { let map = self.proxy_map_v6.read().await.clone(); - for (dc, addrs) in map { - let abs_dc = dc.abs(); - if abs_dc == 0 { - continue; - } - let Ok(dc_idx) = i16::try_from(abs_dc) else { - continue; - }; - let entry = endpoints_by_dc.entry(dc_idx).or_default(); - for (ip, port) in addrs { - entry.insert(SocketAddr::new(ip, port)); - } - } + extend_signed_endpoints(&mut endpoints_by_dc, map); } - let mut endpoint_to_dc = HashMap::::new(); + let mut endpoint_to_dc = HashMap::>::new(); for (dc, endpoints) in &endpoints_by_dc { for endpoint in endpoints { - endpoint_to_dc.entry(*endpoint).or_insert(*dc); + endpoint_to_dc.entry(*endpoint).or_default().insert(*dc); } } @@ -292,7 +220,13 @@ impl MePool { for writer in writers { let endpoint = writer.addr; - let dc = endpoint_to_dc.get(&endpoint).copied(); + let dc = endpoint_to_dc.get(&endpoint).and_then(|dcs| { + if dcs.len() == 1 { + dcs.iter().next().copied() + } else { + None + } + }); let draining = writer.draining.load(Ordering::Relaxed); let degraded = writer.degraded.load(Ordering::Relaxed); let bound_clients = activity @@ -499,6 +433,24 @@ fn ratio_pct(part: usize, total: usize) -> f64 { pct.clamp(0.0, 100.0) } +fn extend_signed_endpoints( + endpoints_by_dc: &mut BTreeMap>, + map: HashMap>, +) { + for (dc, addrs) in map { + if dc == 0 { + continue; + } + let Ok(dc_idx) = i16::try_from(dc) else { + continue; + }; + let entry = endpoints_by_dc.entry(dc_idx).or_default(); + for (ip, port) in addrs { + entry.insert(SocketAddr::new(ip, port)); + } + } +} + fn floor_mode_label(mode: MeFloorMode) -> &'static str { match mode { MeFloorMode::Static => "static", diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 66a7f81..b437885 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -273,13 +273,12 @@ impl ConnRegistry { bound_clients_by_writer.insert(*writer_id, conn_ids.len()); } for conn_meta in inner.meta.values() { - let dc_u16 = conn_meta.target_dc.unsigned_abs(); - if dc_u16 == 0 { + if conn_meta.target_dc == 0 { continue; } - if let Ok(dc) = i16::try_from(dc_u16) { - *active_sessions_by_target_dc.entry(dc).or_insert(0) += 1; - } + *active_sessions_by_target_dc + .entry(conn_meta.target_dc) + .or_insert(0) += 1; } WriterActivitySnapshot { @@ -402,7 +401,8 @@ mod tests { let snapshot = registry.writer_activity_snapshot().await; assert_eq!(snapshot.bound_clients_by_writer.get(&10), Some(&2)); assert_eq!(snapshot.bound_clients_by_writer.get(&20), Some(&1)); - assert_eq!(snapshot.active_sessions_by_target_dc.get(&2), Some(&2)); + assert_eq!(snapshot.active_sessions_by_target_dc.get(&2), Some(&1)); + assert_eq!(snapshot.active_sessions_by_target_dc.get(&-2), Some(&1)); assert_eq!(snapshot.active_sessions_by_target_dc.get(&4), Some(&1)); } } From 02fe89f7d0a61d8fe08301411ead6030e442f5c4 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 20:00:32 +0300 Subject: [PATCH 3/4] DC Endpoints on default Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/main.rs | 15 ++++++----- src/transport/middle_proxy/handshake.rs | 33 +------------------------ src/transport/middle_proxy/health.rs | 2 +- 3 files changed, 9 insertions(+), 41 deletions(-) diff --git a/src/main.rs b/src/main.rs index a9207ac..ee5aaad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -942,22 +942,21 @@ async fn main() -> std::result::Result<(), Box> { let mut grouped: BTreeMap> = BTreeMap::new(); for report in me_results { for s in report.samples { - let key = s.dc.abs(); - grouped.entry(key).or_default().push(s); + grouped.entry(s.dc).or_default().push(s); } } let family_order = if prefer_ipv6 { - vec![(MePingFamily::V6, true), (MePingFamily::V6, false), (MePingFamily::V4, true), (MePingFamily::V4, false)] + vec![MePingFamily::V6, MePingFamily::V4] } else { - vec![(MePingFamily::V4, true), (MePingFamily::V4, false), (MePingFamily::V6, true), (MePingFamily::V6, false)] + vec![MePingFamily::V4, MePingFamily::V6] }; - for (dc_abs, samples) in grouped { - for (family, is_pos) in &family_order { + for (dc, samples) in grouped { + for family in &family_order { let fam_samples: Vec<&MePingSample> = samples .iter() - .filter(|s| matches!(s.family, f if &f == family) && (s.dc >= 0) == *is_pos) + .filter(|s| matches!(s.family, f if &f == family)) .collect(); if fam_samples.is_empty() { continue; @@ -967,7 +966,7 @@ async fn main() -> std::result::Result<(), Box> { MePingFamily::V4 => "IPv4", MePingFamily::V6 => "IPv6", }; - info!(" DC{} [{}]", dc_abs, fam_label); + info!(" DC{} [{}]", dc, fam_label); for sample in fam_samples { let line = format_sample_line(sample); info!("{}", line); diff --git a/src/transport/middle_proxy/handshake.rs b/src/transport/middle_proxy/handshake.rs index 77634a6..948c999 100644 --- a/src/transport/middle_proxy/handshake.rs +++ b/src/transport/middle_proxy/handshake.rs @@ -84,38 +84,7 @@ impl MePool { } async fn resolve_dc_idx_for_endpoint(&self, addr: SocketAddr) -> Option { - if addr.is_ipv4() { - let map = self.proxy_map_v4.read().await; - for (dc, addrs) in map.iter() { - if addrs - .iter() - .any(|(ip, port)| SocketAddr::new(*ip, *port) == addr) - { - let abs_dc = dc.abs(); - if abs_dc > 0 - && let Ok(dc_idx) = i16::try_from(abs_dc) - { - return Some(dc_idx); - } - } - } - } else { - let map = self.proxy_map_v6.read().await; - for (dc, addrs) in map.iter() { - if addrs - .iter() - .any(|(ip, port)| SocketAddr::new(*ip, *port) == addr) - { - let abs_dc = dc.abs(); - if abs_dc > 0 - && let Ok(dc_idx) = i16::try_from(abs_dc) - { - return Some(dc_idx); - } - } - } - } - None + i16::try_from(self.resolve_dc_for_endpoint(addr).await).ok() } fn direct_bind_ip_for_stun( diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 1cc8d8a..9a54e32 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -102,7 +102,7 @@ async fn check_family( let mut dc_endpoints = HashMap::>::new(); for (dc, addrs) in map { - let entry = dc_endpoints.entry(dc.abs()).or_default(); + let entry = dc_endpoints.entry(dc).or_default(); for (ip, port) in addrs { entry.push(SocketAddr::new(ip, port)); } From 640468d4e707edd17ecd86a8a8fd08717dd5201c Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 20:01:12 +0300 Subject: [PATCH 4/4] Update API.md Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- docs/API.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/docs/API.md b/docs/API.md index cb964d9..2f98b62 100644 --- a/docs/API.md +++ b/docs/API.md @@ -16,6 +16,10 @@ API runtime is configured in `[server.api]`. | `request_body_limit_bytes` | `usize` | `65536` | Maximum request body size. Must be `> 0`. | | `minimal_runtime_enabled` | `bool` | `false` | Enables runtime snapshot endpoints requiring ME pool read-lock aggregation. | | `minimal_runtime_cache_ttl_ms` | `u64` | `1000` | Cache TTL for minimal snapshots. `0` disables cache; valid range is `[0, 60000]`. | +| `runtime_edge_enabled` | `bool` | `false` | Enables runtime edge endpoints with cached aggregation payloads. | +| `runtime_edge_cache_ttl_ms` | `u64` | `1000` | Cache TTL for runtime edge summary payloads. `0` disables cache. | +| `runtime_edge_top_n` | `usize` | `10` | Top-N rows for runtime edge leaderboard payloads. | +| `runtime_edge_events_capacity` | `usize` | `256` | Ring-buffer size for `/v1/runtime/events/recent`. | | `read_only` | `bool` | `false` | Disables mutating endpoints. | `server.admin_api` is accepted as an alias for backward compatibility. @@ -24,6 +28,9 @@ Runtime validation for API config: - `server.api.listen` must be a valid `IP:PORT`. - `server.api.request_body_limit_bytes` must be `> 0`. - `server.api.minimal_runtime_cache_ttl_ms` must be within `[0, 60000]`. +- `server.api.runtime_edge_cache_ttl_ms` must be within `[0, 60000]`. +- `server.api.runtime_edge_top_n` must be within `[1, 1000]`. +- `server.api.runtime_edge_events_capacity` must be within `[16, 4096]`. ## Protocol Contract @@ -80,12 +87,19 @@ Notes: | `GET` | `/v1/runtime/gates` | none | `200` | `RuntimeGatesData` | | `GET` | `/v1/limits/effective` | none | `200` | `EffectiveLimitsData` | | `GET` | `/v1/security/posture` | none | `200` | `SecurityPostureData` | +| `GET` | `/v1/security/whitelist` | none | `200` | `SecurityWhitelistData` | | `GET` | `/v1/stats/summary` | none | `200` | `SummaryData` | | `GET` | `/v1/stats/zero/all` | none | `200` | `ZeroAllData` | | `GET` | `/v1/stats/upstreams` | none | `200` | `UpstreamsData` | | `GET` | `/v1/stats/minimal/all` | none | `200` | `MinimalAllData` | | `GET` | `/v1/stats/me-writers` | none | `200` | `MeWritersData` | | `GET` | `/v1/stats/dcs` | none | `200` | `DcStatusData` | +| `GET` | `/v1/runtime/me_pool_state` | none | `200` | `RuntimeMePoolStateData` | +| `GET` | `/v1/runtime/me_quality` | none | `200` | `RuntimeMeQualityData` | +| `GET` | `/v1/runtime/upstream_quality` | none | `200` | `RuntimeUpstreamQualityData` | +| `GET` | `/v1/runtime/nat_stun` | none | `200` | `RuntimeNatStunData` | +| `GET` | `/v1/runtime/connections/summary` | none | `200` | `RuntimeEdgeConnectionsSummaryData` | +| `GET` | `/v1/runtime/events/recent` | none | `200` | `RuntimeEdgeEventsData` | | `GET` | `/v1/stats/users` | none | `200` | `UserInfo[]` | | `GET` | `/v1/users` | none | `200` | `UserInfo[]` | | `POST` | `/v1/users` | `CreateUserRequest` | `201` | `CreateUserResponse` | @@ -268,6 +282,25 @@ Note: the request contract is defined, but the corresponding route currently ret | `telemetry_user_enabled` | `bool` | Per-user telemetry toggle. | | `telemetry_me_level` | `string` | ME telemetry level (`silent`, `normal`, `debug`). | +### `SecurityWhitelistData` +| Field | Type | Description | +| --- | --- | --- | +| `generated_at_epoch_secs` | `u64` | Snapshot generation timestamp. | +| `enabled` | `bool` | `true` when whitelist has at least one CIDR entry. | +| `entries_total` | `usize` | Number of whitelist CIDR entries. | +| `entries` | `string[]` | Whitelist CIDR entries as strings. | + +### Runtime Min Endpoints +- `/v1/runtime/me_pool_state`: generations, hardswap state, writer contour/health counts, refill inflight snapshot. +- `/v1/runtime/me_quality`: ME error/drift/reconnect counters and per-DC RTT coverage snapshot. +- `/v1/runtime/upstream_quality`: upstream runtime policy, connect counters, health summary and per-upstream DC latency/IP preference. +- `/v1/runtime/nat_stun`: NAT/STUN runtime flags, server lists, reflection cache state and backoff remaining. + +### Runtime Edge Endpoints +- `/v1/runtime/connections/summary`: cached connection totals (`total/me/direct`), active users and top-N users by connections/traffic. +- `/v1/runtime/events/recent?limit=N`: bounded control-plane ring-buffer events (`limit` clamped to `[1, 1000]`). +- If `server.api.runtime_edge_enabled=false`, runtime edge endpoints return `enabled=false` with `reason=feature_disabled`. + ### `ZeroAllData` | Field | Type | Description | | --- | --- | --- |