Drain + Single-Endpoint Runtime Cores

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-25 20:29:22 +03:00
parent 6ee4d4648c
commit 41493462a1
No known key found for this signature in database
4 changed files with 180 additions and 104 deletions

View File

@ -137,9 +137,11 @@ pub(super) async fn reap_draining_writers(
let now_epoch_secs = MePool::now_epoch_secs(); let now_epoch_secs = MePool::now_epoch_secs();
let now = Instant::now(); let now = Instant::now();
let drain_ttl_secs = pool let drain_ttl_secs = pool
.drain_runtime
.me_pool_drain_ttl_secs .me_pool_drain_ttl_secs
.load(std::sync::atomic::Ordering::Relaxed); .load(std::sync::atomic::Ordering::Relaxed);
let drain_threshold = pool let drain_threshold = pool
.drain_runtime
.me_pool_drain_threshold .me_pool_drain_threshold
.load(std::sync::atomic::Ordering::Relaxed); .load(std::sync::atomic::Ordering::Relaxed);
let activity = pool.registry.writer_activity_snapshot().await; let activity = pool.registry.writer_activity_snapshot().await;
@ -223,7 +225,10 @@ pub(super) async fn reap_draining_writers(
endpoint = %writer.addr, endpoint = %writer.addr,
generation = writer.generation, generation = writer.generation,
drain_ttl_secs, drain_ttl_secs,
force_close_secs = pool.me_pool_force_close_secs.load(std::sync::atomic::Ordering::Relaxed), force_close_secs = pool
.drain_runtime
.me_pool_force_close_secs
.load(std::sync::atomic::Ordering::Relaxed),
allow_drain_fallback = writer.allow_drain_fallback, allow_drain_fallback = writer.allow_drain_fallback,
"ME draining writer remains non-empty past drain TTL" "ME draining writer remains non-empty past drain TTL"
); );

View File

@ -304,6 +304,32 @@ pub(super) struct HealthRuntimeCore {
pub(super) family_health_v6: ArcSwap<FamilyHealthSnapshot>, pub(super) family_health_v6: ArcSwap<FamilyHealthSnapshot>,
} }
pub(super) struct DrainRuntimeCore {
pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_instadrain: AtomicBool,
pub(super) me_pool_drain_threshold: AtomicU64,
pub(super) me_pool_drain_soft_evict_enabled: AtomicBool,
pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64,
pub(super) me_pool_drain_soft_evict_per_writer: AtomicU8,
pub(super) me_pool_drain_soft_evict_budget_per_core: AtomicU32,
pub(super) me_pool_drain_soft_evict_cooldown_ms: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool,
pub(super) me_last_drain_gate_redundancy_ok: AtomicBool,
pub(super) me_last_drain_gate_block_reason: AtomicU8,
pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64,
}
pub(super) struct SingleEndpointRuntimeCore {
pub(super) me_single_endpoint_shadow_writers: AtomicU8,
pub(super) me_single_endpoint_outage_mode_enabled: AtomicBool,
pub(super) me_single_endpoint_outage_disable_quarantine: AtomicBool,
pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64,
pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64,
pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64,
}
#[allow(dead_code)] #[allow(dead_code)]
pub struct MePool { pub struct MePool {
pub(super) routing: Arc<RoutingCore>, pub(super) routing: Arc<RoutingCore>,
@ -311,6 +337,8 @@ pub struct MePool {
pub(super) writer_lifecycle: Arc<WriterLifecycleCore>, pub(super) writer_lifecycle: Arc<WriterLifecycleCore>,
pub(super) route_runtime: Arc<RouteRuntimeCore>, pub(super) route_runtime: Arc<RouteRuntimeCore>,
pub(super) health_runtime: Arc<HealthRuntimeCore>, pub(super) health_runtime: Arc<HealthRuntimeCore>,
pub(super) drain_runtime: Arc<DrainRuntimeCore>,
pub(super) single_endpoint_runtime: Arc<SingleEndpointRuntimeCore>,
pub(super) decision: NetworkDecision, pub(super) decision: NetworkDecision,
pub(super) upstream: Option<Arc<UpstreamManager>>, pub(super) upstream: Option<Arc<UpstreamManager>>,
pub(super) rng: Arc<SecureRandom>, pub(super) rng: Arc<SecureRandom>,
@ -336,12 +364,6 @@ pub struct MePool {
pub(super) me_reconnect_backoff_base: Duration, pub(super) me_reconnect_backoff_base: Duration,
pub(super) me_reconnect_backoff_cap: Duration, pub(super) me_reconnect_backoff_cap: Duration,
pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_reconnect_fast_retry_count: u32,
pub(super) me_single_endpoint_shadow_writers: AtomicU8,
pub(super) me_single_endpoint_outage_mode_enabled: AtomicBool,
pub(super) me_single_endpoint_outage_disable_quarantine: AtomicBool,
pub(super) me_single_endpoint_outage_backoff_min_ms: AtomicU64,
pub(super) me_single_endpoint_outage_backoff_max_ms: AtomicU64,
pub(super) me_single_endpoint_shadow_rotate_every_secs: AtomicU64,
pub(super) me_floor_mode: AtomicU8, pub(super) me_floor_mode: AtomicU8,
pub(super) me_adaptive_floor_idle_secs: AtomicU64, pub(super) me_adaptive_floor_idle_secs: AtomicU64,
pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8, pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8,
@ -382,16 +404,6 @@ pub struct MePool {
pub(super) stats: Arc<crate::stats::Stats>, pub(super) stats: Arc<crate::stats::Stats>,
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>, pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>, pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_instadrain: AtomicBool,
pub(super) me_pool_drain_threshold: AtomicU64,
pub(super) me_pool_drain_soft_evict_enabled: AtomicBool,
pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64,
pub(super) me_pool_drain_soft_evict_per_writer: AtomicU8,
pub(super) me_pool_drain_soft_evict_budget_per_core: AtomicU32,
pub(super) me_pool_drain_soft_evict_cooldown_ms: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64, pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64,
pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64, pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64,
pub(super) me_hardswap_warmup_extra_passes: AtomicU32, pub(super) me_hardswap_warmup_extra_passes: AtomicU32,
@ -404,10 +416,6 @@ pub struct MePool {
pub(super) me_writer_pick_sample_size: AtomicU8, pub(super) me_writer_pick_sample_size: AtomicU8,
pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_socks_kdf_policy: AtomicU8,
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>, pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool,
pub(super) me_last_drain_gate_redundancy_ok: AtomicBool,
pub(super) me_last_drain_gate_block_reason: AtomicU8,
pub(super) me_last_drain_gate_updated_at_epoch_secs: AtomicU64,
pub(super) runtime_ready: AtomicBool, pub(super) runtime_ready: AtomicBool,
pool_size: usize, pool_size: usize,
} }
@ -612,6 +620,52 @@ impl MePool {
0, 0,
)), )),
}), }),
drain_runtime: Arc::new(DrainRuntimeCore {
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_instadrain: AtomicBool::new(me_instadrain),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled),
me_pool_drain_soft_evict_grace_secs: AtomicU64::new(
me_pool_drain_soft_evict_grace_secs,
),
me_pool_drain_soft_evict_per_writer: AtomicU8::new(
me_pool_drain_soft_evict_per_writer.max(1),
),
me_pool_drain_soft_evict_budget_per_core: AtomicU32::new(
me_pool_drain_soft_evict_budget_per_core.max(1) as u32,
),
me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new(
me_pool_drain_soft_evict_cooldown_ms.max(1),
),
me_pool_force_close_secs: AtomicU64::new(Self::normalize_force_close_secs(
me_pool_force_close_secs,
)),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
me_pool_min_fresh_ratio,
)),
me_last_drain_gate_route_quorum_ok: AtomicBool::new(false),
me_last_drain_gate_redundancy_ok: AtomicBool::new(false),
me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(now_epoch_secs),
}),
single_endpoint_runtime: Arc::new(SingleEndpointRuntimeCore {
me_single_endpoint_shadow_writers: AtomicU8::new(me_single_endpoint_shadow_writers),
me_single_endpoint_outage_mode_enabled: AtomicBool::new(
me_single_endpoint_outage_mode_enabled,
),
me_single_endpoint_outage_disable_quarantine: AtomicBool::new(
me_single_endpoint_outage_disable_quarantine,
),
me_single_endpoint_outage_backoff_min_ms: AtomicU64::new(
me_single_endpoint_outage_backoff_min_ms,
),
me_single_endpoint_outage_backoff_max_ms: AtomicU64::new(
me_single_endpoint_outage_backoff_max_ms,
),
me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new(
me_single_endpoint_shadow_rotate_every_secs,
),
}),
decision, decision,
upstream, upstream,
rng, rng,
@ -651,22 +705,6 @@ impl MePool {
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
me_reconnect_fast_retry_count, me_reconnect_fast_retry_count,
me_single_endpoint_shadow_writers: AtomicU8::new(me_single_endpoint_shadow_writers),
me_single_endpoint_outage_mode_enabled: AtomicBool::new(
me_single_endpoint_outage_mode_enabled,
),
me_single_endpoint_outage_disable_quarantine: AtomicBool::new(
me_single_endpoint_outage_disable_quarantine,
),
me_single_endpoint_outage_backoff_min_ms: AtomicU64::new(
me_single_endpoint_outage_backoff_min_ms,
),
me_single_endpoint_outage_backoff_max_ms: AtomicU64::new(
me_single_endpoint_outage_backoff_max_ms,
),
me_single_endpoint_shadow_rotate_every_secs: AtomicU64::new(
me_single_endpoint_shadow_rotate_every_secs,
),
me_floor_mode: AtomicU8::new(me_floor_mode.as_u8()), me_floor_mode: AtomicU8::new(me_floor_mode.as_u8()),
me_adaptive_floor_idle_secs: AtomicU64::new(me_adaptive_floor_idle_secs), me_adaptive_floor_idle_secs: AtomicU64::new(me_adaptive_floor_idle_secs),
me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new( me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new(
@ -729,28 +767,6 @@ impl MePool {
draining_active_runtime: AtomicU64::new(0), draining_active_runtime: AtomicU64::new(0),
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_instadrain: AtomicBool::new(me_instadrain),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled),
me_pool_drain_soft_evict_grace_secs: AtomicU64::new(
me_pool_drain_soft_evict_grace_secs,
),
me_pool_drain_soft_evict_per_writer: AtomicU8::new(
me_pool_drain_soft_evict_per_writer.max(1),
),
me_pool_drain_soft_evict_budget_per_core: AtomicU32::new(
me_pool_drain_soft_evict_budget_per_core.max(1) as u32,
),
me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new(
me_pool_drain_soft_evict_cooldown_ms.max(1),
),
me_pool_force_close_secs: AtomicU64::new(Self::normalize_force_close_secs(
me_pool_force_close_secs,
)),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
me_pool_min_fresh_ratio,
)),
me_hardswap_warmup_delay_min_ms: AtomicU64::new(me_hardswap_warmup_delay_min_ms), me_hardswap_warmup_delay_min_ms: AtomicU64::new(me_hardswap_warmup_delay_min_ms),
me_hardswap_warmup_delay_max_ms: AtomicU64::new(me_hardswap_warmup_delay_max_ms), me_hardswap_warmup_delay_max_ms: AtomicU64::new(me_hardswap_warmup_delay_max_ms),
me_hardswap_warmup_extra_passes: AtomicU32::new(me_hardswap_warmup_extra_passes as u32), me_hardswap_warmup_extra_passes: AtomicU32::new(me_hardswap_warmup_extra_passes as u32),
@ -765,10 +781,6 @@ impl MePool {
me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)), me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)),
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)), me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
me_last_drain_gate_route_quorum_ok: AtomicBool::new(false),
me_last_drain_gate_redundancy_ok: AtomicBool::new(false),
me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(now_epoch_secs),
runtime_ready: AtomicBool::new(false), runtime_ready: AtomicBool::new(false),
}) })
} }
@ -899,32 +911,43 @@ impl MePool {
block_reason: MeDrainGateReason, block_reason: MeDrainGateReason,
updated_at_epoch_secs: u64, updated_at_epoch_secs: u64,
) { ) {
self.me_last_drain_gate_route_quorum_ok self.drain_runtime
.me_last_drain_gate_route_quorum_ok
.store(route_quorum_ok, Ordering::Relaxed); .store(route_quorum_ok, Ordering::Relaxed);
self.me_last_drain_gate_redundancy_ok self.drain_runtime
.me_last_drain_gate_redundancy_ok
.store(redundancy_ok, Ordering::Relaxed); .store(redundancy_ok, Ordering::Relaxed);
self.me_last_drain_gate_block_reason self.drain_runtime
.me_last_drain_gate_block_reason
.store(block_reason as u8, Ordering::Relaxed); .store(block_reason as u8, Ordering::Relaxed);
self.me_last_drain_gate_updated_at_epoch_secs self.drain_runtime
.me_last_drain_gate_updated_at_epoch_secs
.store(updated_at_epoch_secs, Ordering::Relaxed); .store(updated_at_epoch_secs, Ordering::Relaxed);
} }
pub(crate) fn last_drain_gate_route_quorum_ok(&self) -> bool { pub(crate) fn last_drain_gate_route_quorum_ok(&self) -> bool {
self.me_last_drain_gate_route_quorum_ok self.drain_runtime
.me_last_drain_gate_route_quorum_ok
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
pub(crate) fn last_drain_gate_redundancy_ok(&self) -> bool { pub(crate) fn last_drain_gate_redundancy_ok(&self) -> bool {
self.me_last_drain_gate_redundancy_ok self.drain_runtime
.me_last_drain_gate_redundancy_ok
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
pub(crate) fn last_drain_gate_block_reason(&self) -> MeDrainGateReason { pub(crate) fn last_drain_gate_block_reason(&self) -> MeDrainGateReason {
MeDrainGateReason::from_u8(self.me_last_drain_gate_block_reason.load(Ordering::Relaxed)) MeDrainGateReason::from_u8(
self.drain_runtime
.me_last_drain_gate_block_reason
.load(Ordering::Relaxed),
)
} }
pub(crate) fn last_drain_gate_updated_at_epoch_secs(&self) -> u64 { pub(crate) fn last_drain_gate_updated_at_epoch_secs(&self) -> u64 {
self.me_last_drain_gate_updated_at_epoch_secs self.drain_runtime
.me_last_drain_gate_updated_at_epoch_secs
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
@ -975,31 +998,42 @@ impl MePool {
me_warn_rate_limit_ms: u64, me_warn_rate_limit_ms: u64,
) { ) {
self.reinit.hardswap.store(hardswap, Ordering::Relaxed); self.reinit.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs self.drain_runtime
.me_pool_drain_ttl_secs
.store(drain_ttl_secs, Ordering::Relaxed); .store(drain_ttl_secs, Ordering::Relaxed);
self.me_instadrain.store(instadrain, Ordering::Relaxed); self.drain_runtime
self.me_pool_drain_threshold .me_instadrain
.store(instadrain, Ordering::Relaxed);
self.drain_runtime
.me_pool_drain_threshold
.store(pool_drain_threshold, Ordering::Relaxed); .store(pool_drain_threshold, Ordering::Relaxed);
// Runtime soft-evict knobs are updated lock-free to keep control-plane // Runtime soft-evict knobs are updated lock-free to keep control-plane
// writes non-blocking; readers observe a short eventual-consistency // writes non-blocking; readers observe a short eventual-consistency
// window by design. // window by design.
self.me_pool_drain_soft_evict_enabled self.drain_runtime
.me_pool_drain_soft_evict_enabled
.store(pool_drain_soft_evict_enabled, Ordering::Relaxed); .store(pool_drain_soft_evict_enabled, Ordering::Relaxed);
self.me_pool_drain_soft_evict_grace_secs self.drain_runtime
.me_pool_drain_soft_evict_grace_secs
.store(pool_drain_soft_evict_grace_secs, Ordering::Relaxed); .store(pool_drain_soft_evict_grace_secs, Ordering::Relaxed);
self.me_pool_drain_soft_evict_per_writer self.drain_runtime
.me_pool_drain_soft_evict_per_writer
.store(pool_drain_soft_evict_per_writer.max(1), Ordering::Relaxed); .store(pool_drain_soft_evict_per_writer.max(1), Ordering::Relaxed);
self.me_pool_drain_soft_evict_budget_per_core.store( self.drain_runtime
pool_drain_soft_evict_budget_per_core.max(1) as u32, .me_pool_drain_soft_evict_budget_per_core
Ordering::Relaxed, .store(
); pool_drain_soft_evict_budget_per_core.max(1) as u32,
self.me_pool_drain_soft_evict_cooldown_ms Ordering::Relaxed,
);
self.drain_runtime
.me_pool_drain_soft_evict_cooldown_ms
.store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed); .store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed);
self.me_pool_force_close_secs.store( self.drain_runtime.me_pool_force_close_secs.store(
Self::normalize_force_close_secs(force_close_secs), Self::normalize_force_close_secs(force_close_secs),
Ordering::Relaxed, Ordering::Relaxed,
); );
self.me_pool_min_fresh_ratio_permille self.drain_runtime
.me_pool_min_fresh_ratio_permille
.store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed); .store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed);
self.me_hardswap_warmup_delay_min_ms self.me_hardswap_warmup_delay_min_ms
.store(hardswap_warmup_delay_min_ms, Ordering::Relaxed); .store(hardswap_warmup_delay_min_ms, Ordering::Relaxed);
@ -1025,17 +1059,23 @@ impl MePool {
if previous_writer_pick_mode != writer_pick_mode { if previous_writer_pick_mode != writer_pick_mode {
self.stats.increment_me_writer_pick_mode_switch_total(); self.stats.increment_me_writer_pick_mode_switch_total();
} }
self.me_single_endpoint_shadow_writers self.single_endpoint_runtime
.me_single_endpoint_shadow_writers
.store(single_endpoint_shadow_writers, Ordering::Relaxed); .store(single_endpoint_shadow_writers, Ordering::Relaxed);
self.me_single_endpoint_outage_mode_enabled self.single_endpoint_runtime
.me_single_endpoint_outage_mode_enabled
.store(single_endpoint_outage_mode_enabled, Ordering::Relaxed); .store(single_endpoint_outage_mode_enabled, Ordering::Relaxed);
self.me_single_endpoint_outage_disable_quarantine self.single_endpoint_runtime
.me_single_endpoint_outage_disable_quarantine
.store(single_endpoint_outage_disable_quarantine, Ordering::Relaxed); .store(single_endpoint_outage_disable_quarantine, Ordering::Relaxed);
self.me_single_endpoint_outage_backoff_min_ms self.single_endpoint_runtime
.me_single_endpoint_outage_backoff_min_ms
.store(single_endpoint_outage_backoff_min_ms, Ordering::Relaxed); .store(single_endpoint_outage_backoff_min_ms, Ordering::Relaxed);
self.me_single_endpoint_outage_backoff_max_ms self.single_endpoint_runtime
.me_single_endpoint_outage_backoff_max_ms
.store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed); .store(single_endpoint_outage_backoff_max_ms, Ordering::Relaxed);
self.me_single_endpoint_shadow_rotate_every_secs self.single_endpoint_runtime
.me_single_endpoint_shadow_rotate_every_secs
.store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed); .store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed);
let previous_floor_mode = self.floor_mode(); let previous_floor_mode = self.floor_mode();
self.me_floor_mode self.me_floor_mode
@ -1148,33 +1188,40 @@ impl MePool {
} }
pub(super) fn force_close_timeout(&self) -> Option<Duration> { pub(super) fn force_close_timeout(&self) -> Option<Duration> {
let secs = let secs = Self::normalize_force_close_secs(
Self::normalize_force_close_secs(self.me_pool_force_close_secs.load(Ordering::Relaxed)); self.drain_runtime
.me_pool_force_close_secs
.load(Ordering::Relaxed),
);
Some(Duration::from_secs(secs)) Some(Duration::from_secs(secs))
} }
#[allow(dead_code)] #[allow(dead_code)]
pub(super) fn drain_soft_evict_enabled(&self) -> bool { pub(super) fn drain_soft_evict_enabled(&self) -> bool {
self.me_pool_drain_soft_evict_enabled self.drain_runtime
.me_pool_drain_soft_evict_enabled
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
#[allow(dead_code)] #[allow(dead_code)]
pub(super) fn drain_soft_evict_grace_secs(&self) -> u64 { pub(super) fn drain_soft_evict_grace_secs(&self) -> u64 {
self.me_pool_drain_soft_evict_grace_secs self.drain_runtime
.me_pool_drain_soft_evict_grace_secs
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
#[allow(dead_code)] #[allow(dead_code)]
pub(super) fn drain_soft_evict_per_writer(&self) -> usize { pub(super) fn drain_soft_evict_per_writer(&self) -> usize {
self.me_pool_drain_soft_evict_per_writer self.drain_runtime
.me_pool_drain_soft_evict_per_writer
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
.max(1) as usize .max(1) as usize
} }
#[allow(dead_code)] #[allow(dead_code)]
pub(super) fn drain_soft_evict_budget_per_core(&self) -> usize { pub(super) fn drain_soft_evict_budget_per_core(&self) -> usize {
self.me_pool_drain_soft_evict_budget_per_core self.drain_runtime
.me_pool_drain_soft_evict_budget_per_core
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
.max(1) as usize .max(1) as usize
} }
@ -1182,7 +1229,8 @@ impl MePool {
#[allow(dead_code)] #[allow(dead_code)]
pub(super) fn drain_soft_evict_cooldown(&self) -> Duration { pub(super) fn drain_soft_evict_cooldown(&self) -> Duration {
Duration::from_millis( Duration::from_millis(
self.me_pool_drain_soft_evict_cooldown_ms self.drain_runtime
.me_pool_drain_soft_evict_cooldown_ms
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
.max(1), .max(1),
) )
@ -1265,6 +1313,7 @@ impl MePool {
} }
if endpoint_count == 1 { if endpoint_count == 1 {
let shadow = self let shadow = self
.single_endpoint_runtime
.me_single_endpoint_shadow_writers .me_single_endpoint_shadow_writers
.load(Ordering::Relaxed) as usize; .load(Ordering::Relaxed) as usize;
return (1 + shadow).max(3); return (1 + shadow).max(3);
@ -1533,20 +1582,24 @@ impl MePool {
} }
pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool { pub(super) fn single_endpoint_outage_mode_enabled(&self) -> bool {
self.me_single_endpoint_outage_mode_enabled self.single_endpoint_runtime
.me_single_endpoint_outage_mode_enabled
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
pub(super) fn single_endpoint_outage_disable_quarantine(&self) -> bool { pub(super) fn single_endpoint_outage_disable_quarantine(&self) -> bool {
self.me_single_endpoint_outage_disable_quarantine self.single_endpoint_runtime
.me_single_endpoint_outage_disable_quarantine
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
} }
pub(super) fn single_endpoint_outage_backoff_bounds_ms(&self) -> (u64, u64) { pub(super) fn single_endpoint_outage_backoff_bounds_ms(&self) -> (u64, u64) {
let min_ms = self let min_ms = self
.single_endpoint_runtime
.me_single_endpoint_outage_backoff_min_ms .me_single_endpoint_outage_backoff_min_ms
.load(Ordering::Relaxed); .load(Ordering::Relaxed);
let max_ms = self let max_ms = self
.single_endpoint_runtime
.me_single_endpoint_outage_backoff_max_ms .me_single_endpoint_outage_backoff_max_ms
.load(Ordering::Relaxed); .load(Ordering::Relaxed);
if min_ms <= max_ms { if min_ms <= max_ms {
@ -1558,6 +1611,7 @@ impl MePool {
pub(super) fn single_endpoint_shadow_rotate_interval(&self) -> Option<Duration> { pub(super) fn single_endpoint_shadow_rotate_interval(&self) -> Option<Duration> {
let secs = self let secs = self
.single_endpoint_runtime
.me_single_endpoint_shadow_rotate_every_secs .me_single_endpoint_shadow_rotate_every_secs
.load(Ordering::Relaxed); .load(Ordering::Relaxed);
if secs == 0 { if secs == 0 {

View File

@ -449,7 +449,8 @@ impl MePool {
.map(|w| (w.writer_dc, w.addr)) .map(|w| (w.writer_dc, w.addr))
.collect(); .collect();
let min_ratio = Self::permille_to_ratio( let min_ratio = Self::permille_to_ratio(
self.me_pool_min_fresh_ratio_permille self.drain_runtime
.me_pool_min_fresh_ratio_permille
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
); );
let (coverage_ratio, missing_dc) = let (coverage_ratio, missing_dc) =

View File

@ -224,7 +224,10 @@ impl MePool {
pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot { pub(crate) async fn api_status_snapshot(&self) -> MeApiStatusSnapshot {
let now_epoch_secs = Self::now_epoch_secs(); let now_epoch_secs = Self::now_epoch_secs();
let active_generation = self.current_generation(); let active_generation = self.current_generation();
let drain_ttl_secs = self.me_pool_drain_ttl_secs.load(Ordering::Relaxed); let drain_ttl_secs = self
.drain_runtime
.me_pool_drain_ttl_secs
.load(Ordering::Relaxed);
let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new(); let mut endpoints_by_dc = BTreeMap::<i16, BTreeSet<SocketAddr>>::new();
if self.decision.ipv4_me { if self.decision.ipv4_me {
@ -570,30 +573,43 @@ impl MePool {
me_reconnect_backoff_base_ms: self.me_reconnect_backoff_base.as_millis() as u64, me_reconnect_backoff_base_ms: self.me_reconnect_backoff_base.as_millis() as u64,
me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,
me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed), me_pool_drain_ttl_secs: self
me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed), .drain_runtime
.me_pool_drain_ttl_secs
.load(Ordering::Relaxed),
me_pool_force_close_secs: self
.drain_runtime
.me_pool_force_close_secs
.load(Ordering::Relaxed),
me_pool_min_fresh_ratio: Self::permille_to_ratio( me_pool_min_fresh_ratio: Self::permille_to_ratio(
self.me_pool_min_fresh_ratio_permille self.drain_runtime
.me_pool_min_fresh_ratio_permille
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
), ),
me_bind_stale_mode: bind_stale_mode_label(self.bind_stale_mode()), me_bind_stale_mode: bind_stale_mode_label(self.bind_stale_mode()),
me_bind_stale_ttl_secs: self.me_bind_stale_ttl_secs.load(Ordering::Relaxed), me_bind_stale_ttl_secs: self.me_bind_stale_ttl_secs.load(Ordering::Relaxed),
me_single_endpoint_shadow_writers: self me_single_endpoint_shadow_writers: self
.single_endpoint_runtime
.me_single_endpoint_shadow_writers .me_single_endpoint_shadow_writers
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
me_single_endpoint_outage_mode_enabled: self me_single_endpoint_outage_mode_enabled: self
.single_endpoint_runtime
.me_single_endpoint_outage_mode_enabled .me_single_endpoint_outage_mode_enabled
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
me_single_endpoint_outage_disable_quarantine: self me_single_endpoint_outage_disable_quarantine: self
.single_endpoint_runtime
.me_single_endpoint_outage_disable_quarantine .me_single_endpoint_outage_disable_quarantine
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
me_single_endpoint_outage_backoff_min_ms: self me_single_endpoint_outage_backoff_min_ms: self
.single_endpoint_runtime
.me_single_endpoint_outage_backoff_min_ms .me_single_endpoint_outage_backoff_min_ms
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
me_single_endpoint_outage_backoff_max_ms: self me_single_endpoint_outage_backoff_max_ms: self
.single_endpoint_runtime
.me_single_endpoint_outage_backoff_max_ms .me_single_endpoint_outage_backoff_max_ms
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
me_single_endpoint_shadow_rotate_every_secs: self me_single_endpoint_shadow_rotate_every_secs: self
.single_endpoint_runtime
.me_single_endpoint_shadow_rotate_every_secs .me_single_endpoint_shadow_rotate_every_secs
.load(Ordering::Relaxed), .load(Ordering::Relaxed),
me_deterministic_writer_sort: self.me_deterministic_writer_sort.load(Ordering::Relaxed), me_deterministic_writer_sort: self.me_deterministic_writer_sort.load(Ordering::Relaxed),