From 6f9aef7bb458d5447acc29d494151f183602e43f Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Thu, 19 Mar 2026 13:08:35 +0300
Subject: [PATCH 1/2] Fix ME writers stuck in the draining state

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
---
 src/metrics.rs                                | 58 +++++++++++++
 src/stats/mod.rs                              | 32 +++++++
 src/transport/middle_proxy/health.rs          |  4 +
 .../middle_proxy/health_regression_tests.rs   | 84 +++++++++++++++++++
 src/transport/middle_proxy/pool_writer.rs     | 26 +++++-
 5 files changed, 203 insertions(+), 1 deletion(-)

diff --git a/src/metrics.rs b/src/metrics.rs
index 3de9896..4f7f4b6 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -1692,6 +1692,57 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
         }
     );
 
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
+    );
+    let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
+    let _ = writeln!(
+        out,
+        "telemt_me_writer_close_signal_drop_total {}",
+        if me_allows_normal {
+            stats.get_me_writer_close_signal_drop_total()
+        } else {
+            0
+        }
+    );
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_me_writer_close_signal_channel_full_total counter"
+    );
+    let _ = writeln!(
+        out,
+        "telemt_me_writer_close_signal_channel_full_total {}",
+        if me_allows_normal {
+            stats.get_me_writer_close_signal_channel_full_total()
+        } else {
+            0
+        }
+    );
+
+    let _ = writeln!(
+        out,
+        "# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
+    );
+    let _ = writeln!(
+        out,
+        "# TYPE telemt_me_draining_writers_reap_progress_total counter"
+    );
+    let _ = writeln!(
+        out,
+        "telemt_me_draining_writers_reap_progress_total {}",
+        if me_allows_normal {
+            stats.get_me_draining_writers_reap_progress_total()
+        } else {
+            0
+        }
+    );
+
     let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
     let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
     let _ = writeln!(
@@ -2124,6 +2175,13 @@ mod tests {
         assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
         assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
         assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
+        assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
+        assert!(output.contains(
+            "# TYPE telemt_me_writer_close_signal_channel_full_total counter"
+        ));
+        assert!(output.contains(
+            "# TYPE telemt_me_draining_writers_reap_progress_total counter"
+        ));
         assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
         assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
         assert!(output.contains(
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
index 83cd03d..ad1d16b 100644
--- a/src/stats/mod.rs
+++ b/src/stats/mod.rs
@@ -123,6 +123,9 @@ pub struct Stats {
     pool_drain_soft_evict_total: AtomicU64,
     pool_drain_soft_evict_writer_total: AtomicU64,
     pool_stale_pick_total: AtomicU64,
+    me_writer_close_signal_drop_total: AtomicU64,
+    me_writer_close_signal_channel_full_total: AtomicU64,
+    me_draining_writers_reap_progress_total: AtomicU64,
     me_writer_removed_total: AtomicU64,
     me_writer_removed_unexpected_total: AtomicU64,
     me_refill_triggered_total: AtomicU64,
@@ -734,6 +737,24 @@ impl Stats {
             self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
         }
     }
+    pub fn increment_me_writer_close_signal_drop_total(&self) {
+        if self.telemetry_me_allows_normal() {
+            self.me_writer_close_signal_drop_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+    pub fn increment_me_writer_close_signal_channel_full_total(&self) {
+        if self.telemetry_me_allows_normal() {
+            self.me_writer_close_signal_channel_full_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
+    pub fn increment_me_draining_writers_reap_progress_total(&self) {
+        if self.telemetry_me_allows_normal() {
+            self.me_draining_writers_reap_progress_total
+                .fetch_add(1, Ordering::Relaxed);
+        }
+    }
     pub fn increment_me_writer_removed_total(&self) {
         if self.telemetry_me_allows_debug() {
             self.me_writer_removed_total.fetch_add(1, Ordering::Relaxed);
@@ -1259,6 +1280,17 @@ impl Stats {
     pub fn get_pool_stale_pick_total(&self) -> u64 {
         self.pool_stale_pick_total.load(Ordering::Relaxed)
     }
+    pub fn get_me_writer_close_signal_drop_total(&self) -> u64 {
+        self.me_writer_close_signal_drop_total.load(Ordering::Relaxed)
+    }
+    pub fn get_me_writer_close_signal_channel_full_total(&self) -> u64 {
+        self.me_writer_close_signal_channel_full_total
+            .load(Ordering::Relaxed)
+    }
+    pub fn get_me_draining_writers_reap_progress_total(&self) -> u64 {
+        self.me_draining_writers_reap_progress_total
+            .load(Ordering::Relaxed)
+    }
     pub fn get_me_writer_removed_total(&self) -> u64 {
         self.me_writer_removed_total.load(Ordering::Relaxed)
     }
diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs
index 0b9b749..6d0af64 100644
--- a/src/transport/middle_proxy/health.rs
+++ b/src/transport/middle_proxy/health.rs
@@ -314,6 +314,8 @@ pub(super) async fn reap_draining_writers(
         }
         pool.stats.increment_pool_force_close_total();
         pool.remove_writer_and_close_clients(writer_id).await;
+        pool.stats
+            .increment_me_draining_writers_reap_progress_total();
         closed_total = closed_total.saturating_add(1);
     }
     for writer_id in empty_writer_ids {
@@ -324,6 +326,8 @@ pub(super) async fn reap_draining_writers(
             continue;
         }
         pool.remove_writer_and_close_clients(writer_id).await;
+        pool.stats
+            .increment_me_draining_writers_reap_progress_total();
         closed_total = closed_total.saturating_add(1);
     }
 
diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs
index 606f7e5..565ac74 100644
--- a/src/transport/middle_proxy/health_regression_tests.rs
+++ b/src/transport/middle_proxy/health_regression_tests.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
 use std::time::{Duration, Instant};
 
+use bytes::Bytes;
 use tokio::sync::mpsc;
 use tokio_util::sync::CancellationToken;
 
@@ -209,6 +210,89 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
     assert_eq!(current_writer_ids(&pool).await, vec![3]);
 }
 
+#[tokio::test]
+async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
+    let pool = make_pool(128).await;
+    let now_epoch_secs = MePool::now_epoch_secs();
+
+    let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
+    assert!(
+        blocked_tx
+            .try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
+            .is_ok()
+    );
+    let blocked_rx_guard = tokio::spawn(async move {
+        let _hold_rx = blocked_rx;
+        tokio::time::sleep(Duration::from_secs(30)).await;
+    });
+
+    let blocked_writer_id = 90u64;
+    let blocked_writer = MeWriter {
+        id: blocked_writer_id,
+        addr: SocketAddr::new(
+            IpAddr::V4(Ipv4Addr::LOCALHOST),
+            4500 + blocked_writer_id as u16,
+        ),
+        source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
+        writer_dc: 2,
+        generation: 1,
+        contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
+        created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
+        tx: blocked_tx.clone(),
+        cancel: CancellationToken::new(),
+        degraded: Arc::new(AtomicBool::new(false)),
+        rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
+        draining: Arc::new(AtomicBool::new(true)),
+        draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
+            now_epoch_secs.saturating_sub(120),
+        )),
+        drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
+        allow_drain_fallback: Arc::new(AtomicBool::new(false)),
+    };
+    pool.writers.write().await.push(blocked_writer);
+    pool.registry
+        .register_writer(blocked_writer_id, blocked_tx)
+        .await;
+    pool.conn_count.fetch_add(1, Ordering::Relaxed);
+
+    insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
+
+    let mut warn_next_allowed = HashMap::new();
+    let mut soft_evict_next_allowed = HashMap::new();
+
+    let reap_res = tokio::time::timeout(
+        Duration::from_millis(500),
+        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
+    )
+    .await;
+    blocked_rx_guard.abort();
+
+    assert!(reap_res.is_ok(), "reap should not block on close signal");
+    assert!(current_writer_ids(&pool).await.is_empty());
+    assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
+    assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
+    assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
+    let activity = pool.registry.writer_activity_snapshot().await;
+    assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
+    assert!(!activity.bound_clients_by_writer.contains_key(&91));
+    let (probe_conn_id, _rx) = pool.registry.register().await;
+    assert!(
+        !pool.registry
+            .bind_writer(
+                probe_conn_id,
+                blocked_writer_id,
+                ConnMeta {
+                    target_dc: 2,
+                    client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
+                    our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
+                    proto_flags: 0,
+                },
+            )
+            .await
+    );
+    let _ = pool.registry.unregister(probe_conn_id).await;
+}
+
 #[tokio::test]
 async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
     let pool = make_pool(2).await;
diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs
index 4035111..a6186b6 100644
--- a/src/transport/middle_proxy/pool_writer.rs
+++ b/src/transport/middle_proxy/pool_writer.rs
@@ -8,6 +8,7 @@ use bytes::Bytes;
 use bytes::BytesMut;
 use rand::Rng;
 use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, warn};
 
@@ -525,6 +526,11 @@ impl MePool {
                 self.conn_count.fetch_sub(1, Ordering::Relaxed);
             }
         }
+        // State invariant:
+        // - writer is removed from `self.writers` (pool visibility),
+        // - writer is removed from registry routing/binding maps via `writer_lost`.
+        // The close command below is only a best-effort accelerator for task shutdown.
+        // Cleanup progress must never depend on command-channel availability.
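+        //
+        // Sketch of the tokio mpsc semantics this relies on (illustrative, not
+        // project API): given `let (tx, _rx) = mpsc::channel::<WriterCommand>(1);`
+        // whose single slot is already occupied,
+        //     tx.send(WriterCommand::Close).await  // parks until capacity frees up
+        //     tx.try_send(WriterCommand::Close)    // Err(TrySendError::Full(_)) at once
+        // so the code below uses `try_send` and treats a failed signal as a
+        // counted, harmless drop.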
         let conns = self.registry.writer_lost(writer_id).await;
         {
             let mut tracker = self.ping_tracker.lock().await;
@@ -532,7 +538,25 @@ impl MePool {
         }
         self.rtt_stats.lock().await.remove(&writer_id);
         if let Some(tx) = close_tx {
-            let _ = tx.send(WriterCommand::Close).await;
+            match tx.try_send(WriterCommand::Close) {
+                Ok(()) => {}
+                Err(TrySendError::Full(_)) => {
+                    self.stats.increment_me_writer_close_signal_drop_total();
+                    self.stats
+                        .increment_me_writer_close_signal_channel_full_total();
+                    debug!(
+                        writer_id,
+                        "Skipping close signal for removed writer: command channel is full"
+                    );
+                }
+                Err(TrySendError::Closed(_)) => {
+                    self.stats.increment_me_writer_close_signal_drop_total();
+                    debug!(
+                        writer_id,
+                        "Skipping close signal for removed writer: command channel is closed"
+                    );
+                }
+            }
         }
         if trigger_refill
             && let Some(addr) = removed_addr

From 3279f6d46a5744b754741abf828e6b4d50c3a3ac Mon Sep 17 00:00:00 2001
From: Alexey <247128645+axkurcom@users.noreply.github.com>
Date: Thu, 19 Mar 2026 14:07:20 +0300
Subject: [PATCH 2/2] Make the cleanup path non-blocking

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
---
 src/config/load.rs                        | 51 ++++++++++++++
 src/transport/middle_proxy/pool_writer.rs |  8 +--
 src/transport/middle_proxy/reader.rs      | 32 ++++++---
 src/transport/middle_proxy/registry.rs    | 85 +++++++++++++++++------
 src/transport/middle_proxy/send.rs        | 28 +++++---
 5 files changed, 158 insertions(+), 46 deletions(-)

diff --git a/src/config/load.rs b/src/config/load.rs
index 0635f80..c296697 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -612,6 +612,11 @@ impl ProxyConfig {
                 "general.me_route_backpressure_base_timeout_ms must be > 0".to_string(),
             ));
         }
+        if config.general.me_route_backpressure_base_timeout_ms > 5000 {
+            return Err(ProxyError::Config(
+                "general.me_route_backpressure_base_timeout_ms must be within [1, 5000]".to_string(),
+            ));
+        }
 
         if config.general.me_route_backpressure_high_timeout_ms
             < config.general.me_route_backpressure_base_timeout_ms
@@ -620,6 +625,11 @@ impl ProxyConfig {
                 "general.me_route_backpressure_high_timeout_ms must be >= general.me_route_backpressure_base_timeout_ms".to_string(),
             ));
         }
+        if config.general.me_route_backpressure_high_timeout_ms > 5000 {
+            return Err(ProxyError::Config(
+                "general.me_route_backpressure_high_timeout_ms must be within [1, 5000]".to_string(),
+            ));
+        }
 
         if !(1..=100).contains(&config.general.me_route_backpressure_high_watermark_pct) {
             return Err(ProxyError::Config(
@@ -1624,6 +1634,47 @@ mod tests {
         let _ = std::fs::remove_file(path_valid);
     }
 
+    #[test]
+    fn me_route_backpressure_base_timeout_ms_out_of_range_is_rejected() {
+        let toml = r#"
+            [general]
+            me_route_backpressure_base_timeout_ms = 5001
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_me_route_backpressure_base_timeout_ms_out_of_range_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let err = ProxyConfig::load(&path).unwrap_err().to_string();
+        assert!(err.contains("general.me_route_backpressure_base_timeout_ms must be within [1, 5000]"));
+        let _ = std::fs::remove_file(path);
+    }
+
+    #[test]
+    fn me_route_backpressure_high_timeout_ms_out_of_range_is_rejected() {
+        let toml = r#"
+            [general]
+            me_route_backpressure_base_timeout_ms = 100
+            me_route_backpressure_high_timeout_ms = 5001
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_me_route_backpressure_high_timeout_ms_out_of_range_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let err = ProxyConfig::load(&path).unwrap_err().to_string();
+        assert!(err.contains("general.me_route_backpressure_high_timeout_ms must be within [1, 5000]"));
+        let _ = std::fs::remove_file(path);
+    }
+
     #[test]
     fn me_route_no_writer_wait_ms_out_of_range_is_rejected() {
         let toml = r#"
diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs
index a6186b6..7d78b84 100644
--- a/src/transport/middle_proxy/pool_writer.rs
+++ b/src/transport/middle_proxy/pool_writer.rs
@@ -492,11 +492,9 @@ impl MePool {
     }
 
     pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
-        let conns = self.remove_writer_only(writer_id).await;
-        for bound in conns {
-            let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
-            let _ = self.registry.unregister(bound.conn_id).await;
-        }
+        // Full client cleanup now happens inside `registry.writer_lost` to keep
+        // writer reap/remove paths strictly non-blocking per connection.
+        let _ = self.remove_writer_only(writer_id).await;
     }
 
     async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs
index 785bc2c..8b15fc1 100644
--- a/src/transport/middle_proxy/reader.rs
+++ b/src/transport/middle_proxy/reader.rs
@@ -8,6 +8,7 @@ use bytes::{Bytes, BytesMut};
 use tokio::io::AsyncReadExt;
 use tokio::net::TcpStream;
 use tokio::sync::{Mutex, mpsc};
+use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, trace, warn};
 
@@ -173,12 +174,12 @@ pub(crate) async fn reader_loop(
         } else if pt == RPC_CLOSE_EXT_U32 && body.len() >= 8 {
             let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
             debug!(cid, "RPC_CLOSE_EXT from ME");
-            reg.route(cid, MeResponse::Close).await;
+            let _ = reg.route_nowait(cid, MeResponse::Close).await;
             reg.unregister(cid).await;
         } else if pt == RPC_CLOSE_CONN_U32 && body.len() >= 8 {
             let cid = u64::from_le_bytes(body[0..8].try_into().unwrap());
             debug!(cid, "RPC_CLOSE_CONN from ME");
-            reg.route(cid, MeResponse::Close).await;
+            let _ = reg.route_nowait(cid, MeResponse::Close).await;
             reg.unregister(cid).await;
         } else if pt == RPC_PING_U32 && body.len() >= 8 {
             let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@@ -186,13 +187,15 @@ pub(crate) async fn reader_loop(
             let mut pong = Vec::with_capacity(12);
             pong.extend_from_slice(&RPC_PONG_U32.to_le_bytes());
             pong.extend_from_slice(&ping_id.to_le_bytes());
-            if tx
-                .send(WriterCommand::DataAndFlush(Bytes::from(pong)))
-                .await
-                .is_err()
-            {
-                warn!("PONG send failed");
-                break;
+            match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(pong))) {
+                Ok(()) => {}
+                Err(TrySendError::Full(_)) => {
+                    debug!(ping_id, "PONG dropped: writer command channel is full");
+                }
+                Err(TrySendError::Closed(_)) => {
+                    warn!("PONG send failed: writer channel closed");
+                    break;
+                }
             }
         } else if pt == RPC_PONG_U32 && body.len() >= 8 {
             let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap());
@@ -232,6 +235,13 @@ async fn send_close_conn(tx: &mpsc::Sender<WriterCommand>, conn_id: u64) {
     let mut p = Vec::with_capacity(12);
     p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes());
     p.extend_from_slice(&conn_id.to_le_bytes());
-
-    let _ = tx.send(WriterCommand::DataAndFlush(Bytes::from(p))).await;
+    match tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
+        Ok(()) => {}
+        Err(TrySendError::Full(_)) => {
+            debug!(conn_id, "ME close_conn signal skipped: writer command channel is full");
+        }
+        Err(TrySendError::Closed(_)) => {
+            debug!(conn_id, "ME close_conn signal skipped: writer command channel is closed");
+        }
+    }
 }
diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs
index b8a926e..2ee55c1 100644
--- a/src/transport/middle_proxy/registry.rs
+++ b/src/transport/middle_proxy/registry.rs
@@ -169,6 +169,7 @@ impl ConnRegistry {
         None
     }
 
+    #[allow(dead_code)]
     pub async fn route(&self, id: u64, resp: MeResponse) -> RouteResult {
         let tx = {
             let inner = self.inner.read().await;
@@ -445,30 +446,38 @@ impl ConnRegistry {
     }
 
     pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
-        let mut inner = self.inner.write().await;
-        inner.writers.remove(&writer_id);
-        inner.last_meta_for_writer.remove(&writer_id);
-        inner.writer_idle_since_epoch_secs.remove(&writer_id);
-        let conns = inner
-            .conns_for_writer
-            .remove(&writer_id)
-            .unwrap_or_default()
-            .into_iter()
-            .collect::<Vec<_>>();
-
+        let mut close_txs = Vec::<mpsc::Sender<MeResponse>>::new();
         let mut out = Vec::new();
-        for conn_id in conns {
-            if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
-                continue;
-            }
-            inner.writer_for_conn.remove(&conn_id);
-            if let Some(m) = inner.meta.get(&conn_id) {
-                out.push(BoundConn {
-                    conn_id,
-                    meta: m.clone(),
-                });
+        {
+            let mut inner = self.inner.write().await;
+            inner.writers.remove(&writer_id);
+            inner.last_meta_for_writer.remove(&writer_id);
+            inner.writer_idle_since_epoch_secs.remove(&writer_id);
+            let conns = inner
+                .conns_for_writer
+                .remove(&writer_id)
+                .unwrap_or_default()
+                .into_iter()
+                .collect::<Vec<_>>();
+
+            for conn_id in conns {
+                if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
+                    continue;
+                }
+                inner.writer_for_conn.remove(&conn_id);
+                if let Some(client_tx) = inner.map.remove(&conn_id) {
+                    close_txs.push(client_tx);
+                }
+                if let Some(meta) = inner.meta.remove(&conn_id) {
+                    out.push(BoundConn { conn_id, meta });
+                }
             }
         }
+
+        for client_tx in close_txs {
+            let _ = client_tx.try_send(MeResponse::Close);
+        }
+
         out
     }
 
@@ -491,6 +500,7 @@ impl ConnRegistry {
 #[cfg(test)]
 mod tests {
     use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+    use std::time::Duration;
 
     use super::ConnMeta;
     use super::ConnRegistry;
@@ -663,6 +673,39 @@ mod tests {
         assert!(registry.is_writer_empty(20).await);
     }
 
+    #[tokio::test]
+    async fn writer_lost_removes_bound_conn_from_registry_and_signals_close() {
+        let registry = ConnRegistry::new();
+        let (conn_id, mut rx) = registry.register().await;
+        let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8);
+        registry.register_writer(10, writer_tx).await;
+        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
+
+        assert!(
+            registry
+                .bind_writer(
+                    conn_id,
+                    10,
+                    ConnMeta {
+                        target_dc: 2,
+                        client_addr: addr,
+                        our_addr: addr,
+                        proto_flags: 0,
+                    },
+                )
+                .await
+        );
+
+        let lost = registry.writer_lost(10).await;
+        assert_eq!(lost.len(), 1);
+        assert_eq!(lost[0].conn_id, conn_id);
+        assert!(registry.get_writer(conn_id).await.is_none());
+        assert!(registry.get_meta(conn_id).await.is_none());
+        assert_eq!(registry.unregister(conn_id).await, None);
+        let close = tokio::time::timeout(Duration::from_millis(50), rx.recv()).await;
+        assert!(matches!(close, Ok(Some(MeResponse::Close))));
+    }
+
     #[tokio::test]
     async fn bind_writer_rejects_unregistered_writer() {
         let registry = ConnRegistry::new();
diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs
index 1c255ef..6791064 100644
--- a/src/transport/middle_proxy/send.rs
+++ b/src/transport/middle_proxy/send.rs
@@ -643,13 +643,19 @@ impl MePool {
             let mut p = Vec::with_capacity(12);
             p.extend_from_slice(&RPC_CLOSE_EXT_U32.to_le_bytes());
             p.extend_from_slice(&conn_id.to_le_bytes());
-            if w.tx
-                .send(WriterCommand::DataAndFlush(Bytes::from(p)))
-                .await
-                .is_err()
-            {
-                debug!("ME close write failed");
-                self.remove_writer_and_close_clients(w.writer_id).await;
+            match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
+                Ok(()) => {}
+                Err(TrySendError::Full(_)) => {
+                    debug!(
+                        conn_id,
+                        writer_id = w.writer_id,
+                        "ME close skipped: writer command channel is full"
+                    );
+                }
+                Err(TrySendError::Closed(_)) => {
+                    debug!("ME close write failed");
+                    self.remove_writer_and_close_clients(w.writer_id).await;
+                }
             }
         } else {
             debug!(conn_id, "ME close skipped (writer missing)");
@@ -666,8 +672,12 @@ impl MePool {
             p.extend_from_slice(&conn_id.to_le_bytes());
             match w.tx.try_send(WriterCommand::DataAndFlush(Bytes::from(p))) {
                 Ok(()) => {}
-                Err(TrySendError::Full(cmd)) => {
-                    let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await;
+                Err(TrySendError::Full(_)) => {
+                    debug!(
+                        conn_id,
+                        writer_id = w.writer_id,
+                        "ME close_conn skipped: writer command channel is full"
+                    );
                 }
                 Err(TrySendError::Closed(_)) => {
                     debug!(conn_id, "ME close_conn skipped: writer channel closed");
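
A minimal, self-contained sketch of the pattern both patches converge on: every
cleanup-path signal goes through `try_send`, so a full or closed command channel
can never park the task performing the cleanup. Illustrative only; `WriterCommand`
here is a stand-in enum and the two counters are plain integers rather than the
project's `Stats` atomics. Assumes tokio 1.x with the "rt" and "macros" features.

use tokio::sync::mpsc::{self, error::TrySendError};

#[derive(Debug)]
enum WriterCommand {
    Close,
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Capacity 1, pre-filled: models a writer whose command channel is stuck
    // because its task stopped draining commands.
    let (tx, _rx) = mpsc::channel::<WriterCommand>(1);
    tx.try_send(WriterCommand::Close)
        .expect("first command fills the only slot");

    let mut close_signal_drop_total = 0u64;
    let mut close_signal_channel_full_total = 0u64;

    // Cleanup path: `tx.send(...).await` would park here forever because the
    // receiver never drains the channel; `try_send` fails fast instead.
    match tx.try_send(WriterCommand::Close) {
        Ok(()) => {}
        Err(TrySendError::Full(_)) => {
            // Writer task alive but backed up: drop the signal and count it.
            close_signal_drop_total += 1;
            close_signal_channel_full_total += 1;
        }
        Err(TrySendError::Closed(_)) => {
            // Writer task already gone: the close signal is moot anyway.
            close_signal_drop_total += 1;
        }
    }

    assert_eq!(close_signal_drop_total, 1);
    assert_eq!(close_signal_channel_full_total, 1);
    println!("close signal dropped; cleanup continued without blocking");
}

Dropping the signal does not leak the stuck writer: as the diffs above show, the
writer still carries a CancellationToken for shutdown, and per-connection Close
responses are fanned out with `try_send` inside `registry.writer_lost`.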