diff --git a/src/api/model.rs b/src/api/model.rs index ac4e297..e98de8b 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -364,6 +364,7 @@ pub(super) struct MinimalMeRuntimeData { pub(super) me_reconnect_backoff_cap_ms: u64, pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_pool_drain_ttl_secs: u64, + pub(super) me_instadrain: bool, pub(super) me_pool_drain_soft_evict_enabled: bool, pub(super) me_pool_drain_soft_evict_grace_secs: u64, pub(super) me_pool_drain_soft_evict_per_writer: u8, diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index f8948d1..cdeacc0 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -431,6 +431,7 @@ async fn get_minimal_payload_cached( me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms, me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs, + me_instadrain: runtime.me_instadrain, me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled, me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs, me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer, diff --git a/src/cli.rs b/src/cli.rs index a1182a7..5fbd7d5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -198,6 +198,7 @@ desync_all_full = false update_every = 43200 hardswap = false me_pool_drain_ttl_secs = 90 +me_instadrain = false me_pool_min_fresh_ratio = 0.8 me_reinit_drain_timeout_secs = 120 diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 54a53b3..6d74c93 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -613,6 +613,10 @@ pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 { 90 } +pub(crate) fn default_me_instadrain() -> bool { + false +} + pub(crate) fn default_me_pool_drain_threshold() -> u64 { 128 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 7b94999..1315f9c 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -56,6 +56,7 @@ pub struct HotFields { pub me_reinit_coalesce_window_ms: u64, pub hardswap: bool, pub me_pool_drain_ttl_secs: u64, + pub me_instadrain: bool, pub me_pool_drain_threshold: u64, pub me_pool_drain_soft_evict_enabled: bool, pub me_pool_drain_soft_evict_grace_secs: u64, @@ -143,6 +144,7 @@ impl HotFields { me_reinit_coalesce_window_ms: cfg.general.me_reinit_coalesce_window_ms, hardswap: cfg.general.hardswap, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, + me_instadrain: cfg.general.me_instadrain, me_pool_drain_threshold: cfg.general.me_pool_drain_threshold, me_pool_drain_soft_evict_enabled: cfg.general.me_pool_drain_soft_evict_enabled, me_pool_drain_soft_evict_grace_secs: cfg.general.me_pool_drain_soft_evict_grace_secs, @@ -477,6 +479,7 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.general.me_reinit_coalesce_window_ms = new.general.me_reinit_coalesce_window_ms; cfg.general.hardswap = new.general.hardswap; cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; + cfg.general.me_instadrain = new.general.me_instadrain; cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold; cfg.general.me_pool_drain_soft_evict_enabled = new.general.me_pool_drain_soft_evict_enabled; cfg.general.me_pool_drain_soft_evict_grace_secs = @@ -869,6 +872,12 @@ fn log_changes( old_hot.me_pool_drain_ttl_secs, new_hot.me_pool_drain_ttl_secs, ); } + if old_hot.me_instadrain != new_hot.me_instadrain { + info!( + "config reload: me_instadrain: {} → {}", + old_hot.me_instadrain, new_hot.me_instadrain, + ); + } if old_hot.me_pool_drain_threshold != new_hot.me_pool_drain_threshold { info!( diff --git a/src/config/types.rs b/src/config/types.rs index 047f3c2..ecd051d 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -812,6 +812,10 @@ pub struct GeneralConfig { #[serde(default = "default_me_pool_drain_ttl_secs")] pub me_pool_drain_ttl_secs: u64, + /// Force-remove any draining writer on the next cleanup tick, regardless of age/deadline. + #[serde(default = "default_me_instadrain")] + pub me_instadrain: bool, + /// Maximum allowed number of draining ME writers before oldest ones are force-closed in batches. /// Set to 0 to disable threshold-based draining cleanup and keep timeout-only behavior. #[serde(default = "default_me_pool_drain_threshold")] @@ -1020,6 +1024,7 @@ impl Default for GeneralConfig { me_secret_atomic_snapshot: default_me_secret_atomic_snapshot(), proxy_secret_len_max: default_proxy_secret_len_max(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), + me_instadrain: default_me_instadrain(), me_pool_drain_threshold: default_me_pool_drain_threshold(), me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(), me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(), diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 827b00c..0b1310a 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -237,6 +237,7 @@ pub(crate) async fn initialize_me_pool( config.general.me_adaptive_floor_max_warm_writers_global, config.general.hardswap, config.general.me_pool_drain_ttl_secs, + config.general.me_instadrain, config.general.me_pool_drain_threshold, config.general.me_pool_drain_soft_evict_enabled, config.general.me_pool_drain_soft_evict_grace_secs, @@ -342,6 +343,13 @@ pub(crate) async fn initialize_me_pool( ) .await; }); + let pool_drain_enforcer = pool_bg.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_drain_timeout_enforcer( + pool_drain_enforcer, + ) + .await; + }); break; } Err(e) => { @@ -409,6 +417,13 @@ pub(crate) async fn initialize_me_pool( ) .await; }); + let pool_drain_enforcer = pool.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_drain_timeout_enforcer( + pool_drain_enforcer, + ) + .await; + }); break Some(pool); } diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 43a3569..26ec497 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -298,6 +298,7 @@ async fn run_update_cycle( pool.update_runtime_reinit_policy( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, + cfg.general.me_instadrain, cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_soft_evict_enabled, cfg.general.me_pool_drain_soft_evict_grace_secs, @@ -530,6 +531,7 @@ pub async fn me_config_updater( pool.update_runtime_reinit_policy( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, + cfg.general.me_instadrain, cfg.general.me_pool_drain_threshold, cfg.general.me_pool_drain_soft_evict_enabled, cfg.general.me_pool_drain_soft_evict_grace_secs, diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 6d0af64..8b62cff 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -12,6 +12,7 @@ use crate::crypto::SecureRandom; use crate::network::IpFamily; use super::MePool; +use super::pool::MeWriter; const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff #[allow(dead_code)] @@ -30,6 +31,8 @@ const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256; +const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1; +const HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS: u64 = 1; #[derive(Debug, Clone)] struct DcFloorPlanEntry { @@ -99,6 +102,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut adaptive_idle_since, &mut adaptive_recover_until, &mut floor_warn_next_allowed, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, ) .await; let v6_degraded = check_family( @@ -116,12 +121,63 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut adaptive_idle_since, &mut adaptive_recover_until, &mut floor_warn_next_allowed, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, ) .await; degraded_interval = v4_degraded || v6_degraded; } } +pub async fn me_drain_timeout_enforcer(pool: Arc) { + let mut drain_warn_next_allowed: HashMap = HashMap::new(); + let mut drain_soft_evict_next_allowed: HashMap = HashMap::new(); + loop { + tokio::time::sleep(Duration::from_secs( + HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS, + )) + .await; + reap_draining_writers( + &pool, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, + ) + .await; + } +} + +fn draining_writer_timeout_expired( + pool: &MePool, + writer: &MeWriter, + now_epoch_secs: u64, + drain_ttl_secs: u64, +) -> bool { + if pool + .me_instadrain + .load(std::sync::atomic::Ordering::Relaxed) + { + return true; + } + + let deadline_epoch_secs = writer + .drain_deadline_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + if deadline_epoch_secs != 0 { + return now_epoch_secs >= deadline_epoch_secs; + } + + if drain_ttl_secs == 0 { + return false; + } + let drain_started_at_epoch_secs = writer + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + if drain_started_at_epoch_secs == 0 { + return false; + } + now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs +} + pub(super) async fn reap_draining_writers( pool: &Arc, warn_next_allowed: &mut HashMap, @@ -137,11 +193,16 @@ pub(super) async fn reap_draining_writers( let activity = pool.registry.writer_activity_snapshot().await; let mut draining_writers = Vec::new(); let mut empty_writer_ids = Vec::::new(); + let mut timeout_expired_writer_ids = Vec::::new(); let mut force_close_writer_ids = Vec::::new(); for writer in writers { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { continue; } + if draining_writer_timeout_expired(pool, &writer, now_epoch_secs, drain_ttl_secs) { + timeout_expired_writer_ids.push(writer.id); + continue; + } if activity .bound_clients_by_writer .get(&writer.id) @@ -207,14 +268,6 @@ pub(super) async fn reap_draining_writers( "ME draining writer remains non-empty past drain TTL" ); } - let deadline_epoch_secs = writer - .drain_deadline_epoch_secs - .load(std::sync::atomic::Ordering::Relaxed); - if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs { - warn!(writer_id = writer.id, "Drain timeout, force-closing"); - force_close_writer_ids.push(writer.id); - active_draining_writer_ids.remove(&writer.id); - } } warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id)); @@ -299,11 +352,21 @@ pub(super) async fn reap_draining_writers( } } - let close_budget = health_drain_close_budget(); + let mut closed_writer_ids = HashSet::::new(); + for writer_id in timeout_expired_writer_ids { + if !closed_writer_ids.insert(writer_id) { + continue; + } + pool.stats.increment_pool_force_close_total(); + pool.remove_writer_and_close_clients(writer_id).await; + pool.stats + .increment_me_draining_writers_reap_progress_total(); + } + let requested_force_close = force_close_writer_ids.len(); let requested_empty_close = empty_writer_ids.len(); let requested_close_total = requested_force_close.saturating_add(requested_empty_close); - let mut closed_writer_ids = HashSet::::new(); + let close_budget = health_drain_close_budget(); let mut closed_total = 0usize; for writer_id in force_close_writer_ids { if closed_total >= close_budget { @@ -396,6 +459,8 @@ async fn check_family( adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, + drain_warn_next_allowed: &mut HashMap, + drain_soft_evict_next_allowed: &mut HashMap, ) -> bool { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -476,8 +541,15 @@ async fn check_family( floor_plan.active_writers_current, floor_plan.warm_writers_current, ); + let mut next_drain_reap_at = Instant::now(); for (dc, endpoints) in dc_endpoints { + if Instant::now() >= next_drain_reap_at { + reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed) + .await; + next_drain_reap_at = Instant::now() + + Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS); + } if endpoints.is_empty() { continue; } @@ -621,6 +693,12 @@ async fn check_family( let mut restored = 0usize; for _ in 0..missing { + if Instant::now() >= next_drain_reap_at { + reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed) + .await; + next_drain_reap_at = Instant::now() + + Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS); + } if reconnect_budget == 0 { break; } @@ -1548,6 +1626,7 @@ mod tests { general.me_adaptive_floor_max_warm_writers_global, general.hardswap, general.me_pool_drain_ttl_secs, + general.me_instadrain, general.me_pool_drain_threshold, general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_grace_secs, diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs index 3f182e4..ae517b3 100644 --- a/src/transport/middle_proxy/health_adversarial_tests.rs +++ b/src/transport/middle_proxy/health_adversarial_tests.rs @@ -81,6 +81,7 @@ async fn make_pool( general.me_adaptive_floor_max_warm_writers_global, general.hardswap, general.me_pool_drain_ttl_secs, + general.me_instadrain, general.me_pool_drain_threshold, general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_grace_secs, @@ -213,7 +214,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle insert_draining_writer( &pool, writer_id, - now_epoch_secs.saturating_sub(600).saturating_add(writer_id), + now_epoch_secs.saturating_sub(20), 1, 0, ) @@ -230,7 +231,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle } assert_eq!(writer_count(&pool).await, threshold as usize); - assert_eq!(sorted_writer_ids(&pool).await, vec![58, 59, 60]); + assert_eq!(sorted_writer_ids(&pool).await, vec![1, 2, 3]); } #[tokio::test] diff --git a/src/transport/middle_proxy/health_integration_tests.rs b/src/transport/middle_proxy/health_integration_tests.rs index 7f99d2a..fbbffce 100644 --- a/src/transport/middle_proxy/health_integration_tests.rs +++ b/src/transport/middle_proxy/health_integration_tests.rs @@ -80,6 +80,7 @@ async fn make_pool( general.me_adaptive_floor_max_warm_writers_global, general.hardswap, general.me_pool_drain_ttl_secs, + general.me_instadrain, general.me_pool_drain_threshold, general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_grace_secs, diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index 565ac74..bcdaf2e 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -74,6 +74,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc { general.me_adaptive_floor_max_warm_writers_global, general.hardswap, general.me_pool_drain_ttl_secs, + general.me_instadrain, general.me_pool_drain_threshold, general.me_pool_drain_soft_evict_enabled, general.me_pool_drain_soft_evict_grace_secs, @@ -180,8 +181,14 @@ async fn current_writer_ids(pool: &Arc) -> Vec { async fn reap_draining_writers_drops_warn_state_for_removed_writer() { let pool = make_pool(128).await; let now_epoch_secs = MePool::now_epoch_secs(); - let conn_ids = - insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await; + let conn_ids = insert_draining_writer( + &pool, + 7, + now_epoch_secs.saturating_sub(180), + 1, + now_epoch_secs.saturating_add(3_600), + ) + .await; let mut warn_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new(); @@ -331,17 +338,17 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() { #[tokio::test] async fn reap_draining_writers_limits_closes_per_health_tick() { - let pool = make_pool(128).await; + let pool = make_pool(1).await; let now_epoch_secs = MePool::now_epoch_secs(); let close_budget = health_drain_close_budget(); - let writer_total = close_budget.saturating_add(19); + let writer_total = close_budget.saturating_add(20); for writer_id in 1..=writer_total as u64 { insert_draining_writer( &pool, writer_id, now_epoch_secs.saturating_sub(20), 1, - now_epoch_secs.saturating_sub(1), + 0, ) .await; } @@ -364,8 +371,8 @@ async fn reap_draining_writers_backlog_drains_across_ticks() { &pool, writer_id, now_epoch_secs.saturating_sub(20), - 1, - now_epoch_secs.saturating_sub(1), + 0, + 0, ) .await; } @@ -393,7 +400,7 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() { insert_draining_writer( &pool, writer_id, - now_epoch_secs.saturating_sub(200).saturating_add(writer_id), + now_epoch_secs.saturating_sub(20), 1, 0, ) @@ -429,27 +436,27 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr #[tokio::test] async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() { - let pool = make_pool(128).await; + let pool = make_pool(1).await; let now_epoch_secs = MePool::now_epoch_secs(); let close_budget = health_drain_close_budget(); - for writer_id in 1..=close_budget as u64 { + for writer_id in 1..=close_budget.saturating_add(1) as u64 { insert_draining_writer( &pool, writer_id, now_epoch_secs.saturating_sub(20), 1, - now_epoch_secs.saturating_sub(1), + 0, ) .await; } - let empty_writer_id = close_budget as u64 + 1; + let empty_writer_id = close_budget.saturating_add(2) as u64; insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await; let mut warn_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new(); reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; - assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]); + assert_eq!(current_writer_ids(&pool).await, vec![1, empty_writer_id]); } #[tokio::test] @@ -571,7 +578,14 @@ async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() { .store(1, Ordering::Relaxed); let now_epoch_secs = MePool::now_epoch_secs(); - insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await; + insert_draining_writer( + &pool, + 77, + now_epoch_secs.saturating_sub(240), + 3, + now_epoch_secs.saturating_add(3_600), + ) + .await; let mut warn_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new(); @@ -595,7 +609,14 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() { .store(60_000, Ordering::Relaxed); let now_epoch_secs = MePool::now_epoch_secs(); - insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await; + insert_draining_writer( + &pool, + 88, + now_epoch_secs.saturating_sub(240), + 3, + now_epoch_secs.saturating_add(3_600), + ) + .await; let mut warn_next_allowed = HashMap::new(); let mut soft_evict_next_allowed = HashMap::new(); @@ -608,6 +629,21 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() { assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1); } +#[tokio::test] +async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediately() { + let pool = make_pool(0).await; + pool.me_instadrain.store(true, Ordering::Relaxed); + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer(&pool, 101, now_epoch_secs.saturating_sub(5), 1, 0).await; + insert_draining_writer(&pool, 102, now_epoch_secs.saturating_sub(4), 1, 0).await; + let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + + assert!(current_writer_ids(&pool).await.is_empty()); +} + #[test] fn general_config_default_drain_threshold_remains_enabled() { assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128); diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 590c996..26ded29 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -30,7 +30,7 @@ mod health_adversarial_tests; use bytes::Bytes; -pub use health::me_health_monitor; +pub use health::{me_drain_timeout_enforcer, me_health_monitor}; #[allow(unused_imports)] pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily}; pub use pool::MePool; diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index d09f07c..441d41d 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -171,6 +171,7 @@ pub struct MePool { pub(super) endpoint_quarantine: Arc>>, pub(super) kdf_material_fingerprint: Arc>>, pub(super) me_pool_drain_ttl_secs: AtomicU64, + pub(super) me_instadrain: AtomicBool, pub(super) me_pool_drain_threshold: AtomicU64, pub(super) me_pool_drain_soft_evict_enabled: AtomicBool, pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64, @@ -279,6 +280,7 @@ impl MePool { me_adaptive_floor_max_warm_writers_global: u32, hardswap: bool, me_pool_drain_ttl_secs: u64, + me_instadrain: bool, me_pool_drain_threshold: u64, me_pool_drain_soft_evict_enabled: bool, me_pool_drain_soft_evict_grace_secs: u64, @@ -462,6 +464,7 @@ impl MePool { endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), + me_instadrain: AtomicBool::new(me_instadrain), me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold), me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled), me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs), @@ -524,6 +527,7 @@ impl MePool { &self, hardswap: bool, drain_ttl_secs: u64, + instadrain: bool, pool_drain_threshold: u64, pool_drain_soft_evict_enabled: bool, pool_drain_soft_evict_grace_secs: u64, @@ -568,6 +572,7 @@ impl MePool { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs .store(drain_ttl_secs, Ordering::Relaxed); + self.me_instadrain.store(instadrain, Ordering::Relaxed); self.me_pool_drain_threshold .store(pool_drain_threshold, Ordering::Relaxed); self.me_pool_drain_soft_evict_enabled diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 214ee49..5fe45cb 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -126,6 +126,7 @@ pub(crate) struct MeApiRuntimeSnapshot { pub me_reconnect_backoff_cap_ms: u64, pub me_reconnect_fast_retry_count: u32, pub me_pool_drain_ttl_secs: u64, + pub me_instadrain: bool, pub me_pool_drain_soft_evict_enabled: bool, pub me_pool_drain_soft_evict_grace_secs: u64, pub me_pool_drain_soft_evict_per_writer: u8, @@ -583,6 +584,7 @@ impl MePool { me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed), + me_instadrain: self.me_instadrain.load(Ordering::Relaxed), me_pool_drain_soft_evict_enabled: self .me_pool_drain_soft_evict_enabled .load(Ordering::Relaxed),