From 6f9aef7bb458d5447acc29d494151f183602e43f Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Mar 2026 13:08:35 +0300 Subject: [PATCH] ME Writer stuck-up in draining-state fixes Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/metrics.rs | 58 +++++++++++++ src/stats/mod.rs | 32 +++++++ src/transport/middle_proxy/health.rs | 4 + .../middle_proxy/health_regression_tests.rs | 84 +++++++++++++++++++ src/transport/middle_proxy/pool_writer.rs | 26 +++++- 5 files changed, 203 insertions(+), 1 deletion(-) diff --git a/src/metrics.rs b/src/metrics.rs index 3de9896..4f7f4b6 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1692,6 +1692,57 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers" + ); + let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter"); + let _ = writeln!( + out, + "telemt_me_writer_close_signal_drop_total {}", + if me_allows_normal { + stats.get_me_writer_close_signal_drop_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_writer_close_signal_channel_full_total counter" + ); + let _ = writeln!( + out, + "telemt_me_writer_close_signal_channel_full_total {}", + if me_allows_normal { + stats.get_me_writer_close_signal_channel_full_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_draining_writers_reap_progress_total counter" + ); + let _ = writeln!( + out, + "telemt_me_draining_writers_reap_progress_total {}", + if me_allows_normal { + stats.get_me_draining_writers_reap_progress_total() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter"); let _ = writeln!( @@ -2124,6 +2175,13 @@ mod tests { assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); + assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter")); + assert!(output.contains( + "# TYPE telemt_me_writer_close_signal_channel_full_total counter" + )); + assert!(output.contains( + "# TYPE telemt_me_draining_writers_reap_progress_total counter" + )); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter")); assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter")); assert!(output.contains( diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 83cd03d..ad1d16b 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -123,6 +123,9 @@ pub struct Stats { pool_drain_soft_evict_total: AtomicU64, pool_drain_soft_evict_writer_total: AtomicU64, pool_stale_pick_total: AtomicU64, + me_writer_close_signal_drop_total: AtomicU64, + me_writer_close_signal_channel_full_total: AtomicU64, + me_draining_writers_reap_progress_total: AtomicU64, me_writer_removed_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64, me_refill_triggered_total: AtomicU64, @@ -734,6 +737,24 @@ impl Stats { self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_writer_close_signal_drop_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_close_signal_drop_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_writer_close_signal_channel_full_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_writer_close_signal_channel_full_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_draining_writers_reap_progress_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_draining_writers_reap_progress_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_writer_removed_total(&self) { if self.telemetry_me_allows_debug() { self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed); @@ -1259,6 +1280,17 @@ impl Stats { pub fn get_pool_stale_pick_total(&self) -> u64 { self.pool_stale_pick_total.load(Ordering::Relaxed) } + pub fn get_me_writer_close_signal_drop_total(&self) -> u64 { + self.me_writer_close_signal_drop_total.load(Ordering::Relaxed) + } + pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 { + self.me_writer_close_signal_channel_full_total + .load(Ordering::Relaxed) + } + pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 { + self.me_draining_writers_reap_progress_total + .load(Ordering::Relaxed) + } pub fn get_me_writer_removed_total(&self) -> u64 { self.me_writer_removed_total.load(Ordering::Relaxed) } diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 0b9b749..6d0af64 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -314,6 +314,8 @@ pub(super) async fn reap_draining_writers( } pool.stats.increment_pool_force_close_total(); pool.remove_writer_and_close_clients(writer_id).await; + pool.stats + .increment_me_draining_writers_reap_progress_total(); closed_total = closed_total.saturating_add(1); } for writer_id in empty_writer_ids { @@ -324,6 +326,8 @@ pub(super) async fn reap_draining_writers( continue; } pool.remove_writer_and_close_clients(writer_id).await; + pool.stats + .increment_me_draining_writers_reap_progress_total(); closed_total = closed_total.saturating_add(1); } diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index 606f7e5..565ac74 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::time::{Duration, Instant}; +use bytes::Bytes; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -209,6 +210,89 @@ async fn reap_draining_writers_removes_empty_draining_writers() { assert_eq!(current_writer_ids(&pool).await, vec![3]); } +#[tokio::test] +async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() { + let pool = make_pool(128).await; + let now_epoch_secs = MePool::now_epoch_secs(); + + let (blocked_tx, blocked_rx) = mpsc::channel::(1); + assert!( + blocked_tx + .try_send(WriterCommand::Data(Bytes::from_static(b"stuck"))) + .is_ok() + ); + let blocked_rx_guard = tokio::spawn(async move { + let _hold_rx = blocked_rx; + tokio::time::sleep(Duration::from_secs(30)).await; + }); + + let blocked_writer_id = 90u64; + let blocked_writer = MeWriter { + id: blocked_writer_id, + addr: SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 4500 + blocked_writer_id as u16, + ), + source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST), + writer_dc: 2, + generation: 1, + contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())), + created_at: Instant::now() - Duration::from_secs(blocked_writer_id), + tx: blocked_tx.clone(), + cancel: CancellationToken::new(), + degraded: Arc::new(AtomicBool::new(false)), + rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)), + draining: Arc::new(AtomicBool::new(true)), + draining_started_at_epoch_secs: Arc::new(AtomicU64::new( + now_epoch_secs.saturating_sub(120), + )), + drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)), + allow_drain_fallback: Arc::new(AtomicBool::new(false)), + }; + pool.writers.write().await.push(blocked_writer); + pool.registry + .register_writer(blocked_writer_id, blocked_tx) + .await; + pool.conn_count.fetch_add(1, Ordering::Relaxed); + + insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await; + + let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); + + let reap_res = tokio::time::timeout( + Duration::from_millis(500), + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed), + ) + .await; + blocked_rx_guard.abort(); + + assert!(reap_res.is_ok(), "reap should not block on close signal"); + assert!(current_writer_ids(&pool).await.is_empty()); + assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2); + assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1); + assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2); + let activity = pool.registry.writer_activity_snapshot().await; + assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id)); + assert!(!activity.bound_clients_by_writer.contains_key(&91)); + let (probe_conn_id, _rx) = pool.registry.register().await; + assert!( + !pool.registry + .bind_writer( + probe_conn_id, + blocked_writer_id, + ConnMeta { + target_dc: 2, + client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400), + our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443), + proto_flags: 0, + }, + ) + .await + ); + let _ = pool.registry.unregister(probe_conn_id).await; +} + #[tokio::test] async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { let pool = make_pool(2).await; diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 4035111..a6186b6 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use bytes::BytesMut; use rand::Rng; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; @@ -525,6 +526,11 @@ impl MePool { self.conn_count.fetch_sub(1, Ordering::Relaxed); } } + // State invariant: + // - writer is removed from `self.writers` (pool visibility), + // - writer is removed from registry routing/binding maps via `writer_lost`. + // The close command below is only a best-effort accelerator for task shutdown. + // Cleanup progress must never depend on command-channel availability. let conns = self.registry.writer_lost(writer_id).await; { let mut tracker = self.ping_tracker.lock().await; @@ -532,7 +538,25 @@ impl MePool { } self.rtt_stats.lock().await.remove(&writer_id); if let Some(tx) = close_tx { - let _ = tx.send(WriterCommand::Close).await; + match tx.try_send(WriterCommand::Close) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + self.stats.increment_me_writer_close_signal_drop_total(); + self.stats + .increment_me_writer_close_signal_channel_full_total(); + debug!( + writer_id, + "Skipping close signal for removed writer: command channel is full" + ); + } + Err(TrySendError::Closed(_)) => { + self.stats.increment_me_writer_close_signal_drop_total(); + debug!( + writer_id, + "Skipping close signal for removed writer: command channel is closed" + ); + } + } } if trigger_refill && let Some(addr) = removed_addr