From 49f4a7bb2218e0c8fc55fdae2624491c784f37cb Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 2 Mar 2026 00:39:18 +0300 Subject: [PATCH] ME Hardswap Generation stability Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/health.rs | 13 +- src/transport/middle_proxy/pool.rs | 5 + src/transport/middle_proxy/pool_refill.rs | 137 ++++++++++++++++++---- src/transport/middle_proxy/pool_reinit.rs | 26 +++- src/transport/middle_proxy/pool_writer.rs | 6 + src/transport/middle_proxy/reader.rs | 6 +- 6 files changed, 165 insertions(+), 28 deletions(-) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 06cca03..afa96c6 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -112,7 +112,18 @@ async fn check_family( let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize; if *inflight.get(&key).unwrap_or(&0) >= max_concurrent { - return; + continue; + } + if pool.has_refill_inflight_for_endpoints(&endpoints).await { + debug!( + dc = %dc, + ?family, + alive, + required, + endpoint_count = endpoints.len(), + "Skipping health reconnect: immediate refill is already in flight for this DC group" + ); + continue; } *inflight.entry(key).or_insert(0) += 1; diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index d87430a..c3e7533 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -21,6 +21,7 @@ pub struct MeWriter { pub id: u64, pub addr: SocketAddr, pub generation: u64, + pub created_at: Instant, pub tx: mpsc::Sender, pub cancel: CancellationToken, pub degraded: Arc, @@ -82,7 +83,9 @@ pub struct MePool { pub(super) conn_count: AtomicUsize, pub(super) stats: Arc, pub(super) generation: AtomicU64, + pub(super) pending_hardswap_generation: AtomicU64, pub(super) hardswap: AtomicBool, + pub(super) endpoint_quarantine: Arc>>, pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_force_close_secs: AtomicU64, pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, @@ -232,7 +235,9 @@ impl MePool { refill_inflight: Arc::new(Mutex::new(HashSet::new())), conn_count: AtomicUsize::new(0), generation: AtomicU64::new(1), + pending_hardswap_generation: AtomicU64::new(0), hardswap: AtomicBool::new(hardswap), + endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille( diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index 6dea6c9..a286e65 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -2,6 +2,7 @@ use std::collections::HashSet; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::Ordering; +use std::time::{Duration, Instant}; use tracing::{debug, info, warn}; @@ -9,19 +10,97 @@ use crate::crypto::SecureRandom; use super::pool::MePool; +const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20; +const ME_FLAP_QUARANTINE_SECS: u64 = 25; + impl MePool { + pub(super) async fn maybe_quarantine_flapping_endpoint( + &self, + addr: SocketAddr, + uptime: Duration, + ) { + if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) { + return; + } + + let until = Instant::now() + Duration::from_secs(ME_FLAP_QUARANTINE_SECS); + let mut guard = self.endpoint_quarantine.lock().await; + guard.retain(|_, expiry| *expiry > Instant::now()); + guard.insert(addr, until); + warn!( + %addr, + uptime_ms = uptime.as_millis(), + quarantine_secs = ME_FLAP_QUARANTINE_SECS, + "ME endpoint temporarily quarantined due to rapid writer flap" + ); + } + + async fn is_endpoint_quarantined(&self, addr: SocketAddr) -> bool { + let mut guard = self.endpoint_quarantine.lock().await; + let now = Instant::now(); + guard.retain(|_, expiry| *expiry > now); + guard.contains_key(&addr) + } + + async fn connectable_endpoints(&self, endpoints: &[SocketAddr]) -> Vec { + if endpoints.is_empty() { + return Vec::new(); + } + + let mut guard = self.endpoint_quarantine.lock().await; + let now = Instant::now(); + guard.retain(|_, expiry| *expiry > now); + + let mut ready = Vec::::with_capacity(endpoints.len()); + let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None; + for addr in endpoints { + if let Some(expiry) = guard.get(addr).copied() { + match earliest_quarantine { + Some((_, current_expiry)) if current_expiry <= expiry => {} + _ => earliest_quarantine = Some((*addr, expiry)), + } + } else { + ready.push(*addr); + } + } + + if !ready.is_empty() { + return ready; + } + + if let Some((addr, expiry)) = earliest_quarantine { + debug!( + %addr, + wait_ms = expiry.saturating_duration_since(now).as_millis(), + "All ME endpoints are quarantined for the DC group; retrying earliest one" + ); + return vec![addr]; + } + + Vec::new() + } + + pub(super) async fn has_refill_inflight_for_endpoints(&self, endpoints: &[SocketAddr]) -> bool { + if endpoints.is_empty() { + return false; + } + let guard = self.refill_inflight.lock().await; + endpoints.iter().any(|addr| guard.contains(addr)) + } + pub(super) async fn connect_endpoints_round_robin( self: &Arc, endpoints: &[SocketAddr], rng: &SecureRandom, ) -> bool { - if endpoints.is_empty() { + let candidates = self.connectable_endpoints(endpoints).await; + if candidates.is_empty() { return false; } - let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % endpoints.len(); - for offset in 0..endpoints.len() { - let idx = (start + offset) % endpoints.len(); - let addr = endpoints[idx]; + let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len(); + for offset in 0..candidates.len() { + let idx = (start + offset) % candidates.len(); + let addr = candidates[idx]; match self.connect_one(addr, rng).await { Ok(()) => return true, Err(e) => debug!(%addr, error = %e, "ME connect failed during round-robin warmup"), @@ -83,29 +162,37 @@ impl MePool { async fn refill_writer_after_loss(self: &Arc, addr: SocketAddr) -> bool { let fast_retries = self.me_reconnect_fast_retry_count.max(1); + let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; - for attempt in 0..fast_retries { - self.stats.increment_me_reconnect_attempt(); - match self.connect_one(addr, self.rng.as_ref()).await { - Ok(()) => { - self.stats.increment_me_reconnect_success(); - self.stats.increment_me_writer_restored_same_endpoint_total(); - info!( - %addr, - attempt = attempt + 1, - "ME writer restored on the same endpoint" - ); - return true; - } - Err(e) => { - debug!( - %addr, - attempt = attempt + 1, - error = %e, - "ME immediate same-endpoint reconnect failed" - ); + if !same_endpoint_quarantined { + for attempt in 0..fast_retries { + self.stats.increment_me_reconnect_attempt(); + match self.connect_one(addr, self.rng.as_ref()).await { + Ok(()) => { + self.stats.increment_me_reconnect_success(); + self.stats.increment_me_writer_restored_same_endpoint_total(); + info!( + %addr, + attempt = attempt + 1, + "ME writer restored on the same endpoint" + ); + return true; + } + Err(e) => { + debug!( + %addr, + attempt = attempt + 1, + error = %e, + "ME immediate same-endpoint reconnect failed" + ); + } } } + } else { + debug!( + %addr, + "Skipping immediate same-endpoint reconnect because endpoint is quarantined" + ); } let dc_endpoints = self.endpoints_for_same_dc(addr).await; diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index 261ac02..5552fb6 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -266,8 +266,26 @@ impl MePool { } let previous_generation = self.current_generation(); - let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1; let hardswap = self.hardswap.load(Ordering::Relaxed); + let generation = if hardswap { + let pending_generation = self.pending_hardswap_generation.load(Ordering::Relaxed); + if pending_generation != 0 && pending_generation >= previous_generation { + debug!( + previous_generation, + generation = pending_generation, + "ME hardswap continues with pending generation" + ); + pending_generation + } else { + let next_generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1; + self.pending_hardswap_generation + .store(next_generation, Ordering::Relaxed); + next_generation + } + } else { + self.pending_hardswap_generation.store(0, Ordering::Relaxed); + self.generation.fetch_add(1, Ordering::Relaxed) + 1 + }; if hardswap { self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc) @@ -354,6 +372,9 @@ impl MePool { drop(writers); if stale_writer_ids.is_empty() { + if hardswap { + self.pending_hardswap_generation.store(0, Ordering::Relaxed); + } debug!("ME reinit cycle completed with no stale writers"); return; } @@ -375,6 +396,9 @@ impl MePool { self.mark_writer_draining_with_timeout(writer_id, drain_timeout, !hardswap) .await; } + if hardswap { + self.pending_hardswap_generation.store(0, Ordering::Relaxed); + } } pub async fn zero_downtime_reinit_periodic(self: &Arc, rng: &SecureRandom) { diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index a8cc5a5..77ab891 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -89,6 +89,7 @@ impl MePool { id: writer_id, addr, generation, + created_at: Instant::now(), tx: tx.clone(), cancel: cancel.clone(), degraded: degraded.clone(), @@ -249,6 +250,7 @@ impl MePool { async fn remove_writer_only(self: &Arc, writer_id: u64) -> Vec { let mut close_tx: Option> = None; let mut removed_addr: Option = None; + let mut removed_uptime: Option = None; let mut trigger_refill = false; { let mut ws = self.writers.write().await; @@ -261,6 +263,7 @@ impl MePool { self.stats.increment_me_writer_removed_total(); w.cancel.cancel(); removed_addr = Some(w.addr); + removed_uptime = Some(w.created_at.elapsed()); trigger_refill = !was_draining; if trigger_refill { self.stats.increment_me_writer_removed_unexpected_total(); @@ -275,6 +278,9 @@ impl MePool { if trigger_refill && let Some(addr) = removed_addr { + if let Some(uptime) = removed_uptime { + self.maybe_quarantine_flapping_endpoint(addr, uptime).await; + } self.trigger_immediate_refill(addr); } self.rtt_stats.lock().await.remove(&writer_id); diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index ea0dd75..632e34a 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::io::ErrorKind; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Instant; @@ -45,7 +46,10 @@ pub(crate) async fn reader_loop( _ = cancel.cancelled() => return Ok(()), }; if n == 0 { - return Ok(()); + return Err(ProxyError::Io(std::io::Error::new( + ErrorKind::UnexpectedEof, + "ME socket closed by peer", + ))); } raw.extend_from_slice(&tmp[..n]);