From ef9b7b149236030f5bf4f658f435de267d06cf4f Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 12:45:53 +0300 Subject: [PATCH 1/3] Teardown Monitoring in API Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/api/model.rs | 10 + src/api/runtime_min.rs | 124 ++++++++++++ src/api/runtime_stats.rs | 25 ++- src/metrics.rs | 178 +++++++++++++++++- src/transport/middle_proxy/health.rs | 33 +++- .../middle_proxy/health_adversarial_tests.rs | 7 +- .../middle_proxy/health_regression_tests.rs | 11 +- src/transport/middle_proxy/pool_writer.rs | 112 +++++++++-- src/transport/middle_proxy/send.rs | 31 ++- 9 files changed, 496 insertions(+), 35 deletions(-) diff --git a/src/api/model.rs b/src/api/model.rs index e98de8b..91d83b2 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -205,6 +205,16 @@ pub(super) struct ZeroPoolData { pub(super) refill_failed_total: u64, pub(super) writer_restored_same_endpoint_total: u64, pub(super) writer_restored_fallback_total: u64, + pub(super) teardown_attempt_total_normal: u64, + pub(super) teardown_attempt_total_hard_detach: u64, + pub(super) teardown_success_total_normal: u64, + pub(super) teardown_success_total_hard_detach: u64, + pub(super) teardown_timeout_total: u64, + pub(super) teardown_escalation_total: u64, + pub(super) teardown_noop_total: u64, + pub(super) teardown_cleanup_side_effect_failures_total: u64, + pub(super) teardown_duration_count_total: u64, + pub(super) teardown_duration_sum_seconds_total: f64, } #[derive(Serialize, Clone)] diff --git a/src/api/runtime_min.rs b/src/api/runtime_min.rs index f334dd0..ae3b23f 100644 --- a/src/api/runtime_min.rs +++ b/src/api/runtime_min.rs @@ -4,6 +4,9 @@ use std::time::{SystemTime, UNIX_EPOCH}; use serde::Serialize; use crate::config::ProxyConfig; +use crate::stats::{ + MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, Stats, +}; use super::ApiShared; @@ -98,6 +101,50 @@ pub(super) struct RuntimeMeQualityCountersData { pub(super) reconnect_success_total: u64, } +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityTeardownAttemptData { + pub(super) reason: &'static str, + pub(super) mode: &'static str, + pub(super) total: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityTeardownSuccessData { + pub(super) mode: &'static str, + pub(super) total: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityTeardownSideEffectData { + pub(super) step: &'static str, + pub(super) total: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityTeardownDurationBucketData { + pub(super) le_seconds: &'static str, + pub(super) total: u64, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityTeardownDurationData { + pub(super) mode: &'static str, + pub(super) count: u64, + pub(super) sum_seconds: f64, + pub(super) buckets: Vec, +} + +#[derive(Serialize)] +pub(super) struct RuntimeMeQualityTeardownData { + pub(super) attempts: Vec, + pub(super) success: Vec, + pub(super) timeout_total: u64, + pub(super) escalation_total: u64, + pub(super) noop_total: u64, + pub(super) cleanup_side_effect_failures: Vec, + pub(super) duration: Vec, +} + #[derive(Serialize)] pub(super) struct RuntimeMeQualityRouteDropData { pub(super) no_conn_total: u64, @@ -120,6 +167,7 @@ pub(super) struct RuntimeMeQualityDcRttData { #[derive(Serialize)] pub(super) struct RuntimeMeQualityPayload { pub(super) counters: RuntimeMeQualityCountersData, + pub(super) teardown: RuntimeMeQualityTeardownData, pub(super) route_drops: RuntimeMeQualityRouteDropData, pub(super) dc_rtt: Vec, } @@ -374,6 +422,7 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime reconnect_attempt_total: shared.stats.get_me_reconnect_attempts(), reconnect_success_total: shared.stats.get_me_reconnect_success(), }, + teardown: build_runtime_me_teardown_data(shared), route_drops: RuntimeMeQualityRouteDropData { no_conn_total: shared.stats.get_me_route_drop_no_conn(), channel_closed_total: shared.stats.get_me_route_drop_channel_closed(), @@ -397,6 +446,81 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime } } +fn build_runtime_me_teardown_data(shared: &ApiShared) -> RuntimeMeQualityTeardownData { + let attempts = MeWriterTeardownReason::ALL + .iter() + .copied() + .flat_map(|reason| { + MeWriterTeardownMode::ALL + .iter() + .copied() + .map(move |mode| RuntimeMeQualityTeardownAttemptData { + reason: reason.as_str(), + mode: mode.as_str(), + total: shared.stats.get_me_writer_teardown_attempt_total(reason, mode), + }) + }) + .collect(); + + let success = MeWriterTeardownMode::ALL + .iter() + .copied() + .map(|mode| RuntimeMeQualityTeardownSuccessData { + mode: mode.as_str(), + total: shared.stats.get_me_writer_teardown_success_total(mode), + }) + .collect(); + + let cleanup_side_effect_failures = MeWriterCleanupSideEffectStep::ALL + .iter() + .copied() + .map(|step| RuntimeMeQualityTeardownSideEffectData { + step: step.as_str(), + total: shared + .stats + .get_me_writer_cleanup_side_effect_failures_total(step), + }) + .collect(); + + let duration = MeWriterTeardownMode::ALL + .iter() + .copied() + .map(|mode| { + let count = shared.stats.get_me_writer_teardown_duration_count(mode); + let mut buckets: Vec = Stats::me_writer_teardown_duration_bucket_labels() + .iter() + .enumerate() + .map(|(bucket_idx, label)| RuntimeMeQualityTeardownDurationBucketData { + le_seconds: label, + total: shared + .stats + .get_me_writer_teardown_duration_bucket_total(mode, bucket_idx), + }) + .collect(); + buckets.push(RuntimeMeQualityTeardownDurationBucketData { + le_seconds: "+Inf", + total: count, + }); + RuntimeMeQualityTeardownDurationData { + mode: mode.as_str(), + count, + sum_seconds: shared.stats.get_me_writer_teardown_duration_sum_seconds(mode), + buckets, + } + }) + .collect(); + + RuntimeMeQualityTeardownData { + attempts, + success, + timeout_total: shared.stats.get_me_writer_teardown_timeout_total(), + escalation_total: shared.stats.get_me_writer_teardown_escalation_total(), + noop_total: shared.stats.get_me_writer_teardown_noop_total(), + cleanup_side_effect_failures, + duration, + } +} + pub(super) async fn build_runtime_upstream_quality_data( shared: &ApiShared, ) -> RuntimeUpstreamQualityData { diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index cdeacc0..22ff82e 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -1,7 +1,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use crate::config::ApiConfig; -use crate::stats::Stats; +use crate::stats::{MeWriterTeardownMode, Stats}; use crate::transport::upstream::IpPreference; use crate::transport::UpstreamRouteKind; @@ -106,6 +106,29 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer refill_failed_total: stats.get_me_refill_failed_total(), writer_restored_same_endpoint_total: stats.get_me_writer_restored_same_endpoint_total(), writer_restored_fallback_total: stats.get_me_writer_restored_fallback_total(), + teardown_attempt_total_normal: stats + .get_me_writer_teardown_attempt_total_by_mode(MeWriterTeardownMode::Normal), + teardown_attempt_total_hard_detach: stats + .get_me_writer_teardown_attempt_total_by_mode(MeWriterTeardownMode::HardDetach), + teardown_success_total_normal: stats + .get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal), + teardown_success_total_hard_detach: stats + .get_me_writer_teardown_success_total(MeWriterTeardownMode::HardDetach), + teardown_timeout_total: stats.get_me_writer_teardown_timeout_total(), + teardown_escalation_total: stats.get_me_writer_teardown_escalation_total(), + teardown_noop_total: stats.get_me_writer_teardown_noop_total(), + teardown_cleanup_side_effect_failures_total: stats + .get_me_writer_cleanup_side_effect_failures_total_all(), + teardown_duration_count_total: stats + .get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal) + .saturating_add( + stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::HardDetach), + ), + teardown_duration_sum_seconds_total: stats + .get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal) + + stats.get_me_writer_teardown_duration_sum_seconds( + MeWriterTeardownMode::HardDetach, + ), }, desync: ZeroDesyncData { secure_padding_invalid_total: stats.get_secure_padding_invalid(), diff --git a/src/metrics.rs b/src/metrics.rs index 4f7f4b6..b7272b2 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -16,7 +16,9 @@ use tracing::{info, warn, debug}; use crate::config::ProxyConfig; use crate::ip_tracker::UserIpTracker; use crate::stats::beobachten::BeobachtenStore; -use crate::stats::Stats; +use crate::stats::{ + MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, Stats, +}; use crate::transport::{ListenOptions, create_listener}; pub async fn serve( @@ -1770,6 +1772,169 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_writer_teardown_attempt_total ME writer teardown attempts by reason and mode" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_attempt_total counter"); + for reason in MeWriterTeardownReason::ALL { + for mode in MeWriterTeardownMode::ALL { + let _ = writeln!( + out, + "telemt_me_writer_teardown_attempt_total{{reason=\"{}\",mode=\"{}\"}} {}", + reason.as_str(), + mode.as_str(), + if me_allows_normal { + stats.get_me_writer_teardown_attempt_total(reason, mode) + } else { + 0 + } + ); + } + } + + let _ = writeln!( + out, + "# HELP telemt_me_writer_teardown_success_total ME writer teardown successes by mode" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_success_total counter"); + for mode in MeWriterTeardownMode::ALL { + let _ = writeln!( + out, + "telemt_me_writer_teardown_success_total{{mode=\"{}\"}} {}", + mode.as_str(), + if me_allows_normal { + stats.get_me_writer_teardown_success_total(mode) + } else { + 0 + } + ); + } + + let _ = writeln!( + out, + "# HELP telemt_me_writer_teardown_timeout_total Teardown operations that timed out" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_timeout_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_teardown_timeout_total {}", + if me_allows_normal { + stats.get_me_writer_teardown_timeout_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_teardown_escalation_total Watchdog teardown escalations to hard detach" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_teardown_escalation_total counter" + ); + let _ = writeln!( + out, + "telemt_me_writer_teardown_escalation_total {}", + if me_allows_normal { + stats.get_me_writer_teardown_escalation_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_teardown_noop_total Teardown operations that became no-op" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_noop_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_teardown_noop_total {}", + if me_allows_normal { + stats.get_me_writer_teardown_noop_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_teardown_duration_seconds ME writer teardown latency histogram by mode" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_teardown_duration_seconds histogram" + ); + let bucket_labels = Stats::me_writer_teardown_duration_bucket_labels(); + for mode in MeWriterTeardownMode::ALL { + for (bucket_idx, label) in bucket_labels.iter().enumerate() { + let _ = writeln!( + out, + "telemt_me_writer_teardown_duration_seconds_bucket{{mode=\"{}\",le=\"{}\"}} {}", + mode.as_str(), + label, + if me_allows_normal { + stats.get_me_writer_teardown_duration_bucket_total(mode, bucket_idx) + } else { + 0 + } + ); + } + let _ = writeln!( + out, + "telemt_me_writer_teardown_duration_seconds_bucket{{mode=\"{}\",le=\"+Inf\"}} {}", + mode.as_str(), + if me_allows_normal { + stats.get_me_writer_teardown_duration_count(mode) + } else { + 0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_teardown_duration_seconds_sum{{mode=\"{}\"}} {:.6}", + mode.as_str(), + if me_allows_normal { + stats.get_me_writer_teardown_duration_sum_seconds(mode) + } else { + 0.0 + } + ); + let _ = writeln!( + out, + "telemt_me_writer_teardown_duration_seconds_count{{mode=\"{}\"}} {}", + mode.as_str(), + if me_allows_normal { + stats.get_me_writer_teardown_duration_count(mode) + } else { + 0 + } + ); + } + + let _ = writeln!( + out, + "# HELP telemt_me_writer_cleanup_side_effect_failures_total Failed cleanup side effects by step" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_cleanup_side_effect_failures_total counter" + ); + for step in MeWriterCleanupSideEffectStep::ALL { + let _ = writeln!( + out, + "telemt_me_writer_cleanup_side_effect_failures_total{{step=\"{}\"}} {}", + step.as_str(), + if me_allows_normal { + stats.get_me_writer_cleanup_side_effect_failures_total(step) + } else { + 0 + } + ); + } + let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started"); let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter"); let _ = writeln!( @@ -2175,6 +2340,17 @@ mod tests { assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_teardown_attempt_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_teardown_success_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_teardown_timeout_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_teardown_escalation_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_teardown_noop_total counter")); + assert!(output.contains( + "# TYPE telemt_me_writer_teardown_duration_seconds histogram" + )); + assert!(output.contains( + "# TYPE telemt_me_writer_cleanup_side_effect_failures_total counter" + )); assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter")); assert!(output.contains( "# TYPE telemt_me_writer_close_signal_channel_full_total counter" diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 9d4cc70..30e562b 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -10,6 +10,7 @@ use tracing::{debug, info, warn}; use crate::config::MeFloorMode; use crate::crypto::SecureRandom; use crate::network::IpFamily; +use crate::stats::MeWriterTeardownReason; use super::MePool; use super::pool::MeWriter; @@ -358,7 +359,8 @@ pub(super) async fn reap_draining_writers( continue; } pool.stats.increment_pool_force_close_total(); - pool.remove_writer_and_close_clients(writer_id).await; + pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapTimeoutExpired) + .await; pool.stats .increment_me_draining_writers_reap_progress_total(); } @@ -376,7 +378,8 @@ pub(super) async fn reap_draining_writers( continue; } pool.stats.increment_pool_force_close_total(); - pool.remove_writer_and_close_clients(writer_id).await; + pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapThresholdForce) + .await; pool.stats .increment_me_draining_writers_reap_progress_total(); closed_total = closed_total.saturating_add(1); @@ -388,7 +391,8 @@ pub(super) async fn reap_draining_writers( if !closed_writer_ids.insert(writer_id) { continue; } - pool.remove_writer_and_close_clients(writer_id).await; + pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapEmpty) + .await; pool.stats .increment_me_draining_writers_reap_progress_total(); closed_total = closed_total.saturating_add(1); @@ -1646,11 +1650,14 @@ pub async fn me_zombie_writer_watchdog(pool: Arc) { for (writer_id, had_clients) in &zombie_ids_with_meta { let result = tokio::time::timeout( Duration::from_secs(REMOVE_TIMEOUT_SECS), - pool.remove_writer_and_close_clients(*writer_id), + pool.remove_writer_and_close_clients( + *writer_id, + MeWriterTeardownReason::WatchdogStuckDraining, + ), ) .await; match result { - Ok(()) => { + Ok(true) => { removal_timeout_streak.remove(writer_id); pool.stats.increment_pool_force_close_total(); pool.stats @@ -1661,7 +1668,16 @@ pub async fn me_zombie_writer_watchdog(pool: Arc) { "Zombie writer removed by watchdog" ); } + Ok(false) => { + removal_timeout_streak.remove(writer_id); + debug!( + writer_id, + had_clients, + "Zombie writer watchdog removal became no-op" + ); + } Err(_) => { + pool.stats.increment_me_writer_teardown_timeout_total(); let streak = removal_timeout_streak .entry(*writer_id) .and_modify(|value| *value = value.saturating_add(1)) @@ -1675,10 +1691,14 @@ pub async fn me_zombie_writer_watchdog(pool: Arc) { if *streak < HARD_DETACH_TIMEOUT_STREAK { continue; } + pool.stats.increment_me_writer_teardown_escalation_total(); let hard_detach = tokio::time::timeout( Duration::from_secs(REMOVE_TIMEOUT_SECS), - pool.remove_draining_writer_hard_detach(*writer_id), + pool.remove_draining_writer_hard_detach( + *writer_id, + MeWriterTeardownReason::WatchdogStuckDraining, + ), ) .await; match hard_detach { @@ -1702,6 +1722,7 @@ pub async fn me_zombie_writer_watchdog(pool: Arc) { ); } Err(_) => { + pool.stats.increment_me_writer_teardown_timeout_total(); warn!( writer_id, had_clients, diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs index ae517b3..93b1d2b 100644 --- a/src/transport/middle_proxy/health_adversarial_tests.rs +++ b/src/transport/middle_proxy/health_adversarial_tests.rs @@ -316,7 +316,12 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c let ids = sorted_writer_ids(&pool).await; for writer_id in ids.into_iter().take(3) { - let _ = pool.remove_writer_and_close_clients(writer_id).await; + let _ = pool + .remove_writer_and_close_clients( + writer_id, + crate::stats::MeWriterTeardownReason::ReapEmpty, + ) + .await; } reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index 230cd64..3c7b919 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -197,7 +197,9 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() { reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(warn_next_allowed.contains_key(&7)); - let _ = pool.remove_writer_and_close_clients(7).await; + let _ = pool + .remove_writer_and_close_clients(7, crate::stats::MeWriterTeardownReason::ReapEmpty) + .await; assert!(pool.registry.get_writer(conn_ids[0]).await.is_none()); reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; @@ -527,7 +529,12 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population let existing_writer_ids = current_writer_ids(&pool).await; for writer_id in existing_writer_ids.into_iter().take(4) { - let _ = pool.remove_writer_and_close_clients(writer_id).await; + let _ = pool + .remove_writer_and_close_clients( + writer_id, + crate::stats::MeWriterTeardownReason::ReapEmpty, + ) + .await; } reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index e3ea44d..b0ba776 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -16,6 +16,9 @@ use crate::config::MeBindStaleMode; use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32}; +use crate::stats::{ + MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, +}; use super::codec::{RpcWriter, WriterCommand}; use super::pool::{MePool, MeWriter, WriterContour}; @@ -28,7 +31,7 @@ const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5; const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700; #[derive(Clone, Copy)] -enum WriterTeardownMode { +enum WriterRemoveGuardMode { Any, DrainingOnly, } @@ -49,9 +52,16 @@ impl MePool { for writer_id in closed_writer_ids { if self.registry.is_writer_empty(writer_id).await { - let _ = self.remove_writer_only(writer_id).await; + let _ = self + .remove_writer_only(writer_id, MeWriterTeardownReason::PruneClosedWriter) + .await; } else { - let _ = self.remove_writer_and_close_clients(writer_id).await; + let _ = self + .remove_writer_and_close_clients( + writer_id, + MeWriterTeardownReason::PruneClosedWriter, + ) + .await; } } } @@ -173,7 +183,11 @@ impl MePool { .is_ok() { if let Some(pool) = pool_writer_task.upgrade() { - pool.remove_writer_and_close_clients(writer_id).await; + pool.remove_writer_and_close_clients( + writer_id, + MeWriterTeardownReason::WriterTaskExit, + ) + .await; } else { cancel_wr.cancel(); } @@ -264,7 +278,11 @@ impl MePool { .is_ok() { if let Some(pool) = pool.upgrade() { - pool.remove_writer_and_close_clients(writer_id).await; + pool.remove_writer_and_close_clients( + writer_id, + MeWriterTeardownReason::ReaderExit, + ) + .await; } else { // Fallback for shutdown races: make writer task exit quickly so stale // channels are observable by periodic prune. @@ -372,7 +390,11 @@ impl MePool { .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + pool.remove_writer_and_close_clients( + writer_id, + MeWriterTeardownReason::PingSendFail, + ) + .await; } break; } @@ -465,7 +487,11 @@ impl MePool { .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + pool.remove_writer_and_close_clients( + writer_id, + MeWriterTeardownReason::SignalSendFail, + ) + .await; } break; } @@ -499,7 +525,11 @@ impl MePool { .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + pool.remove_writer_and_close_clients( + writer_id, + MeWriterTeardownReason::SignalSendFail, + ) + .await; } break; } @@ -512,25 +542,48 @@ impl MePool { Ok(()) } - pub(crate) async fn remove_writer_and_close_clients(self: &Arc, writer_id: u64) { + pub(crate) async fn remove_writer_and_close_clients( + self: &Arc, + writer_id: u64, + reason: MeWriterTeardownReason, + ) -> bool { // Full client cleanup now happens inside `registry.writer_lost` to keep // writer reap/remove paths strictly non-blocking per connection. - let _ = self - .remove_writer_with_mode(writer_id, WriterTeardownMode::Any) - .await; + self.remove_writer_with_mode( + writer_id, + reason, + MeWriterTeardownMode::Normal, + WriterRemoveGuardMode::Any, + ) + .await } pub(super) async fn remove_draining_writer_hard_detach( self: &Arc, writer_id: u64, + reason: MeWriterTeardownReason, ) -> bool { - self.remove_writer_with_mode(writer_id, WriterTeardownMode::DrainingOnly) - .await + self.remove_writer_with_mode( + writer_id, + reason, + MeWriterTeardownMode::HardDetach, + WriterRemoveGuardMode::DrainingOnly, + ) + .await } - async fn remove_writer_only(self: &Arc, writer_id: u64) -> bool { - self.remove_writer_with_mode(writer_id, WriterTeardownMode::Any) - .await + async fn remove_writer_only( + self: &Arc, + writer_id: u64, + reason: MeWriterTeardownReason, + ) -> bool { + self.remove_writer_with_mode( + writer_id, + reason, + MeWriterTeardownMode::Normal, + WriterRemoveGuardMode::Any, + ) + .await } // Authoritative teardown primitive shared by normal cleanup and watchdog path. @@ -542,8 +595,13 @@ impl MePool { async fn remove_writer_with_mode( self: &Arc, writer_id: u64, - mode: WriterTeardownMode, + reason: MeWriterTeardownReason, + mode: MeWriterTeardownMode, + guard_mode: WriterRemoveGuardMode, ) -> bool { + let started_at = Instant::now(); + self.stats + .increment_me_writer_teardown_attempt_total(reason, mode); let mut close_tx: Option> = None; let mut removed_addr: Option = None; let mut removed_dc: Option = None; @@ -553,9 +611,12 @@ impl MePool { { let mut ws = self.writers.write().await; if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { - if matches!(mode, WriterTeardownMode::DrainingOnly) + if matches!(guard_mode, WriterRemoveGuardMode::DrainingOnly) && !ws[pos].draining.load(Ordering::Relaxed) { + self.stats.increment_me_writer_teardown_noop_total(); + self.stats + .observe_me_writer_teardown_duration(mode, started_at.elapsed()); return false; } let w = ws.remove(pos); @@ -595,6 +656,9 @@ impl MePool { self.stats.increment_me_writer_close_signal_drop_total(); self.stats .increment_me_writer_close_signal_channel_full_total(); + self.stats.increment_me_writer_cleanup_side_effect_failures_total( + MeWriterCleanupSideEffectStep::CloseSignalChannelFull, + ); debug!( writer_id, "Skipping close signal for removed writer: command channel is full" @@ -602,6 +666,9 @@ impl MePool { } Err(TrySendError::Closed(_)) => { self.stats.increment_me_writer_close_signal_drop_total(); + self.stats.increment_me_writer_cleanup_side_effect_failures_total( + MeWriterCleanupSideEffectStep::CloseSignalChannelClosed, + ); debug!( writer_id, "Skipping close signal for removed writer: command channel is closed" @@ -619,6 +686,13 @@ impl MePool { self.trigger_immediate_refill_for_dc(addr, writer_dc); } } + if removed { + self.stats.increment_me_writer_teardown_success_total(mode); + } else { + self.stats.increment_me_writer_teardown_noop_total(); + } + self.stats + .observe_me_writer_teardown_duration(mode, started_at.elapsed()); removed } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 6791064..82118d8 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -14,6 +14,7 @@ use crate::config::{MeRouteNoWriterMode, MeWriterPickMode}; use crate::error::{ProxyError, Result}; use crate::network::IpFamily; use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32}; +use crate::stats::MeWriterTeardownReason; use super::MePool; use super::codec::WriterCommand; @@ -134,7 +135,11 @@ impl MePool { Ok(()) => return Ok(()), Err(TimedSendError::Closed(_)) => { warn!(writer_id = current.writer_id, "ME writer channel closed"); - self.remove_writer_and_close_clients(current.writer_id).await; + self.remove_writer_and_close_clients( + current.writer_id, + MeWriterTeardownReason::RouteChannelClosed, + ) + .await; continue; } Err(TimedSendError::Timeout(_)) => { @@ -151,7 +156,11 @@ impl MePool { } Err(TrySendError::Closed(_)) => { warn!(writer_id = current.writer_id, "ME writer channel closed"); - self.remove_writer_and_close_clients(current.writer_id).await; + self.remove_writer_and_close_clients( + current.writer_id, + MeWriterTeardownReason::RouteChannelClosed, + ) + .await; continue; } } @@ -458,7 +467,11 @@ impl MePool { Err(TrySendError::Closed(_)) => { self.stats.increment_me_writer_pick_closed_total(pick_mode); warn!(writer_id = w.id, "ME writer channel closed"); - self.remove_writer_and_close_clients(w.id).await; + self.remove_writer_and_close_clients( + w.id, + MeWriterTeardownReason::RouteChannelClosed, + ) + .await; continue; } } @@ -503,7 +516,11 @@ impl MePool { Err(TimedSendError::Closed(_)) => { self.stats.increment_me_writer_pick_closed_total(pick_mode); warn!(writer_id = w.id, "ME writer channel closed (blocking)"); - self.remove_writer_and_close_clients(w.id).await; + self.remove_writer_and_close_clients( + w.id, + MeWriterTeardownReason::RouteChannelClosed, + ) + .await; } Err(TimedSendError::Timeout(_)) => { self.stats.increment_me_writer_pick_full_total(pick_mode); @@ -654,7 +671,11 @@ impl MePool { } Err(TrySendError::Closed(_)) => { debug!("ME close write failed"); - self.remove_writer_and_close_clients(w.writer_id).await; + self.remove_writer_and_close_clients( + w.writer_id, + MeWriterTeardownReason::CloseRpcChannelClosed, + ) + .await; } } } else { From aba4205dccd3acb98bd5b8d403bce82dde21dd2b Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 12:46:35 +0300 Subject: [PATCH 2/3] Teardown Monitoring in Metrics Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/stats/mod.rs | 357 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 357 insertions(+) diff --git a/src/stats/mod.rs b/src/stats/mod.rs index ad1d16b..0df4dc0 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -19,6 +19,137 @@ use tracing::debug; use crate::config::{MeTelemetryLevel, MeWriterPickMode}; use self::telemetry::TelemetryPolicy; +const ME_WRITER_TEARDOWN_MODE_COUNT: usize = 2; +const ME_WRITER_TEARDOWN_REASON_COUNT: usize = 11; +const ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT: usize = 2; +const ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT: usize = 12; +const ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS: [u64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [ + 1_000, + 5_000, + 10_000, + 25_000, + 50_000, + 100_000, + 250_000, + 500_000, + 1_000_000, + 2_500_000, + 5_000_000, + 10_000_000, +]; +const ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS: [&str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [ + "0.001", + "0.005", + "0.01", + "0.025", + "0.05", + "0.1", + "0.25", + "0.5", + "1", + "2.5", + "5", + "10", +]; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum MeWriterTeardownMode { + Normal = 0, + HardDetach = 1, +} + +impl MeWriterTeardownMode { + pub const ALL: [Self; ME_WRITER_TEARDOWN_MODE_COUNT] = + [Self::Normal, Self::HardDetach]; + + pub const fn as_str(self) -> &'static str { + match self { + Self::Normal => "normal", + Self::HardDetach => "hard_detach", + } + } + + const fn idx(self) -> usize { + self as usize + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum MeWriterTeardownReason { + ReaderExit = 0, + WriterTaskExit = 1, + PingSendFail = 2, + SignalSendFail = 3, + RouteChannelClosed = 4, + CloseRpcChannelClosed = 5, + PruneClosedWriter = 6, + ReapTimeoutExpired = 7, + ReapThresholdForce = 8, + ReapEmpty = 9, + WatchdogStuckDraining = 10, +} + +impl MeWriterTeardownReason { + pub const ALL: [Self; ME_WRITER_TEARDOWN_REASON_COUNT] = [ + Self::ReaderExit, + Self::WriterTaskExit, + Self::PingSendFail, + Self::SignalSendFail, + Self::RouteChannelClosed, + Self::CloseRpcChannelClosed, + Self::PruneClosedWriter, + Self::ReapTimeoutExpired, + Self::ReapThresholdForce, + Self::ReapEmpty, + Self::WatchdogStuckDraining, + ]; + + pub const fn as_str(self) -> &'static str { + match self { + Self::ReaderExit => "reader_exit", + Self::WriterTaskExit => "writer_task_exit", + Self::PingSendFail => "ping_send_fail", + Self::SignalSendFail => "signal_send_fail", + Self::RouteChannelClosed => "route_channel_closed", + Self::CloseRpcChannelClosed => "close_rpc_channel_closed", + Self::PruneClosedWriter => "prune_closed_writer", + Self::ReapTimeoutExpired => "reap_timeout_expired", + Self::ReapThresholdForce => "reap_threshold_force", + Self::ReapEmpty => "reap_empty", + Self::WatchdogStuckDraining => "watchdog_stuck_draining", + } + } + + const fn idx(self) -> usize { + self as usize + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum MeWriterCleanupSideEffectStep { + CloseSignalChannelFull = 0, + CloseSignalChannelClosed = 1, +} + +impl MeWriterCleanupSideEffectStep { + pub const ALL: [Self; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT] = + [Self::CloseSignalChannelFull, Self::CloseSignalChannelClosed]; + + pub const fn as_str(self) -> &'static str { + match self { + Self::CloseSignalChannelFull => "close_signal_channel_full", + Self::CloseSignalChannelClosed => "close_signal_channel_closed", + } + } + + const fn idx(self) -> usize { + self as usize + } +} + // ============= Stats ============= #[derive(Default)] @@ -128,6 +259,18 @@ pub struct Stats { me_draining_writers_reap_progress_total: AtomicU64, me_writer_removed_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64, + me_writer_teardown_attempt_total: + [[AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT]; ME_WRITER_TEARDOWN_REASON_COUNT], + me_writer_teardown_success_total: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT], + me_writer_teardown_timeout_total: AtomicU64, + me_writer_teardown_escalation_total: AtomicU64, + me_writer_teardown_noop_total: AtomicU64, + me_writer_cleanup_side_effect_failures_total: + [AtomicU64; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT], + me_writer_teardown_duration_bucket_hits: + [[AtomicU64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT + 1]; ME_WRITER_TEARDOWN_MODE_COUNT], + me_writer_teardown_duration_sum_micros: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT], + me_writer_teardown_duration_count: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT], me_refill_triggered_total: AtomicU64, me_refill_skipped_inflight_total: AtomicU64, me_refill_failed_total: AtomicU64, @@ -765,6 +908,74 @@ impl Stats { self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_writer_teardown_attempt_total( + &self, + reason: MeWriterTeardownReason, + mode: MeWriterTeardownMode, + ) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()] + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_success_total[mode.idx()].fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_teardown_timeout_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_timeout_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_teardown_escalation_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_escalation_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_teardown_noop_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_teardown_noop_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_cleanup_side_effect_failures_total( + &self, + step: MeWriterCleanupSideEffectStep, + ) { + if self.telemetry_me_allows_normal() { + self.me_writer_cleanup_side_effect_failures_total[step.idx()] + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn observe_me_writer_teardown_duration( + &self, + mode: MeWriterTeardownMode, + duration: Duration, + ) { + if !self.telemetry_me_allows_normal() { + return; + } + let duration_micros = duration.as_micros().min(u64::MAX as u128) as u64; + let mut bucket_idx = ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT; + for (idx, upper_bound_micros) in ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS + .iter() + .copied() + .enumerate() + { + if duration_micros <= upper_bound_micros { + bucket_idx = idx; + break; + } + } + self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx] + .fetch_add(1, Ordering::Relaxed); + self.me_writer_teardown_duration_sum_micros[mode.idx()] + .fetch_add(duration_micros, Ordering::Relaxed); + self.me_writer_teardown_duration_count[mode.idx()].fetch_add(1, Ordering::Relaxed); + } pub fn increment_me_refill_triggered_total(&self) { if self.telemetry_me_allows_debug() { self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed); @@ -1297,6 +1508,79 @@ impl Stats { pub fn get_me_writer_removed_unexpected_total(&self) -> u64 { self.me_writer_removed_unexpected_total.load(Ordering::Relaxed) } + pub fn get_me_writer_teardown_attempt_total( + &self, + reason: MeWriterTeardownReason, + mode: MeWriterTeardownMode, + ) -> u64 { + self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()] + .load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_attempt_total_by_mode(&self, mode: MeWriterTeardownMode) -> u64 { + MeWriterTeardownReason::ALL + .iter() + .copied() + .map(|reason| self.get_me_writer_teardown_attempt_total(reason, mode)) + .sum() + } + pub fn get_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) -> u64 { + self.me_writer_teardown_success_total[mode.idx()].load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_timeout_total(&self) -> u64 { + self.me_writer_teardown_timeout_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_escalation_total(&self) -> u64 { + self.me_writer_teardown_escalation_total + .load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_noop_total(&self) -> u64 { + self.me_writer_teardown_noop_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_cleanup_side_effect_failures_total( + &self, + step: MeWriterCleanupSideEffectStep, + ) -> u64 { + self.me_writer_cleanup_side_effect_failures_total[step.idx()] + .load(Ordering::Relaxed) + } + pub fn get_me_writer_cleanup_side_effect_failures_total_all(&self) -> u64 { + MeWriterCleanupSideEffectStep::ALL + .iter() + .copied() + .map(|step| self.get_me_writer_cleanup_side_effect_failures_total(step)) + .sum() + } + pub fn me_writer_teardown_duration_bucket_labels( + ) -> &'static [&'static str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] { + &ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS + } + pub fn get_me_writer_teardown_duration_bucket_hits( + &self, + mode: MeWriterTeardownMode, + bucket_idx: usize, + ) -> u64 { + self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx] + .load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_duration_bucket_total( + &self, + mode: MeWriterTeardownMode, + bucket_idx: usize, + ) -> u64 { + let capped_idx = bucket_idx.min(ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT); + let mut total = 0u64; + for idx in 0..=capped_idx { + total = total.saturating_add(self.get_me_writer_teardown_duration_bucket_hits(mode, idx)); + } + total + } + pub fn get_me_writer_teardown_duration_count(&self, mode: MeWriterTeardownMode) -> u64 { + self.me_writer_teardown_duration_count[mode.idx()].load(Ordering::Relaxed) + } + pub fn get_me_writer_teardown_duration_sum_seconds(&self, mode: MeWriterTeardownMode) -> f64 { + self.me_writer_teardown_duration_sum_micros[mode.idx()].load(Ordering::Relaxed) as f64 + / 1_000_000.0 + } pub fn get_me_refill_triggered_total(&self) -> u64 { self.me_refill_triggered_total.load(Ordering::Relaxed) } @@ -1800,6 +2084,79 @@ mod tests { assert_eq!(stats.get_me_keepalive_sent(), 0); assert_eq!(stats.get_me_route_drop_queue_full(), 0); } + + #[test] + fn test_teardown_counters_and_duration() { + let stats = Stats::new(); + stats.increment_me_writer_teardown_attempt_total( + MeWriterTeardownReason::ReaderExit, + MeWriterTeardownMode::Normal, + ); + stats.increment_me_writer_teardown_success_total(MeWriterTeardownMode::Normal); + stats.observe_me_writer_teardown_duration( + MeWriterTeardownMode::Normal, + Duration::from_millis(3), + ); + stats.increment_me_writer_cleanup_side_effect_failures_total( + MeWriterCleanupSideEffectStep::CloseSignalChannelFull, + ); + + assert_eq!( + stats.get_me_writer_teardown_attempt_total( + MeWriterTeardownReason::ReaderExit, + MeWriterTeardownMode::Normal + ), + 1 + ); + assert_eq!( + stats.get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal), + 1 + ); + assert_eq!( + stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal), + 1 + ); + assert!( + stats.get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal) > 0.0 + ); + assert_eq!( + stats.get_me_writer_cleanup_side_effect_failures_total( + MeWriterCleanupSideEffectStep::CloseSignalChannelFull + ), + 1 + ); + } + + #[test] + fn test_teardown_counters_respect_me_silent() { + let stats = Stats::new(); + stats.apply_telemetry_policy(TelemetryPolicy { + core_enabled: true, + user_enabled: true, + me_level: MeTelemetryLevel::Silent, + }); + stats.increment_me_writer_teardown_attempt_total( + MeWriterTeardownReason::ReaderExit, + MeWriterTeardownMode::Normal, + ); + stats.increment_me_writer_teardown_timeout_total(); + stats.observe_me_writer_teardown_duration( + MeWriterTeardownMode::Normal, + Duration::from_millis(1), + ); + assert_eq!( + stats.get_me_writer_teardown_attempt_total( + MeWriterTeardownReason::ReaderExit, + MeWriterTeardownMode::Normal + ), + 0 + ); + assert_eq!(stats.get_me_writer_teardown_timeout_total(), 0); + assert_eq!( + stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal), + 0 + ); + } #[test] fn test_replay_checker_basic() { From 4a610d83a36dd6489e7317b0cda791029a5ad6a2 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Fri, 20 Mar 2026 12:56:13 +0300 Subject: [PATCH 3/3] Update Cargo.toml Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index fab12f7..7c5fdc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.26" +version = "3.3.27" edition = "2024" [dependencies]