From d7716ad8757eea2fde545d5172110ecebd398233 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:52:17 +0300 Subject: [PATCH 01/12] Upstream API Policy Snapshot --- src/transport/upstream.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index b9db0eb..84c6fdf 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -202,6 +202,15 @@ pub struct UpstreamApiSnapshot { pub upstreams: Vec, } +#[derive(Debug, Clone, Copy)] +pub struct UpstreamApiPolicySnapshot { + pub connect_retry_attempts: u32, + pub connect_retry_backoff_ms: u64, + pub connect_budget_ms: u64, + pub unhealthy_fail_threshold: u32, + pub connect_failfast_hard_errors: bool, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct UpstreamEgressInfo { pub route_kind: UpstreamRouteKind, @@ -315,6 +324,16 @@ impl UpstreamManager { Some(UpstreamApiSnapshot { summary, upstreams }) } + pub fn api_policy_snapshot(&self) -> UpstreamApiPolicySnapshot { + UpstreamApiPolicySnapshot { + connect_retry_attempts: self.connect_retry_attempts, + connect_retry_backoff_ms: self.connect_retry_backoff.as_millis() as u64, + connect_budget_ms: self.connect_budget.as_millis() as u64, + unhealthy_fail_threshold: self.unhealthy_fail_threshold, + connect_failfast_hard_errors: self.connect_failfast_hard_errors, + } + } + #[cfg(unix)] fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec { use nix::ifaddrs::getifaddrs; From c465c200c4478900c7fc1e3a51ee084fa6b98fb9 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:52:31 +0300 Subject: [PATCH 02/12] ME Pool Runtime API --- .../middle_proxy/pool_runtime_api.rs | 128 ++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 src/transport/middle_proxy/pool_runtime_api.rs diff --git a/src/transport/middle_proxy/pool_runtime_api.rs 
b/src/transport/middle_proxy/pool_runtime_api.rs new file mode 100644 index 0000000..37ef298 --- /dev/null +++ b/src/transport/middle_proxy/pool_runtime_api.rs @@ -0,0 +1,128 @@ +use std::collections::HashMap; +use std::time::Instant; + +use super::pool::{MePool, RefillDcKey}; +use crate::network::IpFamily; + +#[derive(Clone, Debug)] +pub(crate) struct MeApiRefillDcSnapshot { + pub dc: i16, + pub family: &'static str, + pub inflight: usize, +} + +#[derive(Clone, Debug)] +pub(crate) struct MeApiRefillSnapshot { + pub inflight_endpoints_total: usize, + pub inflight_dc_total: usize, + pub by_dc: Vec, +} + +#[derive(Clone, Debug)] +pub(crate) struct MeApiNatReflectionSnapshot { + pub addr: std::net::SocketAddr, + pub age_secs: u64, +} + +#[derive(Clone, Debug)] +pub(crate) struct MeApiNatStunSnapshot { + pub nat_probe_enabled: bool, + pub nat_probe_disabled_runtime: bool, + pub nat_probe_attempts: u8, + pub configured_servers: Vec, + pub live_servers: Vec, + pub reflection_v4: Option, + pub reflection_v6: Option, + pub stun_backoff_remaining_ms: Option, +} + +impl MePool { + pub(crate) async fn api_refill_snapshot(&self) -> MeApiRefillSnapshot { + let inflight_endpoints_total = self.refill_inflight.lock().await.len(); + let inflight_dc_keys = self + .refill_inflight_dc + .lock() + .await + .iter() + .copied() + .collect::>(); + + let mut by_dc_map = HashMap::<(i16, &'static str), usize>::new(); + for key in inflight_dc_keys { + let family = match key.family { + IpFamily::V4 => "v4", + IpFamily::V6 => "v6", + }; + let dc = key.dc as i16; + *by_dc_map.entry((dc, family)).or_insert(0) += 1; + } + + let mut by_dc = by_dc_map + .into_iter() + .map(|((dc, family), inflight)| MeApiRefillDcSnapshot { + dc, + family, + inflight, + }) + .collect::>(); + by_dc.sort_by_key(|entry| (entry.dc, entry.family)); + + MeApiRefillSnapshot { + inflight_endpoints_total, + inflight_dc_total: by_dc.len(), + by_dc, + } + } + + pub(crate) async fn api_nat_stun_snapshot(&self) -> 
MeApiNatStunSnapshot { + let now = Instant::now(); + let mut configured_servers = if !self.nat_stun_servers.is_empty() { + self.nat_stun_servers.clone() + } else if let Some(stun) = &self.nat_stun { + if stun.trim().is_empty() { + Vec::new() + } else { + vec![stun.clone()] + } + } else { + Vec::new() + }; + configured_servers.sort(); + configured_servers.dedup(); + + let mut live_servers = self.nat_stun_live_servers.read().await.clone(); + live_servers.sort(); + live_servers.dedup(); + + let reflection = self.nat_reflection_cache.lock().await; + let reflection_v4 = reflection.v4.map(|(ts, addr)| MeApiNatReflectionSnapshot { + addr, + age_secs: now.saturating_duration_since(ts).as_secs(), + }); + let reflection_v6 = reflection.v6.map(|(ts, addr)| MeApiNatReflectionSnapshot { + addr, + age_secs: now.saturating_duration_since(ts).as_secs(), + }); + drop(reflection); + + let backoff_until = *self.stun_backoff_until.read().await; + let stun_backoff_remaining_ms = backoff_until.and_then(|until| { + (until > now).then_some(until.duration_since(now).as_millis() as u64) + }); + + MeApiNatStunSnapshot { + nat_probe_enabled: self.nat_probe, + nat_probe_disabled_runtime: self + .nat_probe_disabled + .load(std::sync::atomic::Ordering::Relaxed), + nat_probe_attempts: self + .nat_probe_attempts + .load(std::sync::atomic::Ordering::Relaxed), + configured_servers, + live_servers, + reflection_v4, + reflection_v6, + stun_backoff_remaining_ms, + } + } +} From 487e95a66e5811ba0be02b8ddd0383a84a6c0992 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:52:39 +0300 Subject: [PATCH 03/12] Update mod.rs --- src/transport/middle_proxy/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 131e215..87c012b 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -10,6 +10,7 @@ mod pool_init; mod pool_nat; mod pool_refill; mod 
pool_reinit; +mod pool_runtime_api; mod pool_writer; mod ping; mod reader; From 44b825edf52339fa84b6068e325ad5e344b5cdab Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:53:13 +0300 Subject: [PATCH 04/12] Atomics in Stats Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/stats/mod.rs | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 4cc9933..4b59367 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -25,6 +25,8 @@ use self::telemetry::TelemetryPolicy; pub struct Stats { connects_all: AtomicU64, connects_bad: AtomicU64, + current_connections_direct: AtomicU64, + current_connections_me: AtomicU64, handshake_timeouts: AtomicU64, upstream_connect_attempt_total: AtomicU64, upstream_connect_success_total: AtomicU64, @@ -150,6 +152,24 @@ impl Stats { self.telemetry_me_level().allows_debug() } + fn decrement_atomic_saturating(counter: &AtomicU64) { + let mut current = counter.load(Ordering::Relaxed); + loop { + if current == 0 { + break; + } + match counter.compare_exchange_weak( + current, + current - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(actual) => current = actual, + } + } + } + pub fn apply_telemetry_policy(&self, policy: TelemetryPolicy) { self.telemetry_core_enabled .store(policy.core_enabled, Ordering::Relaxed); @@ -177,6 +197,18 @@ impl Stats { self.connects_bad.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_current_connections_direct(&self) { + self.current_connections_direct.fetch_add(1, Ordering::Relaxed); + } + pub fn decrement_current_connections_direct(&self) { + Self::decrement_atomic_saturating(&self.current_connections_direct); + } + pub fn increment_current_connections_me(&self) { + self.current_connections_me.fetch_add(1, Ordering::Relaxed); + } + pub fn decrement_current_connections_me(&self) { + 
Self::decrement_atomic_saturating(&self.current_connections_me); + } pub fn increment_handshake_timeouts(&self) { if self.telemetry_core_enabled() { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); @@ -646,6 +678,16 @@ impl Stats { } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } + pub fn get_current_connections_direct(&self) -> u64 { + self.current_connections_direct.load(Ordering::Relaxed) + } + pub fn get_current_connections_me(&self) -> u64 { + self.current_connections_me.load(Ordering::Relaxed) + } + pub fn get_current_connections_total(&self) -> u64 { + self.get_current_connections_direct() + .saturating_add(self.get_current_connections_me()) + } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) } pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) } From 83ed9065b046d102ee1a2eea00e3c37eba6e26c3 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:53:22 +0300 Subject: [PATCH 05/12] Update middle_relay.rs Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/proxy/middle_relay.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 8f5fc36..8384e32 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -237,6 +237,7 @@ where stats.increment_user_connects(&user); stats.increment_user_curr_connects(&user); + stats.increment_current_connections_me(); // Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable) let user_tag: Option> = config @@ -466,6 +467,7 @@ where "ME relay cleanup" ); me_pool.registry().unregister(conn_id).await; + stats.decrement_current_connections_me(); 
stats.decrement_user_curr_connects(&user); result } From 2a3b6b917f89eabf4e5ee9569a106d22cd65eae0 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:53:28 +0300 Subject: [PATCH 06/12] Update direct_relay.rs Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/proxy/direct_relay.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 1245f34..e39e446 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -57,6 +57,7 @@ where stats.increment_user_connects(user); stats.increment_user_curr_connects(user); + stats.increment_current_connections_direct(); let relay_result = relay_bidirectional( client_reader, @@ -69,6 +70,7 @@ where ) .await; + stats.decrement_current_connections_direct(); stats.decrement_user_curr_connects(user); match &relay_result { From 2ea85c00d3ccc30799443d5a8a9aaaec9a8837a0 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:54:00 +0300 Subject: [PATCH 07/12] Runtime API Defaults --- src/config/defaults.rs | 5 ++ src/config/hot_reload.rs | 6 ++ src/config/load.rs | 116 +++++++++++++++++++++++++++++++++++++++ src/config/types.rs | 20 +++++++ 4 files changed, 147 insertions(+) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 9851216..3ba146c 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -114,6 +114,11 @@ pub(crate) fn default_api_minimal_runtime_cache_ttl_ms() -> u64 { 1000 } +pub(crate) fn default_api_runtime_edge_enabled() -> bool { false } +pub(crate) fn default_api_runtime_edge_cache_ttl_ms() -> u64 { 1000 } +pub(crate) fn default_api_runtime_edge_top_n() -> usize { 10 } +pub(crate) fn default_api_runtime_edge_events_capacity() -> usize { 256 } + pub(crate) fn default_proxy_protocol_header_timeout_ms() -> u64 { 500 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 
97d5e4e..c39cafa 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -312,6 +312,12 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b || old.server.api.minimal_runtime_enabled != new.server.api.minimal_runtime_enabled || old.server.api.minimal_runtime_cache_ttl_ms != new.server.api.minimal_runtime_cache_ttl_ms + || old.server.api.runtime_edge_enabled != new.server.api.runtime_edge_enabled + || old.server.api.runtime_edge_cache_ttl_ms + != new.server.api.runtime_edge_cache_ttl_ms + || old.server.api.runtime_edge_top_n != new.server.api.runtime_edge_top_n + || old.server.api.runtime_edge_events_capacity + != new.server.api.runtime_edge_events_capacity || old.server.api.read_only != new.server.api.read_only { warned = true; diff --git a/src/config/load.rs b/src/config/load.rs index 8fec710..6ce7b65 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -462,6 +462,24 @@ impl ProxyConfig { )); } + if config.server.api.runtime_edge_cache_ttl_ms > 60_000 { + return Err(ProxyError::Config( + "server.api.runtime_edge_cache_ttl_ms must be within [0, 60000]".to_string(), + )); + } + + if !(1..=1000).contains(&config.server.api.runtime_edge_top_n) { + return Err(ProxyError::Config( + "server.api.runtime_edge_top_n must be within [1, 1000]".to_string(), + )); + } + + if !(16..=4096).contains(&config.server.api.runtime_edge_events_capacity) { + return Err(ProxyError::Config( + "server.api.runtime_edge_events_capacity must be within [16, 4096]".to_string(), + )); + } + if config.server.api.listen.parse::().is_err() { return Err(ProxyError::Config( "server.api.listen must be in IP:PORT format".to_string(), @@ -802,6 +820,22 @@ mod tests { cfg.server.api.minimal_runtime_cache_ttl_ms, default_api_minimal_runtime_cache_ttl_ms() ); + assert_eq!( + cfg.server.api.runtime_edge_enabled, + default_api_runtime_edge_enabled() + ); + assert_eq!( + cfg.server.api.runtime_edge_cache_ttl_ms, + default_api_runtime_edge_cache_ttl_ms() + 
); + assert_eq!( + cfg.server.api.runtime_edge_top_n, + default_api_runtime_edge_top_n() + ); + assert_eq!( + cfg.server.api.runtime_edge_events_capacity, + default_api_runtime_edge_events_capacity() + ); assert_eq!(cfg.access.users, default_access_users()); assert_eq!( cfg.access.user_max_unique_ips_mode, @@ -918,6 +952,22 @@ mod tests { server.api.minimal_runtime_cache_ttl_ms, default_api_minimal_runtime_cache_ttl_ms() ); + assert_eq!( + server.api.runtime_edge_enabled, + default_api_runtime_edge_enabled() + ); + assert_eq!( + server.api.runtime_edge_cache_ttl_ms, + default_api_runtime_edge_cache_ttl_ms() + ); + assert_eq!( + server.api.runtime_edge_top_n, + default_api_runtime_edge_top_n() + ); + assert_eq!( + server.api.runtime_edge_events_capacity, + default_api_runtime_edge_events_capacity() + ); let access = AccessConfig::default(); assert_eq!(access.users, default_access_users()); @@ -1565,6 +1615,72 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn api_runtime_edge_cache_ttl_out_of_range_is_rejected() { + let toml = r#" + [server.api] + enabled = true + listen = "127.0.0.1:9091" + runtime_edge_cache_ttl_ms = 70000 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_api_runtime_edge_cache_ttl_invalid_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("server.api.runtime_edge_cache_ttl_ms must be within [0, 60000]")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn api_runtime_edge_top_n_out_of_range_is_rejected() { + let toml = r#" + [server.api] + enabled = true + listen = "127.0.0.1:9091" + runtime_edge_top_n = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = 
dir.join("telemt_api_runtime_edge_top_n_invalid_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("server.api.runtime_edge_top_n must be within [1, 1000]")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn api_runtime_edge_events_capacity_out_of_range_is_rejected() { + let toml = r#" + [server.api] + enabled = true + listen = "127.0.0.1:9091" + runtime_edge_events_capacity = 8 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_api_runtime_edge_events_capacity_invalid_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("server.api.runtime_edge_events_capacity must be within [16, 4096]")); + let _ = std::fs::remove_file(path); + } + #[test] fn force_close_bumped_when_below_drain_ttl() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index be238d3..4a33b7c 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -918,6 +918,22 @@ pub struct ApiConfig { #[serde(default = "default_api_minimal_runtime_cache_ttl_ms")] pub minimal_runtime_cache_ttl_ms: u64, + /// Enables runtime edge endpoints with optional cached aggregation. + #[serde(default = "default_api_runtime_edge_enabled")] + pub runtime_edge_enabled: bool, + + /// Cache TTL for runtime edge aggregation payloads in milliseconds. + #[serde(default = "default_api_runtime_edge_cache_ttl_ms")] + pub runtime_edge_cache_ttl_ms: u64, + + /// Top-N limit for edge connection leaderboard payloads. + #[serde(default = "default_api_runtime_edge_top_n")] + pub runtime_edge_top_n: usize, + + /// Ring-buffer capacity for runtime edge control-plane events. 
+ #[serde(default = "default_api_runtime_edge_events_capacity")] + pub runtime_edge_events_capacity: usize, + /// Read-only mode: mutating endpoints are rejected. #[serde(default)] pub read_only: bool, @@ -933,6 +949,10 @@ impl Default for ApiConfig { request_body_limit_bytes: default_api_request_body_limit_bytes(), minimal_runtime_enabled: default_api_minimal_runtime_enabled(), minimal_runtime_cache_ttl_ms: default_api_minimal_runtime_cache_ttl_ms(), + runtime_edge_enabled: default_api_runtime_edge_enabled(), + runtime_edge_cache_ttl_ms: default_api_runtime_edge_cache_ttl_ms(), + runtime_edge_top_n: default_api_runtime_edge_top_n(), + runtime_edge_events_capacity: default_api_runtime_edge_events_capacity(), read_only: false, } } From fb5e9947bd46e0eb5cda6dec8c1526529b80e763 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:54:12 +0300 Subject: [PATCH 08/12] Runtime Watch --- src/api/runtime_watch.rs | 66 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 src/api/runtime_watch.rs diff --git a/src/api/runtime_watch.rs b/src/api/runtime_watch.rs new file mode 100644 index 0000000..0485e55 --- /dev/null +++ b/src/api/runtime_watch.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::{SystemTime, UNIX_EPOCH}; + +use tokio::sync::watch; + +use crate::config::ProxyConfig; + +use super::ApiRuntimeState; +use super::events::ApiEventStore; + +pub(super) fn spawn_runtime_watchers( + config_rx: watch::Receiver>, + admission_rx: watch::Receiver, + runtime_state: Arc, + runtime_events: Arc, +) { + let mut config_rx_reload = config_rx; + let runtime_state_reload = runtime_state.clone(); + let runtime_events_reload = runtime_events.clone(); + tokio::spawn(async move { + loop { + if config_rx_reload.changed().await.is_err() { + break; + } + runtime_state_reload + .config_reload_count + .fetch_add(1, Ordering::Relaxed); + runtime_state_reload + 
.last_config_reload_epoch_secs + .store(now_epoch_secs(), Ordering::Relaxed); + runtime_events_reload.record("config.reload.applied", "config receiver updated"); + } + }); + + let mut admission_rx_watch = admission_rx; + tokio::spawn(async move { + runtime_state + .admission_open + .store(*admission_rx_watch.borrow(), Ordering::Relaxed); + runtime_events.record( + "admission.state", + format!("accepting_new_connections={}", *admission_rx_watch.borrow()), + ); + loop { + if admission_rx_watch.changed().await.is_err() { + break; + } + let admission_open = *admission_rx_watch.borrow(); + runtime_state + .admission_open + .store(admission_open, Ordering::Relaxed); + runtime_events.record( + "admission.state", + format!("accepting_new_connections={}", admission_open), + ); + } + }); +} + +fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} From 2d98ebf3c3dd39124ad0caa17593b1a8130c08e6 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:54:26 +0300 Subject: [PATCH 09/12] Runtime w/ Minimal Overhead --- src/api/runtime_min.rs | 534 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 534 insertions(+) create mode 100644 src/api/runtime_min.rs diff --git a/src/api/runtime_min.rs b/src/api/runtime_min.rs new file mode 100644 index 0000000..96270df --- /dev/null +++ b/src/api/runtime_min.rs @@ -0,0 +1,534 @@ +use std::collections::BTreeSet; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::Serialize; + +use crate::config::ProxyConfig; + +use super::ApiShared; + +const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable"; + +#[derive(Serialize)] +pub(super) struct SecurityWhitelistData { + pub(super) generated_at_epoch_secs: u64, + pub(super) enabled: bool, + pub(super) entries_total: usize, + pub(super) entries: Vec, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStateGenerationData { + pub(super) active_generation: u64, + 
pub(super) warm_generation: u64, + pub(super) pending_hardswap_generation: u64, + pub(super) pending_hardswap_age_secs: Option, + pub(super) draining_generations: Vec, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStateHardswapData { + pub(super) enabled: bool, + pub(super) pending: bool, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStateWriterContourData { + pub(super) warm: usize, + pub(super) active: usize, + pub(super) draining: usize, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStateWriterHealthData { + pub(super) healthy: usize, + pub(super) degraded: usize, + pub(super) draining: usize, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStateWriterData { + pub(super) total: usize, + pub(super) alive_non_draining: usize, + pub(super) draining: usize, + pub(super) degraded: usize, + pub(super) contour: RuntimeMePoolStateWriterContourData, + pub(super) health: RuntimeMePoolStateWriterHealthData, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStateRefillDcData { + pub(super) dc: i16, + pub(super) family: &'static str, + pub(super) inflight: usize, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStateRefillData { + pub(super) inflight_endpoints_total: usize, + pub(super) inflight_dc_total: usize, + pub(super) by_dc: Vec, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStatePayload { + pub(super) generations: RuntimeMePoolStateGenerationData, + pub(super) hardswap: RuntimeMePoolStateHardswapData, + pub(super) writers: RuntimeMePoolStateWriterData, + pub(super) refill: RuntimeMePoolStateRefillData, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMePoolStateData { + pub(super) enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, + pub(super) generated_at_epoch_secs: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) data: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityCountersData 
{ + pub(super) idle_close_by_peer_total: u64, + pub(super) reader_eof_total: u64, + pub(super) kdf_drift_total: u64, + pub(super) kdf_port_only_drift_total: u64, + pub(super) reconnect_attempt_total: u64, + pub(super) reconnect_success_total: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityRouteDropData { + pub(super) no_conn_total: u64, + pub(super) channel_closed_total: u64, + pub(super) queue_full_total: u64, + pub(super) queue_full_base_total: u64, + pub(super) queue_full_high_total: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityDcRttData { + pub(super) dc: i16, + pub(super) rtt_ema_ms: Option, + pub(super) alive_writers: usize, + pub(super) required_writers: usize, + pub(super) coverage_pct: f64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityPayload { + pub(super) counters: RuntimeMeQualityCountersData, + pub(super) route_drops: RuntimeMeQualityRouteDropData, + pub(super) dc_rtt: Vec, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityData { + pub(super) enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, + pub(super) generated_at_epoch_secs: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) data: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeUpstreamQualityPolicyData { + pub(super) connect_retry_attempts: u32, + pub(super) connect_retry_backoff_ms: u64, + pub(super) connect_budget_ms: u64, + pub(super) unhealthy_fail_threshold: u32, + pub(super) connect_failfast_hard_errors: bool, +} + +#[derive(Serialize)] +pub(super) struct RuntimeUpstreamQualityCountersData { + pub(super) connect_attempt_total: u64, + pub(super) connect_success_total: u64, + pub(super) connect_fail_total: u64, + pub(super) connect_failfast_hard_error_total: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeUpstreamQualitySummaryData { + pub(super) configured_total: usize, + pub(super) healthy_total: usize, + pub(super) 
unhealthy_total: usize, + pub(super) direct_total: usize, + pub(super) socks4_total: usize, + pub(super) socks5_total: usize, +} + +#[derive(Serialize)] +pub(super) struct RuntimeUpstreamQualityDcData { + pub(super) dc: i16, + pub(super) latency_ema_ms: Option, + pub(super) ip_preference: &'static str, +} + +#[derive(Serialize)] +pub(super) struct RuntimeUpstreamQualityUpstreamData { + pub(super) upstream_id: usize, + pub(super) route_kind: &'static str, + pub(super) address: String, + pub(super) weight: u16, + pub(super) scopes: String, + pub(super) healthy: bool, + pub(super) fails: u32, + pub(super) last_check_age_secs: u64, + pub(super) effective_latency_ms: Option, + pub(super) dc: Vec, +} + +#[derive(Serialize)] +pub(super) struct RuntimeUpstreamQualityData { + pub(super) enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, + pub(super) generated_at_epoch_secs: u64, + pub(super) policy: RuntimeUpstreamQualityPolicyData, + pub(super) counters: RuntimeUpstreamQualityCountersData, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) summary: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) upstreams: Option>, +} + +#[derive(Serialize)] +pub(super) struct RuntimeNatStunReflectionData { + pub(super) addr: String, + pub(super) age_secs: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeNatStunFlagsData { + pub(super) nat_probe_enabled: bool, + pub(super) nat_probe_disabled_runtime: bool, + pub(super) nat_probe_attempts: u8, +} + +#[derive(Serialize)] +pub(super) struct RuntimeNatStunServersData { + pub(super) configured: Vec, + pub(super) live: Vec, + pub(super) live_total: usize, +} + +#[derive(Serialize)] +pub(super) struct RuntimeNatStunReflectionBlockData { + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) v4: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) v6: Option, +} + +#[derive(Serialize)] +pub(super) struct 
RuntimeNatStunPayload { + pub(super) flags: RuntimeNatStunFlagsData, + pub(super) servers: RuntimeNatStunServersData, + pub(super) reflection: RuntimeNatStunReflectionBlockData, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) stun_backoff_remaining_ms: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeNatStunData { + pub(super) enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, + pub(super) generated_at_epoch_secs: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) data: Option, +} + +pub(super) fn build_security_whitelist_data(cfg: &ProxyConfig) -> SecurityWhitelistData { + let entries = cfg + .server + .api + .whitelist + .iter() + .map(ToString::to_string) + .collect::>(); + SecurityWhitelistData { + generated_at_epoch_secs: now_epoch_secs(), + enabled: !entries.is_empty(), + entries_total: entries.len(), + entries, + } +} + +pub(super) async fn build_runtime_me_pool_state_data(shared: &ApiShared) -> RuntimeMePoolStateData { + let now_epoch_secs = now_epoch_secs(); + let Some(pool) = &shared.me_pool else { + return RuntimeMePoolStateData { + enabled: false, + reason: Some(SOURCE_UNAVAILABLE_REASON), + generated_at_epoch_secs: now_epoch_secs, + data: None, + }; + }; + + let status = pool.api_status_snapshot().await; + let runtime = pool.api_runtime_snapshot().await; + let refill = pool.api_refill_snapshot().await; + + let mut draining_generations = BTreeSet::::new(); + let mut contour_warm = 0usize; + let mut contour_active = 0usize; + let mut contour_draining = 0usize; + let mut draining = 0usize; + let mut degraded = 0usize; + let mut healthy = 0usize; + + for writer in &status.writers { + if writer.draining { + draining_generations.insert(writer.generation); + draining += 1; + } + if writer.degraded && !writer.draining { + degraded += 1; + } + if !writer.degraded && !writer.draining { + healthy += 1; + } + match writer.state { + "warm" => contour_warm += 
1, + "active" => contour_active += 1, + _ => contour_draining += 1, + } + } + + RuntimeMePoolStateData { + enabled: true, + reason: None, + generated_at_epoch_secs: status.generated_at_epoch_secs, + data: Some(RuntimeMePoolStatePayload { + generations: RuntimeMePoolStateGenerationData { + active_generation: runtime.active_generation, + warm_generation: runtime.warm_generation, + pending_hardswap_generation: runtime.pending_hardswap_generation, + pending_hardswap_age_secs: runtime.pending_hardswap_age_secs, + draining_generations: draining_generations.into_iter().collect(), + }, + hardswap: RuntimeMePoolStateHardswapData { + enabled: runtime.hardswap_enabled, + pending: runtime.pending_hardswap_generation != 0, + }, + writers: RuntimeMePoolStateWriterData { + total: status.writers.len(), + alive_non_draining: status.writers.len().saturating_sub(draining), + draining, + degraded, + contour: RuntimeMePoolStateWriterContourData { + warm: contour_warm, + active: contour_active, + draining: contour_draining, + }, + health: RuntimeMePoolStateWriterHealthData { + healthy, + degraded, + draining, + }, + }, + refill: RuntimeMePoolStateRefillData { + inflight_endpoints_total: refill.inflight_endpoints_total, + inflight_dc_total: refill.inflight_dc_total, + by_dc: refill + .by_dc + .into_iter() + .map(|entry| RuntimeMePoolStateRefillDcData { + dc: entry.dc, + family: entry.family, + inflight: entry.inflight, + }) + .collect(), + }, + }), + } +} + +pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> RuntimeMeQualityData { + let now_epoch_secs = now_epoch_secs(); + let Some(pool) = &shared.me_pool else { + return RuntimeMeQualityData { + enabled: false, + reason: Some(SOURCE_UNAVAILABLE_REASON), + generated_at_epoch_secs: now_epoch_secs, + data: None, + }; + }; + + let status = pool.api_status_snapshot().await; + RuntimeMeQualityData { + enabled: true, + reason: None, + generated_at_epoch_secs: status.generated_at_epoch_secs, + data: 
Some(RuntimeMeQualityPayload { + counters: RuntimeMeQualityCountersData { + idle_close_by_peer_total: shared.stats.get_me_idle_close_by_peer_total(), + reader_eof_total: shared.stats.get_me_reader_eof_total(), + kdf_drift_total: shared.stats.get_me_kdf_drift_total(), + kdf_port_only_drift_total: shared.stats.get_me_kdf_port_only_drift_total(), + reconnect_attempt_total: shared.stats.get_me_reconnect_attempts(), + reconnect_success_total: shared.stats.get_me_reconnect_success(), + }, + route_drops: RuntimeMeQualityRouteDropData { + no_conn_total: shared.stats.get_me_route_drop_no_conn(), + channel_closed_total: shared.stats.get_me_route_drop_channel_closed(), + queue_full_total: shared.stats.get_me_route_drop_queue_full(), + queue_full_base_total: shared.stats.get_me_route_drop_queue_full_base(), + queue_full_high_total: shared.stats.get_me_route_drop_queue_full_high(), + }, + dc_rtt: status + .dcs + .into_iter() + .map(|dc| RuntimeMeQualityDcRttData { + dc: dc.dc, + rtt_ema_ms: dc.rtt_ms, + alive_writers: dc.alive_writers, + required_writers: dc.required_writers, + coverage_pct: dc.coverage_pct, + }) + .collect(), + }), + } +} + +pub(super) async fn build_runtime_upstream_quality_data( + shared: &ApiShared, +) -> RuntimeUpstreamQualityData { + let generated_at_epoch_secs = now_epoch_secs(); + let policy = shared.upstream_manager.api_policy_snapshot(); + let counters = RuntimeUpstreamQualityCountersData { + connect_attempt_total: shared.stats.get_upstream_connect_attempt_total(), + connect_success_total: shared.stats.get_upstream_connect_success_total(), + connect_fail_total: shared.stats.get_upstream_connect_fail_total(), + connect_failfast_hard_error_total: shared.stats.get_upstream_connect_failfast_hard_error_total(), + }; + + let Some(snapshot) = shared.upstream_manager.try_api_snapshot() else { + return RuntimeUpstreamQualityData { + enabled: false, + reason: Some(SOURCE_UNAVAILABLE_REASON), + generated_at_epoch_secs, + policy: RuntimeUpstreamQualityPolicyData 
{ + connect_retry_attempts: policy.connect_retry_attempts, + connect_retry_backoff_ms: policy.connect_retry_backoff_ms, + connect_budget_ms: policy.connect_budget_ms, + unhealthy_fail_threshold: policy.unhealthy_fail_threshold, + connect_failfast_hard_errors: policy.connect_failfast_hard_errors, + }, + counters, + summary: None, + upstreams: None, + }; + }; + + RuntimeUpstreamQualityData { + enabled: true, + reason: None, + generated_at_epoch_secs, + policy: RuntimeUpstreamQualityPolicyData { + connect_retry_attempts: policy.connect_retry_attempts, + connect_retry_backoff_ms: policy.connect_retry_backoff_ms, + connect_budget_ms: policy.connect_budget_ms, + unhealthy_fail_threshold: policy.unhealthy_fail_threshold, + connect_failfast_hard_errors: policy.connect_failfast_hard_errors, + }, + counters, + summary: Some(RuntimeUpstreamQualitySummaryData { + configured_total: snapshot.summary.configured_total, + healthy_total: snapshot.summary.healthy_total, + unhealthy_total: snapshot.summary.unhealthy_total, + direct_total: snapshot.summary.direct_total, + socks4_total: snapshot.summary.socks4_total, + socks5_total: snapshot.summary.socks5_total, + }), + upstreams: Some( + snapshot + .upstreams + .into_iter() + .map(|upstream| RuntimeUpstreamQualityUpstreamData { + upstream_id: upstream.upstream_id, + route_kind: match upstream.route_kind { + crate::transport::UpstreamRouteKind::Direct => "direct", + crate::transport::UpstreamRouteKind::Socks4 => "socks4", + crate::transport::UpstreamRouteKind::Socks5 => "socks5", + }, + address: upstream.address, + weight: upstream.weight, + scopes: upstream.scopes, + healthy: upstream.healthy, + fails: upstream.fails, + last_check_age_secs: upstream.last_check_age_secs, + effective_latency_ms: upstream.effective_latency_ms, + dc: upstream + .dc + .into_iter() + .map(|dc| RuntimeUpstreamQualityDcData { + dc: dc.dc, + latency_ema_ms: dc.latency_ema_ms, + ip_preference: match dc.ip_preference { + 
crate::transport::upstream::IpPreference::Unknown => "unknown", + crate::transport::upstream::IpPreference::PreferV6 => "prefer_v6", + crate::transport::upstream::IpPreference::PreferV4 => "prefer_v4", + crate::transport::upstream::IpPreference::BothWork => "both_work", + crate::transport::upstream::IpPreference::Unavailable => "unavailable", + }, + }) + .collect(), + }) + .collect(), + ), + } +} + +pub(super) async fn build_runtime_nat_stun_data(shared: &ApiShared) -> RuntimeNatStunData { + let now_epoch_secs = now_epoch_secs(); + let Some(pool) = &shared.me_pool else { + return RuntimeNatStunData { + enabled: false, + reason: Some(SOURCE_UNAVAILABLE_REASON), + generated_at_epoch_secs: now_epoch_secs, + data: None, + }; + }; + + let snapshot = pool.api_nat_stun_snapshot().await; + RuntimeNatStunData { + enabled: true, + reason: None, + generated_at_epoch_secs: now_epoch_secs, + data: Some(RuntimeNatStunPayload { + flags: RuntimeNatStunFlagsData { + nat_probe_enabled: snapshot.nat_probe_enabled, + nat_probe_disabled_runtime: snapshot.nat_probe_disabled_runtime, + nat_probe_attempts: snapshot.nat_probe_attempts, + }, + servers: RuntimeNatStunServersData { + configured: snapshot.configured_servers, + live: snapshot.live_servers.clone(), + live_total: snapshot.live_servers.len(), + }, + reflection: RuntimeNatStunReflectionBlockData { + v4: snapshot.reflection_v4.map(|entry| RuntimeNatStunReflectionData { + addr: entry.addr.to_string(), + age_secs: entry.age_secs, + }), + v6: snapshot.reflection_v6.map(|entry| RuntimeNatStunReflectionData { + addr: entry.addr.to_string(), + age_secs: entry.age_secs, + }), + }, + stun_backoff_remaining_ms: snapshot.stun_backoff_remaining_ms, + }), + } +} + +fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} From da89415961f6a133d47b6a455dd4f1ffc09d32ff Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 6 Mar 2026 18:54:37 +0300 
Subject: [PATCH 10/12] Runtime API on Edge --- src/api/runtime_edge.rs | 294 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 src/api/runtime_edge.rs
diff --git a/src/api/runtime_edge.rs b/src/api/runtime_edge.rs new file mode 100644 index 0000000..b61f504 --- /dev/null +++ b/src/api/runtime_edge.rs @@ -0,0 +1,294 @@
//! Edge runtime endpoints: connection summaries (cached, single-flight
//! recompute) and the recent-events feed.

use std::cmp::Reverse;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use serde::Serialize;

use crate::config::ProxyConfig;

use super::ApiShared;
use super::events::ApiEventRecord;

const FEATURE_DISABLED_REASON: &str = "feature_disabled";
const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable";
const EVENTS_DEFAULT_LIMIT: usize = 50;
const EVENTS_MAX_LIMIT: usize = 1000;

/// One per-user row in the connections summary.
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionUserData {
    pub(super) username: String,
    pub(super) current_connections: u64,
    /// Cumulative octets in both directions (see `throughput_is_cumulative`).
    pub(super) total_octets: u64,
}

/// Aggregate connection counts across all users.
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionTotalsData {
    pub(super) current_connections: u64,
    pub(super) current_connections_me: u64,
    pub(super) current_connections_direct: u64,
    pub(super) active_users: usize,
}

/// Top-N leaderboards by live connections and by cumulative throughput.
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionTopData {
    pub(super) limit: usize,
    pub(super) by_connections: Vec<RuntimeEdgeConnectionUserData>,
    pub(super) by_throughput: Vec<RuntimeEdgeConnectionUserData>,
}

/// Cache provenance flags for the served payload.
#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionCacheData {
    pub(super) ttl_ms: u64,
    pub(super) served_from_cache: bool,
    /// True when an expired entry was served because a recompute was already
    /// in flight on another request.
    pub(super) stale_cache_used: bool,
}

#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionTelemetryData {
    pub(super) user_enabled: bool,
    pub(super) throughput_is_cumulative: bool,
}

#[derive(Clone, Serialize)]
pub(super) struct RuntimeEdgeConnectionsSummaryPayload {
    pub(super) cache: RuntimeEdgeConnectionCacheData,
    pub(super) totals: RuntimeEdgeConnectionTotalsData,
    pub(super) top: RuntimeEdgeConnectionTopData,
    pub(super) telemetry: RuntimeEdgeConnectionTelemetryData,
}

/// Envelope for `/v1/runtime/connections/summary`.
#[derive(Serialize)]
pub(super) struct RuntimeEdgeConnectionsSummaryData {
    pub(super) enabled: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) reason: Option<&'static str>,
    pub(super) generated_at_epoch_secs: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) data: Option<RuntimeEdgeConnectionsSummaryPayload>,
}

/// Cached connections payload shared across API requests.
#[derive(Clone)]
pub(crate) struct EdgeConnectionsCacheEntry {
    pub(super) expires_at: Instant,
    pub(super) payload: RuntimeEdgeConnectionsSummaryPayload,
    pub(super) generated_at_epoch_secs: u64,
}

#[derive(Serialize)]
pub(super) struct RuntimeEdgeEventsPayload {
    pub(super) capacity: usize,
    pub(super) dropped_total: u64,
    pub(super) events: Vec<ApiEventRecord>,
}

/// Envelope for `/v1/runtime/events/recent`.
#[derive(Serialize)]
pub(super) struct RuntimeEdgeEventsData {
    pub(super) enabled: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) reason: Option<&'static str>,
    pub(super) generated_at_epoch_secs: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) data: Option<RuntimeEdgeEventsPayload>,
}

/// Builds the connections summary, honoring the edge-API feature gate and
/// the payload cache.
pub(super) async fn build_runtime_connections_summary_data(
    shared: &ApiShared,
    cfg: &ProxyConfig,
) -> RuntimeEdgeConnectionsSummaryData {
    let now_epoch_secs = now_epoch_secs();
    let api_cfg = &cfg.server.api;
    if !api_cfg.runtime_edge_enabled {
        return RuntimeEdgeConnectionsSummaryData {
            enabled: false,
            reason: Some(FEATURE_DISABLED_REASON),
            generated_at_epoch_secs: now_epoch_secs,
            data: None,
        };
    }

    let (generated_at_epoch_secs, payload) = match get_connections_payload_cached(
        shared,
        api_cfg.runtime_edge_cache_ttl_ms,
        api_cfg.runtime_edge_top_n,
    )
    .await
    {
        Some(v) => v,
        None => {
            // No cache and a recompute already in flight elsewhere.
            return RuntimeEdgeConnectionsSummaryData {
                enabled: true,
                reason: Some(SOURCE_UNAVAILABLE_REASON),
                generated_at_epoch_secs: now_epoch_secs,
                data: None,
            };
        }
    };

    RuntimeEdgeConnectionsSummaryData {
        enabled: true,
        reason: None,
        generated_at_epoch_secs,
        data: Some(payload),
    }
}

/// Builds the recent-events payload; `query` is the raw URI query string
/// and may carry a `limit=N` override (clamped to `[1, EVENTS_MAX_LIMIT]`).
pub(super) fn build_runtime_events_recent_data(
    shared: &ApiShared,
    cfg: &ProxyConfig,
    query: Option<&str>,
) -> RuntimeEdgeEventsData {
    let now_epoch_secs = now_epoch_secs();
    let api_cfg = &cfg.server.api;
    if !api_cfg.runtime_edge_enabled {
        return RuntimeEdgeEventsData {
            enabled: false,
            reason: Some(FEATURE_DISABLED_REASON),
            generated_at_epoch_secs: now_epoch_secs,
            data: None,
        };
    }

    let limit = parse_recent_events_limit(query, EVENTS_DEFAULT_LIMIT, EVENTS_MAX_LIMIT);
    let snapshot = shared.runtime_events.snapshot(limit);

    RuntimeEdgeEventsData {
        enabled: true,
        reason: None,
        generated_at_epoch_secs: now_epoch_secs,
        data: Some(RuntimeEdgeEventsPayload {
            capacity: snapshot.capacity,
            dropped_total: snapshot.dropped_total,
            events: snapshot.events,
        }),
    }
}

/// Returns `(generated_at, payload)`, serving from cache when fresh.
///
/// Exactly one request recomputes at a time (`runtime_edge_recompute_lock`);
/// losers serve the stale entry if one exists, else `None`.
async fn get_connections_payload_cached(
    shared: &ApiShared,
    cache_ttl_ms: u64,
    top_n: usize,
) -> Option<(u64, RuntimeEdgeConnectionsSummaryPayload)> {
    // Fast path: fresh cache hit.
    if cache_ttl_ms > 0 {
        let now = Instant::now();
        let cached = shared.runtime_edge_connections_cache.lock().await.clone();
        if let Some(entry) = cached
            && now < entry.expires_at
        {
            let mut payload = entry.payload;
            payload.cache.served_from_cache = true;
            payload.cache.stale_cache_used = false;
            return Some((entry.generated_at_epoch_secs, payload));
        }
    }

    // Single-flight: if someone else holds the recompute lock, fall back to
    // the (possibly stale) cached entry rather than recomputing twice.
    let Ok(_guard) = shared.runtime_edge_recompute_lock.try_lock() else {
        let cached = shared.runtime_edge_connections_cache.lock().await.clone();
        if let Some(entry) = cached {
            let mut payload = entry.payload;
            payload.cache.served_from_cache = true;
            payload.cache.stale_cache_used = true;
            return Some((entry.generated_at_epoch_secs, payload));
        }
        return None;
    };

    let generated_at_epoch_secs = now_epoch_secs();
    let payload = recompute_connections_payload(shared, cache_ttl_ms, top_n).await;

    if cache_ttl_ms > 0 {
        let entry = EdgeConnectionsCacheEntry {
            expires_at: Instant::now() + Duration::from_millis(cache_ttl_ms),
            payload: payload.clone(),
            generated_at_epoch_secs,
        };
        *shared.runtime_edge_connections_cache.lock().await = Some(entry);
    }

    Some((generated_at_epoch_secs, payload))
}

/// Walks per-user stats and assembles totals plus top-N leaderboards.
async fn recompute_connections_payload(
    shared: &ApiShared,
    cache_ttl_ms: u64,
    top_n: usize,
) -> RuntimeEdgeConnectionsSummaryPayload {
    let mut rows = Vec::<RuntimeEdgeConnectionUserData>::new();
    let mut active_users = 0usize;
    for entry in shared.stats.iter_user_stats() {
        let user_stats = entry.value();
        let current_connections = user_stats
            .curr_connects
            .load(std::sync::atomic::Ordering::Relaxed);
        // Cumulative in + out octets; saturating to guard against overflow.
        let total_octets = user_stats
            .octets_from_client
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_add(
                user_stats
                    .octets_to_client
                    .load(std::sync::atomic::Ordering::Relaxed),
            );
        if current_connections > 0 {
            active_users = active_users.saturating_add(1);
        }
        rows.push(RuntimeEdgeConnectionUserData {
            username: entry.key().clone(),
            current_connections,
            total_octets,
        });
    }

    let limit = top_n.max(1);
    // Sort descending by metric, ascending by username as tie-break.
    // Comparator form avoids cloning the username on every comparison
    // (sort_by_key with a (Reverse, String) key clones per compare).
    let mut by_connections = rows.clone();
    by_connections.sort_by(|a, b| {
        b.current_connections
            .cmp(&a.current_connections)
            .then_with(|| a.username.cmp(&b.username))
    });
    by_connections.truncate(limit);

    let mut by_throughput = rows;
    by_throughput.sort_by(|a, b| {
        b.total_octets
            .cmp(&a.total_octets)
            .then_with(|| a.username.cmp(&b.username))
    });
    by_throughput.truncate(limit);

    let telemetry = shared.stats.telemetry_policy();
    RuntimeEdgeConnectionsSummaryPayload {
        cache: RuntimeEdgeConnectionCacheData {
            ttl_ms: cache_ttl_ms,
            served_from_cache: false,
            stale_cache_used: false,
        },
        totals: RuntimeEdgeConnectionTotalsData {
            current_connections: shared.stats.get_current_connections_total(),
            current_connections_me: shared.stats.get_current_connections_me(),
            current_connections_direct: shared.stats.get_current_connections_direct(),
            active_users,
        },
        top: RuntimeEdgeConnectionTopData {
            limit,
            by_connections,
            by_throughput,
        },
        telemetry: RuntimeEdgeConnectionTelemetryData {
            user_enabled: telemetry.user_enabled,
            throughput_is_cumulative: true,
        },
    }
}

/// Extracts `limit=N` from a raw query string, clamped to `[1, max_limit]`;
/// falls back to `default_limit` when absent or unparseable.
fn parse_recent_events_limit(query: Option<&str>, default_limit: usize, max_limit: usize) -> usize {
    let Some(query) = query else {
        return default_limit;
    };
    for pair in query.split('&') {
        let mut split = pair.splitn(2, '=');
        if split.next() == Some("limit")
            && let Some(raw) = split.next()
            && let Ok(parsed) = raw.parse::<usize>()
        {
            return parsed.clamp(1, max_limit);
        }
    }
    default_limit
}

/// Seconds since the Unix epoch; 0 if the system clock is pre-epoch.
fn now_epoch_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}
From d87196c105cd891d7d0d809550764ed0132b54ef Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Fri, 6 Mar 2026 18:55:04 +0300
Subject: [PATCH 11/12] HTTP Utils for API --- src/api/http_utils.rs | 91 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 src/api/http_utils.rs
diff --git a/src/api/http_utils.rs b/src/api/http_utils.rs new file mode 100644 index 0000000..e04bd04 --- /dev/null +++ b/src/api/http_utils.rs @@ -0,0 +1,91 @@
//! Shared request/response helpers for the control-plane HTTP API.

use http_body_util::{BodyExt, Full};
use hyper::StatusCode;
use hyper::body::{Bytes, Incoming};
use serde::Serialize;
use serde::de::DeserializeOwned;

use super::model::{ApiFailure, ErrorBody, ErrorResponse, SuccessResponse};

/// Serializes `data` into the standard `{ok: true, data, revision}` envelope.
///
/// On a (should-not-happen) serialization failure a minimal `{"ok":false}`
/// body is emitted so the response stays valid JSON.
pub(super) fn success_response<T: Serialize>(
    status: StatusCode,
    data: T,
    revision: String,
) -> hyper::Response<Full<Bytes>> {
    let payload = SuccessResponse {
        ok: true,
        data,
        revision,
    };
    let body = serde_json::to_vec(&payload).unwrap_or_else(|_| b"{\"ok\":false}".to_vec());
    hyper::Response::builder()
        .status(status)
        .header("content-type", "application/json; charset=utf-8")
        // Static status + header are always valid, so build() cannot fail.
        .body(Full::new(Bytes::from(body)))
        .unwrap()
}

/// Serializes an `ApiFailure` into the standard error envelope, with a
/// hand-written JSON fallback if serialization itself fails.
pub(super) fn error_response(
    request_id: u64,
    failure: ApiFailure,
) -> hyper::Response<Full<Bytes>> {
    let payload = ErrorResponse {
        ok: false,
        error: ErrorBody {
            code: failure.code,
            message: failure.message,
        },
        request_id,
    };
    let body = serde_json::to_vec(&payload).unwrap_or_else(|_| {
        format!(
            "{{\"ok\":false,\"error\":{{\"code\":\"internal_error\",\"message\":\"serialization failed\"}},\"request_id\":{}}}",
            request_id
        )
        .into_bytes()
    });
    hyper::Response::builder()
        .status(failure.status)
        .header("content-type", "application/json; charset=utf-8")
        .body(Full::new(Bytes::from(body)))
        .unwrap()
}

/// Reads at most `limit` bytes of body and deserializes it as JSON.
pub(super) async fn read_json<T: DeserializeOwned>(
    body: Incoming,
    limit: usize,
) -> Result<T, ApiFailure> {
    let bytes = read_body_with_limit(body, limit).await?;
    serde_json::from_slice(&bytes).map_err(|_| ApiFailure::bad_request("Invalid JSON body"))
}

/// Like [`read_json`], but an empty body yields `Ok(None)` instead of an
/// error — used for endpoints where the JSON body is optional.
pub(super) async fn read_optional_json<T: DeserializeOwned>(
    body: Incoming,
    limit: usize,
) -> Result<Option<T>, ApiFailure> {
    let bytes = read_body_with_limit(body, limit).await?;
    if bytes.is_empty() {
        return Ok(None);
    }
    serde_json::from_slice(&bytes)
        .map(Some)
        .map_err(|_| ApiFailure::bad_request("Invalid JSON body"))
}

/// Collects body frames, rejecting with 413 before exceeding `limit` bytes.
async fn read_body_with_limit(mut body: Incoming, limit: usize) -> Result<Vec<u8>, ApiFailure> {
    let mut collected = Vec::new();
    while let Some(frame_result) = body.frame().await {
        let frame = frame_result.map_err(|_| ApiFailure::bad_request("Invalid request body"))?;
        if let Some(chunk) = frame.data_ref() {
            // Check before extending so we never buffer past the limit.
            if collected.len().saturating_add(chunk.len()) > limit {
                return Err(ApiFailure::new(
                    StatusCode::PAYLOAD_TOO_LARGE,
                    "payload_too_large",
                    format!("Body exceeds {} bytes", limit),
                ));
            }
            collected.extend_from_slice(chunk);
        }
    }
    Ok(collected)
}
From 42212309698da5fe1f1a0e239b2fb170fd9b99a5 Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Fri, 6 Mar 2026 18:55:20 +0300
Subject: [PATCH 12/12] API Events + API as module --- src/api/events.rs | 90 ++++++++++++++++ src/api/mod.rs | 262 +++++++++++++++++++++++----------------- 2 files changed, 223 insertions(+),
129 deletions(-) create mode 100644 src/api/events.rs diff --git a/src/api/events.rs b/src/api/events.rs new file mode 100644 index 0000000..4ca91e8 --- /dev/null +++ b/src/api/events.rs @@ -0,0 +1,90 @@ +use std::collections::VecDeque; +use std::sync::Mutex; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::Serialize; + +#[derive(Clone, Serialize)] +pub(super) struct ApiEventRecord { + pub(super) seq: u64, + pub(super) ts_epoch_secs: u64, + pub(super) event_type: String, + pub(super) context: String, +} + +#[derive(Clone, Serialize)] +pub(super) struct ApiEventSnapshot { + pub(super) capacity: usize, + pub(super) dropped_total: u64, + pub(super) events: Vec, +} + +struct ApiEventsInner { + capacity: usize, + dropped_total: u64, + next_seq: u64, + events: VecDeque, +} + +/// Bounded ring-buffer for control-plane API/runtime events. +pub(crate) struct ApiEventStore { + inner: Mutex, +} + +impl ApiEventStore { + pub(super) fn new(capacity: usize) -> Self { + let bounded = capacity.max(16); + Self { + inner: Mutex::new(ApiEventsInner { + capacity: bounded, + dropped_total: 0, + next_seq: 1, + events: VecDeque::with_capacity(bounded), + }), + } + } + + pub(super) fn record(&self, event_type: &str, context: impl Into) { + let now_epoch_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let mut context = context.into(); + if context.len() > 256 { + context.truncate(256); + } + + let mut guard = self.inner.lock().expect("api event store mutex poisoned"); + if guard.events.len() == guard.capacity { + guard.events.pop_front(); + guard.dropped_total = guard.dropped_total.saturating_add(1); + } + let seq = guard.next_seq; + guard.next_seq = guard.next_seq.saturating_add(1); + guard.events.push_back(ApiEventRecord { + seq, + ts_epoch_secs: now_epoch_secs, + event_type: event_type.to_string(), + context, + }); + } + + pub(super) fn snapshot(&self, limit: usize) -> ApiEventSnapshot { + let guard = self.inner.lock().expect("api 
event store mutex poisoned"); + let bounded_limit = limit.clamp(1, guard.capacity.max(1)); + let mut items: Vec = guard + .events + .iter() + .rev() + .take(bounded_limit) + .cloned() + .collect(); + items.reverse(); + + ApiEventSnapshot { + capacity: guard.capacity, + dropped_total: guard.dropped_total, + events: items, + } + } +} diff --git a/src/api/mod.rs b/src/api/mod.rs index f2d31da..a705a46 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -3,16 +3,13 @@ use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::time::{SystemTime, UNIX_EPOCH}; -use http_body_util::{BodyExt, Full}; +use http_body_util::Full; use hyper::body::{Bytes, Incoming}; use hyper::header::AUTHORIZATION; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; -use serde::Serialize; -use serde::de::DeserializeOwned; use tokio::net::TcpListener; use tokio::sync::{Mutex, watch}; use tracing::{debug, info, warn}; @@ -24,15 +21,29 @@ use crate::transport::middle_proxy::MePool; use crate::transport::UpstreamManager; mod config_store; +mod events; +mod http_utils; mod model; +mod runtime_edge; +mod runtime_min; mod runtime_stats; +mod runtime_watch; mod runtime_zero; mod users; use config_store::{current_revision, parse_if_match}; +use http_utils::{error_response, read_json, read_optional_json, success_response}; +use events::ApiEventStore; use model::{ - ApiFailure, CreateUserRequest, ErrorBody, ErrorResponse, HealthData, PatchUserRequest, - RotateSecretRequest, SuccessResponse, SummaryData, + ApiFailure, CreateUserRequest, HealthData, PatchUserRequest, RotateSecretRequest, SummaryData, +}; +use runtime_edge::{ + EdgeConnectionsCacheEntry, build_runtime_connections_summary_data, + build_runtime_events_recent_data, +}; +use runtime_min::{ + build_runtime_me_pool_state_data, build_runtime_me_quality_data, build_runtime_nat_stun_data, + 
build_runtime_upstream_quality_data, build_security_whitelist_data, }; use runtime_stats::{ MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data, @@ -42,6 +53,7 @@ use runtime_zero::{ build_limits_effective_data, build_runtime_gates_data, build_security_posture_data, build_system_info_data, }; +use runtime_watch::spawn_runtime_watchers; use users::{create_user, delete_user, patch_user, rotate_secret, users_from_config}; pub(super) struct ApiRuntimeState { @@ -62,6 +74,9 @@ pub(super) struct ApiShared { pub(super) startup_detected_ip_v6: Option, pub(super) mutation_lock: Arc>, pub(super) minimal_cache: Arc>>, + pub(super) runtime_edge_connections_cache: Arc>>, + pub(super) runtime_edge_recompute_lock: Arc>, + pub(super) runtime_events: Arc, pub(super) request_id: Arc, pub(super) runtime_state: Arc, } @@ -116,40 +131,21 @@ pub async fn serve( startup_detected_ip_v6, mutation_lock: Arc::new(Mutex::new(())), minimal_cache: Arc::new(Mutex::new(None)), + runtime_edge_connections_cache: Arc::new(Mutex::new(None)), + runtime_edge_recompute_lock: Arc::new(Mutex::new(())), + runtime_events: Arc::new(ApiEventStore::new( + config_rx.borrow().server.api.runtime_edge_events_capacity, + )), request_id: Arc::new(AtomicU64::new(1)), runtime_state: runtime_state.clone(), }); - let mut config_rx_reload = config_rx.clone(); - let runtime_state_reload = runtime_state.clone(); - tokio::spawn(async move { - loop { - if config_rx_reload.changed().await.is_err() { - break; - } - runtime_state_reload - .config_reload_count - .fetch_add(1, Ordering::Relaxed); - runtime_state_reload - .last_config_reload_epoch_secs - .store(now_epoch_secs(), Ordering::Relaxed); - } - }); - - let mut admission_rx_watch = admission_rx.clone(); - tokio::spawn(async move { - runtime_state - .admission_open - .store(*admission_rx_watch.borrow(), Ordering::Relaxed); - loop { - if admission_rx_watch.changed().await.is_err() { - break; - } - runtime_state - .admission_open - 
.store(*admission_rx_watch.borrow(), Ordering::Relaxed); - } - }); + spawn_runtime_watchers( + config_rx.clone(), + admission_rx.clone(), + runtime_state.clone(), + shared.runtime_events.clone(), + ); loop { let (stream, peer) = match listener.accept().await { @@ -232,6 +228,7 @@ async fn handle( let method = req.method().clone(); let path = req.uri().path().to_string(); + let query = req.uri().query().map(str::to_string); let body_limit = api_cfg.request_body_limit_bytes; let result: Result>, ApiFailure> = async { @@ -264,6 +261,11 @@ async fn handle( let data = build_security_posture_data(cfg.as_ref()); Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/security/whitelist") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_security_whitelist_data(cfg.as_ref()); + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/stats/summary") => { let revision = current_revision(&shared.config_path).await?; let data = SummaryData { @@ -300,6 +302,40 @@ async fn handle( let data = build_dcs_data(shared.as_ref(), api_cfg).await; Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/runtime/me_pool_state") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_me_pool_state_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/me_quality") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_me_quality_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/upstream_quality") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_upstream_quality_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/nat_stun") => { + let revision = current_revision(&shared.config_path).await?; + let data = 
build_runtime_nat_stun_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/connections/summary") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_connections_summary_data(shared.as_ref(), cfg.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } + ("GET", "/v1/runtime/events/recent") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_events_recent_data( + shared.as_ref(), + cfg.as_ref(), + query.as_deref(), + ); + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/stats/users") | ("GET", "/v1/users") => { let revision = current_revision(&shared.config_path).await?; let users = users_from_config( @@ -325,7 +361,17 @@ async fn handle( } let expected_revision = parse_if_match(req.headers()); let body = read_json::(req.into_body(), body_limit).await?; - let (data, revision) = create_user(body, expected_revision, &shared).await?; + let result = create_user(body, expected_revision, &shared).await; + let (data, revision) = match result { + Ok(ok) => ok, + Err(error) => { + shared.runtime_events.record("api.user.create.failed", error.code); + return Err(error); + } + }; + shared + .runtime_events + .record("api.user.create.ok", format!("username={}", data.user.username)); Ok(success_response(StatusCode::CREATED, data, revision)) } _ => { @@ -365,8 +411,20 @@ async fn handle( } let expected_revision = parse_if_match(req.headers()); let body = read_json::(req.into_body(), body_limit).await?; - let (data, revision) = - patch_user(user, body, expected_revision, &shared).await?; + let result = patch_user(user, body, expected_revision, &shared).await; + let (data, revision) = match result { + Ok(ok) => ok, + Err(error) => { + shared.runtime_events.record( + "api.user.patch.failed", + format!("username={} code={}", user, error.code), + ); + return Err(error); + } + }; + shared + 
.runtime_events + .record("api.user.patch.ok", format!("username={}", data.username)); return Ok(success_response(StatusCode::OK, data, revision)); } if method == Method::DELETE { @@ -381,8 +439,21 @@ async fn handle( )); } let expected_revision = parse_if_match(req.headers()); - let (deleted_user, revision) = - delete_user(user, expected_revision, &shared).await?; + let result = delete_user(user, expected_revision, &shared).await; + let (deleted_user, revision) = match result { + Ok(ok) => ok, + Err(error) => { + shared.runtime_events.record( + "api.user.delete.failed", + format!("username={} code={}", user, error.code), + ); + return Err(error); + } + }; + shared.runtime_events.record( + "api.user.delete.ok", + format!("username={}", deleted_user), + ); return Ok(success_response(StatusCode::OK, deleted_user, revision)); } if method == Method::POST @@ -404,9 +475,27 @@ async fn handle( let body = read_optional_json::(req.into_body(), body_limit) .await?; - let (data, revision) = - rotate_secret(base_user, body.unwrap_or_default(), expected_revision, &shared) - .await?; + let result = rotate_secret( + base_user, + body.unwrap_or_default(), + expected_revision, + &shared, + ) + .await; + let (data, revision) = match result { + Ok(ok) => ok, + Err(error) => { + shared.runtime_events.record( + "api.user.rotate_secret.failed", + format!("username={} code={}", base_user, error.code), + ); + return Err(error); + } + }; + shared.runtime_events.record( + "api.user.rotate_secret.ok", + format!("username={}", base_user), + ); return Ok(success_response(StatusCode::OK, data, revision)); } if method == Method::POST { @@ -438,88 +527,3 @@ async fn handle( Err(error) => Ok(error_response(request_id, error)), } } - -fn success_response( - status: StatusCode, - data: T, - revision: String, -) -> Response> { - let payload = SuccessResponse { - ok: true, - data, - revision, - }; - let body = serde_json::to_vec(&payload).unwrap_or_else(|_| b"{\"ok\":false}".to_vec()); - 
Response::builder() - .status(status) - .header("content-type", "application/json; charset=utf-8") - .body(Full::new(Bytes::from(body))) - .unwrap() -} - -fn error_response(request_id: u64, failure: ApiFailure) -> Response> { - let payload = ErrorResponse { - ok: false, - error: ErrorBody { - code: failure.code, - message: failure.message, - }, - request_id, - }; - let body = serde_json::to_vec(&payload).unwrap_or_else(|_| { - format!( - "{{\"ok\":false,\"error\":{{\"code\":\"internal_error\",\"message\":\"serialization failed\"}},\"request_id\":{}}}", - request_id - ) - .into_bytes() - }); - Response::builder() - .status(failure.status) - .header("content-type", "application/json; charset=utf-8") - .body(Full::new(Bytes::from(body))) - .unwrap() -} - -async fn read_json(body: Incoming, limit: usize) -> Result { - let bytes = read_body_with_limit(body, limit).await?; - serde_json::from_slice(&bytes).map_err(|_| ApiFailure::bad_request("Invalid JSON body")) -} - -async fn read_optional_json( - body: Incoming, - limit: usize, -) -> Result, ApiFailure> { - let bytes = read_body_with_limit(body, limit).await?; - if bytes.is_empty() { - return Ok(None); - } - serde_json::from_slice(&bytes) - .map(Some) - .map_err(|_| ApiFailure::bad_request("Invalid JSON body")) -} - -async fn read_body_with_limit(body: Incoming, limit: usize) -> Result, ApiFailure> { - let mut collected = Vec::new(); - let mut body = body; - while let Some(frame_result) = body.frame().await { - let frame = frame_result.map_err(|_| ApiFailure::bad_request("Invalid request body"))?; - if let Some(chunk) = frame.data_ref() { - if collected.len().saturating_add(chunk.len()) > limit { - return Err(ApiFailure::new( - StatusCode::PAYLOAD_TOO_LARGE, - "payload_too_large", - format!("Body exceeds {} bytes", limit), - )); - } - collected.extend_from_slice(chunk); - } - } - Ok(collected) -} - -fn now_epoch_secs() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_secs() -}