diff --git a/src/main.rs b/src/main.rs index 2dd9a56..3711265 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,7 +35,7 @@ use crate::transport::middle_proxy::{ stun_probe, }; use crate::transport::{ListenOptions, UpstreamManager, create_listener}; -use crate::util::ip::detect_ip; +use crate::util::ip::{detect_ip, check_ipv6_available}; use crate::protocol::constants::{TG_MIDDLE_PROXIES_V4, TG_MIDDLE_PROXIES_V6}; fn parse_cli() -> (String, bool, Option) { @@ -219,7 +219,19 @@ async fn main() -> std::result::Result<(), Box> { warn!("Using default tls_domain. Consider setting a custom domain."); } - let prefer_ipv6 = config.general.prefer_ipv6; + let ipv6_available = check_ipv6_available(); + if ipv6_available { + info!("IPv6: available"); + } else { + warn!("IPv6: not available on this host"); + } + + let prefer_ipv6 = if config.general.prefer_ipv6 && !ipv6_available { + warn!("prefer_ipv6 is set but IPv6 is not available, falling back to IPv4"); + false + } else { + config.general.prefer_ipv6 + }; let mut use_middle_proxy = config.general.use_middle_proxy; let config = Arc::new(config); let stats = Arc::new(Stats::new()); @@ -342,6 +354,12 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone(); } + let effective_v6_map = if ipv6_available { + cfg_v6.map.clone() + } else { + std::collections::HashMap::new() + }; + let pool = MePool::new( proxy_tag, proxy_secret, @@ -349,7 +367,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_stun.clone(), cfg_v4.map.clone(), - cfg_v6.map.clone(), + effective_v6_map, cfg_v4.default_dc.or(cfg_v6.default_dc), ); @@ -362,7 +380,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai let rng_clone = rng.clone(); tokio::spawn(async move { crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, 2, + pool_clone, rng_clone, 2, ipv6_available, ) .await; }); @@ -482,7 +500,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai info!("================= Telegram DC Connectivity ================="); let ping_results = upstream_manager - .ping_all_dcs(prefer_ipv6, &config.dc_overrides) + .ping_all_dcs(prefer_ipv6, &config.dc_overrides, ipv6_available) .await; for upstream_result in &ping_results { @@ -560,7 +578,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai // Background tasks let um_clone = upstream_manager.clone(); tokio::spawn(async move { - um_clone.run_health_checks(prefer_ipv6).await; + um_clone.run_health_checks(prefer_ipv6, ipv6_available).await; }); let rc_clone = replay_checker.clone(); diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index d2bb51a..745adf8 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -10,7 +10,7 @@ use crate::crypto::SecureRandom; use super::MePool; -pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { +pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize, ipv6_available: bool) { let mut backoff: HashMap = HashMap::new(); let mut last_attempt: HashMap = HashMap::new(); loop { @@ -63,7 +63,10 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c } } - // IPv6 coverage check (if available) + // IPv6 coverage check (skip if IPv6 not available on host) + if !ipv6_available { + continue; + } let map_v6 = pool.proxy_map_v6.read().await.clone(); let writer_addrs_v6: std::collections::HashSet = pool .writers diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index db0d366..22b5690 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -355,6 +355,7 @@ impl UpstreamManager { &self, prefer_ipv6: bool, dc_overrides: &HashMap>, + ipv6_available: bool, ) -> Vec { let upstreams: Vec<(usize, UpstreamConfig)> = { let guard = self.upstreams.read().await; @@ -377,43 +378,45 @@ impl UpstreamManager { let mut v6_results = Vec::new(); let mut v4_results = Vec::new(); - // === Ping IPv6 first === - for dc_zero_idx in 0..NUM_DCS { - let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx]; - let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT); + // === Ping IPv6 first (skip if IPv6 not available) === + if ipv6_available { + for dc_zero_idx in 0..NUM_DCS { + let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx]; + let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT); - let result = tokio::time::timeout( - Duration::from_secs(DC_PING_TIMEOUT_SECS), - self.ping_single_dc(&upstream_config, addr_v6) - ).await; + let result = tokio::time::timeout( + Duration::from_secs(DC_PING_TIMEOUT_SECS), + self.ping_single_dc(&upstream_config, addr_v6) + ).await; - let ping_result = match result { - Ok(Ok(rtt_ms)) => { - let mut guard = self.upstreams.write().await; - if let Some(u) = guard.get_mut(*upstream_idx) { - u.dc_latency[dc_zero_idx].update(rtt_ms); + let ping_result = match result { + Ok(Ok(rtt_ms)) => { + let mut guard = self.upstreams.write().await; + if let Some(u) = guard.get_mut(*upstream_idx) { + u.dc_latency[dc_zero_idx].update(rtt_ms); + } + DcPingResult { + dc_idx: dc_zero_idx + 1, + dc_addr: addr_v6, + rtt_ms: Some(rtt_ms), + error: None, + } } - DcPingResult { + Ok(Err(e)) => DcPingResult { dc_idx: dc_zero_idx + 1, dc_addr: addr_v6, - rtt_ms: Some(rtt_ms), - error: None, - } - } - Ok(Err(e)) => DcPingResult { - dc_idx: dc_zero_idx + 1, - dc_addr: addr_v6, - rtt_ms: None, - error: Some(e.to_string()), - }, - Err(_) => DcPingResult { - dc_idx: dc_zero_idx + 1, - dc_addr: addr_v6, - rtt_ms: None, - error: Some("timeout".to_string()), - }, - }; - v6_results.push(ping_result); + rtt_ms: None, + error: Some(e.to_string()), + }, + Err(_) => DcPingResult { + dc_idx: dc_zero_idx + 1, + dc_addr: addr_v6, + rtt_ms: None, + error: Some("timeout".to_string()), + }, + }; + v6_results.push(ping_result); + } } // === Then ping IPv4 === @@ -517,8 +520,12 @@ impl UpstreamManager { let mut guard = self.upstreams.write().await; if let Some(u) = guard.get_mut(*upstream_idx) { for dc_zero_idx in 0..NUM_DCS { - let v6_ok = v6_results[dc_zero_idx].rtt_ms.is_some(); - let v4_ok = v4_results[dc_zero_idx].rtt_ms.is_some(); + let v6_ok = v6_results.get(dc_zero_idx) + .map(|r| r.rtt_ms.is_some()) + .unwrap_or(false); + let v4_ok = v4_results.get(dc_zero_idx) + .map(|r| r.rtt_ms.is_some()) + .unwrap_or(false); u.dc_ip_pref[dc_zero_idx] = match (v6_ok, v4_ok) { (true, true) => IpPreference::BothWork, @@ -551,7 +558,7 @@ impl UpstreamManager { /// Background health check: rotates through DCs, 30s interval. /// Uses preferred IP version based on config. - pub async fn run_health_checks(&self, prefer_ipv6: bool) { + pub async fn run_health_checks(&self, prefer_ipv6: bool, ipv6_available: bool) { let mut dc_rotation = 0usize; loop { @@ -560,16 +567,19 @@ impl UpstreamManager { let dc_zero_idx = dc_rotation % NUM_DCS; dc_rotation += 1; - let dc_addr = if prefer_ipv6 { + let dc_addr = if prefer_ipv6 && ipv6_available { SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT) } else { SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT) }; - let fallback_addr = if prefer_ipv6 { - SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT) + // Skip IPv6 fallback if IPv6 is not available + let fallback_addr = if !ipv6_available { + None + } else if prefer_ipv6 { + Some(SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT)) } else { - SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT) + Some(SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT)) }; let count = self.upstreams.read().await.len(); @@ -605,53 +615,67 @@ impl UpstreamManager { u.last_check = std::time::Instant::now(); } Ok(Err(_)) | Err(_) => { - // Try fallback - debug!(dc = dc_zero_idx + 1, "Health check failed, trying fallback"); + // Try fallback (only if fallback address is available) + if let Some(fb_addr) = fallback_addr { + debug!(dc = dc_zero_idx + 1, "Health check failed, trying fallback"); - let start2 = Instant::now(); - let result2 = tokio::time::timeout( - Duration::from_secs(10), - self.connect_via_upstream(&config, fallback_addr) - ).await; + let start2 = Instant::now(); + let result2 = tokio::time::timeout( + Duration::from_secs(10), + self.connect_via_upstream(&config, fb_addr) + ).await; - let mut guard = self.upstreams.write().await; - let u = &mut guard[i]; + let mut guard = self.upstreams.write().await; + let u = &mut guard[i]; - match result2 { - Ok(Ok(_stream)) => { - let rtt_ms = start2.elapsed().as_secs_f64() * 1000.0; - u.dc_latency[dc_zero_idx].update(rtt_ms); + match result2 { + Ok(Ok(_stream)) => { + let rtt_ms = start2.elapsed().as_secs_f64() * 1000.0; + u.dc_latency[dc_zero_idx].update(rtt_ms); - if !u.healthy { - info!( - rtt = format!("{:.0} ms", rtt_ms), - dc = dc_zero_idx + 1, - "Upstream recovered (fallback)" - ); + if !u.healthy { + info!( + rtt = format!("{:.0} ms", rtt_ms), + dc = dc_zero_idx + 1, + "Upstream recovered (fallback)" + ); + } + u.healthy = true; + u.fails = 0; } - u.healthy = true; - u.fails = 0; - } - Ok(Err(e)) => { - u.fails += 1; - debug!(dc = dc_zero_idx + 1, fails = u.fails, - "Health check failed (both): {}", e); - if u.fails > 3 { - u.healthy = false; - warn!("Upstream unhealthy (fails)"); + Ok(Err(e)) => { + u.fails += 1; + debug!(dc = dc_zero_idx + 1, fails = u.fails, + "Health check failed (both): {}", e); + if u.fails > 3 { + u.healthy = false; + warn!("Upstream unhealthy (fails)"); + } + } + Err(_) => { + u.fails += 1; + debug!(dc = dc_zero_idx + 1, fails = u.fails, + "Health check timeout (both)"); + if u.fails > 3 { + u.healthy = false; + warn!("Upstream unhealthy (timeout)"); + } } } - Err(_) => { - u.fails += 1; - debug!(dc = dc_zero_idx + 1, fails = u.fails, - "Health check timeout (both)"); - if u.fails > 3 { - u.healthy = false; - warn!("Upstream unhealthy (timeout)"); - } + u.last_check = std::time::Instant::now(); + } else { + // No fallback available, mark failure directly + let mut guard = self.upstreams.write().await; + let u = &mut guard[i]; + u.fails += 1; + debug!(dc = dc_zero_idx + 1, fails = u.fails, + "Health check failed (no fallback)"); + if u.fails > 3 { + u.healthy = false; + warn!("Upstream unhealthy (fails)"); } + u.last_check = std::time::Instant::now(); } - u.last_check = std::time::Instant::now(); } } } diff --git a/src/util/ip.rs b/src/util/ip.rs index 9bde513..66bd500 100644 --- a/src/util/ip.rs +++ b/src/util/ip.rs @@ -54,6 +54,12 @@ fn get_local_ipv6(target: &str) -> Option { socket.local_addr().ok().map(|addr| addr.ip()) } +/// Check if IPv6 connectivity is available on this host. +/// Uses UDP connect to Google DNS IPv6 (no packets sent). +pub fn check_ipv6_available() -> bool { + get_local_ipv6("[2001:4860:4860::8888]:80").is_some() +} + /// Detect public IP addresses pub async fn detect_ip() -> IpInfo { let mut info = IpInfo::default();