Update relay.rs

This commit is contained in:
Alexey 2026-04-06 19:01:12 +03:00
parent a36c7b3f66
commit 14674bd4e6
No known key found for this signature in database
1 changed files with 143 additions and 31 deletions

View File

@ -270,6 +270,7 @@ const QUOTA_NEAR_LIMIT_BYTES: u64 = 64 * 1024;
const QUOTA_LARGE_CHARGE_BYTES: u64 = 16 * 1024;
const QUOTA_ADAPTIVE_INTERVAL_MIN_BYTES: u64 = 4 * 1024;
const QUOTA_ADAPTIVE_INTERVAL_MAX_BYTES: u64 = 64 * 1024;
const QUOTA_RESERVE_SPIN_RETRIES: usize = 64;
#[inline]
fn quota_adaptive_interval_bytes(remaining_before: u64) -> u64 {
@ -314,6 +315,50 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
if n > 0 {
let n_to_charge = n as u64;
if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) {
let mut reserved_total = None;
let mut reserve_rounds = 0usize;
while reserved_total.is_none() {
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(n_to_charge, limit) {
Ok(total) => {
reserved_total = Some(total);
break;
}
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
this.quota_exceeded.store(true, Ordering::Release);
buf.set_filled(before);
return Poll::Ready(Err(quota_io_error()));
}
Err(crate::stats::QuotaReserveError::Contended) => {
std::hint::spin_loop();
}
}
}
reserve_rounds = reserve_rounds.saturating_add(1);
if reserved_total.is_none() && reserve_rounds >= 8 {
this.quota_exceeded.store(true, Ordering::Release);
buf.set_filled(before);
return Poll::Ready(Err(quota_io_error()));
}
}
if should_immediate_quota_check(remaining, n_to_charge) {
this.quota_bytes_since_check = 0;
} else {
this.quota_bytes_since_check =
this.quota_bytes_since_check.saturating_add(n_to_charge);
let interval = quota_adaptive_interval_bytes(remaining);
if this.quota_bytes_since_check >= interval {
this.quota_bytes_since_check = 0;
}
}
if reserved_total.unwrap_or(0) >= limit {
this.quota_exceeded.store(true, Ordering::Release);
}
}
// C→S: client sent data
this.counters
.c2s_bytes
@ -326,27 +371,6 @@ impl<S: AsyncRead + Unpin> AsyncRead for StatsIo<S> {
this.stats
.increment_user_msgs_from_handle(this.user_stats.as_ref());
if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) {
this.stats
.quota_charge_post_write(this.user_stats.as_ref(), n_to_charge);
if should_immediate_quota_check(remaining, n_to_charge) {
this.quota_bytes_since_check = 0;
if this.user_stats.quota_used() >= limit {
this.quota_exceeded.store(true, Ordering::Release);
}
} else {
this.quota_bytes_since_check =
this.quota_bytes_since_check.saturating_add(n_to_charge);
let interval = quota_adaptive_interval_bytes(remaining);
if this.quota_bytes_since_check >= interval {
this.quota_bytes_since_check = 0;
if this.user_stats.quota_used() >= limit {
this.quota_exceeded.store(true, Ordering::Release);
}
}
}
}
trace!(user = %this.user, bytes = n, "C->S");
}
Poll::Ready(Ok(()))
@ -368,18 +392,73 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
}
let mut remaining_before = None;
let mut reserved_bytes = 0u64;
let mut write_buf = buf;
if let Some(limit) = this.quota_limit {
let used_before = this.user_stats.quota_used();
let remaining = limit.saturating_sub(used_before);
if remaining == 0 {
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
if !buf.is_empty() {
let mut reserve_rounds = 0usize;
while reserved_bytes == 0 {
let used_before = this.user_stats.quota_used();
let remaining = limit.saturating_sub(used_before);
if remaining == 0 {
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
remaining_before = Some(remaining);
let desired = remaining.min(buf.len() as u64);
for _ in 0..QUOTA_RESERVE_SPIN_RETRIES {
match this.user_stats.quota_try_reserve(desired, limit) {
Ok(_) => {
reserved_bytes = desired;
write_buf = &buf[..desired as usize];
break;
}
Err(crate::stats::QuotaReserveError::LimitExceeded) => {
break;
}
Err(crate::stats::QuotaReserveError::Contended) => {
std::hint::spin_loop();
}
}
}
reserve_rounds = reserve_rounds.saturating_add(1);
if reserved_bytes == 0 && reserve_rounds >= 8 {
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
}
} else {
let used_before = this.user_stats.quota_used();
let remaining = limit.saturating_sub(used_before);
if remaining == 0 {
this.quota_exceeded.store(true, Ordering::Release);
return Poll::Ready(Err(quota_io_error()));
}
remaining_before = Some(remaining);
}
remaining_before = Some(remaining);
}
match Pin::new(&mut this.inner).poll_write(cx, buf) {
match Pin::new(&mut this.inner).poll_write(cx, write_buf) {
Poll::Ready(Ok(n)) => {
if reserved_bytes > n as u64 {
let refund = reserved_bytes - n as u64;
let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
loop {
let next = current.saturating_sub(refund);
match this.user_stats.quota_used.compare_exchange_weak(
current,
next,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(observed) => current = observed,
}
}
}
if n > 0 {
let n_to_charge = n as u64;
@ -396,8 +475,6 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
.increment_user_msgs_to_handle(this.user_stats.as_ref());
if let (Some(limit), Some(remaining)) = (this.quota_limit, remaining_before) {
this.stats
.quota_charge_post_write(this.user_stats.as_ref(), n_to_charge);
if should_immediate_quota_check(remaining, n_to_charge) {
this.quota_bytes_since_check = 0;
if this.user_stats.quota_used() >= limit {
@ -420,7 +497,42 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
}
Poll::Ready(Ok(n))
}
other => other,
Poll::Ready(Err(err)) => {
if reserved_bytes > 0 {
let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
loop {
let next = current.saturating_sub(reserved_bytes);
match this.user_stats.quota_used.compare_exchange_weak(
current,
next,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(observed) => current = observed,
}
}
}
Poll::Ready(Err(err))
}
Poll::Pending => {
if reserved_bytes > 0 {
let mut current = this.user_stats.quota_used.load(Ordering::Relaxed);
loop {
let next = current.saturating_sub(reserved_bytes);
match this.user_stats.quota_used.compare_exchange_weak(
current,
next,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(observed) => current = observed,
}
}
}
Poll::Pending
}
}
}