Merge pull request #505 from telemt/flow-me

Teardown Monitoring in API and Metrics
This commit is contained in:
Alexey 2026-03-20 12:59:39 +03:00 committed by GitHub
commit 9d38b42d88
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 854 additions and 36 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "telemt" name = "telemt"
version = "3.3.26" version = "3.3.27"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -205,6 +205,16 @@ pub(super) struct ZeroPoolData {
pub(super) refill_failed_total: u64, pub(super) refill_failed_total: u64,
pub(super) writer_restored_same_endpoint_total: u64, pub(super) writer_restored_same_endpoint_total: u64,
pub(super) writer_restored_fallback_total: u64, pub(super) writer_restored_fallback_total: u64,
pub(super) teardown_attempt_total_normal: u64,
pub(super) teardown_attempt_total_hard_detach: u64,
pub(super) teardown_success_total_normal: u64,
pub(super) teardown_success_total_hard_detach: u64,
pub(super) teardown_timeout_total: u64,
pub(super) teardown_escalation_total: u64,
pub(super) teardown_noop_total: u64,
pub(super) teardown_cleanup_side_effect_failures_total: u64,
pub(super) teardown_duration_count_total: u64,
pub(super) teardown_duration_sum_seconds_total: f64,
} }
#[derive(Serialize, Clone)] #[derive(Serialize, Clone)]

View File

@ -4,6 +4,9 @@ use std::time::{SystemTime, UNIX_EPOCH};
use serde::Serialize; use serde::Serialize;
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::stats::{
MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, Stats,
};
use super::ApiShared; use super::ApiShared;
@ -98,6 +101,50 @@ pub(super) struct RuntimeMeQualityCountersData {
pub(super) reconnect_success_total: u64, pub(super) reconnect_success_total: u64,
} }
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownAttemptData {
pub(super) reason: &'static str,
pub(super) mode: &'static str,
pub(super) total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownSuccessData {
pub(super) mode: &'static str,
pub(super) total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownSideEffectData {
pub(super) step: &'static str,
pub(super) total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownDurationBucketData {
pub(super) le_seconds: &'static str,
pub(super) total: u64,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownDurationData {
pub(super) mode: &'static str,
pub(super) count: u64,
pub(super) sum_seconds: f64,
pub(super) buckets: Vec<RuntimeMeQualityTeardownDurationBucketData>,
}
#[derive(Serialize)]
pub(super) struct RuntimeMeQualityTeardownData {
pub(super) attempts: Vec<RuntimeMeQualityTeardownAttemptData>,
pub(super) success: Vec<RuntimeMeQualityTeardownSuccessData>,
pub(super) timeout_total: u64,
pub(super) escalation_total: u64,
pub(super) noop_total: u64,
pub(super) cleanup_side_effect_failures: Vec<RuntimeMeQualityTeardownSideEffectData>,
pub(super) duration: Vec<RuntimeMeQualityTeardownDurationData>,
}
#[derive(Serialize)] #[derive(Serialize)]
pub(super) struct RuntimeMeQualityRouteDropData { pub(super) struct RuntimeMeQualityRouteDropData {
pub(super) no_conn_total: u64, pub(super) no_conn_total: u64,
@ -120,6 +167,7 @@ pub(super) struct RuntimeMeQualityDcRttData {
#[derive(Serialize)] #[derive(Serialize)]
pub(super) struct RuntimeMeQualityPayload { pub(super) struct RuntimeMeQualityPayload {
pub(super) counters: RuntimeMeQualityCountersData, pub(super) counters: RuntimeMeQualityCountersData,
pub(super) teardown: RuntimeMeQualityTeardownData,
pub(super) route_drops: RuntimeMeQualityRouteDropData, pub(super) route_drops: RuntimeMeQualityRouteDropData,
pub(super) dc_rtt: Vec<RuntimeMeQualityDcRttData>, pub(super) dc_rtt: Vec<RuntimeMeQualityDcRttData>,
} }
@ -374,6 +422,7 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
reconnect_attempt_total: shared.stats.get_me_reconnect_attempts(), reconnect_attempt_total: shared.stats.get_me_reconnect_attempts(),
reconnect_success_total: shared.stats.get_me_reconnect_success(), reconnect_success_total: shared.stats.get_me_reconnect_success(),
}, },
teardown: build_runtime_me_teardown_data(shared),
route_drops: RuntimeMeQualityRouteDropData { route_drops: RuntimeMeQualityRouteDropData {
no_conn_total: shared.stats.get_me_route_drop_no_conn(), no_conn_total: shared.stats.get_me_route_drop_no_conn(),
channel_closed_total: shared.stats.get_me_route_drop_channel_closed(), channel_closed_total: shared.stats.get_me_route_drop_channel_closed(),
@ -397,6 +446,81 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
} }
} }
fn build_runtime_me_teardown_data(shared: &ApiShared) -> RuntimeMeQualityTeardownData {
let attempts = MeWriterTeardownReason::ALL
.iter()
.copied()
.flat_map(|reason| {
MeWriterTeardownMode::ALL
.iter()
.copied()
.map(move |mode| RuntimeMeQualityTeardownAttemptData {
reason: reason.as_str(),
mode: mode.as_str(),
total: shared.stats.get_me_writer_teardown_attempt_total(reason, mode),
})
})
.collect();
let success = MeWriterTeardownMode::ALL
.iter()
.copied()
.map(|mode| RuntimeMeQualityTeardownSuccessData {
mode: mode.as_str(),
total: shared.stats.get_me_writer_teardown_success_total(mode),
})
.collect();
let cleanup_side_effect_failures = MeWriterCleanupSideEffectStep::ALL
.iter()
.copied()
.map(|step| RuntimeMeQualityTeardownSideEffectData {
step: step.as_str(),
total: shared
.stats
.get_me_writer_cleanup_side_effect_failures_total(step),
})
.collect();
let duration = MeWriterTeardownMode::ALL
.iter()
.copied()
.map(|mode| {
let count = shared.stats.get_me_writer_teardown_duration_count(mode);
let mut buckets: Vec<RuntimeMeQualityTeardownDurationBucketData> = Stats::me_writer_teardown_duration_bucket_labels()
.iter()
.enumerate()
.map(|(bucket_idx, label)| RuntimeMeQualityTeardownDurationBucketData {
le_seconds: label,
total: shared
.stats
.get_me_writer_teardown_duration_bucket_total(mode, bucket_idx),
})
.collect();
buckets.push(RuntimeMeQualityTeardownDurationBucketData {
le_seconds: "+Inf",
total: count,
});
RuntimeMeQualityTeardownDurationData {
mode: mode.as_str(),
count,
sum_seconds: shared.stats.get_me_writer_teardown_duration_sum_seconds(mode),
buckets,
}
})
.collect();
RuntimeMeQualityTeardownData {
attempts,
success,
timeout_total: shared.stats.get_me_writer_teardown_timeout_total(),
escalation_total: shared.stats.get_me_writer_teardown_escalation_total(),
noop_total: shared.stats.get_me_writer_teardown_noop_total(),
cleanup_side_effect_failures,
duration,
}
}
pub(super) async fn build_runtime_upstream_quality_data( pub(super) async fn build_runtime_upstream_quality_data(
shared: &ApiShared, shared: &ApiShared,
) -> RuntimeUpstreamQualityData { ) -> RuntimeUpstreamQualityData {

View File

@ -1,7 +1,7 @@
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::config::ApiConfig; use crate::config::ApiConfig;
use crate::stats::Stats; use crate::stats::{MeWriterTeardownMode, Stats};
use crate::transport::upstream::IpPreference; use crate::transport::upstream::IpPreference;
use crate::transport::UpstreamRouteKind; use crate::transport::UpstreamRouteKind;
@ -106,6 +106,29 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
refill_failed_total: stats.get_me_refill_failed_total(), refill_failed_total: stats.get_me_refill_failed_total(),
writer_restored_same_endpoint_total: stats.get_me_writer_restored_same_endpoint_total(), writer_restored_same_endpoint_total: stats.get_me_writer_restored_same_endpoint_total(),
writer_restored_fallback_total: stats.get_me_writer_restored_fallback_total(), writer_restored_fallback_total: stats.get_me_writer_restored_fallback_total(),
teardown_attempt_total_normal: stats
.get_me_writer_teardown_attempt_total_by_mode(MeWriterTeardownMode::Normal),
teardown_attempt_total_hard_detach: stats
.get_me_writer_teardown_attempt_total_by_mode(MeWriterTeardownMode::HardDetach),
teardown_success_total_normal: stats
.get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal),
teardown_success_total_hard_detach: stats
.get_me_writer_teardown_success_total(MeWriterTeardownMode::HardDetach),
teardown_timeout_total: stats.get_me_writer_teardown_timeout_total(),
teardown_escalation_total: stats.get_me_writer_teardown_escalation_total(),
teardown_noop_total: stats.get_me_writer_teardown_noop_total(),
teardown_cleanup_side_effect_failures_total: stats
.get_me_writer_cleanup_side_effect_failures_total_all(),
teardown_duration_count_total: stats
.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal)
.saturating_add(
stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::HardDetach),
),
teardown_duration_sum_seconds_total: stats
.get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal)
+ stats.get_me_writer_teardown_duration_sum_seconds(
MeWriterTeardownMode::HardDetach,
),
}, },
desync: ZeroDesyncData { desync: ZeroDesyncData {
secure_padding_invalid_total: stats.get_secure_padding_invalid(), secure_padding_invalid_total: stats.get_secure_padding_invalid(),

View File

@ -16,7 +16,9 @@ use tracing::{info, warn, debug};
use crate::config::ProxyConfig; use crate::config::ProxyConfig;
use crate::ip_tracker::UserIpTracker; use crate::ip_tracker::UserIpTracker;
use crate::stats::beobachten::BeobachtenStore; use crate::stats::beobachten::BeobachtenStore;
use crate::stats::Stats; use crate::stats::{
MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason, Stats,
};
use crate::transport::{ListenOptions, create_listener}; use crate::transport::{ListenOptions, create_listener};
pub async fn serve( pub async fn serve(
@ -1770,6 +1772,169 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
} }
); );
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_attempt_total ME writer teardown attempts by reason and mode"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_attempt_total counter");
for reason in MeWriterTeardownReason::ALL {
for mode in MeWriterTeardownMode::ALL {
let _ = writeln!(
out,
"telemt_me_writer_teardown_attempt_total{{reason=\"{}\",mode=\"{}\"}} {}",
reason.as_str(),
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_attempt_total(reason, mode)
} else {
0
}
);
}
}
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_success_total ME writer teardown successes by mode"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_success_total counter");
for mode in MeWriterTeardownMode::ALL {
let _ = writeln!(
out,
"telemt_me_writer_teardown_success_total{{mode=\"{}\"}} {}",
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_success_total(mode)
} else {
0
}
);
}
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_timeout_total Teardown operations that timed out"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_timeout_total counter");
let _ = writeln!(
out,
"telemt_me_writer_teardown_timeout_total {}",
if me_allows_normal {
stats.get_me_writer_teardown_timeout_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_escalation_total Watchdog teardown escalations to hard detach"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_teardown_escalation_total counter"
);
let _ = writeln!(
out,
"telemt_me_writer_teardown_escalation_total {}",
if me_allows_normal {
stats.get_me_writer_teardown_escalation_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_noop_total Teardown operations that became no-op"
);
let _ = writeln!(out, "# TYPE telemt_me_writer_teardown_noop_total counter");
let _ = writeln!(
out,
"telemt_me_writer_teardown_noop_total {}",
if me_allows_normal {
stats.get_me_writer_teardown_noop_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_me_writer_teardown_duration_seconds ME writer teardown latency histogram by mode"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_teardown_duration_seconds histogram"
);
let bucket_labels = Stats::me_writer_teardown_duration_bucket_labels();
for mode in MeWriterTeardownMode::ALL {
for (bucket_idx, label) in bucket_labels.iter().enumerate() {
let _ = writeln!(
out,
"telemt_me_writer_teardown_duration_seconds_bucket{{mode=\"{}\",le=\"{}\"}} {}",
mode.as_str(),
label,
if me_allows_normal {
stats.get_me_writer_teardown_duration_bucket_total(mode, bucket_idx)
} else {
0
}
);
}
let _ = writeln!(
out,
"telemt_me_writer_teardown_duration_seconds_bucket{{mode=\"{}\",le=\"+Inf\"}} {}",
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_duration_count(mode)
} else {
0
}
);
let _ = writeln!(
out,
"telemt_me_writer_teardown_duration_seconds_sum{{mode=\"{}\"}} {:.6}",
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_duration_sum_seconds(mode)
} else {
0.0
}
);
let _ = writeln!(
out,
"telemt_me_writer_teardown_duration_seconds_count{{mode=\"{}\"}} {}",
mode.as_str(),
if me_allows_normal {
stats.get_me_writer_teardown_duration_count(mode)
} else {
0
}
);
}
let _ = writeln!(
out,
"# HELP telemt_me_writer_cleanup_side_effect_failures_total Failed cleanup side effects by step"
);
let _ = writeln!(
out,
"# TYPE telemt_me_writer_cleanup_side_effect_failures_total counter"
);
for step in MeWriterCleanupSideEffectStep::ALL {
let _ = writeln!(
out,
"telemt_me_writer_cleanup_side_effect_failures_total{{step=\"{}\"}} {}",
step.as_str(),
if me_allows_normal {
stats.get_me_writer_cleanup_side_effect_failures_total(step)
} else {
0
}
);
}
let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started"); let _ = writeln!(out, "# HELP telemt_me_refill_triggered_total Immediate ME refill runs started");
let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter"); let _ = writeln!(out, "# TYPE telemt_me_refill_triggered_total counter");
let _ = writeln!( let _ = writeln!(
@ -2175,6 +2340,17 @@ mod tests {
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_attempt_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_success_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_timeout_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_escalation_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_teardown_noop_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_teardown_duration_seconds histogram"
));
assert!(output.contains(
"# TYPE telemt_me_writer_cleanup_side_effect_failures_total counter"
));
assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter")); assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
assert!(output.contains( assert!(output.contains(
"# TYPE telemt_me_writer_close_signal_channel_full_total counter" "# TYPE telemt_me_writer_close_signal_channel_full_total counter"

View File

@ -19,6 +19,137 @@ use tracing::debug;
use crate::config::{MeTelemetryLevel, MeWriterPickMode}; use crate::config::{MeTelemetryLevel, MeWriterPickMode};
use self::telemetry::TelemetryPolicy; use self::telemetry::TelemetryPolicy;
const ME_WRITER_TEARDOWN_MODE_COUNT: usize = 2;
const ME_WRITER_TEARDOWN_REASON_COUNT: usize = 11;
const ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT: usize = 2;
const ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT: usize = 12;
const ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS: [u64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [
1_000,
5_000,
10_000,
25_000,
50_000,
100_000,
250_000,
500_000,
1_000_000,
2_500_000,
5_000_000,
10_000_000,
];
const ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS: [&str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] = [
"0.001",
"0.005",
"0.01",
"0.025",
"0.05",
"0.1",
"0.25",
"0.5",
"1",
"2.5",
"5",
"10",
];
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum MeWriterTeardownMode {
Normal = 0,
HardDetach = 1,
}
impl MeWriterTeardownMode {
pub const ALL: [Self; ME_WRITER_TEARDOWN_MODE_COUNT] =
[Self::Normal, Self::HardDetach];
pub const fn as_str(self) -> &'static str {
match self {
Self::Normal => "normal",
Self::HardDetach => "hard_detach",
}
}
const fn idx(self) -> usize {
self as usize
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum MeWriterTeardownReason {
ReaderExit = 0,
WriterTaskExit = 1,
PingSendFail = 2,
SignalSendFail = 3,
RouteChannelClosed = 4,
CloseRpcChannelClosed = 5,
PruneClosedWriter = 6,
ReapTimeoutExpired = 7,
ReapThresholdForce = 8,
ReapEmpty = 9,
WatchdogStuckDraining = 10,
}
impl MeWriterTeardownReason {
pub const ALL: [Self; ME_WRITER_TEARDOWN_REASON_COUNT] = [
Self::ReaderExit,
Self::WriterTaskExit,
Self::PingSendFail,
Self::SignalSendFail,
Self::RouteChannelClosed,
Self::CloseRpcChannelClosed,
Self::PruneClosedWriter,
Self::ReapTimeoutExpired,
Self::ReapThresholdForce,
Self::ReapEmpty,
Self::WatchdogStuckDraining,
];
pub const fn as_str(self) -> &'static str {
match self {
Self::ReaderExit => "reader_exit",
Self::WriterTaskExit => "writer_task_exit",
Self::PingSendFail => "ping_send_fail",
Self::SignalSendFail => "signal_send_fail",
Self::RouteChannelClosed => "route_channel_closed",
Self::CloseRpcChannelClosed => "close_rpc_channel_closed",
Self::PruneClosedWriter => "prune_closed_writer",
Self::ReapTimeoutExpired => "reap_timeout_expired",
Self::ReapThresholdForce => "reap_threshold_force",
Self::ReapEmpty => "reap_empty",
Self::WatchdogStuckDraining => "watchdog_stuck_draining",
}
}
const fn idx(self) -> usize {
self as usize
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum MeWriterCleanupSideEffectStep {
CloseSignalChannelFull = 0,
CloseSignalChannelClosed = 1,
}
impl MeWriterCleanupSideEffectStep {
pub const ALL: [Self; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT] =
[Self::CloseSignalChannelFull, Self::CloseSignalChannelClosed];
pub const fn as_str(self) -> &'static str {
match self {
Self::CloseSignalChannelFull => "close_signal_channel_full",
Self::CloseSignalChannelClosed => "close_signal_channel_closed",
}
}
const fn idx(self) -> usize {
self as usize
}
}
// ============= Stats ============= // ============= Stats =============
#[derive(Default)] #[derive(Default)]
@ -128,6 +259,18 @@ pub struct Stats {
me_draining_writers_reap_progress_total: AtomicU64, me_draining_writers_reap_progress_total: AtomicU64,
me_writer_removed_total: AtomicU64, me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64,
me_writer_teardown_attempt_total:
[[AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT]; ME_WRITER_TEARDOWN_REASON_COUNT],
me_writer_teardown_success_total: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
me_writer_teardown_timeout_total: AtomicU64,
me_writer_teardown_escalation_total: AtomicU64,
me_writer_teardown_noop_total: AtomicU64,
me_writer_cleanup_side_effect_failures_total:
[AtomicU64; ME_WRITER_CLEANUP_SIDE_EFFECT_STEP_COUNT],
me_writer_teardown_duration_bucket_hits:
[[AtomicU64; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT + 1]; ME_WRITER_TEARDOWN_MODE_COUNT],
me_writer_teardown_duration_sum_micros: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
me_writer_teardown_duration_count: [AtomicU64; ME_WRITER_TEARDOWN_MODE_COUNT],
me_refill_triggered_total: AtomicU64, me_refill_triggered_total: AtomicU64,
me_refill_skipped_inflight_total: AtomicU64, me_refill_skipped_inflight_total: AtomicU64,
me_refill_failed_total: AtomicU64, me_refill_failed_total: AtomicU64,
@ -765,6 +908,74 @@ impl Stats {
self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed); self.me_writer_removed_unexpected_total.fetch_add(1, Ordering::Relaxed);
} }
} }
pub fn increment_me_writer_teardown_attempt_total(
&self,
reason: MeWriterTeardownReason,
mode: MeWriterTeardownMode,
) {
if self.telemetry_me_allows_normal() {
self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()]
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) {
if self.telemetry_me_allows_normal() {
self.me_writer_teardown_success_total[mode.idx()].fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_teardown_timeout_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_teardown_timeout_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_teardown_escalation_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_teardown_escalation_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_teardown_noop_total(&self) {
if self.telemetry_me_allows_normal() {
self.me_writer_teardown_noop_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_me_writer_cleanup_side_effect_failures_total(
&self,
step: MeWriterCleanupSideEffectStep,
) {
if self.telemetry_me_allows_normal() {
self.me_writer_cleanup_side_effect_failures_total[step.idx()]
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn observe_me_writer_teardown_duration(
&self,
mode: MeWriterTeardownMode,
duration: Duration,
) {
if !self.telemetry_me_allows_normal() {
return;
}
let duration_micros = duration.as_micros().min(u64::MAX as u128) as u64;
let mut bucket_idx = ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT;
for (idx, upper_bound_micros) in ME_WRITER_TEARDOWN_DURATION_BUCKET_BOUNDS_MICROS
.iter()
.copied()
.enumerate()
{
if duration_micros <= upper_bound_micros {
bucket_idx = idx;
break;
}
}
self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx]
.fetch_add(1, Ordering::Relaxed);
self.me_writer_teardown_duration_sum_micros[mode.idx()]
.fetch_add(duration_micros, Ordering::Relaxed);
self.me_writer_teardown_duration_count[mode.idx()].fetch_add(1, Ordering::Relaxed);
}
pub fn increment_me_refill_triggered_total(&self) { pub fn increment_me_refill_triggered_total(&self) {
if self.telemetry_me_allows_debug() { if self.telemetry_me_allows_debug() {
self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed); self.me_refill_triggered_total.fetch_add(1, Ordering::Relaxed);
@ -1297,6 +1508,79 @@ impl Stats {
pub fn get_me_writer_removed_unexpected_total(&self) -> u64 { pub fn get_me_writer_removed_unexpected_total(&self) -> u64 {
self.me_writer_removed_unexpected_total.load(Ordering::Relaxed) self.me_writer_removed_unexpected_total.load(Ordering::Relaxed)
} }
pub fn get_me_writer_teardown_attempt_total(
&self,
reason: MeWriterTeardownReason,
mode: MeWriterTeardownMode,
) -> u64 {
self.me_writer_teardown_attempt_total[reason.idx()][mode.idx()]
.load(Ordering::Relaxed)
}
pub fn get_me_writer_teardown_attempt_total_by_mode(&self, mode: MeWriterTeardownMode) -> u64 {
MeWriterTeardownReason::ALL
.iter()
.copied()
.map(|reason| self.get_me_writer_teardown_attempt_total(reason, mode))
.sum()
}
pub fn get_me_writer_teardown_success_total(&self, mode: MeWriterTeardownMode) -> u64 {
self.me_writer_teardown_success_total[mode.idx()].load(Ordering::Relaxed)
}
pub fn get_me_writer_teardown_timeout_total(&self) -> u64 {
self.me_writer_teardown_timeout_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_teardown_escalation_total(&self) -> u64 {
self.me_writer_teardown_escalation_total
.load(Ordering::Relaxed)
}
pub fn get_me_writer_teardown_noop_total(&self) -> u64 {
self.me_writer_teardown_noop_total.load(Ordering::Relaxed)
}
pub fn get_me_writer_cleanup_side_effect_failures_total(
&self,
step: MeWriterCleanupSideEffectStep,
) -> u64 {
self.me_writer_cleanup_side_effect_failures_total[step.idx()]
.load(Ordering::Relaxed)
}
pub fn get_me_writer_cleanup_side_effect_failures_total_all(&self) -> u64 {
MeWriterCleanupSideEffectStep::ALL
.iter()
.copied()
.map(|step| self.get_me_writer_cleanup_side_effect_failures_total(step))
.sum()
}
pub fn me_writer_teardown_duration_bucket_labels(
) -> &'static [&'static str; ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT] {
&ME_WRITER_TEARDOWN_DURATION_BUCKET_LABELS
}
pub fn get_me_writer_teardown_duration_bucket_hits(
&self,
mode: MeWriterTeardownMode,
bucket_idx: usize,
) -> u64 {
self.me_writer_teardown_duration_bucket_hits[mode.idx()][bucket_idx]
.load(Ordering::Relaxed)
}
pub fn get_me_writer_teardown_duration_bucket_total(
&self,
mode: MeWriterTeardownMode,
bucket_idx: usize,
) -> u64 {
let capped_idx = bucket_idx.min(ME_WRITER_TEARDOWN_DURATION_BUCKET_COUNT);
let mut total = 0u64;
for idx in 0..=capped_idx {
total = total.saturating_add(self.get_me_writer_teardown_duration_bucket_hits(mode, idx));
}
total
}
pub fn get_me_writer_teardown_duration_count(&self, mode: MeWriterTeardownMode) -> u64 {
self.me_writer_teardown_duration_count[mode.idx()].load(Ordering::Relaxed)
}
pub fn get_me_writer_teardown_duration_sum_seconds(&self, mode: MeWriterTeardownMode) -> f64 {
self.me_writer_teardown_duration_sum_micros[mode.idx()].load(Ordering::Relaxed) as f64
/ 1_000_000.0
}
pub fn get_me_refill_triggered_total(&self) -> u64 { pub fn get_me_refill_triggered_total(&self) -> u64 {
self.me_refill_triggered_total.load(Ordering::Relaxed) self.me_refill_triggered_total.load(Ordering::Relaxed)
} }
@ -1801,6 +2085,79 @@ mod tests {
assert_eq!(stats.get_me_route_drop_queue_full(), 0); assert_eq!(stats.get_me_route_drop_queue_full(), 0);
} }
#[test]
fn test_teardown_counters_and_duration() {
let stats = Stats::new();
stats.increment_me_writer_teardown_attempt_total(
MeWriterTeardownReason::ReaderExit,
MeWriterTeardownMode::Normal,
);
stats.increment_me_writer_teardown_success_total(MeWriterTeardownMode::Normal);
stats.observe_me_writer_teardown_duration(
MeWriterTeardownMode::Normal,
Duration::from_millis(3),
);
stats.increment_me_writer_cleanup_side_effect_failures_total(
MeWriterCleanupSideEffectStep::CloseSignalChannelFull,
);
assert_eq!(
stats.get_me_writer_teardown_attempt_total(
MeWriterTeardownReason::ReaderExit,
MeWriterTeardownMode::Normal
),
1
);
assert_eq!(
stats.get_me_writer_teardown_success_total(MeWriterTeardownMode::Normal),
1
);
assert_eq!(
stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal),
1
);
assert!(
stats.get_me_writer_teardown_duration_sum_seconds(MeWriterTeardownMode::Normal) > 0.0
);
assert_eq!(
stats.get_me_writer_cleanup_side_effect_failures_total(
MeWriterCleanupSideEffectStep::CloseSignalChannelFull
),
1
);
}
#[test]
fn test_teardown_counters_respect_me_silent() {
let stats = Stats::new();
stats.apply_telemetry_policy(TelemetryPolicy {
core_enabled: true,
user_enabled: true,
me_level: MeTelemetryLevel::Silent,
});
stats.increment_me_writer_teardown_attempt_total(
MeWriterTeardownReason::ReaderExit,
MeWriterTeardownMode::Normal,
);
stats.increment_me_writer_teardown_timeout_total();
stats.observe_me_writer_teardown_duration(
MeWriterTeardownMode::Normal,
Duration::from_millis(1),
);
assert_eq!(
stats.get_me_writer_teardown_attempt_total(
MeWriterTeardownReason::ReaderExit,
MeWriterTeardownMode::Normal
),
0
);
assert_eq!(stats.get_me_writer_teardown_timeout_total(), 0);
assert_eq!(
stats.get_me_writer_teardown_duration_count(MeWriterTeardownMode::Normal),
0
);
}
#[test] #[test]
fn test_replay_checker_basic() { fn test_replay_checker_basic() {
let checker = ReplayChecker::new(100, Duration::from_secs(60)); let checker = ReplayChecker::new(100, Duration::from_secs(60));

View File

@ -10,6 +10,7 @@ use tracing::{debug, info, warn};
use crate::config::MeFloorMode; use crate::config::MeFloorMode;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::network::IpFamily; use crate::network::IpFamily;
use crate::stats::MeWriterTeardownReason;
use super::MePool; use super::MePool;
use super::pool::MeWriter; use super::pool::MeWriter;
@ -358,7 +359,8 @@ pub(super) async fn reap_draining_writers(
continue; continue;
} }
pool.stats.increment_pool_force_close_total(); pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapTimeoutExpired)
.await;
pool.stats pool.stats
.increment_me_draining_writers_reap_progress_total(); .increment_me_draining_writers_reap_progress_total();
} }
@ -376,7 +378,8 @@ pub(super) async fn reap_draining_writers(
continue; continue;
} }
pool.stats.increment_pool_force_close_total(); pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapThresholdForce)
.await;
pool.stats pool.stats
.increment_me_draining_writers_reap_progress_total(); .increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1); closed_total = closed_total.saturating_add(1);
@ -388,7 +391,8 @@ pub(super) async fn reap_draining_writers(
if !closed_writer_ids.insert(writer_id) { if !closed_writer_ids.insert(writer_id) {
continue; continue;
} }
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapEmpty)
.await;
pool.stats pool.stats
.increment_me_draining_writers_reap_progress_total(); .increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1); closed_total = closed_total.saturating_add(1);
@ -1646,11 +1650,14 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
for (writer_id, had_clients) in &zombie_ids_with_meta { for (writer_id, had_clients) in &zombie_ids_with_meta {
let result = tokio::time::timeout( let result = tokio::time::timeout(
Duration::from_secs(REMOVE_TIMEOUT_SECS), Duration::from_secs(REMOVE_TIMEOUT_SECS),
pool.remove_writer_and_close_clients(*writer_id), pool.remove_writer_and_close_clients(
*writer_id,
MeWriterTeardownReason::WatchdogStuckDraining,
),
) )
.await; .await;
match result { match result {
Ok(()) => { Ok(true) => {
removal_timeout_streak.remove(writer_id); removal_timeout_streak.remove(writer_id);
pool.stats.increment_pool_force_close_total(); pool.stats.increment_pool_force_close_total();
pool.stats pool.stats
@ -1661,7 +1668,16 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
"Zombie writer removed by watchdog" "Zombie writer removed by watchdog"
); );
} }
Ok(false) => {
removal_timeout_streak.remove(writer_id);
debug!(
writer_id,
had_clients,
"Zombie writer watchdog removal became no-op"
);
}
Err(_) => { Err(_) => {
pool.stats.increment_me_writer_teardown_timeout_total();
let streak = removal_timeout_streak let streak = removal_timeout_streak
.entry(*writer_id) .entry(*writer_id)
.and_modify(|value| *value = value.saturating_add(1)) .and_modify(|value| *value = value.saturating_add(1))
@ -1675,10 +1691,14 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
if *streak < HARD_DETACH_TIMEOUT_STREAK { if *streak < HARD_DETACH_TIMEOUT_STREAK {
continue; continue;
} }
pool.stats.increment_me_writer_teardown_escalation_total();
let hard_detach = tokio::time::timeout( let hard_detach = tokio::time::timeout(
Duration::from_secs(REMOVE_TIMEOUT_SECS), Duration::from_secs(REMOVE_TIMEOUT_SECS),
pool.remove_draining_writer_hard_detach(*writer_id), pool.remove_draining_writer_hard_detach(
*writer_id,
MeWriterTeardownReason::WatchdogStuckDraining,
),
) )
.await; .await;
match hard_detach { match hard_detach {
@ -1702,6 +1722,7 @@ pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
); );
} }
Err(_) => { Err(_) => {
pool.stats.increment_me_writer_teardown_timeout_total();
warn!( warn!(
writer_id, writer_id,
had_clients, had_clients,

View File

@ -316,7 +316,12 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
let ids = sorted_writer_ids(&pool).await; let ids = sorted_writer_ids(&pool).await;
for writer_id in ids.into_iter().take(3) { for writer_id in ids.into_iter().take(3) {
let _ = pool.remove_writer_and_close_clients(writer_id).await; let _ = pool
.remove_writer_and_close_clients(
writer_id,
crate::stats::MeWriterTeardownReason::ReapEmpty,
)
.await;
} }
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;

View File

@ -197,7 +197,9 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.contains_key(&7)); assert!(warn_next_allowed.contains_key(&7));
let _ = pool.remove_writer_and_close_clients(7).await; let _ = pool
.remove_writer_and_close_clients(7, crate::stats::MeWriterTeardownReason::ReapEmpty)
.await;
assert!(pool.registry.get_writer(conn_ids[0]).await.is_none()); assert!(pool.registry.get_writer(conn_ids[0]).await.is_none());
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
@ -527,7 +529,12 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
let existing_writer_ids = current_writer_ids(&pool).await; let existing_writer_ids = current_writer_ids(&pool).await;
for writer_id in existing_writer_ids.into_iter().take(4) { for writer_id in existing_writer_ids.into_iter().take(4) {
let _ = pool.remove_writer_and_close_clients(writer_id).await; let _ = pool
.remove_writer_and_close_clients(
writer_id,
crate::stats::MeWriterTeardownReason::ReapEmpty,
)
.await;
} }
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); assert!(warn_next_allowed.len() <= pool.writers.read().await.len());

View File

@ -16,6 +16,9 @@ use crate::config::MeBindStaleMode;
use crate::crypto::SecureRandom; use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32}; use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
use crate::stats::{
MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason,
};
use super::codec::{RpcWriter, WriterCommand}; use super::codec::{RpcWriter, WriterCommand};
use super::pool::{MePool, MeWriter, WriterContour}; use super::pool::{MePool, MeWriter, WriterContour};
@ -28,7 +31,7 @@ const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700; const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
enum WriterTeardownMode { enum WriterRemoveGuardMode {
Any, Any,
DrainingOnly, DrainingOnly,
} }
@ -49,9 +52,16 @@ impl MePool {
for writer_id in closed_writer_ids { for writer_id in closed_writer_ids {
if self.registry.is_writer_empty(writer_id).await { if self.registry.is_writer_empty(writer_id).await {
let _ = self.remove_writer_only(writer_id).await; let _ = self
.remove_writer_only(writer_id, MeWriterTeardownReason::PruneClosedWriter)
.await;
} else { } else {
let _ = self.remove_writer_and_close_clients(writer_id).await; let _ = self
.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::PruneClosedWriter,
)
.await;
} }
} }
} }
@ -173,7 +183,11 @@ impl MePool {
.is_ok() .is_ok()
{ {
if let Some(pool) = pool_writer_task.upgrade() { if let Some(pool) = pool_writer_task.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::WriterTaskExit,
)
.await;
} else { } else {
cancel_wr.cancel(); cancel_wr.cancel();
} }
@ -264,7 +278,11 @@ impl MePool {
.is_ok() .is_ok()
{ {
if let Some(pool) = pool.upgrade() { if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::ReaderExit,
)
.await;
} else { } else {
// Fallback for shutdown races: make writer task exit quickly so stale // Fallback for shutdown races: make writer task exit quickly so stale
// channels are observable by periodic prune. // channels are observable by periodic prune.
@ -372,7 +390,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok() .is_ok()
{ {
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::PingSendFail,
)
.await;
} }
break; break;
} }
@ -465,7 +487,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok() .is_ok()
{ {
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::SignalSendFail,
)
.await;
} }
break; break;
} }
@ -499,7 +525,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok() .is_ok()
{ {
pool.remove_writer_and_close_clients(writer_id).await; pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::SignalSendFail,
)
.await;
} }
break; break;
} }
@ -512,25 +542,48 @@ impl MePool {
Ok(()) Ok(())
} }
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) { pub(crate) async fn remove_writer_and_close_clients(
self: &Arc<Self>,
writer_id: u64,
reason: MeWriterTeardownReason,
) -> bool {
// Full client cleanup now happens inside `registry.writer_lost` to keep // Full client cleanup now happens inside `registry.writer_lost` to keep
// writer reap/remove paths strictly non-blocking per connection. // writer reap/remove paths strictly non-blocking per connection.
let _ = self self.remove_writer_with_mode(
.remove_writer_with_mode(writer_id, WriterTeardownMode::Any) writer_id,
.await; reason,
MeWriterTeardownMode::Normal,
WriterRemoveGuardMode::Any,
)
.await
} }
pub(super) async fn remove_draining_writer_hard_detach( pub(super) async fn remove_draining_writer_hard_detach(
self: &Arc<Self>, self: &Arc<Self>,
writer_id: u64, writer_id: u64,
reason: MeWriterTeardownReason,
) -> bool { ) -> bool {
self.remove_writer_with_mode(writer_id, WriterTeardownMode::DrainingOnly) self.remove_writer_with_mode(
.await writer_id,
reason,
MeWriterTeardownMode::HardDetach,
WriterRemoveGuardMode::DrainingOnly,
)
.await
} }
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> bool { async fn remove_writer_only(
self.remove_writer_with_mode(writer_id, WriterTeardownMode::Any) self: &Arc<Self>,
.await writer_id: u64,
reason: MeWriterTeardownReason,
) -> bool {
self.remove_writer_with_mode(
writer_id,
reason,
MeWriterTeardownMode::Normal,
WriterRemoveGuardMode::Any,
)
.await
} }
// Authoritative teardown primitive shared by normal cleanup and watchdog path. // Authoritative teardown primitive shared by normal cleanup and watchdog path.
@ -542,8 +595,13 @@ impl MePool {
async fn remove_writer_with_mode( async fn remove_writer_with_mode(
self: &Arc<Self>, self: &Arc<Self>,
writer_id: u64, writer_id: u64,
mode: WriterTeardownMode, reason: MeWriterTeardownReason,
mode: MeWriterTeardownMode,
guard_mode: WriterRemoveGuardMode,
) -> bool { ) -> bool {
let started_at = Instant::now();
self.stats
.increment_me_writer_teardown_attempt_total(reason, mode);
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None; let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
let mut removed_addr: Option<SocketAddr> = None; let mut removed_addr: Option<SocketAddr> = None;
let mut removed_dc: Option<i32> = None; let mut removed_dc: Option<i32> = None;
@ -553,9 +611,12 @@ impl MePool {
{ {
let mut ws = self.writers.write().await; let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) { if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
if matches!(mode, WriterTeardownMode::DrainingOnly) if matches!(guard_mode, WriterRemoveGuardMode::DrainingOnly)
&& !ws[pos].draining.load(Ordering::Relaxed) && !ws[pos].draining.load(Ordering::Relaxed)
{ {
self.stats.increment_me_writer_teardown_noop_total();
self.stats
.observe_me_writer_teardown_duration(mode, started_at.elapsed());
return false; return false;
} }
let w = ws.remove(pos); let w = ws.remove(pos);
@ -595,6 +656,9 @@ impl MePool {
self.stats.increment_me_writer_close_signal_drop_total(); self.stats.increment_me_writer_close_signal_drop_total();
self.stats self.stats
.increment_me_writer_close_signal_channel_full_total(); .increment_me_writer_close_signal_channel_full_total();
self.stats.increment_me_writer_cleanup_side_effect_failures_total(
MeWriterCleanupSideEffectStep::CloseSignalChannelFull,
);
debug!( debug!(
writer_id, writer_id,
"Skipping close signal for removed writer: command channel is full" "Skipping close signal for removed writer: command channel is full"
@ -602,6 +666,9 @@ impl MePool {
} }
Err(TrySendError::Closed(_)) => { Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_close_signal_drop_total(); self.stats.increment_me_writer_close_signal_drop_total();
self.stats.increment_me_writer_cleanup_side_effect_failures_total(
MeWriterCleanupSideEffectStep::CloseSignalChannelClosed,
);
debug!( debug!(
writer_id, writer_id,
"Skipping close signal for removed writer: command channel is closed" "Skipping close signal for removed writer: command channel is closed"
@ -619,6 +686,13 @@ impl MePool {
self.trigger_immediate_refill_for_dc(addr, writer_dc); self.trigger_immediate_refill_for_dc(addr, writer_dc);
} }
} }
if removed {
self.stats.increment_me_writer_teardown_success_total(mode);
} else {
self.stats.increment_me_writer_teardown_noop_total();
}
self.stats
.observe_me_writer_teardown_duration(mode, started_at.elapsed());
removed removed
} }

View File

@ -14,6 +14,7 @@ use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
use crate::error::{ProxyError, Result}; use crate::error::{ProxyError, Result};
use crate::network::IpFamily; use crate::network::IpFamily;
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32}; use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
use crate::stats::MeWriterTeardownReason;
use super::MePool; use super::MePool;
use super::codec::WriterCommand; use super::codec::WriterCommand;
@ -134,7 +135,11 @@ impl MePool {
Ok(()) => return Ok(()), Ok(()) => return Ok(()),
Err(TimedSendError::Closed(_)) => { Err(TimedSendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed"); warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await; self.remove_writer_and_close_clients(
current.writer_id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue; continue;
} }
Err(TimedSendError::Timeout(_)) => { Err(TimedSendError::Timeout(_)) => {
@ -151,7 +156,11 @@ impl MePool {
} }
Err(TrySendError::Closed(_)) => { Err(TrySendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed"); warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await; self.remove_writer_and_close_clients(
current.writer_id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue; continue;
} }
} }
@ -458,7 +467,11 @@ impl MePool {
Err(TrySendError::Closed(_)) => { Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode); self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed"); warn!(writer_id = w.id, "ME writer channel closed");
self.remove_writer_and_close_clients(w.id).await; self.remove_writer_and_close_clients(
w.id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue; continue;
} }
} }
@ -503,7 +516,11 @@ impl MePool {
Err(TimedSendError::Closed(_)) => { Err(TimedSendError::Closed(_)) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode); self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed (blocking)"); warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await; self.remove_writer_and_close_clients(
w.id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
} }
Err(TimedSendError::Timeout(_)) => { Err(TimedSendError::Timeout(_)) => {
self.stats.increment_me_writer_pick_full_total(pick_mode); self.stats.increment_me_writer_pick_full_total(pick_mode);
@ -654,7 +671,11 @@ impl MePool {
} }
Err(TrySendError::Closed(_)) => { Err(TrySendError::Closed(_)) => {
debug!("ME close write failed"); debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await; self.remove_writer_and_close_clients(
w.writer_id,
MeWriterTeardownReason::CloseRpcChannelClosed,
)
.await;
} }
} }
} else { } else {