Stats and Cleanup-proccess beyond Hot-path

This commit is contained in:
Alexey
2026-05-10 13:43:41 +03:00
parent e10c070dc1
commit 090b2ca636
2 changed files with 44 additions and 16 deletions

View File

@@ -73,6 +73,11 @@ pub(crate) async fn spawn_runtime_tasks(
rc_clone.run_periodic_cleanup().await; rc_clone.run_periodic_cleanup().await;
}); });
let stats_maintenance = stats.clone();
tokio::spawn(async move {
stats_maintenance.run_periodic_user_stats_maintenance().await;
});
let detected_ip_v4: Option<IpAddr> = probe.detected_ipv4.map(IpAddr::V4); let detected_ip_v4: Option<IpAddr> = probe.detected_ipv4.map(IpAddr::V4);
let detected_ip_v6: Option<IpAddr> = probe.detected_ipv6.map(IpAddr::V6); let detected_ip_v6: Option<IpAddr> = probe.detected_ipv6.map(IpAddr::V6);
debug!( debug!(

View File

@@ -288,6 +288,7 @@ pub struct Stats {
telemetry_core_enabled: AtomicBool, telemetry_core_enabled: AtomicBool,
telemetry_user_enabled: AtomicBool, telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8, telemetry_me_level: AtomicU8,
cached_epoch_secs: AtomicU64,
user_stats: DashMap<String, Arc<UserStats>>, user_stats: DashMap<String, Arc<UserStats>>,
user_stats_last_cleanup_epoch_secs: AtomicU64, user_stats_last_cleanup_epoch_secs: AtomicU64,
start_time: parking_lot::RwLock<Option<Instant>>, start_time: parking_lot::RwLock<Option<Instant>>,
@@ -357,6 +358,7 @@ impl Stats {
pub fn new() -> Self { pub fn new() -> Self {
let stats = Self::default(); let stats = Self::default();
stats.apply_telemetry_policy(TelemetryPolicy::default()); stats.apply_telemetry_policy(TelemetryPolicy::default());
stats.refresh_cached_epoch_secs();
*stats.start_time.write() = Some(Instant::now()); *stats.start_time.write() = Some(Instant::now());
stats stats
} }
@@ -406,33 +408,55 @@ impl Stats {
.as_secs() .as_secs()
} }
fn touch_user_stats(stats: &UserStats) { fn refresh_cached_epoch_secs(&self) -> u64 {
let now_epoch_secs = Self::now_epoch_secs();
self.cached_epoch_secs
.store(now_epoch_secs, Ordering::Relaxed);
now_epoch_secs
}
fn cached_epoch_secs(&self) -> u64 {
let cached = self.cached_epoch_secs.load(Ordering::Relaxed);
if cached != 0 {
return cached;
}
self.refresh_cached_epoch_secs()
}
fn touch_user_stats(&self, stats: &UserStats) {
stats stats
.last_seen_epoch_secs .last_seen_epoch_secs
.store(Self::now_epoch_secs(), Ordering::Relaxed); .store(self.cached_epoch_secs(), Ordering::Relaxed);
} }
pub(crate) fn get_or_create_user_stats_handle(&self, user: &str) -> Arc<UserStats> { pub(crate) fn get_or_create_user_stats_handle(&self, user: &str) -> Arc<UserStats> {
self.maybe_cleanup_user_stats();
if let Some(existing) = self.user_stats.get(user) { if let Some(existing) = self.user_stats.get(user) {
let handle = Arc::clone(existing.value()); let handle = Arc::clone(existing.value());
Self::touch_user_stats(handle.as_ref()); self.touch_user_stats(handle.as_ref());
return handle; return handle;
} }
let entry = self.user_stats.entry(user.to_string()).or_default(); let entry = self.user_stats.entry(user.to_string()).or_default();
if entry.last_seen_epoch_secs.load(Ordering::Relaxed) == 0 { if entry.last_seen_epoch_secs.load(Ordering::Relaxed) == 0 {
Self::touch_user_stats(entry.value().as_ref()); self.touch_user_stats(entry.value().as_ref());
} }
Arc::clone(entry.value()) Arc::clone(entry.value())
} }
pub(crate) async fn run_periodic_user_stats_maintenance(self: Arc<Self>) {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
self.maybe_cleanup_user_stats();
}
}
#[inline] #[inline]
pub(crate) fn add_user_octets_from_handle(&self, user_stats: &UserStats, bytes: u64) { pub(crate) fn add_user_octets_from_handle(&self, user_stats: &UserStats, bytes: u64) {
if !self.telemetry_user_enabled() { if !self.telemetry_user_enabled() {
return; return;
} }
Self::touch_user_stats(user_stats); self.touch_user_stats(user_stats);
user_stats user_stats
.octets_from_client .octets_from_client
.fetch_add(bytes, Ordering::Relaxed); .fetch_add(bytes, Ordering::Relaxed);
@@ -443,7 +467,7 @@ impl Stats {
if !self.telemetry_user_enabled() { if !self.telemetry_user_enabled() {
return; return;
} }
Self::touch_user_stats(user_stats); self.touch_user_stats(user_stats);
user_stats user_stats
.octets_to_client .octets_to_client
.fetch_add(bytes, Ordering::Relaxed); .fetch_add(bytes, Ordering::Relaxed);
@@ -454,7 +478,7 @@ impl Stats {
if !self.telemetry_user_enabled() { if !self.telemetry_user_enabled() {
return; return;
} }
Self::touch_user_stats(user_stats); self.touch_user_stats(user_stats);
user_stats.msgs_from_client.fetch_add(1, Ordering::Relaxed); user_stats.msgs_from_client.fetch_add(1, Ordering::Relaxed);
} }
@@ -463,7 +487,7 @@ impl Stats {
if !self.telemetry_user_enabled() { if !self.telemetry_user_enabled() {
return; return;
} }
Self::touch_user_stats(user_stats); self.touch_user_stats(user_stats);
user_stats.msgs_to_client.fetch_add(1, Ordering::Relaxed); user_stats.msgs_to_client.fetch_add(1, Ordering::Relaxed);
} }
@@ -473,7 +497,7 @@ impl Stats {
/// mixing reserve and post-charge on a single I/O event. /// mixing reserve and post-charge on a single I/O event.
#[inline] #[inline]
pub(crate) fn quota_charge_post_write(&self, user_stats: &UserStats, bytes: u64) -> u64 { pub(crate) fn quota_charge_post_write(&self, user_stats: &UserStats, bytes: u64) -> u64 {
Self::touch_user_stats(user_stats); self.touch_user_stats(user_stats);
user_stats user_stats
.quota_used .quota_used
.fetch_add(bytes, Ordering::Relaxed) .fetch_add(bytes, Ordering::Relaxed)
@@ -484,7 +508,7 @@ impl Stats {
const USER_STATS_CLEANUP_INTERVAL_SECS: u64 = 60; const USER_STATS_CLEANUP_INTERVAL_SECS: u64 = 60;
const USER_STATS_IDLE_TTL_SECS: u64 = 24 * 60 * 60; const USER_STATS_IDLE_TTL_SECS: u64 = 24 * 60 * 60;
let now_epoch_secs = Self::now_epoch_secs(); let now_epoch_secs = self.refresh_cached_epoch_secs();
let last_cleanup_epoch_secs = self let last_cleanup_epoch_secs = self
.user_stats_last_cleanup_epoch_secs .user_stats_last_cleanup_epoch_secs
.load(Ordering::Relaxed); .load(Ordering::Relaxed);
@@ -2383,7 +2407,7 @@ impl Stats {
return; return;
} }
let stats = self.get_or_create_user_stats_handle(user); let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref()); self.touch_user_stats(stats.as_ref());
stats.connects.fetch_add(1, Ordering::Relaxed); stats.connects.fetch_add(1, Ordering::Relaxed);
} }
@@ -2392,7 +2416,7 @@ impl Stats {
return; return;
} }
let stats = self.get_or_create_user_stats_handle(user); let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref()); self.touch_user_stats(stats.as_ref());
stats.curr_connects.fetch_add(1, Ordering::Relaxed); stats.curr_connects.fetch_add(1, Ordering::Relaxed);
} }
@@ -2402,7 +2426,7 @@ impl Stats {
} }
let stats = self.get_or_create_user_stats_handle(user); let stats = self.get_or_create_user_stats_handle(user);
Self::touch_user_stats(stats.as_ref()); self.touch_user_stats(stats.as_ref());
let counter = &stats.curr_connects; let counter = &stats.curr_connects;
let mut current = counter.load(Ordering::Relaxed); let mut current = counter.load(Ordering::Relaxed);
@@ -2425,9 +2449,8 @@ impl Stats {
} }
pub fn decrement_user_curr_connects(&self, user: &str) { pub fn decrement_user_curr_connects(&self, user: &str) {
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) { if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value().as_ref()); self.touch_user_stats(stats.value().as_ref());
let counter = &stats.curr_connects; let counter = &stats.curr_connects;
let mut current = counter.load(Ordering::Relaxed); let mut current = counter.load(Ordering::Relaxed);
loop { loop {