diff --git a/src/stats/mod.rs b/src/stats/mod.rs index ad1d16b..b6b2ca5 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -715,6 +715,13 @@ impl Stats { } } } + + /// Correct the `pool_drain_active` gauge to match the actual count. + /// Called periodically to recover from any increment/decrement drift + /// caused by cleanup races. + pub fn sync_pool_drain_active(&self, actual: u64) { + self.pool_drain_active.store(actual, Ordering::Relaxed); + } pub fn increment_pool_force_close_total(&self) { if self.telemetry_me_allows_normal() { self.pool_force_close_total.fetch_add(1, Ordering::Relaxed); diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 8b62cff..b0caa2f 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -166,16 +166,20 @@ fn draining_writer_timeout_expired( return now_epoch_secs >= deadline_epoch_secs; } - if drain_ttl_secs == 0 { - return false; - } let drain_started_at_epoch_secs = writer .draining_started_at_epoch_secs .load(std::sync::atomic::Ordering::Relaxed); if drain_started_at_epoch_secs == 0 { return false; } - now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs + // Use configured TTL, or a hard upper bound (10 minutes) as a safety net + // so draining writers can never get stuck indefinitely. + let effective_ttl = if drain_ttl_secs > 0 { + drain_ttl_secs + } else { + 600 + }; + now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > effective_ttl } pub(super) async fn reap_draining_writers( @@ -216,6 +220,13 @@ pub(super) async fn reap_draining_writers( draining_writers.push(writer); } + // Sync the pool_drain_active gauge with actual count to correct any + // increment/decrement drift from cleanup races. + let actual_draining = timeout_expired_writer_ids.len() + + empty_writer_ids.len() + + draining_writers.len(); + pool.stats.sync_pool_drain_active(actual_draining as u64); + if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { draining_writers.sort_by(|left, right| { let left_started = left