From b5d5f0eb26e32a9be35140d4355d1fa9a56f4efd Mon Sep 17 00:00:00 2001 From: Talya <22233835+hookzof@users.noreply.github.com> Date: Mon, 23 Mar 2026 01:40:18 +0100 Subject: [PATCH] Optimize quota retry waker and user lock handling Refactor quota retry mechanism and user lock management for improved performance. --- src/proxy/relay.rs | 53 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index 2431ff4..e608812 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -283,16 +283,22 @@ const QUOTA_CONTENTION_RETRY_INTERVAL: Duration = Duration::from_millis(1); const QUOTA_CONTENTION_RETRY_INTERVAL: Duration = Duration::from_millis(2); fn spawn_quota_retry_waker(retry_active: Arc, waker: std::task::Waker) { + // Wake exactly once after the retry interval, then exit. + // + // The previous implementation looped indefinitely, calling wake_by_ref() + // every 2ms until retry_active was cleared. Under high connection counts + // (e.g. 20k sessions) this created a flood of spurious wakeups that + // saturated the Tokio scheduler and starved real I/O tasks — causing media + // to stall completely. + // + // The correct poll model is: return Pending once, schedule a single wakeup, + // let the executor call poll again. If the lock is still contended on the + // next poll, a new waker is spawned at that point. This matches the + // standard Rust async/poll contract. tokio::task::spawn(async move { - loop { - if !retry_active.load(Ordering::Relaxed) { - break; - } - tokio::time::sleep(QUOTA_CONTENTION_RETRY_INTERVAL).await; - if !retry_active.load(Ordering::Relaxed) { - break; - } - waker.wake_by_ref(); + tokio::time::sleep(QUOTA_CONTENTION_RETRY_INTERVAL).await; + if retry_active.load(Ordering::Relaxed) { + waker.wake(); } }); } @@ -335,14 +341,26 @@ fn quota_overflow_user_lock(user: &str) -> Arc> { fn quota_user_lock(user: &str) -> Arc> { let locks = QUOTA_USER_LOCKS.get_or_init(DashMap::new); + + // Fast path: user already has a lock entry — just clone the Arc. + // This is the overwhelmingly common case in steady state. if let Some(existing) = locks.get(user) { return Arc::clone(existing.value()); } - if locks.len() >= QUOTA_USER_LOCKS_MAX { - locks.retain(|_, value| Arc::strong_count(value) > 1); - } - + // Slow path: new user. Check capacity and insert. + // + // Previous code called locks.retain() inline here whenever the map was at + // capacity. DashMap::retain() acquires a write-lock on all shards + // sequentially — under 20k concurrent connections this became a + // stop-the-world pause on every new-user poll(), serialising all I/O. + // + // Fix: skip the retain() entirely. At capacity we fall back to the + // overflow stripe locks (256 stripes, hash-distributed). The overflow + // stripes provide the same mutual-exclusion guarantee with much lower + // contention. Stale entries in QUOTA_USER_LOCKS are now evicted by + // `quota_user_lock_evict()`, which is called from the background stats + // cleanup task rather than inline on the hot path. if locks.len() >= QUOTA_USER_LOCKS_MAX { return quota_overflow_user_lock(user); } @@ -357,6 +375,15 @@ fn quota_user_lock(user: &str) -> Arc> { } } +/// Evict quota-lock entries whose Arc has no outside holders (i.e. no active +/// session is using them). Call this from a background task, not from any +/// poll() or I/O hot path. +pub(crate) fn quota_user_lock_evict() { + if let Some(locks) = QUOTA_USER_LOCKS.get() { + locks.retain(|_, value| Arc::strong_count(value) > 1); + } +} + impl AsyncRead for StatsIo { fn poll_read( self: Pin<&mut Self>,