From 8b39a4ef6d924acd35935f9b4c3d1849f6d9d546 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 28 Feb 2026 13:18:31 +0300 Subject: [PATCH] Statistics on ME + Dynamic backpressure + KDF with SOCKS Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 12 + src/config/hot_reload.rs | 52 ++- src/config/load.rs | 20 + src/config/types.rs | 117 ++++++ src/main.rs | 27 ++ src/metrics.rs | 482 ++++++++++++++++++++---- src/stats/mod.rs | 292 ++++++++++++-- src/stats/telemetry.rs | 29 ++ src/transport/middle_proxy/handshake.rs | 37 +- src/transport/middle_proxy/ping.rs | 44 +++ src/transport/middle_proxy/pool.rs | 37 +- src/transport/middle_proxy/reader.rs | 18 +- src/transport/middle_proxy/registry.rs | 72 +++- 13 files changed, 1108 insertions(+), 131 deletions(-) create mode 100644 src/stats/telemetry.rs diff --git a/src/config/defaults.rs b/src/config/defaults.rs index d82f8ed..dbc251c 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -170,6 +170,18 @@ pub(crate) fn default_desync_all_full() -> bool { false } +pub(crate) fn default_me_route_backpressure_base_timeout_ms() -> u64 { + 25 +} + +pub(crate) fn default_me_route_backpressure_high_timeout_ms() -> u64 { + 120 +} + +pub(crate) fn default_me_route_backpressure_high_watermark_pct() -> u8 { + 80 +} + pub(crate) fn default_beobachten_minutes() -> u64 { 10 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index acc64cd..579a9cb 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -16,6 +16,7 @@ //! | `general` | `me_pool_drain_ttl_secs` | Applied on next ME map update | //! | `general` | `me_pool_min_fresh_ratio` | Applied on next ME map update | //! | `general` | `me_reinit_drain_timeout_secs`| Applied on next ME map update | +//! | `general` | `telemetry` / `me_*_policy` | Applied immediately | //! | `network` | `dns_overrides` | Applied immediately | //! | `access` | All user/quota fields | Effective immediately | //! @@ -30,7 +31,7 @@ use notify::{EventKind, RecursiveMode, Watcher, recommended_watcher}; use tokio::sync::{mpsc, watch}; use tracing::{error, info, warn}; -use crate::config::LogLevel; +use crate::config::{LogLevel, MeSocksKdfPolicy, MeTelemetryLevel}; use super::load::ProxyConfig; // ── Hot fields ──────────────────────────────────────────────────────────────── @@ -52,6 +53,13 @@ pub struct HotFields { pub me_keepalive_interval_secs: u64, pub me_keepalive_jitter_secs: u64, pub me_keepalive_payload_random: bool, + pub telemetry_core_enabled: bool, + pub telemetry_user_enabled: bool, + pub telemetry_me_level: MeTelemetryLevel, + pub me_socks_kdf_policy: MeSocksKdfPolicy, + pub me_route_backpressure_base_timeout_ms: u64, + pub me_route_backpressure_high_timeout_ms: u64, + pub me_route_backpressure_high_watermark_pct: u8, pub access: crate::config::AccessConfig, } @@ -72,6 +80,13 @@ impl HotFields { me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs, me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs, me_keepalive_payload_random: cfg.general.me_keepalive_payload_random, + telemetry_core_enabled: cfg.general.telemetry.core_enabled, + telemetry_user_enabled: cfg.general.telemetry.user_enabled, + telemetry_me_level: cfg.general.telemetry.me_level, + me_socks_kdf_policy: cfg.general.me_socks_kdf_policy, + me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms, + me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms, + me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct, access: cfg.access.clone(), } } @@ -262,6 +277,41 @@ fn log_changes( ); } + if old_hot.telemetry_core_enabled != new_hot.telemetry_core_enabled + || old_hot.telemetry_user_enabled != new_hot.telemetry_user_enabled + || old_hot.telemetry_me_level != new_hot.telemetry_me_level + { + info!( + "config reload: telemetry: core_enabled={} user_enabled={} me_level={}", + new_hot.telemetry_core_enabled, + new_hot.telemetry_user_enabled, + new_hot.telemetry_me_level, + ); + } + + if old_hot.me_socks_kdf_policy != new_hot.me_socks_kdf_policy { + info!( + "config reload: me_socks_kdf_policy: {:?} → {:?}", + old_hot.me_socks_kdf_policy, + new_hot.me_socks_kdf_policy, + ); + } + + if old_hot.me_route_backpressure_base_timeout_ms + != new_hot.me_route_backpressure_base_timeout_ms + || old_hot.me_route_backpressure_high_timeout_ms + != new_hot.me_route_backpressure_high_timeout_ms + || old_hot.me_route_backpressure_high_watermark_pct + != new_hot.me_route_backpressure_high_watermark_pct + { + info!( + "config reload: me_route_backpressure: base={}ms high={}ms watermark={}%", + new_hot.me_route_backpressure_base_timeout_ms, + new_hot.me_route_backpressure_high_timeout_ms, + new_hot.me_route_backpressure_high_watermark_pct, + ); + } + if old_hot.access.users != new_hot.access.users { let mut added: Vec<&String> = new_hot.access.users.keys() .filter(|u| !old_hot.access.users.contains_key(*u)) diff --git a/src/config/load.rs b/src/config/load.rs index c1bbdef..7c578a3 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -311,6 +311,26 @@ impl ProxyConfig { )); } + if config.general.me_route_backpressure_base_timeout_ms == 0 { + return Err(ProxyError::Config( + "general.me_route_backpressure_base_timeout_ms must be > 0".to_string(), + )); + } + + if config.general.me_route_backpressure_high_timeout_ms + < config.general.me_route_backpressure_base_timeout_ms + { + return Err(ProxyError::Config( + "general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(), + )); + } + + if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) { + return Err(ProxyError::Config( + "general.me_route_backpressure_high_watermark_pct must be within [1, 100]".to_string(), + )); + } + if config.general.effective_me_pool_force_close_secs() > 0 && config.general.effective_me_pool_force_close_secs() < config.general.me_pool_drain_ttl_secs diff --git a/src/config/types.rs b/src/config/types.rs index 7d9f13a..902d816 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -59,6 +59,98 @@ impl std::fmt::Display for LogLevel { } } +/// Middle-End telemetry verbosity level. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum MeTelemetryLevel { + #[default] + Normal, + Silent, + Debug, +} + +impl MeTelemetryLevel { + pub fn as_u8(self) -> u8 { + match self { + MeTelemetryLevel::Silent => 0, + MeTelemetryLevel::Normal => 1, + MeTelemetryLevel::Debug => 2, + } + } + + pub fn from_u8(raw: u8) -> Self { + match raw { + 0 => MeTelemetryLevel::Silent, + 2 => MeTelemetryLevel::Debug, + _ => MeTelemetryLevel::Normal, + } + } + + pub fn allows_normal(self) -> bool { + !matches!(self, MeTelemetryLevel::Silent) + } + + pub fn allows_debug(self) -> bool { + matches!(self, MeTelemetryLevel::Debug) + } +} + +impl std::fmt::Display for MeTelemetryLevel { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MeTelemetryLevel::Silent => write!(f, "silent"), + MeTelemetryLevel::Normal => write!(f, "normal"), + MeTelemetryLevel::Debug => write!(f, "debug"), + } + } +} + +/// Middle-End SOCKS KDF fallback policy. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum MeSocksKdfPolicy { + #[default] + Strict, + Compat, +} + +impl MeSocksKdfPolicy { + pub fn as_u8(self) -> u8 { + match self { + MeSocksKdfPolicy::Strict => 0, + MeSocksKdfPolicy::Compat => 1, + } + } + + pub fn from_u8(raw: u8) -> Self { + match raw { + 1 => MeSocksKdfPolicy::Compat, + _ => MeSocksKdfPolicy::Strict, + } + } +} + +/// Telemetry controls for hot-path counters and ME diagnostics. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TelemetryConfig { + #[serde(default = "default_true")] + pub core_enabled: bool, + #[serde(default = "default_true")] + pub user_enabled: bool, + #[serde(default)] + pub me_level: MeTelemetryLevel, +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self { + core_enabled: default_true(), + user_enabled: default_true(), + me_level: MeTelemetryLevel::Normal, + } + } +} + // ============= Sub-Configs ============= #[derive(Debug, Clone, Serialize, Deserialize)] @@ -288,6 +380,26 @@ pub struct GeneralConfig { #[serde(default)] pub disable_colors: bool, + /// Runtime telemetry controls for counters/metrics in hot paths. + #[serde(default)] + pub telemetry: TelemetryConfig, + + /// SOCKS-bound KDF policy for Middle-End handshake. + #[serde(default)] + pub me_socks_kdf_policy: MeSocksKdfPolicy, + + /// Base backpressure timeout in milliseconds for ME route channel send. + #[serde(default = "default_me_route_backpressure_base_timeout_ms")] + pub me_route_backpressure_base_timeout_ms: u64, + + /// High backpressure timeout in milliseconds when queue occupancy is above watermark. + #[serde(default = "default_me_route_backpressure_high_timeout_ms")] + pub me_route_backpressure_high_timeout_ms: u64, + + /// Queue occupancy percent threshold for high backpressure timeout. + #[serde(default = "default_me_route_backpressure_high_watermark_pct")] + pub me_route_backpressure_high_watermark_pct: u8, + /// [general.links] — proxy link generation overrides. #[serde(default)] pub links: LinksConfig, @@ -414,6 +526,11 @@ impl Default for GeneralConfig { unknown_dc_log_path: default_unknown_dc_log_path(), log_level: LogLevel::Normal, disable_colors: false, + telemetry: TelemetryConfig::default(), + me_socks_kdf_policy: MeSocksKdfPolicy::Strict, + me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(), + me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(), + me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(), links: LinksConfig::default(), crypto_pending_buffer: default_crypto_pending_buffer(), max_client_frame: default_max_client_frame(), diff --git a/src/main.rs b/src/main.rs index e985051..4d4d3f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,7 @@ use crate::ip_tracker::UserIpTracker; use crate::network::probe::{decide_network_capabilities, log_probe_result, run_probe}; use crate::proxy::ClientHandler; use crate::stats::beobachten::BeobachtenStore; +use crate::stats::telemetry::TelemetryPolicy; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; use crate::transport::middle_proxy::{ @@ -406,6 +407,7 @@ async fn main() -> std::result::Result<(), Box> { let prefer_ipv6 = decision.prefer_ipv6(); let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me); let stats = Arc::new(Stats::new()); + stats.apply_telemetry_policy(TelemetryPolicy::from_config(&config.general.telemetry)); let beobachten = Arc::new(BeobachtenStore::new()); let rng = Arc::new(SecureRandom::new()); @@ -539,6 +541,10 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_hardswap_warmup_delay_max_ms, config.general.me_hardswap_warmup_extra_passes, config.general.me_hardswap_warmup_pass_backoff_base_ms, + config.general.me_socks_kdf_policy, + config.general.me_route_backpressure_base_timeout_ms, + config.general.me_route_backpressure_high_timeout_ms, + config.general.me_route_backpressure_high_watermark_pct, ); let pool_size = config.general.middle_proxy_pool_size.max(1); @@ -794,6 +800,27 @@ async fn main() -> std::result::Result<(), Box> { detected_ip_v6, ); + let stats_policy = stats.clone(); + let mut config_rx_policy = config_rx.clone(); + let me_pool_policy = me_pool.clone(); + tokio::spawn(async move { + loop { + if config_rx_policy.changed().await.is_err() { + break; + } + let cfg = config_rx_policy.borrow_and_update().clone(); + stats_policy.apply_telemetry_policy(TelemetryPolicy::from_config(&cfg.general.telemetry)); + if let Some(pool) = &me_pool_policy { + pool.update_runtime_transport_policy( + cfg.general.me_socks_kdf_policy, + cfg.general.me_route_backpressure_base_timeout_ms, + cfg.general.me_route_backpressure_high_timeout_ms, + cfg.general.me_route_backpressure_high_watermark_pct, + ); + } + } + }); + let beobachten_writer = beobachten.clone(); let config_rx_beobachten = config_rx.clone(); tokio::spawn(async move { diff --git a/src/metrics.rs b/src/metrics.rs index 63b337b..35f29ca 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -118,120 +118,394 @@ fn render_beobachten(beobachten: &BeobachtenStore, config: &ProxyConfig) -> Stri async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIpTracker) -> String { use std::fmt::Write; let mut out = String::with_capacity(4096); + let telemetry = stats.telemetry_policy(); + let core_enabled = telemetry.core_enabled; + let user_enabled = telemetry.user_enabled; + let me_allows_normal = telemetry.me_level.allows_normal(); + let me_allows_debug = telemetry.me_level.allows_debug(); let _ = writeln!(out, "# HELP telemt_uptime_seconds Proxy uptime"); let _ = writeln!(out, "# TYPE telemt_uptime_seconds gauge"); let _ = writeln!(out, "telemt_uptime_seconds {:.1}", stats.uptime_secs()); + let _ = writeln!(out, "# HELP telemt_telemetry_core_enabled Runtime core telemetry switch"); + let _ = writeln!(out, "# TYPE telemt_telemetry_core_enabled gauge"); + let _ = writeln!( + out, + "telemt_telemetry_core_enabled {}", + if core_enabled { 1 } else { 0 } + ); + + let _ = writeln!(out, "# HELP telemt_telemetry_user_enabled Runtime per-user telemetry switch"); + let _ = writeln!(out, "# TYPE telemt_telemetry_user_enabled gauge"); + let _ = writeln!( + out, + "telemt_telemetry_user_enabled {}", + if user_enabled { 1 } else { 0 } + ); + + let _ = writeln!(out, "# HELP telemt_telemetry_me_level Runtime ME telemetry level flag"); + let _ = writeln!(out, "# TYPE telemt_telemetry_me_level gauge"); + let _ = writeln!( + out, + "telemt_telemetry_me_level{{level=\"silent\"}} {}", + if matches!(telemetry.me_level, crate::config::MeTelemetryLevel::Silent) { + 1 + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_telemetry_me_level{{level=\"normal\"}} {}", + if matches!(telemetry.me_level, crate::config::MeTelemetryLevel::Normal) { + 1 + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_telemetry_me_level{{level=\"debug\"}} {}", + if matches!(telemetry.me_level, crate::config::MeTelemetryLevel::Debug) { + 1 + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_connections_total Total accepted connections"); let _ = writeln!(out, "# TYPE telemt_connections_total counter"); - let _ = writeln!(out, "telemt_connections_total {}", stats.get_connects_all()); + let _ = writeln!( + out, + "telemt_connections_total {}", + if core_enabled { stats.get_connects_all() } else { 0 } + ); let _ = writeln!(out, "# HELP telemt_connections_bad_total Bad/rejected connections"); let _ = writeln!(out, "# TYPE telemt_connections_bad_total counter"); - let _ = writeln!(out, "telemt_connections_bad_total {}", stats.get_connects_bad()); + let _ = writeln!( + out, + "telemt_connections_bad_total {}", + if core_enabled { stats.get_connects_bad() } else { 0 } + ); let _ = writeln!(out, "# HELP telemt_handshake_timeouts_total Handshake timeouts"); let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter"); - let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts()); + let _ = writeln!( + out, + "telemt_handshake_timeouts_total {}", + if core_enabled { + stats.get_handshake_timeouts() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent"); let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter"); - let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent()); + let _ = writeln!( + out, + "telemt_me_keepalive_sent_total {}", + if me_allows_debug { + stats.get_me_keepalive_sent() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures"); let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter"); - let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed()); + let _ = writeln!( + out, + "telemt_me_keepalive_failed_total {}", + if me_allows_normal { + stats.get_me_keepalive_failed() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_keepalive_pong_total ME keepalive pong replies"); let _ = writeln!(out, "# TYPE telemt_me_keepalive_pong_total counter"); - let _ = writeln!(out, "telemt_me_keepalive_pong_total {}", stats.get_me_keepalive_pong()); + let _ = writeln!( + out, + "telemt_me_keepalive_pong_total {}", + if me_allows_debug { + stats.get_me_keepalive_pong() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_keepalive_timeout_total ME keepalive ping timeouts"); let _ = writeln!(out, "# TYPE telemt_me_keepalive_timeout_total counter"); - let _ = writeln!(out, "telemt_me_keepalive_timeout_total {}", stats.get_me_keepalive_timeout()); + let _ = writeln!( + out, + "telemt_me_keepalive_timeout_total {}", + if me_allows_normal { + stats.get_me_keepalive_timeout() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts"); let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter"); - let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts()); + let _ = writeln!( + out, + "telemt_me_reconnect_attempts_total {}", + if me_allows_normal { + stats.get_me_reconnect_attempts() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes"); let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter"); - let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success()); + let _ = writeln!( + out, + "telemt_me_reconnect_success_total {}", + if me_allows_normal { + stats.get_me_reconnect_success() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_crc_mismatch_total ME CRC mismatches"); let _ = writeln!(out, "# TYPE telemt_me_crc_mismatch_total counter"); - let _ = writeln!(out, "telemt_me_crc_mismatch_total {}", stats.get_me_crc_mismatch()); + let _ = writeln!( + out, + "telemt_me_crc_mismatch_total {}", + if me_allows_normal { + stats.get_me_crc_mismatch() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_seq_mismatch_total ME sequence mismatches"); let _ = writeln!(out, "# TYPE telemt_me_seq_mismatch_total counter"); - let _ = writeln!(out, "telemt_me_seq_mismatch_total {}", stats.get_me_seq_mismatch()); + let _ = writeln!( + out, + "telemt_me_seq_mismatch_total {}", + if me_allows_normal { + stats.get_me_seq_mismatch() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_route_drop_no_conn_total ME route drops: no conn"); let _ = writeln!(out, "# TYPE telemt_me_route_drop_no_conn_total counter"); - let _ = writeln!(out, "telemt_me_route_drop_no_conn_total {}", stats.get_me_route_drop_no_conn()); + let _ = writeln!( + out, + "telemt_me_route_drop_no_conn_total {}", + if me_allows_normal { + stats.get_me_route_drop_no_conn() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_route_drop_channel_closed_total ME route drops: channel closed"); let _ = writeln!(out, "# TYPE telemt_me_route_drop_channel_closed_total counter"); - let _ = writeln!(out, "telemt_me_route_drop_channel_closed_total {}", stats.get_me_route_drop_channel_closed()); + let _ = writeln!( + out, + "telemt_me_route_drop_channel_closed_total {}", + if me_allows_normal { + stats.get_me_route_drop_channel_closed() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_me_route_drop_queue_full_total ME route drops: queue full"); let _ = writeln!(out, "# TYPE telemt_me_route_drop_queue_full_total counter"); - let _ = writeln!(out, "telemt_me_route_drop_queue_full_total {}", stats.get_me_route_drop_queue_full()); + let _ = writeln!( + out, + "telemt_me_route_drop_queue_full_total {}", + if me_allows_normal { + stats.get_me_route_drop_queue_full() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_route_drop_queue_full_profile_total ME route drops: queue full by adaptive profile" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_route_drop_queue_full_profile_total counter" + ); + let _ = writeln!( + out, + "telemt_me_route_drop_queue_full_profile_total{{profile=\"base\"}} {}", + if me_allows_normal { + stats.get_me_route_drop_queue_full_base() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_route_drop_queue_full_profile_total{{profile=\"high\"}} {}", + if me_allows_normal { + stats.get_me_route_drop_queue_full_high() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_socks_kdf_policy_total SOCKS KDF policy outcomes" + ); + let _ = writeln!(out, "# TYPE telemt_me_socks_kdf_policy_total counter"); + let _ = writeln!( + out, + "telemt_me_socks_kdf_policy_total{{policy=\"strict\",outcome=\"reject\"}} {}", + if me_allows_normal { + stats.get_me_socks_kdf_strict_reject() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_socks_kdf_policy_total{{policy=\"compat\",outcome=\"fallback\"}} {}", + if me_allows_debug { + stats.get_me_socks_kdf_compat_fallback() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths"); let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter"); - let _ = writeln!(out, "telemt_secure_padding_invalid_total {}", stats.get_secure_padding_invalid()); + let _ = writeln!( + out, + "telemt_secure_padding_invalid_total {}", + if me_allows_normal { + stats.get_secure_padding_invalid() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_desync_total Total crypto-desync detections"); let _ = writeln!(out, "# TYPE telemt_desync_total counter"); - let _ = writeln!(out, "telemt_desync_total {}", stats.get_desync_total()); + let _ = writeln!( + out, + "telemt_desync_total {}", + if me_allows_normal { + stats.get_desync_total() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_desync_full_logged_total Full forensic desync logs emitted"); let _ = writeln!(out, "# TYPE telemt_desync_full_logged_total counter"); - let _ = writeln!(out, "telemt_desync_full_logged_total {}", stats.get_desync_full_logged()); + let _ = writeln!( + out, + "telemt_desync_full_logged_total {}", + if me_allows_normal { + stats.get_desync_full_logged() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_desync_suppressed_total Suppressed desync forensic events"); let _ = writeln!(out, "# TYPE telemt_desync_suppressed_total counter"); - let _ = writeln!(out, "telemt_desync_suppressed_total {}", stats.get_desync_suppressed()); + let _ = writeln!( + out, + "telemt_desync_suppressed_total {}", + if me_allows_normal { + stats.get_desync_suppressed() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_desync_frames_bucket_total Desync count by frames_ok bucket"); let _ = writeln!(out, "# TYPE telemt_desync_frames_bucket_total counter"); let _ = writeln!( out, "telemt_desync_frames_bucket_total{{bucket=\"0\"}} {}", - stats.get_desync_frames_bucket_0() + if me_allows_normal { + stats.get_desync_frames_bucket_0() + } else { + 0 + } ); let _ = writeln!( out, "telemt_desync_frames_bucket_total{{bucket=\"1_2\"}} {}", - stats.get_desync_frames_bucket_1_2() + if me_allows_normal { + stats.get_desync_frames_bucket_1_2() + } else { + 0 + } ); let _ = writeln!( out, "telemt_desync_frames_bucket_total{{bucket=\"3_10\"}} {}", - stats.get_desync_frames_bucket_3_10() + if me_allows_normal { + stats.get_desync_frames_bucket_3_10() + } else { + 0 + } ); let _ = writeln!( out, "telemt_desync_frames_bucket_total{{bucket=\"gt_10\"}} {}", - stats.get_desync_frames_bucket_gt_10() + if me_allows_normal { + stats.get_desync_frames_bucket_gt_10() + } else { + 0 + } ); let _ = writeln!(out, "# HELP telemt_pool_swap_total Successful ME pool swaps"); let _ = writeln!(out, "# TYPE telemt_pool_swap_total counter"); - let _ = writeln!(out, "telemt_pool_swap_total {}", stats.get_pool_swap_total()); + let _ = writeln!( + out, + "telemt_pool_swap_total {}", + if me_allows_debug { + stats.get_pool_swap_total() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_pool_drain_active Active draining ME writers"); let _ = writeln!(out, "# TYPE telemt_pool_drain_active gauge"); - let _ = writeln!(out, "telemt_pool_drain_active {}", stats.get_pool_drain_active()); + let _ = writeln!( + out, + "telemt_pool_drain_active {}", + if me_allows_debug { + stats.get_pool_drain_active() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_pool_force_close_total Forced close events for draining writers"); let _ = writeln!(out, "# TYPE telemt_pool_force_close_total counter"); let _ = writeln!( out, "telemt_pool_force_close_total {}", - stats.get_pool_force_close_total() + if me_allows_normal { + stats.get_pool_force_close_total() + } else { + 0 + } ); let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds"); @@ -239,7 +513,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!( out, "telemt_pool_stale_pick_total {}", - stats.get_pool_stale_pick_total() + if me_allows_normal { + stats.get_pool_stale_pick_total() + } else { + 0 + } ); let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); @@ -247,7 +525,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!( out, "telemt_me_writer_removed_total {}", - stats.get_me_writer_removed_total() + if me_allows_debug { + stats.get_me_writer_removed_total() + } else { + 0 + } ); let _ = writeln!( @@ -258,7 +540,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!( out, "telemt_me_writer_removed_unexpected_total {}", - stats.get_me_writer_removed_unexpected_total() + if me_allows_normal { + stats.get_me_writer_removed_unexpected_total() + } else { + 0 + } ); let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started"); @@ -266,7 +552,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!( out, "telemt_me_refill_triggered_total {}", - stats.get_me_refill_triggered_total() + if me_allows_debug { + stats.get_me_refill_triggered_total() + } else { + 0 + } ); let _ = writeln!( @@ -277,7 +567,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!( out, "telemt_me_refill_skipped_inflight_total {}", - stats.get_me_refill_skipped_inflight_total() + if me_allows_debug { + stats.get_me_refill_skipped_inflight_total() + } else { + 0 + } ); let _ = writeln!(out, "# HELP telemt_me_refill_failed_total Immediate ME refill failures"); @@ -285,7 +579,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!( out, "telemt_me_refill_failed_total {}", - stats.get_me_refill_failed_total() + if me_allows_normal { + stats.get_me_refill_failed_total() + } else { + 0 + } ); let _ = writeln!( @@ -296,7 +594,11 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!( out, "telemt_me_writer_restored_same_endpoint_total {}", - stats.get_me_writer_restored_same_endpoint_total() + if me_allows_normal { + stats.get_me_writer_restored_same_endpoint_total() + } else { + 0 + } ); let _ = writeln!( @@ -307,16 +609,24 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!( out, "telemt_me_writer_restored_fallback_total {}", - stats.get_me_writer_restored_fallback_total() + if me_allows_normal { + stats.get_me_writer_restored_fallback_total() + } else { + 0 + } ); - let unresolved_writer_losses = stats - .get_me_writer_removed_unexpected_total() - .saturating_sub( - stats - .get_me_writer_restored_same_endpoint_total() - .saturating_add(stats.get_me_writer_restored_fallback_total()), - ); + let unresolved_writer_losses = if me_allows_normal { + stats + .get_me_writer_removed_unexpected_total() + .saturating_sub( + stats + .get_me_writer_restored_same_endpoint_total() + .saturating_add(stats.get_me_writer_restored_fallback_total()), + ) + } else { + 0 + }; let _ = writeln!( out, "# HELP telemt_me_writer_removed_unexpected_minus_restored_total Unexpected writer removals not yet compensated by restore" @@ -343,51 +653,63 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp let _ = writeln!(out, "# TYPE telemt_user_msgs_from_client counter"); let _ = writeln!(out, "# HELP telemt_user_msgs_to_client Per-user messages sent"); let _ = writeln!(out, "# TYPE telemt_user_msgs_to_client counter"); + let _ = writeln!( + out, + "# HELP telemt_telemetry_user_series_suppressed User-labeled metric series suppression flag" + ); + let _ = writeln!(out, "# TYPE telemt_telemetry_user_series_suppressed gauge"); + let _ = writeln!( + out, + "telemt_telemetry_user_series_suppressed {}", + if user_enabled { 0 } else { 1 } + ); - for entry in stats.iter_user_stats() { - let user = entry.key(); - let s = entry.value(); - let _ = writeln!(out, "telemt_user_connections_total{{user=\"{}\"}} {}", user, s.connects.load(std::sync::atomic::Ordering::Relaxed)); - let _ = writeln!(out, "telemt_user_connections_current{{user=\"{}\"}} {}", user, s.curr_connects.load(std::sync::atomic::Ordering::Relaxed)); - let _ = writeln!(out, "telemt_user_octets_from_client{{user=\"{}\"}} {}", user, s.octets_from_client.load(std::sync::atomic::Ordering::Relaxed)); - let _ = writeln!(out, "telemt_user_octets_to_client{{user=\"{}\"}} {}", user, s.octets_to_client.load(std::sync::atomic::Ordering::Relaxed)); - let _ = writeln!(out, "telemt_user_msgs_from_client{{user=\"{}\"}} {}", user, s.msgs_from_client.load(std::sync::atomic::Ordering::Relaxed)); - let _ = writeln!(out, "telemt_user_msgs_to_client{{user=\"{}\"}} {}", user, s.msgs_to_client.load(std::sync::atomic::Ordering::Relaxed)); - } + if user_enabled { + for entry in stats.iter_user_stats() { + let user = entry.key(); + let s = entry.value(); + let _ = writeln!(out, "telemt_user_connections_total{{user=\"{}\"}} {}", user, s.connects.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_connections_current{{user=\"{}\"}} {}", user, s.curr_connects.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_octets_from_client{{user=\"{}\"}} {}", user, s.octets_from_client.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_octets_to_client{{user=\"{}\"}} {}", user, s.octets_to_client.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_msgs_from_client{{user=\"{}\"}} {}", user, s.msgs_from_client.load(std::sync::atomic::Ordering::Relaxed)); + let _ = writeln!(out, "telemt_user_msgs_to_client{{user=\"{}\"}} {}", user, s.msgs_to_client.load(std::sync::atomic::Ordering::Relaxed)); + } - let ip_stats = ip_tracker.get_stats().await; - let ip_counts: HashMap = ip_stats - .into_iter() - .map(|(user, count, _)| (user, count)) - .collect(); + let ip_stats = ip_tracker.get_stats().await; + let ip_counts: HashMap = ip_stats + .into_iter() + .map(|(user, count, _)| (user, count)) + .collect(); - let mut unique_users = BTreeSet::new(); - unique_users.extend(config.access.user_max_unique_ips.keys().cloned()); - unique_users.extend(ip_counts.keys().cloned()); + let mut unique_users = BTreeSet::new(); + unique_users.extend(config.access.user_max_unique_ips.keys().cloned()); + unique_users.extend(ip_counts.keys().cloned()); - let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs"); - let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge"); - let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)"); - let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge"); - let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)"); - let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge"); + let _ = writeln!(out, "# HELP telemt_user_unique_ips_current Per-user current number of unique active IPs"); + let _ = writeln!(out, "# TYPE telemt_user_unique_ips_current gauge"); + let _ = writeln!(out, "# HELP telemt_user_unique_ips_limit Per-user configured unique IP limit (0 means unlimited)"); + let _ = writeln!(out, "# TYPE telemt_user_unique_ips_limit gauge"); + let _ = writeln!(out, "# HELP telemt_user_unique_ips_utilization Per-user unique IP usage ratio (0 for unlimited)"); + let _ = writeln!(out, "# TYPE telemt_user_unique_ips_utilization gauge"); - for user in unique_users { - let current = ip_counts.get(&user).copied().unwrap_or(0); - let limit = config.access.user_max_unique_ips.get(&user).copied().unwrap_or(0); - let utilization = if limit > 0 { - current as f64 / limit as f64 - } else { - 0.0 - }; - let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current); - let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit); - let _ = writeln!( - out, - "telemt_user_unique_ips_utilization{{user=\"{}\"}} {:.6}", - user, - utilization - ); + for user in unique_users { + let current = ip_counts.get(&user).copied().unwrap_or(0); + let limit = config.access.user_max_unique_ips.get(&user).copied().unwrap_or(0); + let utilization = if limit > 0 { + current as f64 / limit as f64 + } else { + 0.0 + }; + let _ = writeln!(out, "telemt_user_unique_ips_current{{user=\"{}\"}} {}", user, current); + let _ = writeln!(out, "telemt_user_unique_ips_limit{{user=\"{}\"}} {}", user, limit); + let _ = writeln!( + out, + "telemt_user_unique_ips_utilization{{user=\"{}\"}} {:.6}", + user, + utilization + ); + } } out diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 1e32bb7..f5aa2b7 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -3,8 +3,9 @@ #![allow(dead_code)] pub mod beobachten; +pub mod telemetry; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering}; use std::time::{Instant, Duration}; use dashmap::DashMap; use parking_lot::Mutex; @@ -15,6 +16,9 @@ use std::collections::hash_map::DefaultHasher; use std::collections::VecDeque; use tracing::debug; +use crate::config::MeTelemetryLevel; +use self::telemetry::TelemetryPolicy; + // ============= Stats ============= #[derive(Default)] @@ -33,6 +37,10 @@ pub struct Stats { me_route_drop_no_conn: AtomicU64, me_route_drop_channel_closed: AtomicU64, me_route_drop_queue_full: AtomicU64, + me_route_drop_queue_full_base: AtomicU64, + me_route_drop_queue_full_high: AtomicU64, + me_socks_kdf_strict_reject: AtomicU64, + me_socks_kdf_compat_fallback: AtomicU64, secure_padding_invalid: AtomicU64, desync_total: AtomicU64, desync_full_logged: AtomicU64, @@ -52,6 +60,9 @@ pub struct Stats { me_refill_failed_total: AtomicU64, me_writer_restored_same_endpoint_total: AtomicU64, me_writer_restored_fallback_total: AtomicU64, + telemetry_core_enabled: AtomicBool, + telemetry_user_enabled: AtomicBool, + telemetry_me_level: AtomicU8, user_stats: DashMap, start_time: parking_lot::RwLock>, } @@ -69,44 +80,167 @@ pub struct UserStats { impl Stats { pub fn new() -> Self { let stats = Self::default(); + stats.apply_telemetry_policy(TelemetryPolicy::default()); *stats.start_time.write() = Some(Instant::now()); stats } - - pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); } - pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); } - pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_keepalive_pong(&self) { self.me_keepalive_pong.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_keepalive_timeout(&self) { self.me_keepalive_timeout.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_keepalive_timeout_by(&self, value: u64) { - self.me_keepalive_timeout.fetch_add(value, Ordering::Relaxed); + + fn telemetry_me_level(&self) -> MeTelemetryLevel { + MeTelemetryLevel::from_u8(self.telemetry_me_level.load(Ordering::Relaxed)) + } + + fn telemetry_core_enabled(&self) -> bool { + self.telemetry_core_enabled.load(Ordering::Relaxed) + } + + fn telemetry_user_enabled(&self) -> bool { + self.telemetry_user_enabled.load(Ordering::Relaxed) + } + + fn telemetry_me_allows_normal(&self) -> bool { + self.telemetry_me_level().allows_normal() + } + + fn telemetry_me_allows_debug(&self) -> bool { + self.telemetry_me_level().allows_debug() + } + + pub fn apply_telemetry_policy(&self, policy: TelemetryPolicy) { + self.telemetry_core_enabled + .store(policy.core_enabled, Ordering::Relaxed); + self.telemetry_user_enabled + .store(policy.user_enabled, Ordering::Relaxed); + self.telemetry_me_level + .store(policy.me_level.as_u8(), Ordering::Relaxed); + } + + pub fn telemetry_policy(&self) -> TelemetryPolicy { + TelemetryPolicy { + core_enabled: self.telemetry_core_enabled(), + user_enabled: self.telemetry_user_enabled(), + me_level: self.telemetry_me_level(), + } + } + + pub fn increment_connects_all(&self) { + if self.telemetry_core_enabled() { + self.connects_all.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_connects_bad(&self) { + if self.telemetry_core_enabled() { + self.connects_bad.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_handshake_timeouts(&self) { + if self.telemetry_core_enabled() { + self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_keepalive_sent(&self) { + if self.telemetry_me_allows_debug() { + self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_keepalive_failed(&self) { + if self.telemetry_me_allows_normal() { + self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_keepalive_pong(&self) { + if self.telemetry_me_allows_debug() { + self.me_keepalive_pong.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_keepalive_timeout(&self) { + if self.telemetry_me_allows_normal() { + self.me_keepalive_timeout.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_keepalive_timeout_by(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_keepalive_timeout.fetch_add(value, Ordering::Relaxed); + } + } + pub fn increment_me_reconnect_attempt(&self) { + if self.telemetry_me_allows_normal() { + self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_reconnect_success(&self) { + if self.telemetry_me_allows_normal() { + self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_crc_mismatch(&self) { + if self.telemetry_me_allows_normal() { + self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_seq_mismatch(&self) { + if self.telemetry_me_allows_normal() { + self.me_seq_mismatch.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_route_drop_no_conn(&self) { + if self.telemetry_me_allows_normal() { + self.me_route_drop_no_conn.fetch_add(1, Ordering::Relaxed); + } } - pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_crc_mismatch(&self) { self.me_crc_mismatch.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_seq_mismatch(&self) { self.me_seq_mismatch.fetch_add(1, Ordering::Relaxed); } - pub fn increment_me_route_drop_no_conn(&self) { self.me_route_drop_no_conn.fetch_add(1, Ordering::Relaxed); } pub fn increment_me_route_drop_channel_closed(&self) { - self.me_route_drop_channel_closed.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.me_route_drop_channel_closed.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_me_route_drop_queue_full(&self) { - self.me_route_drop_queue_full.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.me_route_drop_queue_full.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_route_drop_queue_full_base(&self) { + if self.telemetry_me_allows_normal() { + self.me_route_drop_queue_full_base.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_route_drop_queue_full_high(&self) { + if self.telemetry_me_allows_normal() { + self.me_route_drop_queue_full_high.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_socks_kdf_strict_reject(&self) { + if self.telemetry_me_allows_normal() { + self.me_socks_kdf_strict_reject.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_socks_kdf_compat_fallback(&self) { + if self.telemetry_me_allows_debug() { + self.me_socks_kdf_compat_fallback.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_secure_padding_invalid(&self) { - self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_desync_total(&self) { - self.desync_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.desync_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_desync_full_logged(&self) { - self.desync_full_logged.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.desync_full_logged.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_desync_suppressed(&self) { - self.desync_suppressed.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.desync_suppressed.fetch_add(1, Ordering::Relaxed); + } } pub fn observe_desync_frames_ok(&self, frames_ok: u64) { + if !self.telemetry_me_allows_normal() { + return; + } match frames_ok { 0 => { self.desync_frames_bucket_0.fetch_add(1, Ordering::Relaxed); @@ -123,12 +257,19 @@ impl Stats { } } pub fn increment_pool_swap_total(&self) { - self.pool_swap_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_debug() { + self.pool_swap_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_pool_drain_active(&self) { - self.pool_drain_active.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_debug() { + self.pool_drain_active.fetch_add(1, Ordering::Relaxed); + } } pub fn decrement_pool_drain_active(&self) { + if !self.telemetry_me_allows_debug() { + return; + } let mut current = self.pool_drain_active.load(Ordering::Relaxed); loop { if current == 0 { @@ -146,31 +287,51 @@ impl Stats { } } pub fn increment_pool_force_close_total(&self) { - self.pool_force_close_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.pool_force_close_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_pool_stale_pick_total(&self) { - self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_me_writer_removed_total(&self) { - self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_debug() { + self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_me_writer_removed_unexpected_total(&self) { - self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_me_refill_triggered_total(&self) { - self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_debug() { + self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_me_refill_skipped_inflight_total(&self) { - self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_debug() { + self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_me_refill_failed_total(&self) { - self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed); + } } pub fn increment_me_writer_restored_same_endpoint_total(&self) { - self.me_writer_restored_same_endpoint_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.me_writer_restored_same_endpoint_total + .fetch_add(1, Ordering::Relaxed); + } } pub fn increment_me_writer_restored_fallback_total(&self) { - self.me_writer_restored_fallback_total.fetch_add(1, Ordering::Relaxed); + if self.telemetry_me_allows_normal() { + self.me_writer_restored_fallback_total + .fetch_add(1, Ordering::Relaxed); + } } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } @@ -189,6 +350,18 @@ impl Stats { pub fn get_me_route_drop_queue_full(&self) -> u64 { self.me_route_drop_queue_full.load(Ordering::Relaxed) } + pub fn get_me_route_drop_queue_full_base(&self) -> u64 { + self.me_route_drop_queue_full_base.load(Ordering::Relaxed) + } + pub fn get_me_route_drop_queue_full_high(&self) -> u64 { + self.me_route_drop_queue_full_high.load(Ordering::Relaxed) + } + pub fn get_me_socks_kdf_strict_reject(&self) -> u64 { + self.me_socks_kdf_strict_reject.load(Ordering::Relaxed) + } + pub fn get_me_socks_kdf_compat_fallback(&self) -> u64 { + self.me_socks_kdf_compat_fallback.load(Ordering::Relaxed) + } pub fn get_secure_padding_invalid(&self) -> u64 { self.secure_padding_invalid.load(Ordering::Relaxed) } @@ -248,11 +421,17 @@ impl Stats { } pub fn increment_user_connects(&self, user: &str) { + if !self.telemetry_user_enabled() { + return; + } self.user_stats.entry(user.to_string()).or_default() .connects.fetch_add(1, Ordering::Relaxed); } pub fn increment_user_curr_connects(&self, user: &str) { + if !self.telemetry_user_enabled() { + return; + } self.user_stats.entry(user.to_string()).or_default() .curr_connects.fetch_add(1, Ordering::Relaxed); } @@ -285,21 +464,33 @@ impl Stats { } pub fn add_user_octets_from(&self, user: &str, bytes: u64) { + if !self.telemetry_user_enabled() { + return; + } self.user_stats.entry(user.to_string()).or_default() .octets_from_client.fetch_add(bytes, Ordering::Relaxed); } pub fn add_user_octets_to(&self, user: &str, bytes: u64) { + if !self.telemetry_user_enabled() { + return; + } self.user_stats.entry(user.to_string()).or_default() .octets_to_client.fetch_add(bytes, Ordering::Relaxed); } pub fn increment_user_msgs_from(&self, user: &str) { + if !self.telemetry_user_enabled() { + return; + } self.user_stats.entry(user.to_string()).or_default() .msgs_from_client.fetch_add(1, Ordering::Relaxed); } pub fn increment_user_msgs_to(&self, user: &str) { + if !self.telemetry_user_enabled() { + return; + } self.user_stats.entry(user.to_string()).or_default() .msgs_to_client.fetch_add(1, Ordering::Relaxed); } @@ -548,6 +739,7 @@ impl ReplayStats { #[cfg(test)] mod tests { use super::*; + use crate::config::MeTelemetryLevel; use std::sync::Arc; #[test] @@ -558,6 +750,40 @@ mod tests { stats.increment_connects_all(); assert_eq!(stats.get_connects_all(), 3); } + + #[test] + fn test_telemetry_policy_disables_core_and_user_counters() { + let stats = Stats::new(); + stats.apply_telemetry_policy(TelemetryPolicy { + core_enabled: false, + user_enabled: false, + me_level: MeTelemetryLevel::Normal, + }); + + stats.increment_connects_all(); + stats.increment_user_connects("alice"); + stats.add_user_octets_from("alice", 1024); + assert_eq!(stats.get_connects_all(), 0); + assert_eq!(stats.get_user_curr_connects("alice"), 0); + assert_eq!(stats.get_user_total_octets("alice"), 0); + } + + #[test] + fn test_telemetry_policy_me_silent_blocks_me_counters() { + let stats = Stats::new(); + stats.apply_telemetry_policy(TelemetryPolicy { + core_enabled: true, + user_enabled: true, + me_level: MeTelemetryLevel::Silent, + }); + + stats.increment_me_crc_mismatch(); + stats.increment_me_keepalive_sent(); + stats.increment_me_route_drop_queue_full(); + assert_eq!(stats.get_me_crc_mismatch(), 0); + assert_eq!(stats.get_me_keepalive_sent(), 0); + assert_eq!(stats.get_me_route_drop_queue_full(), 0); + } #[test] fn test_replay_checker_basic() { diff --git a/src/stats/telemetry.rs b/src/stats/telemetry.rs new file mode 100644 index 0000000..e29fa44 --- /dev/null +++ b/src/stats/telemetry.rs @@ -0,0 +1,29 @@ +use crate::config::{MeTelemetryLevel, TelemetryConfig}; + +/// Runtime telemetry policy used by hot-path counters. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TelemetryPolicy { + pub core_enabled: bool, + pub user_enabled: bool, + pub me_level: MeTelemetryLevel, +} + +impl Default for TelemetryPolicy { + fn default() -> Self { + Self { + core_enabled: true, + user_enabled: true, + me_level: MeTelemetryLevel::Normal, + } + } +} + +impl TelemetryPolicy { + pub fn from_config(cfg: &TelemetryConfig) -> Self { + Self { + core_enabled: cfg.core_enabled, + user_enabled: cfg.user_enabled, + me_level: cfg.me_level, + } + } +} diff --git a/src/transport/middle_proxy/handshake.rs b/src/transport/middle_proxy/handshake.rs index 988834a..384ecc9 100644 --- a/src/transport/middle_proxy/handshake.rs +++ b/src/transport/middle_proxy/handshake.rs @@ -14,6 +14,7 @@ use tokio::net::{TcpStream, TcpSocket}; use tokio::time::timeout; use tracing::{debug, info, warn}; +use crate::config::MeSocksKdfPolicy; use crate::crypto::{SecureRandom, build_middleproxy_prekey, derive_middleproxy_keys, sha256}; use crate::error::{ProxyError, Result}; use crate::network::IpFamily; @@ -117,6 +118,13 @@ impl MePool { Some(bound) } + fn is_socks_route(upstream_egress: Option) -> bool { + matches!( + upstream_egress.map(|info| info.route_kind), + Some(UpstreamRouteKind::Socks4 | UpstreamRouteKind::Socks5) + ) + } + /// TCP connect with timeout + return RTT in milliseconds. pub(crate) async fn connect_tcp( &self, @@ -125,14 +133,7 @@ impl MePool { let start = Instant::now(); let (stream, upstream_egress) = if let Some(upstream) = &self.upstream { let dc_idx = self.resolve_dc_idx_for_endpoint(addr).await; - let (stream, egress) = timeout( - Duration::from_secs(ME_CONNECT_TIMEOUT_SECS), - upstream.connect_with_details(addr, dc_idx, None), - ) - .await - .map_err(|_| ProxyError::ConnectionTimeout { - addr: addr.to_string(), - })??; + let (stream, egress) = upstream.connect_with_details(addr, dc_idx, None).await?; (stream, Some(egress)) } else { let connect_fut = async { @@ -226,9 +227,29 @@ impl MePool { } else { IpFamily::V6 }; + let is_socks_route = Self::is_socks_route(upstream_egress); let socks_bound_addr = Self::select_socks_bound_addr(family, upstream_egress); let reflected = if let Some(bound) = socks_bound_addr { Some(bound) + } else if is_socks_route { + match self.socks_kdf_policy() { + MeSocksKdfPolicy::Strict => { + self.stats.increment_me_socks_kdf_strict_reject(); + return Err(ProxyError::InvalidHandshake( + "SOCKS route returned no valid BND.ADDR for ME KDF (strict policy)" + .to_string(), + )); + } + MeSocksKdfPolicy::Compat => { + self.stats.increment_me_socks_kdf_compat_fallback(); + if self.nat_probe { + let bind_ip = Self::direct_bind_ip_for_stun(family, upstream_egress); + self.maybe_reflect_public_addr(family, bind_ip).await + } else { + None + } + } + } } else if self.nat_probe { let bind_ip = Self::direct_bind_ip_for_stun(family, upstream_egress); self.maybe_reflect_public_addr(family, bind_ip).await diff --git a/src/transport/middle_proxy/ping.rs b/src/transport/middle_proxy/ping.rs index e90d98f..a05e44d 100644 --- a/src/transport/middle_proxy/ping.rs +++ b/src/transport/middle_proxy/ping.rs @@ -7,6 +7,7 @@ use tokio::net::UdpSocket; use crate::config::{UpstreamConfig, UpstreamType}; use crate::crypto::SecureRandom; use crate::error::ProxyError; +use crate::transport::{UpstreamEgressInfo, UpstreamRouteKind}; use super::MePool; @@ -20,6 +21,7 @@ pub enum MePingFamily { pub struct MePingSample { pub dc: i32, pub addr: SocketAddr, + pub route: Option, pub connect_ms: Option, pub handshake_ms: Option, pub error: Option, @@ -84,6 +86,34 @@ fn pick_target_for_family(reports: &[MePingReport], family: MePingFamily) -> Opt }) } +fn route_from_egress(egress: Option) -> Option { + let info = egress?; + match info.route_kind { + UpstreamRouteKind::Direct => { + let src_ip = info + .direct_bind_ip + .or_else(|| info.local_addr.map(|addr| addr.ip())); + let ip = src_ip?; + let mut parts = Vec::new(); + if let Some(dev) = detect_interface_for_ip(ip) { + parts.push(format!("dev={dev}")); + } + parts.push(format!("src={ip}")); + Some(format!("direct {}", parts.join(" "))) + } + UpstreamRouteKind::Socks4 => Some( + info.socks_bound_addr + .map(|addr| format!("socks4 bnd={addr}")) + .unwrap_or_else(|| "socks4".to_string()), + ), + UpstreamRouteKind::Socks5 => Some( + info.socks_bound_addr + .map(|addr| format!("socks5 bnd={addr}")) + .unwrap_or_else(|| "socks5".to_string()), + ), + } +} + #[cfg(unix)] fn detect_interface_for_ip(ip: IpAddr) -> Option { use nix::ifaddrs::getifaddrs; @@ -160,6 +190,15 @@ pub async fn format_me_route( v4_ok: bool, v6_ok: bool, ) -> String { + if let Some(route) = reports + .iter() + .flat_map(|report| report.samples.iter()) + .find(|sample| sample.error.is_none() && sample.handshake_ms.is_some()) + .and_then(|sample| sample.route.clone()) + { + return route; + } + let enabled_upstreams: Vec<_> = upstreams.iter().filter(|u| u.enabled).collect(); if enabled_upstreams.is_empty() { return detect_direct_route_details(reports, prefer_ipv6, v4_ok, v6_ok) @@ -222,6 +261,7 @@ mod tests { let s = sample(MePingSample { dc: 4, addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 8888), + route: Some("direct src=1.2.3.4".to_string()), connect_ms: Some(12.3), handshake_ms: Some(34.7), error: None, @@ -238,6 +278,7 @@ mod tests { let s = sample(MePingSample { dc: -5, addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(5, 6, 7, 8)), 80), + route: Some("socks5".to_string()), connect_ms: Some(10.0), handshake_ms: None, error: Some("handshake timeout".to_string()), @@ -278,10 +319,12 @@ pub async fn run_me_ping(pool: &Arc, rng: &SecureRandom) -> Vec { connect_ms = Some(conn_rtt); + route = route_from_egress(upstream_egress); match pool.handshake_only(stream, addr, upstream_egress, rng).await { Ok(hs) => { handshake_ms = Some(hs.handshake_ms); @@ -302,6 +345,7 @@ pub async fn run_me_ping(pool: &Arc, rng: &SecureRandom) -> Vec Arc { + let registry = Arc::new(ConnRegistry::new()); + registry.update_route_backpressure_policy( + me_route_backpressure_base_timeout_ms, + me_route_backpressure_high_timeout_ms, + me_route_backpressure_high_watermark_pct, + ); Arc::new(Self { - registry: Arc::new(ConnRegistry::new()), + registry, writers: Arc::new(RwLock::new(Vec::new())), rr: AtomicU64::new(0), decision, @@ -204,6 +216,7 @@ impl MePool { me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new( me_hardswap_warmup_pass_backoff_base_ms, ), + me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), }) } @@ -260,6 +273,26 @@ impl MePool { &self.registry } + pub fn update_runtime_transport_policy( + &self, + socks_kdf_policy: MeSocksKdfPolicy, + route_backpressure_base_timeout_ms: u64, + route_backpressure_high_timeout_ms: u64, + route_backpressure_high_watermark_pct: u8, + ) { + self.me_socks_kdf_policy + .store(socks_kdf_policy.as_u8(), Ordering::Relaxed); + self.registry.update_route_backpressure_policy( + route_backpressure_base_timeout_ms, + route_backpressure_high_timeout_ms, + route_backpressure_high_watermark_pct, + ); + } + + pub(super) fn socks_kdf_policy(&self) -> MeSocksKdfPolicy { + MeSocksKdfPolicy::from_u8(self.me_socks_kdf_policy.load(Ordering::Relaxed)) + } + pub(super) fn writers_arc(&self) -> Arc>> { self.writers.clone() } diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 95bd0d8..ea0dd75 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -124,7 +124,14 @@ pub(crate) async fn reader_loop( match routed { RouteResult::NoConn => stats.increment_me_route_drop_no_conn(), RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(), - RouteResult::QueueFull => stats.increment_me_route_drop_queue_full(), + RouteResult::QueueFullBase => { + stats.increment_me_route_drop_queue_full(); + stats.increment_me_route_drop_queue_full_base(); + } + RouteResult::QueueFullHigh => { + stats.increment_me_route_drop_queue_full(); + stats.increment_me_route_drop_queue_full_high(); + } RouteResult::Routed => {} } reg.unregister(cid).await; @@ -140,7 +147,14 @@ pub(crate) async fn reader_loop( match routed { RouteResult::NoConn => stats.increment_me_route_drop_no_conn(), RouteResult::ChannelClosed => stats.increment_me_route_drop_channel_closed(), - RouteResult::QueueFull => stats.increment_me_route_drop_queue_full(), + RouteResult::QueueFullBase => { + stats.increment_me_route_drop_queue_full(); + stats.increment_me_route_drop_queue_full_base(); + } + RouteResult::QueueFullHigh => { + stats.increment_me_route_drop_queue_full(); + stats.increment_me_route_drop_queue_full_high(); + } RouteResult::Routed => {} } reg.unregister(cid).await; diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 2122ed8..223fa71 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -1,6 +1,6 @@ use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU8, AtomicU64, Ordering}; use std::time::Duration; use tokio::sync::{mpsc, RwLock}; @@ -10,14 +10,17 @@ use super::codec::WriterCommand; use super::MeResponse; const ROUTE_CHANNEL_CAPACITY: usize = 4096; -const ROUTE_BACKPRESSURE_TIMEOUT: Duration = Duration::from_millis(25); +const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25; +const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120; +const ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT: u8 = 80; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RouteResult { Routed, NoConn, ChannelClosed, - QueueFull, + QueueFullBase, + QueueFullHigh, } #[derive(Clone)] @@ -65,6 +68,9 @@ impl RegistryInner { pub struct ConnRegistry { inner: RwLock, next_id: AtomicU64, + route_backpressure_base_timeout_ms: AtomicU64, + route_backpressure_high_timeout_ms: AtomicU64, + route_backpressure_high_watermark_pct: AtomicU8, } impl ConnRegistry { @@ -73,9 +79,35 @@ impl ConnRegistry { Self { inner: RwLock::new(RegistryInner::new()), next_id: AtomicU64::new(start), + route_backpressure_base_timeout_ms: AtomicU64::new( + ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS, + ), + route_backpressure_high_timeout_ms: AtomicU64::new( + ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS, + ), + route_backpressure_high_watermark_pct: AtomicU8::new( + ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT, + ), } } + pub fn update_route_backpressure_policy( + &self, + base_timeout_ms: u64, + high_timeout_ms: u64, + high_watermark_pct: u8, + ) { + let base = base_timeout_ms.max(1); + let high = high_timeout_ms.max(base); + let watermark = high_watermark_pct.clamp(1, 100); + self.route_backpressure_base_timeout_ms + .store(base, Ordering::Relaxed); + self.route_backpressure_high_timeout_ms + .store(high, Ordering::Relaxed); + self.route_backpressure_high_watermark_pct + .store(watermark, Ordering::Relaxed); + } + pub async fn register(&self) -> (u64, mpsc::Receiver) { let id = self.next_id.fetch_add(1, Ordering::Relaxed); let (tx, rx) = mpsc::channel(ROUTE_CHANNEL_CAPACITY); @@ -112,10 +144,40 @@ impl ConnRegistry { Err(TrySendError::Closed(_)) => RouteResult::ChannelClosed, Err(TrySendError::Full(resp)) => { // Absorb short bursts without dropping/closing the session immediately. - match tokio::time::timeout(ROUTE_BACKPRESSURE_TIMEOUT, tx.send(resp)).await { + let base_timeout_ms = + self.route_backpressure_base_timeout_ms.load(Ordering::Relaxed).max(1); + let high_timeout_ms = self + .route_backpressure_high_timeout_ms + .load(Ordering::Relaxed) + .max(base_timeout_ms); + let high_watermark_pct = self + .route_backpressure_high_watermark_pct + .load(Ordering::Relaxed) + .clamp(1, 100); + let used = ROUTE_CHANNEL_CAPACITY.saturating_sub(tx.capacity()); + let used_pct = if ROUTE_CHANNEL_CAPACITY == 0 { + 100 + } else { + (used.saturating_mul(100) / ROUTE_CHANNEL_CAPACITY) as u8 + }; + let high_profile = used_pct >= high_watermark_pct; + let timeout_ms = if high_profile { + high_timeout_ms + } else { + base_timeout_ms + }; + let timeout_dur = Duration::from_millis(timeout_ms); + + match tokio::time::timeout(timeout_dur, tx.send(resp)).await { Ok(Ok(())) => RouteResult::Routed, Ok(Err(_)) => RouteResult::ChannelClosed, - Err(_) => RouteResult::QueueFull, + Err(_) => { + if high_profile { + RouteResult::QueueFullHigh + } else { + RouteResult::QueueFullBase + } + } } } }