diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 6c7aef9..e8fc52a 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -323,8 +323,8 @@ fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool { let dedup_current = DESYNC_DEDUP.get_or_init(DashMap::new); let dedup_previous = DESYNC_DEDUP_PREVIOUS.get_or_init(DashMap::new); - let rotation_state = DESYNC_DEDUP_ROTATION_STATE - .get_or_init(|| Mutex::new(DesyncDedupRotationState::default())); + let rotation_state = + DESYNC_DEDUP_ROTATION_STATE.get_or_init(|| Mutex::new(DesyncDedupRotationState::default())); let mut state = match rotation_state.lock() { Ok(guard) => guard, diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 9cba3e8..4144f82 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -1206,8 +1206,7 @@ impl Stats { } pub fn increment_me_hybrid_timeout_total(&self) { if self.telemetry_me_allows_normal() { - self.me_hybrid_timeout_total - .fetch_add(1, Ordering::Relaxed); + self.me_hybrid_timeout_total.fetch_add(1, Ordering::Relaxed); } } pub fn increment_me_async_recovery_trigger_total(&self) { diff --git a/src/transport/middle_proxy/handshake.rs b/src/transport/middle_proxy/handshake.rs index b6eff37..01206e2 100644 --- a/src/transport/middle_proxy/handshake.rs +++ b/src/transport/middle_proxy/handshake.rs @@ -161,7 +161,7 @@ impl MePool { } else { let connect_fut = async { if addr.is_ipv6() - && let Some(v6) = self.detected_ipv6 + && let Some(v6) = self.nat_runtime.detected_ipv6 { match TcpSocket::new_v6() { Ok(sock) => { @@ -305,7 +305,7 @@ impl MePool { } MeSocksKdfPolicy::Compat => { self.stats.increment_me_socks_kdf_compat_fallback(); - if self.nat_probe { + if self.nat_runtime.nat_probe { let bind_ip = Self::direct_bind_ip_for_stun(family, upstream_egress); self.maybe_reflect_public_addr(family, bind_ip).await } else { @@ -313,7 +313,7 @@ impl MePool { } } } - } else if self.nat_probe { + } else if self.nat_runtime.nat_probe { let bind_ip = Self::direct_bind_ip_for_stun(family, upstream_egress); self.maybe_reflect_public_addr(family, bind_ip).await } else { @@ -343,7 +343,10 @@ impl MePool { .unwrap_or_default() .as_secs() as u32; - let secret_atomic_snapshot = self.secret_atomic_snapshot.load(Ordering::Relaxed); + let secret_atomic_snapshot = self + .writer_selection_policy + .secret_atomic_snapshot + .load(Ordering::Relaxed); let (ks, secret) = if secret_atomic_snapshot { let snapshot = self.secret_snapshot().await; (snapshot.key_selector, snapshot.secret) diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 29a6f7d..ca6e681 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -530,7 +530,7 @@ async fn check_family( let now = Instant::now(); if reconnect_sem.available_permits() == 0 { - let base_ms = pool.me_reconnect_backoff_base.as_millis() as u64; + let base_ms = pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64; let next_ms = (*backoff.get(&key).unwrap_or(&base_ms)).max(base_ms); let jitter = next_ms / JITTER_FRAC_NUM; let wait = Duration::from_millis(next_ms) @@ -553,7 +553,10 @@ async fn check_family( continue; } - let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize; + let max_concurrent = pool + .reconnect_runtime + .me_reconnect_max_concurrent_per_dc + .max(1) as usize; if *inflight.get(&key).unwrap_or(&0) >= max_concurrent { continue; } @@ -610,7 +613,7 @@ async fn check_family( break; } let res = tokio::time::timeout( - pool.me_one_timeout, + pool.reconnect_runtime.me_one_timeout, pool.connect_endpoints_round_robin(dc, &endpoints, rng.as_ref()), ) .await; @@ -641,17 +644,21 @@ async fn check_family( endpoint_count = endpoints.len(), "ME writer floor restored for DC" ); - backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64); - let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM; - let wait = pool.me_reconnect_backoff_base + backoff.insert( + key, + pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64, + ); + let jitter = pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64 + / JITTER_FRAC_NUM; + let wait = pool.reconnect_runtime.me_reconnect_backoff_base + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); next_attempt.insert(key, now + wait); } else { let curr = *backoff .get(&key) - .unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64)); - let next_ms = - (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64); + .unwrap_or(&(pool.reconnect_runtime.me_reconnect_backoff_base.as_millis() as u64)); + let next_ms = (curr.saturating_mul(2)) + .min(pool.reconnect_runtime.me_reconnect_backoff_cap.as_millis() as u64); backoff.insert(key, next_ms); let jitter = next_ms / JITTER_FRAC_NUM; let wait = Duration::from_millis(next_ms) @@ -723,6 +730,7 @@ fn adaptive_floor_class_min( ) -> usize { if endpoint_count <= 1 { let min_single = (pool + .floor_runtime .me_adaptive_floor_min_writers_single_endpoint .load(std::sync::atomic::Ordering::Relaxed) as usize) .max(1); @@ -979,7 +987,7 @@ async fn maybe_swap_idle_writer_for_cap( }; let connected = match tokio::time::timeout( - pool.me_one_timeout, + pool.reconnect_runtime.me_one_timeout, pool.connect_one_for_dc(endpoint, dc, rng.as_ref()), ) .await @@ -1085,7 +1093,7 @@ async fn maybe_refresh_idle_writer_for_dc( }; let rotate_ok = match tokio::time::timeout( - pool.me_one_timeout, + pool.reconnect_runtime.me_one_timeout, pool.connect_one_for_dc(endpoint, dc, rng.as_ref()), ) .await @@ -1236,7 +1244,7 @@ async fn recover_single_endpoint_outage( pool.stats .increment_me_single_endpoint_quarantine_bypass_total(); match tokio::time::timeout( - pool.me_one_timeout, + pool.reconnect_runtime.me_one_timeout, pool.connect_one_for_dc(endpoint, key.0, rng.as_ref()), ) .await @@ -1265,7 +1273,7 @@ async fn recover_single_endpoint_outage( } else { let one_endpoint = [endpoint]; match tokio::time::timeout( - pool.me_one_timeout, + pool.reconnect_runtime.me_one_timeout, pool.connect_endpoints_round_robin(key.0, &one_endpoint, rng.as_ref()), ) .await @@ -1390,7 +1398,7 @@ async fn maybe_rotate_single_endpoint_shadow( }; let rotate_ok = match tokio::time::timeout( - pool.me_one_timeout, + pool.reconnect_runtime.me_one_timeout, pool.connect_one_for_dc(endpoint, dc, rng.as_ref()), ) .await diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 916f39c..af37a0b 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -339,21 +339,7 @@ pub(super) struct BindingPolicyCore { pub(super) me_bind_stale_ttl_secs: AtomicU64, } -#[allow(dead_code)] -pub struct MePool { - pub(super) routing: Arc, - pub(super) reinit: Arc, - pub(super) writer_lifecycle: Arc, - pub(super) route_runtime: Arc, - pub(super) health_runtime: Arc, - pub(super) drain_runtime: Arc, - pub(super) single_endpoint_runtime: Arc, - pub(super) binding_policy: Arc, - pub(super) decision: NetworkDecision, - pub(super) upstream: Option>, - pub(super) rng: Arc, - pub(super) proxy_tag: Option>, - pub(super) proxy_secret: Arc>, +pub(super) struct NatRuntimeCore { pub(super) nat_ip_cfg: Option, pub(super) nat_ip_detected: Arc>>, pub(super) nat_probe: bool, @@ -365,6 +351,13 @@ pub struct MePool { pub(super) nat_probe_attempts: std::sync::atomic::AtomicU8, pub(super) nat_probe_disabled: std::sync::atomic::AtomicBool, pub(super) stun_backoff_until: Arc>>, + pub(super) nat_reflection_cache: Arc>, + pub(super) nat_reflection_singleflight_v4: Arc>, + pub(super) nat_reflection_singleflight_v6: Arc>, +} + +pub(super) struct ReconnectRuntimeCore { + #[allow(dead_code)] pub(super) me_one_retry: u8, pub(super) me_one_timeout: Duration, pub(super) me_warmup_stagger_enabled: bool, @@ -374,6 +367,9 @@ pub struct MePool { pub(super) me_reconnect_backoff_base: Duration, pub(super) me_reconnect_backoff_cap: Duration, pub(super) me_reconnect_fast_retry_count: u32, +} + +pub(super) struct FloorRuntimeCore { pub(super) me_floor_mode: AtomicU8, pub(super) me_adaptive_floor_idle_secs: AtomicU64, pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8, @@ -398,15 +394,46 @@ pub struct MePool { pub(super) me_adaptive_floor_warm_cap_effective: AtomicU64, pub(super) me_adaptive_floor_active_writers_current: AtomicU64, pub(super) me_adaptive_floor_warm_writers_current: AtomicU64, +} + +pub(super) struct WriterSelectionPolicyCore { + pub(super) secret_atomic_snapshot: AtomicBool, + pub(super) me_deterministic_writer_sort: AtomicBool, + pub(super) me_writer_pick_mode: AtomicU8, + pub(super) me_writer_pick_sample_size: AtomicU8, +} + +pub(super) struct TransportPolicyCore { + pub(super) me_socks_kdf_policy: AtomicU8, + pub(super) me_reader_route_data_wait_ms: Arc, +} + +#[allow(dead_code)] +pub struct MePool { + pub(super) routing: Arc, + pub(super) reinit: Arc, + pub(super) writer_lifecycle: Arc, + pub(super) route_runtime: Arc, + pub(super) health_runtime: Arc, + pub(super) drain_runtime: Arc, + pub(super) single_endpoint_runtime: Arc, + pub(super) binding_policy: Arc, + pub(super) nat_runtime: Arc, + pub(super) reconnect_runtime: Arc, + pub(super) floor_runtime: Arc, + pub(super) writer_selection_policy: Arc, + pub(super) transport_policy: Arc, + pub(super) decision: NetworkDecision, + pub(super) upstream: Option>, + pub(super) rng: Arc, + pub(super) proxy_tag: Option>, + pub(super) proxy_secret: Arc>, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) endpoint_dc_map: Arc>>>, pub(super) default_dc: AtomicI32, pub(super) next_writer_id: AtomicU64, pub(super) rtt_stats: Arc>>, - pub(super) nat_reflection_cache: Arc>, - pub(super) nat_reflection_singleflight_v4: Arc>, - pub(super) nat_reflection_singleflight_v6: Arc>, pub(super) refill_inflight: Arc>>, pub(super) refill_inflight_dc: Arc>>, pub(super) conn_count: AtomicUsize, @@ -414,12 +441,6 @@ pub struct MePool { pub(super) stats: Arc, pub(super) endpoint_quarantine: Arc>>, pub(super) kdf_material_fingerprint: Arc>>, - pub(super) secret_atomic_snapshot: AtomicBool, - pub(super) me_deterministic_writer_sort: AtomicBool, - pub(super) me_writer_pick_mode: AtomicU8, - pub(super) me_writer_pick_sample_size: AtomicU8, - pub(super) me_socks_kdf_policy: AtomicU8, - pub(super) me_reader_route_data_wait_ms: Arc, pub(super) runtime_ready: AtomicBool, pool_size: usize, } @@ -599,7 +620,9 @@ impl MePool { route_runtime: Arc::new(RouteRuntimeCore { me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), - me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms.max(50)), + me_route_hybrid_max_wait: Duration::from_millis( + me_route_hybrid_max_wait_ms.max(50), + ), me_route_blocking_send_timeout: if me_route_blocking_send_timeout_ms == 0 { None } else { @@ -611,10 +634,14 @@ impl MePool { me_route_hybrid_timeout_warn_epoch_ms: AtomicU64::new(0), me_async_recovery_last_trigger_epoch_ms: AtomicU64::new(0), me_route_inline_recovery_attempts, - me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), + me_route_inline_recovery_wait: Duration::from_millis( + me_route_inline_recovery_wait_ms, + ), }), health_runtime: Arc::new(HealthRuntimeCore { - me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), + me_health_interval_ms_unhealthy: AtomicU64::new( + me_health_interval_ms_unhealthy.max(1), + ), me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)), me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)), family_health_v4: ArcSwap::from_pointee(FamilyHealthSnapshot::new( @@ -682,6 +709,93 @@ impl MePool { me_bind_stale_mode: AtomicU8::new(me_bind_stale_mode.as_u8()), me_bind_stale_ttl_secs: AtomicU64::new(me_bind_stale_ttl_secs), }), + nat_runtime: Arc::new(NatRuntimeCore { + nat_ip_cfg: nat_ip, + nat_ip_detected: Arc::new(RwLock::new(None)), + nat_probe, + nat_stun, + nat_stun_servers, + nat_stun_live_servers: Arc::new(RwLock::new(Vec::new())), + nat_probe_concurrency: nat_probe_concurrency.max(1), + detected_ipv6, + nat_probe_attempts: std::sync::atomic::AtomicU8::new(0), + nat_probe_disabled: std::sync::atomic::AtomicBool::new(false), + stun_backoff_until: Arc::new(RwLock::new(None)), + nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), + nat_reflection_singleflight_v4: Arc::new(Mutex::new(())), + nat_reflection_singleflight_v6: Arc::new(Mutex::new(())), + }), + reconnect_runtime: Arc::new(ReconnectRuntimeCore { + me_one_retry, + me_one_timeout: Duration::from_millis(me_one_timeout_ms), + me_warmup_stagger_enabled, + me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), + me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), + me_reconnect_max_concurrent_per_dc, + me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), + me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), + me_reconnect_fast_retry_count, + }), + floor_runtime: Arc::new(FloorRuntimeCore { + me_floor_mode: AtomicU8::new(me_floor_mode.as_u8()), + me_adaptive_floor_idle_secs: AtomicU64::new(me_adaptive_floor_idle_secs), + me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new( + me_adaptive_floor_min_writers_single_endpoint, + ), + me_adaptive_floor_min_writers_multi_endpoint: AtomicU8::new( + me_adaptive_floor_min_writers_multi_endpoint, + ), + me_adaptive_floor_recover_grace_secs: AtomicU64::new( + me_adaptive_floor_recover_grace_secs, + ), + me_adaptive_floor_writers_per_core_total: AtomicU32::new( + me_adaptive_floor_writers_per_core_total as u32, + ), + me_adaptive_floor_cpu_cores_override: AtomicU32::new( + me_adaptive_floor_cpu_cores_override as u32, + ), + me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32::new( + me_adaptive_floor_max_extra_writers_single_per_core as u32, + ), + me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32::new( + me_adaptive_floor_max_extra_writers_multi_per_core as u32, + ), + me_adaptive_floor_max_active_writers_per_core: AtomicU32::new( + me_adaptive_floor_max_active_writers_per_core as u32, + ), + me_adaptive_floor_max_warm_writers_per_core: AtomicU32::new( + me_adaptive_floor_max_warm_writers_per_core as u32, + ), + me_adaptive_floor_max_active_writers_global: AtomicU32::new( + me_adaptive_floor_max_active_writers_global, + ), + me_adaptive_floor_max_warm_writers_global: AtomicU32::new( + me_adaptive_floor_max_warm_writers_global, + ), + me_adaptive_floor_cpu_cores_detected: AtomicU32::new(1), + me_adaptive_floor_cpu_cores_effective: AtomicU32::new(1), + me_adaptive_floor_global_cap_raw: AtomicU64::new(0), + me_adaptive_floor_global_cap_effective: AtomicU64::new(0), + me_adaptive_floor_target_writers_total: AtomicU64::new(0), + me_adaptive_floor_active_cap_configured: AtomicU64::new(0), + me_adaptive_floor_active_cap_effective: AtomicU64::new(0), + me_adaptive_floor_warm_cap_configured: AtomicU64::new(0), + me_adaptive_floor_warm_cap_effective: AtomicU64::new(0), + me_adaptive_floor_active_writers_current: AtomicU64::new(0), + me_adaptive_floor_warm_writers_current: AtomicU64::new(0), + }), + writer_selection_policy: Arc::new(WriterSelectionPolicyCore { + secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), + me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), + me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()), + me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)), + }), + transport_policy: Arc::new(TransportPolicyCore { + me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), + me_reader_route_data_wait_ms: Arc::new(AtomicU64::new( + me_reader_route_data_wait_ms, + )), + }), decision, upstream, rng, @@ -700,73 +814,7 @@ impl MePool { }, secret: proxy_secret, })), - nat_ip_cfg: nat_ip, - nat_ip_detected: Arc::new(RwLock::new(None)), - nat_probe, - nat_stun, - nat_stun_servers, - nat_stun_live_servers: Arc::new(RwLock::new(Vec::new())), - nat_probe_concurrency: nat_probe_concurrency.max(1), - detected_ipv6, - nat_probe_attempts: std::sync::atomic::AtomicU8::new(0), - nat_probe_disabled: std::sync::atomic::AtomicBool::new(false), - stun_backoff_until: Arc::new(RwLock::new(None)), - me_one_retry, - me_one_timeout: Duration::from_millis(me_one_timeout_ms), stats, - me_warmup_stagger_enabled, - me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), - me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), - me_reconnect_max_concurrent_per_dc, - me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), - me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), - me_reconnect_fast_retry_count, - me_floor_mode: AtomicU8::new(me_floor_mode.as_u8()), - me_adaptive_floor_idle_secs: AtomicU64::new(me_adaptive_floor_idle_secs), - me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new( - me_adaptive_floor_min_writers_single_endpoint, - ), - me_adaptive_floor_min_writers_multi_endpoint: AtomicU8::new( - me_adaptive_floor_min_writers_multi_endpoint, - ), - me_adaptive_floor_recover_grace_secs: AtomicU64::new( - me_adaptive_floor_recover_grace_secs, - ), - me_adaptive_floor_writers_per_core_total: AtomicU32::new( - me_adaptive_floor_writers_per_core_total as u32, - ), - me_adaptive_floor_cpu_cores_override: AtomicU32::new( - me_adaptive_floor_cpu_cores_override as u32, - ), - me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32::new( - me_adaptive_floor_max_extra_writers_single_per_core as u32, - ), - me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32::new( - me_adaptive_floor_max_extra_writers_multi_per_core as u32, - ), - me_adaptive_floor_max_active_writers_per_core: AtomicU32::new( - me_adaptive_floor_max_active_writers_per_core as u32, - ), - me_adaptive_floor_max_warm_writers_per_core: AtomicU32::new( - me_adaptive_floor_max_warm_writers_per_core as u32, - ), - me_adaptive_floor_max_active_writers_global: AtomicU32::new( - me_adaptive_floor_max_active_writers_global, - ), - me_adaptive_floor_max_warm_writers_global: AtomicU32::new( - me_adaptive_floor_max_warm_writers_global, - ), - me_adaptive_floor_cpu_cores_detected: AtomicU32::new(1), - me_adaptive_floor_cpu_cores_effective: AtomicU32::new(1), - me_adaptive_floor_global_cap_raw: AtomicU64::new(0), - me_adaptive_floor_global_cap_effective: AtomicU64::new(0), - me_adaptive_floor_target_writers_total: AtomicU64::new(0), - me_adaptive_floor_active_cap_configured: AtomicU64::new(0), - me_adaptive_floor_active_cap_effective: AtomicU64::new(0), - me_adaptive_floor_warm_cap_configured: AtomicU64::new(0), - me_adaptive_floor_warm_cap_effective: AtomicU64::new(0), - me_adaptive_floor_active_writers_current: AtomicU64::new(0), - me_adaptive_floor_warm_writers_current: AtomicU64::new(0), pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -774,21 +822,12 @@ impl MePool { default_dc: AtomicI32::new(default_dc.unwrap_or(2)), next_writer_id: AtomicU64::new(1), rtt_stats: Arc::new(Mutex::new(HashMap::new())), - nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), - nat_reflection_singleflight_v4: Arc::new(Mutex::new(())), - nat_reflection_singleflight_v6: Arc::new(Mutex::new(())), refill_inflight: Arc::new(Mutex::new(HashSet::new())), refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())), conn_count: AtomicUsize::new(0), draining_active_runtime: AtomicU64::new(0), endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())), kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), - secret_atomic_snapshot: AtomicBool::new(me_secret_atomic_snapshot), - me_deterministic_writer_sort: AtomicBool::new(me_deterministic_writer_sort), - me_writer_pick_mode: AtomicU8::new(me_writer_pick_mode.as_u8()), - me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)), - me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), - me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)), runtime_ready: AtomicBool::new(false), }) } @@ -850,23 +889,35 @@ impl MePool { pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 { match family { - IpFamily::V4 => self.health_runtime.family_health_v4.load().state_since_epoch_secs, - IpFamily::V6 => self.health_runtime.family_health_v6.load().state_since_epoch_secs, + IpFamily::V4 => { + self.health_runtime + .family_health_v4 + .load() + .state_since_epoch_secs + } + IpFamily::V6 => { + self.health_runtime + .family_health_v6 + .load() + .state_since_epoch_secs + } } } pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 { match family { - IpFamily::V4 => self - .health_runtime - .family_health_v4 - .load() - .suppressed_until_epoch_secs, - IpFamily::V6 => self - .health_runtime - .family_health_v6 - .load() - .suppressed_until_epoch_secs, + IpFamily::V4 => { + self.health_runtime + .family_health_v4 + .load() + .suppressed_until_epoch_secs + } + IpFamily::V6 => { + self.health_runtime + .family_health_v6 + .load() + .suppressed_until_epoch_secs + } } } @@ -879,16 +930,18 @@ impl MePool { pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 { match family { - IpFamily::V4 => self - .health_runtime - .family_health_v4 - .load() - .recover_success_streak, - IpFamily::V6 => self - .health_runtime - .family_health_v6 - .load() - .recover_success_streak, + IpFamily::V4 => { + self.health_runtime + .family_health_v4 + .load() + .recover_success_streak + } + IpFamily::V6 => { + self.health_runtime + .family_health_v6 + .load() + .recover_success_streak + } } } @@ -1061,14 +1114,18 @@ impl MePool { self.binding_policy .me_bind_stale_ttl_secs .store(bind_stale_ttl_secs, Ordering::Relaxed); - self.secret_atomic_snapshot + self.writer_selection_policy + .secret_atomic_snapshot .store(secret_atomic_snapshot, Ordering::Relaxed); - self.me_deterministic_writer_sort + self.writer_selection_policy + .me_deterministic_writer_sort .store(deterministic_writer_sort, Ordering::Relaxed); let previous_writer_pick_mode = self.writer_pick_mode(); - self.me_writer_pick_mode + self.writer_selection_policy + .me_writer_pick_mode .store(writer_pick_mode.as_u8(), Ordering::Relaxed); - self.me_writer_pick_sample_size + self.writer_selection_policy + .me_writer_pick_sample_size .store(writer_pick_sample_size.clamp(2, 4), Ordering::Relaxed); if previous_writer_pick_mode != writer_pick_mode { self.stats.increment_me_writer_pick_mode_switch_total(); @@ -1092,45 +1149,62 @@ impl MePool { .me_single_endpoint_shadow_rotate_every_secs .store(single_endpoint_shadow_rotate_every_secs, Ordering::Relaxed); let previous_floor_mode = self.floor_mode(); - self.me_floor_mode + self.floor_runtime + .me_floor_mode .store(floor_mode.as_u8(), Ordering::Relaxed); - self.me_adaptive_floor_idle_secs + self.floor_runtime + .me_adaptive_floor_idle_secs .store(adaptive_floor_idle_secs, Ordering::Relaxed); - self.me_adaptive_floor_min_writers_single_endpoint.store( - adaptive_floor_min_writers_single_endpoint, - Ordering::Relaxed, - ); - self.me_adaptive_floor_min_writers_multi_endpoint + self.floor_runtime + .me_adaptive_floor_min_writers_single_endpoint + .store( + adaptive_floor_min_writers_single_endpoint, + Ordering::Relaxed, + ); + self.floor_runtime + .me_adaptive_floor_min_writers_multi_endpoint .store(adaptive_floor_min_writers_multi_endpoint, Ordering::Relaxed); - self.me_adaptive_floor_recover_grace_secs + self.floor_runtime + .me_adaptive_floor_recover_grace_secs .store(adaptive_floor_recover_grace_secs, Ordering::Relaxed); - self.me_adaptive_floor_writers_per_core_total.store( - adaptive_floor_writers_per_core_total as u32, - Ordering::Relaxed, - ); - self.me_adaptive_floor_cpu_cores_override + self.floor_runtime + .me_adaptive_floor_writers_per_core_total + .store( + adaptive_floor_writers_per_core_total as u32, + Ordering::Relaxed, + ); + self.floor_runtime + .me_adaptive_floor_cpu_cores_override .store(adaptive_floor_cpu_cores_override as u32, Ordering::Relaxed); - self.me_adaptive_floor_max_extra_writers_single_per_core + self.floor_runtime + .me_adaptive_floor_max_extra_writers_single_per_core .store( adaptive_floor_max_extra_writers_single_per_core as u32, Ordering::Relaxed, ); - self.me_adaptive_floor_max_extra_writers_multi_per_core + self.floor_runtime + .me_adaptive_floor_max_extra_writers_multi_per_core .store( adaptive_floor_max_extra_writers_multi_per_core as u32, Ordering::Relaxed, ); - self.me_adaptive_floor_max_active_writers_per_core.store( - adaptive_floor_max_active_writers_per_core as u32, - Ordering::Relaxed, - ); - self.me_adaptive_floor_max_warm_writers_per_core.store( - adaptive_floor_max_warm_writers_per_core as u32, - Ordering::Relaxed, - ); - self.me_adaptive_floor_max_active_writers_global + self.floor_runtime + .me_adaptive_floor_max_active_writers_per_core + .store( + adaptive_floor_max_active_writers_per_core as u32, + Ordering::Relaxed, + ); + self.floor_runtime + .me_adaptive_floor_max_warm_writers_per_core + .store( + adaptive_floor_max_warm_writers_per_core as u32, + Ordering::Relaxed, + ); + self.floor_runtime + .me_adaptive_floor_max_active_writers_global .store(adaptive_floor_max_active_writers_global, Ordering::Relaxed); - self.me_adaptive_floor_max_warm_writers_global + self.floor_runtime + .me_adaptive_floor_max_warm_writers_global .store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed); self.health_runtime .me_health_interval_ms_unhealthy @@ -1158,9 +1232,13 @@ impl MePool { } pub fn reset_stun_state(&self) { - self.nat_probe_attempts.store(0, Ordering::Relaxed); - self.nat_probe_disabled.store(false, Ordering::Relaxed); - if let Ok(mut live) = self.nat_stun_live_servers.try_write() { + self.nat_runtime + .nat_probe_attempts + .store(0, Ordering::Relaxed); + self.nat_runtime + .nat_probe_disabled + .store(false, Ordering::Relaxed); + if let Ok(mut live) = self.nat_runtime.nat_stun_live_servers.try_write() { live.clear(); } } @@ -1182,9 +1260,11 @@ impl MePool { route_backpressure_high_watermark_pct: u8, reader_route_data_wait_ms: u64, ) { - self.me_socks_kdf_policy + self.transport_policy + .me_socks_kdf_policy .store(socks_kdf_policy.as_u8(), Ordering::Relaxed); - self.me_reader_route_data_wait_ms + self.transport_policy + .me_reader_route_data_wait_ms .store(reader_route_data_wait_ms, Ordering::Relaxed); self.registry.update_route_backpressure_policy( route_backpressure_base_timeout_ms, @@ -1194,7 +1274,11 @@ impl MePool { } pub(super) fn socks_kdf_policy(&self) -> MeSocksKdfPolicy { - MeSocksKdfPolicy::from_u8(self.me_socks_kdf_policy.load(Ordering::Relaxed)) + MeSocksKdfPolicy::from_u8( + self.transport_policy + .me_socks_kdf_policy + .load(Ordering::Relaxed), + ) } pub(super) fn writers_arc(&self) -> Arc { @@ -1316,11 +1400,16 @@ impl MePool { } pub(super) fn writer_pick_mode(&self) -> MeWriterPickMode { - MeWriterPickMode::from_u8(self.me_writer_pick_mode.load(Ordering::Relaxed)) + MeWriterPickMode::from_u8( + self.writer_selection_policy + .me_writer_pick_mode + .load(Ordering::Relaxed), + ) } pub(super) fn writer_pick_sample_size(&self) -> usize { - self.me_writer_pick_sample_size + self.writer_selection_policy + .me_writer_pick_sample_size .load(Ordering::Relaxed) .clamp(2, 4) as usize } @@ -1340,39 +1429,48 @@ impl MePool { } pub(super) fn floor_mode(&self) -> MeFloorMode { - MeFloorMode::from_u8(self.me_floor_mode.load(Ordering::Relaxed)) + MeFloorMode::from_u8(self.floor_runtime.me_floor_mode.load(Ordering::Relaxed)) } pub(super) fn adaptive_floor_idle_duration(&self) -> Duration { - Duration::from_secs(self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed)) + Duration::from_secs( + self.floor_runtime + .me_adaptive_floor_idle_secs + .load(Ordering::Relaxed), + ) } pub(super) fn adaptive_floor_recover_grace_duration(&self) -> Duration { Duration::from_secs( - self.me_adaptive_floor_recover_grace_secs + self.floor_runtime + .me_adaptive_floor_recover_grace_secs .load(Ordering::Relaxed), ) } pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize { (self + .floor_runtime .me_adaptive_floor_min_writers_multi_endpoint .load(Ordering::Relaxed) as usize) .max(1) } pub(super) fn adaptive_floor_max_extra_single_per_core(&self) -> usize { - self.me_adaptive_floor_max_extra_writers_single_per_core + self.floor_runtime + .me_adaptive_floor_max_extra_writers_single_per_core .load(Ordering::Relaxed) as usize } pub(super) fn adaptive_floor_max_extra_multi_per_core(&self) -> usize { - self.me_adaptive_floor_max_extra_writers_multi_per_core + self.floor_runtime + .me_adaptive_floor_max_extra_writers_multi_per_core .load(Ordering::Relaxed) as usize } pub(super) fn adaptive_floor_max_active_writers_per_core(&self) -> usize { (self + .floor_runtime .me_adaptive_floor_max_active_writers_per_core .load(Ordering::Relaxed) as usize) .max(1) @@ -1380,6 +1478,7 @@ impl MePool { pub(super) fn adaptive_floor_max_warm_writers_per_core(&self) -> usize { (self + .floor_runtime .me_adaptive_floor_max_warm_writers_per_core .load(Ordering::Relaxed) as usize) .max(1) @@ -1387,6 +1486,7 @@ impl MePool { pub(super) fn adaptive_floor_max_active_writers_global(&self) -> usize { (self + .floor_runtime .me_adaptive_floor_max_active_writers_global .load(Ordering::Relaxed) as usize) .max(1) @@ -1394,6 +1494,7 @@ impl MePool { pub(super) fn adaptive_floor_max_warm_writers_global(&self) -> usize { (self + .floor_runtime .me_adaptive_floor_max_warm_writers_global .load(Ordering::Relaxed) as usize) .max(1) @@ -1409,6 +1510,7 @@ impl MePool { pub(super) fn adaptive_floor_effective_cpu_cores(&self) -> usize { let detected = self.adaptive_floor_detected_cpu_cores(); let override_cores = self + .floor_runtime .me_adaptive_floor_cpu_cores_override .load(Ordering::Relaxed) as usize; let effective = if override_cores == 0 { @@ -1416,9 +1518,11 @@ impl MePool { } else { override_cores.max(1) }; - self.me_adaptive_floor_cpu_cores_detected + self.floor_runtime + .me_adaptive_floor_cpu_cores_detected .store(detected as u32, Ordering::Relaxed); - self.me_adaptive_floor_cpu_cores_effective + self.floor_runtime + .me_adaptive_floor_cpu_cores_effective .store(effective as u32, Ordering::Relaxed); self.stats .set_me_floor_cpu_cores_detected_gauge(detected as u64); @@ -1450,7 +1554,8 @@ impl MePool { .min(self.adaptive_floor_max_active_writers_global()) .min(per_contour_budget) .max(1); - self.me_adaptive_floor_active_cap_configured + self.floor_runtime + .me_adaptive_floor_active_cap_configured .store(configured as u64, Ordering::Relaxed); self.stats .set_me_floor_active_cap_configured_gauge(configured as u64); @@ -1465,7 +1570,8 @@ impl MePool { .min(self.adaptive_floor_max_warm_writers_global()) .min(per_contour_budget) .max(1); - self.me_adaptive_floor_warm_cap_configured + self.floor_runtime + .me_adaptive_floor_warm_cap_configured .store(configured as u64, Ordering::Relaxed); self.stats .set_me_floor_warm_cap_configured_gauge(configured as u64); @@ -1482,23 +1588,32 @@ impl MePool { active_writers_current: usize, warm_writers_current: usize, ) { - self.me_adaptive_floor_global_cap_raw + self.floor_runtime + .me_adaptive_floor_global_cap_raw .store(active_cap_configured as u64, Ordering::Relaxed); - self.me_adaptive_floor_global_cap_effective + self.floor_runtime + .me_adaptive_floor_global_cap_effective .store(active_cap_effective as u64, Ordering::Relaxed); - self.me_adaptive_floor_target_writers_total + self.floor_runtime + .me_adaptive_floor_target_writers_total .store(target_writers_total as u64, Ordering::Relaxed); - self.me_adaptive_floor_active_cap_configured + self.floor_runtime + .me_adaptive_floor_active_cap_configured .store(active_cap_configured as u64, Ordering::Relaxed); - self.me_adaptive_floor_active_cap_effective + self.floor_runtime + .me_adaptive_floor_active_cap_effective .store(active_cap_effective as u64, Ordering::Relaxed); - self.me_adaptive_floor_warm_cap_configured + self.floor_runtime + .me_adaptive_floor_warm_cap_configured .store(warm_cap_configured as u64, Ordering::Relaxed); - self.me_adaptive_floor_warm_cap_effective + self.floor_runtime + .me_adaptive_floor_warm_cap_effective .store(warm_cap_effective as u64, Ordering::Relaxed); - self.me_adaptive_floor_active_writers_current + self.floor_runtime + .me_adaptive_floor_active_writers_current .store(active_writers_current as u64, Ordering::Relaxed); - self.me_adaptive_floor_warm_writers_current + self.floor_runtime + .me_adaptive_floor_warm_writers_current .store(warm_writers_current as u64, Ordering::Relaxed); self.stats .set_me_floor_global_cap_raw_gauge(active_cap_configured as u64); @@ -1587,11 +1702,13 @@ impl MePool { } let min_writers = if endpoint_count == 1 { (self + .floor_runtime .me_adaptive_floor_min_writers_single_endpoint .load(Ordering::Relaxed) as usize) .max(1) } else { (self + .floor_runtime .me_adaptive_floor_min_writers_multi_endpoint .load(Ordering::Relaxed) as usize) .max(1) diff --git a/src/transport/middle_proxy/pool_init.rs b/src/transport/middle_proxy/pool_init.rs index 2e3bc1d..3f7cad7 100644 --- a/src/transport/middle_proxy/pool_init.rs +++ b/src/transport/middle_proxy/pool_init.rs @@ -14,7 +14,10 @@ use super::pool::MePool; impl MePool { pub async fn init(self: &Arc, pool_size: usize, rng: &Arc) -> Result<()> { let family_order = self.family_order(); - let connect_concurrency = self.me_reconnect_max_concurrent_per_dc.max(1) as usize; + let connect_concurrency = self + .reconnect_runtime + .me_reconnect_max_concurrent_per_dc + .max(1) as usize; let ks = self.key_selector().await; info!( me_servers = self.proxy_map_v4.read().await.len(), @@ -250,10 +253,12 @@ impl MePool { return false; } - if self.me_warmup_stagger_enabled { - let jitter = - rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64); - let delay_ms = self.me_warmup_step_delay.as_millis() as u64 + jitter; + if self.reconnect_runtime.me_warmup_stagger_enabled { + let jitter = rand::rng().random_range( + 0..=self.reconnect_runtime.me_warmup_step_jitter.as_millis() as u64, + ); + let delay_ms = + self.reconnect_runtime.me_warmup_step_delay.as_millis() as u64 + jitter; tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; } } diff --git a/src/transport/middle_proxy/pool_nat.rs b/src/transport/middle_proxy/pool_nat.rs index f382fd4..be2d9df 100644 --- a/src/transport/middle_proxy/pool_nat.rs +++ b/src/transport/middle_proxy/pool_nat.rs @@ -42,10 +42,10 @@ pub async fn detect_public_ip() -> Option { impl MePool { fn configured_stun_servers(&self) -> Vec { - if !self.nat_stun_servers.is_empty() { - return self.nat_stun_servers.clone(); + if !self.nat_runtime.nat_stun_servers.is_empty() { + return self.nat_runtime.nat_stun_servers.clone(); } - if let Some(s) = &self.nat_stun + if let Some(s) = &self.nat_runtime.nat_stun && !s.trim().is_empty() { return vec![s.clone()]; @@ -64,7 +64,7 @@ impl MePool { let mut next_idx = 0usize; let mut live_servers = Vec::new(); let mut best_by_ip: HashMap = HashMap::new(); - let concurrency = self.nat_probe_concurrency.max(1); + let concurrency = self.nat_runtime.nat_probe_concurrency.max(1); while next_idx < servers.len() || !join_set.is_empty() { while next_idx < servers.len() && join_set.len() < concurrency { @@ -137,9 +137,13 @@ impl MePool { } pub(super) fn translate_ip_for_nat(&self, ip: IpAddr) -> IpAddr { - let nat_ip = self - .nat_ip_cfg - .or_else(|| self.nat_ip_detected.try_read().ok().and_then(|g| *g)); + let nat_ip = self.nat_runtime.nat_ip_cfg.or_else(|| { + self.nat_runtime + .nat_ip_detected + .try_read() + .ok() + .and_then(|g| *g) + }); let Some(nat_ip) = nat_ip else { return ip; @@ -163,7 +167,7 @@ impl MePool { addr: std::net::SocketAddr, reflected: Option, ) -> std::net::SocketAddr { - let ip = if let Some(nat_ip) = self.nat_ip_cfg { + let ip = if let Some(nat_ip) = self.nat_runtime.nat_ip_cfg { match (addr.ip(), nat_ip) { (IpAddr::V4(_), IpAddr::V4(dst)) => IpAddr::V4(dst), (IpAddr::V6(_), IpAddr::V6(dst)) => IpAddr::V6(dst), @@ -185,22 +189,22 @@ impl MePool { } pub(super) async fn maybe_detect_nat_ip(&self, local_ip: IpAddr) -> Option { - if self.nat_ip_cfg.is_some() { - return self.nat_ip_cfg; + if self.nat_runtime.nat_ip_cfg.is_some() { + return self.nat_runtime.nat_ip_cfg; } if !(is_bogon(local_ip) || local_ip.is_loopback() || local_ip.is_unspecified()) { return None; } - if let Some(ip) = *self.nat_ip_detected.read().await { + if let Some(ip) = *self.nat_runtime.nat_ip_detected.read().await { return Some(ip); } match fetch_public_ipv4_with_retry().await { Ok(Some(ip)) => { { - let mut guard = self.nat_ip_detected.write().await; + let mut guard = self.nat_runtime.nat_ip_detected.write().await; *guard = Some(IpAddr::V4(ip)); } info!(public_ip = %ip, "Auto-detected public IP for NAT translation"); @@ -231,10 +235,10 @@ impl MePool { } // Backoff window if use_shared_cache - && let Some(until) = *self.stun_backoff_until.read().await + && let Some(until) = *self.nat_runtime.stun_backoff_until.read().await && Instant::now() < until { - if let Ok(cache) = self.nat_reflection_cache.try_lock() { + if let Ok(cache) = self.nat_runtime.nat_reflection_cache.try_lock() { let slot = match family { IpFamily::V4 => cache.v4, IpFamily::V6 => cache.v6, @@ -244,7 +248,8 @@ impl MePool { return None; } - if use_shared_cache && let Ok(mut cache) = self.nat_reflection_cache.try_lock() { + if use_shared_cache && let Ok(mut cache) = self.nat_runtime.nat_reflection_cache.try_lock() + { let slot = match family { IpFamily::V4 => &mut cache.v4, IpFamily::V6 => &mut cache.v6, @@ -258,18 +263,18 @@ impl MePool { let _singleflight_guard = if use_shared_cache { Some(match family { - IpFamily::V4 => self.nat_reflection_singleflight_v4.lock().await, - IpFamily::V6 => self.nat_reflection_singleflight_v6.lock().await, + IpFamily::V4 => self.nat_runtime.nat_reflection_singleflight_v4.lock().await, + IpFamily::V6 => self.nat_runtime.nat_reflection_singleflight_v6.lock().await, }) } else { None }; if use_shared_cache - && let Some(until) = *self.stun_backoff_until.read().await + && let Some(until) = *self.nat_runtime.stun_backoff_until.read().await && Instant::now() < until { - if let Ok(cache) = self.nat_reflection_cache.try_lock() { + if let Ok(cache) = self.nat_runtime.nat_reflection_cache.try_lock() { let slot = match family { IpFamily::V4 => cache.v4, IpFamily::V6 => cache.v6, @@ -279,7 +284,8 @@ impl MePool { return None; } - if use_shared_cache && let Ok(mut cache) = self.nat_reflection_cache.try_lock() { + if use_shared_cache && let Ok(mut cache) = self.nat_runtime.nat_reflection_cache.try_lock() + { let slot = match family { IpFamily::V4 => &mut cache.v4, IpFamily::V6 => &mut cache.v6, @@ -292,13 +298,14 @@ impl MePool { } let attempt = if use_shared_cache { - self.nat_probe_attempts + self.nat_runtime + .nat_probe_attempts .fetch_add(1, std::sync::atomic::Ordering::Relaxed) } else { 0 }; let configured_servers = self.configured_stun_servers(); - let live_snapshot = self.nat_stun_live_servers.read().await.clone(); + let live_snapshot = self.nat_runtime.nat_stun_live_servers.read().await.clone(); let primary_servers = if live_snapshot.is_empty() { configured_servers.clone() } else { @@ -322,14 +329,15 @@ impl MePool { let live_server_count = live_servers.len(); if !live_servers.is_empty() { - *self.nat_stun_live_servers.write().await = live_servers; + *self.nat_runtime.nat_stun_live_servers.write().await = live_servers; } else { - self.nat_stun_live_servers.write().await.clear(); + self.nat_runtime.nat_stun_live_servers.write().await.clear(); } if let Some(reflected_addr) = selected_reflected { if use_shared_cache { - self.nat_probe_attempts + self.nat_runtime + .nat_probe_attempts .store(0, std::sync::atomic::Ordering::Relaxed); } info!( @@ -338,7 +346,9 @@ impl MePool { "STUN-Quorum reached, IP: {}", reflected_addr.ip() ); - if use_shared_cache && let Ok(mut cache) = self.nat_reflection_cache.try_lock() { + if use_shared_cache + && let Ok(mut cache) = self.nat_runtime.nat_reflection_cache.try_lock() + { let slot = match family { IpFamily::V4 => &mut cache.v4, IpFamily::V6 => &mut cache.v6, @@ -350,7 +360,7 @@ impl MePool { if use_shared_cache { let backoff = Duration::from_secs(60 * 2u64.pow((attempt as u32).min(6))); - *self.stun_backoff_until.write().await = Some(Instant::now() + backoff); + *self.nat_runtime.stun_backoff_until.write().await = Some(Instant::now() + backoff); } None } diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index fc5c996..ddeb3e3 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -219,7 +219,7 @@ impl MePool { } async fn refill_writer_after_loss(self: &Arc, addr: SocketAddr, writer_dc: i32) -> bool { - let fast_retries = self.me_reconnect_fast_retry_count.max(1); + let fast_retries = self.reconnect_runtime.me_reconnect_fast_retry_count.max(1); let mut total_attempts = 0u32; let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; diff --git a/src/transport/middle_proxy/pool_reinit.rs b/src/transport/middle_proxy/pool_reinit.rs index 009f850..db6411c 100644 --- a/src/transport/middle_proxy/pool_reinit.rs +++ b/src/transport/middle_proxy/pool_reinit.rs @@ -37,7 +37,9 @@ impl MePool { } fn clear_pending_hardswap_state(&self) { - self.reinit.pending_hardswap_generation.store(0, Ordering::Relaxed); + self.reinit + .pending_hardswap_generation + .store(0, Ordering::Relaxed); self.reinit .pending_hardswap_started_at_epoch_secs .store(0, Ordering::Relaxed); @@ -213,7 +215,8 @@ impl MePool { .reinit .me_hardswap_warmup_pass_backoff_base_ms .load(Ordering::Relaxed); - let cap_ms = (self.me_reconnect_backoff_cap.as_millis() as u64).max(base_ms); + let cap_ms = + (self.reconnect_runtime.me_reconnect_backoff_cap.as_millis() as u64).max(base_ms); let shift = (pass_idx as u32).min(20); let scaled = base_ms.saturating_mul(1u64 << shift); let core = scaled.min(cap_ms); @@ -392,7 +395,10 @@ impl MePool { .reinit .pending_hardswap_started_at_epoch_secs .load(Ordering::Relaxed); - let pending_map_hash = self.reinit.pending_hardswap_map_hash.load(Ordering::Relaxed); + let pending_map_hash = self + .reinit + .pending_hardswap_map_hash + .load(Ordering::Relaxed); let pending_age_secs = now_epoch_secs.saturating_sub(pending_started_at); let pending_ttl_expired = pending_started_at > 0 && pending_age_secs > ME_HARDSWAP_PENDING_TTL_SECS; @@ -443,7 +449,9 @@ impl MePool { }; if hardswap { - self.reinit.warm_generation.store(generation, Ordering::Relaxed); + self.reinit + .warm_generation + .store(generation, Ordering::Relaxed); self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc) .await; } else { diff --git a/src/transport/middle_proxy/pool_runtime_api.rs b/src/transport/middle_proxy/pool_runtime_api.rs index 7c15216..539f397 100644 --- a/src/transport/middle_proxy/pool_runtime_api.rs +++ b/src/transport/middle_proxy/pool_runtime_api.rs @@ -94,9 +94,9 @@ impl MePool { pub(crate) async fn api_nat_stun_snapshot(&self) -> MeApiNatStunSnapshot { let now = Instant::now(); - let mut configured_servers = if !self.nat_stun_servers.is_empty() { - self.nat_stun_servers.clone() - } else if let Some(stun) = &self.nat_stun { + let mut configured_servers = if !self.nat_runtime.nat_stun_servers.is_empty() { + self.nat_runtime.nat_stun_servers.clone() + } else if let Some(stun) = &self.nat_runtime.nat_stun { if stun.trim().is_empty() { Vec::new() } else { @@ -108,11 +108,11 @@ impl MePool { configured_servers.sort(); configured_servers.dedup(); - let mut live_servers = self.nat_stun_live_servers.read().await.clone(); + let mut live_servers = self.nat_runtime.nat_stun_live_servers.read().await.clone(); live_servers.sort(); live_servers.dedup(); - let reflection = self.nat_reflection_cache.lock().await; + let reflection = self.nat_runtime.nat_reflection_cache.lock().await; let reflection_v4 = reflection.v4.map(|(ts, addr)| MeApiNatReflectionSnapshot { addr, age_secs: now.saturating_duration_since(ts).as_secs(), @@ -123,17 +123,19 @@ impl MePool { }); drop(reflection); - let backoff_until = *self.stun_backoff_until.read().await; + let backoff_until = *self.nat_runtime.stun_backoff_until.read().await; let stun_backoff_remaining_ms = backoff_until.and_then(|until| { (until > now).then_some(until.duration_since(now).as_millis() as u64) }); MeApiNatStunSnapshot { - nat_probe_enabled: self.nat_probe, + nat_probe_enabled: self.nat_runtime.nat_probe, nat_probe_disabled_runtime: self + .nat_runtime .nat_probe_disabled .load(std::sync::atomic::Ordering::Relaxed), nat_probe_attempts: self + .nat_runtime .nat_probe_attempts .load(std::sync::atomic::Ordering::Relaxed), configured_servers, diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 6cd5cec..ae9038b 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -339,6 +339,7 @@ impl MePool { let mut fresh_alive_writers = 0usize; let floor_mode = self.floor_mode(); let adaptive_cpu_cores = (self + .floor_runtime .me_adaptive_floor_cpu_cores_effective .load(Ordering::Relaxed) as usize) .max(1); @@ -353,22 +354,26 @@ impl MePool { self.required_writers_for_dc_with_floor_mode(endpoint_count, false); let floor_min = if endpoint_count <= 1 { (self + .floor_runtime .me_adaptive_floor_min_writers_single_endpoint .load(Ordering::Relaxed) as usize) .max(1) .min(base_required.max(1)) } else { (self + .floor_runtime .me_adaptive_floor_min_writers_multi_endpoint .load(Ordering::Relaxed) as usize) .max(1) .min(base_required.max(1)) }; let extra_per_core = if endpoint_count <= 1 { - self.me_adaptive_floor_max_extra_writers_single_per_core + self.floor_runtime + .me_adaptive_floor_max_extra_writers_single_per_core .load(Ordering::Relaxed) as usize } else { - self.me_adaptive_floor_max_extra_writers_multi_per_core + self.floor_runtime + .me_adaptive_floor_max_extra_writers_multi_per_core .load(Ordering::Relaxed) as usize }; let floor_max = @@ -490,75 +495,100 @@ impl MePool { pending_hardswap_age_secs, hardswap_enabled: self.reinit.hardswap.load(Ordering::Relaxed), floor_mode: floor_mode_label(self.floor_mode()), - adaptive_floor_idle_secs: self.me_adaptive_floor_idle_secs.load(Ordering::Relaxed), + adaptive_floor_idle_secs: self + .floor_runtime + .me_adaptive_floor_idle_secs + .load(Ordering::Relaxed), adaptive_floor_min_writers_single_endpoint: self + .floor_runtime .me_adaptive_floor_min_writers_single_endpoint .load(Ordering::Relaxed), adaptive_floor_min_writers_multi_endpoint: self + .floor_runtime .me_adaptive_floor_min_writers_multi_endpoint .load(Ordering::Relaxed), adaptive_floor_recover_grace_secs: self + .floor_runtime .me_adaptive_floor_recover_grace_secs .load(Ordering::Relaxed), adaptive_floor_writers_per_core_total: self + .floor_runtime .me_adaptive_floor_writers_per_core_total .load(Ordering::Relaxed) as u16, adaptive_floor_cpu_cores_override: self + .floor_runtime .me_adaptive_floor_cpu_cores_override .load(Ordering::Relaxed) as u16, adaptive_floor_max_extra_writers_single_per_core: self + .floor_runtime .me_adaptive_floor_max_extra_writers_single_per_core .load(Ordering::Relaxed) as u16, adaptive_floor_max_extra_writers_multi_per_core: self + .floor_runtime .me_adaptive_floor_max_extra_writers_multi_per_core .load(Ordering::Relaxed) as u16, adaptive_floor_max_active_writers_per_core: self + .floor_runtime .me_adaptive_floor_max_active_writers_per_core .load(Ordering::Relaxed) as u16, adaptive_floor_max_warm_writers_per_core: self + .floor_runtime .me_adaptive_floor_max_warm_writers_per_core .load(Ordering::Relaxed) as u16, adaptive_floor_max_active_writers_global: self + .floor_runtime .me_adaptive_floor_max_active_writers_global .load(Ordering::Relaxed), adaptive_floor_max_warm_writers_global: self + .floor_runtime .me_adaptive_floor_max_warm_writers_global .load(Ordering::Relaxed), adaptive_floor_cpu_cores_detected: self + .floor_runtime .me_adaptive_floor_cpu_cores_detected .load(Ordering::Relaxed), adaptive_floor_cpu_cores_effective: self + .floor_runtime .me_adaptive_floor_cpu_cores_effective .load(Ordering::Relaxed), adaptive_floor_global_cap_raw: self + .floor_runtime .me_adaptive_floor_global_cap_raw .load(Ordering::Relaxed), adaptive_floor_global_cap_effective: self + .floor_runtime .me_adaptive_floor_global_cap_effective .load(Ordering::Relaxed), adaptive_floor_target_writers_total: self + .floor_runtime .me_adaptive_floor_target_writers_total .load(Ordering::Relaxed), adaptive_floor_active_cap_configured: self + .floor_runtime .me_adaptive_floor_active_cap_configured .load(Ordering::Relaxed), adaptive_floor_active_cap_effective: self + .floor_runtime .me_adaptive_floor_active_cap_effective .load(Ordering::Relaxed), adaptive_floor_warm_cap_configured: self + .floor_runtime .me_adaptive_floor_warm_cap_configured .load(Ordering::Relaxed), adaptive_floor_warm_cap_effective: self + .floor_runtime .me_adaptive_floor_warm_cap_effective .load(Ordering::Relaxed), adaptive_floor_active_writers_current: self + .floor_runtime .me_adaptive_floor_active_writers_current .load(Ordering::Relaxed), adaptive_floor_warm_writers_current: self + .floor_runtime .me_adaptive_floor_warm_writers_current .load(Ordering::Relaxed), me_keepalive_enabled: self.writer_lifecycle.me_keepalive_enabled, @@ -569,10 +599,16 @@ impl MePool { .writer_lifecycle .rpc_proxy_req_every_secs .load(Ordering::Relaxed), - me_reconnect_max_concurrent_per_dc: self.me_reconnect_max_concurrent_per_dc, - me_reconnect_backoff_base_ms: self.me_reconnect_backoff_base.as_millis() as u64, - me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, - me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, + me_reconnect_max_concurrent_per_dc: self + .reconnect_runtime + .me_reconnect_max_concurrent_per_dc, + me_reconnect_backoff_base_ms: self + .reconnect_runtime + .me_reconnect_backoff_base + .as_millis() as u64, + me_reconnect_backoff_cap_ms: self.reconnect_runtime.me_reconnect_backoff_cap.as_millis() + as u64, + me_reconnect_fast_retry_count: self.reconnect_runtime.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: self .drain_runtime .me_pool_drain_ttl_secs @@ -615,7 +651,10 @@ impl MePool { .single_endpoint_runtime .me_single_endpoint_shadow_rotate_every_secs .load(Ordering::Relaxed), - me_deterministic_writer_sort: self.me_deterministic_writer_sort.load(Ordering::Relaxed), + me_deterministic_writer_sort: self + .writer_selection_policy + .me_deterministic_writer_sort + .load(Ordering::Relaxed), me_writer_pick_mode: writer_pick_mode_label(self.writer_pick_mode()), me_writer_pick_sample_size: self.writer_pick_sample_size() as u8, me_socks_kdf_policy: socks_kdf_policy_label(self.socks_kdf_policy()), diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index c1f3de9..75f2d65 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -1,6 +1,6 @@ +use std::collections::HashMap; use std::io::ErrorKind; use std::net::SocketAddr; -use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::time::{Duration, Instant}; @@ -97,7 +97,8 @@ async fn ping_loop( let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) } else { - let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); + let jitter = + rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; Duration::from_secs(wait) }; @@ -116,9 +117,11 @@ async fn ping_loop( } let jitter_cap_ms = interval.as_millis() / 2; let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); - interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) + interval + + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) } else { - let jitter = rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); + let jitter = + rand::rng().random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); let secs = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; Duration::from_secs(secs) }; @@ -193,7 +196,8 @@ async fn rpc_proxy_req_signal_loop( .as_millis() .min(jitter_cap_ms) .max(1); - interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) + interval + + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64)) }; tokio::select! { @@ -365,9 +369,8 @@ impl MePool { let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0)); let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0)); let allow_drain_fallback = Arc::new(AtomicBool::new(false)); - let (tx, rx) = mpsc::channel::( - self.writer_lifecycle.writer_cmd_channel_capacity, - ); + let (tx, rx) = + mpsc::channel::(self.writer_lifecycle.writer_cmd_channel_capacity); let rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, @@ -430,7 +433,7 @@ impl MePool { let cancel_signal = cancel.clone(); let cancel_select = cancel.clone(); let cancel_cleanup = cancel.clone(); - let reader_route_data_wait_ms = self.me_reader_route_data_wait_ms.clone(); + let reader_route_data_wait_ms = self.transport_policy.me_reader_route_data_wait_ms.clone(); tokio::spawn(async move { // Reader MUST be the first branch in biased select! to avoid read starvation. diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 8277e7f..6d830a1 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -162,7 +162,8 @@ impl ConnRegistry { inner.routing.map.remove(&id); inner.binding.meta.remove(&id); if let Some(writer_id) = inner.binding.writer_for_conn.remove(&id) { - let became_empty = if let Some(set) = inner.binding.conns_for_writer.get_mut(&writer_id) { + let became_empty = if let Some(set) = inner.binding.conns_for_writer.get_mut(&writer_id) + { set.remove(&id); set.is_empty() } else { @@ -337,7 +338,10 @@ impl ConnRegistry { inner.binding.meta.insert(conn_id, meta.clone()); inner.binding.last_meta_for_writer.insert(writer_id, meta); - inner.binding.writer_idle_since_epoch_secs.remove(&writer_id); + inner + .binding + .writer_idle_since_epoch_secs + .remove(&writer_id); inner .binding .conns_for_writer @@ -375,7 +379,12 @@ impl ConnRegistry { let inner = self.inner.read().await; let mut out = HashMap::::with_capacity(writer_ids.len()); for writer_id in writer_ids { - if let Some(idle_since) = inner.binding.writer_idle_since_epoch_secs.get(writer_id).copied() { + if let Some(idle_since) = inner + .binding + .writer_idle_since_epoch_secs + .get(writer_id) + .copied() + { out.insert(*writer_id, idle_since); } } @@ -456,7 +465,10 @@ impl ConnRegistry { let mut inner = self.inner.write().await; inner.binding.writers.remove(&writer_id); inner.binding.last_meta_for_writer.remove(&writer_id); - inner.binding.writer_idle_since_epoch_secs.remove(&writer_id); + inner + .binding + .writer_idle_since_epoch_secs + .remove(&writer_id); let conns = inner .binding .conns_for_writer @@ -510,7 +522,10 @@ impl ConnRegistry { inner.binding.writers.remove(&writer_id); inner.binding.last_meta_for_writer.remove(&writer_id); - inner.binding.writer_idle_since_epoch_secs.remove(&writer_id); + inner + .binding + .writer_idle_since_epoch_secs + .remove(&writer_id); inner.binding.conns_for_writer.remove(&writer_id); true } diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index faec2ec..9a5c828 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -153,7 +153,8 @@ impl MePool { MeRouteNoWriterMode::InlineRecoveryLegacy => { self.stats.increment_me_inline_recovery_total(); if !unknown_target_dc { - for _ in 0..self.route_runtime.me_route_inline_recovery_attempts.max(1) + for _ in + 0..self.route_runtime.me_route_inline_recovery_attempts.max(1) { for family in self.family_order() { let map = match family { @@ -319,8 +320,9 @@ impl MePool { } } MeRouteNoWriterMode::HybridAsyncPersistent => { - let total_deadline = *hybrid_total_deadline - .get_or_insert_with(|| Instant::now() + self.hybrid_total_wait_budget()); + let total_deadline = *hybrid_total_deadline.get_or_insert_with(|| { + Instant::now() + self.hybrid_total_wait_budget() + }); if Instant::now() >= total_deadline { self.on_hybrid_timeout(total_deadline, routed_dc); return Err(ProxyError::Proxy( @@ -368,7 +370,11 @@ impl MePool { pick_sample_size, ) } else { - if self.me_deterministic_writer_sort.load(Ordering::Relaxed) { + if self + .writer_selection_policy + .me_deterministic_writer_sort + .load(Ordering::Relaxed) + { candidate_indices.sort_by(|lhs, rhs| { let left = &writers_snapshot[*lhs]; let right = &writers_snapshot[*rhs]; @@ -490,18 +496,18 @@ impl MePool { .increment_me_writer_pick_blocking_fallback_total(); let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port()); let (payload, meta) = build_routed_payload(effective_our_addr); - let reserve_result = if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout - { - match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await { - Ok(result) => result, - Err(_) => { - self.stats.increment_me_writer_pick_full_total(pick_mode); - continue; + let reserve_result = + if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout { + match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await { + Ok(result) => result, + Err(_) => { + self.stats.increment_me_writer_pick_full_total(pick_mode); + continue; + } } - } - } else { - w.tx.clone().reserve_owned().await - }; + } else { + w.tx.clone().reserve_owned().await + }; match reserve_result { Ok(permit) => { if !self.registry.bind_writer(conn_id, w.id, meta).await { @@ -637,8 +643,7 @@ impl MePool { hybrid_last_recovery_at: &mut Option, hybrid_wait_step: Duration, ) { - if !self.try_consume_hybrid_recovery_trigger_slot(HYBRID_RECOVERY_TRIGGER_MIN_INTERVAL_MS) - { + if !self.try_consume_hybrid_recovery_trigger_slot(HYBRID_RECOVERY_TRIGGER_MIN_INTERVAL_MS) { return; } if let Some(last) = *hybrid_last_recovery_at @@ -691,12 +696,8 @@ impl MePool { match self .route_runtime .me_route_hybrid_timeout_warn_epoch_ms - .compare_exchange_weak( - last_warn_ms, - now_ms, - Ordering::AcqRel, - Ordering::Relaxed, - ) { + .compare_exchange_weak(last_warn_ms, now_ms, Ordering::AcqRel, Ordering::Relaxed) + { Ok(_) => { warn!( routed_dc, @@ -724,12 +725,8 @@ impl MePool { match self .route_runtime .me_async_recovery_last_trigger_epoch_ms - .compare_exchange_weak( - last_trigger_ms, - now_ms, - Ordering::AcqRel, - Ordering::Relaxed, - ) { + .compare_exchange_weak(last_trigger_ms, now_ms, Ordering::AcqRel, Ordering::Relaxed) + { Ok(_) => return true, Err(actual) => last_trigger_ms = actual, }