Server.Listeners + Upstream V4/V6

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-04-15 01:32:49 +03:00
parent 3fefcdd11f
commit d7a0319696
31 changed files with 377 additions and 52 deletions

View File

@@ -455,6 +455,87 @@ impl UpstreamManager {
}
}
fn resolve_probe_dc_families(
upstream: &UpstreamConfig,
ipv4_available: bool,
ipv6_available: bool,
) -> (bool, bool) {
(
upstream.ipv4.unwrap_or(ipv4_available),
upstream.ipv6.unwrap_or(ipv6_available),
)
}
fn resolve_runtime_dc_families(
upstream: &UpstreamConfig,
dc_preference: IpPreference,
) -> (bool, bool) {
let (auto_ipv4, auto_ipv6) = match dc_preference {
IpPreference::PreferV4 => (true, false),
IpPreference::PreferV6 => (false, true),
IpPreference::BothWork | IpPreference::Unknown | IpPreference::Unavailable => {
(true, true)
}
};
(
upstream.ipv4.unwrap_or(auto_ipv4),
upstream.ipv6.unwrap_or(auto_ipv6),
)
}
fn dc_table_addr(dc_idx: i16, ipv6: bool, port: u16) -> Option<SocketAddr> {
let arr_idx = UpstreamState::dc_array_idx(dc_idx)?;
let ip = if ipv6 {
TG_DATACENTERS_V6[arr_idx]
} else {
TG_DATACENTERS_V4[arr_idx]
};
Some(SocketAddr::new(ip, port))
}
fn resolve_runtime_dc_target(
target: SocketAddr,
dc_idx: Option<i16>,
upstream: &UpstreamConfig,
dc_preference: IpPreference,
) -> Result<SocketAddr> {
let (allow_ipv4, allow_ipv6) = Self::resolve_runtime_dc_families(upstream, dc_preference);
if (target.is_ipv4() && allow_ipv4) || (target.is_ipv6() && allow_ipv6) {
return Ok(target);
}
if !allow_ipv4 && !allow_ipv6 {
return Err(ProxyError::Config(format!(
"Upstream DC family policy blocks all families for target {target}"
)));
}
let Some(dc_idx) = dc_idx else {
return Err(ProxyError::Config(format!(
"Upstream DC family policy cannot remap target {target} without dc_idx"
)));
};
let remapped = if target.is_ipv4() {
if allow_ipv6 {
Self::dc_table_addr(dc_idx, true, target.port())
} else {
None
}
} else if allow_ipv4 {
Self::dc_table_addr(dc_idx, false, target.port())
} else {
None
};
remapped.ok_or_else(|| {
ProxyError::Config(format!(
"Upstream DC family policy rejected target {target} (dc_idx={dc_idx})"
))
})
}
#[cfg(unix)]
fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec<IpAddr> {
use nix::ifaddrs::getifaddrs;
@@ -728,18 +809,24 @@ impl UpstreamManager {
.await
.ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
let mut upstream = {
let (mut upstream, bind_rr, dc_preference) = {
let guard = self.upstreams.read().await;
guard[idx].config.clone()
let state = &guard[idx];
let dc_preference = dc_idx
.and_then(UpstreamState::dc_array_idx)
.map(|dc_array_idx| state.dc_ip_pref[dc_array_idx])
.unwrap_or(IpPreference::Unknown);
(state.config.clone(), Some(state.bind_rr.clone()), dc_preference)
};
if let Some(s) = scope {
upstream.selected_scope = s.to_string();
}
let bind_rr = {
let guard = self.upstreams.read().await;
guard.get(idx).map(|u| u.bind_rr.clone())
let target = if dc_idx.is_some() {
Self::resolve_runtime_dc_target(target, dc_idx, &upstream, dc_preference)?
} else {
target
};
let (stream, _) = self
@@ -760,9 +847,14 @@ impl UpstreamManager {
.await
.ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?;
let mut upstream = {
let (mut upstream, bind_rr, dc_preference) = {
let guard = self.upstreams.read().await;
guard[idx].config.clone()
let state = &guard[idx];
let dc_preference = dc_idx
.and_then(UpstreamState::dc_array_idx)
.map(|dc_array_idx| state.dc_ip_pref[dc_array_idx])
.unwrap_or(IpPreference::Unknown);
(state.config.clone(), Some(state.bind_rr.clone()), dc_preference)
};
// Set scope for configuration copy
@@ -770,9 +862,10 @@ impl UpstreamManager {
upstream.selected_scope = s.to_string();
}
let bind_rr = {
let guard = self.upstreams.read().await;
guard.get(idx).map(|u| u.bind_rr.clone())
let target = if dc_idx.is_some() {
Self::resolve_runtime_dc_target(target, dc_idx, &upstream, dc_preference)?
} else {
target
};
let (stream, egress) = self
@@ -1212,6 +1305,8 @@ impl UpstreamManager {
let mut all_results = Vec::new();
for (upstream_idx, upstream_config, bind_rr) in &upstreams {
let (upstream_ipv4_enabled, upstream_ipv6_enabled) =
Self::resolve_probe_dc_families(upstream_config, ipv4_enabled, ipv6_enabled);
let upstream_name = match &upstream_config.upstream_type {
UpstreamType::Direct {
interface,
@@ -1244,7 +1339,7 @@ impl UpstreamManager {
};
let mut v6_results = Vec::with_capacity(NUM_DCS);
if ipv6_enabled {
if upstream_ipv6_enabled {
for dc_zero_idx in 0..NUM_DCS {
let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx];
let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT);
@@ -1295,13 +1390,17 @@ impl UpstreamManager {
dc_idx: dc_zero_idx + 1,
dc_addr: SocketAddr::new(dc_v6, TG_DATACENTER_PORT),
rtt_ms: None,
error: Some("ipv6 disabled".to_string()),
error: Some(if ipv6_enabled {
"ipv6 disabled by upstream policy".to_string()
} else {
"ipv6 disabled".to_string()
}),
});
}
}
let mut v4_results = Vec::with_capacity(NUM_DCS);
if ipv4_enabled {
if upstream_ipv4_enabled {
for dc_zero_idx in 0..NUM_DCS {
let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx];
let addr_v4 = SocketAddr::new(dc_v4, TG_DATACENTER_PORT);
@@ -1352,7 +1451,11 @@ impl UpstreamManager {
dc_idx: dc_zero_idx + 1,
dc_addr: SocketAddr::new(dc_v4, TG_DATACENTER_PORT),
rtt_ms: None,
error: Some("ipv4 disabled".to_string()),
error: Some(if ipv4_enabled {
"ipv4 disabled by upstream policy".to_string()
} else {
"ipv4 disabled".to_string()
}),
});
}
}
@@ -1372,7 +1475,9 @@ impl UpstreamManager {
match addr_str.parse::<SocketAddr>() {
Ok(addr) => {
let is_v6 = addr.is_ipv6();
if (is_v6 && !ipv6_enabled) || (!is_v6 && !ipv4_enabled) {
if (is_v6 && !upstream_ipv6_enabled)
|| (!is_v6 && !upstream_ipv4_enabled)
{
continue;
}
let result = tokio::time::timeout(
@@ -1614,6 +1719,8 @@ impl UpstreamManager {
let u = &guard[i];
(u.config.clone(), u.bind_rr.clone())
};
let (upstream_ipv4_enabled, upstream_ipv6_enabled) =
Self::resolve_probe_dc_families(&config, ipv4_enabled, ipv6_enabled);
let mut healthy_groups = 0usize;
let mut latency_updates: Vec<(usize, f64)> = Vec::new();
@@ -1629,14 +1736,31 @@ impl UpstreamManager {
continue;
}
let filtered_endpoints: Vec<SocketAddr> = endpoints
.iter()
.copied()
.filter(|endpoint| {
if endpoint.is_ipv4() {
upstream_ipv4_enabled
} else {
upstream_ipv6_enabled
}
})
.collect();
if filtered_endpoints.is_empty() {
continue;
}
let rotation_key = (i, group.dc_idx, is_primary);
let start_idx =
*endpoint_rotation.entry(rotation_key).or_insert(0) % endpoints.len();
let mut next_idx = (start_idx + 1) % endpoints.len();
*endpoint_rotation.entry(rotation_key).or_insert(0)
% filtered_endpoints.len();
let mut next_idx = (start_idx + 1) % filtered_endpoints.len();
for step in 0..endpoints.len() {
let endpoint_idx = (start_idx + step) % endpoints.len();
let endpoint = endpoints[endpoint_idx];
for step in 0..filtered_endpoints.len() {
let endpoint_idx = (start_idx + step) % filtered_endpoints.len();
let endpoint = filtered_endpoints[endpoint_idx];
let start = Instant::now();
let result = tokio::time::timeout(
@@ -1655,7 +1779,7 @@ impl UpstreamManager {
Ok(Ok(_stream)) => {
group_ok = true;
group_rtt_ms = Some(start.elapsed().as_secs_f64() * 1000.0);
next_idx = (endpoint_idx + 1) % endpoints.len();
next_idx = (endpoint_idx + 1) % filtered_endpoints.len();
break;
}
Ok(Err(e)) => {
@@ -1910,6 +2034,8 @@ mod tests {
enabled: true,
scopes: String::new(),
selected_scope: String::new(),
ipv4: None,
ipv6: None,
}],
1,
100,