From 7ea63872785f6e79e4b43e0dfa90d82583b03b74 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 4 Mar 2026 02:45:32 +0300 Subject: [PATCH] API ME Pool Status Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/pool_status.rs | 187 ++++++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 8b46a30..c01f74b 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -1,8 +1,11 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::net::SocketAddr; use std::sync::atomic::Ordering; +use std::time::Instant; use super::pool::{MePool, WriterContour}; +use crate::config::{MeBindStaleMode, MeFloorMode, MeSocksKdfPolicy}; +use crate::transport::upstream::IpPreference; #[derive(Clone, Debug)] pub(crate) struct MeApiWriterStatusSnapshot { @@ -45,6 +48,57 @@ pub(crate) struct MeApiStatusSnapshot { pub dcs: Vec, } +#[derive(Clone, Debug)] +pub(crate) struct MeApiQuarantinedEndpointSnapshot { + pub endpoint: SocketAddr, + pub remaining_ms: u64, +} + +#[derive(Clone, Debug)] +pub(crate) struct MeApiDcPathSnapshot { + pub dc: i16, + pub ip_preference: Option<&'static str>, + pub selected_addr_v4: Option, + pub selected_addr_v6: Option, +} + +#[derive(Clone, Debug)] +pub(crate) struct MeApiRuntimeSnapshot { + pub active_generation: u64, + pub warm_generation: u64, + pub pending_hardswap_generation: u64, + pub pending_hardswap_age_secs: Option, + pub hardswap_enabled: bool, + pub floor_mode: &'static str, + pub adaptive_floor_idle_secs: u64, + pub adaptive_floor_min_writers_single_endpoint: u8, + pub adaptive_floor_recover_grace_secs: u64, + pub me_keepalive_enabled: bool, + pub me_keepalive_interval_secs: u64, + pub me_keepalive_jitter_secs: u64, + pub me_keepalive_payload_random: bool, + pub rpc_proxy_req_every_secs: u64, + pub me_reconnect_max_concurrent_per_dc: u32, + pub me_reconnect_backoff_base_ms: u64, + pub me_reconnect_backoff_cap_ms: u64, + pub me_reconnect_fast_retry_count: u32, + pub me_pool_drain_ttl_secs: u64, + pub me_pool_force_close_secs: u64, + pub me_pool_min_fresh_ratio: f32, + pub me_bind_stale_mode: &'static str, + pub me_bind_stale_ttl_secs: u64, + pub me_single_endpoint_shadow_writers: u8, + pub me_single_endpoint_outage_mode_enabled: bool, + pub me_single_endpoint_outage_disable_quarantine: bool, + pub me_single_endpoint_outage_backoff_min_ms: u64, + pub me_single_endpoint_outage_backoff_max_ms: u64, + pub me_single_endpoint_shadow_rotate_every_secs: u64, + pub me_deterministic_writer_sort: bool, + pub me_socks_kdf_policy: &'static str, + pub quarantined_endpoints: Vec, + pub network_path: Vec, +} + impl MePool { pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot { let now_epoch_secs = Self::now_epoch_secs(); @@ -206,6 +260,107 @@ impl MePool { dcs, } } + + pub(crate) async fn api_runtime_snapshot(&self) -> MeApiRuntimeSnapshot { + let now = Instant::now(); + let now_epoch_secs = Self::now_epoch_secs(); + let pending_started_at = self + .pending_hardswap_started_at_epoch_secs + .load(Ordering::Relaxed); + let pending_hardswap_age_secs = (pending_started_at > 0) + .then_some(now_epoch_secs.saturating_sub(pending_started_at)); + + let mut quarantined_endpoints = Vec::::new(); + { + let guard = self.endpoint_quarantine.lock().await; + for (endpoint, expires_at) in guard.iter() { + if *expires_at <= now { + continue; + } + let remaining_ms = expires_at.duration_since(now).as_millis() as u64; + quarantined_endpoints.push(MeApiQuarantinedEndpointSnapshot { + endpoint: *endpoint, + remaining_ms, + }); + } + } + quarantined_endpoints.sort_by_key(|entry| entry.endpoint); + + let mut network_path = Vec::::new(); + if let Some(upstream) = &self.upstream { + for dc in 1..=5 { + let dc_idx = dc as i16; + let ip_preference = upstream + .get_dc_ip_preference(dc_idx) + .await + .map(ip_preference_label); + let selected_addr_v4 = upstream.get_dc_addr(dc_idx, false).await; + let selected_addr_v6 = upstream.get_dc_addr(dc_idx, true).await; + network_path.push(MeApiDcPathSnapshot { + dc: dc_idx, + ip_preference, + selected_addr_v4, + selected_addr_v6, + }); + } + } + + MeApiRuntimeSnapshot { + active_generation: self.active_generation.load(Ordering::Relaxed), + warm_generation: self.warm_generation.load(Ordering::Relaxed), + pending_hardswap_generation: self.pending_hardswap_generation.load(Ordering::Relaxed), + pending_hardswap_age_secs, + hardswap_enabled: self.hardswap.load(Ordering::Relaxed), + floor_mode: floor_mode_label(self.floor_mode()), + adaptive_floor_idle_secs: self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed), + adaptive_floor_min_writers_single_endpoint: self + .me_adaptive_floor_min_writers_single_endpoint + .load(Ordering::Relaxed), + adaptive_floor_recover_grace_secs: self + .me_adaptive_floor_recover_grace_secs + .load(Ordering::Relaxed), + me_keepalive_enabled: self.me_keepalive_enabled, + me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(), + me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(), + me_keepalive_payload_random: self.me_keepalive_payload_random, + rpc_proxy_req_every_secs: self.rpc_proxy_req_every_secs.load(Ordering::Relaxed), + me_reconnect_max_concurrent_per_dc: self.me_reconnect_max_concurrent_per_dc, + me_reconnect_backoff_base_ms: self.me_reconnect_backoff_base.as_millis() as u64, + me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, + me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, + me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed), + me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed), + me_pool_min_fresh_ratio: Self::permille_to_ratio( + self.me_pool_min_fresh_ratio_permille.load(Ordering::Relaxed), + ), + me_bind_stale_mode: bind_stale_mode_label(self.bind_stale_mode()), + me_bind_stale_ttl_secs: self.me_bind_stale_ttl_secs.load(Ordering::Relaxed), + me_single_endpoint_shadow_writers: self + .me_single_endpoint_shadow_writers + .load(Ordering::Relaxed), + me_single_endpoint_outage_mode_enabled: self + .me_single_endpoint_outage_mode_enabled + .load(Ordering::Relaxed), + me_single_endpoint_outage_disable_quarantine: self + .me_single_endpoint_outage_disable_quarantine + .load(Ordering::Relaxed), + me_single_endpoint_outage_backoff_min_ms: self + .me_single_endpoint_outage_backoff_min_ms + .load(Ordering::Relaxed), + me_single_endpoint_outage_backoff_max_ms: self + .me_single_endpoint_outage_backoff_max_ms + .load(Ordering::Relaxed), + me_single_endpoint_shadow_rotate_every_secs: self + .me_single_endpoint_shadow_rotate_every_secs + .load(Ordering::Relaxed), + me_deterministic_writer_sort: self + .me_deterministic_writer_sort + .load(Ordering::Relaxed), + me_socks_kdf_policy: socks_kdf_policy_label(self.socks_kdf_policy()), + quarantined_endpoints, + network_path, + } + } } fn ratio_pct(part: usize, total: usize) -> f64 { @@ -216,6 +371,38 @@ fn ratio_pct(part: usize, total: usize) -> f64 { pct.clamp(0.0, 100.0) } +fn floor_mode_label(mode: MeFloorMode) -> &'static str { + match mode { + MeFloorMode::Static => "static", + MeFloorMode::Adaptive => "adaptive", + } +} + +fn bind_stale_mode_label(mode: MeBindStaleMode) -> &'static str { + match mode { + MeBindStaleMode::Never => "never", + MeBindStaleMode::Ttl => "ttl", + MeBindStaleMode::Always => "always", + } +} + +fn socks_kdf_policy_label(policy: MeSocksKdfPolicy) -> &'static str { + match policy { + MeSocksKdfPolicy::Strict => "strict", + MeSocksKdfPolicy::Compat => "compat", + } +} + +fn ip_preference_label(preference: IpPreference) -> &'static str { + match preference { + IpPreference::Unknown => "unknown", + IpPreference::PreferV6 => "prefer_v6", + IpPreference::PreferV4 => "prefer_v4", + IpPreference::BothWork => "both", + IpPreference::Unavailable => "unavailable", + } +} + #[cfg(test)] mod tests { use super::ratio_pct;