From 844a912b3885d14f45e0ead9efe61bb1bea746dd Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 10 May 2026 13:30:59 +0300 Subject: [PATCH] Expose Quota Contention + Cleanup fallback metrics --- src/metrics.rs | 74 +++++++++++++++++++++++++++++++++++++++ src/proxy/middle_relay.rs | 53 ++++++++++++++++++++-------- src/proxy/relay.rs | 19 ++++++++-- src/stats/mod.rs | 49 ++++++++++++++++++++++++++ 4 files changed, 177 insertions(+), 18 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 60b1dfe..55ff196 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -739,6 +739,52 @@ async fn render_metrics( } ); + let _ = writeln!( + out, + "# HELP telemt_quota_refund_bytes_total Reserved quota bytes returned before commit" + ); + let _ = writeln!(out, "# TYPE telemt_quota_refund_bytes_total counter"); + let _ = writeln!( + out, + "telemt_quota_refund_bytes_total {}", + if core_enabled { + stats.get_quota_refund_bytes_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_quota_contention_total Quota reservation CAS contention events" + ); + let _ = writeln!(out, "# TYPE telemt_quota_contention_total counter"); + let _ = writeln!( + out, + "telemt_quota_contention_total {}", + if core_enabled { + stats.get_quota_contention_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_quota_contention_timeout_total Quota reservations that hit the bounded contention budget" + ); + let _ = writeln!( + out, + "# TYPE telemt_quota_contention_timeout_total counter" + ); + let _ = writeln!( + out, + "telemt_quota_contention_timeout_total {}", + if core_enabled { + stats.get_quota_contention_timeout_total() + } else { + 0 + } + ); + let _ = writeln!( out, "# HELP telemt_conntrack_control_state Runtime conntrack control state flags" @@ -1955,6 +2001,34 @@ async fn render_metrics( 0 } ); + let _ = writeln!( + out, + "# HELP telemt_me_child_join_timeout_total Middle relay child tasks that did not join before cleanup deadline" + ); + let _ = writeln!(out, "# TYPE telemt_me_child_join_timeout_total counter"); + let _ = writeln!( + out, + "telemt_me_child_join_timeout_total {}", + if core_enabled { + stats.get_me_child_join_timeout_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_child_abort_total Middle relay child tasks aborted after bounded cleanup timeout" + ); + let _ = writeln!(out, "# TYPE telemt_me_child_abort_total counter"); + let _ = writeln!( + out, + "telemt_me_child_abort_total {}", + if core_enabled { + stats.get_me_child_abort_total() + } else { + 0 + } + ); let _ = writeln!( out, diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 1865c84..b1ac21a 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -624,6 +624,7 @@ async fn reserve_user_quota_with_yield( user_stats: &UserStats, bytes: u64, limit: u64, + stats: &Stats, ) -> std::result::Result { let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS; let mut backoff_rounds = 0usize; @@ -634,7 +635,10 @@ async fn reserve_user_quota_with_yield( Err(QuotaReserveError::LimitExceeded) => { return Err(QuotaReserveError::LimitExceeded); } - Err(QuotaReserveError::Contended) => std::hint::spin_loop(), + Err(QuotaReserveError::Contended) => { + stats.increment_quota_contention_total(); + std::hint::spin_loop(); + } } } @@ -642,6 +646,7 @@ async fn reserve_user_quota_with_yield( tokio::time::sleep(Duration::from_millis(backoff_ms)).await; backoff_rounds = backoff_rounds.saturating_add(1); if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS { + stats.increment_quota_contention_timeout_total(); return Err(QuotaReserveError::Contended); } backoff_ms = backoff_ms @@ -1656,18 +1661,27 @@ where if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats.as_deref()) { - if reserve_user_quota_with_yield( + match reserve_user_quota_with_yield( user_stats, payload.len() as u64, limit, + stats.as_ref(), ) .await - .is_err() { - main_result = Err(ProxyError::DataQuotaExceeded { - user: user.clone(), - }); - break; + Ok(_) => {} + Err(QuotaReserveError::LimitExceeded) => { + main_result = Err(ProxyError::DataQuotaExceeded { + user: user.clone(), + }); + break; + } + Err(QuotaReserveError::Contended) => { + main_result = Err(ProxyError::Proxy( + "ME C->ME quota reservation contended".into(), + )); + break; + } } stats.add_user_octets_from_handle(user_stats, payload.len() as u64); } else { @@ -1741,6 +1755,8 @@ where joined.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME sender join error: {e}")))) } Err(_) => { + stats.increment_me_child_join_timeout_total(); + stats.increment_me_child_abort_total(); c2me_sender.abort(); Err(ProxyError::Proxy("ME sender join timeout".into())) } @@ -1752,6 +1768,8 @@ where joined.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}")))) } Err(_) => { + stats.increment_me_child_join_timeout_total(); + stats.increment_me_child_abort_total(); me_writer.abort(); Err(ProxyError::Proxy("ME writer join timeout".into())) } @@ -2357,14 +2375,19 @@ where let data_len = data.len() as u64; if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats) { let soft_limit = quota_soft_cap(limit, quota_soft_overshoot_bytes); - if reserve_user_quota_with_yield(user_stats, data_len, soft_limit) - .await - .is_err() - { - stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite); - return Err(ProxyError::DataQuotaExceeded { - user: user.to_string(), - }); + match reserve_user_quota_with_yield(user_stats, data_len, soft_limit, stats).await { + Ok(_) => {} + Err(QuotaReserveError::LimitExceeded) => { + stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite); + return Err(ProxyError::DataQuotaExceeded { + user: user.to_string(), + }); + } + Err(QuotaReserveError::Contended) => { + return Err(ProxyError::Proxy( + "ME D->C quota reservation contended".into(), + )); + } } } wait_for_traffic_budget(traffic_lease, RateDirection::Down, data_len).await; diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index 4d8b827..1ddef74 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -471,13 +471,16 @@ impl AsyncRead for StatsIo { this.quota_exceeded.store(true, Ordering::Release); return Poll::Ready(Err(quota_io_error())); } - Err(crate::stats::QuotaReserveError::Contended) => {} + Err(crate::stats::QuotaReserveError::Contended) => { + this.stats.increment_quota_contention_total(); + } } } if reserved_read_bytes == 0 { reserve_rounds = reserve_rounds.saturating_add(1); if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS { + this.stats.increment_quota_contention_timeout_total(); if this.arm_quota_wait(cx).is_pending() { return Poll::Pending; } @@ -514,10 +517,12 @@ impl AsyncRead for StatsIo { match read_result { Poll::Ready(Ok(n)) => { if reserved_read_bytes > n as u64 { + let refund_bytes = reserved_read_bytes - n as u64; refund_reserved_quota_bytes( this.user_stats.as_ref(), - reserved_read_bytes - n as u64, + refund_bytes, ); + this.stats.add_quota_refund_bytes_total(refund_bytes); } if n > 0 { let n_to_charge = n as u64; @@ -565,12 +570,14 @@ impl AsyncRead for StatsIo { Poll::Pending => { if reserved_read_bytes > 0 { refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_read_bytes); + this.stats.add_quota_refund_bytes_total(reserved_read_bytes); } Poll::Pending } Poll::Ready(Err(err)) => { if reserved_read_bytes > 0 { refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_read_bytes); + this.stats.add_quota_refund_bytes_total(reserved_read_bytes); } Poll::Ready(Err(err)) } @@ -655,6 +662,7 @@ impl AsyncWrite for StatsIo { break; } Err(crate::stats::QuotaReserveError::Contended) => { + this.stats.increment_quota_contention_total(); saw_contention = true; } } @@ -663,6 +671,7 @@ impl AsyncWrite for StatsIo { if reserved_bytes == 0 { reserve_rounds = reserve_rounds.saturating_add(1); if reserve_rounds >= QUOTA_RESERVE_MAX_ROUNDS { + this.stats.increment_quota_contention_timeout_total(); if let Some(lease) = this.traffic_lease.as_ref() { lease.refund(RateDirection::Down, shaper_reserved_bytes); } @@ -690,10 +699,12 @@ impl AsyncWrite for StatsIo { match Pin::new(&mut this.inner).poll_write(cx, write_buf) { Poll::Ready(Ok(n)) => { if reserved_bytes > n as u64 { + let refund_bytes = reserved_bytes - n as u64; refund_reserved_quota_bytes( this.user_stats.as_ref(), - reserved_bytes - n as u64, + refund_bytes, ); + this.stats.add_quota_refund_bytes_total(refund_bytes); } if shaper_reserved_bytes > n as u64 && let Some(lease) = this.traffic_lease.as_ref() @@ -744,6 +755,7 @@ impl AsyncWrite for StatsIo { Poll::Ready(Err(err)) => { if reserved_bytes > 0 { refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes); + this.stats.add_quota_refund_bytes_total(reserved_bytes); } if shaper_reserved_bytes > 0 && let Some(lease) = this.traffic_lease.as_ref() @@ -755,6 +767,7 @@ impl AsyncWrite for StatsIo { Poll::Pending => { if reserved_bytes > 0 { refund_reserved_quota_bytes(this.user_stats.as_ref(), reserved_bytes); + this.stats.add_quota_refund_bytes_total(reserved_bytes); } if shaper_reserved_bytes > 0 && let Some(lease) = this.traffic_lease.as_ref() diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 792ea8d..b67a045 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -274,8 +274,13 @@ pub struct Stats { me_inline_recovery_total: AtomicU64, ip_reservation_rollback_tcp_limit_total: AtomicU64, ip_reservation_rollback_quota_limit_total: AtomicU64, + quota_refund_bytes_total: AtomicU64, + quota_contention_total: AtomicU64, + quota_contention_timeout_total: AtomicU64, quota_write_fail_bytes_total: AtomicU64, quota_write_fail_events_total: AtomicU64, + me_child_join_timeout_total: AtomicU64, + me_child_abort_total: AtomicU64, telemetry_core_enabled: AtomicBool, telemetry_user_enabled: AtomicBool, telemetry_me_level: AtomicU8, @@ -1437,6 +1442,23 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn add_quota_refund_bytes_total(&self, bytes: u64) { + if self.telemetry_core_enabled() { + self.quota_refund_bytes_total + .fetch_add(bytes, Ordering::Relaxed); + } + } + pub fn increment_quota_contention_total(&self) { + if self.telemetry_core_enabled() { + self.quota_contention_total.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_quota_contention_timeout_total(&self) { + if self.telemetry_core_enabled() { + self.quota_contention_timeout_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn add_quota_write_fail_bytes_total(&self, bytes: u64) { if self.telemetry_core_enabled() { self.quota_write_fail_bytes_total @@ -1449,6 +1471,17 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_child_join_timeout_total(&self) { + if self.telemetry_core_enabled() { + self.me_child_join_timeout_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_child_abort_total(&self) { + if self.telemetry_core_enabled() { + self.me_child_abort_total.fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_endpoint_quarantine_total(&self) { if self.telemetry_me_allows_normal() { self.me_endpoint_quarantine_total @@ -2283,12 +2316,28 @@ impl Stats { self.ip_reservation_rollback_quota_limit_total .load(Ordering::Relaxed) } + pub fn get_quota_refund_bytes_total(&self) -> u64 { + self.quota_refund_bytes_total.load(Ordering::Relaxed) + } + pub fn get_quota_contention_total(&self) -> u64 { + self.quota_contention_total.load(Ordering::Relaxed) + } + pub fn get_quota_contention_timeout_total(&self) -> u64 { + self.quota_contention_timeout_total + .load(Ordering::Relaxed) + } pub fn get_quota_write_fail_bytes_total(&self) -> u64 { self.quota_write_fail_bytes_total.load(Ordering::Relaxed) } pub fn get_quota_write_fail_events_total(&self) -> u64 { self.quota_write_fail_events_total.load(Ordering::Relaxed) } + pub fn get_me_child_join_timeout_total(&self) -> u64 { + self.me_child_join_timeout_total.load(Ordering::Relaxed) + } + pub fn get_me_child_abort_total(&self) -> u64 { + self.me_child_abort_total.load(Ordering::Relaxed) + } pub fn increment_user_connects(&self, user: &str) { if !self.telemetry_user_enabled() {