Fix draining writers stuck forever and pool_drain_active gauge drift

Two additional bugs causing draining writers to appear stuck:

1. pool_drain_active gauge drift: when cleanup races caused a writer
   to be removed from the Vec without going through remove_writer_only
   (the old unconditional ws.retain bug), decrement_pool_drain_active
   was never called. The gauge drifted upward permanently, making it
   look like draining writers were accumulating even after removal.
   Fix: sync the gauge with the actual draining writer count on every
   reap_draining_writers cycle.

2. Draining writers stuck forever when drain_ttl_secs=0 and no
   per-writer deadline: draining_writer_timeout_expired returned false
   immediately when drain_ttl_secs==0, with no fallback. Writers with
   bound clients would never be force-removed.
   Fix: use a hard upper bound of 600s (10 minutes) as safety net when
   drain_ttl_secs is 0, so draining writers can never get stuck
   indefinitely.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Maksim Sirotenko 2026-03-20 01:49:40 +03:00
parent 280a392164
commit 6b8d5aaf68
2 changed files with 22 additions and 4 deletions

View File

@ -715,6 +715,13 @@ impl Stats {
} }
} }
} }
/// Correct the `pool_drain_active` gauge to match the actual number of
/// draining writers, unconditionally overwriting the current gauge value.
///
/// Called periodically (from the `reap_draining_writers` cycle) to recover
/// from any increment/decrement drift caused by cleanup races — e.g. a
/// writer removed from the pool without `decrement_pool_drain_active`
/// ever running.
///
/// `Relaxed` ordering matches the other stats counters in this impl;
/// the gauge is observational only, so no synchronization with other
/// memory operations is required.
pub fn sync_pool_drain_active(&self, actual: u64) {
self.pool_drain_active.store(actual, Ordering::Relaxed);
}
pub fn increment_pool_force_close_total(&self) { pub fn increment_pool_force_close_total(&self) {
if self.telemetry_me_allows_normal() { if self.telemetry_me_allows_normal() {
self.pool_force_close_total.fetch_add(1, Ordering::Relaxed); self.pool_force_close_total.fetch_add(1, Ordering::Relaxed);

View File

@ -166,16 +166,20 @@ fn draining_writer_timeout_expired(
return now_epoch_secs >= deadline_epoch_secs; return now_epoch_secs >= deadline_epoch_secs;
} }
if drain_ttl_secs == 0 {
return false;
}
let drain_started_at_epoch_secs = writer let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs .draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed); .load(std::sync::atomic::Ordering::Relaxed);
if drain_started_at_epoch_secs == 0 { if drain_started_at_epoch_secs == 0 {
return false; return false;
} }
now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs // Use configured TTL, or a hard upper bound (10 minutes) as a safety net
// so draining writers can never get stuck indefinitely.
let effective_ttl = if drain_ttl_secs > 0 {
drain_ttl_secs
} else {
600
};
now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > effective_ttl
} }
pub(super) async fn reap_draining_writers( pub(super) async fn reap_draining_writers(
@ -216,6 +220,13 @@ pub(super) async fn reap_draining_writers(
draining_writers.push(writer); draining_writers.push(writer);
} }
// Sync the pool_drain_active gauge with actual count to correct any
// increment/decrement drift from cleanup races.
let actual_draining = timeout_expired_writer_ids.len()
+ empty_writer_ids.len()
+ draining_writers.len();
pool.stats.sync_pool_drain_active(actual_draining as u64);
if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize {
draining_writers.sort_by(|left, right| { draining_writers.sort_by(|left, right| {
let left_started = left let left_started = left