From 7538967d3c4b56b4d122c7839e35d8cb25b0d4d0 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Tue, 24 Feb 2026 23:36:33 +0300 Subject: [PATCH] ME Hardswap being softer Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 16 ++ src/config/load.rs | 141 +++++++++++++++++ src/config/types.rs | 20 +++ src/main.rs | 4 + src/transport/middle_proxy/config_updater.rs | 8 + src/transport/middle_proxy/pool.rs | 158 +++++++++++++++++-- 6 files changed, 332 insertions(+), 15 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 4f563ba..d43ace9 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -186,6 +186,22 @@ pub(crate) fn default_me_reinit_every_secs() -> u64 { 15 * 60 } +pub(crate) fn default_me_hardswap_warmup_delay_min_ms() -> u64 { + 1000 +} + +pub(crate) fn default_me_hardswap_warmup_delay_max_ms() -> u64 { + 2000 +} + +pub(crate) fn default_me_hardswap_warmup_extra_passes() -> u8 { + 3 +} + +pub(crate) fn default_me_hardswap_warmup_pass_backoff_base_ms() -> u64 { + 500 +} + pub(crate) fn default_me_config_stable_snapshots() -> u8 { 2 } diff --git a/src/config/load.rs b/src/config/load.rs index c18c84f..5698a71 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -153,6 +153,32 @@ impl ProxyConfig { )); } + if config.general.me_hardswap_warmup_delay_max_ms == 0 { + return Err(ProxyError::Config( + "general.me_hardswap_warmup_delay_max_ms must be > 0".to_string(), + )); + } + + if config.general.me_hardswap_warmup_delay_min_ms + > config.general.me_hardswap_warmup_delay_max_ms + { + return Err(ProxyError::Config( + "general.me_hardswap_warmup_delay_min_ms must be <= general.me_hardswap_warmup_delay_max_ms".to_string(), + )); + } + + if config.general.me_hardswap_warmup_extra_passes > 10 { + return Err(ProxyError::Config( + "general.me_hardswap_warmup_extra_passes must be within [0, 10]".to_string(), + )); + } + + if config.general.me_hardswap_warmup_pass_backoff_base_ms == 0 { + return Err(ProxyError::Config( + "general.me_hardswap_warmup_pass_backoff_base_ms must be > 0".to_string(), + )); + } + if config.general.me_config_stable_snapshots == 0 { return Err(ProxyError::Config( "general.me_config_stable_snapshots must be > 0".to_string(), @@ -526,6 +552,121 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn me_hardswap_warmup_defaults_are_set() { + let toml = r#" + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_hardswap_warmup_defaults_test.toml"); + std::fs::write(&path, toml).unwrap(); + let cfg = ProxyConfig::load(&path).unwrap(); + assert_eq!( + cfg.general.me_hardswap_warmup_delay_min_ms, + default_me_hardswap_warmup_delay_min_ms() + ); + assert_eq!( + cfg.general.me_hardswap_warmup_delay_max_ms, + default_me_hardswap_warmup_delay_max_ms() + ); + assert_eq!( + cfg.general.me_hardswap_warmup_extra_passes, + default_me_hardswap_warmup_extra_passes() + ); + assert_eq!( + cfg.general.me_hardswap_warmup_pass_backoff_base_ms, + default_me_hardswap_warmup_pass_backoff_base_ms() + ); + let _ = std::fs::remove_file(path); + } + + #[test] + fn me_hardswap_warmup_delay_range_is_validated() { + let toml = r#" + [general] + me_hardswap_warmup_delay_min_ms = 2001 + me_hardswap_warmup_delay_max_ms = 2000 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_hardswap_warmup_delay_range_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains( + "general.me_hardswap_warmup_delay_min_ms must be <= general.me_hardswap_warmup_delay_max_ms" + )); + let _ = std::fs::remove_file(path); + } + + #[test] + fn me_hardswap_warmup_delay_max_zero_is_rejected() { + let toml = r#" + [general] + me_hardswap_warmup_delay_max_ms = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_hardswap_warmup_delay_max_zero_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_hardswap_warmup_delay_max_ms must be > 0")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn me_hardswap_warmup_extra_passes_out_of_range_is_rejected() { + let toml = r#" + [general] + me_hardswap_warmup_extra_passes = 11 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_hardswap_warmup_extra_passes_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_hardswap_warmup_extra_passes must be within [0, 10]")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn me_hardswap_warmup_pass_backoff_zero_is_rejected() { + let toml = r#" + [general] + me_hardswap_warmup_pass_backoff_base_ms = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_hardswap_warmup_backoff_zero_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_hardswap_warmup_pass_backoff_base_ms must be > 0")); + let _ = std::fs::remove_file(path); + } + #[test] fn me_config_stable_snapshots_zero_is_rejected() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 03417c5..0cda9f4 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -271,6 +271,22 @@ pub struct GeneralConfig { #[serde(default = "default_me_reinit_every_secs")] pub me_reinit_every_secs: u64, + /// Minimum delay in ms between hardswap warmup connect attempts. + #[serde(default = "default_me_hardswap_warmup_delay_min_ms")] + pub me_hardswap_warmup_delay_min_ms: u64, + + /// Maximum delay in ms between hardswap warmup connect attempts. + #[serde(default = "default_me_hardswap_warmup_delay_max_ms")] + pub me_hardswap_warmup_delay_max_ms: u64, + + /// Additional warmup passes in the same hardswap cycle after the base pass. + #[serde(default = "default_me_hardswap_warmup_extra_passes")] + pub me_hardswap_warmup_extra_passes: u8, + + /// Base backoff in ms between hardswap warmup passes when floor is still incomplete. + #[serde(default = "default_me_hardswap_warmup_pass_backoff_base_ms")] + pub me_hardswap_warmup_pass_backoff_base_ms: u64, + /// Number of identical getProxyConfig snapshots required before applying ME map updates. #[serde(default = "default_me_config_stable_snapshots")] pub me_config_stable_snapshots: u8, @@ -371,6 +387,10 @@ impl Default for GeneralConfig { fast_mode_min_tls_record: default_fast_mode_min_tls_record(), update_every: Some(default_update_every_secs()), me_reinit_every_secs: default_me_reinit_every_secs(), + me_hardswap_warmup_delay_min_ms: default_me_hardswap_warmup_delay_min_ms(), + me_hardswap_warmup_delay_max_ms: default_me_hardswap_warmup_delay_max_ms(), + me_hardswap_warmup_extra_passes: default_me_hardswap_warmup_extra_passes(), + me_hardswap_warmup_pass_backoff_base_ms: default_me_hardswap_warmup_pass_backoff_base_ms(), me_config_stable_snapshots: default_me_config_stable_snapshots(), me_config_apply_cooldown_secs: default_me_config_apply_cooldown_secs(), proxy_secret_stable_snapshots: default_proxy_secret_stable_snapshots(), diff --git a/src/main.rs b/src/main.rs index d9a692d..3bcbf3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -373,6 +373,10 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_pool_drain_ttl_secs, config.general.effective_me_pool_force_close_secs(), config.general.me_pool_min_fresh_ratio, + config.general.me_hardswap_warmup_delay_min_ms, + config.general.me_hardswap_warmup_delay_max_ms, + config.general.me_hardswap_warmup_extra_passes, + config.general.me_hardswap_warmup_pass_backoff_base_ms, ); let pool_size = config.general.middle_proxy_pool_size.max(1); diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index fc9ed3d..4e8e63f 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -228,6 +228,10 @@ async fn run_update_cycle( cfg.general.me_pool_drain_ttl_secs, cfg.general.effective_me_pool_force_close_secs(), cfg.general.me_pool_min_fresh_ratio, + cfg.general.me_hardswap_warmup_delay_min_ms, + cfg.general.me_hardswap_warmup_delay_max_ms, + cfg.general.me_hardswap_warmup_extra_passes, + cfg.general.me_hardswap_warmup_pass_backoff_base_ms, ); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); @@ -407,6 +411,10 @@ pub async fn me_config_updater( cfg.general.me_pool_drain_ttl_secs, cfg.general.effective_me_pool_force_close_secs(), cfg.general.me_pool_min_fresh_ratio, + cfg.general.me_hardswap_warmup_delay_min_ms, + cfg.general.me_hardswap_warmup_delay_max_ms, + cfg.general.me_hardswap_warmup_extra_passes, + cfg.general.me_hardswap_warmup_pass_backoff_base_ms, ); let new_secs = cfg.general.effective_update_every_secs().max(1); if new_secs == update_every_secs { diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 223d488..aa14e5b 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -83,6 +83,10 @@ pub struct MePool { pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_force_close_secs: AtomicU64, pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, + pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64, + pub(super) me_hardswap_warmup_delay_max_ms: AtomicU64, + pub(super) me_hardswap_warmup_extra_passes: AtomicU32, + pub(super) me_hardswap_warmup_pass_backoff_base_ms: AtomicU64, pool_size: usize, } @@ -140,6 +144,10 @@ impl MePool { me_pool_drain_ttl_secs: u64, me_pool_force_close_secs: u64, me_pool_min_fresh_ratio: f32, + me_hardswap_warmup_delay_min_ms: u64, + me_hardswap_warmup_delay_max_ms: u64, + me_hardswap_warmup_extra_passes: u8, + me_hardswap_warmup_pass_backoff_base_ms: u64, ) -> Arc { Arc::new(Self { registry: Arc::new(ConnRegistry::new()), @@ -188,6 +196,10 @@ impl MePool { me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(me_pool_min_fresh_ratio)), + me_hardswap_warmup_delay_min_ms: AtomicU64::new(me_hardswap_warmup_delay_min_ms), + me_hardswap_warmup_delay_max_ms: AtomicU64::new(me_hardswap_warmup_delay_max_ms), + me_hardswap_warmup_extra_passes: AtomicU32::new(me_hardswap_warmup_extra_passes as u32), + me_hardswap_warmup_pass_backoff_base_ms: AtomicU64::new(me_hardswap_warmup_pass_backoff_base_ms), }) } @@ -205,6 +217,10 @@ impl MePool { drain_ttl_secs: u64, force_close_secs: u64, min_fresh_ratio: f32, + hardswap_warmup_delay_min_ms: u64, + hardswap_warmup_delay_max_ms: u64, + hardswap_warmup_extra_passes: u8, + hardswap_warmup_pass_backoff_base_ms: u64, ) { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs.store(drain_ttl_secs, Ordering::Relaxed); @@ -212,6 +228,14 @@ impl MePool { .store(force_close_secs, Ordering::Relaxed); self.me_pool_min_fresh_ratio_permille .store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed); + self.me_hardswap_warmup_delay_min_ms + .store(hardswap_warmup_delay_min_ms, Ordering::Relaxed); + self.me_hardswap_warmup_delay_max_ms + .store(hardswap_warmup_delay_max_ms, Ordering::Relaxed); + self.me_hardswap_warmup_extra_passes + .store(hardswap_warmup_extra_passes as u32, Ordering::Relaxed); + self.me_hardswap_warmup_pass_backoff_base_ms + .store(hardswap_warmup_pass_backoff_base_ms, Ordering::Relaxed); } pub fn reset_stun_state(&self) { @@ -330,6 +354,49 @@ impl MePool { endpoint_count.max(3) } + fn hardswap_warmup_connect_delay_ms(&self) -> u64 { + let min_ms = self + .me_hardswap_warmup_delay_min_ms + .load(Ordering::Relaxed); + let max_ms = self + .me_hardswap_warmup_delay_max_ms + .load(Ordering::Relaxed); + let (min_ms, max_ms) = if min_ms <= max_ms { + (min_ms, max_ms) + } else { + (max_ms, min_ms) + }; + if min_ms == max_ms { + return min_ms; + } + rand::rng().random_range(min_ms..=max_ms) + } + + fn hardswap_warmup_backoff_ms(&self, pass_idx: usize) -> u64 { + let base_ms = self + .me_hardswap_warmup_pass_backoff_base_ms + .load(Ordering::Relaxed); + let cap_ms = (self.me_reconnect_backoff_cap.as_millis() as u64).max(base_ms); + let shift = (pass_idx as u32).min(20); + let scaled = base_ms.saturating_mul(1u64 << shift); + let core = scaled.min(cap_ms); + let jitter = (core / 2).max(1); + core.saturating_add(rand::rng().random_range(0..=jitter)) + } + + async fn fresh_writer_count_for_endpoints( + &self, + generation: u64, + endpoints: &HashSet, + ) -> usize { + let ws = self.writers.read().await; + ws.iter() + .filter(|w| !w.draining.load(Ordering::Relaxed)) + .filter(|w| w.generation == generation) + .filter(|w| endpoints.contains(&w.addr)) + .count() + } + pub(super) async fn connect_endpoints_round_robin( self: &Arc, endpoints: &[SocketAddr], @@ -356,6 +423,12 @@ impl MePool { generation: u64, desired_by_dc: &HashMap>, ) { + let extra_passes = self + .me_hardswap_warmup_extra_passes + .load(Ordering::Relaxed) + .min(10) as usize; + let total_passes = 1 + extra_passes; + for (dc, endpoints) in desired_by_dc { if endpoints.is_empty() { continue; @@ -364,30 +437,85 @@ impl MePool { let mut endpoint_list: Vec = endpoints.iter().copied().collect(); endpoint_list.sort_unstable(); let required = Self::required_writers_for_dc(endpoint_list.len()); + let mut completed = false; + let mut last_fresh_count = self + .fresh_writer_count_for_endpoints(generation, endpoints) + .await; - loop { - let fresh_count = { - let ws = self.writers.read().await; - ws.iter() - .filter(|w| !w.draining.load(Ordering::Relaxed)) - .filter(|w| w.generation == generation) - .filter(|w| endpoints.contains(&w.addr)) - .count() - }; - if fresh_count >= required { + for pass_idx in 0..total_passes { + if last_fresh_count >= required { + completed = true; break; } - if !self.connect_endpoints_round_robin(&endpoint_list, rng).await { - warn!( + let missing = required.saturating_sub(last_fresh_count); + debug!( + dc = *dc, + pass = pass_idx + 1, + total_passes, + fresh_count = last_fresh_count, + required, + missing, + endpoint_count = endpoint_list.len(), + "ME hardswap warmup pass started" + ); + + for attempt_idx in 0..missing { + let delay_ms = self.hardswap_warmup_connect_delay_ms(); + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + + let connected = self.connect_endpoints_round_robin(&endpoint_list, rng).await; + debug!( dc = *dc, - fresh_count, + pass = pass_idx + 1, + total_passes, + attempt = attempt_idx + 1, + delay_ms, + connected, + "ME hardswap warmup connect attempt finished" + ); + } + + last_fresh_count = self + .fresh_writer_count_for_endpoints(generation, endpoints) + .await; + if last_fresh_count >= required { + completed = true; + info!( + dc = *dc, + pass = pass_idx + 1, + total_passes, + fresh_count = last_fresh_count, required, - endpoint_count = endpoint_list.len(), - "ME warmup stopped: unable to reach required writer floor for DC" + "ME hardswap warmup floor reached for DC" ); break; } + + if pass_idx + 1 < total_passes { + let backoff_ms = self.hardswap_warmup_backoff_ms(pass_idx); + debug!( + dc = *dc, + pass = pass_idx + 1, + total_passes, + fresh_count = last_fresh_count, + required, + backoff_ms, + "ME hardswap warmup pass incomplete, delaying next pass" + ); + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + } + } + + if !completed { + warn!( + dc = *dc, + fresh_count = last_fresh_count, + required, + endpoint_count = endpoint_list.len(), + total_passes, + "ME warmup stopped: unable to reach required writer floor for DC" + ); } } }