From 280a3921644222ecd8327c2941233e4cf74c5437 Mon Sep 17 00:00:00 2001 From: Maksim Sirotenko Date: Thu, 19 Mar 2026 22:36:46 +0300 Subject: [PATCH] Fix ME writer cleanup races and quarantine bypass during flapping Three bugs prevented ME writers from being removed properly when ME connections flapped: 1. The reader task's unconditional ws.retain() removed writers from the pool Vec without going through remove_writer_only(), skipping registry cleanup, quarantine, and refill side effects. Fixed by moving the retain inside the cleanup_done CAS block, where it runs only as a fallback when the pool has already been dropped (shutdown). 2. Draining writers bypassed quarantine entirely because trigger_refill gated both quarantine and refill. The two are now separated: quarantine runs for all removals (a flapping endpoint is unstable regardless of drain state), while immediate refill still runs only for non-draining removals. 3. connectable_endpoints() returned quarantined endpoints immediately when all DC endpoints were quarantined, nullifying the circuit breaker for single-endpoint DCs. It now drops the Mutex guard and sleeps until the earliest quarantine expires before returning that endpoint. Also reordered the ping task cleanup to match the reader task: the CAS runs first, then the pool.upgrade() check. 
Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 2 +- src/transport/middle_proxy/pool_refill.rs | 12 +++++- src/transport/middle_proxy/pool_writer.rs | 45 +++++++++++++++-------- 3 files changed, 40 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a704404..0c74ae9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2087,7 +2087,7 @@ dependencies = [ [[package]] name = "telemt" -version = "3.3.19" +version = "3.3.25" dependencies = [ "aes", "anyhow", diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index fc916f4..543f2d4 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -71,11 +71,19 @@ impl MePool { } if let Some((addr, expiry)) = earliest_quarantine { + let remaining = expiry.saturating_duration_since(now); + if remaining.is_zero() { + return vec![addr]; + } + drop(guard); debug!( %addr, - wait_ms = expiry.saturating_duration_since(now).as_millis(), - "All ME endpoints are quarantined for the DC group; retrying earliest one" + wait_ms = remaining.as_millis(), + "All ME endpoints quarantined; waiting for earliest to expire" ); + // After sleeping, the quarantine entry is expired but not removed yet. + // Callers that check is_endpoint_quarantined() will lazily clean it via retain(). 
+ tokio::time::sleep(remaining).await; return vec![addr]; } diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 7d78b84..02a34a5 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -242,21 +242,27 @@ impl MePool { stats_reader_close.increment_me_idle_close_by_peer_total(); info!(writer_id, "ME socket closed by peer on idle writer"); } - if let Some(pool) = pool.upgrade() - && cleanup_for_reader - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) - .is_ok() + if cleanup_for_reader + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + if let Some(pool) = pool.upgrade() { + pool.remove_writer_and_close_clients(writer_id).await; + } else { + // Pool is gone (shutdown). Remove writer from Vec directly + // as a last resort — no registry/refill side effects needed + // during shutdown. conn_count is not decremented here because + // the pool (and its counters) are already dropped. 
+ let mut ws = writers_arc.write().await; + ws.retain(|w| w.id != writer_id); + debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown"); + } } if let Err(e) = res { if !idle_close_by_peer { warn!(error = %e, "ME reader ended"); } } - let mut ws = writers_arc.write().await; - ws.retain(|w| w.id != writer_id); - info!(remaining = ws.len(), "Dead ME writer removed from pool"); }); let pool_ping = Arc::downgrade(self); @@ -346,12 +352,13 @@ impl MePool { stats_ping.increment_me_keepalive_failed(); debug!("ME ping failed, removing dead writer"); cancel_ping.cancel(); - if let Some(pool) = pool_ping.upgrade() - && cleanup_for_ping - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) - .is_ok() + if cleanup_for_ping + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + if let Some(pool) = pool_ping.upgrade() { + pool.remove_writer_and_close_clients(writer_id).await; + } } break; } @@ -556,13 +563,19 @@ impl MePool { } } } + // Quarantine flapping endpoints regardless of draining state — + // a rapidly dying endpoint is unstable whether it was draining or not. + if let Some(addr) = removed_addr { + if let Some(uptime) = removed_uptime { + self.maybe_quarantine_flapping_endpoint(addr, uptime).await; + } + } + // Only trigger immediate refill for unexpected (non-draining) removals. + // Draining writers are intentionally being retired. if trigger_refill && let Some(addr) = removed_addr && let Some(writer_dc) = removed_dc { - if let Some(uptime) = removed_uptime { - self.maybe_quarantine_flapping_endpoint(addr, uptime).await; - } self.trigger_immediate_refill_for_dc(addr, writer_dc); } conns