diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 6d51bc8..916f39c 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -273,6 +273,10 @@ pub(super) struct ReinitCore { pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64, pub(super) pending_hardswap_map_hash: AtomicU64, pub(super) hardswap: AtomicBool, + pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64, + pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64, + pub(super) me_hardswap_warmup_extra_passes: AtomicU32, + pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64, } pub(super) struct WriterLifecycleCore { @@ -330,6 +334,11 @@ pub(super) struct SingleEndpointRuntimeCore { pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64, } +pub(super) struct BindingPolicyCore { + pub(super) me_bind_stale_mode: AtomicU8, + pub(super) me_bind_stale_ttl_secs: AtomicU64, +} + #[allow(dead_code)] pub struct MePool { pub(super) routing: Arc, @@ -339,6 +348,7 @@ pub struct MePool { pub(super) health_runtime: Arc, pub(super) drain_runtime: Arc, pub(super) single_endpoint_runtime: Arc, + pub(super) binding_policy: Arc, pub(super) decision: NetworkDecision, pub(super) upstream: Option>, pub(super) rng: Arc, @@ -404,12 +414,6 @@ pub struct MePool { pub(super) stats: Arc, pub(super) endpoint_quarantine: Arc>>, pub(super) kdf_material_fingerprint: Arc>>, - pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64, - pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64, - pub(super) me_hardswap_warmup_extra_passes: AtomicU32, - pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64, - pub(super) me_bind_stale_mode: AtomicU8, - pub(super) me_bind_stale_ttl_secs: AtomicU64, pub(super) secret_atomic_snapshot: AtomicBool, pub(super) me_deterministic_writer_sort: AtomicBool, pub(super) me_writer_pick_mode: AtomicU8, @@ -575,6 +579,14 @@ impl MePool { pending_hardswap_started_at_epoch_secs: AtomicU64::new(0), pending_hardswap_map_hash: AtomicU64::new(0), hardswap: AtomicBool::new(hardswap), + me_hardswap_warmup_delay_min_ms: AtomicU64::new(me_hardswap_warmup_delay_min_ms), + me_hardswap_warmup_delay_max_ms: AtomicU64::new(me_hardswap_warmup_delay_max_ms), + me_hardswap_warmup_extra_passes: AtomicU32::new( + me_hardswap_warmup_extra_passes as u32, + ), + me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new( + me_hardswap_warmup_pass_backoff_base_ms, + ), }), writer_lifecycle: Arc::new(WriterLifecycleCore { me_keepalive_enabled, @@ -666,6 +678,10 @@ impl MePool { me_single_endpoint_shadow_rotate_every_secs, ), }), + binding_policy: Arc::new(BindingPolicyCore { + me_bind_stale_mode: AtomicU8::new(me_bind_stale_mode.as_u8()), + me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs), + }), decision, upstream, rng, @@ -767,14 +783,6 @@ impl MePool { draining_active_runtime: AtomicU64::new(0), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), - me_hardswap_warmup_delay_min_ms: AtomicU64::new(me_hardswap_warmup_delay_min_ms), - me_hardswap_warmup_delay_max_ms: AtomicU64::new(me_hardswap_warmup_delay_max_ms), - me_hardswap_warmup_extra_passes: AtomicU32::new(me_hardswap_warmup_extra_passes as u32), - me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new( - me_hardswap_warmup_pass_backoff_base_ms, - ), - me_bind_stale_mode: AtomicU8::new(me_bind_stale_mode.as_u8()), - me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs), secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()), @@ -1035,17 +1043,23 @@ impl MePool { self.drain_runtime .me_pool_min_fresh_ratio_permille .store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed); - self.me_hardswap_warmup_delay_min_ms + self.reinit + .me_hardswap_warmup_delay_min_ms .store(hardswap_warmup_delay_min_ms, Ordering::Relaxed); - self.me_hardswap_warmup_delay_max_ms + self.reinit + .me_hardswap_warmup_delay_max_ms .store(hardswap_warmup_delay_max_ms, Ordering::Relaxed); - self.me_hardswap_warmup_extra_passes + self.reinit + .me_hardswap_warmup_extra_passes .store(hardswap_warmup_extra_passes as u32, Ordering::Relaxed); - self.me_hardswap_warmup_pass_backoff_base_ms + self.reinit + .me_hardswap_warmup_pass_backoff_base_ms .store(hardswap_warmup_pass_backoff_base_ms, Ordering::Relaxed); - self.me_bind_stale_mode + self.binding_policy + .me_bind_stale_mode .store(bind_stale_mode.as_u8(), Ordering::Relaxed); - self.me_bind_stale_ttl_secs + self.binding_policy + .me_bind_stale_ttl_secs .store(bind_stale_ttl_secs, Ordering::Relaxed); self.secret_atomic_snapshot .store(secret_atomic_snapshot, Ordering::Relaxed); @@ -1294,7 +1308,11 @@ impl MePool { } pub(super) fn bind_stale_mode(&self) -> MeBindStaleMode { - MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed)) + MeBindStaleMode::from_u8( + self.binding_policy + .me_bind_stale_mode + .load(Ordering::Relaxed), + ) } pub(super) fn writer_pick_mode(&self) -> MeWriterPickMode { diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index 2bf2e5b..009f850 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -189,8 +189,14 @@ impl MePool { } fn hardswap_warmup_connect_delay_ms(&self) -> u64 { - let min_ms = self.me_hardswap_warmup_delay_min_ms.load(Ordering::Relaxed); - let max_ms = self.me_hardswap_warmup_delay_max_ms.load(Ordering::Relaxed); + let min_ms = self + .reinit + .me_hardswap_warmup_delay_min_ms + .load(Ordering::Relaxed); + let max_ms = self + .reinit + .me_hardswap_warmup_delay_max_ms + .load(Ordering::Relaxed); let (min_ms, max_ms) = if min_ms <= max_ms { (min_ms, max_ms) } else { @@ -204,6 +210,7 @@ impl MePool { fn hardswap_warmup_backoff_ms(&self, pass_idx: usize) -> u64 { let base_ms = self + .reinit .me_hardswap_warmup_pass_backoff_base_ms .load(Ordering::Relaxed); let cap_ms = (self.me_reconnect_backoff_cap.as_millis() as u64).max(base_ms); @@ -249,6 +256,7 @@ impl MePool { desired_by_dc: &HashMap>, ) { let extra_passes = self + .reinit .me_hardswap_warmup_extra_passes .load(Ordering::Relaxed) .min(10) as usize; diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index d8d4a39..6cd5cec 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -587,7 +587,10 @@ impl MePool { .load(Ordering::Relaxed), ), me_bind_stale_mode: bind_stale_mode_label(self.bind_stale_mode()), - me_bind_stale_ttl_secs: self.me_bind_stale_ttl_secs.load(Ordering::Relaxed), + me_bind_stale_ttl_secs: self + .binding_policy + .me_bind_stale_ttl_secs + .load(Ordering::Relaxed), me_single_endpoint_shadow_writers: self .single_endpoint_runtime .me_single_endpoint_shadow_writers diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index f697257..c1f3de9 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -681,7 +681,10 @@ impl MePool { MeBindStaleMode::Never => false, MeBindStaleMode::Always => true, MeBindStaleMode::Ttl => { - let ttl_secs = self.me_bind_stale_ttl_secs.load(Ordering::Relaxed); + let ttl_secs = self + .binding_policy + .me_bind_stale_ttl_secs + .load(Ordering::Relaxed); if ttl_secs == 0 { return true; }