From 1c3e0d4e46b1809ecaabcf80bb2442ee908dea5d Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Mar 2026 19:43:02 +0300 Subject: [PATCH] ME Reinit Core Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/pool.rs | 38 +++++++++++-------- src/transport/middle_proxy/pool_reinit.rs | 45 +++++++++++++++-------- src/transport/middle_proxy/pool_status.rs | 12 ++++-- 3 files changed, 59 insertions(+), 36 deletions(-) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index fdda988..9ebc412 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -265,9 +265,20 @@ pub struct RoutingCore { pub(super) preferred_endpoints_by_dc: ArcSwap>>, } +pub(super) struct ReinitCore { + pub(super) generation: AtomicU64, + pub(super) active_generation: AtomicU64, + pub(super) warm_generation: AtomicU64, + pub(super) pending_hardswap_generation: AtomicU64, + pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64, + pub(super) pending_hardswap_map_hash: AtomicU64, + pub(super) hardswap: AtomicBool, +} + #[allow(dead_code)] pub struct MePool { pub(super) routing: Arc, + pub(super) reinit: Arc, pub(super) decision: NetworkDecision, pub(super) upstream: Option>, pub(super) rng: Arc, @@ -343,13 +354,6 @@ pub struct MePool { pub(super) conn_count: AtomicUsize, pub(super) draining_active_runtime: AtomicU64, pub(super) stats: Arc, - pub(super) generation: AtomicU64, - pub(super) active_generation: AtomicU64, - pub(super) warm_generation: AtomicU64, - pub(super) pending_hardswap_generation: AtomicU64, - pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64, - pub(super) pending_hardswap_map_hash: AtomicU64, - pub(super) hardswap: AtomicBool, pub(super) endpoint_quarantine: Arc>>, pub(super) kdf_material_fingerprint: Arc>>, pub(super) me_pool_drain_ttl_secs: AtomicU64, @@ -543,6 +547,15 @@ impl MePool { writer_epoch, preferred_endpoints_by_dc: ArcSwap::from_pointee(preferred_endpoints_by_dc), }), + reinit: Arc::new(ReinitCore { + generation: AtomicU64::new(1), + active_generation: AtomicU64::new(1), + warm_generation: AtomicU64::new(0), + pending_hardswap_generation: AtomicU64::new(0), + pending_hardswap_started_at_epoch_secs: AtomicU64::new(0), + pending_hardswap_map_hash: AtomicU64::new(0), + hardswap: AtomicBool::new(hardswap), + }), decision, upstream, rng, @@ -664,13 +677,6 @@ impl MePool { refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())), conn_count: AtomicUsize::new(0), draining_active_runtime: AtomicU64::new(0), - generation: AtomicU64::new(1), - active_generation: AtomicU64::new(1), - warm_generation: AtomicU64::new(0), - pending_hardswap_generation: AtomicU64::new(0), - pending_hardswap_started_at_epoch_secs: AtomicU64::new(0), - pending_hardswap_map_hash: AtomicU64::new(0), - hardswap: AtomicBool::new(hardswap), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), @@ -750,7 +756,7 @@ impl MePool { } pub fn current_generation(&self) -> u64 { - self.active_generation.load(Ordering::Relaxed) + self.reinit.active_generation.load(Ordering::Relaxed) } pub fn set_runtime_ready(&self, ready: bool) { @@ -934,7 +940,7 @@ impl MePool { me_health_interval_ms_healthy: u64, me_warn_rate_limit_ms: u64, ) { - self.hardswap.store(hardswap, Ordering::Relaxed); + self.reinit.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs .store(drain_ttl_secs, Ordering::Relaxed); self.me_instadrain.store(instadrain, Ordering::Relaxed); diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index 663007b..be08649 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -37,16 +37,21 @@ impl MePool { } fn clear_pending_hardswap_state(&self) { - self.pending_hardswap_generation.store(0, Ordering::Relaxed); - self.pending_hardswap_started_at_epoch_secs + self.reinit.pending_hardswap_generation.store(0, Ordering::Relaxed); + self.reinit + .pending_hardswap_started_at_epoch_secs .store(0, Ordering::Relaxed); - self.pending_hardswap_map_hash.store(0, Ordering::Relaxed); - self.warm_generation.store(0, Ordering::Relaxed); + self.reinit + .pending_hardswap_map_hash + .store(0, Ordering::Relaxed); + self.reinit.warm_generation.store(0, Ordering::Relaxed); } async fn promote_warm_generation_to_active(&self, generation: u64) { - self.active_generation.store(generation, Ordering::Relaxed); - self.warm_generation.store(0, Ordering::Relaxed); + self.reinit + .active_generation + .store(generation, Ordering::Relaxed); + self.reinit.warm_generation.store(0, Ordering::Relaxed); let ws = self.writers.read().await; for writer in ws.iter() { @@ -369,13 +374,17 @@ impl MePool { let desired_map_hash = Self::desired_map_hash(&desired_by_dc); let previous_generation = self.current_generation(); - let hardswap = self.hardswap.load(Ordering::Relaxed); + let hardswap = self.reinit.hardswap.load(Ordering::Relaxed); let generation = if hardswap { - let pending_generation = self.pending_hardswap_generation.load(Ordering::Relaxed); + let pending_generation = self + .reinit + .pending_hardswap_generation + .load(Ordering::Relaxed); let pending_started_at = self + .reinit .pending_hardswap_started_at_epoch_secs .load(Ordering::Relaxed); - let pending_map_hash = self.pending_hardswap_map_hash.load(Ordering::Relaxed); + let pending_map_hash = self.reinit.pending_hardswap_map_hash.load(Ordering::Relaxed); let pending_age_secs = now_epoch_secs.saturating_sub(pending_started_at); let pending_ttl_expired = pending_started_at > 0 && pending_age_secs > ME_HARDSWAP_PENDING_TTL_SECS; @@ -405,24 +414,28 @@ impl MePool { "ME hardswap pending generation expired by TTL; starting fresh generation" ); } - let next_generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1; - self.pending_hardswap_generation + let next_generation = self.reinit.generation.fetch_add(1, Ordering::Relaxed) + 1; + self.reinit + .pending_hardswap_generation .store(next_generation, Ordering::Relaxed); - self.pending_hardswap_started_at_epoch_secs + self.reinit + .pending_hardswap_started_at_epoch_secs .store(now_epoch_secs, Ordering::Relaxed); - self.pending_hardswap_map_hash + self.reinit + .pending_hardswap_map_hash .store(desired_map_hash, Ordering::Relaxed); - self.warm_generation + self.reinit + .warm_generation .store(next_generation, Ordering::Relaxed); next_generation } } else { self.clear_pending_hardswap_state(); - self.generation.fetch_add(1, Ordering::Relaxed) + 1 + self.reinit.generation.fetch_add(1, Ordering::Relaxed) + 1 }; if hardswap { - self.warm_generation.store(generation, Ordering::Relaxed); + self.reinit.warm_generation.store(generation, Ordering::Relaxed); self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc) .await; } else { diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index afb8efe..e636b34 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -436,6 +436,7 @@ impl MePool { let now = Instant::now(); let now_epoch_secs = Self::now_epoch_secs(); let pending_started_at = self + .reinit .pending_hardswap_started_at_epoch_secs .load(Ordering::Relaxed); let pending_hardswap_age_secs = @@ -477,11 +478,14 @@ impl MePool { } MeApiRuntimeSnapshot { - active_generation: self.active_generation.load(Ordering::Relaxed), - warm_generation: self.warm_generation.load(Ordering::Relaxed), - pending_hardswap_generation: self.pending_hardswap_generation.load(Ordering::Relaxed), + active_generation: self.reinit.active_generation.load(Ordering::Relaxed), + warm_generation: self.reinit.warm_generation.load(Ordering::Relaxed), + pending_hardswap_generation: self + .reinit + .pending_hardswap_generation + .load(Ordering::Relaxed), pending_hardswap_age_secs, - hardswap_enabled: self.hardswap.load(Ordering::Relaxed), + hardswap_enabled: self.reinit.hardswap.load(Ordering::Relaxed), floor_mode: floor_mode_label(self.floor_mode()), adaptive_floor_idle_secs: self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed), adaptive_floor_min_writers_single_endpoint: self