mirror of https://github.com/telemt/telemt.git
Skip IPv6 connections when IPv6 is unavailable on host
Detect IPv6 availability at startup using UDP probe. When IPv6 is not available: override prefer_ipv6 to false, skip IPv6 DC pings, pass empty ME v6 proxy map, skip IPv6 coverage checks in health monitor, and skip IPv6 fallback in upstream health checks. This eliminates useless connection attempts with timeouts on IPv6-less hosts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
8268714a4c
commit
d122cbfe5a
30
src/main.rs
30
src/main.rs
|
|
@ -35,7 +35,7 @@ use crate::transport::middle_proxy::{
|
|||
stun_probe,
|
||||
};
|
||||
use crate::transport::{ListenOptions, UpstreamManager, create_listener};
|
||||
use crate::util::ip::detect_ip;
|
||||
use crate::util::ip::{detect_ip, check_ipv6_available};
|
||||
use crate::protocol::constants::{TG_MIDDLE_PROXIES_V4, TG_MIDDLE_PROXIES_V6};
|
||||
|
||||
fn parse_cli() -> (String, bool, Option<String>) {
|
||||
|
|
@ -219,7 +219,19 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
|||
warn!("Using default tls_domain. Consider setting a custom domain.");
|
||||
}
|
||||
|
||||
let prefer_ipv6 = config.general.prefer_ipv6;
|
||||
let ipv6_available = check_ipv6_available();
|
||||
if ipv6_available {
|
||||
info!("IPv6: available");
|
||||
} else {
|
||||
warn!("IPv6: not available on this host");
|
||||
}
|
||||
|
||||
let prefer_ipv6 = if config.general.prefer_ipv6 && !ipv6_available {
|
||||
warn!("prefer_ipv6 is set but IPv6 is not available, falling back to IPv4");
|
||||
false
|
||||
} else {
|
||||
config.general.prefer_ipv6
|
||||
};
|
||||
let mut use_middle_proxy = config.general.use_middle_proxy;
|
||||
let config = Arc::new(config);
|
||||
let stats = Arc::new(Stats::new());
|
||||
|
|
@ -342,6 +354,12 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||
cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone();
|
||||
}
|
||||
|
||||
let effective_v6_map = if ipv6_available {
|
||||
cfg_v6.map.clone()
|
||||
} else {
|
||||
std::collections::HashMap::new()
|
||||
};
|
||||
|
||||
let pool = MePool::new(
|
||||
proxy_tag,
|
||||
proxy_secret,
|
||||
|
|
@ -349,7 +367,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||
config.general.middle_proxy_nat_probe,
|
||||
config.general.middle_proxy_nat_stun.clone(),
|
||||
cfg_v4.map.clone(),
|
||||
cfg_v6.map.clone(),
|
||||
effective_v6_map,
|
||||
cfg_v4.default_dc.or(cfg_v6.default_dc),
|
||||
);
|
||||
|
||||
|
|
@ -362,7 +380,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||
let rng_clone = rng.clone();
|
||||
tokio::spawn(async move {
|
||||
crate::transport::middle_proxy::me_health_monitor(
|
||||
pool_clone, rng_clone, 2,
|
||||
pool_clone, rng_clone, 2, ipv6_available,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
|
@ -482,7 +500,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||
info!("================= Telegram DC Connectivity =================");
|
||||
|
||||
let ping_results = upstream_manager
|
||||
.ping_all_dcs(prefer_ipv6, &config.dc_overrides)
|
||||
.ping_all_dcs(prefer_ipv6, &config.dc_overrides, ipv6_available)
|
||||
.await;
|
||||
|
||||
for upstream_result in &ping_results {
|
||||
|
|
@ -560,7 +578,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
|||
// Background tasks
|
||||
let um_clone = upstream_manager.clone();
|
||||
tokio::spawn(async move {
|
||||
um_clone.run_health_checks(prefer_ipv6).await;
|
||||
um_clone.run_health_checks(prefer_ipv6, ipv6_available).await;
|
||||
});
|
||||
|
||||
let rc_clone = replay_checker.clone();
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ use crate::crypto::SecureRandom;
|
|||
|
||||
use super::MePool;
|
||||
|
||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize, ipv6_available: bool) {
|
||||
let mut backoff: HashMap<i32, u64> = HashMap::new();
|
||||
let mut last_attempt: HashMap<i32, Instant> = HashMap::new();
|
||||
loop {
|
||||
|
|
@ -63,7 +63,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
|||
}
|
||||
}
|
||||
|
||||
// IPv6 coverage check (if available)
|
||||
// IPv6 coverage check (skip if IPv6 not available on host)
|
||||
if !ipv6_available {
|
||||
continue;
|
||||
}
|
||||
let map_v6 = pool.proxy_map_v6.read().await.clone();
|
||||
let writer_addrs_v6: std::collections::HashSet<SocketAddr> = pool
|
||||
.writers
|
||||
|
|
|
|||
|
|
@ -355,6 +355,7 @@ impl UpstreamManager {
|
|||
&self,
|
||||
prefer_ipv6: bool,
|
||||
dc_overrides: &HashMap<String, Vec<String>>,
|
||||
ipv6_available: bool,
|
||||
) -> Vec<StartupPingResult> {
|
||||
let upstreams: Vec<(usize, UpstreamConfig)> = {
|
||||
let guard = self.upstreams.read().await;
|
||||
|
|
@ -377,43 +378,45 @@ impl UpstreamManager {
|
|||
let mut v6_results = Vec::new();
|
||||
let mut v4_results = Vec::new();
|
||||
|
||||
// === Ping IPv6 first ===
|
||||
for dc_zero_idx in 0..NUM_DCS {
|
||||
let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx];
|
||||
let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT);
|
||||
// === Ping IPv6 first (skip if IPv6 not available) ===
|
||||
if ipv6_available {
|
||||
for dc_zero_idx in 0..NUM_DCS {
|
||||
let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx];
|
||||
let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT);
|
||||
|
||||
let result = tokio::time::timeout(
|
||||
Duration::from_secs(DC_PING_TIMEOUT_SECS),
|
||||
self.ping_single_dc(&upstream_config, addr_v6)
|
||||
).await;
|
||||
let result = tokio::time::timeout(
|
||||
Duration::from_secs(DC_PING_TIMEOUT_SECS),
|
||||
self.ping_single_dc(&upstream_config, addr_v6)
|
||||
).await;
|
||||
|
||||
let ping_result = match result {
|
||||
Ok(Ok(rtt_ms)) => {
|
||||
let mut guard = self.upstreams.write().await;
|
||||
if let Some(u) = guard.get_mut(*upstream_idx) {
|
||||
u.dc_latency[dc_zero_idx].update(rtt_ms);
|
||||
let ping_result = match result {
|
||||
Ok(Ok(rtt_ms)) => {
|
||||
let mut guard = self.upstreams.write().await;
|
||||
if let Some(u) = guard.get_mut(*upstream_idx) {
|
||||
u.dc_latency[dc_zero_idx].update(rtt_ms);
|
||||
}
|
||||
DcPingResult {
|
||||
dc_idx: dc_zero_idx + 1,
|
||||
dc_addr: addr_v6,
|
||||
rtt_ms: Some(rtt_ms),
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
DcPingResult {
|
||||
Ok(Err(e)) => DcPingResult {
|
||||
dc_idx: dc_zero_idx + 1,
|
||||
dc_addr: addr_v6,
|
||||
rtt_ms: Some(rtt_ms),
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => DcPingResult {
|
||||
dc_idx: dc_zero_idx + 1,
|
||||
dc_addr: addr_v6,
|
||||
rtt_ms: None,
|
||||
error: Some(e.to_string()),
|
||||
},
|
||||
Err(_) => DcPingResult {
|
||||
dc_idx: dc_zero_idx + 1,
|
||||
dc_addr: addr_v6,
|
||||
rtt_ms: None,
|
||||
error: Some("timeout".to_string()),
|
||||
},
|
||||
};
|
||||
v6_results.push(ping_result);
|
||||
rtt_ms: None,
|
||||
error: Some(e.to_string()),
|
||||
},
|
||||
Err(_) => DcPingResult {
|
||||
dc_idx: dc_zero_idx + 1,
|
||||
dc_addr: addr_v6,
|
||||
rtt_ms: None,
|
||||
error: Some("timeout".to_string()),
|
||||
},
|
||||
};
|
||||
v6_results.push(ping_result);
|
||||
}
|
||||
}
|
||||
|
||||
// === Then ping IPv4 ===
|
||||
|
|
@ -517,8 +520,12 @@ impl UpstreamManager {
|
|||
let mut guard = self.upstreams.write().await;
|
||||
if let Some(u) = guard.get_mut(*upstream_idx) {
|
||||
for dc_zero_idx in 0..NUM_DCS {
|
||||
let v6_ok = v6_results[dc_zero_idx].rtt_ms.is_some();
|
||||
let v4_ok = v4_results[dc_zero_idx].rtt_ms.is_some();
|
||||
let v6_ok = v6_results.get(dc_zero_idx)
|
||||
.map(|r| r.rtt_ms.is_some())
|
||||
.unwrap_or(false);
|
||||
let v4_ok = v4_results.get(dc_zero_idx)
|
||||
.map(|r| r.rtt_ms.is_some())
|
||||
.unwrap_or(false);
|
||||
|
||||
u.dc_ip_pref[dc_zero_idx] = match (v6_ok, v4_ok) {
|
||||
(true, true) => IpPreference::BothWork,
|
||||
|
|
@ -551,7 +558,7 @@ impl UpstreamManager {
|
|||
|
||||
/// Background health check: rotates through DCs, 30s interval.
|
||||
/// Uses preferred IP version based on config.
|
||||
pub async fn run_health_checks(&self, prefer_ipv6: bool) {
|
||||
pub async fn run_health_checks(&self, prefer_ipv6: bool, ipv6_available: bool) {
|
||||
let mut dc_rotation = 0usize;
|
||||
|
||||
loop {
|
||||
|
|
@ -560,16 +567,19 @@ impl UpstreamManager {
|
|||
let dc_zero_idx = dc_rotation % NUM_DCS;
|
||||
dc_rotation += 1;
|
||||
|
||||
let dc_addr = if prefer_ipv6 {
|
||||
let dc_addr = if prefer_ipv6 && ipv6_available {
|
||||
SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT)
|
||||
} else {
|
||||
SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT)
|
||||
};
|
||||
|
||||
let fallback_addr = if prefer_ipv6 {
|
||||
SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT)
|
||||
// Skip IPv6 fallback if IPv6 is not available
|
||||
let fallback_addr = if !ipv6_available {
|
||||
None
|
||||
} else if prefer_ipv6 {
|
||||
Some(SocketAddr::new(TG_DATACENTERS_V4[dc_zero_idx], TG_DATACENTER_PORT))
|
||||
} else {
|
||||
SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT)
|
||||
Some(SocketAddr::new(TG_DATACENTERS_V6[dc_zero_idx], TG_DATACENTER_PORT))
|
||||
};
|
||||
|
||||
let count = self.upstreams.read().await.len();
|
||||
|
|
@ -605,53 +615,67 @@ impl UpstreamManager {
|
|||
u.last_check = std::time::Instant::now();
|
||||
}
|
||||
Ok(Err(_)) | Err(_) => {
|
||||
// Try fallback
|
||||
debug!(dc = dc_zero_idx + 1, "Health check failed, trying fallback");
|
||||
// Try fallback (only if fallback address is available)
|
||||
if let Some(fb_addr) = fallback_addr {
|
||||
debug!(dc = dc_zero_idx + 1, "Health check failed, trying fallback");
|
||||
|
||||
let start2 = Instant::now();
|
||||
let result2 = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
self.connect_via_upstream(&config, fallback_addr)
|
||||
).await;
|
||||
let start2 = Instant::now();
|
||||
let result2 = tokio::time::timeout(
|
||||
Duration::from_secs(10),
|
||||
self.connect_via_upstream(&config, fb_addr)
|
||||
).await;
|
||||
|
||||
let mut guard = self.upstreams.write().await;
|
||||
let u = &mut guard[i];
|
||||
let mut guard = self.upstreams.write().await;
|
||||
let u = &mut guard[i];
|
||||
|
||||
match result2 {
|
||||
Ok(Ok(_stream)) => {
|
||||
let rtt_ms = start2.elapsed().as_secs_f64() * 1000.0;
|
||||
u.dc_latency[dc_zero_idx].update(rtt_ms);
|
||||
match result2 {
|
||||
Ok(Ok(_stream)) => {
|
||||
let rtt_ms = start2.elapsed().as_secs_f64() * 1000.0;
|
||||
u.dc_latency[dc_zero_idx].update(rtt_ms);
|
||||
|
||||
if !u.healthy {
|
||||
info!(
|
||||
rtt = format!("{:.0} ms", rtt_ms),
|
||||
dc = dc_zero_idx + 1,
|
||||
"Upstream recovered (fallback)"
|
||||
);
|
||||
if !u.healthy {
|
||||
info!(
|
||||
rtt = format!("{:.0} ms", rtt_ms),
|
||||
dc = dc_zero_idx + 1,
|
||||
"Upstream recovered (fallback)"
|
||||
);
|
||||
}
|
||||
u.healthy = true;
|
||||
u.fails = 0;
|
||||
}
|
||||
u.healthy = true;
|
||||
u.fails = 0;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
u.fails += 1;
|
||||
debug!(dc = dc_zero_idx + 1, fails = u.fails,
|
||||
"Health check failed (both): {}", e);
|
||||
if u.fails > 3 {
|
||||
u.healthy = false;
|
||||
warn!("Upstream unhealthy (fails)");
|
||||
Ok(Err(e)) => {
|
||||
u.fails += 1;
|
||||
debug!(dc = dc_zero_idx + 1, fails = u.fails,
|
||||
"Health check failed (both): {}", e);
|
||||
if u.fails > 3 {
|
||||
u.healthy = false;
|
||||
warn!("Upstream unhealthy (fails)");
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
u.fails += 1;
|
||||
debug!(dc = dc_zero_idx + 1, fails = u.fails,
|
||||
"Health check timeout (both)");
|
||||
if u.fails > 3 {
|
||||
u.healthy = false;
|
||||
warn!("Upstream unhealthy (timeout)");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
u.fails += 1;
|
||||
debug!(dc = dc_zero_idx + 1, fails = u.fails,
|
||||
"Health check timeout (both)");
|
||||
if u.fails > 3 {
|
||||
u.healthy = false;
|
||||
warn!("Upstream unhealthy (timeout)");
|
||||
}
|
||||
u.last_check = std::time::Instant::now();
|
||||
} else {
|
||||
// No fallback available, mark failure directly
|
||||
let mut guard = self.upstreams.write().await;
|
||||
let u = &mut guard[i];
|
||||
u.fails += 1;
|
||||
debug!(dc = dc_zero_idx + 1, fails = u.fails,
|
||||
"Health check failed (no fallback)");
|
||||
if u.fails > 3 {
|
||||
u.healthy = false;
|
||||
warn!("Upstream unhealthy (fails)");
|
||||
}
|
||||
u.last_check = std::time::Instant::now();
|
||||
}
|
||||
u.last_check = std::time::Instant::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -54,6 +54,12 @@ fn get_local_ipv6(target: &str) -> Option<IpAddr> {
|
|||
socket.local_addr().ok().map(|addr| addr.ip())
|
||||
}
|
||||
|
||||
/// Check if IPv6 connectivity is available on this host.
|
||||
/// Uses UDP connect to Google DNS IPv6 (no packets sent).
|
||||
pub fn check_ipv6_available() -> bool {
|
||||
get_local_ipv6("[2001:4860:4860::8888]:80").is_some()
|
||||
}
|
||||
|
||||
/// Detect public IP addresses
|
||||
pub async fn detect_ip() -> IpInfo {
|
||||
let mut info = IpInfo::default();
|
||||
|
|
|
|||
Loading…
Reference in New Issue