From 70479c4094a262c6770996d85adacb2e00e475fe Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Mar 2026 22:25:39 +0300 Subject: [PATCH] Unexpected-only Quarantine Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/metrics.rs | 46 +++++++++++++++++++ src/stats/mod.rs | 22 +++++++++ src/transport/middle_proxy/pool_refill.rs | 2 + src/transport/middle_proxy/pool_writer.rs | 17 ++++++- .../tests/pool_writer_security_tests.rs | 43 +++++++++++++++-- 7 files changed, 125 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f79a2ea..cf52770 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2793,7 +2793,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "telemt" -version = "3.3.31" +version = "3.3.32" dependencies = [ "aes", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index ebadbb5..62b3b13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.31" +version = "3.3.32" edition = "2024" [features] diff --git a/src/metrics.rs b/src/metrics.rs index c125ef5..2c87ed6 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1558,6 +1558,40 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp 0 } ); + let _ = writeln!( + out, + "# HELP telemt_me_endpoint_quarantine_unexpected_total ME endpoint quarantines caused by unexpected writer removals" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_endpoint_quarantine_unexpected_total counter" + ); + let _ = writeln!( + out, + "telemt_me_endpoint_quarantine_unexpected_total {}", + if me_allows_normal { + stats.get_me_endpoint_quarantine_unexpected_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_endpoint_quarantine_draining_suppressed_total Draining writer removals that skipped endpoint quarantine" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_endpoint_quarantine_draining_suppressed_total counter" + ); + let _ = writeln!( + out, + "telemt_me_endpoint_quarantine_draining_suppressed_total {}", + if me_allows_normal { + stats.get_me_endpoint_quarantine_draining_suppressed_total() + } else { + 0 + } + ); let _ = writeln!( out, @@ -2622,6 +2656,9 @@ mod tests { stats.increment_me_d2c_write_mode(crate::stats::MeD2cWriteMode::Coalesced); stats.increment_me_d2c_quota_reject_total(crate::stats::MeD2cQuotaRejectStage::PostWrite); stats.observe_me_d2c_frame_buf_shrink(4096); + stats.increment_me_endpoint_quarantine_total(); + stats.increment_me_endpoint_quarantine_unexpected_total(); + stats.increment_me_endpoint_quarantine_draining_suppressed_total(); stats.increment_user_connects("alice"); stats.increment_user_curr_connects("alice"); stats.add_user_octets_from("alice", 1024); @@ -2672,6 +2709,9 @@ mod tests { assert!(output.contains("telemt_me_d2c_quota_reject_total{stage=\"post_write\"} 1")); assert!(output.contains("telemt_me_d2c_frame_buf_shrink_total 1")); assert!(output.contains("telemt_me_d2c_frame_buf_shrink_bytes_total 4096")); + assert!(output.contains("telemt_me_endpoint_quarantine_total 1")); + assert!(output.contains("telemt_me_endpoint_quarantine_unexpected_total 1")); + assert!(output.contains("telemt_me_endpoint_quarantine_draining_suppressed_total 1")); assert!(output.contains("telemt_user_connections_total{user=\"alice\"} 1")); assert!(output.contains("telemt_user_connections_current{user=\"alice\"} 1")); assert!(output.contains("telemt_user_octets_from_client{user=\"alice\"} 1024")); @@ -2738,6 +2778,12 @@ mod tests { assert!(output.contains("# TYPE telemt_me_d2c_write_mode_total counter")); assert!(output.contains("# TYPE telemt_me_d2c_batch_frames_bucket_total counter")); assert!(output.contains("# TYPE telemt_me_d2c_flush_duration_us_bucket_total counter")); + assert!(output.contains("# TYPE telemt_me_endpoint_quarantine_total counter")); + assert!(output.contains("# TYPE telemt_me_endpoint_quarantine_unexpected_total counter")); + assert!( + output + .contains("# TYPE telemt_me_endpoint_quarantine_draining_suppressed_total counter") + ); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!( output diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 4144f82..2d1f413 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -128,6 +128,8 @@ pub struct Stats { me_crc_mismatch: AtomicU64, me_seq_mismatch: AtomicU64, me_endpoint_quarantine_total: AtomicU64, + me_endpoint_quarantine_unexpected_total: AtomicU64, + me_endpoint_quarantine_draining_suppressed_total: AtomicU64, me_kdf_drift_total: AtomicU64, me_kdf_port_only_drift_total: AtomicU64, me_hardswap_pending_reuse_total: AtomicU64, @@ -1251,6 +1253,18 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_endpoint_quarantine_unexpected_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_endpoint_quarantine_unexpected_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_endpoint_quarantine_draining_suppressed_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_endpoint_quarantine_draining_suppressed_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_kdf_drift_total(&self) { if self.telemetry_me_allows_normal() { self.me_kdf_drift_total.fetch_add(1, Ordering::Relaxed); @@ -1503,6 +1517,14 @@ impl Stats { pub fn get_me_endpoint_quarantine_total(&self) -> u64 { self.me_endpoint_quarantine_total.load(Ordering::Relaxed) } + pub fn get_me_endpoint_quarantine_unexpected_total(&self) -> u64 { + self.me_endpoint_quarantine_unexpected_total + .load(Ordering::Relaxed) + } + pub fn get_me_endpoint_quarantine_draining_suppressed_total(&self) -> u64 { + self.me_endpoint_quarantine_draining_suppressed_total + .load(Ordering::Relaxed) + } pub fn get_me_kdf_drift_total(&self) -> u64 { self.me_kdf_drift_total.load(Ordering::Relaxed) } diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index ddeb3e3..06e450b 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -33,6 +33,7 @@ impl MePool { &self, addr: SocketAddr, uptime: Duration, + reason: &'static str, ) { if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) { return; @@ -45,6 +46,7 @@ impl MePool { self.stats.increment_me_endpoint_quarantine_total(); warn!( %addr, + reason, uptime_ms = uptime.as_millis(), quarantine_secs = ME_FLAP_QUARANTINE_SECS, "ME endpoint temporarily quarantined due to rapid writer flap" diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 75f2d65..0c31ecb 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -609,8 +609,21 @@ impl MePool { } if let Some(addr) = removed_addr { if let Some(uptime) = removed_uptime { - // Quarantine flapping endpoints regardless of draining state. - self.maybe_quarantine_flapping_endpoint(addr, uptime).await; + // Quarantine contract: only unexpected removals are considered endpoint flap. + if trigger_refill { + self.stats + .increment_me_endpoint_quarantine_unexpected_total(); + self.maybe_quarantine_flapping_endpoint(addr, uptime, "unexpected") + .await; + } else { + self.stats + .increment_me_endpoint_quarantine_draining_suppressed_total(); + debug!( + %addr, + uptime_ms = uptime.as_millis(), + "Skipping endpoint quarantine for draining writer removal" + ); + } } if trigger_refill && let Some(writer_dc) = removed_dc { self.trigger_immediate_refill_for_dc(addr, writer_dc); diff --git a/src/transport/middle_proxy/tests/pool_writer_security_tests.rs b/src/transport/middle_proxy/tests/pool_writer_security_tests.rs index fc5135b..d6ab8d6 100644 --- a/src/transport/middle_proxy/tests/pool_writer_security_tests.rs +++ b/src/transport/middle_proxy/tests/pool_writer_security_tests.rs @@ -173,10 +173,15 @@ async fn bind_conn_to_writer(pool: &Arc, writer_id: u64, port: u16) -> u } #[tokio::test] -async fn remove_draining_writer_still_quarantines_flapping_endpoint() { +async fn remove_draining_writer_does_not_quarantine_flapping_endpoint() { let pool = make_pool().await; let writer_id = 77; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 12, 0, 77)), 443); + let before_total = pool.stats.get_me_endpoint_quarantine_total(); + let before_unexpected = pool.stats.get_me_endpoint_quarantine_unexpected_total(); + let before_suppressed = pool + .stats + .get_me_endpoint_quarantine_draining_suppressed_total(); insert_writer( &pool, writer_id, @@ -200,8 +205,18 @@ async fn remove_draining_writer_still_quarantines_flapping_endpoint() { "writer must be removed from pool after cleanup" ); assert!( - pool.is_endpoint_quarantined(addr).await, - "draining removals must still quarantine flapping endpoints" + !pool.is_endpoint_quarantined(addr).await, + "draining removals must not quarantine endpoint" + ); + assert_eq!(pool.stats.get_me_endpoint_quarantine_total(), before_total); + assert_eq!( + pool.stats.get_me_endpoint_quarantine_unexpected_total(), + before_unexpected + ); + assert_eq!( + pool.stats + .get_me_endpoint_quarantine_draining_suppressed_total(), + before_suppressed + 1 ); assert_eq!(pool.conn_count.load(Ordering::Relaxed), 0); } @@ -257,16 +272,21 @@ async fn edge_draining_only_detach_rejects_active_writer() { } #[tokio::test] -async fn adversarial_blackhat_single_remove_establishes_single_quarantine_entry() { +async fn adversarial_blackhat_single_unexpected_remove_establishes_single_quarantine_entry() { let pool = make_pool().await; let writer_id = 93; let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 12, 0, 93)), 443); + let before_total = pool.stats.get_me_endpoint_quarantine_total(); + let before_unexpected = pool.stats.get_me_endpoint_quarantine_unexpected_total(); + let before_suppressed = pool + .stats + .get_me_endpoint_quarantine_draining_suppressed_total(); insert_writer( &pool, writer_id, 2, addr, - true, + false, Instant::now() - Duration::from_secs(1), ) .await; @@ -274,6 +294,19 @@ async fn adversarial_blackhat_single_remove_establishes_single_quarantine_entry( pool.remove_writer_and_close_clients(writer_id).await; assert!(pool.is_endpoint_quarantined(addr).await); assert_eq!(pool.endpoint_quarantine.lock().await.len(), 1); + assert_eq!( + pool.stats.get_me_endpoint_quarantine_total(), + before_total + 1 + ); + assert_eq!( + pool.stats.get_me_endpoint_quarantine_unexpected_total(), + before_unexpected + 1 + ); + assert_eq!( + pool.stats + .get_me_endpoint_quarantine_draining_suppressed_total(), + before_suppressed + ); } #[tokio::test]