From c465c200c4478900c7fc1e3a51ee084fa6b98fb9 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:52:31 +0300 Subject: [PATCH] ME Pool Runtime API --- .../middle_proxy/pool_runtime_api.rs | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 src/transport/middle_proxy/pool_runtime_api.rs diff --git a/src/transport/middle_proxy/pool_runtime_api.rs b/src/transport/middle_proxy/pool_runtime_api.rs new file mode 100644 index 0000000..37ef298 --- /dev/null +++ b/src/transport/middle_proxy/pool_runtime_api.rs @@ -0,0 +1,128 @@ +use std::collections::HashMap; +use std::time::Instant; + +use super::pool::{MePool, RefillDcKey}; +use crate::network::IpFamily; + +#[derive(Clone, Debug)] +pub(crate) struct MeApiRefillDcSnapshot { + pub dc: i16, + pub family: &'static str, + pub inflight: usize, +} + +#[derive(Clone, Debug)] +pub(crate) struct MeApiRefillSnapshot { + pub inflight_endpoints_total: usize, + pub inflight_dc_total: usize, + pub by_dc: Vec, +} + +#[derive(Clone, Debug)] +pub(crate) struct MeApiNatReflectionSnapshot { + pub addr: std::net::SocketAddr, + pub age_secs: u64, +} + +#[derive(Clone, Debug)] +pub(crate) struct MeApiNatStunSnapshot { + pub nat_probe_enabled: bool, + pub nat_probe_disabled_runtime: bool, + pub nat_probe_attempts: u8, + pub configured_servers: Vec, + pub live_servers: Vec, + pub reflection_v4: Option, + pub reflection_v6: Option, + pub stun_backoff_remaining_ms: Option, +} + +impl MePool { + pub(crate) async fn api_refill_snapshot(&self) -> MeApiRefillSnapshot { + let inflight_endpoints_total = self.refill_inflight.lock().await.len(); + let inflight_dc_keys = self + .refill_inflight_dc + .lock() + .await + .iter() + .copied() + .collect::>(); + + let mut by_dc_map = HashMap::<(i16, &'static str), usize>::new(); + for key in inflight_dc_keys { + let family = match key.family { + IpFamily::V4 => "v4", + IpFamily::V6 => "v6", + }; + let dc = key.dc as i16; + *by_dc_map.entry((dc, family)).or_insert(0) += 1; + } + + let mut by_dc = by_dc_map + .into_iter() + .map(|((dc, family), inflight)| MeApiRefillDcSnapshot { + dc, + family, + inflight, + }) + .collect::>(); + by_dc.sort_by_key(|entry| (entry.dc, entry.family)); + + MeApiRefillSnapshot { + inflight_endpoints_total, + inflight_dc_total: by_dc.len(), + by_dc, + } + } + + pub(crate) async fn api_nat_stun_snapshot(&self) -> MeApiNatStunSnapshot { + let now = Instant::now(); + let mut configured_servers = if !self.nat_stun_servers.is_empty() { + self.nat_stun_servers.clone() + } else if let Some(stun) = &self.nat_stun { + if stun.trim().is_empty() { + Vec::new() + } else { + vec![stun.clone()] + } + } else { + Vec::new() + }; + configured_servers.sort(); + configured_servers.dedup(); + + let mut live_servers = self.nat_stun_live_servers.read().await.clone(); + live_servers.sort(); + live_servers.dedup(); + + let reflection = self.nat_reflection_cache.lock().await; + let reflection_v4 = reflection.v4.map(|(ts, addr)| MeApiNatReflectionSnapshot { + addr, + age_secs: now.saturating_duration_since(ts).as_secs(), + }); + let reflection_v6 = reflection.v6.map(|(ts, addr)| MeApiNatReflectionSnapshot { + addr, + age_secs: now.saturating_duration_since(ts).as_secs(), + }); + drop(reflection); + + let backoff_until = *self.stun_backoff_until.read().await; + let stun_backoff_remaining_ms = backoff_until.and_then(|until| { + (until > now).then_some(until.duration_since(now).as_millis() as u64) + }); + + MeApiNatStunSnapshot { + nat_probe_enabled: self.nat_probe, + nat_probe_disabled_runtime: self + .nat_probe_disabled + .load(std::sync::atomic::Ordering::Relaxed), + nat_probe_attempts: self + .nat_probe_attempts + .load(std::sync::atomic::Ordering::Relaxed), + configured_servers, + live_servers, + reflection_v4, + reflection_v6, + stun_backoff_remaining_ms, + } + } +}