ME Health Core

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-25 20:01:44 +03:00
parent 97f6649584
commit 6ee4d4648c
No known key found for this signature in database
1 changed files with 72 additions and 40 deletions

View File

@ -296,12 +296,21 @@ pub(super) struct RouteRuntimeCore {
pub(super) me_route_inline_recovery_wait: Duration,
}
pub(super) struct HealthRuntimeCore {
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
pub(super) me_health_interval_ms_healthy: AtomicU64,
pub(super) me_warn_rate_limit_ms: AtomicU64,
pub(super) family_health_v4: ArcSwap<FamilyHealthSnapshot>,
pub(super) family_health_v6: ArcSwap<FamilyHealthSnapshot>,
}
#[allow(dead_code)]
pub struct MePool {
pub(super) routing: Arc<RoutingCore>,
pub(super) reinit: Arc<ReinitCore>,
pub(super) writer_lifecycle: Arc<WriterLifecycleCore>,
pub(super) route_runtime: Arc<RouteRuntimeCore>,
pub(super) health_runtime: Arc<HealthRuntimeCore>,
pub(super) decision: NetworkDecision,
pub(super) upstream: Option<Arc<UpstreamManager>>,
pub(super) rng: Arc<SecureRandom>,
@ -395,11 +404,6 @@ pub struct MePool {
pub(super) me_writer_pick_sample_size: AtomicU8,
pub(super) me_socks_kdf_policy: AtomicU8,
pub(super) me_reader_route_data_wait_ms: Arc<AtomicU64>,
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
pub(super) me_health_interval_ms_healthy: AtomicU64,
pub(super) me_warn_rate_limit_ms: AtomicU64,
pub(super) family_health_v4: ArcSwap<FamilyHealthSnapshot>,
pub(super) family_health_v6: ArcSwap<FamilyHealthSnapshot>,
pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool,
pub(super) me_last_drain_gate_redundancy_ok: AtomicBool,
pub(super) me_last_drain_gate_block_reason: AtomicU8,
@ -589,6 +593,25 @@ impl MePool {
me_route_inline_recovery_attempts,
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
}),
health_runtime: Arc::new(HealthRuntimeCore {
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
family_health_v4: ArcSwap::from_pointee(FamilyHealthSnapshot::new(
MeFamilyRuntimeState::Healthy,
now_epoch_secs,
0,
0,
0,
)),
family_health_v6: ArcSwap::from_pointee(FamilyHealthSnapshot::new(
MeFamilyRuntimeState::Healthy,
now_epoch_secs,
0,
0,
0,
)),
}),
decision,
upstream,
rng,
@ -742,23 +765,6 @@ impl MePool {
me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)),
me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()),
me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)),
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
family_health_v4: ArcSwap::from_pointee(FamilyHealthSnapshot::new(
MeFamilyRuntimeState::Healthy,
now_epoch_secs,
0,
0,
0,
)),
family_health_v6: ArcSwap::from_pointee(FamilyHealthSnapshot::new(
MeFamilyRuntimeState::Healthy,
now_epoch_secs,
0,
0,
0,
)),
me_last_drain_gate_route_quorum_ok: AtomicBool::new(false),
me_last_drain_gate_redundancy_ok: AtomicBool::new(false),
me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8),
@ -810,43 +816,59 @@ impl MePool {
recover_success_streak,
));
match family {
IpFamily::V4 => self.family_health_v4.store(snapshot),
IpFamily::V6 => self.family_health_v6.store(snapshot),
IpFamily::V4 => self.health_runtime.family_health_v4.store(snapshot),
IpFamily::V6 => self.health_runtime.family_health_v6.store(snapshot),
}
}
pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState {
match family {
IpFamily::V4 => self.family_health_v4.load().state,
IpFamily::V6 => self.family_health_v6.load().state,
IpFamily::V4 => self.health_runtime.family_health_v4.load().state,
IpFamily::V6 => self.health_runtime.family_health_v6.load().state,
}
}
pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 {
match family {
IpFamily::V4 => self.family_health_v4.load().state_since_epoch_secs,
IpFamily::V6 => self.family_health_v6.load().state_since_epoch_secs,
IpFamily::V4 => self.health_runtime.family_health_v4.load().state_since_epoch_secs,
IpFamily::V6 => self.health_runtime.family_health_v6.load().state_since_epoch_secs,
}
}
pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 {
match family {
IpFamily::V4 => self.family_health_v4.load().suppressed_until_epoch_secs,
IpFamily::V6 => self.family_health_v6.load().suppressed_until_epoch_secs,
IpFamily::V4 => self
.health_runtime
.family_health_v4
.load()
.suppressed_until_epoch_secs,
IpFamily::V6 => self
.health_runtime
.family_health_v6
.load()
.suppressed_until_epoch_secs,
}
}
pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 {
match family {
IpFamily::V4 => self.family_health_v4.load().fail_streak,
IpFamily::V6 => self.family_health_v6.load().fail_streak,
IpFamily::V4 => self.health_runtime.family_health_v4.load().fail_streak,
IpFamily::V6 => self.health_runtime.family_health_v6.load().fail_streak,
}
}
pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 {
match family {
IpFamily::V4 => self.family_health_v4.load().recover_success_streak,
IpFamily::V6 => self.family_health_v6.load().recover_success_streak,
IpFamily::V4 => self
.health_runtime
.family_health_v4
.load()
.recover_success_streak,
IpFamily::V6 => self
.health_runtime
.family_health_v6
.load()
.recover_success_streak,
}
}
@ -1056,11 +1078,14 @@ impl MePool {
.store(adaptive_floor_max_active_writers_global, Ordering::Relaxed);
self.me_adaptive_floor_max_warm_writers_global
.store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed);
self.me_health_interval_ms_unhealthy
self.health_runtime
.me_health_interval_ms_unhealthy
.store(me_health_interval_ms_unhealthy.max(1), Ordering::Relaxed);
self.me_health_interval_ms_healthy
self.health_runtime
.me_health_interval_ms_healthy
.store(me_health_interval_ms_healthy.max(1), Ordering::Relaxed);
self.me_warn_rate_limit_ms
self.health_runtime
.me_warn_rate_limit_ms
.store(me_warn_rate_limit_ms.max(1), Ordering::Relaxed);
if previous_floor_mode != floor_mode {
self.stats.increment_me_floor_mode_switch_total();
@ -1742,7 +1767,8 @@ impl MePool {
pub(super) fn health_interval_unhealthy(&self) -> Duration {
Duration::from_millis(
self.me_health_interval_ms_unhealthy
self.health_runtime
.me_health_interval_ms_unhealthy
.load(Ordering::Relaxed)
.max(1),
)
@ -1750,13 +1776,19 @@ impl MePool {
pub(super) fn health_interval_healthy(&self) -> Duration {
Duration::from_millis(
self.me_health_interval_ms_healthy
self.health_runtime
.me_health_interval_ms_healthy
.load(Ordering::Relaxed)
.max(1),
)
}
pub(super) fn warn_rate_limit_duration(&self) -> Duration {
Duration::from_millis(self.me_warn_rate_limit_ms.load(Ordering::Relaxed).max(1))
Duration::from_millis(
self.health_runtime
.me_warn_rate_limit_ms
.load(Ordering::Relaxed)
.max(1),
)
}
}