Merge pull request #647 from miniusercoder/flow

fix(me): stabilize single-endpoint DC writer recovery and floor behavior
This commit is contained in:
Alexey
2026-04-12 10:19:25 +03:00
committed by GitHub
4 changed files with 113 additions and 114 deletions

View File

@@ -67,10 +67,8 @@ struct FamilyReconnectOutcome {
key: (i32, IpFamily), key: (i32, IpFamily),
dc: i32, dc: i32,
family: IpFamily, family: IpFamily,
alive: usize,
required: usize, required: usize,
endpoint_count: usize, endpoint_count: usize,
restored: usize,
} }
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) { pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
@@ -82,8 +80,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new(); let mut single_endpoint_outage: HashSet<(i32, IpFamily)> = HashSet::new();
let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut shadow_rotate_deadline: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new(); let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut degraded_interval = true; let mut degraded_interval = true;
@@ -109,8 +105,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut single_endpoint_outage, &mut single_endpoint_outage,
&mut shadow_rotate_deadline, &mut shadow_rotate_deadline,
&mut idle_refresh_next_attempt, &mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed, &mut floor_warn_next_allowed,
) )
.await; .await;
@@ -126,8 +120,6 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut single_endpoint_outage, &mut single_endpoint_outage,
&mut shadow_rotate_deadline, &mut shadow_rotate_deadline,
&mut idle_refresh_next_attempt, &mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed, &mut floor_warn_next_allowed,
) )
.await; .await;
@@ -360,8 +352,6 @@ async fn check_family(
single_endpoint_outage: &mut HashSet<(i32, IpFamily)>, single_endpoint_outage: &mut HashSet<(i32, IpFamily)>,
shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>, shadow_rotate_deadline: &mut HashMap<(i32, IpFamily), Instant>,
idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
) -> bool { ) -> bool {
let enabled = match family { let enabled = match family {
@@ -393,10 +383,7 @@ async fn check_family(
let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len()); let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len());
let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget)); let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget));
if pool.floor_mode() == MeFloorMode::Static { if pool.floor_mode() == MeFloorMode::Static {}
adaptive_idle_since.clear();
adaptive_recover_until.clear();
}
let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new(); let mut live_addr_counts = HashMap::<(i32, SocketAddr), usize>::new();
let mut live_writer_ids_by_addr = HashMap::<(i32, SocketAddr), Vec<u64>>::new(); let mut live_writer_ids_by_addr = HashMap::<(i32, SocketAddr), Vec<u64>>::new();
@@ -435,8 +422,6 @@ async fn check_family(
&live_addr_counts, &live_addr_counts,
&live_writer_ids_by_addr, &live_writer_ids_by_addr,
&bound_clients_by_writer, &bound_clients_by_writer,
adaptive_idle_since,
adaptive_recover_until,
) )
.await; .await;
pool.set_adaptive_floor_runtime_caps( pool.set_adaptive_floor_runtime_caps(
@@ -503,8 +488,6 @@ async fn check_family(
outage_next_attempt.remove(&key); outage_next_attempt.remove(&key);
shadow_rotate_deadline.remove(&key); shadow_rotate_deadline.remove(&key);
idle_refresh_next_attempt.remove(&key); idle_refresh_next_attempt.remove(&key);
adaptive_idle_since.remove(&key);
adaptive_recover_until.remove(&key);
info!( info!(
dc = %dc, dc = %dc,
?family, ?family,
@@ -632,6 +615,10 @@ async fn check_family(
restored += 1; restored += 1;
continue; continue;
} }
let base_req = pool_for_reconnect
.required_writers_for_dc_with_floor_mode(endpoints_for_dc.len(), false);
if alive + restored >= base_req {
pool_for_reconnect pool_for_reconnect
.stats .stats
.increment_me_floor_cap_block_total(); .increment_me_floor_cap_block_total();
@@ -648,6 +635,8 @@ async fn check_family(
); );
break; break;
} }
}
pool_for_reconnect.stats.increment_me_reconnect_attempt();
let res = tokio::time::timeout( let res = tokio::time::timeout(
pool_for_reconnect.reconnect_runtime.me_one_timeout, pool_for_reconnect.reconnect_runtime.me_one_timeout,
pool_for_reconnect.connect_endpoints_round_robin( pool_for_reconnect.connect_endpoints_round_robin(
@@ -663,11 +652,9 @@ async fn check_family(
pool_for_reconnect.stats.increment_me_reconnect_success(); pool_for_reconnect.stats.increment_me_reconnect_success();
} }
Ok(false) => { Ok(false) => {
pool_for_reconnect.stats.increment_me_reconnect_attempt();
debug!(dc = %dc, ?family, "ME round-robin reconnect failed") debug!(dc = %dc, ?family, "ME round-robin reconnect failed")
} }
Err(_) => { Err(_) => {
pool_for_reconnect.stats.increment_me_reconnect_attempt();
debug!(dc = %dc, ?family, "ME reconnect timed out"); debug!(dc = %dc, ?family, "ME reconnect timed out");
} }
} }
@@ -678,10 +665,8 @@ async fn check_family(
key, key,
dc, dc,
family, family,
alive,
required, required,
endpoint_count: endpoints_for_dc.len(), endpoint_count: endpoints_for_dc.len(),
restored,
} }
}); });
} }
@@ -695,7 +680,7 @@ async fn check_family(
} }
}; };
let now = Instant::now(); let now = Instant::now();
let now_alive = outcome.alive + outcome.restored; let now_alive = live_active_writers_for_dc_family(pool, outcome.dc, outcome.family).await;
if now_alive >= outcome.required { if now_alive >= outcome.required {
info!( info!(
dc = %outcome.dc, dc = %outcome.dc,
@@ -851,6 +836,33 @@ fn should_emit_rate_limited_warn(
false false
} }
async fn live_active_writers_for_dc_family(pool: &Arc<MePool>, dc: i32, family: IpFamily) -> usize {
let writers = pool.writers.read().await;
writers
.iter()
.filter(|writer| {
if writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
return false;
}
if writer.writer_dc != dc {
return false;
}
if !matches!(
super::pool::WriterContour::from_u8(
writer.contour.load(std::sync::atomic::Ordering::Relaxed),
),
super::pool::WriterContour::Active
) {
return false;
}
match family {
IpFamily::V4 => writer.addr.is_ipv4(),
IpFamily::V6 => writer.addr.is_ipv6(),
}
})
.count()
}
fn adaptive_floor_class_min( fn adaptive_floor_class_min(
pool: &Arc<MePool>, pool: &Arc<MePool>,
endpoint_count: usize, endpoint_count: usize,
@@ -904,8 +916,6 @@ async fn build_family_floor_plan(
live_addr_counts: &HashMap<(i32, SocketAddr), usize>, live_addr_counts: &HashMap<(i32, SocketAddr), usize>,
live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>, live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>,
bound_clients_by_writer: &HashMap<u64, usize>, bound_clients_by_writer: &HashMap<u64, usize>,
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
) -> FamilyFloorPlan { ) -> FamilyFloorPlan {
let mut entries = Vec::<DcFloorPlanEntry>::new(); let mut entries = Vec::<DcFloorPlanEntry>::new();
let mut by_dc = HashMap::<i32, DcFloorPlanEntry>::new(); let mut by_dc = HashMap::<i32, DcFloorPlanEntry>::new();
@@ -921,18 +931,7 @@ async fn build_family_floor_plan(
if endpoints.is_empty() { if endpoints.is_empty() {
continue; continue;
} }
let key = (*dc, family); let _key = (*dc, family);
let reduce_for_idle = should_reduce_floor_for_idle(
pool,
key,
*dc,
endpoints,
live_writer_ids_by_addr,
bound_clients_by_writer,
adaptive_idle_since,
adaptive_recover_until,
)
.await;
let base_required = pool.required_writers_for_dc(endpoints.len()).max(1); let base_required = pool.required_writers_for_dc(endpoints.len()).max(1);
let min_required = if is_adaptive { let min_required = if is_adaptive {
adaptive_floor_class_min(pool, endpoints.len(), base_required) adaptive_floor_class_min(pool, endpoints.len(), base_required)
@@ -947,11 +946,11 @@ async fn build_family_floor_plan(
if max_required < min_required { if max_required < min_required {
max_required = min_required; max_required = min_required;
} }
let desired_raw = if is_adaptive && reduce_for_idle { // We initialize target_required at base_required to prevent 0-writer blackouts
min_required // caused by proactively dropping an idle DC to a single fragile connection.
} else { // The Adaptive Floor constraint loop below will gracefully compress idle DCs
base_required // (prioritized via has_bound_clients = false) to min_required only when global capacity is reached.
}; let desired_raw = base_required;
let target_required = desired_raw.clamp(min_required, max_required); let target_required = desired_raw.clamp(min_required, max_required);
let alive = endpoints let alive = endpoints
.iter() .iter()
@@ -1278,43 +1277,6 @@ async fn maybe_refresh_idle_writer_for_dc(
); );
} }
async fn should_reduce_floor_for_idle(
pool: &Arc<MePool>,
key: (i32, IpFamily),
dc: i32,
endpoints: &[SocketAddr],
live_writer_ids_by_addr: &HashMap<(i32, SocketAddr), Vec<u64>>,
bound_clients_by_writer: &HashMap<u64, usize>,
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
) -> bool {
if pool.floor_mode() != MeFloorMode::Adaptive {
adaptive_idle_since.remove(&key);
adaptive_recover_until.remove(&key);
return false;
}
let now = Instant::now();
let writer_ids = list_writer_ids_for_endpoints(dc, endpoints, live_writer_ids_by_addr);
let has_bound_clients = has_bound_clients_on_endpoint(&writer_ids, bound_clients_by_writer);
if has_bound_clients {
adaptive_idle_since.remove(&key);
adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
return false;
}
if let Some(recover_until) = adaptive_recover_until.get(&key)
&& now < *recover_until
{
adaptive_idle_since.remove(&key);
return false;
}
adaptive_recover_until.remove(&key);
let idle_since = adaptive_idle_since.entry(key).or_insert(now);
now.saturating_duration_since(*idle_since) >= pool.adaptive_floor_idle_duration()
}
fn has_bound_clients_on_endpoint( fn has_bound_clients_on_endpoint(
writer_ids: &[u64], writer_ids: &[u64],
bound_clients_by_writer: &HashMap<u64, usize>, bound_clients_by_writer: &HashMap<u64, usize>,
@@ -1364,6 +1326,7 @@ async fn recover_single_endpoint_outage(
); );
return; return;
}; };
pool.stats.increment_me_reconnect_attempt();
pool.stats pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total(); .increment_me_single_endpoint_outage_reconnect_attempt_total();
@@ -1439,7 +1402,6 @@ async fn recover_single_endpoint_outage(
return; return;
} }
pool.stats.increment_me_reconnect_attempt();
let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms); let current_ms = *outage_backoff.get(&key).unwrap_or(&min_backoff_ms);
let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms); let next_ms = current_ms.saturating_mul(2).min(max_backoff_ms);
outage_backoff.insert(key, next_ms); outage_backoff.insert(key, next_ms);

View File

@@ -1422,22 +1422,6 @@ impl MePool {
MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed)) MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed))
} }
pub(super) fn adaptive_floor_idle_duration(&self) -> Duration {
Duration::from_secs(
self.floor_runtime
.me_adaptive_floor_idle_secs
.load(Ordering::Relaxed),
)
}
pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration {
Duration::from_secs(
self.floor_runtime
.me_adaptive_floor_recover_grace_secs
.load(Ordering::Relaxed),
)
}
pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize { pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize {
(self (self
.floor_runtime .floor_runtime
@@ -1659,6 +1643,7 @@ impl MePool {
&self, &self,
contour: WriterContour, contour: WriterContour,
allow_coverage_override: bool, allow_coverage_override: bool,
writer_dc: i32,
) -> bool { ) -> bool {
let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await; let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await;
match contour { match contour {
@@ -1670,6 +1655,43 @@ impl MePool {
if !allow_coverage_override { if !allow_coverage_override {
return false; return false;
} }
let mut endpoints_len = 0;
let now_epoch = Self::now_epoch_secs();
if self.family_enabled_for_drain_coverage(IpFamily::V4, now_epoch) {
if let Some(addrs) = self.proxy_map_v4.read().await.get(&writer_dc) {
endpoints_len += addrs.len();
}
}
if self.family_enabled_for_drain_coverage(IpFamily::V6, now_epoch) {
if let Some(addrs) = self.proxy_map_v6.read().await.get(&writer_dc) {
endpoints_len += addrs.len();
}
}
if endpoints_len > 0 {
let base_req =
self.required_writers_for_dc_with_floor_mode(endpoints_len, false);
let active_for_dc = {
let ws = self.writers.read().await;
ws.iter()
.filter(|w| {
!w.draining.load(std::sync::atomic::Ordering::Relaxed)
&& w.writer_dc == writer_dc
&& matches!(
WriterContour::from_u8(
w.contour.load(std::sync::atomic::Ordering::Relaxed),
),
WriterContour::Active
)
})
.count()
};
if active_for_dc < base_req {
return true;
}
}
let coverage_required = self.active_coverage_required_total().await; let coverage_required = self.active_coverage_required_total().await;
active_writers < coverage_required active_writers < coverage_required
} }

View File

@@ -77,6 +77,12 @@ impl MePool {
return Vec::new(); return Vec::new();
} }
if endpoints.len() == 1 && self.single_endpoint_outage_disable_quarantine() {
let mut guard = self.endpoint_quarantine.lock().await;
guard.retain(|_, expiry| *expiry > Instant::now());
return endpoints.to_vec();
}
let mut guard = self.endpoint_quarantine.lock().await; let mut guard = self.endpoint_quarantine.lock().await;
let now = Instant::now(); let now = Instant::now();
guard.retain(|_, expiry| *expiry > now); guard.retain(|_, expiry| *expiry > now);
@@ -236,8 +242,18 @@ impl MePool {
let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1); let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1);
let mut total_attempts = 0u32; let mut total_attempts = 0u32;
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
let single_endpoint_dc = dc_endpoints.len() == 1 && dc_endpoints[0] == addr;
let bypass_quarantine_for_single_endpoint =
single_endpoint_dc && self.single_endpoint_outage_disable_quarantine();
if !same_endpoint_quarantined { if !same_endpoint_quarantined || bypass_quarantine_for_single_endpoint {
if same_endpoint_quarantined && bypass_quarantine_for_single_endpoint {
debug!(
%addr,
"Bypassing quarantine for immediate reconnect on single-endpoint DC"
);
}
for attempt in 0..fast_retries { for attempt in 0..fast_retries {
if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP { if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP {
break; break;
@@ -276,7 +292,6 @@ impl MePool {
); );
} }
let dc_endpoints = self.endpoints_for_dc(writer_dc).await;
if dc_endpoints.is_empty() { if dc_endpoints.is_empty() {
self.stats.increment_me_refill_failed_total(); self.stats.increment_me_refill_failed_total();
return false; return false;

View File

@@ -342,7 +342,7 @@ impl MePool {
allow_coverage_override: bool, allow_coverage_override: bool,
) -> Result<()> { ) -> Result<()> {
if !self if !self
.can_open_writer_for_contour(contour, allow_coverage_override) .can_open_writer_for_contour(contour, allow_coverage_override, writer_dc)
.await .await
{ {
return Err(ProxyError::Proxy(format!( return Err(ProxyError::Proxy(format!(