diff --git a/src/stats/mod.rs b/src/stats/mod.rs index ad1d16b..0df4dc0 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -19,6 +19,137 @@ use tracing::debug; use crate::config::{MeTelemetryLevel, MeWriterPickMode}; use self::telemetry::TelemetryPolicy; +const ME_WRITER_TEARDOWN_MODE_COUNT: usize = 2; +const ME_WRITER_TEARDOWN_REASON_COUNT: usize = 11; +const ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT: usize = 2; +const ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT: usize = 12; +const ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS: [u64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [ + 1_000, + 5_000, + 10_000, + 25_000, + 50_000, + 100_000, + 250_000, + 500_000, + 1_000_000, + 2_500_000, + 5_000_000, + 10_000_000, +]; +const ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS: [&str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [ + "0.001", + "0.005", + "0.01", + "0.025", + "0.05", + "0.1", + "0.25", + "0.5", + "1", + "2.5", + "5", + "10", +]; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum MeWriterTeardownMode { + Normal = 0, + HardDetach = 1, +} + +impl MeWriterTeardownMode { + pub const ALL: [Self; ME_WRITER_TEARDOWN_MODE_COUNT] = + [Self::Normal, Self::HardDetach]; + + pub const fn as_str(self) -> &'static str { + match self { + Self::Normal => "normal", + Self::HardDetach => "hard_detach", + } + } + + const fn idx(self) -> usize { + self as usize + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum MeWriterTeardownReason { + ReaderExit = 0, + WriterTaskExit = 1, + PingSendFail = 2, + SignalSendFail = 3, + RouteChannelClosed = 4, + CloseRpcChannelClosed = 5, + PruneClosedWriter = 6, + ReapTimeoutExpired = 7, + ReapThresholdForce = 8, + ReapEmpty = 9, + WatchdogStuckDraining = 10, +} + +impl MeWriterTeardownReason { + pub const ALL: [Self; ME_WRITER_TEARDOWN_REASON_COUNT] = [ + Self::ReaderExit, + Self::WriterTaskExit, + Self::PingSendFail, + Self::SignalSendFail, + Self::RouteChannelClosed, + Self::CloseRpcChannelClosed, + Self::PruneClosedWriter, + Self::ReapTimeoutExpired, + Self::ReapThresholdForce, + Self::ReapEmpty, + Self::WatchdogStuckDraining, + ]; + + pub const fn as_str(self) -> &'static str { + match self { + Self::ReaderExit => "reader_exit", + Self::WriterTaskExit => "writer_task_exit", + Self::PingSendFail => "ping_send_fail", + Self::SignalSendFail => "signal_send_fail", + Self::RouteChannelClosed => "route_channel_closed", + Self::CloseRpcChannelClosed => "close_rpc_channel_closed", + Self::PruneClosedWriter => "prune_closed_writer", + Self::ReapTimeoutExpired => "reap_timeout_expired", + Self::ReapThresholdForce => "reap_threshold_force", + Self::ReapEmpty => "reap_empty", + Self::WatchdogStuckDraining => "watchdog_stuck_draining", + } + } + + const fn idx(self) -> usize { + self as usize + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum MeWriterCleanupSideEffectStep { + CloseSignalChannelFull = 0, + CloseSignalChannelClosed = 1, +} + +impl MeWriterCleanupSideEffectStep { + pub const ALL: [Self; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT] = + [Self::CloseSignalChannelFull, Self::CloseSignalChannelClosed]; + + pub const fn as_str(self) -> &'static str { + match self { + Self::CloseSignalChannelFull => "close_signal_channel_full", + Self::CloseSignalChannelClosed => "close_signal_channel_closed", + } + } + + const fn idx(self) -> usize { + self as usize + } +} + // ============= Stats ============= #[derive(Default)] @@ -128,6 +259,18 @@ pub struct Stats { me_draining_writers_reap_progress_total: AtomicU64, me_writer_removed_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64, + me_writer_teardown_attempt_total: + [[AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT]; ME_WRITER_TEARDOWN_REASON_COUNT], + me_writer_teardown_success_total: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT], + me_writer_teardown_timeout_total: AtomicU64, + me_writer_teardown_escalation_total: AtomicU64, + me_writer_teardown_noop_total: AtomicU64, + me_writer_cleanup_side_effect_failures_total: + [AtomicU64; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT], + me_writer_teardown_duration_bucket_hits: + [[AtomicU64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT + 1]; ME_WRITER_TEARDOWN_MODE_COUNT], + me_writer_teardown_duration_sum_micros: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT], + me_writer_teardown_duration_count: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT], me_refill_triggered_total: AtomicU64, me_refill_skipped_inflight_total: AtomicU64, me_refill_failed_total: AtomicU64, @@ -765,6 +908,74 @@ impl Stats { self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_writer_teardown_attempt_total( + &self, + reason: MeWriterTeardownReason, + mode: MeWriterTeardownMode, + ) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()] + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_success_total[mode.idx()].fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_teardown_timeout_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_timeout_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_teardown_escalation_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_escalation_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_teardown_noop_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_noop_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_cleanup_side_effect_failures_total( + &self, + step: MeWriterCleanupSideEffectStep, + ) { + if self.telemetry_me_allows_normal() { + self.me_writer_cleanup_side_effect_failures_total[step.idx()] + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn observe_me_writer_teardown_duration( + &self, + mode: MeWriterTeardownMode, + duration: Duration, + ) { + if !self.telemetry_me_allows_normal() { + return; + } + let duration_micros = duration.as_micros().min(u64::MAX as u128) as u64; + let mut bucket_idx = ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT; + for (idx, upper_bound_micros) in ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS + .iter() + .copied() + .enumerate() + { + if duration_micros <= upper_bound_micros { + bucket_idx = idx; + break; + } + } + self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx] + .fetch_add(1, Ordering::Relaxed); + self.me_writer_teardown_duration_sum_micros[mode.idx()] + .fetch_add(duration_micros, Ordering::Relaxed); + self.me_writer_teardown_duration_count[mode.idx()].fetch_add(1, Ordering::Relaxed); + } pub fn increment_me_refill_triggered_total(&self) { if self.telemetry_me_allows_debug() { self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed); @@ -1297,6 +1508,79 @@ impl Stats { pub fn get_me_writer_removed_unexpected_total(&self) -> u64 { self.me_writer_removed_unexpected_total.load(Ordering::Relaxed) } + pub fn get_me_writer_teardown_attempt_total( + &self, + reason: MeWriterTeardownReason, + mode: MeWriterTeardownMode, + ) -> u64 { + self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()] + .load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_attempt_total_by_mode(&self, mode: MeWriterTeardownMode) -> u64 { + MeWriterTeardownReason::ALL + .iter() + .copied() + .map(|reason| self.get_me_writer_teardown_attempt_total(reason, mode)) + .sum() + } + pub fn get_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) -> u64 { + self.me_writer_teardown_success_total[mode.idx()].load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_timeout_total(&self) -> u64 { + self.me_writer_teardown_timeout_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_escalation_total(&self) -> u64 { + self.me_writer_teardown_escalation_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_noop_total(&self) -> u64 { + self.me_writer_teardown_noop_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_cleanup_side_effect_failures_total( + &self, + step: MeWriterCleanupSideEffectStep, + ) -> u64 { + self.me_writer_cleanup_side_effect_failures_total[step.idx()] + .load(Ordering::Relaxed) + } + pub fn get_me_writer_cleanup_side_effect_failures_total_all(&self) -> u64 { + MeWriterCleanupSideEffectStep::ALL + .iter() + .copied() + .map(|step| self.get_me_writer_cleanup_side_effect_failures_total(step)) + .sum() + } + pub fn me_writer_teardown_duration_bucket_labels( + ) -> &'static [&'static str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] { + &ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS + } + pub fn get_me_writer_teardown_duration_bucket_hits( + &self, + mode: MeWriterTeardownMode, + bucket_idx: usize, + ) -> u64 { + self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx] + .load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_duration_bucket_total( + &self, + mode: MeWriterTeardownMode, + bucket_idx: usize, + ) -> u64 { + let capped_idx = bucket_idx.min(ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT); + let mut total = 0u64; + for idx in 0..=capped_idx { + total = total.saturating_add(self.get_me_writer_teardown_duration_bucket_hits(mode, idx)); + } + total + } + pub fn get_me_writer_teardown_duration_count(&self, mode: MeWriterTeardownMode) -> u64 { + self.me_writer_teardown_duration_count[mode.idx()].load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_duration_sum_seconds(&self, mode: MeWriterTeardownMode) -> f64 { + self.me_writer_teardown_duration_sum_micros[mode.idx()].load(Ordering::Relaxed) as f64 + / 1_000_000.0 + } pub fn get_me_refill_triggered_total(&self) -> u64 { self.me_refill_triggered_total.load(Ordering::Relaxed) } @@ -1800,6 +2084,79 @@ mod tests { assert_eq!(stats.get_me_keepalive_sent(), 0); assert_eq!(stats.get_me_route_drop_queue_full(), 0); } + + #[test] + fn test_teardown_counters_and_duration() { + let stats = Stats::new(); + stats.increment_me_writer_teardown_attempt_total( + MeWriterTeardownReason::ReaderExit, + MeWriterTeardownMode::Normal, + ); + stats.increment_me_writer_teardown_success_total(MeWriterTeardownMode::Normal); + stats.observe_me_writer_teardown_duration( + MeWriterTeardownMode::Normal, + Duration::from_millis(3), + ); + stats.increment_me_writer_cleanup_side_effect_failures_total( + MeWriterCleanupSideEffectStep::CloseSignalChannelFull, + ); + + assert_eq!( + stats.get_me_writer_teardown_attempt_total( + MeWriterTeardownReason::ReaderExit, + MeWriterTeardownMode::Normal + ), + 1 + ); + assert_eq!( + stats.get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal), + 1 + ); + assert_eq!( + stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal), + 1 + ); + assert!( + stats.get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal) > 0.0 + ); + assert_eq!( + stats.get_me_writer_cleanup_side_effect_failures_total( + MeWriterCleanupSideEffectStep::CloseSignalChannelFull + ), + 1 + ); + } + + #[test] + fn test_teardown_counters_respect_me_silent() { + let stats = Stats::new(); + stats.apply_telemetry_policy(TelemetryPolicy { + core_enabled: true, + user_enabled: true, + me_level: MeTelemetryLevel::Silent, + }); + stats.increment_me_writer_teardown_attempt_total( + MeWriterTeardownReason::ReaderExit, + MeWriterTeardownMode::Normal, + ); + stats.increment_me_writer_teardown_timeout_total(); + stats.observe_me_writer_teardown_duration( + MeWriterTeardownMode::Normal, + Duration::from_millis(1), + ); + assert_eq!( + stats.get_me_writer_teardown_attempt_total( + MeWriterTeardownReason::ReaderExit, + MeWriterTeardownMode::Normal + ), + 0 + ); + assert_eq!(stats.get_me_writer_teardown_timeout_total(), 0); + assert_eq!( + stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal), + 0 + ); + } #[test] fn test_replay_checker_basic() {