diff --git a/src/metrics.rs b/src/metrics.rs index 2762e9d..da290c3 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -784,6 +784,20 @@ async fn render_metrics( 0 } ); + let _ = writeln!( + out, + "# HELP telemt_quota_acquire_cancelled_total Quota acquisitions cancelled before reservation completed" + ); + let _ = writeln!(out, "# TYPE telemt_quota_acquire_cancelled_total counter"); + let _ = writeln!( + out, + "telemt_quota_acquire_cancelled_total {}", + if core_enabled { + stats.get_quota_acquire_cancelled_total() + } else { + 0 + } + ); let _ = writeln!( out, @@ -2055,6 +2069,43 @@ async fn render_metrics( 0 } ); + let _ = writeln!( + out, + "# HELP telemt_flow_wait_events_total Flow wait events by reason, direction, and outcome" + ); + let _ = writeln!(out, "# TYPE telemt_flow_wait_events_total counter"); + let _ = writeln!( + out, + "telemt_flow_wait_events_total{{reason=\"middle_rate_limit\",direction=\"down\",outcome=\"waited\"}} {}", + if core_enabled { + stats.get_flow_wait_middle_rate_limit_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_flow_wait_events_total{{reason=\"middle_rate_limit\",direction=\"down\",outcome=\"cancelled\"}} {}", + if core_enabled { + stats.get_flow_wait_middle_rate_limit_cancelled_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_flow_wait_ms_total Flow wait time in milliseconds by reason and direction" + ); + let _ = writeln!(out, "# TYPE telemt_flow_wait_ms_total counter"); + let _ = writeln!( + out, + "telemt_flow_wait_ms_total{{reason=\"middle_rate_limit\",direction=\"down\"}} {}", + if core_enabled { + stats.get_flow_wait_middle_rate_limit_ms_total() + } else { + 0 + } + ); let _ = writeln!( out, diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index e09daae..cb6508b 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -69,6 +69,12 @@ const QUOTA_RESERVE_BACKOFF_MAX_MS: u64 = 16; const QUOTA_RESERVE_MAX_BACKOFF_ROUNDS: usize = 16; const ME_CHILD_JOIN_TIMEOUT: Duration = Duration::from_secs(2); +enum MiddleQuotaReserveError { + LimitExceeded, + Contended, + Cancelled, +} + #[derive(Default)] pub(crate) struct DesyncDedupRotationState { current_started_at: Option, @@ -626,7 +632,8 @@ async fn reserve_user_quota_with_yield( bytes: u64, limit: u64, stats: &Stats, -) -> std::result::Result { + cancel: &CancellationToken, +) -> std::result::Result { let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS; let mut backoff_rounds = 0usize; loop { @@ -634,7 +641,7 @@ async fn reserve_user_quota_with_yield( match user_stats.quota_try_reserve(bytes, limit) { Ok(total) => return Ok(total), Err(QuotaReserveError::LimitExceeded) => { - return Err(QuotaReserveError::LimitExceeded); + return Err(MiddleQuotaReserveError::LimitExceeded); } Err(QuotaReserveError::Contended) => { stats.increment_quota_contention_total(); @@ -644,11 +651,17 @@ async fn reserve_user_quota_with_yield( } tokio::task::yield_now().await; - tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} + _ = cancel.cancelled() => { + stats.increment_quota_acquire_cancelled_total(); + return Err(MiddleQuotaReserveError::Cancelled); + } + } backoff_rounds = backoff_rounds.saturating_add(1); if backoff_rounds >= QUOTA_RESERVE_MAX_BACKOFF_ROUNDS { stats.increment_quota_contention_timeout_total(); - return Err(QuotaReserveError::Contended); + return Err(MiddleQuotaReserveError::Contended); } backoff_ms = backoff_ms .saturating_mul(2) @@ -698,6 +711,7 @@ async fn wait_for_traffic_budget_or_cancel( direction: RateDirection, bytes: u64, cancel: &CancellationToken, + stats: &Stats, ) -> Result<()> { if bytes == 0 { return Ok(()); @@ -718,6 +732,7 @@ async fn wait_for_traffic_budget_or_cancel( tokio::select! { _ = tokio::time::sleep(next_refill_delay()) => {} _ = cancel.cancelled() => { + stats.increment_flow_wait_middle_rate_limit_cancelled_total(); return Err(ProxyError::Proxy("traffic budget wait cancelled".into())); } } @@ -731,6 +746,7 @@ async fn wait_for_traffic_budget_or_cancel( consume.blocked_cidr, wait_ms, ); + stats.observe_flow_wait_middle_rate_limit_ms(wait_ms); } Ok(()) @@ -1718,22 +1734,29 @@ where payload.len() as u64, limit, stats.as_ref(), + &flow_cancel, ) .await { Ok(_) => {} - Err(QuotaReserveError::LimitExceeded) => { + Err(MiddleQuotaReserveError::LimitExceeded) => { main_result = Err(ProxyError::DataQuotaExceeded { user: user.clone(), }); break; } - Err(QuotaReserveError::Contended) => { + Err(MiddleQuotaReserveError::Contended) => { main_result = Err(ProxyError::Proxy( "ME C->ME quota reservation contended".into(), )); break; } + Err(MiddleQuotaReserveError::Cancelled) => { + main_result = Err(ProxyError::Proxy( + "ME C->ME quota reservation cancelled".into(), + )); + break; + } } stats.add_user_octets_from_handle(user_stats, payload.len() as u64); } else { @@ -2430,19 +2453,26 @@ where let data_len = data.len() as u64; if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats) { let soft_limit = quota_soft_cap(limit, quota_soft_overshoot_bytes); - match reserve_user_quota_with_yield(user_stats, data_len, soft_limit, stats).await { + match reserve_user_quota_with_yield(user_stats, data_len, soft_limit, stats, cancel) + .await + { Ok(_) => {} - Err(QuotaReserveError::LimitExceeded) => { + Err(MiddleQuotaReserveError::LimitExceeded) => { stats.increment_me_d2c_quota_reject_total(MeD2cQuotaRejectStage::PreWrite); return Err(ProxyError::DataQuotaExceeded { user: user.to_string(), }); } - Err(QuotaReserveError::Contended) => { + Err(MiddleQuotaReserveError::Contended) => { return Err(ProxyError::Proxy( "ME D->C quota reservation contended".into(), )); } + Err(MiddleQuotaReserveError::Cancelled) => { + return Err(ProxyError::Proxy( + "ME D->C quota reservation cancelled".into(), + )); + } } } wait_for_traffic_budget_or_cancel( @@ -2450,6 +2480,7 @@ where RateDirection::Down, data_len, cancel, + stats, ) .await?; @@ -2489,7 +2520,7 @@ where } else { trace!(conn_id, confirm, "ME->C quickack"); } - wait_for_traffic_budget_or_cancel(traffic_lease, RateDirection::Down, 4, cancel) + wait_for_traffic_budget_or_cancel(traffic_lease, RateDirection::Down, 4, cancel, stats) .await?; write_client_ack(client_writer, proto_tag, confirm).await?; stats.increment_me_d2c_ack_frames_total(); diff --git a/src/stats/mod.rs b/src/stats/mod.rs index b67a045..20697fb 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -277,10 +277,14 @@ pub struct Stats { quota_refund_bytes_total: AtomicU64, quota_contention_total: AtomicU64, quota_contention_timeout_total: AtomicU64, + quota_acquire_cancelled_total: AtomicU64, quota_write_fail_bytes_total: AtomicU64, quota_write_fail_events_total: AtomicU64, me_child_join_timeout_total: AtomicU64, me_child_abort_total: AtomicU64, + flow_wait_middle_rate_limit_total: AtomicU64, + flow_wait_middle_rate_limit_cancelled_total: AtomicU64, + flow_wait_middle_rate_limit_ms_total: AtomicU64, telemetry_core_enabled: AtomicBool, telemetry_user_enabled: AtomicBool, telemetry_me_level: AtomicU8, @@ -1459,6 +1463,12 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn increment_quota_acquire_cancelled_total(&self) { + if self.telemetry_core_enabled() { + self.quota_acquire_cancelled_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn add_quota_write_fail_bytes_total(&self, bytes: u64) { if self.telemetry_core_enabled() { self.quota_write_fail_bytes_total @@ -1482,6 +1492,20 @@ impl Stats { self.me_child_abort_total.fetch_add(1, Ordering::Relaxed); } } + pub fn observe_flow_wait_middle_rate_limit_ms(&self, wait_ms: u64) { + if self.telemetry_core_enabled() { + self.flow_wait_middle_rate_limit_total + .fetch_add(1, Ordering::Relaxed); + self.flow_wait_middle_rate_limit_ms_total + .fetch_add(wait_ms, Ordering::Relaxed); + } + } + pub fn increment_flow_wait_middle_rate_limit_cancelled_total(&self) { + if self.telemetry_core_enabled() { + self.flow_wait_middle_rate_limit_cancelled_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_endpoint_quarantine_total(&self) { if self.telemetry_me_allows_normal() { self.me_endpoint_quarantine_total @@ -2326,6 +2350,9 @@ impl Stats { self.quota_contention_timeout_total .load(Ordering::Relaxed) } + pub fn get_quota_acquire_cancelled_total(&self) -> u64 { + self.quota_acquire_cancelled_total.load(Ordering::Relaxed) + } pub fn get_quota_write_fail_bytes_total(&self) -> u64 { self.quota_write_fail_bytes_total.load(Ordering::Relaxed) } @@ -2338,6 +2365,18 @@ impl Stats { pub fn get_me_child_abort_total(&self) -> u64 { self.me_child_abort_total.load(Ordering::Relaxed) } + pub fn get_flow_wait_middle_rate_limit_total(&self) -> u64 { + self.flow_wait_middle_rate_limit_total + .load(Ordering::Relaxed) + } + pub fn get_flow_wait_middle_rate_limit_cancelled_total(&self) -> u64 { + self.flow_wait_middle_rate_limit_cancelled_total + .load(Ordering::Relaxed) + } + pub fn get_flow_wait_middle_rate_limit_ms_total(&self) -> u64 { + self.flow_wait_middle_rate_limit_ms_total + .load(Ordering::Relaxed) + } pub fn increment_user_connects(&self, user: &str) { if !self.telemetry_user_enabled() {