fix(me-pool): resolve 0-writer blackouts with zero-allocation constraints

- Converts the adaptive floor logic from proactive idle drops to reactive
  global capacity constraints, fixing sudden drops to 0 active writers.
- Adds a `base_req` override gate to `can_open_writer_for_contour`,
  retaining critical connections for starved datacenters during bursts
  (see the sketch below).
- Applies a zero-allocation optimization by iterating directly under the
  inner writers lock, avoiding `HashSet` construction and nested `RwLock`
  checks in writer-validation paths.
- Removes now-dead state (`adaptive_idle_since`, `adaptive_recover_until`)
  and its evaluations to satisfy strict memory and hot-path constraints.
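
A minimal, self-contained sketch of the reactive capacity gate described above.
The `Pool`/`Writer` types and the `required_writers_for_dc` heuristic are
hypothetical stand-ins, not the real `MePool` API; the sketch only illustrates
the `base_req` comparison and the single-pass, zero-allocation count taken
directly under the lock.

// Hypothetical, simplified stand-in for MePool; illustrative only.
use std::sync::RwLock;
use std::sync::atomic::{AtomicBool, Ordering};

struct Writer {
    writer_dc: i32,
    draining: AtomicBool,
}

struct Pool {
    writers: RwLock<Vec<Writer>>,
}

impl Pool {
    // Assumed baseline heuristic: one writer per endpoint, capped at 4.
    fn required_writers_for_dc(&self, endpoints_len: usize) -> usize {
        endpoints_len.clamp(1, 4)
    }

    // Reactive gate: permit a new writer whenever the DC sits below its
    // baseline. Counts in one pass directly under the lock -- no HashSet
    // allocation, no nested lock acquisitions.
    fn can_open_writer_for_dc(&self, writer_dc: i32, endpoints_len: usize) -> bool {
        let base_req = self.required_writers_for_dc(endpoints_len);
        let active_for_dc = self
            .writers
            .read()
            .unwrap()
            .iter()
            .filter(|w| !w.draining.load(Ordering::Relaxed) && w.writer_dc == writer_dc)
            .count();
        active_for_dc < base_req
    }
}

fn main() {
    let pool = Pool {
        writers: RwLock::new(vec![Writer {
            writer_dc: 2,
            draining: AtomicBool::new(false),
        }]),
    };
    // DC 2 has one live writer but three endpoints: the gate stays open.
    assert!(pool.can_open_writer_for_dc(2, 3));
    // A DC with no live writers is never starved down to zero.
    assert!(pool.can_open_writer_for_dc(5, 1));
}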
miniusercoder
2026-04-06 20:27:17 +03:00
parent a36c7b3f66
commit 86be0d53fe
3 changed files with 54 additions and 102 deletions

View File

@@ -82,8 +82,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
     let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
     let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
     let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
-    let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
-    let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
     let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
     let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
     let mut degraded_interval = true;
@@ -109,8 +107,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
             &mut single_endpoint_outage,
             &mut shadow_rotate_deadline,
             &mut idle_refresh_next_attempt,
-            &mut adaptive_idle_since,
-            &mut adaptive_recover_until,
             &mut floor_warn_next_allowed,
         )
         .await;
@@ -126,8 +122,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
             &mut single_endpoint_outage,
             &mut shadow_rotate_deadline,
             &mut idle_refresh_next_attempt,
-            &mut adaptive_idle_since,
-            &mut adaptive_recover_until,
             &mut floor_warn_next_allowed,
         )
         .await;
@@ -360,8 +354,6 @@ async fn check_family(
     single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
     shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
     idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
-    adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
-    adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
     floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
 ) -> bool {
     let enabled = match family {
@@ -394,8 +386,6 @@ async fn check_family(
     let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget));
     if pool.floor_mode() == MeFloorMode::Static {
-        adaptive_idle_since.clear();
-        adaptive_recover_until.clear();
     }
     let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new();
@@ -435,8 +425,6 @@ async fn check_family(
             &live_addr_counts,
             &live_writer_ids_by_addr,
             &bound_clients_by_writer,
-            adaptive_idle_since,
-            adaptive_recover_until,
         )
         .await;
         pool.set_adaptive_floor_runtime_caps(
@@ -503,8 +491,6 @@ async fn check_family(
             outage_next_attempt.remove(&key);
             shadow_rotate_deadline.remove(&key);
             idle_refresh_next_attempt.remove(&key);
-            adaptive_idle_since.remove(&key);
-            adaptive_recover_until.remove(&key);
             info!(
                 dc = %dc,
                 ?family,
@@ -632,6 +618,9 @@ async fn check_family(
                     restored += 1;
                     continue;
                 }
+                let base_req = pool_for_reconnect.required_writers_for_dc_with_floor_mode(endpoints_for_dc.len(), false);
+                if alive + restored >= base_req {
                     pool_for_reconnect
                         .stats
                         .increment_me_floor_cap_block_total();
@@ -648,6 +637,7 @@ async fn check_family(
                     );
                     break;
                 }
+                }
                 let res = tokio::time::timeout(
                     pool_for_reconnect.reconnect_runtime.me_one_timeout,
                     pool_for_reconnect.connect_endpoints_round_robin(
@@ -904,8 +894,6 @@ async fn build_family_floor_plan(
     live_addr_counts: &HashMap<(i32, SocketAddr), usize>,
     live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>,
     bound_clients_by_writer: &HashMap<u64, usize>,
-    adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
-    adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
 ) -> FamilyFloorPlan {
     let mut entries = Vec::<DcFloorPlanEntry>::new();
     let mut by_dc = HashMap::<i32, DcFloorPlanEntry>::new();
@@ -921,18 +909,7 @@ async fn build_family_floor_plan(
         if endpoints.is_empty() {
             continue;
         }
-        let key = (*dc, family);
-        let reduce_for_idle = should_reduce_floor_for_idle(
-            pool,
-            key,
-            *dc,
-            endpoints,
-            live_writer_ids_by_addr,
-            bound_clients_by_writer,
-            adaptive_idle_since,
-            adaptive_recover_until,
-        )
-        .await;
+        let _key = (*dc, family);
         let base_required = pool.required_writers_for_dc(endpoints.len()).max(1);
         let min_required = if is_adaptive {
             adaptive_floor_class_min(pool, endpoints.len(), base_required)
@@ -947,11 +924,11 @@ async fn build_family_floor_plan(
         if max_required < min_required {
             max_required = min_required;
         }
-        let desired_raw = if is_adaptive && reduce_for_idle {
-            min_required
-        } else {
-            base_required
-        };
+        // We initialize target_required at base_required to prevent 0-writer blackouts
+        // caused by proactively dropping an idle DC to a single fragile connection.
+        // The Adaptive Floor constraint loop below will gracefully compress idle DCs
+        // (prioritized via has_bound_clients = false) to min_required only when global capacity is reached.
+        let desired_raw = base_required;
         let target_required = desired_raw.clamp(min_required, max_required);
         let alive = endpoints
             .iter()
@@ -1278,43 +1255,6 @@ async fn maybe_refresh_idle_writer_for_dc(
     );
 }
-async fn should_reduce_floor_for_idle(
-    pool: &Arc<MePool>,
-    key: (i32, IpFamily),
-    dc: i32,
-    endpoints: &[SocketAddr],
-    live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>,
-    bound_clients_by_writer: &HashMap<u64, usize>,
-    adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
-    adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
-) -> bool {
-    if pool.floor_mode() != MeFloorMode::Adaptive {
-        adaptive_idle_since.remove(&key);
-        adaptive_recover_until.remove(&key);
-        return false;
-    }
-    let now = Instant::now();
-    let writer_ids = list_writer_ids_for_endpoints(dc, endpoints, live_writer_ids_by_addr);
-    let has_bound_clients = has_bound_clients_on_endpoint(&writer_ids, bound_clients_by_writer);
-    if has_bound_clients {
-        adaptive_idle_since.remove(&key);
-        adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
-        return false;
-    }
-    if let Some(recover_until) = adaptive_recover_until.get(&key)
-        && now < *recover_until
-    {
-        adaptive_idle_since.remove(&key);
-        return false;
-    }
-    adaptive_recover_until.remove(&key);
-    let idle_since = adaptive_idle_since.entry(key).or_insert(now);
-    now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration()
-}
 fn has_bound_clients_on_endpoint(
     writer_ids: &[u64],
     bound_clients_by_writer: &HashMap<u64, usize>,
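
For reference, a standalone illustration of the target computation introduced in
build_family_floor_plan above: desired_raw now starts at base_required and is
only clamped into [min_required, max_required], rather than being proactively
collapsed to min_required for idle DCs. The values in main() are made up.

// Illustrative extraction; names mirror the diff above, behavior simplified.
fn target_required(base_required: usize, min_required: usize, mut max_required: usize) -> usize {
    if max_required < min_required {
        max_required = min_required;
    }
    // desired_raw is simply base_required now; the global constraint loop,
    // not this per-DC path, decides whether an idle DC gets compressed.
    let desired_raw = base_required;
    desired_raw.clamp(min_required, max_required)
}

fn main() {
    // An idle 3-endpoint DC previously fell to min_required (1); it now
    // keeps its baseline of 3 unless global capacity forces compression.
    assert_eq!(target_required(3, 1, 4), 3);
    // A baseline above the cap is still clamped down.
    assert_eq!(target_required(6, 1, 4), 4);
}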

View File

@@ -1422,22 +1422,6 @@ impl MePool {
         MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed))
     }
-    pub(super) fn adaptive_floor_idle_duration(&self) -> Duration {
-        Duration::from_secs(
-            self.floor_runtime
-                .me_adaptive_floor_idle_secs
-                .load(Ordering::Relaxed),
-        )
-    }
-    pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration {
-        Duration::from_secs(
-            self.floor_runtime
-                .me_adaptive_floor_recover_grace_secs
-                .load(Ordering::Relaxed),
-        )
-    }
     pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize {
         (self
             .floor_runtime
@@ -1659,6 +1643,7 @@ impl MePool {
         &self,
         contour: WriterContour,
         allow_coverage_override: bool,
+        writer_dc: i32,
     ) -> bool {
         let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await;
         match contour {
@@ -1670,6 +1655,33 @@ impl MePool {
                 if !allow_coverage_override {
                     return false;
                 }
+                let mut endpoints_len = 0;
+                let now_epoch = Self::now_epoch_secs();
+                if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch) {
+                    if let Some(addrs) = self.proxy_map_v4.read().await.get(&writer_dc) {
+                        endpoints_len += addrs.len();
+                    }
+                }
+                if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch) {
+                    if let Some(addrs) = self.proxy_map_v6.read().await.get(&writer_dc) {
+                        endpoints_len += addrs.len();
+                    }
+                }
+                if endpoints_len > 0 {
+                    let base_req = self.required_writers_for_dc_with_floor_mode(endpoints_len, false);
+                    let active_for_dc = {
+                        let ws = self.writers.read().await;
+                        ws.iter()
+                            .filter(|w| !w.draining.load(std::sync::atomic::Ordering::Relaxed) && w.writer_dc == writer_dc)
+                            .count()
+                    };
+                    if active_for_dc < base_req {
+                        return true;
+                    }
+                }
                 let coverage_required = self.active_coverage_required_total().await;
                 active_writers < coverage_required
             }

View File

@@ -342,7 +342,7 @@ impl MePool {
         allow_coverage_override: bool,
     ) -> Result<()> {
         if !self
-            .can_open_writer_for_contour(contour, allow_coverage_override)
+            .can_open_writer_for_contour(contour, allow_coverage_override, writer_dc)
             .await
         {
             return Err(ProxyError::Proxy(format!(