From 9afaa28add0df19e40e196bb6e08583ad93e4ed3 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 28 Feb 2026 14:21:09 +0300 Subject: [PATCH] UpstreamManager: Backoff Retries --- src/config/defaults.rs | 14 +++++ src/config/hot_reload.rs | 8 +++ src/config/load.rs | 76 +++++++++++++++++++++++ src/config/types.rs | 15 +++++ src/main.rs | 7 ++- src/transport/upstream.rs | 127 ++++++++++++++++++++++++++++---------- 6 files changed, 214 insertions(+), 33 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index dbc251c..ab087fd 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -8,6 +8,8 @@ const DEFAULT_STUN_TCP_FALLBACK: bool = true; const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16; const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8; const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16; +const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3; +const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4; const DEFAULT_LISTEN_ADDR_IPV6: &str = "::"; const DEFAULT_ACCESS_USER: &str = "default"; const DEFAULT_ACCESS_SECRET: &str = "00000000000000000000000000000000"; @@ -158,6 +160,18 @@ pub(crate) fn default_me_reconnect_fast_retry_count() -> u32 { DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT } +pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { + DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS +} + +pub(crate) fn default_upstream_connect_retry_backoff_ms() -> u64 { + 250 +} + +pub(crate) fn default_upstream_unhealthy_fail_threshold() -> u32 { + DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD +} + pub(crate) fn default_crypto_pending_buffer() -> usize { 256 * 1024 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 579a9cb..eec6b8c 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -117,6 +117,14 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig) { if old.general.stun_nat_probe_concurrency != new.general.stun_nat_probe_concurrency { warn!("config reload: general.stun_nat_probe_concurrency changed; restart required"); } + if old.general.upstream_connect_retry_attempts != new.general.upstream_connect_retry_attempts + || old.general.upstream_connect_retry_backoff_ms + != new.general.upstream_connect_retry_backoff_ms + || old.general.upstream_unhealthy_fail_threshold + != new.general.upstream_unhealthy_fail_threshold + { + warn!("config reload: general.upstream_* changed; restart required"); + } } /// Resolve the public host for link generation — mirrors the logic in main.rs. diff --git a/src/config/load.rs b/src/config/load.rs index 7c578a3..3aafda2 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -237,6 +237,18 @@ impl ProxyConfig { )); } + if config.general.upstream_connect_retry_attempts == 0 { + return Err(ProxyError::Config( + "general.upstream_connect_retry_attempts must be > 0".to_string(), + )); + } + + if config.general.upstream_unhealthy_fail_threshold == 0 { + return Err(ProxyError::Config( + "general.upstream_unhealthy_fail_threshold must be > 0".to_string(), + )); + } + if config.general.me_reinit_every_secs == 0 { return Err(ProxyError::Config( "general.me_reinit_every_secs must be > 0".to_string(), @@ -567,6 +579,18 @@ mod tests { cfg.general.me_reconnect_fast_retry_count, default_me_reconnect_fast_retry_count() ); + assert_eq!( + cfg.general.upstream_connect_retry_attempts, + default_upstream_connect_retry_attempts() + ); + assert_eq!( + cfg.general.upstream_connect_retry_backoff_ms, + default_upstream_connect_retry_backoff_ms() + ); + assert_eq!( + cfg.general.upstream_unhealthy_fail_threshold, + default_upstream_unhealthy_fail_threshold() + ); assert_eq!(cfg.general.update_every, default_update_every()); assert_eq!(cfg.server.listen_addr_ipv4, default_listen_addr_ipv4()); assert_eq!(cfg.server.listen_addr_ipv6, default_listen_addr_ipv6_opt()); @@ -593,6 +617,18 @@ mod tests { general.me_reconnect_fast_retry_count, default_me_reconnect_fast_retry_count() ); + assert_eq!( + general.upstream_connect_retry_attempts, + default_upstream_connect_retry_attempts() + ); + assert_eq!( + general.upstream_connect_retry_backoff_ms, + default_upstream_connect_retry_backoff_ms() + ); + assert_eq!( + general.upstream_unhealthy_fail_threshold, + default_upstream_unhealthy_fail_threshold() + ); assert_eq!(general.update_every, default_update_every()); let server = ServerConfig::default(); @@ -765,6 +801,46 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn upstream_connect_retry_attempts_zero_is_rejected() { + let toml = r#" + [general] + upstream_connect_retry_attempts = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_upstream_connect_retry_attempts_zero_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.upstream_connect_retry_attempts must be > 0")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn upstream_unhealthy_fail_threshold_zero_is_rejected() { + let toml = r#" + [general] + upstream_unhealthy_fail_threshold = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_upstream_unhealthy_fail_threshold_zero_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.upstream_unhealthy_fail_threshold must be > 0")); + let _ = std::fs::remove_file(path); + } + #[test] fn me_hardswap_warmup_defaults_are_set() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index 902d816..7a3f6e9 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -365,6 +365,18 @@ pub struct GeneralConfig { #[serde(default = "default_me_reconnect_fast_retry_count")] pub me_reconnect_fast_retry_count: u32, + /// Connect attempts for the selected upstream before returning error/fallback. + #[serde(default = "default_upstream_connect_retry_attempts")] + pub upstream_connect_retry_attempts: u32, + + /// Delay in milliseconds between upstream connect attempts. + #[serde(default = "default_upstream_connect_retry_backoff_ms")] + pub upstream_connect_retry_backoff_ms: u64, + + /// Consecutive failed requests before upstream is marked unhealthy. + #[serde(default = "default_upstream_unhealthy_fail_threshold")] + pub upstream_unhealthy_fail_threshold: u32, + /// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected). #[serde(default)] pub stun_iface_mismatch_ignore: bool, @@ -522,6 +534,9 @@ impl Default for GeneralConfig { me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(), me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(), me_reconnect_fast_retry_count: default_me_reconnect_fast_retry_count(), + upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), + upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), + upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(), stun_iface_mismatch_ignore: false, unknown_dc_log_path: default_unknown_dc_log_path(), log_level: LogLevel::Normal, diff --git a/src/main.rs b/src/main.rs index 4d4d3f5..a87dd99 100644 --- a/src/main.rs +++ b/src/main.rs @@ -261,7 +261,12 @@ async fn main() -> std::result::Result<(), Box> { warn!("Using default tls_domain. Consider setting a custom domain."); } - let upstream_manager = Arc::new(UpstreamManager::new(config.upstreams.clone())); + let upstream_manager = Arc::new(UpstreamManager::new( + config.upstreams.clone(), + config.general.upstream_connect_retry_attempts, + config.general.upstream_connect_retry_backoff_ms, + config.general.upstream_unhealthy_fail_threshold, + )); let mut tls_domains = Vec::with_capacity(1 + config.censorship.tls_domains.len()); tls_domains.push(config.censorship.tls_domain.clone()); diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 1918fdc..8411f5a 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -172,10 +172,18 @@ pub struct UpstreamEgressInfo { #[derive(Clone)] pub struct UpstreamManager { upstreams: Arc>>, + connect_retry_attempts: u32, + connect_retry_backoff: Duration, + unhealthy_fail_threshold: u32, } impl UpstreamManager { - pub fn new(configs: Vec) -> Self { + pub fn new( + configs: Vec, + connect_retry_attempts: u32, + connect_retry_backoff_ms: u64, + unhealthy_fail_threshold: u32, + ) -> Self { let states = configs.into_iter() .filter(|c| c.enabled) .map(UpstreamState::new) @@ -183,6 +191,9 @@ impl UpstreamManager { Self { upstreams: Arc::new(RwLock::new(states)), + connect_retry_attempts: connect_retry_attempts.max(1), + connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms), + unhealthy_fail_threshold: unhealthy_fail_threshold.max(1), } } @@ -430,43 +441,83 @@ impl UpstreamManager { upstream.selected_scope = s.to_string(); } - let start = Instant::now(); - let bind_rr = { let guard = self.upstreams.read().await; guard.get(idx).map(|u| u.bind_rr.clone()) }; - match self.connect_via_upstream(&upstream, target, bind_rr).await { - Ok((stream, egress)) => { - let rtt_ms = start.elapsed().as_secs_f64() * 1000.0; - let mut guard = self.upstreams.write().await; - if let Some(u) = guard.get_mut(idx) { - if !u.healthy { - debug!(rtt_ms = format!("{:.1}", rtt_ms), "Upstream recovered"); - } - u.healthy = true; - u.fails = 0; + let mut last_error: Option = None; + for attempt in 1..=self.connect_retry_attempts { + let start = Instant::now(); + match self + .connect_via_upstream(&upstream, target, bind_rr.clone()) + .await + { + Ok((stream, egress)) => { + let rtt_ms = start.elapsed().as_secs_f64() * 1000.0; + let mut guard = self.upstreams.write().await; + if let Some(u) = guard.get_mut(idx) { + if !u.healthy { + debug!(rtt_ms = format!("{:.1}", rtt_ms), "Upstream recovered"); + } + if attempt > 1 { + debug!( + attempt, + attempts = self.connect_retry_attempts, + rtt_ms = format!("{:.1}", rtt_ms), + "Upstream connect recovered after retry" + ); + } + u.healthy = true; + u.fails = 0; - if let Some(di) = dc_idx.and_then(UpstreamState::dc_array_idx) { - u.dc_latency[di].update(rtt_ms); + if let Some(di) = dc_idx.and_then(UpstreamState::dc_array_idx) { + u.dc_latency[di].update(rtt_ms); + } } + return Ok((stream, egress)); } - Ok((stream, egress)) - }, - Err(e) => { - let mut guard = self.upstreams.write().await; - if let Some(u) = guard.get_mut(idx) { - u.fails += 1; - warn!(fails = u.fails, "Upstream failed: {}", e); - if u.fails > 3 { - u.healthy = false; - warn!("Upstream marked unhealthy"); + Err(e) => { + if attempt < self.connect_retry_attempts { + debug!( + attempt, + attempts = self.connect_retry_attempts, + target = %target, + error = %e, + "Upstream connect attempt failed, retrying" + ); + if !self.connect_retry_backoff.is_zero() { + tokio::time::sleep(self.connect_retry_backoff).await; + } } + last_error = Some(e); } - Err(e) } } + + let error = last_error.unwrap_or_else(|| { + ProxyError::Config("Upstream connect attempts exhausted".to_string()) + }); + + let mut guard = self.upstreams.write().await; + if let Some(u) = guard.get_mut(idx) { + u.fails += 1; + warn!( + fails = u.fails, + attempts = self.connect_retry_attempts, + "Upstream failed after retries: {}", + error + ); + if u.fails >= self.unhealthy_fail_threshold { + u.healthy = false; + warn!( + fails = u.fails, + threshold = self.unhealthy_fail_threshold, + "Upstream marked unhealthy" + ); + } + } + Err(error) } async fn connect_via_upstream( @@ -1035,18 +1086,26 @@ impl UpstreamManager { u.fails += 1; debug!(dc = dc_zero_idx + 1, fails = u.fails, "Health check failed (both): {}", e); - if u.fails > 3 { + if u.fails >= self.unhealthy_fail_threshold { u.healthy = false; - warn!("Upstream unhealthy (fails)"); + warn!( + fails = u.fails, + threshold = self.unhealthy_fail_threshold, + "Upstream unhealthy (fails)" + ); } } Err(_) => { u.fails += 1; debug!(dc = dc_zero_idx + 1, fails = u.fails, "Health check timeout (both)"); - if u.fails > 3 { + if u.fails >= self.unhealthy_fail_threshold { u.healthy = false; - warn!("Upstream unhealthy (timeout)"); + warn!( + fails = u.fails, + threshold = self.unhealthy_fail_threshold, + "Upstream unhealthy (timeout)" + ); } } } @@ -1057,9 +1116,13 @@ impl UpstreamManager { let mut guard = self.upstreams.write().await; let u = &mut guard[i]; u.fails += 1; - if u.fails > 3 { + if u.fails >= self.unhealthy_fail_threshold { u.healthy = false; - warn!("Upstream unhealthy (no fallback family)"); + warn!( + fails = u.fails, + threshold = self.unhealthy_fail_threshold, + "Upstream unhealthy (no fallback family)" + ); } u.last_check = std::time::Instant::now(); }