diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 8152599..6c72b6f 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -26,6 +26,22 @@ pub struct Stats { connects_all: AtomicU64, connects_bad: AtomicU64, handshake_timeouts: AtomicU64, + upstream_connect_attempt_total: AtomicU64, + upstream_connect_success_total: AtomicU64, + upstream_connect_fail_total: AtomicU64, + upstream_connect_failfast_hard_error_total: AtomicU64, + upstream_connect_attempts_bucket_1: AtomicU64, + upstream_connect_attempts_bucket_2: AtomicU64, + upstream_connect_attempts_bucket_3_4: AtomicU64, + upstream_connect_attempts_bucket_gt_4: AtomicU64, + upstream_connect_duration_success_bucket_le_100ms: AtomicU64, + upstream_connect_duration_success_bucket_101_500ms: AtomicU64, + upstream_connect_duration_success_bucket_501_1000ms: AtomicU64, + upstream_connect_duration_success_bucket_gt_1000ms: AtomicU64, + upstream_connect_duration_fail_bucket_le_100ms: AtomicU64, + upstream_connect_duration_fail_bucket_101_500ms: AtomicU64, + upstream_connect_duration_fail_bucket_501_1000ms: AtomicU64, + upstream_connect_duration_fail_bucket_gt_1000ms: AtomicU64, me_keepalive_sent: AtomicU64, me_keepalive_failed: AtomicU64, me_keepalive_pong: AtomicU64, @@ -155,6 +171,99 @@ impl Stats { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_upstream_connect_attempt_total(&self) { + if self.telemetry_core_enabled() { + self.upstream_connect_attempt_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_upstream_connect_success_total(&self) { + if self.telemetry_core_enabled() { + self.upstream_connect_success_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_upstream_connect_fail_total(&self) { + if self.telemetry_core_enabled() { + self.upstream_connect_fail_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_upstream_connect_failfast_hard_error_total(&self) { + if self.telemetry_core_enabled() { + self.upstream_connect_failfast_hard_error_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn observe_upstream_connect_attempts_per_request(&self, attempts: u32) { + if !self.telemetry_core_enabled() { + return; + } + match attempts { + 0 => {} + 1 => { + self.upstream_connect_attempts_bucket_1 + .fetch_add(1, Ordering::Relaxed); + } + 2 => { + self.upstream_connect_attempts_bucket_2 + .fetch_add(1, Ordering::Relaxed); + } + 3..=4 => { + self.upstream_connect_attempts_bucket_3_4 + .fetch_add(1, Ordering::Relaxed); + } + _ => { + self.upstream_connect_attempts_bucket_gt_4 + .fetch_add(1, Ordering::Relaxed); + } + } + } + pub fn observe_upstream_connect_duration_ms(&self, duration_ms: u64, success: bool) { + if !self.telemetry_core_enabled() { + return; + } + let bucket = match duration_ms { + 0..=100 => 0u8, + 101..=500 => 1u8, + 501..=1000 => 2u8, + _ => 3u8, + }; + match (success, bucket) { + (true, 0) => { + self.upstream_connect_duration_success_bucket_le_100ms + .fetch_add(1, Ordering::Relaxed); + } + (true, 1) => { + self.upstream_connect_duration_success_bucket_101_500ms + .fetch_add(1, Ordering::Relaxed); + } + (true, 2) => { + self.upstream_connect_duration_success_bucket_501_1000ms + .fetch_add(1, Ordering::Relaxed); + } + (true, _) => { + self.upstream_connect_duration_success_bucket_gt_1000ms + .fetch_add(1, Ordering::Relaxed); + } + (false, 0) => { + self.upstream_connect_duration_fail_bucket_le_100ms + .fetch_add(1, Ordering::Relaxed); + } + (false, 1) => { + self.upstream_connect_duration_fail_bucket_101_500ms + .fetch_add(1, Ordering::Relaxed); + } + (false, 2) => { + self.upstream_connect_duration_fail_bucket_501_1000ms + .fetch_add(1, Ordering::Relaxed); + } + (false, _) => { + self.upstream_connect_duration_fail_bucket_gt_1000ms + .fetch_add(1, Ordering::Relaxed); + } + } + } pub fn increment_me_keepalive_sent(&self) { if self.telemetry_me_allows_debug() { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); @@ -703,6 +812,65 @@ impl Stats { } pub fn get_handshake_timeouts(&self) -> u64 { self.handshake_timeouts.load(Ordering::Relaxed) } + pub fn get_upstream_connect_attempt_total(&self) -> u64 { + self.upstream_connect_attempt_total.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_success_total(&self) -> u64 { + self.upstream_connect_success_total.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_fail_total(&self) -> u64 { + self.upstream_connect_fail_total.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_failfast_hard_error_total(&self) -> u64 { + self.upstream_connect_failfast_hard_error_total + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_attempts_bucket_1(&self) -> u64 { + self.upstream_connect_attempts_bucket_1.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_attempts_bucket_2(&self) -> u64 { + self.upstream_connect_attempts_bucket_2.load(Ordering::Relaxed) + } + pub fn get_upstream_connect_attempts_bucket_3_4(&self) -> u64 { + self.upstream_connect_attempts_bucket_3_4 + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_attempts_bucket_gt_4(&self) -> u64 { + self.upstream_connect_attempts_bucket_gt_4 + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_success_bucket_le_100ms(&self) -> u64 { + self.upstream_connect_duration_success_bucket_le_100ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_success_bucket_101_500ms(&self) -> u64 { + self.upstream_connect_duration_success_bucket_101_500ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_success_bucket_501_1000ms(&self) -> u64 { + self.upstream_connect_duration_success_bucket_501_1000ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_success_bucket_gt_1000ms(&self) -> u64 { + self.upstream_connect_duration_success_bucket_gt_1000ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_fail_bucket_le_100ms(&self) -> u64 { + self.upstream_connect_duration_fail_bucket_le_100ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_fail_bucket_101_500ms(&self) -> u64 { + self.upstream_connect_duration_fail_bucket_101_500ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_fail_bucket_501_1000ms(&self) -> u64 { + self.upstream_connect_duration_fail_bucket_501_1000ms + .load(Ordering::Relaxed) + } + pub fn get_upstream_connect_duration_fail_bucket_gt_1000ms(&self) -> u64 { + self.upstream_connect_duration_fail_bucket_gt_1000ms + .load(Ordering::Relaxed) + } pub fn iter_user_stats(&self) -> dashmap::iter::Iter<'_, String, UserStats> { self.user_stats.iter()