diff --git a/src/api/model.rs b/src/api/model.rs
index e98de8b..91d83b2 100644
--- a/src/api/model.rs
+++ b/src/api/model.rs
@@ -205,6 +205,16 @@ pub(super) struct ZeroPoolData {
     pub(super) refill_failed_total: u64,
     pub(super) writer_restored_same_endpoint_total: u64,
     pub(super) writer_restored_fallback_total: u64,
+    pub(super) teardown_attempt_total_normal: u64,
+    pub(super) teardown_attempt_total_hard_detach: u64,
+    pub(super) teardown_success_total_normal: u64,
+    pub(super) teardown_success_total_hard_detach: u64,
+    pub(super) teardown_timeout_total: u64,
+    pub(super) teardown_escalation_total: u64,
+    pub(super) teardown_noop_total: u64,
+    pub(super) teardown_cleanup_side_effect_failures_total: u64,
+    pub(super) teardown_duration_count_total: u64,
+    pub(super) teardown_duration_sum_seconds_total: f64,
 }
 
 #[derive(Serialize, Clone)]
diff --git a/src/api/runtime_min.rs b/src/api/runtime_min.rs
index f334dd0..ae3b23f 100644
--- a/src/api/runtime_min.rs
+++ b/src/api/runtime_min.rs
@@ -4,6 +4,9 @@ use std::time::{SystemTime, UNIX_EPOCH};
 use serde::Serialize;
 
 use crate::config::ProxyConfig;
+use crate::stats::{
+    MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, Stats,
+};
 
 use super::ApiShared;
 
@@ -98,6 +101,50 @@ pub(super) struct RuntimeMeQualityCountersData {
     pub(super) reconnect_success_total: u64,
 }
 
+#[derive(Serialize)]
+pub(super) struct RuntimeMeQualityTeardownAttemptData {
+    pub(super) reason: &'static str,
+    pub(super) mode: &'static str,
+    pub(super) total: u64,
+}
+
+#[derive(Serialize)]
+pub(super) struct RuntimeMeQualityTeardownSuccessData {
+    pub(super) mode: &'static str,
+    pub(super) total: u64,
+}
+
+#[derive(Serialize)]
+pub(super) struct RuntimeMeQualityTeardownSideEffectData {
+    pub(super) step: &'static str,
+    pub(super) total: u64,
+}
+
+#[derive(Serialize)]
+pub(super) struct RuntimeMeQualityTeardownDurationBucketData {
+    pub(super) le_seconds: &'static str,
+    pub(super) total: u64,
+}
+
+#[derive(Serialize)]
+pub(super) struct RuntimeMeQualityTeardownDurationData {
+    pub(super) mode: &'static str,
+    pub(super) count: u64,
+    pub(super) sum_seconds: f64,
+    pub(super) buckets: Vec<RuntimeMeQualityTeardownDurationBucketData>,
+}
+
+#[derive(Serialize)]
+pub(super) struct RuntimeMeQualityTeardownData {
+    pub(super) attempts: Vec<RuntimeMeQualityTeardownAttemptData>,
+    pub(super) success: Vec<RuntimeMeQualityTeardownSuccessData>,
+    pub(super) timeout_total: u64,
+    pub(super) escalation_total: u64,
+    pub(super) noop_total: u64,
+    pub(super) cleanup_side_effect_failures: Vec<RuntimeMeQualityTeardownSideEffectData>,
+    pub(super) duration: Vec<RuntimeMeQualityTeardownDurationData>,
+}
+
 #[derive(Serialize)]
 pub(super) struct RuntimeMeQualityRouteDropData {
     pub(super) no_conn_total: u64,
@@ -120,6 +167,7 @@ pub(super) struct RuntimeMeQualityDcRttData {
 #[derive(Serialize)]
 pub(super) struct RuntimeMeQualityPayload {
     pub(super) counters: RuntimeMeQualityCountersData,
+    pub(super) teardown: RuntimeMeQualityTeardownData,
     pub(super) route_drops: RuntimeMeQualityRouteDropData,
     pub(super) dc_rtt: Vec<RuntimeMeQualityDcRttData>,
 }
@@ -374,6 +422,7 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
             reconnect_attempt_total: shared.stats.get_me_reconnect_attempts(),
             reconnect_success_total: shared.stats.get_me_reconnect_success(),
         },
+        teardown: build_runtime_me_teardown_data(shared),
         route_drops: RuntimeMeQualityRouteDropData {
             no_conn_total: shared.stats.get_me_route_drop_no_conn(),
             channel_closed_total: shared.stats.get_me_route_drop_channel_closed(),
@@ -397,6 +446,81 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
     }
 }
 
+fn build_runtime_me_teardown_data(shared: &ApiShared) -> RuntimeMeQualityTeardownData {
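+    // Emit every reason/mode combination, including the still-zero ones, so
+    // the JSON payload shape stays stable across scrapes.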
+    let attempts = MeWriterTeardownReason::ALL
+        .iter()
+        .copied()
+        .flat_map(|reason| {
+            MeWriterTeardownMode::ALL
+                .iter()
+                .copied()
+                .map(move |mode| RuntimeMeQualityTeardownAttemptData {
+                    reason: reason.as_str(),
+                    mode: mode.as_str(),
+                    total: shared.stats.get_me_writer_teardown_attempt_total(reason, mode),
+                })
+        })
+        .collect();
+
+    let success = MeWriterTeardownMode::ALL
+        .iter()
+        .copied()
+        .map(|mode| RuntimeMeQualityTeardownSuccessData {
+            mode: mode.as_str(),
+            total: shared.stats.get_me_writer_teardown_success_total(mode),
+        })
+        .collect();
+
+    let cleanup_side_effect_failures = MeWriterCleanupSideEffectStep::ALL
+        .iter()
+        .copied()
+        .map(|step| RuntimeMeQualityTeardownSideEffectData {
+            step: step.as_str(),
+            total: shared
+                .stats
+                .get_me_writer_cleanup_side_effect_failures_total(step),
+        })
+        .collect();
+
+    let duration = MeWriterTeardownMode::ALL
+        .iter()
+        .copied()
+        .map(|mode| {
+            let count = shared.stats.get_me_writer_teardown_duration_count(mode);
+            let mut buckets: Vec<RuntimeMeQualityTeardownDurationBucketData> =
+                Stats::me_writer_teardown_duration_bucket_labels()
+                    .iter()
+                    .enumerate()
+                    .map(|(bucket_idx, label)| RuntimeMeQualityTeardownDurationBucketData {
+                        le_seconds: *label,
+                        total: shared
+                            .stats
+                            .get_me_writer_teardown_duration_bucket_total(mode, bucket_idx),
+                    })
+                    .collect();
+            buckets.push(RuntimeMeQualityTeardownDurationBucketData {
+                le_seconds: "+Inf",
+                total: count,
+            });
+            RuntimeMeQualityTeardownDurationData {
+                mode: mode.as_str(),
+                count,
+                sum_seconds: shared.stats.get_me_writer_teardown_duration_sum_seconds(mode),
+                buckets,
+            }
+        })
+        .collect();
+
+    RuntimeMeQualityTeardownData {
+        attempts,
+        success,
+        timeout_total: shared.stats.get_me_writer_teardown_timeout_total(),
+        escalation_total: shared.stats.get_me_writer_teardown_escalation_total(),
+        noop_total: shared.stats.get_me_writer_teardown_noop_total(),
+        cleanup_side_effect_failures,
+        duration,
+    }
+}
+
 pub(super) async fn build_runtime_upstream_quality_data(
     shared: &ApiShared,
 ) -> RuntimeUpstreamQualityData {
diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs
index cdeacc0..22ff82e 100644
--- a/src/api/runtime_stats.rs
+++ b/src/api/runtime_stats.rs
@@ -1,7 +1,7 @@
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use crate::config::ApiConfig;
-use crate::stats::Stats;
+use crate::stats::{MeWriterTeardownMode, Stats};
 use crate::transport::upstream::IpPreference;
 use crate::transport::UpstreamRouteKind;
 
@@ -106,6 +106,29 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
         refill_failed_total: stats.get_me_refill_failed_total(),
         writer_restored_same_endpoint_total: stats.get_me_writer_restored_same_endpoint_total(),
         writer_restored_fallback_total: stats.get_me_writer_restored_fallback_total(),
+        teardown_attempt_total_normal: stats
+            .get_me_writer_teardown_attempt_total_by_mode(MeWriterTeardownMode::Normal),
+        teardown_attempt_total_hard_detach: stats
+            .get_me_writer_teardown_attempt_total_by_mode(MeWriterTeardownMode::HardDetach),
+        teardown_success_total_normal: stats
+            .get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal),
+        teardown_success_total_hard_detach: stats
+            .get_me_writer_teardown_success_total(MeWriterTeardownMode::HardDetach),
+        teardown_timeout_total: stats.get_me_writer_teardown_timeout_total(),
+        teardown_escalation_total: stats.get_me_writer_teardown_escalation_total(),
+        teardown_noop_total: stats.get_me_writer_teardown_noop_total(),
+        teardown_cleanup_side_effect_failures_total: stats
+            .get_me_writer_cleanup_side_effect_failures_total_all(),
+        teardown_duration_count_total: stats
+            .get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal)
+            .saturating_add(
+                stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::HardDetach),
+            ),
+        teardown_duration_sum_seconds_total: stats
+            .get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal)
+            + stats.get_me_writer_teardown_duration_sum_seconds(
+                MeWriterTeardownMode::HardDetach,
+            ),
     },
     desync: ZeroDesyncData {
         secure_padding_invalid_total: stats.get_secure_padding_invalid(),
diff --git a/src/metrics.rs b/src/metrics.rs
index 4f7f4b6..b7272b2 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -16,7 +16,9 @@ use tracing::{info, warn, debug};
 use crate::config::ProxyConfig;
 use crate::ip_tracker::UserIpTracker;
 use crate::stats::beobachten::BeobachtenStore;
-use crate::stats::Stats;
+use crate::stats::{
+    MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, Stats,
+};
 use crate::transport::{ListenOptions, create_listener};
 
 pub async fn serve(
@@ -1770,6 +1772,169 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
         }
     );
 
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_teardown_attempt_total ME writer teardown attempts by reason and mode"
+    );
+    let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_attempt_total counter");
+    for reason in MeWriterTeardownReason::ALL {
+        for mode in MeWriterTeardownMode::ALL {
+            let _ = writeln!(
+                out,
+                "telemt_me_writer_teardown_attempt_total{{reason=\"{}\",mode=\"{}\"}} {}",
+                reason.as_str(),
+                mode.as_str(),
+                if me_allows_normal {
+                    stats.get_me_writer_teardown_attempt_total(reason, mode)
+                } else {
+                    0
+                }
+            );
+        }
+    }
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_teardown_success_total ME writer teardown successes by mode"
+    );
+    let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_success_total counter");
+    for mode in MeWriterTeardownMode::ALL {
+        let _ = writeln!(
+            out,
+            "telemt_me_writer_teardown_success_total{{mode=\"{}\"}} {}",
+            mode.as_str(),
+            if me_allows_normal {
+                stats.get_me_writer_teardown_success_total(mode)
+            } else {
+                0
+            }
+        );
+    }
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_teardown_timeout_total Teardown operations that timed out"
+    );
+    let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_timeout_total counter");
+    let _ = writeln!(
+        out,
+        "telemt_me_writer_teardown_timeout_total {}",
+        if me_allows_normal {
+            stats.get_me_writer_teardown_timeout_total()
+        } else {
+            0
+        }
+    );
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_teardown_escalation_total Watchdog teardown escalations to hard detach"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_me_writer_teardown_escalation_total counter"
+    );
+    let _ = writeln!(
+        out,
+        "telemt_me_writer_teardown_escalation_total {}",
+        if me_allows_normal {
+            stats.get_me_writer_teardown_escalation_total()
+        } else {
+            0
+        }
+    );
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_teardown_noop_total Teardown operations that became no-op"
+    );
+    let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_noop_total counter");
+    let _ = writeln!(
+        out,
+        "telemt_me_writer_teardown_noop_total {}",
+        if me_allows_normal {
+            stats.get_me_writer_teardown_noop_total()
+        } else {
+            0
+        }
+    );
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_teardown_duration_seconds ME writer teardown latency histogram by mode"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_me_writer_teardown_duration_seconds histogram"
+    );
+    let bucket_labels = Stats::me_writer_teardown_duration_bucket_labels();
+    for mode in MeWriterTeardownMode::ALL {
+        for (bucket_idx, label) in bucket_labels.iter().enumerate() {
+            let _ = writeln!(
+                out,
+                "telemt_me_writer_teardown_duration_seconds_bucket{{mode=\"{}\",le=\"{}\"}} {}",
+                mode.as_str(),
+                label,
+                if me_allows_normal {
+                    stats.get_me_writer_teardown_duration_bucket_total(mode, bucket_idx)
+                } else {
+                    0
+                }
+            );
+        }
+        let _ = writeln!(
+            out,
+            "telemt_me_writer_teardown_duration_seconds_bucket{{mode=\"{}\",le=\"+Inf\"}} {}",
+            mode.as_str(),
+            if me_allows_normal {
+                stats.get_me_writer_teardown_duration_count(mode)
+            } else {
+                0
+            }
+        );
+        let _ = writeln!(
+            out,
+            "telemt_me_writer_teardown_duration_seconds_sum{{mode=\"{}\"}} {:.6}",
+            mode.as_str(),
+            if me_allows_normal {
+                stats.get_me_writer_teardown_duration_sum_seconds(mode)
+            } else {
+                0.0
+            }
+        );
+        let _ = writeln!(
+            out,
+            "telemt_me_writer_teardown_duration_seconds_count{{mode=\"{}\"}} {}",
+            mode.as_str(),
+            if me_allows_normal {
+                stats.get_me_writer_teardown_duration_count(mode)
+            } else {
+                0
+            }
+        );
+    }
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_cleanup_side_effect_failures_total Failed cleanup side effects by step"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_me_writer_cleanup_side_effect_failures_total counter"
+    );
+    for step in MeWriterCleanupSideEffectStep::ALL {
+        let _ = writeln!(
+            out,
+            "telemt_me_writer_cleanup_side_effect_failures_total{{step=\"{}\"}} {}",
+            step.as_str(),
+            if me_allows_normal {
+                stats.get_me_writer_cleanup_side_effect_failures_total(step)
+            } else {
+                0
+            }
+        );
+    }
+
     let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started");
     let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter");
     let _ = writeln!(
@@ -2175,6 +2340,17 @@ mod tests {
         assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
         assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
         assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
+        assert!(output.contains("# TYPE telemt_me_writer_teardown_attempt_total counter"));
+        assert!(output.contains("# TYPE telemt_me_writer_teardown_success_total counter"));
+        assert!(output.contains("# TYPE telemt_me_writer_teardown_timeout_total counter"));
+        assert!(output.contains("# TYPE telemt_me_writer_teardown_escalation_total counter"));
+        assert!(output.contains("# TYPE telemt_me_writer_teardown_noop_total counter"));
+        assert!(output.contains(
+            "# TYPE telemt_me_writer_teardown_duration_seconds histogram"
+        ));
+        assert!(output.contains(
+            "# TYPE telemt_me_writer_cleanup_side_effect_failures_total counter"
+        ));
         assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
         assert!(output.contains(
             "# TYPE telemt_me_writer_close_signal_channel_full_total counter"
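Note on the metrics block above: every new value is emitted through the same
`if me_allows_normal { value } else { 0 }` gate. A hypothetical helper, not
part of this patch, that would state the zero-when-ME-disabled policy once
(shown only to document the repeated pattern; `me_allows_normal` and the
getters are from the surrounding render_metrics code):

    // Hypothetical helper; the real patch inlines this conditional per metric.
    fn gated(me_allows_normal: bool, value: u64) -> u64 {
        if me_allows_normal { value } else { 0 }
    }

With it, each emission would read `gated(me_allows_normal, stats.get_...())`.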
diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs
index 9d4cc70..30e562b 100644
--- a/src/transport/middle_proxy/health.rs
+++ b/src/transport/middle_proxy/health.rs
@@ -10,6 +10,7 @@ use tracing::{debug, info, warn};
 use crate::config::MeFloorMode;
 use crate::crypto::SecureRandom;
 use crate::network::IpFamily;
+use crate::stats::MeWriterTeardownReason;
 
 use super::MePool;
 use super::pool::MeWriter;
 
@@ -358,7 +359,8 @@ pub(super) async fn reap_draining_writers(
             continue;
         }
         pool.stats.increment_pool_force_close_total();
-        pool.remove_writer_and_close_clients(writer_id).await;
+        pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapTimeoutExpired)
+            .await;
         pool.stats
             .increment_me_draining_writers_reap_progress_total();
     }
@@ -376,7 +378,8 @@
             continue;
         }
         pool.stats.increment_pool_force_close_total();
-        pool.remove_writer_and_close_clients(writer_id).await;
+        pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapThresholdForce)
+            .await;
         pool.stats
             .increment_me_draining_writers_reap_progress_total();
         closed_total = closed_total.saturating_add(1);
@@ -388,7 +391,8 @@
         if !closed_writer_ids.insert(writer_id) {
             continue;
         }
-        pool.remove_writer_and_close_clients(writer_id).await;
+        pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapEmpty)
+            .await;
         pool.stats
             .increment_me_draining_writers_reap_progress_total();
         closed_total = closed_total.saturating_add(1);
@@ -1646,11 +1650,14 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
     for (writer_id, had_clients) in &zombie_ids_with_meta {
         let result = tokio::time::timeout(
             Duration::from_secs(REMOVE_TIMEOUT_SECS),
-            pool.remove_writer_and_close_clients(*writer_id),
+            pool.remove_writer_and_close_clients(
+                *writer_id,
+                MeWriterTeardownReason::WatchdogStuckDraining,
+            ),
         )
         .await;
         match result {
-            Ok(()) => {
+            Ok(true) => {
                 removal_timeout_streak.remove(writer_id);
                 pool.stats.increment_pool_force_close_total();
                 pool.stats
                     "Zombie writer removed by watchdog"
                 );
             }
+            Ok(false) => {
+                removal_timeout_streak.remove(writer_id);
+                debug!(
+                    writer_id,
+                    had_clients,
+                    "Zombie writer watchdog removal became no-op"
+                );
+            }
             Err(_) => {
+                pool.stats.increment_me_writer_teardown_timeout_total();
                 let streak = removal_timeout_streak
                     .entry(*writer_id)
                     .and_modify(|value| *value = value.saturating_add(1))
                     .or_insert(1);
                 if *streak < HARD_DETACH_TIMEOUT_STREAK {
                     continue;
                 }
+                pool.stats.increment_me_writer_teardown_escalation_total();
                 let hard_detach = tokio::time::timeout(
                     Duration::from_secs(REMOVE_TIMEOUT_SECS),
-                    pool.remove_draining_writer_hard_detach(*writer_id),
+                    pool.remove_draining_writer_hard_detach(
+                        *writer_id,
+                        MeWriterTeardownReason::WatchdogStuckDraining,
+                    ),
                 )
                 .await;
                 match hard_detach {
                         );
                     }
                     Err(_) => {
+                        pool.stats.increment_me_writer_teardown_timeout_total();
                         warn!(
                             writer_id,
                             had_clients,
diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs
index ae517b3..93b1d2b 100644
--- a/src/transport/middle_proxy/health_adversarial_tests.rs
+++ b/src/transport/middle_proxy/health_adversarial_tests.rs
@@ -316,7 +316,12 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
     let ids = sorted_writer_ids(&pool).await;
     for writer_id in ids.into_iter().take(3) {
-        let _ = pool.remove_writer_and_close_clients(writer_id).await;
+        let _ = pool
+            .remove_writer_and_close_clients(
+                writer_id,
+                crate::stats::MeWriterTeardownReason::ReapEmpty,
+            )
+            .await;
     }
 
     reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs
index 230cd64..3c7b919 100644
--- a/src/transport/middle_proxy/health_regression_tests.rs
+++ b/src/transport/middle_proxy/health_regression_tests.rs
@@ -197,7 +199,9 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
     reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
     assert!(warn_next_allowed.contains_key(&7));
 
-    let _ = pool.remove_writer_and_close_clients(7).await;
+    let _ = pool
+        .remove_writer_and_close_clients(7, crate::stats::MeWriterTeardownReason::ReapEmpty)
+        .await;
     assert!(pool.registry.get_writer(conn_ids[0]).await.is_none());
 
     reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
@@ -527,7 +529,12 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
     let existing_writer_ids = current_writer_ids(&pool).await;
     for writer_id in existing_writer_ids.into_iter().take(4) {
-        let _ = pool.remove_writer_and_close_clients(writer_id).await;
+        let _ = pool
+            .remove_writer_and_close_clients(
+                writer_id,
+                crate::stats::MeWriterTeardownReason::ReapEmpty,
+            )
+            .await;
     }
     reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
     assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs
index e3ea44d..b0ba776 100644
--- a/src/transport/middle_proxy/pool_writer.rs
+++ b/src/transport/middle_proxy/pool_writer.rs
@@ -16,6 +16,9 @@ use crate::config::MeBindStaleMode;
 use crate::crypto::SecureRandom;
 use crate::error::{ProxyError, Result};
 use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
+use crate::stats::{
+    MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason,
+};
 
 use super::codec::{RpcWriter, WriterCommand};
 use super::pool::{MePool, MeWriter, WriterContour};
 
@@ -28,7 +31,7 @@ const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
 const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
 
 #[derive(Clone, Copy)]
-enum WriterTeardownMode {
+enum WriterRemoveGuardMode {
     Any,
     DrainingOnly,
 }
@@ -49,9 +52,16 @@ impl MePool {
         for writer_id in closed_writer_ids {
             if self.registry.is_writer_empty(writer_id).await {
-                let _ = self.remove_writer_only(writer_id).await;
+                let _ = self
+                    .remove_writer_only(writer_id, MeWriterTeardownReason::PruneClosedWriter)
+                    .await;
             } else {
-                let _ = self.remove_writer_and_close_clients(writer_id).await;
+                let _ = self
+                    .remove_writer_and_close_clients(
+                        writer_id,
+                        MeWriterTeardownReason::PruneClosedWriter,
+                    )
+                    .await;
             }
         }
     }
@@ -173,7 +183,11 @@ impl MePool {
                         .is_ok()
                     {
                         if let Some(pool) = pool_writer_task.upgrade() {
-                            pool.remove_writer_and_close_clients(writer_id).await;
+                            pool.remove_writer_and_close_clients(
+                                writer_id,
+                                MeWriterTeardownReason::WriterTaskExit,
+                            )
+                            .await;
                         } else {
                             cancel_wr.cancel();
                         }
@@ -264,7 +278,11 @@ impl MePool {
                        .is_ok()
                     {
                         if let Some(pool) = pool.upgrade() {
-                            pool.remove_writer_and_close_clients(writer_id).await;
+                            pool.remove_writer_and_close_clients(
+                                writer_id,
+                                MeWriterTeardownReason::ReaderExit,
+                            )
+                            .await;
                         } else {
                             // Fallback for shutdown races: make writer task exit quickly so stale
                             // channels are observable by periodic prune.
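Context for these call sites: `MeWriterTeardownReason` and
`MeWriterTeardownMode` live in `src/stats`, which this diff does not touch. A
minimal sketch of the assumed shape, with the variant set and label strings
inferred from this patch (both are assumptions; the real definitions may
differ):

    // Assumed shape; not part of this diff.
    #[derive(Clone, Copy, PartialEq, Eq)]
    pub enum MeWriterTeardownMode {
        Normal,
        HardDetach,
    }

    impl MeWriterTeardownMode {
        pub const ALL: [Self; 2] = [Self::Normal, Self::HardDetach];

        pub fn as_str(self) -> &'static str {
            match self {
                Self::Normal => "normal",          // assumed label
                Self::HardDetach => "hard_detach", // assumed label
            }
        }
    }

`MeWriterTeardownReason` presumably follows the same `ALL` + `as_str()`
pattern, with one variant per call site in this patch (`ReapTimeoutExpired`,
`ReapThresholdForce`, `ReapEmpty`, `WatchdogStuckDraining`,
`PruneClosedWriter`, `WriterTaskExit`, `ReaderExit`, `PingSendFail`,
`SignalSendFail`, `RouteChannelClosed`, `CloseRpcChannelClosed`).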
@@ -372,7 +390,11 @@ impl MePool {
                         .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
                         .is_ok()
                     {
-                        pool.remove_writer_and_close_clients(writer_id).await;
+                        pool.remove_writer_and_close_clients(
+                            writer_id,
+                            MeWriterTeardownReason::PingSendFail,
+                        )
+                        .await;
                     }
                     break;
                 }
@@ -465,7 +487,11 @@ impl MePool {
                         .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
                         .is_ok()
                     {
-                        pool.remove_writer_and_close_clients(writer_id).await;
+                        pool.remove_writer_and_close_clients(
+                            writer_id,
+                            MeWriterTeardownReason::SignalSendFail,
+                        )
+                        .await;
                     }
                     break;
                 }
@@ -499,7 +525,11 @@ impl MePool {
                         .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
                         .is_ok()
                     {
-                        pool.remove_writer_and_close_clients(writer_id).await;
+                        pool.remove_writer_and_close_clients(
+                            writer_id,
+                            MeWriterTeardownReason::SignalSendFail,
+                        )
+                        .await;
                     }
                     break;
                 }
@@ -512,25 +542,48 @@ impl MePool {
         Ok(())
     }
 
-    pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
+    pub(crate) async fn remove_writer_and_close_clients(
+        self: &Arc<Self>,
+        writer_id: u64,
+        reason: MeWriterTeardownReason,
+    ) -> bool {
         // Full client cleanup now happens inside `registry.writer_lost` to keep
         // writer reap/remove paths strictly non-blocking per connection.
-        let _ = self
-            .remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
-            .await;
+        self.remove_writer_with_mode(
+            writer_id,
+            reason,
+            MeWriterTeardownMode::Normal,
+            WriterRemoveGuardMode::Any,
+        )
+        .await
     }
 
     pub(super) async fn remove_draining_writer_hard_detach(
         self: &Arc<Self>,
         writer_id: u64,
+        reason: MeWriterTeardownReason,
     ) -> bool {
-        self.remove_writer_with_mode(writer_id, WriterTeardownMode::DrainingOnly)
-            .await
+        self.remove_writer_with_mode(
+            writer_id,
+            reason,
+            MeWriterTeardownMode::HardDetach,
+            WriterRemoveGuardMode::DrainingOnly,
+        )
+        .await
    }
 
-    async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> bool {
-        self.remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
-            .await
+    async fn remove_writer_only(
+        self: &Arc<Self>,
+        writer_id: u64,
+        reason: MeWriterTeardownReason,
+    ) -> bool {
+        self.remove_writer_with_mode(
+            writer_id,
+            reason,
+            MeWriterTeardownMode::Normal,
+            WriterRemoveGuardMode::Any,
+        )
+        .await
     }
 
     // Authoritative teardown primitive shared by normal cleanup and watchdog path.
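A note on the duration accounting used in the teardown primitive below: the
Prometheus renderer prints `get_me_writer_teardown_duration_bucket_total`
values directly as `le` buckets and reuses `_count` for `+Inf`, which only
works if the stored buckets are cumulative. A sketch of how
`observe_me_writer_teardown_duration` could maintain that invariant; the
bucket bounds and field layout here are assumptions, not taken from this
patch:

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    // Illustrative bounds; the real ones back
    // Stats::me_writer_teardown_duration_bucket_labels().
    const BOUNDS_SECS: [f64; 4] = [0.1, 0.5, 1.0, 5.0];

    struct TeardownHistogram {
        buckets: [AtomicU64; 4], // bucket i counts samples <= BOUNDS_SECS[i]
        count: AtomicU64,
        sum_micros: AtomicU64, // sum kept as integer micros to stay atomic
    }

    impl TeardownHistogram {
        fn observe(&self, elapsed: Duration) {
            let secs = elapsed.as_secs_f64();
            for (bucket, bound) in self.buckets.iter().zip(BOUNDS_SECS) {
                // Cumulative `le` semantics: bump every bucket whose bound
                // covers the sample, so the renderer can emit stored values
                // verbatim and use `count` as the +Inf bucket.
                if secs <= bound {
                    bucket.fetch_add(1, Ordering::Relaxed);
                }
            }
            self.count.fetch_add(1, Ordering::Relaxed);
            self.sum_micros
                .fetch_add(elapsed.as_micros() as u64, Ordering::Relaxed);
        }
    }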
@@ -542,8 +595,13 @@ impl MePool {
     async fn remove_writer_with_mode(
         self: &Arc<Self>,
         writer_id: u64,
-        mode: WriterTeardownMode,
+        reason: MeWriterTeardownReason,
+        mode: MeWriterTeardownMode,
+        guard_mode: WriterRemoveGuardMode,
     ) -> bool {
+        let started_at = Instant::now();
+        self.stats
+            .increment_me_writer_teardown_attempt_total(reason, mode);
         let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
         let mut removed_addr: Option<SocketAddr> = None;
         let mut removed_dc: Option<i32> = None;
@@ -553,9 +611,12 @@ impl MePool {
         {
             let mut ws = self.writers.write().await;
             if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
-                if matches!(mode, WriterTeardownMode::DrainingOnly)
+                if matches!(guard_mode, WriterRemoveGuardMode::DrainingOnly)
                     && !ws[pos].draining.load(Ordering::Relaxed)
                 {
+                    self.stats.increment_me_writer_teardown_noop_total();
+                    self.stats
+                        .observe_me_writer_teardown_duration(mode, started_at.elapsed());
                     return false;
                 }
                 let w = ws.remove(pos);
@@ -595,6 +656,9 @@ impl MePool {
                     self.stats.increment_me_writer_close_signal_drop_total();
                     self.stats
                         .increment_me_writer_close_signal_channel_full_total();
+                    self.stats.increment_me_writer_cleanup_side_effect_failures_total(
+                        MeWriterCleanupSideEffectStep::CloseSignalChannelFull,
+                    );
                     debug!(
                         writer_id,
                         "Skipping close signal for removed writer: command channel is full"
                     );
                 }
                 Err(TrySendError::Closed(_)) => {
                     self.stats.increment_me_writer_close_signal_drop_total();
+                    self.stats.increment_me_writer_cleanup_side_effect_failures_total(
+                        MeWriterCleanupSideEffectStep::CloseSignalChannelClosed,
+                    );
                     debug!(
                         writer_id,
                         "Skipping close signal for removed writer: command channel is closed"
                     );
@@ -619,6 +686,13 @@ impl MePool {
                 self.trigger_immediate_refill_for_dc(addr, writer_dc);
             }
         }
+        if removed {
+            self.stats.increment_me_writer_teardown_success_total(mode);
+        } else {
+            self.stats.increment_me_writer_teardown_noop_total();
+        }
+        self.stats
+            .observe_me_writer_teardown_duration(mode, started_at.elapsed());
         removed
     }
diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs
index 6791064..82118d8 100644
--- a/src/transport/middle_proxy/send.rs
+++ b/src/transport/middle_proxy/send.rs
@@ -14,6 +14,7 @@ use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
 use crate::error::{ProxyError, Result};
 use crate::network::IpFamily;
 use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
+use crate::stats::MeWriterTeardownReason;
 
 use super::MePool;
 use super::codec::WriterCommand;
 
@@ -134,7 +135,11 @@ impl MePool {
             Ok(()) => return Ok(()),
             Err(TimedSendError::Closed(_)) => {
                 warn!(writer_id = current.writer_id, "ME writer channel closed");
-                self.remove_writer_and_close_clients(current.writer_id).await;
+                self.remove_writer_and_close_clients(
+                    current.writer_id,
+                    MeWriterTeardownReason::RouteChannelClosed,
+                )
+                .await;
                 continue;
             }
             Err(TimedSendError::Timeout(_)) => {
@@ -151,7 +156,11 @@
                 }
                 Err(TrySendError::Closed(_)) => {
                     warn!(writer_id = current.writer_id, "ME writer channel closed");
-                    self.remove_writer_and_close_clients(current.writer_id).await;
+                    self.remove_writer_and_close_clients(
+                        current.writer_id,
+                        MeWriterTeardownReason::RouteChannelClosed,
+                    )
+                    .await;
                     continue;
                 }
             }
@@ -458,7 +467,11 @@ impl MePool {
                 Err(TrySendError::Closed(_)) => {
                     self.stats.increment_me_writer_pick_closed_total(pick_mode);
                     warn!(writer_id = w.id, "ME writer channel closed");
-                    self.remove_writer_and_close_clients(w.id).await;
+                    self.remove_writer_and_close_clients(
+                        w.id,
+                        MeWriterTeardownReason::RouteChannelClosed,
+                    )
+                    .await;
                     continue;
                 }
             }
@@ -503,7 +516,11 @@ impl MePool {
             Err(TimedSendError::Closed(_)) => {
                 self.stats.increment_me_writer_pick_closed_total(pick_mode);
                 warn!(writer_id = w.id, "ME writer channel closed (blocking)");
-                self.remove_writer_and_close_clients(w.id).await;
+                self.remove_writer_and_close_clients(
+                    w.id,
+                    MeWriterTeardownReason::RouteChannelClosed,
+                )
+                .await;
             }
             Err(TimedSendError::Timeout(_)) => {
                 self.stats.increment_me_writer_pick_full_total(pick_mode);
@@ -654,7 +671,11 @@ impl MePool {
             }
             Err(TrySendError::Closed(_)) => {
                 debug!("ME close write failed");
-                self.remove_writer_and_close_clients(w.writer_id).await;
+                self.remove_writer_and_close_clients(
+                    w.writer_id,
+                    MeWriterTeardownReason::CloseRpcChannelClosed,
+                )
+                .await;
             }
         }
     } else {
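For completeness, the cleanup-step labels scraped by the metrics code come
from `MeWriterCleanupSideEffectStep`, also defined in `src/stats` outside
this diff. Only the two variants exercised by `remove_writer_with_mode` are
visible in the patch; a sketch under that assumption (label strings are
illustrative, and the real enum may define more steps):

    // Assumed shape; not part of this diff.
    #[derive(Clone, Copy, PartialEq, Eq)]
    pub enum MeWriterCleanupSideEffectStep {
        CloseSignalChannelFull,
        CloseSignalChannelClosed,
    }

    impl MeWriterCleanupSideEffectStep {
        pub const ALL: [Self; 2] = [
            Self::CloseSignalChannelFull,
            Self::CloseSignalChannelClosed,
        ];

        pub fn as_str(self) -> &'static str {
            match self {
                Self::CloseSignalChannelFull => "close_signal_channel_full",
                Self::CloseSignalChannelClosed => "close_signal_channel_closed",
            }
        }
    }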