Per-upstream Runtime Selftest

This commit is contained in:
Alexey 2026-03-10 01:25:28 +03:00
parent 8cd719da3f
commit be24b47300
No known key found for this signature in database
5 changed files with 151 additions and 9 deletions

View File

@ -1,4 +1,5 @@
use std::net::IpAddr;
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
@ -6,7 +7,8 @@ use serde::Serialize;
use crate::config::{ProxyConfig, UpstreamType};
use crate::network::probe::{detect_interface_ipv4, detect_interface_ipv6, is_bogon};
use crate::transport::middle_proxy::{bnd_snapshot, timeskew_snapshot};
use crate::transport::middle_proxy::{bnd_snapshot, timeskew_snapshot, upstream_bnd_snapshots};
use crate::transport::UpstreamRouteKind;
use super::ApiShared;
@ -66,6 +68,17 @@ pub(super) struct RuntimeMeSelftestBndData {
pub(super) last_seen_age_secs: Option<u64>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeSelftestUpstreamData {
pub(super) upstream_id: usize,
pub(super) route_kind: &'static str,
pub(super) address: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) bnd: Option<RuntimeMeSelftestBndData>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) ip: Option<String>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeSelftestPayload {
pub(super) kdf: RuntimeMeSelftestKdfData,
@ -73,6 +86,8 @@ pub(super) struct RuntimeMeSelftestPayload {
pub(super) ip: RuntimeMeSelftestIpData,
pub(super) pid: RuntimeMeSelftestPidData,
pub(super) bnd: Option<RuntimeMeSelftestBndData>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) upstreams: Option<Vec<RuntimeMeSelftestUpstreamData>>,
}
#[derive(Serialize)]
@ -162,6 +177,7 @@ pub(super) async fn build_runtime_me_selftest_data(
} else {
None
};
let upstreams = build_upstream_selftest_data(shared);
RuntimeMeSelftestData {
enabled: true,
@ -191,10 +207,40 @@ pub(super) async fn build_runtime_me_selftest_data(
state: pid_state,
},
bnd,
upstreams,
}),
}
}
fn build_upstream_selftest_data(shared: &ApiShared) -> Option<Vec<RuntimeMeSelftestUpstreamData>> {
let snapshot = shared.upstream_manager.try_api_snapshot()?;
if snapshot.summary.configured_total <= 1 {
return None;
}
let mut upstream_bnd_by_id: HashMap<usize, _> = upstream_bnd_snapshots()
.into_iter()
.map(|entry| (entry.upstream_id, entry))
.collect();
let mut rows = Vec::with_capacity(snapshot.upstreams.len());
for upstream in snapshot.upstreams {
let upstream_bnd = upstream_bnd_by_id.remove(&upstream.upstream_id);
rows.push(RuntimeMeSelftestUpstreamData {
upstream_id: upstream.upstream_id,
route_kind: map_route_kind(upstream.route_kind),
address: upstream.address,
bnd: upstream_bnd.as_ref().map(|entry| RuntimeMeSelftestBndData {
addr_state: entry.addr_status,
port_state: entry.port_status,
last_addr: entry.last_addr.map(|value| value.to_string()),
last_seen_age_secs: entry.last_seen_age_secs,
}),
ip: upstream_bnd.and_then(|entry| entry.last_ip.map(|value| value.to_string())),
});
}
Some(rows)
}
fn update_kdf_ewma(now_epoch_secs: u64, total_errors: u64) -> f64 {
let Ok(mut guard) = kdf_ewma_state().lock() else {
return 0.0;
@ -233,6 +279,14 @@ fn classify_ip(ip: IpAddr) -> &'static str {
"good"
}
fn map_route_kind(value: UpstreamRouteKind) -> &'static str {
match value {
UpstreamRouteKind::Direct => "direct",
UpstreamRouteKind::Socks4 => "socks4",
UpstreamRouteKind::Socks5 => "socks5",
}
}
fn round3(value: f64) -> f64 {
(value * 1000.0).round() / 1000.0
}

View File

@ -33,7 +33,7 @@ use super::codec::{
cbc_decrypt_inplace, cbc_encrypt_padded, parse_handshake_flags, parse_nonce_payload,
read_rpc_frame_plaintext, rpc_crc,
};
use super::selftest::{BndAddrStatus, BndPortStatus, record_bnd_status};
use super::selftest::{BndAddrStatus, BndPortStatus, record_bnd_status, record_upstream_bnd_status};
use super::wire::{extract_ip_material, IpMaterial};
use super::MePool;
@ -299,6 +299,18 @@ impl MePool {
let local_addr_nat = self.translate_our_addr_with_reflection(local_addr, reflected);
let peer_addr_nat = SocketAddr::new(self.translate_ip_for_nat(peer_addr.ip()), peer_addr.port());
if let Some(upstream_info) = upstream_egress {
let client_ip_for_kdf = socks_bound_addr
.map(|value| value.ip())
.unwrap_or(local_addr_nat.ip());
record_upstream_bnd_status(
upstream_info.upstream_id,
bnd_addr_status,
bnd_port_status,
raw_socks_bound_addr,
Some(client_ip_for_kdf),
);
}
let (mut rd, mut wr) = tokio::io::split(stream);
let my_nonce: [u8; 16] = rng.bytes(16).try_into().unwrap();

View File

@ -38,7 +38,9 @@ pub use config_updater::{
me_config_updater, save_proxy_config_cache,
};
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
pub(crate) use selftest::{bnd_snapshot, timeskew_snapshot};
pub(crate) use selftest::{
bnd_snapshot, timeskew_snapshot, upstream_bnd_snapshots,
};
pub use wire::proto_flags_for_tag;
#[derive(Debug)]

View File

@ -1,5 +1,5 @@
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::collections::{HashMap, VecDeque};
use std::net::{IpAddr, SocketAddr};
use std::sync::{Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
@ -45,6 +45,16 @@ pub(crate) struct MeBndSnapshot {
pub last_seen_age_secs: Option<u64>,
}
#[derive(Clone, Debug)]
pub(crate) struct MeUpstreamBndSnapshot {
pub upstream_id: usize,
pub addr_status: &'static str,
pub port_status: &'static str,
pub last_addr: Option<SocketAddr>,
pub last_ip: Option<IpAddr>,
pub last_seen_age_secs: Option<u64>,
}
#[derive(Clone, Debug, Default)]
pub(crate) struct MeTimeskewSnapshot {
pub max_skew_secs_15m: Option<u64>,
@ -67,9 +77,19 @@ struct MeSelftestState {
bnd_port_status: BndPortStatus,
bnd_last_addr: Option<SocketAddr>,
bnd_last_seen_epoch_secs: Option<u64>,
upstream_bnd: HashMap<usize, UpstreamBndState>,
timeskew_samples: VecDeque<MeTimeskewSample>,
}
#[derive(Clone, Copy, Debug)]
struct UpstreamBndState {
addr_status: BndAddrStatus,
port_status: BndPortStatus,
last_addr: Option<SocketAddr>,
last_ip: Option<IpAddr>,
last_seen_epoch_secs: Option<u64>,
}
impl Default for MeSelftestState {
fn default() -> Self {
Self {
@ -77,6 +97,7 @@ impl Default for MeSelftestState {
bnd_port_status: BndPortStatus::Error,
bnd_last_addr: None,
bnd_last_seen_epoch_secs: None,
upstream_bnd: HashMap::new(),
timeskew_samples: VecDeque::new(),
}
}
@ -126,6 +147,51 @@ pub(crate) fn bnd_snapshot() -> MeBndSnapshot {
}
}
pub(crate) fn record_upstream_bnd_status(
upstream_id: usize,
addr_status: BndAddrStatus,
port_status: BndPortStatus,
last_addr: Option<SocketAddr>,
last_ip: Option<IpAddr>,
) {
let now_epoch_secs = now_epoch_secs();
let Ok(mut guard) = state().lock() else {
return;
};
guard.upstream_bnd.insert(
upstream_id,
UpstreamBndState {
addr_status,
port_status,
last_addr,
last_ip,
last_seen_epoch_secs: Some(now_epoch_secs),
},
);
}
pub(crate) fn upstream_bnd_snapshots() -> Vec<MeUpstreamBndSnapshot> {
let now_epoch_secs = now_epoch_secs();
let Ok(guard) = state().lock() else {
return Vec::new();
};
let mut out = Vec::with_capacity(guard.upstream_bnd.len());
for (upstream_id, entry) in &guard.upstream_bnd {
out.push(MeUpstreamBndSnapshot {
upstream_id: *upstream_id,
addr_status: entry.addr_status.as_str(),
port_status: entry.port_status.as_str(),
last_addr: entry.last_addr,
last_ip: entry.last_ip,
last_seen_age_secs: entry
.last_seen_epoch_secs
.map(|value| now_epoch_secs.saturating_sub(value)),
});
}
out.sort_by_key(|entry| entry.upstream_id);
out
}
pub(crate) fn record_timeskew_sample(source: &'static str, skew_secs: u64) {
let now_epoch_secs = now_epoch_secs();
let Ok(mut guard) = state().lock() else {

View File

@ -213,6 +213,7 @@ pub struct UpstreamApiPolicySnapshot {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UpstreamEgressInfo {
pub upstream_id: usize,
pub route_kind: UpstreamRouteKind,
pub local_addr: Option<SocketAddr>,
pub direct_bind_ip: Option<IpAddr>,
@ -672,7 +673,7 @@ impl UpstreamManager {
self.stats.increment_upstream_connect_attempt_total();
let start = Instant::now();
match self
.connect_via_upstream(&upstream, target, bind_rr.clone(), attempt_timeout)
.connect_via_upstream(idx, &upstream, target, bind_rr.clone(), attempt_timeout)
.await
{
Ok((stream, egress)) => {
@ -779,6 +780,7 @@ impl UpstreamManager {
async fn connect_via_upstream(
&self,
upstream_id: usize,
config: &UpstreamConfig,
target: SocketAddr,
bind_rr: Option<Arc<AtomicUsize>>,
@ -828,6 +830,7 @@ impl UpstreamManager {
Ok((
stream,
UpstreamEgressInfo {
upstream_id,
route_kind: UpstreamRouteKind::Direct,
local_addr,
direct_bind_ip: bind_ip,
@ -906,6 +909,7 @@ impl UpstreamManager {
Ok((
stream,
UpstreamEgressInfo {
upstream_id,
route_kind: UpstreamRouteKind::Socks4,
local_addr,
direct_bind_ip: None,
@ -986,6 +990,7 @@ impl UpstreamManager {
Ok((
stream,
UpstreamEgressInfo {
upstream_id,
route_kind: UpstreamRouteKind::Socks5,
local_addr,
direct_bind_ip: None,
@ -1048,7 +1053,7 @@ impl UpstreamManager {
let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr_v6)
self.ping_single_dc(*upstream_idx, upstream_config, Some(bind_rr.clone()), addr_v6)
).await;
let ping_result = match result {
@ -1099,7 +1104,7 @@ impl UpstreamManager {
let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr_v4)
self.ping_single_dc(*upstream_idx, upstream_config, Some(bind_rr.clone()), addr_v4)
).await;
let ping_result = match result {
@ -1162,7 +1167,7 @@ impl UpstreamManager {
}
let result = tokio::time::timeout(
Duration::from_secs(DC_PING_TIMEOUT_SECS),
self.ping_single_dc(upstream_config, Some(bind_rr.clone()), addr)
self.ping_single_dc(*upstream_idx, upstream_config, Some(bind_rr.clone()), addr)
).await;
let ping_result = match result {
@ -1233,6 +1238,7 @@ impl UpstreamManager {
async fn ping_single_dc(
&self,
upstream_id: usize,
config: &UpstreamConfig,
bind_rr: Option<Arc<AtomicUsize>>,
target: SocketAddr,
@ -1240,6 +1246,7 @@ impl UpstreamManager {
let start = Instant::now();
let _ = self
.connect_via_upstream(
upstream_id,
config,
target,
bind_rr,
@ -1418,6 +1425,7 @@ impl UpstreamManager {
let result = tokio::time::timeout(
Duration::from_secs(HEALTH_CHECK_CONNECT_TIMEOUT_SECS),
self.connect_via_upstream(
i,
&config,
endpoint,
Some(bind_rr.clone()),