ME Healthcheck + ME Keepalive + ME Pool in Metrics: merge pull request #297 from telemt/flow-drift

ME Healthcheck + ME Keepalive + ME Pool in Metrics
This commit is contained in:
Alexey 2026-03-03 03:27:44 +03:00 committed by GitHub
commit 07ec84d071
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 131 additions and 35 deletions

View File

@ -130,11 +130,11 @@ pub(crate) fn default_middle_proxy_warm_standby() -> usize {
}
pub(crate) fn default_keepalive_interval() -> u64 {
25
8
}
pub(crate) fn default_keepalive_jitter() -> u64 {
5
2
}
pub(crate) fn default_warmup_step_delay_ms() -> u64 {

View File

@ -449,6 +449,21 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_kdf_port_only_drift_total ME KDF client-port changes with stable non-port material"
);
let _ = writeln!(out, "# TYPE telemt_me_kdf_port_only_drift_total counter");
let _ = writeln!(
out,
"telemt_me_kdf_port_only_drift_total {}",
if me_allows_debug {
stats.get_me_kdf_port_only_drift_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_hardswap_pending_reuse_total Hardswap cycles that reused an existing pending generation"
@ -587,6 +602,24 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_me_single_endpoint_shadow_rotate_skipped_quarantine_total Shadow rotations skipped because endpoint is quarantined"
);
let _ = writeln!(
out,
"# TYPE telemt_me_single_endpoint_shadow_rotate_skipped_quarantine_total counter"
);
let _ = writeln!(
out,
"telemt_me_single_endpoint_shadow_rotate_skipped_quarantine_total {}",
if me_allows_normal {
stats.get_me_single_endpoint_shadow_rotate_skipped_quarantine_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
let _ = writeln!(
@ -679,7 +712,7 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
let _ = writeln!(
out,
"telemt_pool_swap_total {}",
if me_allows_debug {
if me_allows_normal {
stats.get_pool_swap_total()
} else {
0

View File

@ -38,6 +38,7 @@ pub struct Stats {
me_seq_mismatch: AtomicU64,
me_endpoint_quarantine_total: AtomicU64,
me_kdf_drift_total: AtomicU64,
me_kdf_port_only_drift_total: AtomicU64,
me_hardswap_pending_reuse_total: AtomicU64,
me_hardswap_pending_ttl_expired_total: AtomicU64,
me_single_endpoint_outage_enter_total: AtomicU64,
@ -46,6 +47,7 @@ pub struct Stats {
me_single_endpoint_outage_reconnect_success_total: AtomicU64,
me_single_endpoint_quarantine_bypass_total: AtomicU64,
me_single_endpoint_shadow_rotate_total: AtomicU64,
me_single_endpoint_shadow_rotate_skipped_quarantine_total: AtomicU64,
me_handshake_error_codes: DashMap<i32, AtomicU64>,
me_route_drop_no_conn: AtomicU64,
me_route_drop_channel_closed: AtomicU64,
@ -290,7 +292,7 @@ impl Stats {
}
}
pub fn increment_pool_swap_total(&self) {
if self.telemetry_me_allows_debug() {
if self.telemetry_me_allows_normal() {
self.pool_swap_total.fetch_add(1, Ordering::Relaxed);
}
}
@ -377,6 +379,12 @@ impl Stats {
self.me_kdf_drift_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_kdf_port_only_drift_total(&self) {
if self.telemetry_me_allows_debug() {
self.me_kdf_port_only_drift_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_hardswap_pending_reuse_total(&self) {
if self.telemetry_me_allows_debug() {
self.me_hardswap_pending_reuse_total
@ -425,6 +433,12 @@ impl Stats {
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_single_endpoint_shadow_rotate_skipped_quarantine_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_single_endpoint_shadow_rotate_skipped_quarantine_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
@ -447,6 +461,9 @@ impl Stats {
pub fn get_me_kdf_drift_total(&self) -> u64 {
self.me_kdf_drift_total.load(Ordering::Relaxed)
}
pub fn get_me_kdf_port_only_drift_total(&self) -> u64 {
self.me_kdf_port_only_drift_total.load(Ordering::Relaxed)
}
pub fn get_me_hardswap_pending_reuse_total(&self) -> u64 {
self.me_hardswap_pending_reuse_total
.load(Ordering::Relaxed)
@ -479,6 +496,10 @@ impl Stats {
self.me_single_endpoint_shadow_rotate_total
.load(Ordering::Relaxed)
}
pub fn get_me_single_endpoint_shadow_rotate_skipped_quarantine_total(&self) -> u64 {
self.me_single_endpoint_shadow_rotate_skipped_quarantine_total
.load(Ordering::Relaxed)
}
pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> {
let mut out: Vec<(i32, u64)> = self
.me_handshake_error_codes

View File

@ -38,6 +38,22 @@ use super::MePool;
const ME_KDF_DRIFT_STRICT: bool = false;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum KdfClientPortSource {
LocalSocket = 0,
SocksBound = 1,
}
impl KdfClientPortSource {
fn from_socks_bound_port(socks_bound_port: Option<u16>) -> Self {
if socks_bound_port.is_some() {
Self::SocksBound
} else {
Self::LocalSocket
}
}
}
/// Result of a successful ME handshake with timings.
pub(crate) struct HandshakeOutput {
pub rd: ReadHalf<TcpStream>,
@ -52,18 +68,18 @@ pub(crate) struct HandshakeOutput {
impl MePool {
fn kdf_material_fingerprint(
local_addr_nat: SocketAddr,
local_ip_nat: IpAddr,
peer_addr_nat: SocketAddr,
client_port_for_kdf: u16,
reflected: Option<SocketAddr>,
socks_bound_addr: Option<SocketAddr>,
reflected_ip: Option<IpAddr>,
socks_bound_ip: Option<IpAddr>,
client_port_source: KdfClientPortSource,
) -> u64 {
let mut hasher = DefaultHasher::new();
local_addr_nat.hash(&mut hasher);
local_ip_nat.hash(&mut hasher);
peer_addr_nat.hash(&mut hasher);
client_port_for_kdf.hash(&mut hasher);
reflected.hash(&mut hasher);
socks_bound_addr.hash(&mut hasher);
reflected_ip.hash(&mut hasher);
socks_bound_ip.hash(&mut hasher);
client_port_source.hash(&mut hasher);
hasher.finish()
}
@ -359,35 +375,48 @@ impl MePool {
let ts_bytes = crypto_ts.to_le_bytes();
let server_port_bytes = peer_addr_nat.port().to_le_bytes();
let client_port_for_kdf = socks_bound_addr
let socks_bound_port = socks_bound_addr
.map(|bound| bound.port())
.filter(|port| *port != 0)
.unwrap_or(local_addr_nat.port());
.filter(|port| *port != 0);
let client_port_for_kdf = socks_bound_port.unwrap_or(local_addr_nat.port());
let client_port_source = KdfClientPortSource::from_socks_bound_port(socks_bound_port);
let kdf_fingerprint = Self::kdf_material_fingerprint(
local_addr_nat,
local_addr_nat.ip(),
peer_addr_nat,
client_port_for_kdf,
reflected,
socks_bound_addr,
reflected.map(|value| value.ip()),
socks_bound_addr.map(|value| value.ip()),
client_port_source,
);
let mut kdf_fingerprint_guard = self.kdf_material_fingerprint.lock().await;
if let Some(prev_fingerprint) = kdf_fingerprint_guard.get(&peer_addr_nat).copied()
&& prev_fingerprint != kdf_fingerprint
if let Some((prev_fingerprint, prev_client_port)) =
kdf_fingerprint_guard.get(&peer_addr_nat).copied()
{
self.stats.increment_me_kdf_drift_total();
warn!(
%peer_addr_nat,
%local_addr_nat,
client_port_for_kdf,
"ME KDF input drift detected for endpoint"
);
if ME_KDF_DRIFT_STRICT {
return Err(ProxyError::InvalidHandshake(
"ME KDF input drift detected (strict mode)".to_string(),
));
if prev_fingerprint != kdf_fingerprint {
self.stats.increment_me_kdf_drift_total();
warn!(
%peer_addr_nat,
%local_addr_nat,
client_port_for_kdf,
client_port_source = ?client_port_source,
"ME KDF material drift detected for endpoint"
);
if ME_KDF_DRIFT_STRICT {
return Err(ProxyError::InvalidHandshake(
"ME KDF material drift detected (strict mode)".to_string(),
));
}
} else if prev_client_port != client_port_for_kdf {
self.stats.increment_me_kdf_port_only_drift_total();
debug!(
%peer_addr_nat,
previous_client_port_for_kdf = prev_client_port,
client_port_for_kdf,
client_port_source = ?client_port_source,
"ME KDF client port changed with stable material"
);
}
}
kdf_fingerprint_guard.insert(peer_addr_nat, kdf_fingerprint);
kdf_fingerprint_guard.insert(peer_addr_nat, (kdf_fingerprint, client_port_for_kdf));
drop(kdf_fingerprint_guard);
let client_port_bytes = client_port_for_kdf.to_le_bytes();

View File

@ -395,6 +395,19 @@ async fn maybe_rotate_single_endpoint_shadow(
}
let endpoint = endpoints[0];
if pool.is_endpoint_quarantined(endpoint).await {
pool.stats
.increment_me_single_endpoint_shadow_rotate_skipped_quarantine_total();
shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS));
debug!(
dc = %dc,
?family,
%endpoint,
"Single-endpoint shadow rotation skipped: endpoint is quarantined"
);
return;
}
let Some(writer_ids) = live_writer_ids_by_addr.get(&endpoint) else {
shadow_rotate_deadline.insert(key, now + Duration::from_secs(SHADOW_ROTATE_RETRY_SECS));
return;

View File

@ -127,7 +127,7 @@ pub struct MePool {
pub(super) pending_hardswap_map_hash: AtomicU64,
pub(super) hardswap: AtomicBool,
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<Mutex<HashMap<SocketAddr, u64>>>,
pub(super) kdf_material_fingerprint: Arc<Mutex<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,

View File

@ -37,7 +37,7 @@ impl MePool {
);
}
async fn is_endpoint_quarantined(&self, addr: SocketAddr) -> bool {
pub(super) async fn is_endpoint_quarantined(&self, addr: SocketAddr) -> bool {
let mut guard = self.endpoint_quarantine.lock().await;
let now = Instant::now();
guard.retain(|_, expiry| *expiry > now);