diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 4cc9933..4b59367 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -25,6 +25,8 @@ use self::telemetry::TelemetryPolicy; pub struct Stats { connects_all: AtomicU64, connects_bad: AtomicU64, + current_connections_direct: AtomicU64, + current_connections_me: AtomicU64, handshake_timeouts: AtomicU64, upstream_connect_attempt_total: AtomicU64, upstream_connect_success_total: AtomicU64, @@ -150,6 +152,24 @@ impl Stats { self.telemetry_me_level().allows_debug() } + fn decrement_atomic_saturating(counter: &AtomicU64) { + let mut current = counter.load(Ordering::Relaxed); + loop { + if current == 0 { + break; + } + match counter.compare_exchange_weak( + current, + current - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(actual) => current = actual, + } + } + } + pub fn apply_telemetry_policy(&self, policy: TelemetryPolicy) { self.telemetry_core_enabled .store(policy.core_enabled, Ordering::Relaxed); @@ -177,6 +197,18 @@ impl Stats { self.connects_bad.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_current_connections_direct(&self) { + self.current_connections_direct.fetch_add(1, Ordering::Relaxed); + } + pub fn decrement_current_connections_direct(&self) { + Self::decrement_atomic_saturating(&self.current_connections_direct); + } + pub fn increment_current_connections_me(&self) { + self.current_connections_me.fetch_add(1, Ordering::Relaxed); + } + pub fn decrement_current_connections_me(&self) { + Self::decrement_atomic_saturating(&self.current_connections_me); + } pub fn increment_handshake_timeouts(&self) { if self.telemetry_core_enabled() { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); @@ -646,6 +678,16 @@ impl Stats { } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } + pub fn get_current_connections_direct(&self) -> u64 { + self.current_connections_direct.load(Ordering::Relaxed) + } + pub fn get_current_connections_me(&self) -> u64 { + self.current_connections_me.load(Ordering::Relaxed) + } + pub fn get_current_connections_total(&self) -> u64 { + self.get_current_connections_direct() + .saturating_add(self.get_current_connections_me()) + } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) } pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) }