ME Reinit Core advancing + Binding Policy Core

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-25 20:35:57 +03:00
parent 41493462a1
commit 7ce5fc66db
No known key found for this signature in database
4 changed files with 57 additions and 25 deletions

View File

@ -273,6 +273,10 @@ pub(super) struct ReinitCore {
pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64, pub(super) pending_hardswap_started_at_epoch_secs: AtomicU64,
pub(super) pending_hardswap_map_hash: AtomicU64, pub(super) pending_hardswap_map_hash: AtomicU64,
pub(super) hardswap: AtomicBool, pub(super) hardswap: AtomicBool,
pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64,
pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64,
pub(super) me_hardswap_warmup_extra_passes: AtomicU32,
pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64,
} }
pub(super) struct WriterLifecycleCore { pub(super) struct WriterLifecycleCore {
@ -330,6 +334,11 @@ pub(super) struct SingleEndpointRuntimeCore {
pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64, pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64,
} }
pub(super) struct BindingPolicyCore {
pub(super) me_bind_stale_mode: AtomicU8,
pub(super) me_bind_stale_ttl_secs: AtomicU64,
}
#[allow(dead_code)] #[allow(dead_code)]
pub struct MePool { pub struct MePool {
pub(super) routing: Arc<RoutingCore>, pub(super) routing: Arc<RoutingCore>,
@ -339,6 +348,7 @@ pub struct MePool {
pub(super) health_runtime: Arc<HealthRuntimeCore>, pub(super) health_runtime: Arc<HealthRuntimeCore>,
pub(super) drain_runtime: Arc<DrainRuntimeCore>, pub(super) drain_runtime: Arc<DrainRuntimeCore>,
pub(super) single_endpoint_runtime: Arc<SingleEndpointRuntimeCore>, pub(super) single_endpoint_runtime: Arc<SingleEndpointRuntimeCore>,
pub(super) binding_policy: Arc<BindingPolicyCore>,
pub(super) decision: NetworkDecision, pub(super) decision: NetworkDecision,
pub(super) upstream: Option<Arc<UpstreamManager>>, pub(super) upstream: Option<Arc<UpstreamManager>>,
pub(super) rng: Arc<SecureRandom>, pub(super) rng: Arc<SecureRandom>,
@ -404,12 +414,6 @@ pub struct MePool {
pub(super) stats: Arc<crate::stats::Stats>, pub(super) stats: Arc<crate::stats::Stats>,
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>, pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>, pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64,
pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64,
pub(super) me_hardswap_warmup_extra_passes: AtomicU32,
pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64,
pub(super) me_bind_stale_mode: AtomicU8,
pub(super) me_bind_stale_ttl_secs: AtomicU64,
pub(super) secret_atomic_snapshot: AtomicBool, pub(super) secret_atomic_snapshot: AtomicBool,
pub(super) me_deterministic_writer_sort: AtomicBool, pub(super) me_deterministic_writer_sort: AtomicBool,
pub(super) me_writer_pick_mode: AtomicU8, pub(super) me_writer_pick_mode: AtomicU8,
@ -575,6 +579,14 @@ impl MePool {
pending_hardswap_started_at_epoch_secs: AtomicU64::new(0), pending_hardswap_started_at_epoch_secs: AtomicU64::new(0),
pending_hardswap_map_hash: AtomicU64::new(0), pending_hardswap_map_hash: AtomicU64::new(0),
hardswap: AtomicBool::new(hardswap), hardswap: AtomicBool::new(hardswap),
me_hardswap_warmup_delay_min_ms: AtomicU64::new(me_hardswap_warmup_delay_min_ms),
me_hardswap_warmup_delay_max_ms: AtomicU64::new(me_hardswap_warmup_delay_max_ms),
me_hardswap_warmup_extra_passes: AtomicU32::new(
me_hardswap_warmup_extra_passes as u32,
),
me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new(
me_hardswap_warmup_pass_backoff_base_ms,
),
}), }),
writer_lifecycle: Arc::new(WriterLifecycleCore { writer_lifecycle: Arc::new(WriterLifecycleCore {
me_keepalive_enabled, me_keepalive_enabled,
@ -666,6 +678,10 @@ impl MePool {
me_single_endpoint_shadow_rotate_every_secs, me_single_endpoint_shadow_rotate_every_secs,
), ),
}), }),
binding_policy: Arc::new(BindingPolicyCore {
me_bind_stale_mode: AtomicU8::new(me_bind_stale_mode.as_u8()),
me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs),
}),
decision, decision,
upstream, upstream,
rng, rng,
@ -767,14 +783,6 @@ impl MePool {
draining_active_runtime: AtomicU64::new(0), draining_active_runtime: AtomicU64::new(0),
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_hardswap_warmup_delay_min_ms: AtomicU64::new(me_hardswap_warmup_delay_min_ms),
me_hardswap_warmup_delay_max_ms: AtomicU64::new(me_hardswap_warmup_delay_max_ms),
me_hardswap_warmup_extra_passes: AtomicU32::new(me_hardswap_warmup_extra_passes as u32),
me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new(
me_hardswap_warmup_pass_backoff_base_ms,
),
me_bind_stale_mode: AtomicU8::new(me_bind_stale_mode.as_u8()),
me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs),
secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot),
me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort),
me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()), me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()),
@ -1035,17 +1043,23 @@ impl MePool {
self.drain_runtime self.drain_runtime
.me_pool_min_fresh_ratio_permille .me_pool_min_fresh_ratio_permille
.store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed); .store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed);
self.me_hardswap_warmup_delay_min_ms self.reinit
.me_hardswap_warmup_delay_min_ms
.store(hardswap_warmup_delay_min_ms, Ordering::Relaxed); .store(hardswap_warmup_delay_min_ms, Ordering::Relaxed);
self.me_hardswap_warmup_delay_max_ms self.reinit
.me_hardswap_warmup_delay_max_ms
.store(hardswap_warmup_delay_max_ms, Ordering::Relaxed); .store(hardswap_warmup_delay_max_ms, Ordering::Relaxed);
self.me_hardswap_warmup_extra_passes self.reinit
.me_hardswap_warmup_extra_passes
.store(hardswap_warmup_extra_passes as u32, Ordering::Relaxed); .store(hardswap_warmup_extra_passes as u32, Ordering::Relaxed);
self.me_hardswap_warmup_pass_backoff_base_ms self.reinit
.me_hardswap_warmup_pass_backoff_base_ms
.store(hardswap_warmup_pass_backoff_base_ms, Ordering::Relaxed); .store(hardswap_warmup_pass_backoff_base_ms, Ordering::Relaxed);
self.me_bind_stale_mode self.binding_policy
.me_bind_stale_mode
.store(bind_stale_mode.as_u8(), Ordering::Relaxed); .store(bind_stale_mode.as_u8(), Ordering::Relaxed);
self.me_bind_stale_ttl_secs self.binding_policy
.me_bind_stale_ttl_secs
.store(bind_stale_ttl_secs, Ordering::Relaxed); .store(bind_stale_ttl_secs, Ordering::Relaxed);
self.secret_atomic_snapshot self.secret_atomic_snapshot
.store(secret_atomic_snapshot, Ordering::Relaxed); .store(secret_atomic_snapshot, Ordering::Relaxed);
@ -1294,7 +1308,11 @@ impl MePool {
} }
pub(super) fn bind_stale_mode(&self) -> MeBindStaleMode { pub(super) fn bind_stale_mode(&self) -> MeBindStaleMode {
MeBindStaleMode::from_u8(self.me_bind_stale_mode.load(Ordering::Relaxed)) MeBindStaleMode::from_u8(
self.binding_policy
.me_bind_stale_mode
.load(Ordering::Relaxed),
)
} }
pub(super) fn writer_pick_mode(&self) -> MeWriterPickMode { pub(super) fn writer_pick_mode(&self) -> MeWriterPickMode {

View File

@ -189,8 +189,14 @@ impl MePool {
} }
fn hardswap_warmup_connect_delay_ms(&self) -> u64 { fn hardswap_warmup_connect_delay_ms(&self) -> u64 {
let min_ms = self.me_hardswap_warmup_delay_min_ms.load(Ordering::Relaxed); let min_ms = self
let max_ms = self.me_hardswap_warmup_delay_max_ms.load(Ordering::Relaxed); .reinit
.me_hardswap_warmup_delay_min_ms
.load(Ordering::Relaxed);
let max_ms = self
.reinit
.me_hardswap_warmup_delay_max_ms
.load(Ordering::Relaxed);
let (min_ms, max_ms) = if min_ms <= max_ms { let (min_ms, max_ms) = if min_ms <= max_ms {
(min_ms, max_ms) (min_ms, max_ms)
} else { } else {
@ -204,6 +210,7 @@ impl MePool {
fn hardswap_warmup_backoff_ms(&self, pass_idx: usize) -> u64 { fn hardswap_warmup_backoff_ms(&self, pass_idx: usize) -> u64 {
let base_ms = self let base_ms = self
.reinit
.me_hardswap_warmup_pass_backoff_base_ms .me_hardswap_warmup_pass_backoff_base_ms
.load(Ordering::Relaxed); .load(Ordering::Relaxed);
let cap_ms = (self.me_reconnect_backoff_cap.as_millis() as u64).max(base_ms); let cap_ms = (self.me_reconnect_backoff_cap.as_millis() as u64).max(base_ms);
@ -249,6 +256,7 @@ impl MePool {
desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>, desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
) { ) {
let extra_passes = self let extra_passes = self
.reinit
.me_hardswap_warmup_extra_passes .me_hardswap_warmup_extra_passes
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
.min(10) as usize; .min(10) as usize;

View File

@ -587,7 +587,10 @@ impl MePool {
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
), ),
me_bind_stale_mode: bind_stale_mode_label(self.bind_stale_mode()), me_bind_stale_mode: bind_stale_mode_label(self.bind_stale_mode()),
me_bind_stale_ttl_secs: self.me_bind_stale_ttl_secs.load(Ordering::Relaxed), me_bind_stale_ttl_secs: self
.binding_policy
.me_bind_stale_ttl_secs
.load(Ordering::Relaxed),
me_single_endpoint_shadow_writers: self me_single_endpoint_shadow_writers: self
.single_endpoint_runtime .single_endpoint_runtime
.me_single_endpoint_shadow_writers .me_single_endpoint_shadow_writers

View File

@ -681,7 +681,10 @@ impl MePool {
MeBindStaleMode::Never => false, MeBindStaleMode::Never => false,
MeBindStaleMode::Always => true, MeBindStaleMode::Always => true,
MeBindStaleMode::Ttl => { MeBindStaleMode::Ttl => {
let ttl_secs = self.me_bind_stale_ttl_secs.load(Ordering::Relaxed); let ttl_secs = self
.binding_policy
.me_bind_stale_ttl_secs
.load(Ordering::Relaxed);
if ttl_secs == 0 { if ttl_secs == 0 {
return true; return true;
} }