ME Pool Health + Rotation

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-24 22:59:59 +03:00
parent 7d7ef84868
commit 4a95f6d195
7 changed files with 424 additions and 116 deletions

View File

@@ -75,6 +75,7 @@ pub struct MePool {
pub(super) rtt_stats: Arc<Mutex<HashMap<u64, (f64, f64)>>>,
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
pub(super) writer_available: Arc<Notify>,
pub(super) refill_inflight: Arc<Mutex<HashSet<SocketAddr>>>,
pub(super) conn_count: AtomicUsize,
pub(super) stats: Arc<crate::stats::Stats>,
pub(super) generation: AtomicU64,
@@ -180,6 +181,7 @@ impl MePool {
rtt_stats: Arc::new(Mutex::new(HashMap::new())),
nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())),
writer_available: Arc::new(Notify::new()),
refill_inflight: Arc::new(Mutex::new(HashSet::new())),
conn_count: AtomicUsize::new(0),
generation: AtomicU64::new(1),
hardswap: AtomicBool::new(hardswap),
@@ -324,34 +326,66 @@ impl MePool {
out
}
pub(super) fn required_writers_for_dc(endpoint_count: usize) -> usize {
endpoint_count.max(3)
}
pub(super) async fn connect_endpoints_round_robin(
self: &Arc<Self>,
endpoints: &[SocketAddr],
rng: &SecureRandom,
) -> bool {
if endpoints.is_empty() {
return false;
}
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % endpoints.len();
for offset in 0..endpoints.len() {
let idx = (start + offset) % endpoints.len();
let addr = endpoints[idx];
match self.connect_one(addr, rng).await {
Ok(()) => return true,
Err(e) => debug!(%addr, error = %e, "ME connect failed during round-robin warmup"),
}
}
false
}
async fn warmup_generation_for_all_dcs(
self: &Arc<Self>,
rng: &SecureRandom,
generation: u64,
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
) {
for endpoints in desired_by_dc.values() {
for (dc, endpoints) in desired_by_dc {
if endpoints.is_empty() {
continue;
}
let has_fresh = {
let ws = self.writers.read().await;
ws.iter().any(|w| {
!w.draining.load(Ordering::Relaxed)
&& w.generation == generation
&& endpoints.contains(&w.addr)
})
};
let mut endpoint_list: Vec<SocketAddr> = endpoints.iter().copied().collect();
endpoint_list.sort_unstable();
let required = Self::required_writers_for_dc(endpoint_list.len());
if has_fresh {
continue;
}
loop {
let fresh_count = {
let ws = self.writers.read().await;
ws.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.filter(|w| endpoints.contains(&w.addr))
.count()
};
if fresh_count >= required {
break;
}
let mut shuffled: Vec<SocketAddr> = endpoints.iter().copied().collect();
shuffled.shuffle(&mut rand::rng());
for addr in shuffled {
if self.connect_one(addr, rng).await.is_ok() {
if !self.connect_endpoints_round_robin(&endpoint_list, rng).await {
warn!(
dc = *dc,
fresh_count,
required,
endpoint_count = endpoint_list.len(),
"ME warmup stopped: unable to reach required writer floor for DC"
);
break;
}
}
@@ -364,7 +398,7 @@ impl MePool {
) {
let desired_by_dc = self.desired_dc_endpoints().await;
if desired_by_dc.is_empty() {
warn!("ME endpoint map is empty after update; skipping stale writer drain");
warn!("ME endpoint map is empty; skipping stale writer drain");
return;
}
@@ -403,19 +437,26 @@ impl MePool {
}
if hardswap {
let fresh_writer_addrs: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.map(|w| w.addr)
.collect();
let (fresh_ratio, fresh_missing_dc) =
Self::coverage_ratio(&desired_by_dc, &fresh_writer_addrs);
let mut fresh_missing_dc = Vec::<(i32, usize, usize)>::new();
for (dc, endpoints) in &desired_by_dc {
if endpoints.is_empty() {
continue;
}
let required = Self::required_writers_for_dc(endpoints.len());
let fresh_count = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| w.generation == generation)
.filter(|w| endpoints.contains(&w.addr))
.count();
if fresh_count < required {
fresh_missing_dc.push((*dc, fresh_count, required));
}
}
if !fresh_missing_dc.is_empty() {
warn!(
previous_generation,
generation,
fresh_ratio = format_args!("{fresh_ratio:.3}"),
missing_dc = ?fresh_missing_dc,
"ME hardswap pending: fresh generation coverage incomplete"
);
@@ -425,7 +466,7 @@ impl MePool {
warn!(
missing_dc = ?missing_dc,
// Keep stale writers alive when fresh coverage is incomplete.
"ME reinit coverage incomplete after map update; keeping stale writers"
"ME reinit coverage incomplete; keeping stale writers"
);
return;
}
@@ -450,7 +491,7 @@ impl MePool {
drop(writers);
if stale_writer_ids.is_empty() {
debug!("ME map update completed with no stale writers");
debug!("ME reinit cycle completed with no stale writers");
return;
}
@@ -464,7 +505,7 @@ impl MePool {
coverage_ratio = format_args!("{coverage_ratio:.3}"),
min_ratio = format_args!("{min_ratio:.3}"),
drain_timeout_secs,
"ME map update covered; draining stale writers"
"ME reinit cycle covered; draining stale writers"
);
self.stats.increment_pool_swap_total();
for writer_id in stale_writer_ids {
@@ -473,6 +514,134 @@ impl MePool {
}
}
pub async fn zero_downtime_reinit_periodic(
self: &Arc<Self>,
rng: &SecureRandom,
) {
self.zero_downtime_reinit_after_map_change(rng).await;
}
async fn endpoints_for_same_dc(&self, addr: SocketAddr) -> Vec<SocketAddr> {
let mut target_dc = HashSet::<i32>::new();
let mut endpoints = HashSet::<SocketAddr>::new();
if self.decision.ipv4_me {
let map = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in &map {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
target_dc.insert(dc.abs());
}
}
for dc in &target_dc {
for key in [*dc, -*dc] {
if let Some(addrs) = map.get(&key) {
for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port));
}
}
}
}
}
if self.decision.ipv6_me {
let map = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in &map {
if addrs
.iter()
.any(|(ip, port)| SocketAddr::new(*ip, *port) == addr)
{
target_dc.insert(dc.abs());
}
}
for dc in &target_dc {
for key in [*dc, -*dc] {
if let Some(addrs) = map.get(&key) {
for (ip, port) in addrs {
endpoints.insert(SocketAddr::new(*ip, *port));
}
}
}
}
}
let mut sorted: Vec<SocketAddr> = endpoints.into_iter().collect();
sorted.sort_unstable();
sorted
}
async fn refill_writer_after_loss(self: &Arc<Self>, addr: SocketAddr) -> bool {
let fast_retries = self.me_reconnect_fast_retry_count.max(1);
for attempt in 0..fast_retries {
self.stats.increment_me_reconnect_attempt();
match self.connect_one(addr, self.rng.as_ref()).await {
Ok(()) => {
self.stats.increment_me_reconnect_success();
info!(
%addr,
attempt = attempt + 1,
"ME writer restored on the same endpoint"
);
return true;
}
Err(e) => {
debug!(
%addr,
attempt = attempt + 1,
error = %e,
"ME immediate same-endpoint reconnect failed"
);
}
}
}
let dc_endpoints = self.endpoints_for_same_dc(addr).await;
if dc_endpoints.is_empty() {
return false;
}
for attempt in 0..fast_retries {
self.stats.increment_me_reconnect_attempt();
if self
.connect_endpoints_round_robin(&dc_endpoints, self.rng.as_ref())
.await
{
self.stats.increment_me_reconnect_success();
info!(
%addr,
attempt = attempt + 1,
"ME writer restored via DC fallback endpoint"
);
return true;
}
}
false
}
pub(crate) fn trigger_immediate_refill(self: &Arc<Self>, addr: SocketAddr) {
let pool = Arc::clone(self);
tokio::spawn(async move {
{
let mut guard = pool.refill_inflight.lock().await;
if !guard.insert(addr) {
return;
}
}
let restored = pool.refill_writer_after_loss(addr).await;
if !restored {
warn!(%addr, "ME immediate refill failed");
}
let mut guard = pool.refill_inflight.lock().await;
guard.remove(&addr);
});
}
pub async fn update_proxy_maps(
&self,
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
@@ -880,16 +1049,21 @@ impl MePool {
}
}
async fn remove_writer_only(&self, writer_id: u64) -> Vec<BoundConn> {
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
let mut removed_addr: Option<SocketAddr> = None;
let mut trigger_refill = false;
{
let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
let w = ws.remove(pos);
if w.draining.load(Ordering::Relaxed) {
let was_draining = w.draining.load(Ordering::Relaxed);
if was_draining {
self.stats.decrement_pool_drain_active();
}
w.cancel.cancel();
removed_addr = Some(w.addr);
trigger_refill = !was_draining;
close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed);
}
@@ -897,6 +1071,11 @@ impl MePool {
if let Some(tx) = close_tx {
let _ = tx.send(WriterCommand::Close).await;
}
if trigger_refill
&& let Some(addr) = removed_addr
{
self.trigger_immediate_refill(addr);
}
self.rtt_stats.lock().await.remove(&writer_id);
self.registry.writer_lost(writer_id).await
}