ME Dead Writer w/o dead-lock on timeout

This commit is contained in:
Alexey 2026-02-26 19:37:17 +03:00
parent 04e6135935
commit 144f81c473
No known key found for this signature in database
1 changed files with 21 additions and 3 deletions

View File

@ -20,6 +20,7 @@ use super::registry::BoundConn;
const ME_ACTIVE_PING_SECS: u64 = 25; const ME_ACTIVE_PING_SECS: u64 = 25;
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
impl MePool { impl MePool {
pub(crate) async fn prune_closed_writers(self: &Arc<Self>) { pub(crate) async fn prune_closed_writers(self: &Arc<Self>) {
@ -154,9 +155,18 @@ impl MePool {
let pool_ping = Arc::downgrade(self); let pool_ping = Arc::downgrade(self);
tokio::spawn(async move { tokio::spawn(async move {
let mut ping_id: i64 = rand::random::<i64>(); let mut ping_id: i64 = rand::random::<i64>();
let idle_interval_cap = Duration::from_secs(ME_IDLE_KEEPALIVE_MAX_SECS);
// Per-writer jittered start to avoid phase sync. // Per-writer jittered start to avoid phase sync.
let startup_jitter = if keepalive_enabled { let startup_jitter = if keepalive_enabled {
let jitter_cap_ms = keepalive_interval.as_millis() / 2; let mut interval = keepalive_interval;
if let Some(pool) = pool_ping.upgrade() {
if pool.registry.is_writer_empty(writer_id).await {
interval = interval.min(idle_interval_cap);
}
} else {
return;
}
let jitter_cap_ms = interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
} else { } else {
@ -170,9 +180,17 @@ impl MePool {
} }
loop { loop {
let wait = if keepalive_enabled { let wait = if keepalive_enabled {
let jitter_cap_ms = keepalive_interval.as_millis() / 2; let mut interval = keepalive_interval;
if let Some(pool) = pool_ping.upgrade() {
if pool.registry.is_writer_empty(writer_id).await {
interval = interval.min(idle_interval_cap);
}
} else {
break;
}
let jitter_cap_ms = interval.as_millis() / 2;
let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1);
keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))
} else { } else {
let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;