From 97f66495848a4a6bc85259a3ab05c0409855c182 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Mar 2026 19:56:25 +0300 Subject: [PATCH] ME Route Runtime Core Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/transport/middle_proxy/pool.rs | 54 +++++++++++++++------------- src/transport/middle_proxy/send.rs | 58 +++++++++++++++++++++--------- 2 files changed, 72 insertions(+), 40 deletions(-) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index e617a43..d4fa0aa 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -284,11 +284,24 @@ pub(super) struct WriterLifecycleCore { pub(super) writer_cmd_channel_capacity: usize, } +pub(super) struct RouteRuntimeCore { + pub(super) me_route_no_writer_mode: AtomicU8, + pub(super) me_route_no_writer_wait: Duration, + pub(super) me_route_hybrid_max_wait: Duration, + pub(super) me_route_blocking_send_timeout: Option, + pub(super) me_route_last_success_epoch_ms: AtomicU64, + pub(super) me_route_hybrid_timeout_warn_epoch_ms: AtomicU64, + pub(super) me_async_recovery_last_trigger_epoch_ms: AtomicU64, + pub(super) me_route_inline_recovery_attempts: u32, + pub(super) me_route_inline_recovery_wait: Duration, +} + #[allow(dead_code)] pub struct MePool { pub(super) routing: Arc, pub(super) reinit: Arc, pub(super) writer_lifecycle: Arc, + pub(super) route_runtime: Arc, pub(super) decision: NetworkDecision, pub(super) upstream: Option>, pub(super) rng: Arc, @@ -382,15 +395,6 @@ pub struct MePool { pub(super) me_writer_pick_sample_size: AtomicU8, pub(super) me_socks_kdf_policy: AtomicU8, pub(super) me_reader_route_data_wait_ms: Arc, - pub(super) me_route_no_writer_mode: AtomicU8, - pub(super) me_route_no_writer_wait: Duration, - pub(super) me_route_hybrid_max_wait: Duration, - pub(super) me_route_blocking_send_timeout: Option, - pub(super) me_route_last_success_epoch_ms: AtomicU64, - pub(super) me_route_hybrid_timeout_warn_epoch_ms: AtomicU64, - pub(super) me_async_recovery_last_trigger_epoch_ms: AtomicU64, - pub(super) me_route_inline_recovery_attempts: u32, - pub(super) me_route_inline_recovery_wait: Duration, pub(super) me_health_interval_ms_unhealthy: AtomicU64, pub(super) me_health_interval_ms_healthy: AtomicU64, pub(super) me_warn_rate_limit_ms: AtomicU64, @@ -568,6 +572,23 @@ impl MePool { rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs), writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1), }), + route_runtime: Arc::new(RouteRuntimeCore { + me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), + me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), + me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms.max(50)), + me_route_blocking_send_timeout: if me_route_blocking_send_timeout_ms == 0 { + None + } else { + Some(Duration::from_millis( + me_route_blocking_send_timeout_ms.min(5_000), + )) + }, + me_route_last_success_epoch_ms: AtomicU64::new(0), + me_route_hybrid_timeout_warn_epoch_ms: AtomicU64::new(0), + me_async_recovery_last_trigger_epoch_ms: AtomicU64::new(0), + me_route_inline_recovery_attempts, + me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), + }), decision, upstream, rng, @@ -721,21 +742,6 @@ impl MePool { me_writer_pick_sample_size: AtomicU8::new(me_writer_pick_sample_size.clamp(2, 4)), me_socks_kdf_policy: AtomicU8::new(me_socks_kdf_policy.as_u8()), me_reader_route_data_wait_ms: Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)), - me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), - me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), - me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms.max(50)), - me_route_blocking_send_timeout: if me_route_blocking_send_timeout_ms == 0 { - None - } else { - Some(Duration::from_millis( - me_route_blocking_send_timeout_ms.min(5_000), - )) - }, - me_route_last_success_epoch_ms: AtomicU64::new(0), - me_route_hybrid_timeout_warn_epoch_ms: AtomicU64::new(0), - me_async_recovery_last_trigger_epoch_ms: AtomicU64::new(0), - me_route_inline_recovery_attempts, - me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)), me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)), diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 80fd2ea..faec2ec 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -71,8 +71,11 @@ impl MePool { }, ) }; - let no_writer_mode = - MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed)); + let no_writer_mode = MeRouteNoWriterMode::from_u8( + self.route_runtime + .me_route_no_writer_mode + .load(Ordering::Relaxed), + ); let (routed_dc, unknown_target_dc) = self.resolve_target_dc_for_routing(target_dc as i32).await; let mut no_writer_deadline: Option = None; @@ -81,7 +84,10 @@ impl MePool { let mut hybrid_recovery_round = 0u32; let mut hybrid_last_recovery_at: Option = None; let mut hybrid_total_deadline: Option = None; - let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50)); + let hybrid_wait_step = self + .route_runtime + .me_route_no_writer_wait + .max(Duration::from_millis(50)); let mut hybrid_wait_current = hybrid_wait_step; loop { @@ -126,7 +132,7 @@ impl MePool { match no_writer_mode { MeRouteNoWriterMode::AsyncRecoveryFailfast => { let deadline = *no_writer_deadline.get_or_insert_with(|| { - Instant::now() + self.me_route_no_writer_wait + Instant::now() + self.route_runtime.me_route_no_writer_wait }); if !async_recovery_triggered && !unknown_target_dc { let triggered = @@ -147,7 +153,8 @@ impl MePool { MeRouteNoWriterMode::InlineRecoveryLegacy => { self.stats.increment_me_inline_recovery_total(); if !unknown_target_dc { - for _ in 0..self.me_route_inline_recovery_attempts.max(1) { + for _ in 0..self.route_runtime.me_route_inline_recovery_attempts.max(1) + { for family in self.family_order() { let map = match family { IpFamily::V4 => self.proxy_map_v4.read().await.clone(), @@ -176,7 +183,7 @@ impl MePool { continue; } let deadline = *no_writer_deadline.get_or_insert_with(|| { - Instant::now() + self.me_route_inline_recovery_wait + Instant::now() + self.route_runtime.me_route_inline_recovery_wait }); if !self.wait_for_writer_until(deadline).await { if !self.writers.read().await.is_empty() { @@ -231,8 +238,9 @@ impl MePool { let pick_mode = self.writer_pick_mode(); match no_writer_mode { MeRouteNoWriterMode::AsyncRecoveryFailfast => { - let deadline = *no_writer_deadline - .get_or_insert_with(|| Instant::now() + self.me_route_no_writer_wait); + let deadline = *no_writer_deadline.get_or_insert_with(|| { + Instant::now() + self.route_runtime.me_route_no_writer_wait + }); if !async_recovery_triggered && !unknown_target_dc { let triggered = self.trigger_async_recovery_for_target_dc(routed_dc).await; @@ -255,7 +263,7 @@ impl MePool { self.stats.increment_me_inline_recovery_total(); if unknown_target_dc { let deadline = *no_writer_deadline.get_or_insert_with(|| { - Instant::now() + self.me_route_inline_recovery_wait + Instant::now() + self.route_runtime.me_route_inline_recovery_wait }); if self.wait_for_candidate_until(routed_dc, deadline).await { continue; @@ -267,7 +275,9 @@ impl MePool { "No ME writers available for target DC".into(), )); } - if emergency_attempts >= self.me_route_inline_recovery_attempts.max(1) { + if emergency_attempts + >= self.route_runtime.me_route_inline_recovery_attempts.max(1) + { self.stats .increment_me_writer_pick_no_candidate_total(pick_mode); self.stats.increment_me_no_writer_failfast_total(); @@ -480,7 +490,8 @@ impl MePool { .increment_me_writer_pick_blocking_fallback_total(); let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port()); let (payload, meta) = build_routed_payload(effective_our_addr); - let reserve_result = if let Some(timeout) = self.me_route_blocking_send_timeout { + let reserve_result = if let Some(timeout) = self.route_runtime.me_route_blocking_send_timeout + { match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await { Ok(result) => result, Err(_) => { @@ -646,9 +657,15 @@ impl MePool { } fn hybrid_total_wait_budget(&self) -> Duration { - let base = self.me_route_hybrid_max_wait.max(Duration::from_millis(50)); + let base = self + .route_runtime + .me_route_hybrid_max_wait + .max(Duration::from_millis(50)); let now_ms = Self::now_epoch_millis(); - let last_success_ms = self.me_route_last_success_epoch_ms.load(Ordering::Relaxed); + let last_success_ms = self + .route_runtime + .me_route_last_success_epoch_ms + .load(Ordering::Relaxed); if last_success_ms != 0 && now_ms.saturating_sub(last_success_ms) <= HYBRID_RECENT_SUCCESS_WINDOW_MS { @@ -658,7 +675,8 @@ impl MePool { } fn note_hybrid_route_success(&self) { - self.me_route_last_success_epoch_ms + self.route_runtime + .me_route_last_success_epoch_ms .store(Self::now_epoch_millis(), Ordering::Relaxed); } @@ -666,10 +684,14 @@ impl MePool { self.stats.increment_me_hybrid_timeout_total(); let now_ms = Self::now_epoch_millis(); let mut last_warn_ms = self + .route_runtime .me_route_hybrid_timeout_warn_epoch_ms .load(Ordering::Relaxed); while now_ms.saturating_sub(last_warn_ms) >= HYBRID_TIMEOUT_WARN_RATE_LIMIT_MS { - match self.me_route_hybrid_timeout_warn_epoch_ms.compare_exchange_weak( + match self + .route_runtime + .me_route_hybrid_timeout_warn_epoch_ms + .compare_exchange_weak( last_warn_ms, now_ms, Ordering::AcqRel, @@ -692,13 +714,17 @@ impl MePool { fn try_consume_hybrid_recovery_trigger_slot(&self, min_interval_ms: u64) -> bool { let now_ms = Self::now_epoch_millis(); let mut last_trigger_ms = self + .route_runtime .me_async_recovery_last_trigger_epoch_ms .load(Ordering::Relaxed); loop { if now_ms.saturating_sub(last_trigger_ms) < min_interval_ms { return false; } - match self.me_async_recovery_last_trigger_epoch_ms.compare_exchange_weak( + match self + .route_runtime + .me_async_recovery_last_trigger_epoch_ms + .compare_exchange_weak( last_trigger_ms, now_ms, Ordering::AcqRel,