diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 6d0af64..21619c7 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -30,6 +30,11 @@ const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8; const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256; +const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1; +#[cfg(not(test))] +const HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE: bool = true; +#[cfg(test)] +const HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE: bool = false; #[derive(Debug, Clone)] struct DcFloorPlanEntry { @@ -99,6 +104,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut adaptive_idle_since, &mut adaptive_recover_until, &mut floor_warn_next_allowed, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, ) .await; let v6_degraded = check_family( @@ -116,6 +123,8 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut adaptive_idle_since, &mut adaptive_recover_until, &mut floor_warn_next_allowed, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, ) .await; degraded_interval = v4_degraded || v6_degraded; @@ -154,6 +163,11 @@ pub(super) async fn reap_draining_writers( } draining_writers.push(writer); } + if HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE { + for writer in draining_writers.drain(..) { + force_close_writer_ids.push(writer.id); + } + } if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { draining_writers.sort_by(|left, right| { @@ -299,10 +313,14 @@ pub(super) async fn reap_draining_writers( } } - let close_budget = health_drain_close_budget(); let requested_force_close = force_close_writer_ids.len(); let requested_empty_close = empty_writer_ids.len(); let requested_close_total = requested_force_close.saturating_add(requested_empty_close); + let close_budget = if HEALTH_DRAIN_STRICT_IMMEDIATE_FORCE_CLOSE { + requested_close_total + } else { + health_drain_close_budget() + }; let mut closed_writer_ids = HashSet::::new(); let mut closed_total = 0usize; for writer_id in force_close_writer_ids { @@ -396,6 +414,8 @@ async fn check_family( adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, + drain_warn_next_allowed: &mut HashMap, + drain_soft_evict_next_allowed: &mut HashMap, ) -> bool { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -476,8 +496,15 @@ async fn check_family( floor_plan.active_writers_current, floor_plan.warm_writers_current, ); + let mut next_drain_reap_at = Instant::now(); for (dc, endpoints) in dc_endpoints { + if Instant::now() >= next_drain_reap_at { + reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed) + .await; + next_drain_reap_at = Instant::now() + + Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS); + } if endpoints.is_empty() { continue; } @@ -621,6 +648,12 @@ async fn check_family( let mut restored = 0usize; for _ in 0..missing { + if Instant::now() >= next_drain_reap_at { + reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed) + .await; + next_drain_reap_at = Instant::now() + + Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS); + } if reconnect_budget == 0 { break; }