ME Selftest

This commit is contained in:
Alexey
2026-03-09 20:35:31 +03:00
parent f4418d2d50
commit b950987229
8 changed files with 472 additions and 0 deletions

View File

@@ -15,6 +15,7 @@ use crate::error::Result;
use super::MePool;
use super::rotation::{MeReinitTrigger, enqueue_reinit_trigger};
use super::secret::download_proxy_secret_with_max_len;
use super::selftest::record_timeskew_sample;
use std::time::SystemTime;
async fn retry_fetch(url: &str) -> Option<ProxyConfigData> {
@@ -109,6 +110,7 @@ pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData,
})
{
let skew_secs = skew.as_secs();
record_timeskew_sample("proxy_config_date_header", skew_secs);
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
} else if skew_secs > 30 {

View File

@@ -33,6 +33,7 @@ use super::codec::{
cbc_decrypt_inplace, cbc_encrypt_padded, parse_handshake_flags, parse_nonce_payload,
read_rpc_frame_plaintext, rpc_crc,
};
use super::selftest::{BndAddrStatus, BndPortStatus, record_bnd_status};
use super::wire::{extract_ip_material, IpMaterial};
use super::MePool;
@@ -131,6 +132,14 @@ impl MePool {
)
}
fn bnd_port_status(bound: Option<SocketAddr>) -> BndPortStatus {
match bound {
Some(addr) if addr.port() == 0 => BndPortStatus::Zero,
Some(_) => BndPortStatus::Ok,
None => BndPortStatus::Error,
}
}
/// TCP connect with timeout + return RTT in milliseconds.
pub(crate) async fn connect_tcp(
&self,
@@ -239,7 +248,27 @@ impl MePool {
IpFamily::V6
};
let is_socks_route = Self::is_socks_route(upstream_egress);
let raw_socks_bound_addr = if is_socks_route {
upstream_egress.and_then(|info| info.socks_bound_addr)
} else {
None
};
let socks_bound_addr = Self::select_socks_bound_addr(family, upstream_egress);
let bnd_addr_status = if !is_socks_route {
BndAddrStatus::Error
} else if raw_socks_bound_addr.is_some() && socks_bound_addr.is_none() {
BndAddrStatus::Bogon
} else if socks_bound_addr.is_some() {
BndAddrStatus::Ok
} else {
BndAddrStatus::Error
};
let bnd_port_status = if is_socks_route {
Self::bnd_port_status(raw_socks_bound_addr)
} else {
BndPortStatus::Error
};
record_bnd_status(bnd_addr_status, bnd_port_status, raw_socks_bound_addr);
let reflected = if let Some(bound) = socks_bound_addr {
Some(bound)
} else if is_socks_route {

View File

@@ -18,6 +18,7 @@ mod registry;
mod rotation;
mod send;
mod secret;
mod selftest;
mod wire;
mod pool_status;
@@ -37,6 +38,7 @@ pub use config_updater::{
me_config_updater, save_proxy_config_cache,
};
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
pub(crate) use selftest::{bnd_snapshot, timeskew_snapshot};
pub use wire::proto_flags_for_tag;
#[derive(Debug)]

View File

@@ -3,6 +3,7 @@ use std::time::SystemTime;
use httpdate;
use crate::error::{ProxyError, Result};
use super::selftest::record_timeskew_sample;
pub const PROXY_SECRET_MIN_LEN: usize = 32;
@@ -98,6 +99,7 @@ pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result<Vec<u8
})
{
let skew_secs = skew.as_secs();
record_timeskew_sample("proxy_secret_date_header", skew_secs);
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from proxy-secret Date header");
} else if skew_secs > 30 {

View File

@@ -0,0 +1,194 @@
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{Mutex, OnceLock};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum BndAddrStatus {
Ok,
Bogon,
Error,
}
impl BndAddrStatus {
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Ok => "ok",
Self::Bogon => "bogon",
Self::Error => "error",
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum BndPortStatus {
Ok,
Zero,
Error,
}
impl BndPortStatus {
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Ok => "ok",
Self::Zero => "zero",
Self::Error => "error",
}
}
}
#[derive(Clone, Debug)]
pub(crate) struct MeBndSnapshot {
pub addr_status: &'static str,
pub port_status: &'static str,
pub last_addr: Option<SocketAddr>,
pub last_seen_age_secs: Option<u64>,
}
#[derive(Clone, Debug, Default)]
pub(crate) struct MeTimeskewSnapshot {
pub max_skew_secs_15m: Option<u64>,
pub samples_15m: usize,
pub last_skew_secs: Option<u64>,
pub last_source: Option<&'static str>,
pub last_seen_age_secs: Option<u64>,
}
#[derive(Clone, Copy, Debug)]
struct MeTimeskewSample {
ts_epoch_secs: u64,
skew_secs: u64,
source: &'static str,
}
#[derive(Debug)]
struct MeSelftestState {
bnd_addr_status: BndAddrStatus,
bnd_port_status: BndPortStatus,
bnd_last_addr: Option<SocketAddr>,
bnd_last_seen_epoch_secs: Option<u64>,
timeskew_samples: VecDeque<MeTimeskewSample>,
}
impl Default for MeSelftestState {
fn default() -> Self {
Self {
bnd_addr_status: BndAddrStatus::Error,
bnd_port_status: BndPortStatus::Error,
bnd_last_addr: None,
bnd_last_seen_epoch_secs: None,
timeskew_samples: VecDeque::new(),
}
}
}
const MAX_TIMESKEW_SAMPLES: usize = 512;
const TIMESKEW_WINDOW_SECS: u64 = 15 * 60;
static ME_SELFTEST_STATE: OnceLock<Mutex<MeSelftestState>> = OnceLock::new();
fn state() -> &'static Mutex<MeSelftestState> {
ME_SELFTEST_STATE.get_or_init(|| Mutex::new(MeSelftestState::default()))
}
pub(crate) fn record_bnd_status(
addr_status: BndAddrStatus,
port_status: BndPortStatus,
last_addr: Option<SocketAddr>,
) {
let now_epoch_secs = now_epoch_secs();
let Ok(mut guard) = state().lock() else {
return;
};
guard.bnd_addr_status = addr_status;
guard.bnd_port_status = port_status;
guard.bnd_last_addr = last_addr;
guard.bnd_last_seen_epoch_secs = Some(now_epoch_secs);
}
pub(crate) fn bnd_snapshot() -> MeBndSnapshot {
let now_epoch_secs = now_epoch_secs();
let Ok(guard) = state().lock() else {
return MeBndSnapshot {
addr_status: BndAddrStatus::Error.as_str(),
port_status: BndPortStatus::Error.as_str(),
last_addr: None,
last_seen_age_secs: None,
};
};
MeBndSnapshot {
addr_status: guard.bnd_addr_status.as_str(),
port_status: guard.bnd_port_status.as_str(),
last_addr: guard.bnd_last_addr,
last_seen_age_secs: guard
.bnd_last_seen_epoch_secs
.map(|value| now_epoch_secs.saturating_sub(value)),
}
}
pub(crate) fn record_timeskew_sample(source: &'static str, skew_secs: u64) {
let now_epoch_secs = now_epoch_secs();
let Ok(mut guard) = state().lock() else {
return;
};
guard.timeskew_samples.push_back(MeTimeskewSample {
ts_epoch_secs: now_epoch_secs,
skew_secs,
source,
});
while guard.timeskew_samples.len() > MAX_TIMESKEW_SAMPLES {
guard.timeskew_samples.pop_front();
}
let cutoff = now_epoch_secs.saturating_sub(TIMESKEW_WINDOW_SECS * 2);
while guard
.timeskew_samples
.front()
.is_some_and(|sample| sample.ts_epoch_secs < cutoff)
{
guard.timeskew_samples.pop_front();
}
}
pub(crate) fn timeskew_snapshot() -> MeTimeskewSnapshot {
let now_epoch_secs = now_epoch_secs();
let Ok(guard) = state().lock() else {
return MeTimeskewSnapshot::default();
};
let mut max_skew_secs_15m = None;
let mut samples_15m = 0usize;
let window_start = now_epoch_secs.saturating_sub(TIMESKEW_WINDOW_SECS);
for sample in &guard.timeskew_samples {
if sample.ts_epoch_secs < window_start {
continue;
}
samples_15m = samples_15m.saturating_add(1);
max_skew_secs_15m = Some(max_skew_secs_15m.unwrap_or(0).max(sample.skew_secs));
}
let (last_skew_secs, last_source, last_seen_age_secs) =
if let Some(last) = guard.timeskew_samples.back() {
(
Some(last.skew_secs),
Some(last.source),
Some(now_epoch_secs.saturating_sub(last.ts_epoch_secs)),
)
} else {
(None, None, None)
};
MeTimeskewSnapshot {
max_skew_secs_15m,
samples_15m,
last_skew_secs,
last_source,
last_seen_age_secs,
}
}
fn now_epoch_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}