diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index d4fa0aa..0146e78 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -296,12 +296,21 @@ pub(super) struct RouteRuntimeCore { pub(super) me_route_inline_recovery_wait: Duration, } +pub(super) struct HealthRuntimeCore { + pub(super) me_health_interval_ms_unhealthy: AtomicU64, + pub(super) me_health_interval_ms_healthy: AtomicU64, + pub(super) me_warn_rate_limit_ms: AtomicU64, + pub(super) family_health_v4: ArcSwap, + pub(super) family_health_v6: ArcSwap, +} + #[allow(dead_code)] pub struct MePool { pub(super) routing: Arc, pub(super) reinit: Arc, pub(super) writer_lifecycle: Arc, pub(super) route_runtime: Arc, + pub(super) health_runtime: Arc, pub(super) decision: NetworkDecision, pub(super) upstream: Option>, pub(super) rng: Arc, @@ -395,11 +404,6 @@ pub struct MePool { pub(super) me_writer_pick_sample_size: AtomicU8, pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_reader_route_data_wait_ms: Arc, - pub(super) me_health_interval_ms_unhealthy: AtomicU64, - pub(super) me_health_interval_ms_healthy: AtomicU64, - pub(super) me_warn_rate_limit_ms: AtomicU64, - pub(super) family_health_v4: ArcSwap, - pub(super) family_health_v6: ArcSwap, pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool, pub(super) me_last_drain_gate_redundancy_ok: AtomicBool, pub(super) me_last_drain_gate_block_reason: AtomicU8, @@ -589,6 +593,25 @@ impl MePool { me_route_inline_recovery_attempts, me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), }), + health_runtime: Arc::new(HealthRuntimeCore { + me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), + me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)), + me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)), + family_health_v4: ArcSwap::from_pointee(FamilyHealthSnapshot::new( + MeFamilyRuntimeState::Healthy, + now_epoch_secs, + 0, + 0, + 0, + )), + family_health_v6: ArcSwap::from_pointee(FamilyHealthSnapshot::new( + MeFamilyRuntimeState::Healthy, + now_epoch_secs, + 0, + 0, + 0, + )), + }), decision, upstream, rng, @@ -742,23 +765,6 @@ impl MePool { me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)), - me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), - me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)), - me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)), - family_health_v4: ArcSwap::from_pointee(FamilyHealthSnapshot::new( - MeFamilyRuntimeState::Healthy, - now_epoch_secs, - 0, - 0, - 0, - )), - family_health_v6: ArcSwap::from_pointee(FamilyHealthSnapshot::new( - MeFamilyRuntimeState::Healthy, - now_epoch_secs, - 0, - 0, - 0, - )), me_last_drain_gate_route_quorum_ok: AtomicBool::new(false), me_last_drain_gate_redundancy_ok: AtomicBool::new(false), me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8), @@ -810,43 +816,59 @@ impl MePool { recover_success_streak, )); match family { - IpFamily::V4 => self.family_health_v4.store(snapshot), - IpFamily::V6 => self.family_health_v6.store(snapshot), + IpFamily::V4 => self.health_runtime.family_health_v4.store(snapshot), + IpFamily::V6 => self.health_runtime.family_health_v6.store(snapshot), } } pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState { match family { - IpFamily::V4 => self.family_health_v4.load().state, - IpFamily::V6 => self.family_health_v6.load().state, + IpFamily::V4 => self.health_runtime.family_health_v4.load().state, + IpFamily::V6 => self.health_runtime.family_health_v6.load().state, } } pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 { match family { - IpFamily::V4 => self.family_health_v4.load().state_since_epoch_secs, - IpFamily::V6 => self.family_health_v6.load().state_since_epoch_secs, + IpFamily::V4 => self.health_runtime.family_health_v4.load().state_since_epoch_secs, + IpFamily::V6 => self.health_runtime.family_health_v6.load().state_since_epoch_secs, } } pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 { match family { - IpFamily::V4 => self.family_health_v4.load().suppressed_until_epoch_secs, - IpFamily::V6 => self.family_health_v6.load().suppressed_until_epoch_secs, + IpFamily::V4 => self + .health_runtime + .family_health_v4 + .load() + .suppressed_until_epoch_secs, + IpFamily::V6 => self + .health_runtime + .family_health_v6 + .load() + .suppressed_until_epoch_secs, } } pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 { match family { - IpFamily::V4 => self.family_health_v4.load().fail_streak, - IpFamily::V6 => self.family_health_v6.load().fail_streak, + IpFamily::V4 => self.health_runtime.family_health_v4.load().fail_streak, + IpFamily::V6 => self.health_runtime.family_health_v6.load().fail_streak, } } pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 { match family { - IpFamily::V4 => self.family_health_v4.load().recover_success_streak, - IpFamily::V6 => self.family_health_v6.load().recover_success_streak, + IpFamily::V4 => self + .health_runtime + .family_health_v4 + .load() + .recover_success_streak, + IpFamily::V6 => self + .health_runtime + .family_health_v6 + .load() + .recover_success_streak, } } @@ -1056,11 +1078,14 @@ impl MePool { .store(adaptive_floor_max_active_writers_global, Ordering::Relaxed); self.me_adaptive_floor_max_warm_writers_global .store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed); - self.me_health_interval_ms_unhealthy + self.health_runtime + .me_health_interval_ms_unhealthy .store(me_health_interval_ms_unhealthy.max(1), Ordering::Relaxed); - self.me_health_interval_ms_healthy + self.health_runtime + .me_health_interval_ms_healthy .store(me_health_interval_ms_healthy.max(1), Ordering::Relaxed); - self.me_warn_rate_limit_ms + self.health_runtime + .me_warn_rate_limit_ms .store(me_warn_rate_limit_ms.max(1), Ordering::Relaxed); if previous_floor_mode != floor_mode { self.stats.increment_me_floor_mode_switch_total(); @@ -1742,7 +1767,8 @@ impl MePool { pub(super) fn health_interval_unhealthy(&self) -> Duration { Duration::from_millis( - self.me_health_interval_ms_unhealthy + self.health_runtime + .me_health_interval_ms_unhealthy .load(Ordering::Relaxed) .max(1), ) @@ -1750,13 +1776,19 @@ impl MePool { pub(super) fn health_interval_healthy(&self) -> Duration { Duration::from_millis( - self.me_health_interval_ms_healthy + self.health_runtime + .me_health_interval_ms_healthy .load(Ordering::Relaxed) .max(1), ) } pub(super) fn warn_rate_limit_duration(&self) -> Duration { - Duration::from_millis(self.me_warn_rate_limit_ms.load(Ordering::Relaxed).max(1)) + Duration::from_millis( + self.health_runtime + .me_warn_rate_limit_ms + .load(Ordering::Relaxed) + .max(1), + ) } }