diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index a7d2960..942ddaf 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -20,6 +20,7 @@ use super::registry::BoundConn; const ME_ACTIVE_PING_SECS: u64 = 25; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; +const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5; impl MePool { pub(crate) async fn prune_closed_writers(self: &Arc) { @@ -154,9 +155,18 @@ impl MePool { let pool_ping = Arc::downgrade(self); tokio::spawn(async move { let mut ping_id: i64 = rand::random::(); + let idle_interval_cap = Duration::from_secs(ME_IDLE_KEEPALIVE_MAX_SECS); // Per-writer jittered start to avoid phase sync. let startup_jitter = if keepalive_enabled { - let jitter_cap_ms = keepalive_interval.as_millis() / 2; + let mut interval = keepalive_interval; + if let Some(pool) = pool_ping.upgrade() { + if pool.registry.is_writer_empty(writer_id).await { + interval = interval.min(idle_interval_cap); + } + } else { + return; + } + let jitter_cap_ms = interval.as_millis() / 2; let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) } else { @@ -170,9 +180,17 @@ impl MePool { } loop { let wait = if keepalive_enabled { - let jitter_cap_ms = keepalive_interval.as_millis() / 2; + let mut interval = keepalive_interval; + if let Some(pool) = pool_ping.upgrade() { + if pool.registry.is_writer_empty(writer_id).await { + interval = interval.min(idle_interval_cap); + } + } else { + break; + } + let jitter_cap_ms = interval.as_millis() / 2; let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); - keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) + interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) } else { let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;