diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index fc916f4..895bdb5 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -71,11 +71,19 @@ impl MePool { } if let Some((addr, expiry)) = earliest_quarantine { + let remaining = expiry.saturating_duration_since(now); + if remaining.is_zero() { + return vec![addr]; + } + drop(guard); debug!( %addr, - wait_ms = expiry.saturating_duration_since(now).as_millis(), - "All ME endpoints are quarantined for the DC group; retrying earliest one" + wait_ms = remaining.as_millis(), + "All ME endpoints quarantined; waiting for earliest to expire" ); + // After sleeping, the quarantine entry is expired but not removed yet. + // Callers that check is_endpoint_quarantined() will lazily clean it via retain(). + tokio::time::sleep(remaining).await; return vec![addr]; } @@ -311,4 +319,4 @@ impl MePool { dc_guard.remove(&dc_key); }); } -} +} \ No newline at end of file diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 7d78b84..fb1ba10 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -242,21 +242,27 @@ impl MePool { stats_reader_close.increment_me_idle_close_by_peer_total(); info!(writer_id, "ME socket closed by peer on idle writer"); } - if let Some(pool) = pool.upgrade() - && cleanup_for_reader - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) - .is_ok() + if cleanup_for_reader + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + if let Some(pool) = pool.upgrade() { + pool.remove_writer_and_close_clients(writer_id).await; + } else { + // Pool is gone (shutdown). Remove writer from Vec directly + // as a last resort — no registry/refill side effects needed + // during shutdown. conn_count is not decremented here because + // the pool (and its counters) are already dropped. + let mut ws = writers_arc.write().await; + ws.retain(|w| w.id != writer_id); + debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown"); + } } if let Err(e) = res { if !idle_close_by_peer { warn!(error = %e, "ME reader ended"); } } - let mut ws = writers_arc.write().await; - ws.retain(|w| w.id != writer_id); - info!(remaining = ws.len(), "Dead ME writer removed from pool"); }); let pool_ping = Arc::downgrade(self); @@ -346,12 +352,13 @@ impl MePool { stats_ping.increment_me_keepalive_failed(); debug!("ME ping failed, removing dead writer"); cancel_ping.cancel(); - if let Some(pool) = pool_ping.upgrade() - && cleanup_for_ping - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) - .is_ok() + if cleanup_for_ping + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() { - pool.remove_writer_and_close_clients(writer_id).await; + if let Some(pool) = pool_ping.upgrade() { + pool.remove_writer_and_close_clients(writer_id).await; + } } break; } @@ -556,13 +563,19 @@ impl MePool { } } } + // Quarantine flapping endpoints regardless of draining state — + // a rapidly dying endpoint is unstable whether it was draining or not. + if let Some(addr) = removed_addr { + if let Some(uptime) = removed_uptime { + self.maybe_quarantine_flapping_endpoint(addr, uptime).await; + } + } + // Only trigger immediate refill for unexpected (non-draining) removals. + // Draining writers are intentionally being retired. if trigger_refill && let Some(addr) = removed_addr && let Some(writer_dc) = removed_dc { - if let Some(uptime) = removed_uptime { - self.maybe_quarantine_flapping_endpoint(addr, uptime).await; - } self.trigger_immediate_refill_for_dc(addr, writer_dc); } conns @@ -645,4 +658,4 @@ impl MePool { } } } -} +} \ No newline at end of file