From 14674bd4e640037d015e78b457e9ddccf7659367 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 6 Apr 2026 19:01:12 +0300 Subject: [PATCH] Update relay.rs --- src/proxy/relay.rs | 174 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 143 insertions(+), 31 deletions(-) diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index 60b432b..9fd5f3d 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -270,6 +270,7 @@ const QUOTA_NEAR_LIMIT_BYTES: u64 = 64 * 1024; const QUOTA_LARGE_CHARGE_BYTES: u64 = 16 * 1024; const QUOTA_ADAPTIVE_INTERVAL_MIN_BYTES: u64 = 4 * 1024; const QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES: u64 = 64 * 1024; +const QUOTA_RESERVE_SPIN_RETRIES: usize = 64; #[inline] fn quota_adaptive_interval_bytes(remaining_before: u64) -> u64 { @@ -314,6 +315,50 @@ impl AsyncRead for StatsIo { if n > 0 { let n_to_charge = n as u64; + if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) { + let mut reserved_total = None; + let mut reserve_rounds = 0usize; + while reserved_total.is_none() { + for _ in 0..QUOTA_RESERVE_SPIN_RETRIES { + match this.user_stats.quota_try_reserve(n_to_charge, limit) { + Ok(total) => { + reserved_total = Some(total); + break; + } + Err(crate::stats::QuotaReserveError::LimitExceeded) => { + this.quota_exceeded.store(true, Ordering::Release); + buf.set_filled(before); + return Poll::Ready(Err(quota_io_error())); + } + Err(crate::stats::QuotaReserveError::Contended) => { + std::hint::spin_loop(); + } + } + } + reserve_rounds = reserve_rounds.saturating_add(1); + if reserved_total.is_none() && reserve_rounds >= 8 { + this.quota_exceeded.store(true, Ordering::Release); + buf.set_filled(before); + return Poll::Ready(Err(quota_io_error())); + } + } + + if should_immediate_quota_check(remaining, n_to_charge) { + this.quota_bytes_since_check = 0; + } else { + this.quota_bytes_since_check = + this.quota_bytes_since_check.saturating_add(n_to_charge); + let interval = quota_adaptive_interval_bytes(remaining); + if this.quota_bytes_since_check >= interval { + this.quota_bytes_since_check = 0; + } + } + + if reserved_total.unwrap_or(0) >= limit { + this.quota_exceeded.store(true, Ordering::Release); + } + } + // C→S: client sent data this.counters .c2s_bytes @@ -326,27 +371,6 @@ impl AsyncRead for StatsIo { this.stats .increment_user_msgs_from_handle(this.user_stats.as_ref()); - if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) { - this.stats - .quota_charge_post_write(this.user_stats.as_ref(), n_to_charge); - if should_immediate_quota_check(remaining, n_to_charge) { - this.quota_bytes_since_check = 0; - if this.user_stats.quota_used() >= limit { - this.quota_exceeded.store(true, Ordering::Release); - } - } else { - this.quota_bytes_since_check = - this.quota_bytes_since_check.saturating_add(n_to_charge); - let interval = quota_adaptive_interval_bytes(remaining); - if this.quota_bytes_since_check >= interval { - this.quota_bytes_since_check = 0; - if this.user_stats.quota_used() >= limit { - this.quota_exceeded.store(true, Ordering::Release); - } - } - } - } - trace!(user = %this.user, bytes = n, "C->S"); } Poll::Ready(Ok(())) @@ -368,18 +392,73 @@ impl AsyncWrite for StatsIo { } let mut remaining_before = None; + let mut reserved_bytes = 0u64; + let mut write_buf = buf; if let Some(limit) = this.quota_limit { - let used_before = this.user_stats.quota_used(); - let remaining = limit.saturating_sub(used_before); - if remaining == 0 { - this.quota_exceeded.store(true, Ordering::Release); - return Poll::Ready(Err(quota_io_error())); + if !buf.is_empty() { + let mut reserve_rounds = 0usize; + while reserved_bytes == 0 { + let used_before = this.user_stats.quota_used(); + let remaining = limit.saturating_sub(used_before); + if remaining == 0 { + this.quota_exceeded.store(true, Ordering::Release); + return Poll::Ready(Err(quota_io_error())); + } + remaining_before = Some(remaining); + + let desired = remaining.min(buf.len() as u64); + for _ in 0..QUOTA_RESERVE_SPIN_RETRIES { + match this.user_stats.quota_try_reserve(desired, limit) { + Ok(_) => { + reserved_bytes = desired; + write_buf = &buf[..desired as usize]; + break; + } + Err(crate::stats::QuotaReserveError::LimitExceeded) => { + break; + } + Err(crate::stats::QuotaReserveError::Contended) => { + std::hint::spin_loop(); + } + } + } + + reserve_rounds = reserve_rounds.saturating_add(1); + if reserved_bytes == 0 && reserve_rounds >= 8 { + this.quota_exceeded.store(true, Ordering::Release); + return Poll::Ready(Err(quota_io_error())); + } + } + } else { + let used_before = this.user_stats.quota_used(); + let remaining = limit.saturating_sub(used_before); + if remaining == 0 { + this.quota_exceeded.store(true, Ordering::Release); + return Poll::Ready(Err(quota_io_error())); + } + remaining_before = Some(remaining); } - remaining_before = Some(remaining); } - match Pin::new(&mut this.inner).poll_write(cx, buf) { + match Pin::new(&mut this.inner).poll_write(cx, write_buf) { Poll::Ready(Ok(n)) => { + if reserved_bytes > n as u64 { + let refund = reserved_bytes - n as u64; + let mut current = this.user_stats.quota_used.load(Ordering::Relaxed); + loop { + let next = current.saturating_sub(refund); + match this.user_stats.quota_used.compare_exchange_weak( + current, + next, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(observed) => current = observed, + } + } + } + if n > 0 { let n_to_charge = n as u64; @@ -396,8 +475,6 @@ impl AsyncWrite for StatsIo { .increment_user_msgs_to_handle(this.user_stats.as_ref()); if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) { - this.stats - .quota_charge_post_write(this.user_stats.as_ref(), n_to_charge); if should_immediate_quota_check(remaining, n_to_charge) { this.quota_bytes_since_check = 0; if this.user_stats.quota_used() >= limit { @@ -420,7 +497,42 @@ impl AsyncWrite for StatsIo { } Poll::Ready(Ok(n)) } - other => other, + Poll::Ready(Err(err)) => { + if reserved_bytes > 0 { + let mut current = this.user_stats.quota_used.load(Ordering::Relaxed); + loop { + let next = current.saturating_sub(reserved_bytes); + match this.user_stats.quota_used.compare_exchange_weak( + current, + next, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(observed) => current = observed, + } + } + } + Poll::Ready(Err(err)) + } + Poll::Pending => { + if reserved_bytes > 0 { + let mut current = this.user_stats.quota_used.load(Ordering::Relaxed); + loop { + let next = current.saturating_sub(reserved_bytes); + match this.user_stats.quota_used.compare_exchange_weak( + current, + next, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(observed) => current = observed, + } + } + } + Poll::Pending + } } }