Unified STUN + ME Primary parallelized

- Unified STUN server source-of-truth
- parallelize per-DC primary ME init for multi-endpoint DCs
This commit is contained in:
Alexey 2026-02-26 18:18:24 +03:00
parent 7782336264
commit 9d2ff25bf5
No known key found for this signature in database
7 changed files with 99 additions and 60 deletions

View File

@ -111,25 +111,11 @@ pub(crate) fn default_proxy_secret_path() -> Option<String> {
} }
pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> { pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> {
Some("stun.l.google.com:19302".to_string()) None
} }
pub(crate) fn default_middle_proxy_nat_stun_servers() -> Vec<String> { pub(crate) fn default_middle_proxy_nat_stun_servers() -> Vec<String> {
vec![ Vec::new()
"stun.l.google.com:5349".to_string(),
"stun1.l.google.com:3478".to_string(),
"stun.gmx.net:3478".to_string(),
"stun.l.google.com:19302".to_string(),
"stun.1und1.de:3478".to_string(),
"stun1.l.google.com:19302".to_string(),
"stun2.l.google.com:19302".to_string(),
"stun3.l.google.com:19302".to_string(),
"stun4.l.google.com:19302".to_string(),
"stun.services.mozilla.com:3478".to_string(),
"stun.stunprotocol.org:3478".to_string(),
"stun.nextcloud.com:3478".to_string(),
"stun.voip.eutelia.it:3478".to_string(),
]
} }
pub(crate) fn default_stun_nat_probe_concurrency() -> usize { pub(crate) fn default_stun_nat_probe_concurrency() -> usize {

View File

@ -65,6 +65,16 @@ fn validate_network_cfg(net: &mut NetworkConfig) -> Result<()> {
Ok(()) Ok(())
} }
fn push_unique_nonempty(target: &mut Vec<String>, value: String) {
let trimmed = value.trim();
if trimmed.is_empty() {
return;
}
if !target.iter().any(|existing| existing == trimmed) {
target.push(trimmed.to_string());
}
}
// ============= Main Config ============= // ============= Main Config =============
#[derive(Debug, Clone, Serialize, Deserialize, Default)] #[derive(Debug, Clone, Serialize, Deserialize, Default)]
@ -138,6 +148,30 @@ impl ProxyConfig {
config.general.update_every = None; config.general.update_every = None;
} }
let legacy_nat_stun = config.general.middle_proxy_nat_stun.take();
let legacy_nat_stun_servers = std::mem::take(&mut config.general.middle_proxy_nat_stun_servers);
let legacy_nat_stun_used = legacy_nat_stun.is_some() || !legacy_nat_stun_servers.is_empty();
let mut unified_stun_servers = Vec::new();
for stun in std::mem::take(&mut config.network.stun_servers) {
push_unique_nonempty(&mut unified_stun_servers, stun);
}
if let Some(stun) = legacy_nat_stun {
push_unique_nonempty(&mut unified_stun_servers, stun);
}
for stun in legacy_nat_stun_servers {
push_unique_nonempty(&mut unified_stun_servers, stun);
}
if unified_stun_servers.is_empty() {
unified_stun_servers = default_stun_servers();
}
config.network.stun_servers = unified_stun_servers;
if legacy_nat_stun_used {
warn!("general.middle_proxy_nat_stun and general.middle_proxy_nat_stun_servers are deprecated; use network.stun_servers");
}
if let Some(update_every) = config.general.update_every { if let Some(update_every) = config.general.update_every {
if update_every == 0 { if update_every == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(

View File

@ -160,11 +160,13 @@ pub struct GeneralConfig {
#[serde(default = "default_true")] #[serde(default = "default_true")]
pub middle_proxy_nat_probe: bool, pub middle_proxy_nat_probe: bool,
/// Optional STUN server address (host:port) for NAT probing. /// Deprecated legacy single STUN server for NAT probing.
/// Use `network.stun_servers` instead.
#[serde(default = "default_middle_proxy_nat_stun")] #[serde(default = "default_middle_proxy_nat_stun")]
pub middle_proxy_nat_stun: Option<String>, pub middle_proxy_nat_stun: Option<String>,
/// Optional list of STUN servers for NAT probing fallback. /// Deprecated legacy STUN list for NAT probing fallback.
/// Use `network.stun_servers` instead.
#[serde(default = "default_middle_proxy_nat_stun_servers")] #[serde(default = "default_middle_proxy_nat_stun_servers")]
pub middle_proxy_nat_stun_servers: Vec<String>, pub middle_proxy_nat_stun_servers: Vec<String>,

View File

@ -256,8 +256,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let probe = run_probe( let probe = run_probe(
&config.network, &config.network,
config.general.middle_proxy_nat_stun.clone(),
config.general.middle_proxy_nat_stun_servers.clone(),
config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_probe,
config.general.stun_nat_probe_concurrency, config.general.stun_nat_probe_concurrency,
) )
@ -360,8 +358,8 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
proxy_secret, proxy_secret,
config.general.middle_proxy_nat_ip, config.general.middle_proxy_nat_ip,
config.general.middle_proxy_nat_probe, config.general.middle_proxy_nat_probe,
config.general.middle_proxy_nat_stun.clone(), None,
config.general.middle_proxy_nat_stun_servers.clone(), config.network.stun_servers.clone(),
config.general.stun_nat_probe_concurrency, config.general.stun_nat_probe_concurrency,
probe.detected_ipv6, probe.detected_ipv6,
config.timeouts.me_one_retry, config.timeouts.me_one_retry,

View File

@ -57,8 +57,6 @@ const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
pub async fn run_probe( pub async fn run_probe(
config: &NetworkConfig, config: &NetworkConfig,
stun_addr: Option<String>,
stun_servers: Vec<String>,
nat_probe: bool, nat_probe: bool,
stun_nat_probe_concurrency: usize, stun_nat_probe_concurrency: usize,
) -> Result<NetworkProbe> { ) -> Result<NetworkProbe> {
@ -71,12 +69,17 @@ pub async fn run_probe(
probe.ipv6_is_bogon = probe.detected_ipv6.map(is_bogon_v6).unwrap_or(false); probe.ipv6_is_bogon = probe.detected_ipv6.map(is_bogon_v6).unwrap_or(false);
let stun_res = if nat_probe { let stun_res = if nat_probe {
let servers = collect_stun_servers(config, stun_addr, stun_servers); let servers = collect_stun_servers(config);
probe_stun_servers_parallel( if servers.is_empty() {
&servers, warn!("STUN probe is enabled but network.stun_servers is empty");
stun_nat_probe_concurrency.max(1), DualStunResult::default()
) } else {
.await probe_stun_servers_parallel(
&servers,
stun_nat_probe_concurrency.max(1),
)
.await
}
} else { } else {
DualStunResult::default() DualStunResult::default()
}; };
@ -143,36 +146,13 @@ async fn detect_public_ipv4_http(urls: &[String]) -> Option<Ipv4Addr> {
None None
} }
fn collect_stun_servers( fn collect_stun_servers(config: &NetworkConfig) -> Vec<String> {
config: &NetworkConfig,
stun_addr: Option<String>,
stun_servers: Vec<String>,
) -> Vec<String> {
let mut out = Vec::new(); let mut out = Vec::new();
if !stun_servers.is_empty() { for s in &config.stun_servers {
for s in stun_servers { if !s.is_empty() && !out.contains(s) {
if !s.is_empty() && !out.contains(&s) { out.push(s.clone());
out.push(s);
}
}
} else if let Some(s) = stun_addr
&& !s.is_empty()
{
out.push(s);
}
if out.is_empty() {
for s in &config.stun_servers {
if !s.is_empty() && !out.contains(s) {
out.push(s.clone());
}
} }
} }
if out.is_empty() {
out.push("stun.l.google.com:19302".to_string());
}
out out
} }

View File

@ -1199,6 +1199,35 @@ impl MePool {
return false; return false;
} }
addrs.shuffle(&mut rand::rng()); addrs.shuffle(&mut rand::rng());
if addrs.len() > 1 {
let mut join = tokio::task::JoinSet::new();
for (ip, port) in addrs {
let addr = SocketAddr::new(ip, port);
let pool = Arc::clone(&self);
let rng_clone = Arc::clone(&rng);
join.spawn(async move { (addr, pool.connect_one(addr, rng_clone.as_ref()).await) });
}
while let Some(res) = join.join_next().await {
match res {
Ok((addr, Ok(()))) => {
info!(%addr, dc = %dc, "ME connected");
join.abort_all();
while join.join_next().await.is_some() {}
return true;
}
Ok((addr, Err(e))) => {
warn!(%addr, dc = %dc, error = %e, "ME connect failed, trying next");
}
Err(e) => {
warn!(dc = %dc, error = %e, "ME connect task failed");
}
}
}
warn!(dc = %dc, "All ME servers for DC failed at init");
return false;
}
for (ip, port) in addrs { for (ip, port) in addrs {
let addr = SocketAddr::new(ip, port); let addr = SocketAddr::new(ip, port);
match self.connect_one(addr, rng.as_ref()).await { match self.connect_one(addr, rng.as_ref()).await {

View File

@ -17,7 +17,15 @@ const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
#[allow(dead_code)] #[allow(dead_code)]
pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stun::DualStunResult> { pub async fn stun_probe(stun_addr: Option<String>) -> Result<crate::network::stun::DualStunResult> {
let stun_addr = stun_addr.unwrap_or_else(|| "stun.l.google.com:19302".to_string()); let stun_addr = stun_addr.unwrap_or_else(|| {
crate::config::defaults::default_stun_servers()
.into_iter()
.next()
.unwrap_or_default()
});
if stun_addr.is_empty() {
return Err(ProxyError::Proxy("STUN server is not configured".to_string()));
}
stun_probe_dual(&stun_addr).await stun_probe_dual(&stun_addr).await
} }
@ -31,10 +39,12 @@ impl MePool {
if !self.nat_stun_servers.is_empty() { if !self.nat_stun_servers.is_empty() {
return self.nat_stun_servers.clone(); return self.nat_stun_servers.clone();
} }
if let Some(s) = &self.nat_stun { if let Some(s) = &self.nat_stun
&& !s.trim().is_empty()
{
return vec![s.clone()]; return vec![s.clone()];
} }
vec!["stun.l.google.com:19302".to_string()] Vec::new()
} }
async fn probe_stun_batch_for_family( async fn probe_stun_batch_for_family(