mirror of https://github.com/telemt/telemt.git
Optimize the user-stats cleanup process
Remove `maybe_cleanup_user_stats` from the per-packet hot path to reduce I/O stalls, and add a dedicated background task that performs user-stats cleanup on a fixed interval instead.
This commit is contained in:
parent
b5d5f0eb26
commit
8d82a1597f
|
|
@ -1858,7 +1858,16 @@ impl Stats {
|
||||||
if !self.telemetry_user_enabled() {
|
if !self.telemetry_user_enabled() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
self.maybe_cleanup_user_stats();
|
// maybe_cleanup_user_stats() removed from this hot path.
|
||||||
|
//
|
||||||
|
// Previously it was called here (and in every other per-packet stats
|
||||||
|
// method), triggering a DashMap::retain() scan — which takes a
|
||||||
|
// write-lock on the entire map — up to once per 60s per poll cycle.
|
||||||
|
// Under 20k concurrent connections this caused frequent global
|
||||||
|
// stop-the-world pauses on the stats map, contributing to I/O stalls
|
||||||
|
// and media loading failures. Cleanup is now driven by a dedicated
|
||||||
|
// background task (see Stats::spawn_cleanup_task) and by lower-frequency
|
||||||
|
// call sites such as connect/disconnect accounting.
|
||||||
if let Some(stats) = self.user_stats.get(user) {
|
if let Some(stats) = self.user_stats.get(user) {
|
||||||
Self::touch_user_stats(stats.value());
|
Self::touch_user_stats(stats.value());
|
||||||
stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
|
stats.octets_from_client.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
|
@ -1873,7 +1882,7 @@ impl Stats {
|
||||||
if !self.telemetry_user_enabled() {
|
if !self.telemetry_user_enabled() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
self.maybe_cleanup_user_stats();
|
// maybe_cleanup_user_stats() removed from hot path — see add_user_octets_from.
|
||||||
if let Some(stats) = self.user_stats.get(user) {
|
if let Some(stats) = self.user_stats.get(user) {
|
||||||
Self::touch_user_stats(stats.value());
|
Self::touch_user_stats(stats.value());
|
||||||
stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
|
stats.octets_to_client.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
|
@ -1888,7 +1897,7 @@ impl Stats {
|
||||||
if !self.telemetry_user_enabled() {
|
if !self.telemetry_user_enabled() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
self.maybe_cleanup_user_stats();
|
// maybe_cleanup_user_stats() removed from hot path — see add_user_octets_from.
|
||||||
if let Some(stats) = self.user_stats.get(user) {
|
if let Some(stats) = self.user_stats.get(user) {
|
||||||
Self::touch_user_stats(stats.value());
|
Self::touch_user_stats(stats.value());
|
||||||
stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
|
stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
@ -1903,7 +1912,7 @@ impl Stats {
|
||||||
if !self.telemetry_user_enabled() {
|
if !self.telemetry_user_enabled() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
self.maybe_cleanup_user_stats();
|
// maybe_cleanup_user_stats() removed from hot path — see add_user_octets_from.
|
||||||
if let Some(stats) = self.user_stats.get(user) {
|
if let Some(stats) = self.user_stats.get(user) {
|
||||||
Self::touch_user_stats(stats.value());
|
Self::touch_user_stats(stats.value());
|
||||||
stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
|
stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
@ -1924,6 +1933,40 @@ impl Stats {
|
||||||
.unwrap_or(0)
|
.unwrap_or(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run a single pass of user-stats cleanup (evict idle/disconnected entries).
|
||||||
|
///
|
||||||
|
/// This is the only correct call site for `maybe_cleanup_user_stats` after
|
||||||
|
/// the hot-path removal. Call it from a background task on a fixed interval
|
||||||
|
/// (e.g. every 60 seconds) via `spawn_cleanup_task`, or from low-frequency
|
||||||
|
/// connect/disconnect accounting methods as appropriate.
|
||||||
|
pub fn run_cleanup(&self) {
|
||||||
|
self.maybe_cleanup_user_stats();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a background Tokio task that periodically cleans up stale user
|
||||||
|
/// stats entries. This replaces the per-packet `maybe_cleanup_user_stats()`
|
||||||
|
/// calls that were removed from the I/O hot path.
|
||||||
|
///
|
||||||
|
/// The interval matches the original `USER_STATS_CLEANUP_INTERVAL_SECS`
|
||||||
|
/// (60 s), so eviction cadence is unchanged — only the call site is moved
|
||||||
|
/// off the hot path.
|
||||||
|
pub fn spawn_cleanup_task(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
|
||||||
|
let stats = Arc::clone(self);
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
let interval = std::time::Duration::from_secs(60);
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(interval).await;
|
||||||
|
stats.maybe_cleanup_user_stats();
|
||||||
|
// Also evict stale quota-lock map entries. This was previously
|
||||||
|
// done inline in quota_user_lock() on the poll hot path via
|
||||||
|
// DashMap::retain(), which caused write-lock pauses under high
|
||||||
|
// connection counts. Doing it here keeps the map bounded without
|
||||||
|
// impacting I/O latency.
|
||||||
|
crate::proxy::relay::quota_user_lock_evict();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_handshake_timeouts(&self) -> u64 {
|
pub fn get_handshake_timeouts(&self) -> u64 {
|
||||||
self.handshake_timeouts.load(Ordering::Relaxed)
|
self.handshake_timeouts.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue