From beed6b4679b662748571eb8cf2f475fd5b7d12ad Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 10 May 2026 13:58:02 +0300 Subject: [PATCH] Middle Wait Deadlines + Tighten Session Release State --- src/proxy/client.rs | 14 ++++++++---- src/proxy/middle_relay.rs | 46 +++++++++++++++++++++++++++++++++++---- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 2188e14..c022a8c 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -59,24 +59,30 @@ impl UserConnectionReservation { } } - async fn release(mut self) { + fn mark_released(&mut self) -> bool { if self.state != SessionReservationState::Active { + return false; + } + self.state = SessionReservationState::Released; + true + } + + async fn release(mut self) { + if !self.mark_released() { return; } if self.tracks_ip { self.ip_tracker.remove_ip(&self.user, self.ip).await; } - self.state = SessionReservationState::Released; self.stats.decrement_user_curr_connects(&self.user); } } impl Drop for UserConnectionReservation { fn drop(&mut self) { - if self.state != SessionReservationState::Active { + if !self.mark_released() { return; } - self.state = SessionReservationState::Released; self.stats.increment_session_drop_fallback_total(); self.stats.decrement_user_curr_connects(&self.user); if self.tracks_ip { diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index d67531a..425cff4 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -72,6 +72,7 @@ enum MiddleQuotaReserveError { LimitExceeded, Contended, Cancelled, + DeadlineExceeded, } #[derive(Default)] @@ -632,6 +633,7 @@ async fn reserve_user_quota_with_yield( limit: u64, stats: &Stats, cancel: &CancellationToken, + deadline: Option, ) -> std::result::Result { let mut backoff_ms = QUOTA_RESERVE_BACKOFF_MIN_MS; let mut backoff_rounds = 0usize; @@ -650,6 +652,10 @@ async fn reserve_user_quota_with_yield( } tokio::task::yield_now().await; + if deadline.is_some_and(|deadline| Instant::now() >= deadline) { + stats.increment_quota_contention_timeout_total(); + return Err(MiddleQuotaReserveError::DeadlineExceeded); + } tokio::select! { _ = tokio::time::sleep(Duration::from_millis(backoff_ms)) => {} _ = cancel.cancelled() => { @@ -672,6 +678,7 @@ async fn wait_for_traffic_budget( lease: Option<&Arc>, direction: RateDirection, bytes: u64, + deadline: Option, ) -> Result<()> { if bytes == 0 { return Ok(()); @@ -689,6 +696,9 @@ async fn wait_for_traffic_budget( } let wait_started_at = Instant::now(); + if deadline.is_some_and(|deadline| wait_started_at >= deadline) { + return Err(ProxyError::Proxy("traffic budget wait deadline exceeded".into())); + } tokio::time::sleep(next_refill_delay()).await; let wait_ms = wait_started_at .elapsed() @@ -711,6 +721,7 @@ async fn wait_for_traffic_budget_or_cancel( bytes: u64, cancel: &CancellationToken, stats: &Stats, + deadline: Option, ) -> Result<()> { if bytes == 0 { return Ok(()); @@ -728,6 +739,10 @@ async fn wait_for_traffic_budget_or_cancel( } let wait_started_at = Instant::now(); + if deadline.is_some_and(|deadline| wait_started_at >= deadline) { + stats.increment_flow_wait_middle_rate_limit_cancelled_total(); + return Err(ProxyError::Proxy("traffic budget wait deadline exceeded".into())); + } tokio::select! { _ = tokio::time::sleep(next_refill_delay()) => {} _ = cancel.cancelled() => { @@ -1720,6 +1735,7 @@ where traffic_lease.as_ref(), RateDirection::Up, payload.len() as u64, + None, ) .await?; forensics.bytes_c2me = forensics @@ -1734,6 +1750,7 @@ where limit, stats.as_ref(), &flow_cancel, + None, ) .await { @@ -1756,6 +1773,12 @@ where )); break; } + Err(MiddleQuotaReserveError::DeadlineExceeded) => { + main_result = Err(ProxyError::Proxy( + "ME C->ME quota reservation deadline exceeded".into(), + )); + break; + } } stats.add_user_octets_from_handle(user_stats, payload.len() as u64); } else { @@ -2449,8 +2472,10 @@ where let data_len = data.len() as u64; if let (Some(limit), Some(user_stats)) = (quota_limit, quota_user_stats) { let soft_limit = quota_soft_cap(limit, quota_soft_overshoot_bytes); - match reserve_user_quota_with_yield(user_stats, data_len, soft_limit, stats, cancel) - .await + match reserve_user_quota_with_yield( + user_stats, data_len, soft_limit, stats, cancel, None, + ) + .await { Ok(_) => {} Err(MiddleQuotaReserveError::LimitExceeded) => { @@ -2469,6 +2494,11 @@ where "ME D->C quota reservation cancelled".into(), )); } + Err(MiddleQuotaReserveError::DeadlineExceeded) => { + return Err(ProxyError::Proxy( + "ME D->C quota reservation deadline exceeded".into(), + )); + } } } wait_for_traffic_budget_or_cancel( @@ -2477,6 +2507,7 @@ where data_len, cancel, stats, + None, ) .await?; @@ -2516,8 +2547,15 @@ where } else { trace!(conn_id, confirm, "ME->C quickack"); } - wait_for_traffic_budget_or_cancel(traffic_lease, RateDirection::Down, 4, cancel, stats) - .await?; + wait_for_traffic_budget_or_cancel( + traffic_lease, + RateDirection::Down, + 4, + cancel, + stats, + None, + ) + .await?; write_client_ack(client_writer, proto_tag, confirm).await?; stats.increment_me_d2c_ack_frames_total();