From 780fafa6041c65cc854469fc9f1474a46b86b482 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 14 Mar 2026 02:20:51 +0300 Subject: [PATCH] Src-IP in ME Routing + more strict bind_addresses --- src/maestro/mod.rs | 1 + src/network/probe.rs | 141 ++++++++++++++++++++-- src/transport/middle_proxy/handshake.rs | 2 + src/transport/middle_proxy/pool.rs | 1 + src/transport/middle_proxy/pool_writer.rs | 1 + src/transport/middle_proxy/send.rs | 44 +++++-- src/transport/upstream.rs | 46 ++++++- 7 files changed, 206 insertions(+), 30 deletions(-) diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index 6724188..5f6c70a 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -287,6 +287,7 @@ pub async fn run() -> std::result::Result<(), Box> { .await; let probe = run_probe( &config.network, + &config.upstreams, config.general.middle_proxy_nat_probe, config.general.stun_nat_probe_concurrency, ) diff --git a/src/network/probe.rs b/src/network/probe.rs index cbd32d9..a9e369d 100644 --- a/src/network/probe.rs +++ b/src/network/probe.rs @@ -8,9 +8,10 @@ use tokio::task::JoinSet; use tokio::time::timeout; use tracing::{debug, info, warn}; -use crate::config::NetworkConfig; +use crate::config::{NetworkConfig, UpstreamConfig, UpstreamType}; use crate::error::Result; -use crate::network::stun::{stun_probe_dual, DualStunResult, IpFamily, StunProbeResult}; +use crate::network::stun::{stun_probe_family_with_bind, DualStunResult, IpFamily, StunProbeResult}; +use crate::transport::UpstreamManager; #[derive(Debug, Clone, Default)] pub struct NetworkProbe { @@ -57,19 +58,22 @@ const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5); pub async fn run_probe( config: &NetworkConfig, + upstreams: &[UpstreamConfig], nat_probe: bool, stun_nat_probe_concurrency: usize, ) -> Result { let mut probe = NetworkProbe::default(); + let servers = collect_stun_servers(config); + let mut detected_ipv4 = detect_local_ip_v4(); + let mut detected_ipv6 = detect_local_ip_v6(); + let mut explicit_detected_ipv4 = false; + let mut explicit_detected_ipv6 = false; + let mut explicit_reflected_ipv4 = false; + let mut explicit_reflected_ipv6 = false; + let mut strict_bind_ipv4_requested = false; + let mut strict_bind_ipv6_requested = false; - probe.detected_ipv4 = detect_local_ip_v4(); - probe.detected_ipv6 = detect_local_ip_v6(); - - probe.ipv4_is_bogon = probe.detected_ipv4.map(is_bogon_v4).unwrap_or(false); - probe.ipv6_is_bogon = probe.detected_ipv6.map(is_bogon_v6).unwrap_or(false); - - let stun_res = if nat_probe && config.stun_use { - let servers = collect_stun_servers(config); + let global_stun_res = if nat_probe && config.stun_use { if servers.is_empty() { warn!("STUN probe is enabled but network.stun_servers is empty"); DualStunResult::default() @@ -77,6 +81,8 @@ pub async fn run_probe( probe_stun_servers_parallel( &servers, stun_nat_probe_concurrency.max(1), + None, + None, ) .await } @@ -86,8 +92,108 @@ pub async fn run_probe( } else { DualStunResult::default() }; - probe.reflected_ipv4 = stun_res.v4.map(|r| r.reflected_addr); - probe.reflected_ipv6 = stun_res.v6.map(|r| r.reflected_addr); + let mut reflected_ipv4 = global_stun_res.v4.map(|r| r.reflected_addr); + let mut reflected_ipv6 = global_stun_res.v6.map(|r| r.reflected_addr); + + for upstream in upstreams.iter().filter(|upstream| upstream.enabled) { + let UpstreamType::Direct { + interface, + bind_addresses, + } = &upstream.upstream_type else { + continue; + }; + if let Some(addrs) = bind_addresses.as_ref().filter(|v| !v.is_empty()) { + let mut saw_parsed_ip = false; + for value in addrs { + if let Ok(ip) = value.parse::() { + saw_parsed_ip = true; + if ip.is_ipv4() { + strict_bind_ipv4_requested = true; + } else { + strict_bind_ipv6_requested = true; + } + } + } + if !saw_parsed_ip { + strict_bind_ipv4_requested = true; + strict_bind_ipv6_requested = true; + } + } + + let bind_v4 = UpstreamManager::resolve_bind_address( + interface, + bind_addresses, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 443), + None, + true, + ); + let bind_v6 = UpstreamManager::resolve_bind_address( + interface, + bind_addresses, + SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)), + 443, + ), + None, + true, + ); + + if let Some(IpAddr::V4(ip)) = bind_v4 + && !explicit_detected_ipv4 + { + detected_ipv4 = Some(ip); + explicit_detected_ipv4 = true; + } + if let Some(IpAddr::V6(ip)) = bind_v6 + && !explicit_detected_ipv6 + { + detected_ipv6 = Some(ip); + explicit_detected_ipv6 = true; + } + if bind_v4.is_none() && bind_v6.is_none() { + continue; + } + + if !(nat_probe && config.stun_use) || servers.is_empty() { + continue; + } + + let direct_stun_res = probe_stun_servers_parallel( + &servers, + stun_nat_probe_concurrency.max(1), + bind_v4, + bind_v6, + ) + .await; + if let Some(reflected) = direct_stun_res.v4.map(|r| r.reflected_addr) { + reflected_ipv4 = Some(reflected); + explicit_reflected_ipv4 = true; + } + if let Some(reflected) = direct_stun_res.v6.map(|r| r.reflected_addr) { + reflected_ipv6 = Some(reflected); + explicit_reflected_ipv6 = true; + } + } + + if strict_bind_ipv4_requested && !explicit_detected_ipv4 { + detected_ipv4 = None; + reflected_ipv4 = None; + } else if strict_bind_ipv4_requested && !explicit_reflected_ipv4 { + reflected_ipv4 = None; + } + if strict_bind_ipv6_requested && !explicit_detected_ipv6 { + detected_ipv6 = None; + reflected_ipv6 = None; + } else if strict_bind_ipv6_requested && !explicit_reflected_ipv6 { + reflected_ipv6 = None; + } + + probe.detected_ipv4 = detected_ipv4; + probe.detected_ipv6 = detected_ipv6; + probe.reflected_ipv4 = reflected_ipv4; + probe.reflected_ipv6 = reflected_ipv6; + probe.ipv4_is_bogon = probe.detected_ipv4.map(is_bogon_v4).unwrap_or(false); + probe.ipv6_is_bogon = probe.detected_ipv6.map(is_bogon_v6).unwrap_or(false); // If STUN is blocked but IPv4 is private, try HTTP public-IP fallback. if nat_probe @@ -162,6 +268,8 @@ fn collect_stun_servers(config: &NetworkConfig) -> Vec { async fn probe_stun_servers_parallel( servers: &[String], concurrency: usize, + bind_v4: Option, + bind_v6: Option, ) -> DualStunResult { let mut join_set = JoinSet::new(); let mut next_idx = 0usize; @@ -172,8 +280,15 @@ async fn probe_stun_servers_parallel( while next_idx < servers.len() && join_set.len() < concurrency { let stun_addr = servers[next_idx].clone(); next_idx += 1; + let bind_v4 = bind_v4; + let bind_v6 = bind_v6; join_set.spawn(async move { - let res = timeout(STUN_BATCH_TIMEOUT, stun_probe_dual(&stun_addr)).await; + let res = timeout(STUN_BATCH_TIMEOUT, async { + let v4 = stun_probe_family_with_bind(&stun_addr, IpFamily::V4, bind_v4).await?; + let v6 = stun_probe_family_with_bind(&stun_addr, IpFamily::V6, bind_v6).await?; + Ok::(DualStunResult { v4, v6 }) + }) + .await; (stun_addr, res) }); } diff --git a/src/transport/middle_proxy/handshake.rs b/src/transport/middle_proxy/handshake.rs index 0d7626c..39e34d7 100644 --- a/src/transport/middle_proxy/handshake.rs +++ b/src/transport/middle_proxy/handshake.rs @@ -59,6 +59,7 @@ impl KdfClientPortSource { pub(crate) struct HandshakeOutput { pub rd: ReadHalf, pub wr: WriteHalf, + pub source_ip: IpAddr, pub read_key: [u8; 32], pub read_iv: [u8; 16], pub write_key: [u8; 32], @@ -689,6 +690,7 @@ impl MePool { Ok(HandshakeOutput { rd, wr, + source_ip: local_addr_nat.ip(), read_key: rk, read_iv, write_key: wk, diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 08ef142..42cba81 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -34,6 +34,7 @@ pub(super) struct RefillEndpointKey { pub struct MeWriter { pub id: u64, pub addr: SocketAddr, + pub source_ip: IpAddr, pub writer_dc: i32, pub generation: u64, pub contour: Arc, diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 7e79f10..64fb700 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -163,6 +163,7 @@ impl MePool { let writer = MeWriter { id: writer_id, addr, + source_ip: hs.source_ip, writer_dc, generation, contour: contour.clone(), diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 79cfa54..f63662b 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -42,20 +42,30 @@ impl MePool { tag_override: Option<&[u8]>, ) -> Result<()> { let tag = tag_override.or(self.proxy_tag.as_deref()); - let payload = build_proxy_req_payload( - conn_id, - client_addr, - our_addr, - data, - tag, - proto_flags, - ); - let meta = ConnMeta { + let fallback_meta = ConnMeta { target_dc, client_addr, our_addr, proto_flags, }; + let build_routed_payload = |effective_our_addr: SocketAddr| { + ( + build_proxy_req_payload( + conn_id, + client_addr, + effective_our_addr, + data, + tag, + proto_flags, + ), + ConnMeta { + target_dc, + client_addr, + our_addr: effective_our_addr, + proto_flags, + }, + ) + }; let no_writer_mode = MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed)); let (routed_dc, unknown_target_dc) = self @@ -70,8 +80,14 @@ impl MePool { let mut hybrid_wait_current = hybrid_wait_step; loop { + let current_meta = self + .registry + .get_meta(conn_id) + .await + .unwrap_or_else(|| fallback_meta.clone()); + let (current_payload, _) = build_routed_payload(current_meta.our_addr); if let Some(current) = self.registry.get_writer(conn_id).await { - match current.tx.try_send(WriterCommand::Data(payload.clone())) { + match current.tx.try_send(WriterCommand::Data(current_payload.clone())) { Ok(()) => return Ok(()), Err(TrySendError::Full(cmd)) => { if current.tx.send(cmd).await.is_ok() { @@ -354,11 +370,13 @@ impl MePool { if !self.writer_accepts_new_binding(w) { continue; } + let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port()); + let (payload, meta) = build_routed_payload(effective_our_addr); match w.tx.try_send(WriterCommand::Data(payload.clone())) { Ok(()) => { self.stats.increment_me_writer_pick_success_try_total(pick_mode); self.registry - .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) + .bind_writer(conn_id, w.id, w.tx.clone(), meta) .await; if w.generation < self.current_generation() { self.stats.increment_pool_stale_pick_total(); @@ -397,12 +415,14 @@ impl MePool { continue; } self.stats.increment_me_writer_pick_blocking_fallback_total(); + let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port()); + let (payload, meta) = build_routed_payload(effective_our_addr); match w.tx.send(WriterCommand::Data(payload.clone())).await { Ok(()) => { self.stats .increment_me_writer_pick_success_fallback_total(pick_mode); self.registry - .bind_writer(conn_id, w.id, w.tx.clone(), meta.clone()) + .bind_writer(conn_id, w.id, w.tx.clone(), meta) .await; if w.generation < self.current_generation() { self.stats.increment_pool_stale_pick_total(); diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 1355934..8360e1e 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -390,7 +390,7 @@ impl UpstreamManager { out } - fn resolve_bind_address( + pub(crate) fn resolve_bind_address( interface: &Option, bind_addresses: &Option>, target: SocketAddr, @@ -399,7 +399,7 @@ impl UpstreamManager { ) -> Option { let want_ipv6 = target.is_ipv6(); - if let Some(addrs) = bind_addresses { + if let Some(addrs) = bind_addresses.as_ref().filter(|v| !v.is_empty()) { let mut candidates: Vec = addrs .iter() .filter_map(|s| s.parse::().ok()) @@ -431,7 +431,7 @@ impl UpstreamManager { warn!( interface = %iface, target = %target, - "Configured interface has no addresses for target family; falling back to direct connect without bind" + "Configured interface has no addresses for target family" ); candidates.clear(); } @@ -454,10 +454,11 @@ impl UpstreamManager { warn!( interface = interface.as_deref().unwrap_or(""), target = %target, - "No valid bind_addresses left for interface; falling back to direct connect without bind" + "No valid bind_addresses left for interface" ); - return None; } + + return None; } if let Some(iface) = interface { @@ -795,6 +796,13 @@ impl UpstreamManager { bind_rr.as_deref(), true, ); + if bind_ip.is_none() + && bind_addresses.as_ref().is_some_and(|v| !v.is_empty()) + { + return Err(ProxyError::Config(format!( + "No valid bind_addresses for target family {target}" + ))); + } let socket = create_outgoing_socket_bound(target, bind_ip)?; if let Some(ip) = bind_ip { @@ -1642,4 +1650,32 @@ mod tests { }; assert!(!UpstreamManager::is_hard_connect_error(&error)); } + + #[test] + fn resolve_bind_address_prefers_explicit_bind_ip() { + let target = "203.0.113.10:443".parse::().unwrap(); + let bind = UpstreamManager::resolve_bind_address( + &Some("198.51.100.20".to_string()), + &Some(vec!["198.51.100.10".to_string()]), + target, + None, + true, + ); + + assert_eq!(bind, Some("198.51.100.10".parse::().unwrap())); + } + + #[test] + fn resolve_bind_address_does_not_fallback_to_interface_when_bind_addresses_present() { + let target = "203.0.113.10:443".parse::().unwrap(); + let bind = UpstreamManager::resolve_bind_address( + &Some("198.51.100.20".to_string()), + &Some(vec!["2001:db8::10".to_string()]), + target, + None, + true, + ); + + assert_eq!(bind, None); + } }