From b950987229203c885c57d29a4d5b640b3bef38ae Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 9 Mar 2026 20:35:31 +0300 Subject: [PATCH] ME Selftest --- src/api/mod.rs | 7 + src/api/runtime_selftest.rs | 228 +++++++++++++++++++ src/network/probe.rs | 8 + src/transport/middle_proxy/config_updater.rs | 2 + src/transport/middle_proxy/handshake.rs | 29 +++ src/transport/middle_proxy/mod.rs | 2 + src/transport/middle_proxy/secret.rs | 2 + src/transport/middle_proxy/selftest.rs | 194 ++++++++++++++++ 8 files changed, 472 insertions(+) create mode 100644 src/api/runtime_selftest.rs create mode 100644 src/transport/middle_proxy/selftest.rs diff --git a/src/api/mod.rs b/src/api/mod.rs index 63fafad..0a51231 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -28,6 +28,7 @@ mod model; mod runtime_edge; mod runtime_init; mod runtime_min; +mod runtime_selftest; mod runtime_stats; mod runtime_watch; mod runtime_zero; @@ -48,6 +49,7 @@ use runtime_min::{ build_runtime_me_pool_state_data, build_runtime_me_quality_data, build_runtime_nat_stun_data, build_runtime_upstream_quality_data, build_security_whitelist_data, }; +use runtime_selftest::build_runtime_me_selftest_data; use runtime_stats::{ MinimalCacheEntry, build_dcs_data, build_me_writers_data, build_minimal_all_data, build_upstreams_data, build_zero_all_data, @@ -333,6 +335,11 @@ async fn handle( let data = build_runtime_nat_stun_data(shared.as_ref()).await; Ok(success_response(StatusCode::OK, data, revision)) } + ("GET", "/v1/runtime/me-selftest") => { + let revision = current_revision(&shared.config_path).await?; + let data = build_runtime_me_selftest_data(shared.as_ref()).await; + Ok(success_response(StatusCode::OK, data, revision)) + } ("GET", "/v1/runtime/connections/summary") => { let revision = current_revision(&shared.config_path).await?; let data = build_runtime_connections_summary_data(shared.as_ref(), cfg.as_ref()).await; diff --git a/src/api/runtime_selftest.rs b/src/api/runtime_selftest.rs new file mode 100644 index 0000000..da591b2 --- /dev/null +++ b/src/api/runtime_selftest.rs @@ -0,0 +1,228 @@ +use std::net::IpAddr; +use std::sync::{Mutex, OnceLock}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::Serialize; + +use crate::network::probe::{detect_interface_ipv4, detect_interface_ipv6, is_bogon}; +use crate::transport::middle_proxy::{bnd_snapshot, timeskew_snapshot}; + +use super::ApiShared; + +const SOURCE_UNAVAILABLE_REASON: &str = "source_unavailable"; +const KDF_EWMA_TAU_SECS: f64 = 600.0; +const KDF_EWMA_THRESHOLD_ERRORS_PER_MIN: f64 = 0.30; +const TIMESKEW_THRESHOLD_SECS: u64 = 60; + +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestKdfData { + pub(super) state: &'static str, + pub(super) ewma_errors_per_min: f64, + pub(super) threshold_errors_per_min: f64, + pub(super) errors_total: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestTimeskewData { + pub(super) state: &'static str, + pub(super) max_skew_secs_15m: Option, + pub(super) samples_15m: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) last_skew_secs: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) last_source: Option<&'static str>, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) last_seen_age_secs: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestIpFamilyData { + pub(super) addr: String, + pub(super) state: &'static str, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestIpData { + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) v4: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) v6: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestPidData { + pub(super) pid: u32, + pub(super) state: &'static str, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestBndData { + pub(super) addr_state: &'static str, + pub(super) port_state: &'static str, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) last_addr: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) last_seen_age_secs: Option, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestPayload { + pub(super) kdf: RuntimeMeSelftestKdfData, + pub(super) timeskew: RuntimeMeSelftestTimeskewData, + pub(super) ip: RuntimeMeSelftestIpData, + pub(super) pid: RuntimeMeSelftestPidData, + pub(super) bnd: RuntimeMeSelftestBndData, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestData { + pub(super) enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) reason: Option<&'static str>, + pub(super) generated_at_epoch_secs: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) data: Option, +} + +#[derive(Default)] +struct KdfEwmaState { + initialized: bool, + last_epoch_secs: u64, + last_total_errors: u64, + ewma_errors_per_min: f64, +} + +static KDF_EWMA_STATE: OnceLock> = OnceLock::new(); + +fn kdf_ewma_state() -> &'static Mutex { + KDF_EWMA_STATE.get_or_init(|| Mutex::new(KdfEwmaState::default())) +} + +pub(super) async fn build_runtime_me_selftest_data(shared: &ApiShared) -> RuntimeMeSelftestData { + let now_epoch_secs = now_epoch_secs(); + if shared.me_pool.read().await.is_none() { + return RuntimeMeSelftestData { + enabled: false, + reason: Some(SOURCE_UNAVAILABLE_REASON), + generated_at_epoch_secs: now_epoch_secs, + data: None, + }; + } + + let kdf_errors_total = shared + .stats + .get_me_kdf_drift_total() + .saturating_add(shared.stats.get_me_socks_kdf_strict_reject()); + let kdf_ewma = update_kdf_ewma(now_epoch_secs, kdf_errors_total); + let kdf_state = if kdf_ewma >= KDF_EWMA_THRESHOLD_ERRORS_PER_MIN { + "error" + } else { + "ok" + }; + + let skew = timeskew_snapshot(); + let timeskew_state = if skew.max_skew_secs_15m.unwrap_or(0) > TIMESKEW_THRESHOLD_SECS { + "error" + } else { + "ok" + }; + + let ip_v4 = detect_interface_ipv4().map(|ip| RuntimeMeSelftestIpFamilyData { + addr: ip.to_string(), + state: classify_ip(IpAddr::V4(ip)), + }); + let ip_v6 = detect_interface_ipv6().map(|ip| RuntimeMeSelftestIpFamilyData { + addr: ip.to_string(), + state: classify_ip(IpAddr::V6(ip)), + }); + + let pid = std::process::id(); + let pid_state = if pid == 1 { "one" } else { "non-one" }; + + let bnd = bnd_snapshot(); + + RuntimeMeSelftestData { + enabled: true, + reason: None, + generated_at_epoch_secs: now_epoch_secs, + data: Some(RuntimeMeSelftestPayload { + kdf: RuntimeMeSelftestKdfData { + state: kdf_state, + ewma_errors_per_min: round3(kdf_ewma), + threshold_errors_per_min: KDF_EWMA_THRESHOLD_ERRORS_PER_MIN, + errors_total: kdf_errors_total, + }, + timeskew: RuntimeMeSelftestTimeskewData { + state: timeskew_state, + max_skew_secs_15m: skew.max_skew_secs_15m, + samples_15m: skew.samples_15m, + last_skew_secs: skew.last_skew_secs, + last_source: skew.last_source, + last_seen_age_secs: skew.last_seen_age_secs, + }, + ip: RuntimeMeSelftestIpData { + v4: ip_v4, + v6: ip_v6, + }, + pid: RuntimeMeSelftestPidData { + pid, + state: pid_state, + }, + bnd: RuntimeMeSelftestBndData { + addr_state: bnd.addr_status, + port_state: bnd.port_status, + last_addr: bnd.last_addr.map(|value| value.to_string()), + last_seen_age_secs: bnd.last_seen_age_secs, + }, + }), + } +} + +fn update_kdf_ewma(now_epoch_secs: u64, total_errors: u64) -> f64 { + let Ok(mut guard) = kdf_ewma_state().lock() else { + return 0.0; + }; + + if !guard.initialized { + guard.initialized = true; + guard.last_epoch_secs = now_epoch_secs; + guard.last_total_errors = total_errors; + guard.ewma_errors_per_min = 0.0; + return guard.ewma_errors_per_min; + } + + let dt_secs = now_epoch_secs.saturating_sub(guard.last_epoch_secs); + if dt_secs == 0 { + return guard.ewma_errors_per_min; + } + + let delta_errors = total_errors.saturating_sub(guard.last_total_errors); + let instant_rate_per_min = (delta_errors as f64) * 60.0 / (dt_secs as f64); + let alpha = 1.0 - f64::exp(-(dt_secs as f64) / KDF_EWMA_TAU_SECS); + guard.ewma_errors_per_min = guard.ewma_errors_per_min + + alpha * (instant_rate_per_min - guard.ewma_errors_per_min); + guard.last_epoch_secs = now_epoch_secs; + guard.last_total_errors = total_errors; + guard.ewma_errors_per_min +} + +fn classify_ip(ip: IpAddr) -> &'static str { + if ip.is_loopback() { + return "loopback"; + } + if is_bogon(ip) { + return "bogon"; + } + "good" +} + +fn round3(value: f64) -> f64 { + (value * 1000.0).round() / 1000.0 +} + +fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} diff --git a/src/network/probe.rs b/src/network/probe.rs index 2ceeb2c..63e23a1 100644 --- a/src/network/probe.rs +++ b/src/network/probe.rs @@ -280,6 +280,14 @@ fn detect_local_ip_v6() -> Option { } } +pub fn detect_interface_ipv4() -> Option { + detect_local_ip_v4() +} + +pub fn detect_interface_ipv6() -> Option { + detect_local_ip_v6() +} + pub fn is_bogon(ip: IpAddr) -> bool { match ip { IpAddr::V4(v4) => is_bogon_v4(v4), diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 2c6a07a..194da5b 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -15,6 +15,7 @@ use crate::error::Result; use super::MePool; use super::rotation::{MeReinitTrigger, enqueue_reinit_trigger}; use super::secret::download_proxy_secret_with_max_len; +use super::selftest::record_timeskew_sample; use std::time::SystemTime; async fn retry_fetch(url: &str) -> Option { @@ -109,6 +110,7 @@ pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData, }) { let skew_secs = skew.as_secs(); + record_timeskew_sample("proxy_config_date_header", skew_secs); if skew_secs > 60 { warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header"); } else if skew_secs > 30 { diff --git a/src/transport/middle_proxy/handshake.rs b/src/transport/middle_proxy/handshake.rs index f556b99..1c1b172 100644 --- a/src/transport/middle_proxy/handshake.rs +++ b/src/transport/middle_proxy/handshake.rs @@ -33,6 +33,7 @@ use super::codec::{ cbc_decrypt_inplace, cbc_encrypt_padded, parse_handshake_flags, parse_nonce_payload, read_rpc_frame_plaintext, rpc_crc, }; +use super::selftest::{BndAddrStatus, BndPortStatus, record_bnd_status}; use super::wire::{extract_ip_material, IpMaterial}; use super::MePool; @@ -131,6 +132,14 @@ impl MePool { ) } + fn bnd_port_status(bound: Option) -> BndPortStatus { + match bound { + Some(addr) if addr.port() == 0 => BndPortStatus::Zero, + Some(_) => BndPortStatus::Ok, + None => BndPortStatus::Error, + } + } + /// TCP connect with timeout + return RTT in milliseconds. pub(crate) async fn connect_tcp( &self, @@ -239,7 +248,27 @@ impl MePool { IpFamily::V6 }; let is_socks_route = Self::is_socks_route(upstream_egress); + let raw_socks_bound_addr = if is_socks_route { + upstream_egress.and_then(|info| info.socks_bound_addr) + } else { + None + }; let socks_bound_addr = Self::select_socks_bound_addr(family, upstream_egress); + let bnd_addr_status = if !is_socks_route { + BndAddrStatus::Error + } else if raw_socks_bound_addr.is_some() && socks_bound_addr.is_none() { + BndAddrStatus::Bogon + } else if socks_bound_addr.is_some() { + BndAddrStatus::Ok + } else { + BndAddrStatus::Error + }; + let bnd_port_status = if is_socks_route { + Self::bnd_port_status(raw_socks_bound_addr) + } else { + BndPortStatus::Error + }; + record_bnd_status(bnd_addr_status, bnd_port_status, raw_socks_bound_addr); let reflected = if let Some(bound) = socks_bound_addr { Some(bound) } else if is_socks_route { diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 87c012b..ecc963d 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -18,6 +18,7 @@ mod registry; mod rotation; mod send; mod secret; +mod selftest; mod wire; mod pool_status; @@ -37,6 +38,7 @@ pub use config_updater::{ me_config_updater, save_proxy_config_cache, }; pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task}; +pub(crate) use selftest::{bnd_snapshot, timeskew_snapshot}; pub use wire::proto_flags_for_tag; #[derive(Debug)] diff --git a/src/transport/middle_proxy/secret.rs b/src/transport/middle_proxy/secret.rs index 4991d32..b2cfcb6 100644 --- a/src/transport/middle_proxy/secret.rs +++ b/src/transport/middle_proxy/secret.rs @@ -3,6 +3,7 @@ use std::time::SystemTime; use httpdate; use crate::error::{ProxyError, Result}; +use super::selftest::record_timeskew_sample; pub const PROXY_SECRET_MIN_LEN: usize = 32; @@ -98,6 +99,7 @@ pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result 60 { warn!(skew_secs, "Time skew >60s detected from proxy-secret Date header"); } else if skew_secs > 30 { diff --git a/src/transport/middle_proxy/selftest.rs b/src/transport/middle_proxy/selftest.rs new file mode 100644 index 0000000..c1653ec --- /dev/null +++ b/src/transport/middle_proxy/selftest.rs @@ -0,0 +1,194 @@ +use std::collections::VecDeque; +use std::net::SocketAddr; +use std::sync::{Mutex, OnceLock}; +use std::time::{SystemTime, UNIX_EPOCH}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum BndAddrStatus { + Ok, + Bogon, + Error, +} + +impl BndAddrStatus { + pub(crate) fn as_str(self) -> &'static str { + match self { + Self::Ok => "ok", + Self::Bogon => "bogon", + Self::Error => "error", + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum BndPortStatus { + Ok, + Zero, + Error, +} + +impl BndPortStatus { + pub(crate) fn as_str(self) -> &'static str { + match self { + Self::Ok => "ok", + Self::Zero => "zero", + Self::Error => "error", + } + } +} + +#[derive(Clone, Debug)] +pub(crate) struct MeBndSnapshot { + pub addr_status: &'static str, + pub port_status: &'static str, + pub last_addr: Option, + pub last_seen_age_secs: Option, +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct MeTimeskewSnapshot { + pub max_skew_secs_15m: Option, + pub samples_15m: usize, + pub last_skew_secs: Option, + pub last_source: Option<&'static str>, + pub last_seen_age_secs: Option, +} + +#[derive(Clone, Copy, Debug)] +struct MeTimeskewSample { + ts_epoch_secs: u64, + skew_secs: u64, + source: &'static str, +} + +#[derive(Debug)] +struct MeSelftestState { + bnd_addr_status: BndAddrStatus, + bnd_port_status: BndPortStatus, + bnd_last_addr: Option, + bnd_last_seen_epoch_secs: Option, + timeskew_samples: VecDeque, +} + +impl Default for MeSelftestState { + fn default() -> Self { + Self { + bnd_addr_status: BndAddrStatus::Error, + bnd_port_status: BndPortStatus::Error, + bnd_last_addr: None, + bnd_last_seen_epoch_secs: None, + timeskew_samples: VecDeque::new(), + } + } +} + +const MAX_TIMESKEW_SAMPLES: usize = 512; +const TIMESKEW_WINDOW_SECS: u64 = 15 * 60; + +static ME_SELFTEST_STATE: OnceLock> = OnceLock::new(); + +fn state() -> &'static Mutex { + ME_SELFTEST_STATE.get_or_init(|| Mutex::new(MeSelftestState::default())) +} + +pub(crate) fn record_bnd_status( + addr_status: BndAddrStatus, + port_status: BndPortStatus, + last_addr: Option, +) { + let now_epoch_secs = now_epoch_secs(); + let Ok(mut guard) = state().lock() else { + return; + }; + guard.bnd_addr_status = addr_status; + guard.bnd_port_status = port_status; + guard.bnd_last_addr = last_addr; + guard.bnd_last_seen_epoch_secs = Some(now_epoch_secs); +} + +pub(crate) fn bnd_snapshot() -> MeBndSnapshot { + let now_epoch_secs = now_epoch_secs(); + let Ok(guard) = state().lock() else { + return MeBndSnapshot { + addr_status: BndAddrStatus::Error.as_str(), + port_status: BndPortStatus::Error.as_str(), + last_addr: None, + last_seen_age_secs: None, + }; + }; + MeBndSnapshot { + addr_status: guard.bnd_addr_status.as_str(), + port_status: guard.bnd_port_status.as_str(), + last_addr: guard.bnd_last_addr, + last_seen_age_secs: guard + .bnd_last_seen_epoch_secs + .map(|value| now_epoch_secs.saturating_sub(value)), + } +} + +pub(crate) fn record_timeskew_sample(source: &'static str, skew_secs: u64) { + let now_epoch_secs = now_epoch_secs(); + let Ok(mut guard) = state().lock() else { + return; + }; + guard.timeskew_samples.push_back(MeTimeskewSample { + ts_epoch_secs: now_epoch_secs, + skew_secs, + source, + }); + while guard.timeskew_samples.len() > MAX_TIMESKEW_SAMPLES { + guard.timeskew_samples.pop_front(); + } + let cutoff = now_epoch_secs.saturating_sub(TIMESKEW_WINDOW_SECS * 2); + while guard + .timeskew_samples + .front() + .is_some_and(|sample| sample.ts_epoch_secs < cutoff) + { + guard.timeskew_samples.pop_front(); + } +} + +pub(crate) fn timeskew_snapshot() -> MeTimeskewSnapshot { + let now_epoch_secs = now_epoch_secs(); + let Ok(guard) = state().lock() else { + return MeTimeskewSnapshot::default(); + }; + + let mut max_skew_secs_15m = None; + let mut samples_15m = 0usize; + let window_start = now_epoch_secs.saturating_sub(TIMESKEW_WINDOW_SECS); + for sample in &guard.timeskew_samples { + if sample.ts_epoch_secs < window_start { + continue; + } + samples_15m = samples_15m.saturating_add(1); + max_skew_secs_15m = Some(max_skew_secs_15m.unwrap_or(0).max(sample.skew_secs)); + } + + let (last_skew_secs, last_source, last_seen_age_secs) = + if let Some(last) = guard.timeskew_samples.back() { + ( + Some(last.skew_secs), + Some(last.source), + Some(now_epoch_secs.saturating_sub(last.ts_epoch_secs)), + ) + } else { + (None, None, None) + }; + + MeTimeskewSnapshot { + max_skew_secs_15m, + samples_15m, + last_skew_secs, + last_source, + last_seen_age_secs, + } +} + +fn now_epoch_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +}