From 820ed8d346d7cf11811f4db4e21e94d357752a31 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 15:49:35 +0300 Subject: [PATCH] ME Keepalives Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/main.rs | 1 + src/metrics.rs | 16 ++++++++++++++++ src/stats/mod.rs | 12 ++++++++++++ src/transport/middle_proxy/health.rs | 7 ++++++- src/transport/middle_proxy/pool.rs | 13 ++++++++++++- 5 files changed, 47 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index e8883f2..d542b63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -333,6 +333,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai cfg_v4.default_dc.or(cfg_v6.default_dc), decision.clone(), rng.clone(), + stats.clone(), config.general.me_keepalive_enabled, config.general.me_keepalive_interval_secs, config.general.me_keepalive_jitter_secs, diff --git a/src/metrics.rs b/src/metrics.rs index 24acf30..fa6c680 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -91,6 +91,22 @@ fn render_metrics(stats: &Stats) -> String { let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter"); let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts()); + let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent"); + let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter"); + let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent()); + + let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures"); + let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter"); + let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed()); + + let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts"); + let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter"); + let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts()); + + let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes"); + let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter"); + let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success()); + let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 2cdcdf9..38318cc 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -19,6 +19,10 @@ pub struct Stats { connects_all: AtomicU64, connects_bad: AtomicU64, handshake_timeouts: AtomicU64, + me_keepalive_sent: AtomicU64, + me_keepalive_failed: AtomicU64, + me_reconnect_attempts: AtomicU64, + me_reconnect_success: AtomicU64, user_stats: DashMap, start_time: parking_lot::RwLock>, } @@ -43,8 +47,16 @@ impl Stats { pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); } pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); } pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } + pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } + pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) } + pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) } + pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) } pub fn increment_user_connects(&self, user: &str) { self.user_stats.entry(user.to_string()).or_default() diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index e2cdad6..79c92d6 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -99,6 +99,7 @@ async fn check_family( match res { Ok(Ok(())) => { info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage"); + pool.stats.increment_me_reconnect_success(); backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64); let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM; let wait = pool.me_reconnect_backoff_base @@ -107,11 +108,15 @@ async fn check_family( success = true; break; } - Ok(Err(e)) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"), + Ok(Err(e)) => { + pool.stats.increment_me_reconnect_attempt(); + debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed") + } Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"), } } if !success { + pool.stats.increment_me_reconnect_attempt(); let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64)); let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64); backoff.insert(key, next_ms); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 5106004..250b922 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -74,6 +74,7 @@ pub struct MePool { pub(super) nat_reflection_cache: Arc>, pub(super) writer_available: Arc, pub(super) conn_count: AtomicUsize, + pub(super) stats: Arc, pool_size: usize, } @@ -99,6 +100,7 @@ impl MePool { default_dc: Option, decision: NetworkDecision, rng: Arc, + stats: Arc, me_keepalive_enabled: bool, me_keepalive_interval_secs: u64, me_keepalive_jitter_secs: u64, @@ -130,6 +132,7 @@ impl MePool { stun_backoff_until: Arc::new(RwLock::new(None)), me_one_retry, me_one_timeout: Duration::from_millis(me_one_timeout_ms), + stats, me_keepalive_enabled, me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs), me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), @@ -440,7 +443,15 @@ impl MePool { if keepalive_random { rand::rng().fill(&mut payload); } - if rpc_writer.send_keepalive(payload).await.is_err() { break; } + match rpc_writer.send_keepalive(payload).await { + Ok(()) => { + stats.increment_me_keepalive_sent(); + } + Err(_) => { + stats.increment_me_keepalive_failed(); + break; + } + } } Some(WriterCommand::Close) | None => break, }