diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs
index fa7b0a6..1e2dd1e 100644
--- a/src/transport/upstream.rs
+++ b/src/transport/upstream.rs
@@ -19,6 +19,7 @@
 use crate::config::{UpstreamConfig, UpstreamType};
 use crate::error::{Result, ProxyError};
 use crate::network::dns_overrides::{resolve_socket_addr, split_host_port};
 use crate::protocol::constants::{TG_DATACENTERS_V4, TG_DATACENTERS_V6, TG_DATACENTER_PORT};
+use crate::stats::Stats;
 use crate::transport::socket::{create_outgoing_socket_bound, resolve_interface_ip};
 use crate::transport::socks::{connect_socks4, connect_socks5};
@@ -188,6 +189,8 @@ pub struct UpstreamManager {
     connect_retry_attempts: u32,
     connect_retry_backoff: Duration,
     unhealthy_fail_threshold: u32,
+    connect_failfast_hard_errors: bool,
+    stats: Arc<Stats>,
 }
 
 impl UpstreamManager {
@@ -196,6 +199,8 @@ impl UpstreamManager {
         connect_retry_attempts: u32,
         connect_retry_backoff_ms: u64,
         unhealthy_fail_threshold: u32,
+        connect_failfast_hard_errors: bool,
+        stats: Arc<Stats>,
     ) -> Self {
         let states = configs.into_iter()
             .filter(|c| c.enabled)
@@ -207,6 +212,8 @@ impl UpstreamManager {
             connect_retry_attempts: connect_retry_attempts.max(1),
             connect_retry_backoff: Duration::from_millis(connect_retry_backoff_ms),
             unhealthy_fail_threshold: unhealthy_fail_threshold.max(1),
+            connect_failfast_hard_errors,
+            stats,
         }
     }
 
@@ -349,6 +356,34 @@ impl UpstreamManager {
         }
     }
 
+    fn retry_backoff_with_jitter(&self) -> Duration {
+        if self.connect_retry_backoff.is_zero() {
+            return Duration::ZERO;
+        }
+        let base_ms = self.connect_retry_backoff.as_millis() as u64;
+        if base_ms == 0 {
+            return self.connect_retry_backoff;
+        }
+        let jitter_cap_ms = (base_ms / 2).max(1);
+        let jitter_ms = rand::rng().gen_range(0..=jitter_cap_ms);
+        Duration::from_millis(base_ms.saturating_add(jitter_ms))
+    }
+
+    fn is_hard_connect_error(error: &ProxyError) -> bool {
+        match error {
+            ProxyError::Config(_) | ProxyError::ConnectionRefused { .. } => true,
+            ProxyError::Io(ioe) => matches!(
+                ioe.kind(),
+                std::io::ErrorKind::ConnectionRefused
+                    | std::io::ErrorKind::AddrInUse
+                    | std::io::ErrorKind::AddrNotAvailable
+                    | std::io::ErrorKind::InvalidInput
+                    | std::io::ErrorKind::Unsupported
+            ),
+            _ => false,
+        }
+    }
+
     /// Select upstream using latency-weighted random selection.
     async fn select_upstream(&self, dc_idx: Option<usize>, scope: Option<&str>) -> Option<usize> {
         let upstreams = self.upstreams.read().await;
@@ -459,8 +494,12 @@ impl UpstreamManager {
             guard.get(idx).map(|u| u.bind_rr.clone())
         };
 
+        let connect_started_at = Instant::now();
         let mut last_error: Option<ProxyError> = None;
+        let mut attempts_used = 0u32;
         for attempt in 1..=self.connect_retry_attempts {
+            attempts_used = attempt;
+            self.stats.increment_upstream_connect_attempt_total();
             let start = Instant::now();
             match self
                 .connect_via_upstream(&upstream, target, bind_rr.clone())
@@ -468,6 +507,13 @@
             {
                 Ok((stream, egress)) => {
                     let rtt_ms = start.elapsed().as_secs_f64() * 1000.0;
+                    self.stats.increment_upstream_connect_success_total();
+                    self.stats
+                        .observe_upstream_connect_attempts_per_request(attempts_used);
+                    self.stats.observe_upstream_connect_duration_ms(
+                        connect_started_at.elapsed().as_millis() as u64,
+                        true,
+                    );
                     let mut guard = self.upstreams.write().await;
                     if let Some(u) = guard.get_mut(idx) {
                         if !u.healthy {
@@ -491,7 +537,13 @@
                     return Ok((stream, egress));
                 }
                 Err(e) => {
-                    if attempt < self.connect_retry_attempts {
+                    let hard_error =
+                        self.connect_failfast_hard_errors && Self::is_hard_connect_error(&e);
+                    if hard_error {
+                        self.stats
+                            .increment_upstream_connect_failfast_hard_error_total();
+                    }
+                    if attempt < self.connect_retry_attempts && !hard_error {
                         debug!(
                             attempt,
                             attempts = self.connect_retry_attempts,
@@ -499,21 +551,43 @@
                             error = %e,
                             "Upstream connect attempt failed, retrying"
                         );
-                        if !self.connect_retry_backoff.is_zero() {
-                            tokio::time::sleep(self.connect_retry_backoff).await;
+                        let backoff = self.retry_backoff_with_jitter();
+                        if !backoff.is_zero() {
+                            tokio::time::sleep(backoff).await;
                         }
+                    } else if hard_error {
+                        debug!(
+                            attempt,
+                            attempts = self.connect_retry_attempts,
+                            target = %target,
+                            error = %e,
+                            "Upstream connect failed with hard error, failfast is active"
+                        );
                     }
                     last_error = Some(e);
+                    if hard_error {
+                        break;
+                    }
                 }
             }
         }
 
+        self.stats.increment_upstream_connect_fail_total();
+        self.stats
+            .observe_upstream_connect_attempts_per_request(attempts_used);
+        self.stats.observe_upstream_connect_duration_ms(
+            connect_started_at.elapsed().as_millis() as u64,
+            false,
+        );
+
         let error = last_error.unwrap_or_else(|| {
             ProxyError::Config("Upstream connect attempts exhausted".to_string())
         });
 
         let mut guard = self.upstreams.write().await;
         if let Some(u) = guard.get_mut(idx) {
+            // Intermediate attempts are intentionally ignored here.
+            // Health state is degraded only when the entire connect cycle fails.
             u.fails += 1;
             warn!(
                 fails = u.fails,
@@ -1364,4 +1438,20 @@ mod tests {
             .contains(&"198.51.100.2:443".parse::<SocketAddr>().unwrap()));
         assert!(dc9.fallback.is_empty());
     }
+
+    #[test]
+    fn hard_connect_error_classification_detects_connection_refused() {
+        let error = ProxyError::ConnectionRefused {
+            addr: "127.0.0.1:443".to_string(),
+        };
+        assert!(UpstreamManager::is_hard_connect_error(&error));
+    }
+
+    #[test]
+    fn hard_connect_error_classification_skips_timeouts() {
+        let error = ProxyError::ConnectionTimeout {
+            addr: "127.0.0.1:443".to_string(),
+        };
+        assert!(!UpstreamManager::is_hard_connect_error(&error));
+    }
 }