mirror of https://github.com/telemt/telemt.git
Teardown Monitoring in Metrics
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
ef9b7b1492
commit
aba4205dcc
357
src/stats/mod.rs
357
src/stats/mod.rs
|
|
@ -19,6 +19,137 @@ use tracing::debug;
|
||||||
use crate::config::{MeTelemetryLevel, MeWriterPickMode};
|
use crate::config::{MeTelemetryLevel, MeWriterPickMode};
|
||||||
use self::telemetry::TelemetryPolicy;
|
use self::telemetry::TelemetryPolicy;
|
||||||
|
|
||||||
|
const ME_WRITER_TEARDOWN_MODE_COUNT: usize = 2;
|
||||||
|
const ME_WRITER_TEARDOWN_REASON_COUNT: usize = 11;
|
||||||
|
const ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT: usize = 2;
|
||||||
|
const ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT: usize = 12;
|
||||||
|
const ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS: [u64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [
|
||||||
|
1_000,
|
||||||
|
5_000,
|
||||||
|
10_000,
|
||||||
|
25_000,
|
||||||
|
50_000,
|
||||||
|
100_000,
|
||||||
|
250_000,
|
||||||
|
500_000,
|
||||||
|
1_000_000,
|
||||||
|
2_500_000,
|
||||||
|
5_000_000,
|
||||||
|
10_000_000,
|
||||||
|
];
|
||||||
|
const ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS: [&str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [
|
||||||
|
"0.001",
|
||||||
|
"0.005",
|
||||||
|
"0.01",
|
||||||
|
"0.025",
|
||||||
|
"0.05",
|
||||||
|
"0.1",
|
||||||
|
"0.25",
|
||||||
|
"0.5",
|
||||||
|
"1",
|
||||||
|
"2.5",
|
||||||
|
"5",
|
||||||
|
"10",
|
||||||
|
];
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||||
|
#[repr(u8)]
|
||||||
|
pub enum MeWriterTeardownMode {
|
||||||
|
Normal = 0,
|
||||||
|
HardDetach = 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MeWriterTeardownMode {
|
||||||
|
pub const ALL: [Self; ME_WRITER_TEARDOWN_MODE_COUNT] =
|
||||||
|
[Self::Normal, Self::HardDetach];
|
||||||
|
|
||||||
|
pub const fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::Normal => "normal",
|
||||||
|
Self::HardDetach => "hard_detach",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const fn idx(self) -> usize {
|
||||||
|
self as usize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||||
|
#[repr(u8)]
|
||||||
|
pub enum MeWriterTeardownReason {
|
||||||
|
ReaderExit = 0,
|
||||||
|
WriterTaskExit = 1,
|
||||||
|
PingSendFail = 2,
|
||||||
|
SignalSendFail = 3,
|
||||||
|
RouteChannelClosed = 4,
|
||||||
|
CloseRpcChannelClosed = 5,
|
||||||
|
PruneClosedWriter = 6,
|
||||||
|
ReapTimeoutExpired = 7,
|
||||||
|
ReapThresholdForce = 8,
|
||||||
|
ReapEmpty = 9,
|
||||||
|
WatchdogStuckDraining = 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MeWriterTeardownReason {
|
||||||
|
pub const ALL: [Self; ME_WRITER_TEARDOWN_REASON_COUNT] = [
|
||||||
|
Self::ReaderExit,
|
||||||
|
Self::WriterTaskExit,
|
||||||
|
Self::PingSendFail,
|
||||||
|
Self::SignalSendFail,
|
||||||
|
Self::RouteChannelClosed,
|
||||||
|
Self::CloseRpcChannelClosed,
|
||||||
|
Self::PruneClosedWriter,
|
||||||
|
Self::ReapTimeoutExpired,
|
||||||
|
Self::ReapThresholdForce,
|
||||||
|
Self::ReapEmpty,
|
||||||
|
Self::WatchdogStuckDraining,
|
||||||
|
];
|
||||||
|
|
||||||
|
pub const fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::ReaderExit => "reader_exit",
|
||||||
|
Self::WriterTaskExit => "writer_task_exit",
|
||||||
|
Self::PingSendFail => "ping_send_fail",
|
||||||
|
Self::SignalSendFail => "signal_send_fail",
|
||||||
|
Self::RouteChannelClosed => "route_channel_closed",
|
||||||
|
Self::CloseRpcChannelClosed => "close_rpc_channel_closed",
|
||||||
|
Self::PruneClosedWriter => "prune_closed_writer",
|
||||||
|
Self::ReapTimeoutExpired => "reap_timeout_expired",
|
||||||
|
Self::ReapThresholdForce => "reap_threshold_force",
|
||||||
|
Self::ReapEmpty => "reap_empty",
|
||||||
|
Self::WatchdogStuckDraining => "watchdog_stuck_draining",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const fn idx(self) -> usize {
|
||||||
|
self as usize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||||
|
#[repr(u8)]
|
||||||
|
pub enum MeWriterCleanupSideEffectStep {
|
||||||
|
CloseSignalChannelFull = 0,
|
||||||
|
CloseSignalChannelClosed = 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MeWriterCleanupSideEffectStep {
|
||||||
|
pub const ALL: [Self; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT] =
|
||||||
|
[Self::CloseSignalChannelFull, Self::CloseSignalChannelClosed];
|
||||||
|
|
||||||
|
pub const fn as_str(self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
Self::CloseSignalChannelFull => "close_signal_channel_full",
|
||||||
|
Self::CloseSignalChannelClosed => "close_signal_channel_closed",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const fn idx(self) -> usize {
|
||||||
|
self as usize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ============= Stats =============
|
// ============= Stats =============
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
|
|
@ -128,6 +259,18 @@ pub struct Stats {
|
||||||
me_draining_writers_reap_progress_total: AtomicU64,
|
me_draining_writers_reap_progress_total: AtomicU64,
|
||||||
me_writer_removed_total: AtomicU64,
|
me_writer_removed_total: AtomicU64,
|
||||||
me_writer_removed_unexpected_total: AtomicU64,
|
me_writer_removed_unexpected_total: AtomicU64,
|
||||||
|
me_writer_teardown_attempt_total:
|
||||||
|
[[AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT]; ME_WRITER_TEARDOWN_REASON_COUNT],
|
||||||
|
me_writer_teardown_success_total: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
|
||||||
|
me_writer_teardown_timeout_total: AtomicU64,
|
||||||
|
me_writer_teardown_escalation_total: AtomicU64,
|
||||||
|
me_writer_teardown_noop_total: AtomicU64,
|
||||||
|
me_writer_cleanup_side_effect_failures_total:
|
||||||
|
[AtomicU64; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT],
|
||||||
|
me_writer_teardown_duration_bucket_hits:
|
||||||
|
[[AtomicU64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT + 1]; ME_WRITER_TEARDOWN_MODE_COUNT],
|
||||||
|
me_writer_teardown_duration_sum_micros: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
|
||||||
|
me_writer_teardown_duration_count: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
|
||||||
me_refill_triggered_total: AtomicU64,
|
me_refill_triggered_total: AtomicU64,
|
||||||
me_refill_skipped_inflight_total: AtomicU64,
|
me_refill_skipped_inflight_total: AtomicU64,
|
||||||
me_refill_failed_total: AtomicU64,
|
me_refill_failed_total: AtomicU64,
|
||||||
|
|
@ -765,6 +908,74 @@ impl Stats {
|
||||||
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
|
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn increment_me_writer_teardown_attempt_total(
|
||||||
|
&self,
|
||||||
|
reason: MeWriterTeardownReason,
|
||||||
|
mode: MeWriterTeardownMode,
|
||||||
|
) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()]
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_writer_teardown_success_total[mode.idx()].fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_teardown_timeout_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_writer_teardown_timeout_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_teardown_escalation_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_writer_teardown_escalation_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_teardown_noop_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_writer_teardown_noop_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_writer_cleanup_side_effect_failures_total(
|
||||||
|
&self,
|
||||||
|
step: MeWriterCleanupSideEffectStep,
|
||||||
|
) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_writer_cleanup_side_effect_failures_total[step.idx()]
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn observe_me_writer_teardown_duration(
|
||||||
|
&self,
|
||||||
|
mode: MeWriterTeardownMode,
|
||||||
|
duration: Duration,
|
||||||
|
) {
|
||||||
|
if !self.telemetry_me_allows_normal() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let duration_micros = duration.as_micros().min(u64::MAX as u128) as u64;
|
||||||
|
let mut bucket_idx = ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT;
|
||||||
|
for (idx, upper_bound_micros) in ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.enumerate()
|
||||||
|
{
|
||||||
|
if duration_micros <= upper_bound_micros {
|
||||||
|
bucket_idx = idx;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx]
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.me_writer_teardown_duration_sum_micros[mode.idx()]
|
||||||
|
.fetch_add(duration_micros, Ordering::Relaxed);
|
||||||
|
self.me_writer_teardown_duration_count[mode.idx()].fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
pub fn increment_me_refill_triggered_total(&self) {
|
pub fn increment_me_refill_triggered_total(&self) {
|
||||||
if self.telemetry_me_allows_debug() {
|
if self.telemetry_me_allows_debug() {
|
||||||
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
|
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
@ -1297,6 +1508,79 @@ impl Stats {
|
||||||
pub fn get_me_writer_removed_unexpected_total(&self) -> u64 {
|
pub fn get_me_writer_removed_unexpected_total(&self) -> u64 {
|
||||||
self.me_writer_removed_unexpected_total.load(Ordering::Relaxed)
|
self.me_writer_removed_unexpected_total.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
pub fn get_me_writer_teardown_attempt_total(
|
||||||
|
&self,
|
||||||
|
reason: MeWriterTeardownReason,
|
||||||
|
mode: MeWriterTeardownMode,
|
||||||
|
) -> u64 {
|
||||||
|
self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()]
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_attempt_total_by_mode(&self, mode: MeWriterTeardownMode) -> u64 {
|
||||||
|
MeWriterTeardownReason::ALL
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.map(|reason| self.get_me_writer_teardown_attempt_total(reason, mode))
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) -> u64 {
|
||||||
|
self.me_writer_teardown_success_total[mode.idx()].load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_timeout_total(&self) -> u64 {
|
||||||
|
self.me_writer_teardown_timeout_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_escalation_total(&self) -> u64 {
|
||||||
|
self.me_writer_teardown_escalation_total
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_noop_total(&self) -> u64 {
|
||||||
|
self.me_writer_teardown_noop_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_cleanup_side_effect_failures_total(
|
||||||
|
&self,
|
||||||
|
step: MeWriterCleanupSideEffectStep,
|
||||||
|
) -> u64 {
|
||||||
|
self.me_writer_cleanup_side_effect_failures_total[step.idx()]
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_cleanup_side_effect_failures_total_all(&self) -> u64 {
|
||||||
|
MeWriterCleanupSideEffectStep::ALL
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.map(|step| self.get_me_writer_cleanup_side_effect_failures_total(step))
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
pub fn me_writer_teardown_duration_bucket_labels(
|
||||||
|
) -> &'static [&'static str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] {
|
||||||
|
&ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_duration_bucket_hits(
|
||||||
|
&self,
|
||||||
|
mode: MeWriterTeardownMode,
|
||||||
|
bucket_idx: usize,
|
||||||
|
) -> u64 {
|
||||||
|
self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx]
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_duration_bucket_total(
|
||||||
|
&self,
|
||||||
|
mode: MeWriterTeardownMode,
|
||||||
|
bucket_idx: usize,
|
||||||
|
) -> u64 {
|
||||||
|
let capped_idx = bucket_idx.min(ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT);
|
||||||
|
let mut total = 0u64;
|
||||||
|
for idx in 0..=capped_idx {
|
||||||
|
total = total.saturating_add(self.get_me_writer_teardown_duration_bucket_hits(mode, idx));
|
||||||
|
}
|
||||||
|
total
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_duration_count(&self, mode: MeWriterTeardownMode) -> u64 {
|
||||||
|
self.me_writer_teardown_duration_count[mode.idx()].load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writer_teardown_duration_sum_seconds(&self, mode: MeWriterTeardownMode) -> f64 {
|
||||||
|
self.me_writer_teardown_duration_sum_micros[mode.idx()].load(Ordering::Relaxed) as f64
|
||||||
|
/ 1_000_000.0
|
||||||
|
}
|
||||||
pub fn get_me_refill_triggered_total(&self) -> u64 {
|
pub fn get_me_refill_triggered_total(&self) -> u64 {
|
||||||
self.me_refill_triggered_total.load(Ordering::Relaxed)
|
self.me_refill_triggered_total.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
@ -1800,6 +2084,79 @@ mod tests {
|
||||||
assert_eq!(stats.get_me_keepalive_sent(), 0);
|
assert_eq!(stats.get_me_keepalive_sent(), 0);
|
||||||
assert_eq!(stats.get_me_route_drop_queue_full(), 0);
|
assert_eq!(stats.get_me_route_drop_queue_full(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_teardown_counters_and_duration() {
|
||||||
|
let stats = Stats::new();
|
||||||
|
stats.increment_me_writer_teardown_attempt_total(
|
||||||
|
MeWriterTeardownReason::ReaderExit,
|
||||||
|
MeWriterTeardownMode::Normal,
|
||||||
|
);
|
||||||
|
stats.increment_me_writer_teardown_success_total(MeWriterTeardownMode::Normal);
|
||||||
|
stats.observe_me_writer_teardown_duration(
|
||||||
|
MeWriterTeardownMode::Normal,
|
||||||
|
Duration::from_millis(3),
|
||||||
|
);
|
||||||
|
stats.increment_me_writer_cleanup_side_effect_failures_total(
|
||||||
|
MeWriterCleanupSideEffectStep::CloseSignalChannelFull,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
stats.get_me_writer_teardown_attempt_total(
|
||||||
|
MeWriterTeardownReason::ReaderExit,
|
||||||
|
MeWriterTeardownMode::Normal
|
||||||
|
),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
stats.get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
stats.get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal) > 0.0
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
stats.get_me_writer_cleanup_side_effect_failures_total(
|
||||||
|
MeWriterCleanupSideEffectStep::CloseSignalChannelFull
|
||||||
|
),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_teardown_counters_respect_me_silent() {
|
||||||
|
let stats = Stats::new();
|
||||||
|
stats.apply_telemetry_policy(TelemetryPolicy {
|
||||||
|
core_enabled: true,
|
||||||
|
user_enabled: true,
|
||||||
|
me_level: MeTelemetryLevel::Silent,
|
||||||
|
});
|
||||||
|
stats.increment_me_writer_teardown_attempt_total(
|
||||||
|
MeWriterTeardownReason::ReaderExit,
|
||||||
|
MeWriterTeardownMode::Normal,
|
||||||
|
);
|
||||||
|
stats.increment_me_writer_teardown_timeout_total();
|
||||||
|
stats.observe_me_writer_teardown_duration(
|
||||||
|
MeWriterTeardownMode::Normal,
|
||||||
|
Duration::from_millis(1),
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
stats.get_me_writer_teardown_attempt_total(
|
||||||
|
MeWriterTeardownReason::ReaderExit,
|
||||||
|
MeWriterTeardownMode::Normal
|
||||||
|
),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
assert_eq!(stats.get_me_writer_teardown_timeout_total(), 0);
|
||||||
|
assert_eq!(
|
||||||
|
stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal),
|
||||||
|
0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_replay_checker_basic() {
|
fn test_replay_checker_basic() {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue