Fix ME writer cleanup races and quarantine bypass during flapping

Three bugs caused ME writers to not be properly removed when ME
connections flapped:

1. The reader task's unconditional ws.retain() removed writers from the
   pool Vec without going through remove_writer_only(), skipping
   registry cleanup, quarantine, and refill side effects. Fixed by
   moving the retain() call inside the cleanup_done CAS block as a
   shutdown-only fallback, used only when the pool has already been
   dropped.

2. Draining writers bypassed quarantine entirely because trigger_refill
   gated both quarantine and refill. The two are now separated:
   quarantine runs for all removals (a flapping endpoint is unstable
   regardless of drain state), while refill still runs only for
   non-draining removals.

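3. connectable_endpoints() returned quarantined endpoints immediately
   when all DC endpoints were quarantined, nullifying the circuit
   breaker for single-endpoint DCs. It now returns the earliest
   endpoint immediately only if its quarantine has already expired;
   otherwise it drops the Mutex guard and sleeps until that quarantine
   expires before returning the endpoint.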

Also normalized the CAS ordering in ping task cleanup to match the
reader task (CAS-first, then pool.upgrade check).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Maksim Sirotenko 2026-03-19 22:36:46 +03:00
parent 67dc1e8d18
commit 280a392164
3 changed files with 40 additions and 19 deletions

2
Cargo.lock generated
View File

@ -2087,7 +2087,7 @@ dependencies = [
[[package]]
name = "telemt"
version = "3.3.19"
version = "3.3.25"
dependencies = [
"aes",
"anyhow",

View File

@ -71,11 +71,19 @@ impl MePool {
}
if let Some((addr, expiry)) = earliest_quarantine {
let remaining = expiry.saturating_duration_since(now);
if remaining.is_zero() {
return vec![addr];
}
drop(guard);
debug!(
%addr,
wait_ms = expiry.saturating_duration_since(now).as_millis(),
"All ME endpoints are quarantined for the DC group; retrying earliest one"
wait_ms = remaining.as_millis(),
"All ME endpoints quarantined; waiting for earliest to expire"
);
// After sleeping, the quarantine entry is expired but not removed yet.
// Callers that check is_endpoint_quarantined() will lazily clean it via retain().
tokio::time::sleep(remaining).await;
return vec![addr];
}

View File

@ -242,21 +242,27 @@ impl MePool {
stats_reader_close.increment_me_idle_close_by_peer_total();
info!(writer_id, "ME socket closed by peer on idle writer");
}
if let Some(pool) = pool.upgrade()
&& cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
if cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
} else {
// Pool is gone (shutdown). Remove writer from Vec directly
// as a last resort — no registry/refill side effects needed
// during shutdown. conn_count is not decremented here because
// the pool (and its counters) are already dropped.
let mut ws = writers_arc.write().await;
ws.retain(|w| w.id != writer_id);
debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown");
}
}
if let Err(e) = res {
if !idle_close_by_peer {
warn!(error = %e, "ME reader ended");
}
}
let mut ws = writers_arc.write().await;
ws.retain(|w| w.id != writer_id);
info!(remaining = ws.len(), "Dead ME writer removed from pool");
});
let pool_ping = Arc::downgrade(self);
@ -346,12 +352,13 @@ impl MePool {
stats_ping.increment_me_keepalive_failed();
debug!("ME ping failed, removing dead writer");
cancel_ping.cancel();
if let Some(pool) = pool_ping.upgrade()
&& cleanup_for_ping
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
if cleanup_for_ping
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
if let Some(pool) = pool_ping.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
}
}
break;
}
@ -556,13 +563,19 @@ impl MePool {
}
}
}
// Quarantine flapping endpoints regardless of draining state —
// a rapidly dying endpoint is unstable whether it was draining or not.
if let Some(addr) = removed_addr {
if let Some(uptime) = removed_uptime {
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
}
}
// Only trigger immediate refill for unexpected (non-draining) removals.
// Draining writers are intentionally being retired.
if trigger_refill
&& let Some(addr) = removed_addr
&& let Some(writer_dc) = removed_dc
{
if let Some(uptime) = removed_uptime {
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
}
self.trigger_immediate_refill_for_dc(addr, writer_dc);
}
conns