From c6c3d71b08954cc0279fd23d40a7fec08a01c072 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Feb 2026 01:26:01 +0300 Subject: [PATCH] ME Pool Flap-Detect in statistics --- src/metrics.rs | 93 ++++++++++++++++++++++++++++++ src/stats/mod.rs | 49 ++++++++++++++++ src/transport/middle_proxy/pool.rs | 10 ++++ 3 files changed, 152 insertions(+) diff --git a/src/metrics.rs b/src/metrics.rs index 53ddd5d..0051858 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -199,6 +199,95 @@ fn render_metrics(stats: &Stats) -> String { stats.get_pool_stale_pick_total() ); + let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); + let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_removed_total {}", + stats.get_me_writer_removed_total() + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_removed_unexpected_total Unexpected ME writer removals that triggered refill" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_removed_unexpected_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_removed_unexpected_total {}", + stats.get_me_writer_removed_unexpected_total() + ); + + let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started"); + let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter"); + let _ = writeln!( + out, + "telemt_me_refill_triggered_total {}", + stats.get_me_refill_triggered_total() + ); + + let _ = writeln!( + out, + "# HELP telemt_me_refill_skipped_inflight_total Immediate ME refill skips due to inflight dedup" + ); + let _ = writeln!(out, "# TYPE telemt_me_refill_skipped_inflight_total counter"); + let _ = writeln!( + out, + "telemt_me_refill_skipped_inflight_total {}", + stats.get_me_refill_skipped_inflight_total() + ); + + let _ = writeln!(out, "# HELP telemt_me_refill_failed_total Immediate ME refill failures"); + let _ = writeln!(out, "# TYPE telemt_me_refill_failed_total counter"); + let _ = writeln!( + out, + "telemt_me_refill_failed_total {}", + stats.get_me_refill_failed_total() + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_restored_same_endpoint_total Refilled ME writer restored on the same endpoint" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_restored_same_endpoint_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_restored_same_endpoint_total {}", + stats.get_me_writer_restored_same_endpoint_total() + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_restored_fallback_total Refilled ME writer restored via fallback endpoint" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_restored_fallback_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_restored_fallback_total {}", + stats.get_me_writer_restored_fallback_total() + ); + + let unresolved_writer_losses = stats + .get_me_writer_removed_unexpected_total() + .saturating_sub( + stats + .get_me_writer_restored_same_endpoint_total() + .saturating_add(stats.get_me_writer_restored_fallback_total()), + ); + let _ = writeln!( + out, + "# HELP telemt_me_writer_removed_unexpected_minus_restored_total Unexpected writer removals not yet compensated by restore" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" + ); + let _ = writeln!( + out, + "telemt_me_writer_removed_unexpected_minus_restored_total {}", + unresolved_writer_losses + ); + let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); @@ -277,6 +366,10 @@ mod tests { assert!(output.contains("# TYPE telemt_connections_total counter")); assert!(output.contains("# TYPE telemt_connections_bad_total counter")); assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); + assert!(output.contains( + "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" + )); } #[tokio::test] diff --git a/src/stats/mod.rs b/src/stats/mod.rs index a58996d..5f4c98e 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -43,6 +43,13 @@ pub struct Stats { pool_drain_active: AtomicU64, pool_force_close_total: AtomicU64, pool_stale_pick_total: AtomicU64, + me_writer_removed_total: AtomicU64, + me_writer_removed_unexpected_total: AtomicU64, + me_refill_triggered_total: AtomicU64, + me_refill_skipped_inflight_total: AtomicU64, + me_refill_failed_total: AtomicU64, + me_writer_restored_same_endpoint_total: AtomicU64, + me_writer_restored_fallback_total: AtomicU64, user_stats: DashMap, start_time: parking_lot::RwLock>, } @@ -142,6 +149,27 @@ impl Stats { pub fn increment_pool_stale_pick_total(&self) { self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); } + pub fn increment_me_writer_removed_total(&self) { + self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_writer_removed_unexpected_total(&self) { + self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_refill_triggered_total(&self) { + self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_refill_skipped_inflight_total(&self) { + self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_refill_failed_total(&self) { + self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_writer_restored_same_endpoint_total(&self) { + self.me_writer_restored_same_endpoint_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_me_writer_restored_fallback_total(&self) { + self.me_writer_restored_fallback_total.fetch_add(1, Ordering::Relaxed); + } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } @@ -195,6 +223,27 @@ impl Stats { pub fn get_pool_stale_pick_total(&self) -> u64 { self.pool_stale_pick_total.load(Ordering::Relaxed) } + pub fn get_me_writer_removed_total(&self) -> u64 { + self.me_writer_removed_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_removed_unexpected_total(&self) -> u64 { + self.me_writer_removed_unexpected_total.load(Ordering::Relaxed) + } + pub fn get_me_refill_triggered_total(&self) -> u64 { + self.me_refill_triggered_total.load(Ordering::Relaxed) + } + pub fn get_me_refill_skipped_inflight_total(&self) -> u64 { + self.me_refill_skipped_inflight_total.load(Ordering::Relaxed) + } + pub fn get_me_refill_failed_total(&self) -> u64 { + self.me_refill_failed_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_restored_same_endpoint_total(&self) -> u64 { + self.me_writer_restored_same_endpoint_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_restored_fallback_total(&self) -> u64 { + self.me_writer_restored_fallback_total.load(Ordering::Relaxed) + } pub fn increment_user_connects(&self, user: &str) { self.user_stats.entry(user.to_string()).or_default() diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index aa14e5b..e5aebe4 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -708,6 +708,7 @@ impl MePool { match self.connect_one(addr, self.rng.as_ref()).await { Ok(()) => { self.stats.increment_me_reconnect_success(); + self.stats.increment_me_writer_restored_same_endpoint_total(); info!( %addr, attempt = attempt + 1, @@ -728,6 +729,7 @@ impl MePool { let dc_endpoints = self.endpoints_for_same_dc(addr).await; if dc_endpoints.is_empty() { + self.stats.increment_me_refill_failed_total(); return false; } @@ -738,6 +740,7 @@ impl MePool { .await { self.stats.increment_me_reconnect_success(); + self.stats.increment_me_writer_restored_fallback_total(); info!( %addr, attempt = attempt + 1, @@ -747,6 +750,7 @@ impl MePool { } } + self.stats.increment_me_refill_failed_total(); false } @@ -756,9 +760,11 @@ impl MePool { { let mut guard = pool.refill_inflight.lock().await; if !guard.insert(addr) { + pool.stats.increment_me_refill_skipped_inflight_total(); return; } } + pool.stats.increment_me_refill_triggered_total(); let restored = pool.refill_writer_after_loss(addr).await; if !restored { @@ -1189,9 +1195,13 @@ impl MePool { if was_draining { self.stats.decrement_pool_drain_active(); } + self.stats.increment_me_writer_removed_total(); w.cancel.cancel(); removed_addr = Some(w.addr); trigger_refill = !was_draining; + if trigger_refill { + self.stats.increment_me_writer_removed_unexpected_total(); + } close_tx = Some(w.tx.clone()); self.conn_count.fetch_sub(1, Ordering::Relaxed); }