diff --git a/src/api/runtime_selftest.rs b/src/api/runtime_selftest.rs index 02bfb04..0dce3dc 100644 --- a/src/api/runtime_selftest.rs +++ b/src/api/runtime_selftest.rs @@ -1,4 +1,5 @@ use std::net::IpAddr; +use std::collections::HashMap; use std::sync::{Mutex, OnceLock}; use std::time::{SystemTime, UNIX_EPOCH}; @@ -6,7 +7,8 @@ use serde::Serialize; use crate::config::{ProxyConfig, UpstreamType}; use crate::network::probe::{detect_interface_ipv4, detect_interface_ipv6, is_bogon}; -use crate::transport::middle_proxy::{bnd_snapshot, timeskew_snapshot}; +use crate::transport::middle_proxy::{bnd_snapshot, timeskew_snapshot, upstream_bnd_snapshots}; +use crate::transport::UpstreamRouteKind; use super::ApiShared; @@ -66,6 +68,17 @@ pub(super) struct RuntimeMeSelftestBndData { pub(super) last_seen_age_secs: Option, } +#[derive(Serialize)] +pub(super) struct RuntimeMeSelftestUpstreamData { + pub(super) upstream_id: usize, + pub(super) route_kind: &'static str, + pub(super) address: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) bnd: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) ip: Option, +} + #[derive(Serialize)] pub(super) struct RuntimeMeSelftestPayload { pub(super) kdf: RuntimeMeSelftestKdfData, @@ -73,6 +86,8 @@ pub(super) struct RuntimeMeSelftestPayload { pub(super) ip: RuntimeMeSelftestIpData, pub(super) pid: RuntimeMeSelftestPidData, pub(super) bnd: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(super) upstreams: Option>, } #[derive(Serialize)] @@ -162,6 +177,7 @@ pub(super) async fn build_runtime_me_selftest_data( } else { None }; + let upstreams = build_upstream_selftest_data(shared); RuntimeMeSelftestData { enabled: true, @@ -191,10 +207,40 @@ pub(super) async fn build_runtime_me_selftest_data( state: pid_state, }, bnd, + upstreams, }), } } +fn build_upstream_selftest_data(shared: &ApiShared) -> Option> { + let snapshot = shared.upstream_manager.try_api_snapshot()?; + if snapshot.summary.configured_total <= 1 { + return None; + } + + let mut upstream_bnd_by_id: HashMap = upstream_bnd_snapshots() + .into_iter() + .map(|entry| (entry.upstream_id, entry)) + .collect(); + let mut rows = Vec::with_capacity(snapshot.upstreams.len()); + for upstream in snapshot.upstreams { + let upstream_bnd = upstream_bnd_by_id.remove(&upstream.upstream_id); + rows.push(RuntimeMeSelftestUpstreamData { + upstream_id: upstream.upstream_id, + route_kind: map_route_kind(upstream.route_kind), + address: upstream.address, + bnd: upstream_bnd.as_ref().map(|entry| RuntimeMeSelftestBndData { + addr_state: entry.addr_status, + port_state: entry.port_status, + last_addr: entry.last_addr.map(|value| value.to_string()), + last_seen_age_secs: entry.last_seen_age_secs, + }), + ip: upstream_bnd.and_then(|entry| entry.last_ip.map(|value| value.to_string())), + }); + } + Some(rows) +} + fn update_kdf_ewma(now_epoch_secs: u64, total_errors: u64) -> f64 { let Ok(mut guard) = kdf_ewma_state().lock() else { return 0.0; @@ -233,6 +279,14 @@ fn classify_ip(ip: IpAddr) -> &'static str { "good" } +fn map_route_kind(value: UpstreamRouteKind) -> &'static str { + match value { + UpstreamRouteKind::Direct => "direct", + UpstreamRouteKind::Socks4 => "socks4", + UpstreamRouteKind::Socks5 => "socks5", + } +} + fn round3(value: f64) -> f64 { (value * 1000.0).round() / 1000.0 } diff --git a/src/transport/middle_proxy/handshake.rs b/src/transport/middle_proxy/handshake.rs index 1c1b172..245a331 100644 --- a/src/transport/middle_proxy/handshake.rs +++ b/src/transport/middle_proxy/handshake.rs @@ -33,7 +33,7 @@ use super::codec::{ cbc_decrypt_inplace, cbc_encrypt_padded, parse_handshake_flags, parse_nonce_payload, read_rpc_frame_plaintext, rpc_crc, }; -use super::selftest::{BndAddrStatus, BndPortStatus, record_bnd_status}; +use super::selftest::{BndAddrStatus, BndPortStatus, record_bnd_status, record_upstream_bnd_status}; use super::wire::{extract_ip_material, IpMaterial}; use super::MePool; @@ -299,6 +299,18 @@ impl MePool { let local_addr_nat = self.translate_our_addr_with_reflection(local_addr, reflected); let peer_addr_nat = SocketAddr::new(self.translate_ip_for_nat(peer_addr.ip()), peer_addr.port()); + if let Some(upstream_info) = upstream_egress { + let client_ip_for_kdf = socks_bound_addr + .map(|value| value.ip()) + .unwrap_or(local_addr_nat.ip()); + record_upstream_bnd_status( + upstream_info.upstream_id, + bnd_addr_status, + bnd_port_status, + raw_socks_bound_addr, + Some(client_ip_for_kdf), + ); + } let (mut rd, mut wr) = tokio::io::split(stream); let my_nonce: [u8; 16] = rng.bytes(16).try_into().unwrap(); diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index ecc963d..92e222d 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -38,7 +38,9 @@ pub use config_updater::{ me_config_updater, save_proxy_config_cache, }; pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task}; -pub(crate) use selftest::{bnd_snapshot, timeskew_snapshot}; +pub(crate) use selftest::{ + bnd_snapshot, timeskew_snapshot, upstream_bnd_snapshots, +}; pub use wire::proto_flags_for_tag; #[derive(Debug)] diff --git a/src/transport/middle_proxy/selftest.rs b/src/transport/middle_proxy/selftest.rs index c1653ec..86a93e3 100644 --- a/src/transport/middle_proxy/selftest.rs +++ b/src/transport/middle_proxy/selftest.rs @@ -1,5 +1,5 @@ -use std::collections::VecDeque; -use std::net::SocketAddr; +use std::collections::{HashMap, VecDeque}; +use std::net::{IpAddr, SocketAddr}; use std::sync::{Mutex, OnceLock}; use std::time::{SystemTime, UNIX_EPOCH}; @@ -45,6 +45,16 @@ pub(crate) struct MeBndSnapshot { pub last_seen_age_secs: Option, } +#[derive(Clone, Debug)] +pub(crate) struct MeUpstreamBndSnapshot { + pub upstream_id: usize, + pub addr_status: &'static str, + pub port_status: &'static str, + pub last_addr: Option, + pub last_ip: Option, + pub last_seen_age_secs: Option, +} + #[derive(Clone, Debug, Default)] pub(crate) struct MeTimeskewSnapshot { pub max_skew_secs_15m: Option, @@ -67,9 +77,19 @@ struct MeSelftestState { bnd_port_status: BndPortStatus, bnd_last_addr: Option, bnd_last_seen_epoch_secs: Option, + upstream_bnd: HashMap, timeskew_samples: VecDeque, } +#[derive(Clone, Copy, Debug)] +struct UpstreamBndState { + addr_status: BndAddrStatus, + port_status: BndPortStatus, + last_addr: Option, + last_ip: Option, + last_seen_epoch_secs: Option, +} + impl Default for MeSelftestState { fn default() -> Self { Self { @@ -77,6 +97,7 @@ impl Default for MeSelftestState { bnd_port_status: BndPortStatus::Error, bnd_last_addr: None, bnd_last_seen_epoch_secs: None, + upstream_bnd: HashMap::new(), timeskew_samples: VecDeque::new(), } } @@ -126,6 +147,51 @@ pub(crate) fn bnd_snapshot() -> MeBndSnapshot { } } +pub(crate) fn record_upstream_bnd_status( + upstream_id: usize, + addr_status: BndAddrStatus, + port_status: BndPortStatus, + last_addr: Option, + last_ip: Option, +) { + let now_epoch_secs = now_epoch_secs(); + let Ok(mut guard) = state().lock() else { + return; + }; + guard.upstream_bnd.insert( + upstream_id, + UpstreamBndState { + addr_status, + port_status, + last_addr, + last_ip, + last_seen_epoch_secs: Some(now_epoch_secs), + }, + ); +} + +pub(crate) fn upstream_bnd_snapshots() -> Vec { + let now_epoch_secs = now_epoch_secs(); + let Ok(guard) = state().lock() else { + return Vec::new(); + }; + let mut out = Vec::with_capacity(guard.upstream_bnd.len()); + for (upstream_id, entry) in &guard.upstream_bnd { + out.push(MeUpstreamBndSnapshot { + upstream_id: *upstream_id, + addr_status: entry.addr_status.as_str(), + port_status: entry.port_status.as_str(), + last_addr: entry.last_addr, + last_ip: entry.last_ip, + last_seen_age_secs: entry + .last_seen_epoch_secs + .map(|value| now_epoch_secs.saturating_sub(value)), + }); + } + out.sort_by_key(|entry| entry.upstream_id); + out +} + pub(crate) fn record_timeskew_sample(source: &'static str, skew_secs: u64) { let now_epoch_secs = now_epoch_secs(); let Ok(mut guard) = state().lock() else { diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 2424f9c..1355934 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -213,6 +213,7 @@ pub struct UpstreamApiPolicySnapshot { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct UpstreamEgressInfo { + pub upstream_id: usize, pub route_kind: UpstreamRouteKind, pub local_addr: Option, pub direct_bind_ip: Option, @@ -672,7 +673,7 @@ impl UpstreamManager { self.stats.increment_upstream_connect_attempt_total(); let start = Instant::now(); match self - .connect_via_upstream(&upstream, target, bind_rr.clone(), attempt_timeout) + .connect_via_upstream(idx, &upstream, target, bind_rr.clone(), attempt_timeout) .await { Ok((stream, egress)) => { @@ -779,6 +780,7 @@ impl UpstreamManager { async fn connect_via_upstream( &self, + upstream_id: usize, config: &UpstreamConfig, target: SocketAddr, bind_rr: Option>, @@ -828,6 +830,7 @@ impl UpstreamManager { Ok(( stream, UpstreamEgressInfo { + upstream_id, route_kind: UpstreamRouteKind::Direct, local_addr, direct_bind_ip: bind_ip, @@ -906,6 +909,7 @@ impl UpstreamManager { Ok(( stream, UpstreamEgressInfo { + upstream_id, route_kind: UpstreamRouteKind::Socks4, local_addr, direct_bind_ip: None, @@ -986,6 +990,7 @@ impl UpstreamManager { Ok(( stream, UpstreamEgressInfo { + upstream_id, route_kind: UpstreamRouteKind::Socks5, local_addr, direct_bind_ip: None, @@ -1048,7 +1053,7 @@ impl UpstreamManager { let result = tokio::time::timeout( Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr_v6) + self.ping_single_dc(*upstream_idx, upstream_config, Some(bind_rr.clone()), addr_v6) ).await; let ping_result = match result { @@ -1099,7 +1104,7 @@ impl UpstreamManager { let result = tokio::time::timeout( Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr_v4) + self.ping_single_dc(*upstream_idx, upstream_config, Some(bind_rr.clone()), addr_v4) ).await; let ping_result = match result { @@ -1162,7 +1167,7 @@ impl UpstreamManager { } let result = tokio::time::timeout( Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr) + self.ping_single_dc(*upstream_idx, upstream_config, Some(bind_rr.clone()), addr) ).await; let ping_result = match result { @@ -1233,6 +1238,7 @@ impl UpstreamManager { async fn ping_single_dc( &self, + upstream_id: usize, config: &UpstreamConfig, bind_rr: Option>, target: SocketAddr, @@ -1240,6 +1246,7 @@ impl UpstreamManager { let start = Instant::now(); let _ = self .connect_via_upstream( + upstream_id, config, target, bind_rr, @@ -1418,6 +1425,7 @@ impl UpstreamManager { let result = tokio::time::timeout( Duration::from_secs(HEALTH_CHECK_CONNECT_TIMEOUT_SECS), self.connect_via_upstream( + i, &config, endpoint, Some(bind_rr.clone()),