From 090b2ca636bedc4958c5228bd24ab979403c3540 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sun, 10 May 2026 13:43:41 +0300 Subject: [PATCH] Stats and Cleanup-proccess beyond Hot-path --- src/maestro/runtime_tasks.rs | 5 ++++ src/stats/mod.rs | 55 +++++++++++++++++++++++++----------- 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/src/maestro/runtime_tasks.rs b/src/maestro/runtime_tasks.rs index 3637694..d5b5f6b 100644 --- a/src/maestro/runtime_tasks.rs +++ b/src/maestro/runtime_tasks.rs @@ -73,6 +73,11 @@ pub(crate) async fn spawn_runtime_tasks( rc_clone.run_periodic_cleanup().await; }); + let stats_maintenance = stats.clone(); + tokio::spawn(async move { + stats_maintenance.run_periodic_user_stats_maintenance().await; + }); + let detected_ip_v4: Option = probe.detected_ipv4.map(IpAddr::V4); let detected_ip_v6: Option = probe.detected_ipv6.map(IpAddr::V6); debug!( diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 20697fb..a2e662a 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -288,6 +288,7 @@ pub struct Stats { telemetry_core_enabled: AtomicBool, telemetry_user_enabled: AtomicBool, telemetry_me_level: AtomicU8, + cached_epoch_secs: AtomicU64, user_stats: DashMap>, user_stats_last_cleanup_epoch_secs: AtomicU64, start_time: parking_lot::RwLock>, @@ -357,6 +358,7 @@ impl Stats { pub fn new() -> Self { let stats = Self::default(); stats.apply_telemetry_policy(TelemetryPolicy::default()); + stats.refresh_cached_epoch_secs(); *stats.start_time.write() = Some(Instant::now()); stats } @@ -406,33 +408,55 @@ impl Stats { .as_secs() } - fn touch_user_stats(stats: &UserStats) { + fn refresh_cached_epoch_secs(&self) -> u64 { + let now_epoch_secs = Self::now_epoch_secs(); + self.cached_epoch_secs + .store(now_epoch_secs, Ordering::Relaxed); + now_epoch_secs + } + + fn cached_epoch_secs(&self) -> u64 { + let cached = self.cached_epoch_secs.load(Ordering::Relaxed); + if cached != 0 { + return cached; + } + self.refresh_cached_epoch_secs() + } + + fn touch_user_stats(&self, stats: &UserStats) { stats .last_seen_epoch_secs - .store(Self::now_epoch_secs(), Ordering::Relaxed); + .store(self.cached_epoch_secs(), Ordering::Relaxed); } pub(crate) fn get_or_create_user_stats_handle(&self, user: &str) -> Arc { - self.maybe_cleanup_user_stats(); if let Some(existing) = self.user_stats.get(user) { let handle = Arc::clone(existing.value()); - Self::touch_user_stats(handle.as_ref()); + self.touch_user_stats(handle.as_ref()); return handle; } let entry = self.user_stats.entry(user.to_string()).or_default(); if entry.last_seen_epoch_secs.load(Ordering::Relaxed) == 0 { - Self::touch_user_stats(entry.value().as_ref()); + self.touch_user_stats(entry.value().as_ref()); } Arc::clone(entry.value()) } + pub(crate) async fn run_periodic_user_stats_maintenance(self: Arc) { + let mut interval = tokio::time::interval(Duration::from_secs(60)); + loop { + interval.tick().await; + self.maybe_cleanup_user_stats(); + } + } + #[inline] pub(crate) fn add_user_octets_from_handle(&self, user_stats: &UserStats, bytes: u64) { if !self.telemetry_user_enabled() { return; } - Self::touch_user_stats(user_stats); + self.touch_user_stats(user_stats); user_stats .octets_from_client .fetch_add(bytes, Ordering::Relaxed); @@ -443,7 +467,7 @@ impl Stats { if !self.telemetry_user_enabled() { return; } - Self::touch_user_stats(user_stats); + self.touch_user_stats(user_stats); user_stats .octets_to_client .fetch_add(bytes, Ordering::Relaxed); @@ -454,7 +478,7 @@ impl Stats { if !self.telemetry_user_enabled() { return; } - Self::touch_user_stats(user_stats); + self.touch_user_stats(user_stats); user_stats.msgs_from_client.fetch_add(1, Ordering::Relaxed); } @@ -463,7 +487,7 @@ impl Stats { if !self.telemetry_user_enabled() { return; } - Self::touch_user_stats(user_stats); + self.touch_user_stats(user_stats); user_stats.msgs_to_client.fetch_add(1, Ordering::Relaxed); } @@ -473,7 +497,7 @@ impl Stats { /// mixing reserve and post-charge on a single I/O event. #[inline] pub(crate) fn quota_charge_post_write(&self, user_stats: &UserStats, bytes: u64) -> u64 { - Self::touch_user_stats(user_stats); + self.touch_user_stats(user_stats); user_stats .quota_used .fetch_add(bytes, Ordering::Relaxed) @@ -484,7 +508,7 @@ impl Stats { const USER_STATS_CLEANUP_INTERVAL_SECS: u64 = 60; const USER_STATS_IDLE_TTL_SECS: u64 = 24 * 60 * 60; - let now_epoch_secs = Self::now_epoch_secs(); + let now_epoch_secs = self.refresh_cached_epoch_secs(); let last_cleanup_epoch_secs = self .user_stats_last_cleanup_epoch_secs .load(Ordering::Relaxed); @@ -2383,7 +2407,7 @@ impl Stats { return; } let stats = self.get_or_create_user_stats_handle(user); - Self::touch_user_stats(stats.as_ref()); + self.touch_user_stats(stats.as_ref()); stats.connects.fetch_add(1, Ordering::Relaxed); } @@ -2392,7 +2416,7 @@ impl Stats { return; } let stats = self.get_or_create_user_stats_handle(user); - Self::touch_user_stats(stats.as_ref()); + self.touch_user_stats(stats.as_ref()); stats.curr_connects.fetch_add(1, Ordering::Relaxed); } @@ -2402,7 +2426,7 @@ impl Stats { } let stats = self.get_or_create_user_stats_handle(user); - Self::touch_user_stats(stats.as_ref()); + self.touch_user_stats(stats.as_ref()); let counter = &stats.curr_connects; let mut current = counter.load(Ordering::Relaxed); @@ -2425,9 +2449,8 @@ impl Stats { } pub fn decrement_user_curr_connects(&self, user: &str) { - self.maybe_cleanup_user_stats(); if let Some(stats) = self.user_stats.get(user) { - Self::touch_user_stats(stats.value().as_ref()); + self.touch_user_stats(stats.value().as_ref()); let counter = &stats.curr_connects; let mut current = counter.load(Ordering::Relaxed); loop {