Instadrain + Hard-remove for long draining-state

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-19 17:45:17 +03:00
parent f655924323
commit 8d1faece60
No known key found for this signature in database
15 changed files with 170 additions and 41 deletions

View File

@ -364,6 +364,7 @@ pub(super) struct MinimalMeRuntimeData {
pub(super) me_reconnect_backoff_cap_ms: u64, pub(super) me_reconnect_backoff_cap_ms: u64,
pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_reconnect_fast_retry_count: u32,
pub(super) me_pool_drain_ttl_secs: u64, pub(super) me_pool_drain_ttl_secs: u64,
pub(super) me_instadrain: bool,
pub(super) me_pool_drain_soft_evict_enabled: bool, pub(super) me_pool_drain_soft_evict_enabled: bool,
pub(super) me_pool_drain_soft_evict_grace_secs: u64, pub(super) me_pool_drain_soft_evict_grace_secs: u64,
pub(super) me_pool_drain_soft_evict_per_writer: u8, pub(super) me_pool_drain_soft_evict_per_writer: u8,

View File

@ -431,6 +431,7 @@ async fn get_minimal_payload_cached(
me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms, me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms,
me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count, me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs, me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs,
me_instadrain: runtime.me_instadrain,
me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled, me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled,
me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs, me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs,
me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer, me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer,

View File

@ -198,6 +198,7 @@ desync_all_full = false
update_every = 43200 update_every = 43200
hardswap = false hardswap = false
me_pool_drain_ttl_secs = 90 me_pool_drain_ttl_secs = 90
me_instadrain = false
me_pool_min_fresh_ratio = 0.8 me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 120 me_reinit_drain_timeout_secs = 120

View File

@ -613,6 +613,10 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
90 90
} }
pub(crate) fn default_me_instadrain() -> bool {
false
}
pub(crate) fn default_me_pool_drain_threshold() -> u64 { pub(crate) fn default_me_pool_drain_threshold() -> u64 {
128 128
} }

View File

@ -56,6 +56,7 @@ pub struct HotFields {
pub me_reinit_coalesce_window_ms: u64, pub me_reinit_coalesce_window_ms: u64,
pub hardswap: bool, pub hardswap: bool,
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
pub me_instadrain: bool,
pub me_pool_drain_threshold: u64, pub me_pool_drain_threshold: u64,
pub me_pool_drain_soft_evict_enabled: bool, pub me_pool_drain_soft_evict_enabled: bool,
pub me_pool_drain_soft_evict_grace_secs: u64, pub me_pool_drain_soft_evict_grace_secs: u64,
@ -143,6 +144,7 @@ impl HotFields {
me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms, me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms,
hardswap: cfg.general.hardswap, hardswap: cfg.general.hardswap,
me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs,
me_instadrain: cfg.general.me_instadrain,
me_pool_drain_threshold: cfg.general.me_pool_drain_threshold, me_pool_drain_threshold: cfg.general.me_pool_drain_threshold,
me_pool_drain_soft_evict_enabled: cfg.general.me_pool_drain_soft_evict_enabled, me_pool_drain_soft_evict_enabled: cfg.general.me_pool_drain_soft_evict_enabled,
me_pool_drain_soft_evict_grace_secs: cfg.general.me_pool_drain_soft_evict_grace_secs, me_pool_drain_soft_evict_grace_secs: cfg.general.me_pool_drain_soft_evict_grace_secs,
@ -477,6 +479,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms; cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms;
cfg.general.hardswap = new.general.hardswap; cfg.general.hardswap = new.general.hardswap;
cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs;
cfg.general.me_instadrain = new.general.me_instadrain;
cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold; cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold;
cfg.general.me_pool_drain_soft_evict_enabled = new.general.me_pool_drain_soft_evict_enabled; cfg.general.me_pool_drain_soft_evict_enabled = new.general.me_pool_drain_soft_evict_enabled;
cfg.general.me_pool_drain_soft_evict_grace_secs = cfg.general.me_pool_drain_soft_evict_grace_secs =
@ -869,6 +872,12 @@ fn log_changes(
old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs, old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs,
); );
} }
if old_hot.me_instadrain != new_hot.me_instadrain {
info!(
"config reload: me_instadrain: {} → {}",
old_hot.me_instadrain, new_hot.me_instadrain,
);
}
if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold { if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold {
info!( info!(

View File

@ -812,6 +812,10 @@ pub struct GeneralConfig {
#[serde(default = "default_me_pool_drain_ttl_secs")] #[serde(default = "default_me_pool_drain_ttl_secs")]
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
/// Force-remove any draining writer on the next cleanup tick, regardless of age/deadline.
#[serde(default = "default_me_instadrain")]
pub me_instadrain: bool,
/// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches. /// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches.
/// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior. /// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior.
#[serde(default = "default_me_pool_drain_threshold")] #[serde(default = "default_me_pool_drain_threshold")]
@ -1020,6 +1024,7 @@ impl Default for GeneralConfig {
me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(), me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(),
proxy_secret_len_max: default_proxy_secret_len_max(), proxy_secret_len_max: default_proxy_secret_len_max(),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_instadrain: default_me_instadrain(),
me_pool_drain_threshold: default_me_pool_drain_threshold(), me_pool_drain_threshold: default_me_pool_drain_threshold(),
me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(), me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(),
me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(), me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(),

View File

@ -237,6 +237,7 @@ pub(crate) async fn initialize_me_pool(
config.general.me_adaptive_floor_max_warm_writers_global, config.general.me_adaptive_floor_max_warm_writers_global,
config.general.hardswap, config.general.hardswap,
config.general.me_pool_drain_ttl_secs, config.general.me_pool_drain_ttl_secs,
config.general.me_instadrain,
config.general.me_pool_drain_threshold, config.general.me_pool_drain_threshold,
config.general.me_pool_drain_soft_evict_enabled, config.general.me_pool_drain_soft_evict_enabled,
config.general.me_pool_drain_soft_evict_grace_secs, config.general.me_pool_drain_soft_evict_grace_secs,
@ -342,6 +343,13 @@ pub(crate) async fn initialize_me_pool(
) )
.await; .await;
}); });
let pool_drain_enforcer = pool_bg.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
break; break;
} }
Err(e) => { Err(e) => {
@ -409,6 +417,13 @@ pub(crate) async fn initialize_me_pool(
) )
.await; .await;
}); });
let pool_drain_enforcer = pool.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
break Some(pool); break Some(pool);
} }

View File

@ -298,6 +298,7 @@ async fn run_update_cycle(
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled, cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs, cfg.general.me_pool_drain_soft_evict_grace_secs,
@ -530,6 +531,7 @@ pub async fn me_config_updater(
pool.update_runtime_reinit_policy( pool.update_runtime_reinit_policy(
cfg.general.hardswap, cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled, cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs, cfg.general.me_pool_drain_soft_evict_grace_secs,

View File

@ -12,6 +12,7 @@ use crate::crypto::SecureRandom;
use crate::network::IpFamily; use crate::network::IpFamily;
use super::MePool; use super::MePool;
use super::pool::MeWriter;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)] #[allow(dead_code)]
@ -31,10 +32,7 @@ const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1; const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1;
#[cfg(not(test))] const HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS: u64 = 1;
const HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE: bool = true;
#[cfg(test)]
const HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE: bool = false;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct DcFloorPlanEntry { struct DcFloorPlanEntry {
@ -131,6 +129,55 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
} }
} }
pub async fn me_drain_timeout_enforcer(pool: Arc<MePool>) {
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut drain_soft_evict_next_allowed: HashMap<u64, Instant> = HashMap::new();
loop {
tokio::time::sleep(Duration::from_secs(
HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS,
))
.await;
reap_draining_writers(
&pool,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
)
.await;
}
}
fn draining_writer_timeout_expired(
pool: &MePool,
writer: &MeWriter,
now_epoch_secs: u64,
drain_ttl_secs: u64,
) -> bool {
if pool
.me_instadrain
.load(std::sync::atomic::Ordering::Relaxed)
{
return true;
}
let deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline_epoch_secs != 0 {
return now_epoch_secs >= deadline_epoch_secs;
}
if drain_ttl_secs == 0 {
return false;
}
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if drain_started_at_epoch_secs == 0 {
return false;
}
now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs
}
pub(super) async fn reap_draining_writers( pub(super) async fn reap_draining_writers(
pool: &Arc<MePool>, pool: &Arc<MePool>,
warn_next_allowed: &mut HashMap<u64, Instant>, warn_next_allowed: &mut HashMap<u64, Instant>,
@ -146,11 +193,16 @@ pub(super) async fn reap_draining_writers(
let activity = pool.registry.writer_activity_snapshot().await; let activity = pool.registry.writer_activity_snapshot().await;
let mut draining_writers = Vec::new(); let mut draining_writers = Vec::new();
let mut empty_writer_ids = Vec::<u64>::new(); let mut empty_writer_ids = Vec::<u64>::new();
let mut timeout_expired_writer_ids = Vec::<u64>::new();
let mut force_close_writer_ids = Vec::<u64>::new(); let mut force_close_writer_ids = Vec::<u64>::new();
for writer in writers { for writer in writers {
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
continue; continue;
} }
if draining_writer_timeout_expired(pool, &writer, now_epoch_secs, drain_ttl_secs) {
timeout_expired_writer_ids.push(writer.id);
continue;
}
if activity if activity
.bound_clients_by_writer .bound_clients_by_writer
.get(&writer.id) .get(&writer.id)
@ -163,11 +215,6 @@ pub(super) async fn reap_draining_writers(
} }
draining_writers.push(writer); draining_writers.push(writer);
} }
if HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE {
for writer in draining_writers.drain(..) {
force_close_writer_ids.push(writer.id);
}
}
if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize {
draining_writers.sort_by(|left, right| { draining_writers.sort_by(|left, right| {
@ -221,14 +268,6 @@ pub(super) async fn reap_draining_writers(
"ME draining writer remains non-empty past drain TTL" "ME draining writer remains non-empty past drain TTL"
); );
} }
let deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs {
warn!(writer_id = writer.id, "Drain timeout, force-closing");
force_close_writer_ids.push(writer.id);
active_draining_writer_ids.remove(&writer.id);
}
} }
warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id)); warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id));
@ -313,15 +352,21 @@ pub(super) async fn reap_draining_writers(
} }
} }
let mut closed_writer_ids = HashSet::<u64>::new();
for writer_id in timeout_expired_writer_ids {
if !closed_writer_ids.insert(writer_id) {
continue;
}
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
}
let requested_force_close = force_close_writer_ids.len(); let requested_force_close = force_close_writer_ids.len();
let requested_empty_close = empty_writer_ids.len(); let requested_empty_close = empty_writer_ids.len();
let requested_close_total = requested_force_close.saturating_add(requested_empty_close); let requested_close_total = requested_force_close.saturating_add(requested_empty_close);
let close_budget = if HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE { let close_budget = health_drain_close_budget();
requested_close_total
} else {
health_drain_close_budget()
};
let mut closed_writer_ids = HashSet::<u64>::new();
let mut closed_total = 0usize; let mut closed_total = 0usize;
for writer_id in force_close_writer_ids { for writer_id in force_close_writer_ids {
if closed_total >= close_budget { if closed_total >= close_budget {
@ -1581,6 +1626,7 @@ mod tests {
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs, general.me_pool_drain_soft_evict_grace_secs,

View File

@ -81,6 +81,7 @@ async fn make_pool(
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs, general.me_pool_drain_soft_evict_grace_secs,
@ -213,7 +214,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(600).saturating_add(writer_id), now_epoch_secs.saturating_sub(20),
1, 1,
0, 0,
) )
@ -230,7 +231,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
} }
assert_eq!(writer_count(&pool).await, threshold as usize); assert_eq!(writer_count(&pool).await, threshold as usize);
assert_eq!(sorted_writer_ids(&pool).await, vec![58, 59, 60]); assert_eq!(sorted_writer_ids(&pool).await, vec![1, 2, 3]);
} }
#[tokio::test] #[tokio::test]

View File

@ -80,6 +80,7 @@ async fn make_pool(
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs, general.me_pool_drain_soft_evict_grace_secs,

View File

@ -74,6 +74,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.me_adaptive_floor_max_warm_writers_global, general.me_adaptive_floor_max_warm_writers_global,
general.hardswap, general.hardswap,
general.me_pool_drain_ttl_secs, general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold, general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs, general.me_pool_drain_soft_evict_grace_secs,
@ -180,8 +181,14 @@ async fn current_writer_ids(pool: &Arc<MePool>) -> Vec<u64> {
async fn reap_draining_writers_drops_warn_state_for_removed_writer() { async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
let pool = make_pool(128).await; let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let conn_ids = let conn_ids = insert_draining_writer(
insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await; &pool,
7,
now_epoch_secs.saturating_sub(180),
1,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new();
@ -331,17 +338,17 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() {
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_limits_closes_per_health_tick() { async fn reap_draining_writers_limits_closes_per_health_tick() {
let pool = make_pool(128).await; let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget(); let close_budget = health_drain_close_budget();
let writer_total = close_budget.saturating_add(19); let writer_total = close_budget.saturating_add(20);
for writer_id in 1..=writer_total as u64 { for writer_id in 1..=writer_total as u64 {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 1,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
@ -364,8 +371,8 @@ async fn reap_draining_writers_backlog_drains_across_ticks() {
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 0,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
@ -393,7 +400,7 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(200).saturating_add(writer_id), now_epoch_secs.saturating_sub(20),
1, 1,
0, 0,
) )
@ -429,27 +436,27 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr
#[tokio::test] #[tokio::test]
async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() { async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() {
let pool = make_pool(128).await; let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget(); let close_budget = health_drain_close_budget();
for writer_id in 1..=close_budget as u64 { for writer_id in 1..=close_budget.saturating_add(1) as u64 {
insert_draining_writer( insert_draining_writer(
&pool, &pool,
writer_id, writer_id,
now_epoch_secs.saturating_sub(20), now_epoch_secs.saturating_sub(20),
1, 1,
now_epoch_secs.saturating_sub(1), 0,
) )
.await; .await;
} }
let empty_writer_id = close_budget as u64 + 1; let empty_writer_id = close_budget.saturating_add(2) as u64;
insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await; insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]); assert_eq!(current_writer_ids(&pool).await, vec![1, empty_writer_id]);
} }
#[tokio::test] #[tokio::test]
@ -571,7 +578,14 @@ async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() {
.store(1, Ordering::Relaxed); .store(1, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await; insert_draining_writer(
&pool,
77,
now_epoch_secs.saturating_sub(240),
3,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new();
@ -595,7 +609,14 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
.store(60_000, Ordering::Relaxed); .store(60_000, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await; insert_draining_writer(
&pool,
88,
now_epoch_secs.saturating_sub(240),
3,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new(); let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new();
@ -608,6 +629,21 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1); assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
} }
#[tokio::test]
async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediately() {
let pool = make_pool(0).await;
pool.me_instadrain.store(true, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 101, now_epoch_secs.saturating_sub(5), 1, 0).await;
insert_draining_writer(&pool, 102, now_epoch_secs.saturating_sub(4), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
}
#[test] #[test]
fn general_config_default_drain_threshold_remains_enabled() { fn general_config_default_drain_threshold_remains_enabled() {
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128); assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);

View File

@ -30,7 +30,7 @@ mod health_adversarial_tests;
use bytes::Bytes; use bytes::Bytes;
pub use health::me_health_monitor; pub use health::{me_drain_timeout_enforcer, me_health_monitor};
#[allow(unused_imports)] #[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily}; pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool; pub use pool::MePool;

View File

@ -171,6 +171,7 @@ pub struct MePool {
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>, pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>, pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_instadrain: AtomicBool,
pub(super) me_pool_drain_threshold: AtomicU64, pub(super) me_pool_drain_threshold: AtomicU64,
pub(super) me_pool_drain_soft_evict_enabled: AtomicBool, pub(super) me_pool_drain_soft_evict_enabled: AtomicBool,
pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64, pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64,
@ -279,6 +280,7 @@ impl MePool {
me_adaptive_floor_max_warm_writers_global: u32, me_adaptive_floor_max_warm_writers_global: u32,
hardswap: bool, hardswap: bool,
me_pool_drain_ttl_secs: u64, me_pool_drain_ttl_secs: u64,
me_instadrain: bool,
me_pool_drain_threshold: u64, me_pool_drain_threshold: u64,
me_pool_drain_soft_evict_enabled: bool, me_pool_drain_soft_evict_enabled: bool,
me_pool_drain_soft_evict_grace_secs: u64, me_pool_drain_soft_evict_grace_secs: u64,
@ -462,6 +464,7 @@ impl MePool {
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_instadrain: AtomicBool::new(me_instadrain),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold), me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled), me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled),
me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs), me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs),
@ -524,6 +527,7 @@ impl MePool {
&self, &self,
hardswap: bool, hardswap: bool,
drain_ttl_secs: u64, drain_ttl_secs: u64,
instadrain: bool,
pool_drain_threshold: u64, pool_drain_threshold: u64,
pool_drain_soft_evict_enabled: bool, pool_drain_soft_evict_enabled: bool,
pool_drain_soft_evict_grace_secs: u64, pool_drain_soft_evict_grace_secs: u64,
@ -568,6 +572,7 @@ impl MePool {
self.hardswap.store(hardswap, Ordering::Relaxed); self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs self.me_pool_drain_ttl_secs
.store(drain_ttl_secs, Ordering::Relaxed); .store(drain_ttl_secs, Ordering::Relaxed);
self.me_instadrain.store(instadrain, Ordering::Relaxed);
self.me_pool_drain_threshold self.me_pool_drain_threshold
.store(pool_drain_threshold, Ordering::Relaxed); .store(pool_drain_threshold, Ordering::Relaxed);
self.me_pool_drain_soft_evict_enabled self.me_pool_drain_soft_evict_enabled

View File

@ -126,6 +126,7 @@ pub(crate) struct MeApiRuntimeSnapshot {
pub me_reconnect_backoff_cap_ms: u64, pub me_reconnect_backoff_cap_ms: u64,
pub me_reconnect_fast_retry_count: u32, pub me_reconnect_fast_retry_count: u32,
pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_ttl_secs: u64,
pub me_instadrain: bool,
pub me_pool_drain_soft_evict_enabled: bool, pub me_pool_drain_soft_evict_enabled: bool,
pub me_pool_drain_soft_evict_grace_secs: u64, pub me_pool_drain_soft_evict_grace_secs: u64,
pub me_pool_drain_soft_evict_per_writer: u8, pub me_pool_drain_soft_evict_per_writer: u8,
@ -583,6 +584,7 @@ impl MePool {
me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,
me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed), me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed),
me_instadrain: self.me_instadrain.load(Ordering::Relaxed),
me_pool_drain_soft_evict_enabled: self me_pool_drain_soft_evict_enabled: self
.me_pool_drain_soft_evict_enabled .me_pool_drain_soft_evict_enabled
.load(Ordering::Relaxed), .load(Ordering::Relaxed),