mirror of https://github.com/telemt/telemt.git
Upstream Connect in Stats
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
bd0dcfff15
commit
af5f0b9692
168
src/stats/mod.rs
168
src/stats/mod.rs
|
|
@ -26,6 +26,22 @@ pub struct Stats {
|
|||
connects_all: AtomicU64,
|
||||
connects_bad: AtomicU64,
|
||||
handshake_timeouts: AtomicU64,
|
||||
upstream_connect_attempt_total: AtomicU64,
|
||||
upstream_connect_success_total: AtomicU64,
|
||||
upstream_connect_fail_total: AtomicU64,
|
||||
upstream_connect_failfast_hard_error_total: AtomicU64,
|
||||
upstream_connect_attempts_bucket_1: AtomicU64,
|
||||
upstream_connect_attempts_bucket_2: AtomicU64,
|
||||
upstream_connect_attempts_bucket_3_4: AtomicU64,
|
||||
upstream_connect_attempts_bucket_gt_4: AtomicU64,
|
||||
upstream_connect_duration_success_bucket_le_100ms: AtomicU64,
|
||||
upstream_connect_duration_success_bucket_101_500ms: AtomicU64,
|
||||
upstream_connect_duration_success_bucket_501_1000ms: AtomicU64,
|
||||
upstream_connect_duration_success_bucket_gt_1000ms: AtomicU64,
|
||||
upstream_connect_duration_fail_bucket_le_100ms: AtomicU64,
|
||||
upstream_connect_duration_fail_bucket_101_500ms: AtomicU64,
|
||||
upstream_connect_duration_fail_bucket_501_1000ms: AtomicU64,
|
||||
upstream_connect_duration_fail_bucket_gt_1000ms: AtomicU64,
|
||||
me_keepalive_sent: AtomicU64,
|
||||
me_keepalive_failed: AtomicU64,
|
||||
me_keepalive_pong: AtomicU64,
|
||||
|
|
@ -155,6 +171,99 @@ impl Stats {
|
|||
self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_upstream_connect_attempt_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.upstream_connect_attempt_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_upstream_connect_success_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.upstream_connect_success_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_upstream_connect_fail_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.upstream_connect_fail_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn increment_upstream_connect_failfast_hard_error_total(&self) {
|
||||
if self.telemetry_core_enabled() {
|
||||
self.upstream_connect_failfast_hard_error_total
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
pub fn observe_upstream_connect_attempts_per_request(&self, attempts: u32) {
|
||||
if !self.telemetry_core_enabled() {
|
||||
return;
|
||||
}
|
||||
match attempts {
|
||||
0 => {}
|
||||
1 => {
|
||||
self.upstream_connect_attempts_bucket_1
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
2 => {
|
||||
self.upstream_connect_attempts_bucket_2
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
3..=4 => {
|
||||
self.upstream_connect_attempts_bucket_3_4
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
_ => {
|
||||
self.upstream_connect_attempts_bucket_gt_4
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn observe_upstream_connect_duration_ms(&self, duration_ms: u64, success: bool) {
|
||||
if !self.telemetry_core_enabled() {
|
||||
return;
|
||||
}
|
||||
let bucket = match duration_ms {
|
||||
0..=100 => 0u8,
|
||||
101..=500 => 1u8,
|
||||
501..=1000 => 2u8,
|
||||
_ => 3u8,
|
||||
};
|
||||
match (success, bucket) {
|
||||
(true, 0) => {
|
||||
self.upstream_connect_duration_success_bucket_le_100ms
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
(true, 1) => {
|
||||
self.upstream_connect_duration_success_bucket_101_500ms
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
(true, 2) => {
|
||||
self.upstream_connect_duration_success_bucket_501_1000ms
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
(true, _) => {
|
||||
self.upstream_connect_duration_success_bucket_gt_1000ms
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
(false, 0) => {
|
||||
self.upstream_connect_duration_fail_bucket_le_100ms
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
(false, 1) => {
|
||||
self.upstream_connect_duration_fail_bucket_101_500ms
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
(false, 2) => {
|
||||
self.upstream_connect_duration_fail_bucket_501_1000ms
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
(false, _) => {
|
||||
self.upstream_connect_duration_fail_bucket_gt_1000ms
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn increment_me_keepalive_sent(&self) {
|
||||
if self.telemetry_me_allows_debug() {
|
||||
self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed);
|
||||
|
|
@ -703,6 +812,65 @@ impl Stats {
|
|||
}
|
||||
|
||||
pub fn get_handshake_timeouts(&self) -> u64 { self.handshake_timeouts.load(Ordering::Relaxed) }
|
||||
pub fn get_upstream_connect_attempt_total(&self) -> u64 {
|
||||
self.upstream_connect_attempt_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_success_total(&self) -> u64 {
|
||||
self.upstream_connect_success_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_fail_total(&self) -> u64 {
|
||||
self.upstream_connect_fail_total.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_failfast_hard_error_total(&self) -> u64 {
|
||||
self.upstream_connect_failfast_hard_error_total
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_attempts_bucket_1(&self) -> u64 {
|
||||
self.upstream_connect_attempts_bucket_1.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_attempts_bucket_2(&self) -> u64 {
|
||||
self.upstream_connect_attempts_bucket_2.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_attempts_bucket_3_4(&self) -> u64 {
|
||||
self.upstream_connect_attempts_bucket_3_4
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_attempts_bucket_gt_4(&self) -> u64 {
|
||||
self.upstream_connect_attempts_bucket_gt_4
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_duration_success_bucket_le_100ms(&self) -> u64 {
|
||||
self.upstream_connect_duration_success_bucket_le_100ms
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_duration_success_bucket_101_500ms(&self) -> u64 {
|
||||
self.upstream_connect_duration_success_bucket_101_500ms
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_duration_success_bucket_501_1000ms(&self) -> u64 {
|
||||
self.upstream_connect_duration_success_bucket_501_1000ms
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_duration_success_bucket_gt_1000ms(&self) -> u64 {
|
||||
self.upstream_connect_duration_success_bucket_gt_1000ms
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_duration_fail_bucket_le_100ms(&self) -> u64 {
|
||||
self.upstream_connect_duration_fail_bucket_le_100ms
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_duration_fail_bucket_101_500ms(&self) -> u64 {
|
||||
self.upstream_connect_duration_fail_bucket_101_500ms
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_duration_fail_bucket_501_1000ms(&self) -> u64 {
|
||||
self.upstream_connect_duration_fail_bucket_501_1000ms
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
pub fn get_upstream_connect_duration_fail_bucket_gt_1000ms(&self) -> u64 {
|
||||
self.upstream_connect_duration_fail_bucket_gt_1000ms
|
||||
.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn iter_user_stats(&self) -> dashmap::iter::Iter<'_, String, UserStats> {
|
||||
self.user_stats.iter()
|
||||
|
|
|
|||
Loading…
Reference in New Issue