From c8ba307780c6db99b647c5a0537fa289fb46d9f2 Mon Sep 17 00:00:00 2001 From: David Osipov Date: Mon, 23 Mar 2026 13:21:14 +0400 Subject: [PATCH] Add user-specific quota retry sleep allocation tracking in tests --- src/proxy/relay.rs | 30 ++++++- ...retry_allocation_latency_security_tests.rs | 80 +++++++++++++++++-- 2 files changed, 102 insertions(+), 8 deletions(-) diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index 55f1385..90b46d9 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -293,6 +293,8 @@ const QUOTA_CONTENTION_RETRY_MAX_INTERVAL: Duration = Duration::from_millis(64); #[cfg(test)] static QUOTA_RETRY_SLEEP_ALLOCS: AtomicU64 = AtomicU64::new(0); #[cfg(test)] +static QUOTA_RETRY_SLEEP_ALLOCS_BY_USER: OnceLock<DashMap<String, u64>> = OnceLock::new(); +#[cfg(test)] static QUOTA_USER_LOCK_EVICTOR_SPAWN_COUNT: AtomicU64 = AtomicU64::new(0); #[cfg(test)] @@ -300,11 +302,23 @@ pub(crate) fn reset_quota_retry_sleep_allocs_for_tests() { QUOTA_RETRY_SLEEP_ALLOCS.store(0, Ordering::Relaxed); } +#[cfg(test)] +pub(crate) fn reset_quota_retry_sleep_allocs_for_user_for_tests(user: &str) { + let map = QUOTA_RETRY_SLEEP_ALLOCS_BY_USER.get_or_init(DashMap::new); + map.remove(user); +} + #[cfg(test)] pub(crate) fn quota_retry_sleep_allocs_for_tests() -> u64 { QUOTA_RETRY_SLEEP_ALLOCS.load(Ordering::Relaxed) } +#[cfg(test)] +pub(crate) fn quota_retry_sleep_allocs_for_user_for_tests(user: &str) -> u64 { + let map = QUOTA_RETRY_SLEEP_ALLOCS_BY_USER.get_or_init(DashMap::new); + map.get(user).map(|v| *v.value()).unwrap_or(0) +} + #[inline] fn quota_contention_retry_delay(retry_attempt: u8) -> Duration { let shift = u32::from(retry_attempt.min(5)); @@ -329,12 +343,22 @@ fn poll_quota_retry_sleep( sleep_slot: &mut Option<Pin<Box<tokio::time::Sleep>>>, wake_scheduled: &mut bool, retry_attempt: &mut u8, + user: &str, cx: &mut Context<'_>, ) { + #[cfg(not(test))] + let _ = user; + if !*wake_scheduled { *wake_scheduled = true; #[cfg(test)] - QUOTA_RETRY_SLEEP_ALLOCS.fetch_add(1, Ordering::Relaxed); + { + 
QUOTA_RETRY_SLEEP_ALLOCS.fetch_add(1, Ordering::Relaxed); + let map = QUOTA_RETRY_SLEEP_ALLOCS_BY_USER.get_or_init(DashMap::new); + map.entry(user.to_string()) + .and_modify(|count| *count = count.saturating_add(1)) + .or_insert(1); + } *sleep_slot = Some(Box::pin(tokio::time::sleep(quota_contention_retry_delay( *retry_attempt, )))); @@ -465,6 +489,7 @@ impl AsyncRead for StatsIo { &mut this.quota_read_retry_sleep, &mut this.quota_read_wake_scheduled, &mut this.quota_read_retry_attempt, + &this.user, cx, ); return Poll::Pending; @@ -482,6 +507,7 @@ impl AsyncRead for StatsIo { &mut this.quota_read_retry_sleep, &mut this.quota_read_wake_scheduled, &mut this.quota_read_retry_attempt, + &this.user, cx, ); return Poll::Pending; @@ -570,6 +596,7 @@ impl AsyncWrite for StatsIo { &mut this.quota_write_retry_sleep, &mut this.quota_write_wake_scheduled, &mut this.quota_write_retry_attempt, + &this.user, cx, ); return Poll::Pending; @@ -587,6 +614,7 @@ impl AsyncWrite for StatsIo { &mut this.quota_write_retry_sleep, &mut this.quota_write_wake_scheduled, &mut this.quota_write_retry_attempt, + &this.user, cx, ); return Poll::Pending; diff --git a/src/proxy/tests/relay_quota_retry_allocation_latency_security_tests.rs b/src/proxy/tests/relay_quota_retry_allocation_latency_security_tests.rs index 447a090..0cb7348 100644 --- a/src/proxy/tests/relay_quota_retry_allocation_latency_security_tests.rs +++ b/src/proxy/tests/relay_quota_retry_allocation_latency_security_tests.rs @@ -49,7 +49,7 @@ async fn tdd_single_pending_timer_does_not_allocate_on_each_repoll() { .try_lock() .expect("test must hold local lock to force retry scheduling"); - reset_quota_retry_sleep_allocs_for_tests(); + reset_quota_retry_sleep_allocs_for_user_for_tests(&user); let mut io = StatsIo::new( tokio::io::sink(), @@ -65,12 +65,12 @@ async fn tdd_single_pending_timer_does_not_allocate_on_each_repoll() { let first = Pin::new(&mut io).poll_write(&mut cx, &[0xA1]); assert!(first.is_pending()); - let 
allocs_after_first = quota_retry_sleep_allocs_for_tests(); + let allocs_after_first = quota_retry_sleep_allocs_for_user_for_tests(&io.user); let ptr_after_first = sleep_slot_ptr(&io.quota_write_retry_sleep); let second = Pin::new(&mut io).poll_write(&mut cx, &[0xA2]); assert!(second.is_pending()); - let allocs_after_second = quota_retry_sleep_allocs_for_tests(); + let allocs_after_second = quota_retry_sleep_allocs_for_user_for_tests(&io.user); let ptr_after_second = sleep_slot_ptr(&io.quota_write_retry_sleep); assert_eq!(allocs_after_first, 1, "first pending poll must allocate one timer"); @@ -96,7 +96,7 @@ async fn tdd_retry_cycle_allocates_once_per_fired_timer_cycle_not_per_poll() { .try_lock() .expect("test must hold local lock to keep write path pending"); - reset_quota_retry_sleep_allocs_for_tests(); + reset_quota_retry_sleep_allocs_for_user_for_tests(&user); let mut io = StatsIo::new( tokio::io::sink(), @@ -126,7 +126,7 @@ async fn tdd_retry_cycle_allocates_once_per_fired_timer_cycle_not_per_poll() { tokio::time::sleep(Duration::from_millis(1)).await; } - let allocs = quota_retry_sleep_allocs_for_tests(); + let allocs = quota_retry_sleep_allocs_for_user_for_tests(&io.user); assert!(allocs >= 2, "multiple fired cycles should allocate multiple timers"); assert!( allocs < polls, @@ -146,7 +146,7 @@ async fn adversarial_backoff_latency_envelope_stays_bounded_under_contention() { .try_lock() .expect("test must hold local lock for sustained contention"); - reset_quota_retry_sleep_allocs_for_tests(); + reset_quota_retry_sleep_allocs_for_user_for_tests(&user); let mut io = StatsIo::new( tokio::io::sink(), @@ -191,7 +191,7 @@ async fn adversarial_backoff_latency_envelope_stays_bounded_under_contention() { "retry wake gap must remain bounded in test profile; observed max gap={max_gap:?}" ); assert!( - quota_retry_sleep_allocs_for_tests() <= 16, + quota_retry_sleep_allocs_for_user_for_tests(&io.user) <= 16, "allocation cycles must remain bounded during a short 
contention window" ); @@ -247,3 +247,69 @@ async fn micro_benchmark_release_to_completion_latency_stays_bounded() { "contention release->completion p95 must stay bounded; p95_ms={p95_ms}, samples={samples_ms:?}" ); } + +#[tokio::test] +async fn adversarial_per_user_retry_allocation_counter_isolation_under_parallel_contention() { + let _guard = quota_test_guard(); + + let user_a = format!("retry-alloc-isolation-a-{}", std::process::id()); + let user_b = format!("retry-alloc-isolation-b-{}", std::process::id()); + + let lock_a = quota_user_lock(&user_a); + let lock_b = quota_user_lock(&user_b); + let held_guard_a = lock_a + .try_lock() + .expect("test must hold lock A to force pending scheduling"); + let held_guard_b = lock_b + .try_lock() + .expect("test must hold lock B to force pending scheduling"); + + reset_quota_retry_sleep_allocs_for_tests(); + reset_quota_retry_sleep_allocs_for_user_for_tests(&user_a); + reset_quota_retry_sleep_allocs_for_user_for_tests(&user_b); + + let mut io_a = StatsIo::new( + tokio::io::sink(), + Arc::new(SharedCounters::new()), + Arc::new(Stats::new()), + user_a.clone(), + Some(2048), + Arc::new(AtomicBool::new(false)), + Instant::now(), + ); + let mut io_b = StatsIo::new( + tokio::io::sink(), + Arc::new(SharedCounters::new()), + Arc::new(Stats::new()), + user_b.clone(), + Some(2048), + Arc::new(AtomicBool::new(false)), + Instant::now(), + ); + + let (_wake_counter_a, mut cx_a) = build_context(); + let (_wake_counter_b, mut cx_b) = build_context(); + + let first_a = Pin::new(&mut io_a).poll_write(&mut cx_a, &[0xE1]); + let first_b = Pin::new(&mut io_b).poll_write(&mut cx_b, &[0xE2]); + assert!(first_a.is_pending()); + assert!(first_b.is_pending()); + + assert_eq!( + quota_retry_sleep_allocs_for_user_for_tests(&user_a), + 1, + "user A scoped counter must reflect only user A allocations" + ); + assert_eq!( + quota_retry_sleep_allocs_for_user_for_tests(&user_b), + 1, + "user B scoped counter must reflect only user B allocations" + ); + 
assert!( + quota_retry_sleep_allocs_for_tests() >= 2, + "global counter remains aggregate and should include both users" + ); + + drop(held_guard_a); + drop(held_guard_b); +}