mirror of https://github.com/telemt/telemt.git
ME Writers Anti-stuck and Quarantine fixes
Co-Authored-By: Nook Scheel <nook@live.ru>
This commit is contained in:
parent
67dc1e8d18
commit
1c6c73beda
|
|
@ -71,11 +71,19 @@ impl MePool {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some((addr, expiry)) = earliest_quarantine {
|
if let Some((addr, expiry)) = earliest_quarantine {
|
||||||
|
let remaining = expiry.saturating_duration_since(now);
|
||||||
|
if remaining.is_zero() {
|
||||||
|
return vec![addr];
|
||||||
|
}
|
||||||
|
drop(guard);
|
||||||
debug!(
|
debug!(
|
||||||
%addr,
|
%addr,
|
||||||
wait_ms = expiry.saturating_duration_since(now).as_millis(),
|
wait_ms = remaining.as_millis(),
|
||||||
"All ME endpoints are quarantined for the DC group; retrying earliest one"
|
"All ME endpoints quarantined; waiting for earliest to expire"
|
||||||
);
|
);
|
||||||
|
// After sleeping, the quarantine entry is expired but not removed yet.
|
||||||
|
// Callers that check is_endpoint_quarantined() will lazily clean it via retain().
|
||||||
|
tokio::time::sleep(remaining).await;
|
||||||
return vec![addr];
|
return vec![addr];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -242,21 +242,27 @@ impl MePool {
|
||||||
stats_reader_close.increment_me_idle_close_by_peer_total();
|
stats_reader_close.increment_me_idle_close_by_peer_total();
|
||||||
info!(writer_id, "ME socket closed by peer on idle writer");
|
info!(writer_id, "ME socket closed by peer on idle writer");
|
||||||
}
|
}
|
||||||
if let Some(pool) = pool.upgrade()
|
if cleanup_for_reader
|
||||||
&& cleanup_for_reader
|
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
|
||||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
|
.is_ok()
|
||||||
.is_ok()
|
|
||||||
{
|
{
|
||||||
pool.remove_writer_and_close_clients(writer_id).await;
|
if let Some(pool) = pool.upgrade() {
|
||||||
|
pool.remove_writer_and_close_clients(writer_id).await;
|
||||||
|
} else {
|
||||||
|
// Pool is gone (shutdown). Remove writer from Vec directly
|
||||||
|
// as a last resort — no registry/refill side effects needed
|
||||||
|
// during shutdown. conn_count is not decremented here because
|
||||||
|
// the pool (and its counters) are already dropped.
|
||||||
|
let mut ws = writers_arc.write().await;
|
||||||
|
ws.retain(|w| w.id != writer_id);
|
||||||
|
debug!(writer_id, remaining = ws.len(), "Writer removed during pool shutdown");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
if !idle_close_by_peer {
|
if !idle_close_by_peer {
|
||||||
warn!(error = %e, "ME reader ended");
|
warn!(error = %e, "ME reader ended");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let mut ws = writers_arc.write().await;
|
|
||||||
ws.retain(|w| w.id != writer_id);
|
|
||||||
info!(remaining = ws.len(), "Dead ME writer removed from pool");
|
|
||||||
});
|
});
|
||||||
|
|
||||||
let pool_ping = Arc::downgrade(self);
|
let pool_ping = Arc::downgrade(self);
|
||||||
|
|
@ -346,12 +352,13 @@ impl MePool {
|
||||||
stats_ping.increment_me_keepalive_failed();
|
stats_ping.increment_me_keepalive_failed();
|
||||||
debug!("ME ping failed, removing dead writer");
|
debug!("ME ping failed, removing dead writer");
|
||||||
cancel_ping.cancel();
|
cancel_ping.cancel();
|
||||||
if let Some(pool) = pool_ping.upgrade()
|
if cleanup_for_ping
|
||||||
&& cleanup_for_ping
|
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
|
||||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
|
.is_ok()
|
||||||
.is_ok()
|
|
||||||
{
|
{
|
||||||
pool.remove_writer_and_close_clients(writer_id).await;
|
if let Some(pool) = pool_ping.upgrade() {
|
||||||
|
pool.remove_writer_and_close_clients(writer_id).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
@ -556,13 +563,19 @@ impl MePool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Quarantine flapping endpoints regardless of draining state —
|
||||||
|
// a rapidly dying endpoint is unstable whether it was draining or not.
|
||||||
|
if let Some(addr) = removed_addr {
|
||||||
|
if let Some(uptime) = removed_uptime {
|
||||||
|
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Only trigger immediate refill for unexpected (non-draining) removals.
|
||||||
|
// Draining writers are intentionally being retired.
|
||||||
if trigger_refill
|
if trigger_refill
|
||||||
&& let Some(addr) = removed_addr
|
&& let Some(addr) = removed_addr
|
||||||
&& let Some(writer_dc) = removed_dc
|
&& let Some(writer_dc) = removed_dc
|
||||||
{
|
{
|
||||||
if let Some(uptime) = removed_uptime {
|
|
||||||
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
|
|
||||||
}
|
|
||||||
self.trigger_immediate_refill_for_dc(addr, writer_dc);
|
self.trigger_immediate_refill_for_dc(addr, writer_dc);
|
||||||
}
|
}
|
||||||
conns
|
conns
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue