diff --git a/src/stats/mod.rs b/src/stats/mod.rs index d13d834..950a154 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1858,7 +1858,16 @@ impl Stats { if !self.telemetry_user_enabled() { return; } - self.maybe_cleanup_user_stats(); + // maybe_cleanup_user_stats() removed from this hot path. + // + // Previously it was called here (and in every other per-packet stats + // method), triggering a DashMap::retain() scan — which takes a + // write-lock on the entire map — up to once per 60s per poll cycle. + // Under 20k concurrent connections this caused frequent global + // stop-the-world pauses on the stats map, contributing to I/O stalls + // and media loading failures. Cleanup is now driven by a dedicated + // background task (see Stats::spawn_cleanup_task) and by lower-frequency + // call sites such as connect/disconnect accounting. if let Some(stats) = self.user_stats.get(user) { Self::touch_user_stats(stats.value()); stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed); @@ -1873,7 +1882,7 @@ impl Stats { if !self.telemetry_user_enabled() { return; } - self.maybe_cleanup_user_stats(); + // maybe_cleanup_user_stats() removed from hot path — see add_user_octets_from. if let Some(stats) = self.user_stats.get(user) { Self::touch_user_stats(stats.value()); stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed); @@ -1888,7 +1897,7 @@ impl Stats { if !self.telemetry_user_enabled() { return; } - self.maybe_cleanup_user_stats(); + // maybe_cleanup_user_stats() removed from hot path — see add_user_octets_from. if let Some(stats) = self.user_stats.get(user) { Self::touch_user_stats(stats.value()); stats.msgs_from_client.fetch_add(1, Ordering::Relaxed); @@ -1903,7 +1912,7 @@ impl Stats { if !self.telemetry_user_enabled() { return; } - self.maybe_cleanup_user_stats(); + // maybe_cleanup_user_stats() removed from hot path — see add_user_octets_from. 
if let Some(stats) = self.user_stats.get(user) { Self::touch_user_stats(stats.value()); stats.msgs_to_client.fetch_add(1, Ordering::Relaxed); @@ -1924,6 +1933,40 @@ impl Stats { .unwrap_or(0) } + /// Run a single pass of user-stats cleanup (evict idle/disconnected entries). + /// + /// This is the only correct call site for `maybe_cleanup_user_stats` after + /// the hot-path removal. Call it from a background task on a fixed interval + /// (e.g. every 60 seconds) via `spawn_cleanup_task`, or from low-frequency + /// connect/disconnect accounting methods as appropriate. + pub fn run_cleanup(&self) { + self.maybe_cleanup_user_stats(); + } + + /// Spawn a background Tokio task that periodically cleans up stale user + /// stats entries. This replaces the per-packet `maybe_cleanup_user_stats()` + /// calls that were removed from the I/O hot path. + /// + /// The interval matches the original `USER_STATS_CLEANUP_INTERVAL_SECS` + /// (60 s), so eviction cadence is unchanged — only the call site is moved + /// off the hot path. + pub fn spawn_cleanup_task(self: &Arc<Self>) -> tokio::task::JoinHandle<()> { + let stats = Arc::clone(self); + tokio::task::spawn(async move { + let interval = std::time::Duration::from_secs(60); + loop { + tokio::time::sleep(interval).await; + stats.maybe_cleanup_user_stats(); + // Also evict stale quota-lock map entries. This was previously + // done inline in quota_user_lock() on the poll hot path via + // DashMap::retain(), which caused write-lock pauses under high + // connection counts. Doing it here keeps the map bounded without + // impacting I/O latency. + crate::proxy::relay::quota_user_lock_evict(); + } + }) + } + pub fn get_handshake_timeouts(&self) -> u64 { self.handshake_timeouts.load(Ordering::Relaxed) }