From 3f9ac87daf5a1526727c7d3202522f0ac56ad602 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 10 May 2026 13:33:54 +0300 Subject: [PATCH] Bounded Rate Bursts + Cancel ME Waits --- src/config/types.rs | 14 ++++++-- src/metrics.rs | 26 ++++++++++++++ src/proxy/middle_relay.rs | 74 +++++++++++++++++++++++++++++++++++---- 3 files changed, 106 insertions(+), 8 deletions(-) diff --git a/src/config/types.rs b/src/config/types.rs index 30031d1..d20dd7e 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -543,10 +543,17 @@ pub struct GeneralConfig { pub me_d2c_frame_buf_shrink_threshold_bytes: usize, /// Copy buffer size for client->DC direction in direct relay. + /// + /// This is also the upper bound for one amortized upload rate-limit burst: + /// upload debt is settled before the next relay read instead of blocking + /// inside the completed read path. #[serde(default = "default_direct_relay_copy_buf_c2s_bytes")] pub direct_relay_copy_buf_c2s_bytes: usize, /// Copy buffer size for DC->client direction in direct relay. + /// + /// This bounds one direct download rate-limit grant because writes are + /// clipped to the currently available shaper budget. #[serde(default = "default_direct_relay_copy_buf_s2c_bytes")] pub direct_relay_copy_buf_s2c_bytes: usize, @@ -1891,14 +1898,17 @@ pub struct AccessConfig { /// /// Each entry supports independent upload (`up_bps`) and download /// (`down_bps`) ceilings. A value of `0` in one direction means - /// "unlimited" for that direction. + /// "unlimited" for that direction. Limits are amortized: a relay quantum + /// may pass as a bounded burst, and the limiter applies the resulting wait + /// before later traffic in the same direction proceeds. #[serde(default)] pub user_rate_limits: HashMap, /// Per-CIDR aggregate transport rate limits in bits-per-second. /// /// Matching uses longest-prefix-wins semantics. A value of `0` in one - /// direction means "unlimited" for that direction. + /// direction means "unlimited" for that direction. Limits are amortized + /// with the same bounded-burst contract as per-user rate limits. #[serde(default)] pub cidr_rate_limits: HashMap, diff --git a/src/metrics.rs b/src/metrics.rs index 55ff196..2762e9d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -899,6 +899,32 @@ async fn render_metrics( ); let limiter_metrics = shared_state.traffic_limiter.metrics_snapshot(); + let _ = writeln!( + out, + "# HELP telemt_rate_limiter_burst_bound_bytes Configured upper bound for one direct relay rate-limit burst" + ); + let _ = writeln!( + out, + "# TYPE telemt_rate_limiter_burst_bound_bytes gauge" + ); + let _ = writeln!( + out, + "telemt_rate_limiter_burst_bound_bytes{{direction=\"up\"}} {}", + if core_enabled { + config.general.direct_relay_copy_buf_c2s_bytes + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_rate_limiter_burst_bound_bytes{{direction=\"down\"}} {}", + if core_enabled { + config.general.direct_relay_copy_buf_s2c_bytes + } else { + 0 + } + ); let _ = writeln!( out, "# HELP telemt_rate_limiter_throttle_total Traffic limiter throttle events by scope and direction" diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index b1ac21a..e09daae 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -14,6 +14,7 @@ use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot, watch}; use tokio::time::timeout; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, trace, warn}; use crate::config::{ConntrackPressureProfile, ProxyConfig}; @@ -659,12 +660,12 @@ async fn wait_for_traffic_budget( lease: Option<&Arc>, direction: RateDirection, bytes: u64, -) { +) -> Result<()> { if bytes == 0 { - return; + return Ok(()); } let Some(lease) = lease else { - return; + return Ok(()); }; let mut remaining = bytes; @@ -688,6 +689,51 @@ async fn wait_for_traffic_budget( wait_ms, ); } + + Ok(()) +} + +async fn wait_for_traffic_budget_or_cancel( + lease: Option<&Arc>, + direction: RateDirection, + bytes: u64, + cancel: &CancellationToken, +) -> Result<()> { + if bytes == 0 { + return Ok(()); + } + let Some(lease) = lease else { + return Ok(()); + }; + + let mut remaining = bytes; + while remaining > 0 { + let consume = lease.try_consume(direction, remaining); + if consume.granted > 0 { + remaining = remaining.saturating_sub(consume.granted); + continue; + } + + let wait_started_at = Instant::now(); + tokio::select! { + _ = tokio::time::sleep(next_refill_delay()) => {} + _ = cancel.cancelled() => { + return Err(ProxyError::Proxy("traffic budget wait cancelled".into())); + } + } + let wait_ms = wait_started_at + .elapsed() + .as_millis() + .min(u128::from(u64::MAX)) as u64; + lease.observe_wait_ms( + direction, + consume.blocked_user, + consume.blocked_cidr, + wait_ms, + ); + } + + Ok(()) } fn classify_me_d2c_flush_reason( @@ -1217,12 +1263,14 @@ where }); let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); + let flow_cancel = CancellationToken::new(); let mut me_rx_task = me_rx; let stats_clone = stats.clone(); let rng_clone = rng.clone(); let user_clone = user.clone(); let quota_user_stats_me_writer = quota_user_stats.clone(); let traffic_lease_me_writer = traffic_lease.clone(); + let flow_cancel_me_writer = flow_cancel.clone(); let last_downstream_activity_ms_clone = last_downstream_activity_ms.clone(); let bytes_me2c_clone = bytes_me2c.clone(); let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config); @@ -1268,6 +1316,7 @@ where quota_limit, d2c_flush_policy.quota_soft_overshoot_bytes, traffic_lease_me_writer.as_ref(), + &flow_cancel_me_writer, bytes_me2c_clone.as_ref(), conn_id, d2c_flush_policy.ack_flush_immediate, @@ -1329,6 +1378,7 @@ where quota_limit, d2c_flush_policy.quota_soft_overshoot_bytes, traffic_lease_me_writer.as_ref(), + &flow_cancel_me_writer, bytes_me2c_clone.as_ref(), conn_id, d2c_flush_policy.ack_flush_immediate, @@ -1393,6 +1443,7 @@ where quota_limit, d2c_flush_policy.quota_soft_overshoot_bytes, traffic_lease_me_writer.as_ref(), + &flow_cancel_me_writer, bytes_me2c_clone.as_ref(), conn_id, d2c_flush_policy.ack_flush_immediate, @@ -1459,6 +1510,7 @@ where quota_limit, d2c_flush_policy.quota_soft_overshoot_bytes, traffic_lease_me_writer.as_ref(), + &flow_cancel_me_writer, bytes_me2c_clone.as_ref(), conn_id, d2c_flush_policy.ack_flush_immediate, @@ -1654,7 +1706,7 @@ where RateDirection::Up, payload.len() as u64, ) - .await; + .await?; forensics.bytes_c2me = forensics .bytes_c2me .saturating_add(payload.len() as u64); @@ -1762,6 +1814,7 @@ where } }; + flow_cancel.cancel(); let _ = stop_tx.send(()); let mut writer_result = match timeout(ME_CHILD_JOIN_TIMEOUT, &mut me_writer).await { Ok(joined) => { @@ -2337,6 +2390,7 @@ where quota_limit, quota_soft_overshoot_bytes, None, + &CancellationToken::new(), bytes_me2c, conn_id, ack_flush_immediate, @@ -2357,6 +2411,7 @@ async fn process_me_writer_response_with_traffic_lease( quota_limit: Option, quota_soft_overshoot_bytes: u64, traffic_lease: Option<&Arc>, + cancel: &CancellationToken, bytes_me2c: &AtomicU64, conn_id: u64, ack_flush_immediate: bool, @@ -2390,7 +2445,13 @@ where } } } - wait_for_traffic_budget(traffic_lease, RateDirection::Down, data_len).await; + wait_for_traffic_budget_or_cancel( + traffic_lease, + RateDirection::Down, + data_len, + cancel, + ) + .await?; let write_mode = match write_client_payload(client_writer, proto_tag, flags, &data, rng, frame_buf) @@ -2428,7 +2489,8 @@ where } else { trace!(conn_id, confirm, "ME->C quickack"); } - wait_for_traffic_budget(traffic_lease, RateDirection::Down, 4).await; + wait_for_traffic_budget_or_cancel(traffic_lease, RateDirection::Down, 4, cancel) + .await?; write_client_ack(client_writer, proto_tag, confirm).await?; stats.increment_me_d2c_ack_frames_total();