mirror of https://github.com/telemt/telemt.git
ME Pool Flap-Detect in statistics
This commit is contained in:
parent
866c2fbd96
commit
c6c3d71b08
|
|
@ -199,6 +199,95 @@ fn render_metrics(stats: &Stats) -> String {
|
||||||
stats.get_pool_stale_pick_total()
|
stats.get_pool_stale_pick_total()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_removed_total {}",
|
||||||
|
stats.get_me_writer_removed_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writer_removed_unexpected_total Unexpected ME writer removals that triggered refill"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writer_removed_unexpected_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_removed_unexpected_total {}",
|
||||||
|
stats.get_me_writer_removed_unexpected_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_refill_triggered_total {}",
|
||||||
|
stats.get_me_refill_triggered_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_refill_skipped_inflight_total Immediate ME refill skips due to inflight dedup"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_refill_skipped_inflight_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_refill_skipped_inflight_total {}",
|
||||||
|
stats.get_me_refill_skipped_inflight_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_refill_failed_total Immediate ME refill failures");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_refill_failed_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_refill_failed_total {}",
|
||||||
|
stats.get_me_refill_failed_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writer_restored_same_endpoint_total Refilled ME writer restored on the same endpoint"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writer_restored_same_endpoint_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_restored_same_endpoint_total {}",
|
||||||
|
stats.get_me_writer_restored_same_endpoint_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writer_restored_fallback_total Refilled ME writer restored via fallback endpoint"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writer_restored_fallback_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_restored_fallback_total {}",
|
||||||
|
stats.get_me_writer_restored_fallback_total()
|
||||||
|
);
|
||||||
|
|
||||||
|
let unresolved_writer_losses = stats
|
||||||
|
.get_me_writer_removed_unexpected_total()
|
||||||
|
.saturating_sub(
|
||||||
|
stats
|
||||||
|
.get_me_writer_restored_same_endpoint_total()
|
||||||
|
.saturating_add(stats.get_me_writer_restored_fallback_total()),
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writer_removed_unexpected_minus_restored_total Unexpected writer removals not yet compensated by restore"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writer_removed_unexpected_minus_restored_total {}",
|
||||||
|
unresolved_writer_losses
|
||||||
|
);
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
||||||
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
||||||
|
|
@ -277,6 +366,10 @@ mod tests {
|
||||||
assert!(output.contains("# TYPE telemt_connections_total counter"));
|
assert!(output.contains("# TYPE telemt_connections_total counter"));
|
||||||
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
|
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
|
||||||
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
|
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
|
||||||
|
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
|
||||||
|
assert!(output.contains(
|
||||||
|
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,13 @@ pub struct Stats {
|
||||||
pool_drain_active: AtomicU64,
|
pool_drain_active: AtomicU64,
|
||||||
pool_force_close_total: AtomicU64,
|
pool_force_close_total: AtomicU64,
|
||||||
pool_stale_pick_total: AtomicU64,
|
pool_stale_pick_total: AtomicU64,
|
||||||
|
me_writer_removed_total: AtomicU64,
|
||||||
|
me_writer_removed_unexpected_total: AtomicU64,
|
||||||
|
me_refill_triggered_total: AtomicU64,
|
||||||
|
me_refill_skipped_inflight_total: AtomicU64,
|
||||||
|
me_refill_failed_total: AtomicU64,
|
||||||
|
me_writer_restored_same_endpoint_total: AtomicU64,
|
||||||
|
me_writer_restored_fallback_total: AtomicU64,
|
||||||
user_stats: DashMap<String, UserStats>,
|
user_stats: DashMap<String, UserStats>,
|
||||||
start_time: parking_lot::RwLock<Option<Instant>>,
|
start_time: parking_lot::RwLock<Option<Instant>>,
|
||||||
}
|
}
|
||||||
|
|
@ -142,6 +149,27 @@ impl Stats {
|
||||||
pub fn increment_pool_stale_pick_total(&self) {
|
pub fn increment_pool_stale_pick_total(&self) {
|
||||||
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
|
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
pub fn increment_me_writer_removed_total(&self) {
|
||||||
|
self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_removed_unexpected_total(&self) {
|
||||||
|
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_refill_triggered_total(&self) {
|
||||||
|
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_refill_skipped_inflight_total(&self) {
|
||||||
|
self.me_refill_skipped_inflight_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_refill_failed_total(&self) {
|
||||||
|
self.me_refill_failed_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_restored_same_endpoint_total(&self) {
|
||||||
|
self.me_writer_restored_same_endpoint_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_restored_fallback_total(&self) {
|
||||||
|
self.me_writer_restored_fallback_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
||||||
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
||||||
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
||||||
|
|
@ -195,6 +223,27 @@ impl Stats {
|
||||||
pub fn get_pool_stale_pick_total(&self) -> u64 {
|
pub fn get_pool_stale_pick_total(&self) -> u64 {
|
||||||
self.pool_stale_pick_total.load(Ordering::Relaxed)
|
self.pool_stale_pick_total.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
pub fn get_me_writer_removed_total(&self) -> u64 {
|
||||||
|
self.me_writer_removed_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_removed_unexpected_total(&self) -> u64 {
|
||||||
|
self.me_writer_removed_unexpected_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_refill_triggered_total(&self) -> u64 {
|
||||||
|
self.me_refill_triggered_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_refill_skipped_inflight_total(&self) -> u64 {
|
||||||
|
self.me_refill_skipped_inflight_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_refill_failed_total(&self) -> u64 {
|
||||||
|
self.me_refill_failed_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_restored_same_endpoint_total(&self) -> u64 {
|
||||||
|
self.me_writer_restored_same_endpoint_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_restored_fallback_total(&self) -> u64 {
|
||||||
|
self.me_writer_restored_fallback_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn increment_user_connects(&self, user: &str) {
|
pub fn increment_user_connects(&self, user: &str) {
|
||||||
self.user_stats.entry(user.to_string()).or_default()
|
self.user_stats.entry(user.to_string()).or_default()
|
||||||
|
|
|
||||||
|
|
@ -708,6 +708,7 @@ impl MePool {
|
||||||
match self.connect_one(addr, self.rng.as_ref()).await {
|
match self.connect_one(addr, self.rng.as_ref()).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
self.stats.increment_me_reconnect_success();
|
self.stats.increment_me_reconnect_success();
|
||||||
|
self.stats.increment_me_writer_restored_same_endpoint_total();
|
||||||
info!(
|
info!(
|
||||||
%addr,
|
%addr,
|
||||||
attempt = attempt + 1,
|
attempt = attempt + 1,
|
||||||
|
|
@ -728,6 +729,7 @@ impl MePool {
|
||||||
|
|
||||||
let dc_endpoints = self.endpoints_for_same_dc(addr).await;
|
let dc_endpoints = self.endpoints_for_same_dc(addr).await;
|
||||||
if dc_endpoints.is_empty() {
|
if dc_endpoints.is_empty() {
|
||||||
|
self.stats.increment_me_refill_failed_total();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -738,6 +740,7 @@ impl MePool {
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
self.stats.increment_me_reconnect_success();
|
self.stats.increment_me_reconnect_success();
|
||||||
|
self.stats.increment_me_writer_restored_fallback_total();
|
||||||
info!(
|
info!(
|
||||||
%addr,
|
%addr,
|
||||||
attempt = attempt + 1,
|
attempt = attempt + 1,
|
||||||
|
|
@ -747,6 +750,7 @@ impl MePool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.stats.increment_me_refill_failed_total();
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -756,9 +760,11 @@ impl MePool {
|
||||||
{
|
{
|
||||||
let mut guard = pool.refill_inflight.lock().await;
|
let mut guard = pool.refill_inflight.lock().await;
|
||||||
if !guard.insert(addr) {
|
if !guard.insert(addr) {
|
||||||
|
pool.stats.increment_me_refill_skipped_inflight_total();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pool.stats.increment_me_refill_triggered_total();
|
||||||
|
|
||||||
let restored = pool.refill_writer_after_loss(addr).await;
|
let restored = pool.refill_writer_after_loss(addr).await;
|
||||||
if !restored {
|
if !restored {
|
||||||
|
|
@ -1189,9 +1195,13 @@ impl MePool {
|
||||||
if was_draining {
|
if was_draining {
|
||||||
self.stats.decrement_pool_drain_active();
|
self.stats.decrement_pool_drain_active();
|
||||||
}
|
}
|
||||||
|
self.stats.increment_me_writer_removed_total();
|
||||||
w.cancel.cancel();
|
w.cancel.cancel();
|
||||||
removed_addr = Some(w.addr);
|
removed_addr = Some(w.addr);
|
||||||
trigger_refill = !was_draining;
|
trigger_refill = !was_draining;
|
||||||
|
if trigger_refill {
|
||||||
|
self.stats.increment_me_writer_removed_unexpected_total();
|
||||||
|
}
|
||||||
close_tx = Some(w.tx.clone());
|
close_tx = Some(w.tx.clone());
|
||||||
self.conn_count.fetch_sub(1, Ordering::Relaxed);
|
self.conn_count.fetch_sub(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue