Atomics in Stats

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-06 18:53:13 +03:00
parent 487e95a66e
commit 44b825edf5
No known key found for this signature in database
1 changed files with 42 additions and 0 deletions

View File

@ -25,6 +25,8 @@ use self::telemetry::TelemetryPolicy;
pub struct Stats {
connects_all: AtomicU64,
connects_bad: AtomicU64,
current_connections_direct: AtomicU64,
current_connections_me: AtomicU64,
handshake_timeouts: AtomicU64,
upstream_connect_attempt_total: AtomicU64,
upstream_connect_success_total: AtomicU64,
@ -150,6 +152,24 @@ impl Stats {
self.telemetry_me_level().allows_debug()
}
fn decrement_atomic_saturating(counter: &AtomicU64) {
let mut current = counter.load(Ordering::Relaxed);
loop {
if current == 0 {
break;
}
match counter.compare_exchange_weak(
current,
current - 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
pub fn apply_telemetry_policy(&self, policy: TelemetryPolicy) {
self.telemetry_core_enabled
.store(policy.core_enabled, Ordering::Relaxed);
@ -177,6 +197,18 @@ impl Stats {
self.connects_bad.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_current_connections_direct(&self) {
self.current_connections_direct.fetch_add(1, Ordering::Relaxed);
}
pub fn decrement_current_connections_direct(&self) {
Self::decrement_atomic_saturating(&self.current_connections_direct);
}
pub fn increment_current_connections_me(&self) {
self.current_connections_me.fetch_add(1, Ordering::Relaxed);
}
pub fn decrement_current_connections_me(&self) {
Self::decrement_atomic_saturating(&self.current_connections_me);
}
pub fn increment_handshake_timeouts(&self) {
if self.telemetry_core_enabled() {
self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
@ -646,6 +678,16 @@ impl Stats {
}
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
pub fn get_current_connections_direct(&self) -> u64 {
self.current_connections_direct.load(Ordering::Relaxed)
}
pub fn get_current_connections_me(&self) -> u64 {
self.current_connections_me.load(Ordering::Relaxed)
}
pub fn get_current_connections_total(&self) -> u64 {
self.get_current_connections_direct()
.saturating_add(self.get_current_connections_me())
}
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) }