Src-IP in ME Routing + more strict bind_addresses + ME Gate fixes: merge pull request #408 from telemt/flow

Src-IP in ME Routing + more strict bind_addresses + ME Gate fixes
This commit is contained in:
Alexey 2026-03-14 13:22:09 +03:00 committed by GitHub
commit e57a93880b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 281 additions and 37 deletions

View File

@ -287,6 +287,7 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
.await;
let probe = run_probe(
&config.network,
&config.upstreams,
config.general.middle_proxy_nat_probe,
config.general.stun_nat_probe_concurrency,
)
@ -295,7 +296,11 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
probe.detected_ipv4.map(IpAddr::V4),
probe.detected_ipv6.map(IpAddr::V6),
));
let decision = decide_network_capabilities(&config.network, &probe);
let decision = decide_network_capabilities(
&config.network,
&probe,
config.general.middle_proxy_nat_ip,
);
log_probe_result(&probe, &decision);
startup_tracker
.complete_component(

View File

@ -8,9 +8,10 @@ use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::{debug, info, warn};
use crate::config::NetworkConfig;
use crate::config::{NetworkConfig, UpstreamConfig, UpstreamType};
use crate::error::Result;
use crate::network::stun::{stun_probe_dual, DualStunResult, IpFamily, StunProbeResult};
use crate::network::stun::{stun_probe_family_with_bind, DualStunResult, IpFamily, StunProbeResult};
use crate::transport::UpstreamManager;
#[derive(Debug, Clone, Default)]
pub struct NetworkProbe {
@ -57,19 +58,22 @@ const STUN_BATCH_TIMEOUT: Duration = Duration::from_secs(5);
pub async fn run_probe(
config: &NetworkConfig,
upstreams: &[UpstreamConfig],
nat_probe: bool,
stun_nat_probe_concurrency: usize,
) -> Result<NetworkProbe> {
let mut probe = NetworkProbe::default();
let servers = collect_stun_servers(config);
let mut detected_ipv4 = detect_local_ip_v4();
let mut detected_ipv6 = detect_local_ip_v6();
let mut explicit_detected_ipv4 = false;
let mut explicit_detected_ipv6 = false;
let mut explicit_reflected_ipv4 = false;
let mut explicit_reflected_ipv6 = false;
let mut strict_bind_ipv4_requested = false;
let mut strict_bind_ipv6_requested = false;
probe.detected_ipv4 = detect_local_ip_v4();
probe.detected_ipv6 = detect_local_ip_v6();
probe.ipv4_is_bogon = probe.detected_ipv4.map(is_bogon_v4).unwrap_or(false);
probe.ipv6_is_bogon = probe.detected_ipv6.map(is_bogon_v6).unwrap_or(false);
let stun_res = if nat_probe && config.stun_use {
let servers = collect_stun_servers(config);
let global_stun_res = if nat_probe && config.stun_use {
if servers.is_empty() {
warn!("STUN probe is enabled but network.stun_servers is empty");
DualStunResult::default()
@ -77,6 +81,8 @@ pub async fn run_probe(
probe_stun_servers_parallel(
&servers,
stun_nat_probe_concurrency.max(1),
None,
None,
)
.await
}
@ -86,8 +92,108 @@ pub async fn run_probe(
} else {
DualStunResult::default()
};
probe.reflected_ipv4 = stun_res.v4.map(|r| r.reflected_addr);
probe.reflected_ipv6 = stun_res.v6.map(|r| r.reflected_addr);
let mut reflected_ipv4 = global_stun_res.v4.map(|r| r.reflected_addr);
let mut reflected_ipv6 = global_stun_res.v6.map(|r| r.reflected_addr);
for upstream in upstreams.iter().filter(|upstream| upstream.enabled) {
let UpstreamType::Direct {
interface,
bind_addresses,
} = &upstream.upstream_type else {
continue;
};
if let Some(addrs) = bind_addresses.as_ref().filter(|v| !v.is_empty()) {
let mut saw_parsed_ip = false;
for value in addrs {
if let Ok(ip) = value.parse::<IpAddr>() {
saw_parsed_ip = true;
if ip.is_ipv4() {
strict_bind_ipv4_requested = true;
} else {
strict_bind_ipv6_requested = true;
}
}
}
if !saw_parsed_ip {
strict_bind_ipv4_requested = true;
strict_bind_ipv6_requested = true;
}
}
let bind_v4 = UpstreamManager::resolve_bind_address(
interface,
bind_addresses,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(198, 51, 100, 1)), 443),
None,
true,
);
let bind_v6 = UpstreamManager::resolve_bind_address(
interface,
bind_addresses,
SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)),
443,
),
None,
true,
);
if let Some(IpAddr::V4(ip)) = bind_v4
&& !explicit_detected_ipv4
{
detected_ipv4 = Some(ip);
explicit_detected_ipv4 = true;
}
if let Some(IpAddr::V6(ip)) = bind_v6
&& !explicit_detected_ipv6
{
detected_ipv6 = Some(ip);
explicit_detected_ipv6 = true;
}
if bind_v4.is_none() && bind_v6.is_none() {
continue;
}
if !(nat_probe && config.stun_use) || servers.is_empty() {
continue;
}
let direct_stun_res = probe_stun_servers_parallel(
&servers,
stun_nat_probe_concurrency.max(1),
bind_v4,
bind_v6,
)
.await;
if let Some(reflected) = direct_stun_res.v4.map(|r| r.reflected_addr) {
reflected_ipv4 = Some(reflected);
explicit_reflected_ipv4 = true;
}
if let Some(reflected) = direct_stun_res.v6.map(|r| r.reflected_addr) {
reflected_ipv6 = Some(reflected);
explicit_reflected_ipv6 = true;
}
}
if strict_bind_ipv4_requested && !explicit_detected_ipv4 {
detected_ipv4 = None;
reflected_ipv4 = None;
} else if strict_bind_ipv4_requested && !explicit_reflected_ipv4 {
reflected_ipv4 = None;
}
if strict_bind_ipv6_requested && !explicit_detected_ipv6 {
detected_ipv6 = None;
reflected_ipv6 = None;
} else if strict_bind_ipv6_requested && !explicit_reflected_ipv6 {
reflected_ipv6 = None;
}
probe.detected_ipv4 = detected_ipv4;
probe.detected_ipv6 = detected_ipv6;
probe.reflected_ipv4 = reflected_ipv4;
probe.reflected_ipv6 = reflected_ipv6;
probe.ipv4_is_bogon = probe.detected_ipv4.map(is_bogon_v4).unwrap_or(false);
probe.ipv6_is_bogon = probe.detected_ipv6.map(is_bogon_v6).unwrap_or(false);
// If STUN is blocked but IPv4 is private, try HTTP public-IP fallback.
if nat_probe
@ -162,6 +268,8 @@ fn collect_stun_servers(config: &NetworkConfig) -> Vec<String> {
async fn probe_stun_servers_parallel(
servers: &[String],
concurrency: usize,
bind_v4: Option<IpAddr>,
bind_v6: Option<IpAddr>,
) -> DualStunResult {
let mut join_set = JoinSet::new();
let mut next_idx = 0usize;
@ -172,8 +280,15 @@ async fn probe_stun_servers_parallel(
while next_idx < servers.len() && join_set.len() < concurrency {
let stun_addr = servers[next_idx].clone();
next_idx += 1;
let bind_v4 = bind_v4;
let bind_v6 = bind_v6;
join_set.spawn(async move {
let res = timeout(STUN_BATCH_TIMEOUT, stun_probe_dual(&stun_addr)).await;
let res = timeout(STUN_BATCH_TIMEOUT, async {
let v4 = stun_probe_family_with_bind(&stun_addr, IpFamily::V4, bind_v4).await?;
let v6 = stun_probe_family_with_bind(&stun_addr, IpFamily::V6, bind_v6).await?;
Ok::<DualStunResult, crate::error::ProxyError>(DualStunResult { v4, v6 })
})
.await;
(stun_addr, res)
});
}
@ -226,18 +341,24 @@ async fn probe_stun_servers_parallel(
out
}
pub fn decide_network_capabilities(config: &NetworkConfig, probe: &NetworkProbe) -> NetworkDecision {
pub fn decide_network_capabilities(
config: &NetworkConfig,
probe: &NetworkProbe,
middle_proxy_nat_ip: Option<IpAddr>,
) -> NetworkDecision {
let ipv4_dc = config.ipv4 && probe.detected_ipv4.is_some();
let ipv6_dc = config.ipv6.unwrap_or(probe.detected_ipv6.is_some()) && probe.detected_ipv6.is_some();
let nat_ip_v4 = matches!(middle_proxy_nat_ip, Some(IpAddr::V4(_)));
let nat_ip_v6 = matches!(middle_proxy_nat_ip, Some(IpAddr::V6(_)));
let ipv4_me = config.ipv4
&& probe.detected_ipv4.is_some()
&& (!probe.ipv4_is_bogon || probe.reflected_ipv4.is_some());
&& (!probe.ipv4_is_bogon || probe.reflected_ipv4.is_some() || nat_ip_v4);
let ipv6_enabled = config.ipv6.unwrap_or(probe.detected_ipv6.is_some());
let ipv6_me = ipv6_enabled
&& probe.detected_ipv6.is_some()
&& (!probe.ipv6_is_bogon || probe.reflected_ipv6.is_some());
&& (!probe.ipv6_is_bogon || probe.reflected_ipv6.is_some() || nat_ip_v6);
let effective_prefer = match config.prefer {
6 if ipv6_me || ipv6_dc => 6,
@ -262,6 +383,58 @@ pub fn decide_network_capabilities(config: &NetworkConfig, probe: &NetworkProbe)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::NetworkConfig;
#[test]
fn manual_nat_ip_enables_ipv4_me_without_reflection() {
let config = NetworkConfig {
ipv4: true,
..Default::default()
};
let probe = NetworkProbe {
detected_ipv4: Some(Ipv4Addr::new(10, 0, 0, 10)),
ipv4_is_bogon: true,
..Default::default()
};
let decision = decide_network_capabilities(
&config,
&probe,
Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
);
assert!(decision.ipv4_me);
}
#[test]
fn manual_nat_ip_does_not_enable_other_family() {
let config = NetworkConfig {
ipv4: true,
ipv6: Some(true),
..Default::default()
};
let probe = NetworkProbe {
detected_ipv4: Some(Ipv4Addr::new(10, 0, 0, 10)),
detected_ipv6: Some(Ipv6Addr::LOCALHOST),
ipv4_is_bogon: true,
ipv6_is_bogon: true,
..Default::default()
};
let decision = decide_network_capabilities(
&config,
&probe,
Some(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4))),
);
assert!(decision.ipv4_me);
assert!(!decision.ipv6_me);
}
}
fn detect_local_ip_v4() -> Option<Ipv4Addr> {
let socket = UdpSocket::bind("0.0.0.0:0").ok()?;
socket.connect("8.8.8.8:80").ok()?;

View File

@ -59,6 +59,7 @@ impl KdfClientPortSource {
pub(crate) struct HandshakeOutput {
pub rd: ReadHalf<TcpStream>,
pub wr: WriteHalf<TcpStream>,
pub source_ip: IpAddr,
pub read_key: [u8; 32],
pub read_iv: [u8; 16],
pub write_key: [u8; 32],
@ -689,6 +690,7 @@ impl MePool {
Ok(HandshakeOutput {
rd,
wr,
source_ip: local_addr_nat.ip(),
read_key: rk,
read_iv,
write_key: wk,

View File

@ -34,6 +34,7 @@ pub(super) struct RefillEndpointKey {
pub struct MeWriter {
pub id: u64,
pub addr: SocketAddr,
pub source_ip: IpAddr,
pub writer_dc: i32,
pub generation: u64,
pub contour: Arc<AtomicU8>,
@ -638,9 +639,9 @@ impl MePool {
}
}
/// Translate the local ME address into the address material sent to the proxy.
pub fn translate_our_addr(&self, addr: SocketAddr) -> SocketAddr {
let ip = self.translate_ip_for_nat(addr.ip());
SocketAddr::new(ip, addr.port())
self.translate_our_addr_with_reflection(addr, None)
}
pub fn registry(&self) -> &Arc<ConnRegistry> {

View File

@ -159,7 +159,13 @@ impl MePool {
addr: std::net::SocketAddr,
reflected: Option<std::net::SocketAddr>,
) -> std::net::SocketAddr {
let ip = if let Some(r) = reflected {
let ip = if let Some(nat_ip) = self.nat_ip_cfg {
match (addr.ip(), nat_ip) {
(IpAddr::V4(_), IpAddr::V4(dst)) => IpAddr::V4(dst),
(IpAddr::V6(_), IpAddr::V6(dst)) => IpAddr::V6(dst),
_ => addr.ip(),
}
} else if let Some(r) = reflected {
// Use reflected IP (not port) only when local address is non-public.
if is_bogon(addr.ip()) || addr.ip().is_loopback() || addr.ip().is_unspecified() {
r.ip()

View File

@ -163,6 +163,7 @@ impl MePool {
let writer = MeWriter {
id: writer_id,
addr,
source_ip: hs.source_ip,
writer_dc,
generation,
contour: contour.clone(),

View File

@ -42,20 +42,30 @@ impl MePool {
tag_override: Option<&[u8]>,
) -> Result<()> {
let tag = tag_override.or(self.proxy_tag.as_deref());
let payload = build_proxy_req_payload(
conn_id,
client_addr,
our_addr,
data,
tag,
proto_flags,
);
let meta = ConnMeta {
let fallback_meta = ConnMeta {
target_dc,
client_addr,
our_addr,
proto_flags,
};
let build_routed_payload = |effective_our_addr: SocketAddr| {
(
build_proxy_req_payload(
conn_id,
client_addr,
effective_our_addr,
data,
tag,
proto_flags,
),
ConnMeta {
target_dc,
client_addr,
our_addr: effective_our_addr,
proto_flags,
},
)
};
let no_writer_mode =
MeRouteNoWriterMode::from_u8(self.me_route_no_writer_mode.load(Ordering::Relaxed));
let (routed_dc, unknown_target_dc) = self
@ -70,8 +80,14 @@ impl MePool {
let mut hybrid_wait_current = hybrid_wait_step;
loop {
let current_meta = self
.registry
.get_meta(conn_id)
.await
.unwrap_or_else(|| fallback_meta.clone());
let (current_payload, _) = build_routed_payload(current_meta.our_addr);
if let Some(current) = self.registry.get_writer(conn_id).await {
match current.tx.try_send(WriterCommand::Data(payload.clone())) {
match current.tx.try_send(WriterCommand::Data(current_payload.clone())) {
Ok(()) => return Ok(()),
Err(TrySendError::Full(cmd)) => {
if current.tx.send(cmd).await.is_ok() {
@ -354,11 +370,13 @@ impl MePool {
if !self.writer_accepts_new_binding(w) {
continue;
}
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr);
match w.tx.try_send(WriterCommand::Data(payload.clone())) {
Ok(()) => {
self.stats.increment_me_writer_pick_success_try_total(pick_mode);
self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.bind_writer(conn_id, w.id, w.tx.clone(), meta)
.await;
if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total();
@ -397,12 +415,14 @@ impl MePool {
continue;
}
self.stats.increment_me_writer_pick_blocking_fallback_total();
let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port());
let (payload, meta) = build_routed_payload(effective_our_addr);
match w.tx.send(WriterCommand::Data(payload.clone())).await {
Ok(()) => {
self.stats
.increment_me_writer_pick_success_fallback_total(pick_mode);
self.registry
.bind_writer(conn_id, w.id, w.tx.clone(), meta.clone())
.bind_writer(conn_id, w.id, w.tx.clone(), meta)
.await;
if w.generation < self.current_generation() {
self.stats.increment_pool_stale_pick_total();

View File

@ -390,7 +390,7 @@ impl UpstreamManager {
out
}
fn resolve_bind_address(
pub(crate) fn resolve_bind_address(
interface: &Option<String>,
bind_addresses: &Option<Vec<String>>,
target: SocketAddr,
@ -399,7 +399,7 @@ impl UpstreamManager {
) -> Option<IpAddr> {
let want_ipv6 = target.is_ipv6();
if let Some(addrs) = bind_addresses {
if let Some(addrs) = bind_addresses.as_ref().filter(|v| !v.is_empty()) {
let mut candidates: Vec<IpAddr> = addrs
.iter()
.filter_map(|s| s.parse::<IpAddr>().ok())
@ -431,7 +431,7 @@ impl UpstreamManager {
warn!(
interface = %iface,
target = %target,
"Configured interface has no addresses for target family; falling back to direct connect without bind"
"Configured interface has no addresses for target family"
);
candidates.clear();
}
@ -454,10 +454,11 @@ impl UpstreamManager {
warn!(
interface = interface.as_deref().unwrap_or(""),
target = %target,
"No valid bind_addresses left for interface; falling back to direct connect without bind"
"No valid bind_addresses left for interface"
);
return None;
}
return None;
}
if let Some(iface) = interface {
@ -795,6 +796,13 @@ impl UpstreamManager {
bind_rr.as_deref(),
true,
);
if bind_ip.is_none()
&& bind_addresses.as_ref().is_some_and(|v| !v.is_empty())
{
return Err(ProxyError::Config(format!(
"No valid bind_addresses for target family {target}"
)));
}
let socket = create_outgoing_socket_bound(target, bind_ip)?;
if let Some(ip) = bind_ip {
@ -1642,4 +1650,32 @@ mod tests {
};
assert!(!UpstreamManager::is_hard_connect_error(&error));
}
#[test]
fn resolve_bind_address_prefers_explicit_bind_ip() {
let target = "203.0.113.10:443".parse::<SocketAddr>().unwrap();
let bind = UpstreamManager::resolve_bind_address(
&Some("198.51.100.20".to_string()),
&Some(vec!["198.51.100.10".to_string()]),
target,
None,
true,
);
assert_eq!(bind, Some("198.51.100.10".parse::<IpAddr>().unwrap()));
}
#[test]
fn resolve_bind_address_does_not_fallback_to_interface_when_bind_addresses_present() {
let target = "203.0.113.10:443".parse::<SocketAddr>().unwrap();
let bind = UpstreamManager::resolve_bind_address(
&Some("198.51.100.20".to_string()),
&Some(vec!["2001:db8::10".to_string()]),
target,
None,
true,
);
assert_eq!(bind, None);
}
}