From d7a03196968c96f3c643230ca615a2833cbf9598 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 15 Apr 2026 01:32:49 +0300 Subject: [PATCH] Server.Listeners + Upstream V4/V6 Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/api/users.rs | 14 +- src/cli.rs | 3 +- src/config/hot_reload.rs | 16 +- src/config/load.rs | 17 ++ src/config/types.rs | 16 +- src/conntrack_control.rs | 61 ++++--- src/maestro/listeners.rs | 22 ++- .../client_masking_blackhat_campaign_tests.rs | 2 + .../client_masking_budget_security_tests.rs | 2 + ...ient_masking_diagnostics_security_tests.rs | 2 + ...ng_fragmented_classifier_security_tests.rs | 2 + .../client_masking_hard_adversarial_tests.rs | 2 + ...http2_fragmented_preface_security_tests.rs | 2 + ...fig_pipeline_integration_security_tests.rs | 2 + ...sking_prefetch_invariant_security_tests.rs | 2 + ...nt_masking_probe_evasion_blackhat_tests.rs | 2 + ...ent_masking_redteam_expected_fail_tests.rs | 8 + ...nt_masking_replay_timing_security_tests.rs | 2 + ...sifier_fuzz_redteam_expected_fail_tests.rs | 2 + ...sking_shape_hardening_adversarial_tests.rs | 2 + ...e_hardening_redteam_expected_fail_tests.rs | 2 + ..._masking_shape_hardening_security_tests.rs | 2 + ...client_masking_stress_adversarial_tests.rs | 2 + src/proxy/tests/client_security_tests.rs | 54 ++++++ ...client_timing_profile_adversarial_tests.rs | 2 + ...ent_tls_clienthello_size_security_tests.rs | 2 + ...lienthello_truncation_adversarial_tests.rs | 2 + ...ent_tls_mtproto_fallback_security_tests.rs | 2 + .../tests/direct_relay_security_tests.rs | 10 ++ .../proxy_shared_state_isolation_tests.rs | 2 + src/transport/upstream.rs | 168 +++++++++++++++--- 31 files changed, 377 insertions(+), 52 deletions(-) diff --git a/src/api/users.rs b/src/api/users.rs index 5a09714..6b20b85 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -452,7 +452,11 @@ fn build_user_links( startup_detected_ip_v6: Option, ) -> UserLinks { let hosts = resolve_link_hosts(cfg, startup_detected_ip_v4, startup_detected_ip_v6); - let port = cfg.general.links.public_port.unwrap_or(cfg.server.port); + let port = cfg + .general + .links + .public_port + .unwrap_or(resolve_default_link_port(cfg)); let tls_domains = resolve_tls_domains(cfg); let mut classic = Vec::new(); @@ -490,6 +494,14 @@ fn build_user_links( } } +fn resolve_default_link_port(cfg: &ProxyConfig) -> u16 { + cfg.server + .listeners + .first() + .and_then(|listener| listener.port) + .unwrap_or(cfg.server.port) +} + fn resolve_link_hosts( cfg: &ProxyConfig, startup_detected_ip_v4: Option, diff --git a/src/cli.rs b/src/cli.rs index 5a79bae..47a10d5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -598,16 +598,17 @@ secure = false tls = true [server] -port = {port} listen_addr_ipv4 = "0.0.0.0" listen_addr_ipv6 = "::" [[server.listeners]] ip = "0.0.0.0" +port = {port} # reuse_allow = false # Set true only when intentionally running multiple telemt instances on same port [[server.listeners]] ip = "::" +port = {port} [timeouts] client_first_byte_idle_secs = 300 diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index f481798..f42638c 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -17,8 +17,9 @@ //! | `network` | `dns_overrides` | Applied immediately | //! | `access` | All user/quota fields | Effective immediately | //! -//! Fields that require re-binding sockets (`server.port`, `censorship.*`, -//! `network.*`, `use_middle_proxy`) are **not** applied; a warning is emitted. +//! Fields that require re-binding sockets (`server.listeners`, legacy +//! `server.port`, `censorship.*`, `network.*`, `use_middle_proxy`) are **not** +//! applied; a warning is emitted. //! Non-hot changes are never mixed into the runtime config snapshot. use std::collections::BTreeSet; @@ -299,6 +300,7 @@ fn listeners_equal( } lhs.iter().zip(rhs.iter()).all(|(a, b)| { a.ip == b.ip + && a.port == b.port && a.announce == b.announce && a.announce_ip == b.announce_ip && a.proxy_protocol == b.proxy_protocol @@ -306,6 +308,14 @@ fn listeners_equal( }) } +fn resolve_default_link_port(cfg: &ProxyConfig) -> u16 { + cfg.server + .listeners + .first() + .and_then(|listener| listener.port) + .unwrap_or(cfg.server.port) +} + #[derive(Debug, Clone, Default, PartialEq, Eq)] struct WatchManifest { files: BTreeSet, @@ -1120,7 +1130,7 @@ fn log_changes( .general .links .public_port - .unwrap_or(new_cfg.server.port); + .unwrap_or(resolve_default_link_port(new_cfg)); for user in &added { if let Some(secret) = new_hot.users.get(*user) { print_user_links(user, secret, &host, port, new_cfg); diff --git a/src/config/load.rs b/src/config/load.rs index ffecc30..abe4072 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -253,6 +253,12 @@ fn validate_upstreams(config: &ProxyConfig) -> Result<()> { } for upstream in &config.upstreams { + if matches!(upstream.ipv4, Some(false)) && matches!(upstream.ipv6, Some(false)) { + return Err(ProxyError::Config( + "upstream.ipv4 and upstream.ipv6 cannot both be false".to_string(), + )); + } + if let UpstreamType::Shadowsocks { url, .. } = &upstream.upstream_type { let parsed = ShadowsocksServerConfig::from_url(url) .map_err(|error| ProxyError::Config(format!("invalid shadowsocks url: {error}")))?; @@ -1324,6 +1330,7 @@ impl ProxyConfig { if let Ok(ipv4) = ipv4_str.parse::() { config.server.listeners.push(ListenerConfig { ip: ipv4, + port: Some(config.server.port), announce: None, announce_ip: None, proxy_protocol: None, @@ -1335,6 +1342,7 @@ impl ProxyConfig { { config.server.listeners.push(ListenerConfig { ip: ipv6, + port: Some(config.server.port), announce: None, announce_ip: None, proxy_protocol: None, @@ -1343,6 +1351,13 @@ impl ProxyConfig { } } + // Migration: listeners[].port fallback to legacy server.port. + for listener in &mut config.server.listeners { + if listener.port.is_none() { + listener.port = Some(config.server.port); + } + } + // Migration: announce_ip → announce for each listener. for listener in &mut config.server.listeners { if listener.announce.is_none() @@ -1369,6 +1384,8 @@ impl ProxyConfig { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }); } diff --git a/src/config/types.rs b/src/config/types.rs index ee52cb7..35b8d46 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1153,7 +1153,8 @@ pub struct LinksConfig { #[serde(default)] pub public_host: Option, - /// Public port for tg:// link generation (overrides server.port). + /// Public port for tg:// link generation. + /// Overrides listener ports and legacy `server.port`. #[serde(default)] pub public_port: Option, } @@ -1375,6 +1376,8 @@ impl Default for ConntrackControlConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerConfig { + /// Legacy listener port used for backward compatibility. + /// For new configs prefer `[[server.listeners]].port`. #[serde(default = "default_port")] pub port: u16, @@ -1917,11 +1920,22 @@ pub struct UpstreamConfig { pub scopes: String, #[serde(skip)] pub selected_scope: String, + /// Allow IPv4 DC targets for this upstream. + /// `None` means auto-detect from runtime connectivity state. + #[serde(default)] + pub ipv4: Option, + /// Allow IPv6 DC targets for this upstream. + /// `None` means auto-detect from runtime connectivity state. + #[serde(default)] + pub ipv6: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ListenerConfig { pub ip: IpAddr, + /// Per-listener TCP port. If omitted, falls back to legacy `server.port`. + #[serde(default)] + pub port: Option, /// IP address or hostname to announce in proxy links. /// Takes precedence over `announce_ip` if both are set. #[serde(default)] diff --git a/src/conntrack_control.rs b/src/conntrack_control.rs index 306697e..12069c3 100644 --- a/src/conntrack_control.rs +++ b/src/conntrack_control.rs @@ -343,15 +343,28 @@ fn command_exists(binary: &str) -> bool { }) } -fn notrack_targets(cfg: &ProxyConfig) -> (Vec>, Vec>) { +fn listener_port_set(cfg: &ProxyConfig) -> Vec { + let mut ports: BTreeSet = BTreeSet::new(); + if cfg.server.listeners.is_empty() { + ports.insert(cfg.server.port); + } else { + for listener in &cfg.server.listeners { + ports.insert(listener.port.unwrap_or(cfg.server.port)); + } + } + ports.into_iter().collect() +} + +fn notrack_targets(cfg: &ProxyConfig) -> (Vec<(Option, u16)>, Vec<(Option, u16)>) { let mode = cfg.server.conntrack_control.mode; - let mut v4_targets: BTreeSet> = BTreeSet::new(); - let mut v6_targets: BTreeSet> = BTreeSet::new(); + let mut v4_targets: BTreeSet<(Option, u16)> = BTreeSet::new(); + let mut v6_targets: BTreeSet<(Option, u16)> = BTreeSet::new(); match mode { ConntrackMode::Tracked => {} ConntrackMode::Notrack => { if cfg.server.listeners.is_empty() { + let port = cfg.server.port; if let Some(ipv4) = cfg .server .listen_addr_ipv4 @@ -359,9 +372,9 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec>, Vec().ok()) { if ipv4.is_unspecified() { - v4_targets.insert(None); + v4_targets.insert((None, port)); } else { - v4_targets.insert(Some(ipv4)); + v4_targets.insert((Some(ipv4), port)); } } if let Some(ipv6) = cfg @@ -371,33 +384,39 @@ fn notrack_targets(cfg: &ProxyConfig) -> (Vec>, Vec().ok()) { if ipv6.is_unspecified() { - v6_targets.insert(None); + v6_targets.insert((None, port)); } else { - v6_targets.insert(Some(ipv6)); + v6_targets.insert((Some(ipv6), port)); } } } else { for listener in &cfg.server.listeners { + let port = listener.port.unwrap_or(cfg.server.port); if listener.ip.is_ipv4() { if listener.ip.is_unspecified() { - v4_targets.insert(None); + v4_targets.insert((None, port)); } else { - v4_targets.insert(Some(listener.ip)); + v4_targets.insert((Some(listener.ip), port)); } } else if listener.ip.is_unspecified() { - v6_targets.insert(None); + v6_targets.insert((None, port)); } else { - v6_targets.insert(Some(listener.ip)); + v6_targets.insert((Some(listener.ip), port)); } } } } ConntrackMode::Hybrid => { + let ports = listener_port_set(cfg); for ip in &cfg.server.conntrack_control.hybrid_listener_ips { if ip.is_ipv4() { - v4_targets.insert(Some(*ip)); + for port in &ports { + v4_targets.insert((Some(*ip), *port)); + } } else { - v6_targets.insert(Some(*ip)); + for port in &ports { + v6_targets.insert((Some(*ip), *port)); + } } } } @@ -422,19 +441,19 @@ async fn apply_nft_rules(cfg: &ProxyConfig) -> Result<(), String> { let (v4_targets, v6_targets) = notrack_targets(cfg); let mut rules = Vec::new(); - for ip in v4_targets { + for (ip, port) in v4_targets { let rule = if let Some(ip) = ip { - format!("tcp dport {} ip daddr {} notrack", cfg.server.port, ip) + format!("tcp dport {} ip daddr {} notrack", port, ip) } else { - format!("tcp dport {} notrack", cfg.server.port) + format!("tcp dport {} notrack", port) }; rules.push(rule); } - for ip in v6_targets { + for (ip, port) in v6_targets { let rule = if let Some(ip) = ip { - format!("tcp dport {} ip6 daddr {} notrack", cfg.server.port, ip) + format!("tcp dport {} ip6 daddr {} notrack", port, ip) } else { - format!("tcp dport {} notrack", cfg.server.port) + format!("tcp dport {} notrack", port) }; rules.push(rule); } @@ -498,7 +517,7 @@ async fn apply_iptables_rules_for_binary( let (v4_targets, v6_targets) = notrack_targets(cfg); let selected = if ipv4 { v4_targets } else { v6_targets }; - for ip in selected { + for (ip, port) in selected { let mut args = vec![ "-t".to_string(), "raw".to_string(), @@ -507,7 +526,7 @@ async fn apply_iptables_rules_for_binary( "-p".to_string(), "tcp".to_string(), "--dport".to_string(), - cfg.server.port.to_string(), + port.to_string(), ]; if let Some(ip) = ip { args.push("-d".to_string()); diff --git a/src/maestro/listeners.rs b/src/maestro/listeners.rs index f032d77..034fbef 100644 --- a/src/maestro/listeners.rs +++ b/src/maestro/listeners.rs @@ -31,6 +31,19 @@ pub(crate) struct BoundListeners { pub(crate) has_unix_listener: bool, } +fn listener_port_or_legacy(listener: &crate::config::ListenerConfig, config: &ProxyConfig) -> u16 { + listener.port.unwrap_or(config.server.port) +} + +fn default_link_port(config: &ProxyConfig) -> u16 { + config + .server + .listeners + .first() + .and_then(|listener| listener.port) + .unwrap_or(config.server.port) +} + #[allow(clippy::too_many_arguments)] pub(crate) async fn bind_listeners( config: &Arc, @@ -63,7 +76,8 @@ pub(crate) async fn bind_listeners( let mut listeners = Vec::new(); for listener_conf in &config.server.listeners { - let addr = SocketAddr::new(listener_conf.ip, config.server.port); + let listener_port = listener_port_or_legacy(listener_conf, config); + let addr = SocketAddr::new(listener_conf.ip, listener_port); if addr.is_ipv4() && !decision_ipv4_dc { warn!(%addr, "Skipping IPv4 listener: IPv4 disabled by [network]"); continue; @@ -110,7 +124,7 @@ pub(crate) async fn bind_listeners( .general .links .public_port - .unwrap_or(config.server.port); + .unwrap_or(listener_port); print_proxy_links(&public_host, link_port, config); } @@ -158,7 +172,7 @@ pub(crate) async fn bind_listeners( .general .links .public_port - .unwrap_or(config.server.port), + .unwrap_or(default_link_port(config)), ) } else { let ip = detected_ip_v4.or(detected_ip_v6).map(|ip| ip.to_string()); @@ -173,7 +187,7 @@ pub(crate) async fn bind_listeners( .general .links .public_port - .unwrap_or(config.server.port), + .unwrap_or(default_link_port(config)), ) }; diff --git a/src/proxy/tests/client_masking_blackhat_campaign_tests.rs b/src/proxy/tests/client_masking_blackhat_campaign_tests.rs index 962387a..c48caa0 100644 --- a/src/proxy/tests/client_masking_blackhat_campaign_tests.rs +++ b/src/proxy/tests/client_masking_blackhat_campaign_tests.rs @@ -37,6 +37,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_budget_security_tests.rs b/src/proxy/tests/client_masking_budget_security_tests.rs index d356869..11a72a0 100644 --- a/src/proxy/tests/client_masking_budget_security_tests.rs +++ b/src/proxy/tests/client_masking_budget_security_tests.rs @@ -33,6 +33,8 @@ fn build_harness(config: ProxyConfig) -> PipelineHarness { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_diagnostics_security_tests.rs b/src/proxy/tests/client_masking_diagnostics_security_tests.rs index a2f44ce..a55bb79 100644 --- a/src/proxy/tests/client_masking_diagnostics_security_tests.rs +++ b/src/proxy/tests/client_masking_diagnostics_security_tests.rs @@ -17,6 +17,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_fragmented_classifier_security_tests.rs b/src/proxy/tests/client_masking_fragmented_classifier_security_tests.rs index ae04c6a..5817f24 100644 --- a/src/proxy/tests/client_masking_fragmented_classifier_security_tests.rs +++ b/src/proxy/tests/client_masking_fragmented_classifier_security_tests.rs @@ -17,6 +17,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_hard_adversarial_tests.rs b/src/proxy/tests/client_masking_hard_adversarial_tests.rs index 7e0c683..709ff49 100644 --- a/src/proxy/tests/client_masking_hard_adversarial_tests.rs +++ b/src/proxy/tests/client_masking_hard_adversarial_tests.rs @@ -31,6 +31,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs b/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs index 8aa6fb2..49c9aa6 100644 --- a/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs +++ b/src/proxy/tests/client_masking_http2_fragmented_preface_security_tests.rs @@ -17,6 +17,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_prefetch_config_pipeline_integration_security_tests.rs b/src/proxy/tests/client_masking_prefetch_config_pipeline_integration_security_tests.rs index b992402..6ebaa5a 100644 --- a/src/proxy/tests/client_masking_prefetch_config_pipeline_integration_security_tests.rs +++ b/src/proxy/tests/client_masking_prefetch_config_pipeline_integration_security_tests.rs @@ -17,6 +17,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs b/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs index e9af94c..9491e3f 100644 --- a/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs +++ b/src/proxy/tests/client_masking_prefetch_invariant_security_tests.rs @@ -44,6 +44,8 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> PipelineHarness { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_probe_evasion_blackhat_tests.rs b/src/proxy/tests/client_masking_probe_evasion_blackhat_tests.rs index 282465a..62a2ef8 100644 --- a/src/proxy/tests/client_masking_probe_evasion_blackhat_tests.rs +++ b/src/proxy/tests/client_masking_probe_evasion_blackhat_tests.rs @@ -22,6 +22,8 @@ fn make_test_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_redteam_expected_fail_tests.rs b/src/proxy/tests/client_masking_redteam_expected_fail_tests.rs index b4a79fe..2781102 100644 --- a/src/proxy/tests/client_masking_redteam_expected_fail_tests.rs +++ b/src/proxy/tests/client_masking_redteam_expected_fail_tests.rs @@ -45,6 +45,8 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> RedTeamHarness { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -236,6 +238,8 @@ async fn redteam_03_masking_duration_must_be_less_than_1ms_when_backend_down() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -478,6 +482,8 @@ async fn measure_invalid_probe_duration_ms(delay_ms: u64, tls_len: u16, body_sen enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -553,6 +559,8 @@ async fn capture_forwarded_probe_len(tls_len: u16, body_sent: usize) -> usize { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_replay_timing_security_tests.rs b/src/proxy/tests/client_masking_replay_timing_security_tests.rs index 8c829be..788ce80 100644 --- a/src/proxy/tests/client_masking_replay_timing_security_tests.rs +++ b/src/proxy/tests/client_masking_replay_timing_security_tests.rs @@ -19,6 +19,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs b/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs index 14837bf..ed1ac8d 100644 --- a/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs +++ b/src/proxy/tests/client_masking_shape_classifier_fuzz_redteam_expected_fail_tests.rs @@ -17,6 +17,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs b/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs index 014180d..45ce014 100644 --- a/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs +++ b/src/proxy/tests/client_masking_shape_hardening_adversarial_tests.rs @@ -17,6 +17,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs b/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs index 49378f6..f160b01 100644 --- a/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs +++ b/src/proxy/tests/client_masking_shape_hardening_redteam_expected_fail_tests.rs @@ -17,6 +17,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_shape_hardening_security_tests.rs b/src/proxy/tests/client_masking_shape_hardening_security_tests.rs index 6a64c6e..9948e60 100644 --- a/src/proxy/tests/client_masking_shape_hardening_security_tests.rs +++ b/src/proxy/tests/client_masking_shape_hardening_security_tests.rs @@ -17,6 +17,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_masking_stress_adversarial_tests.rs b/src/proxy/tests/client_masking_stress_adversarial_tests.rs index 9ccd033..575bfb5 100644 --- a/src/proxy/tests/client_masking_stress_adversarial_tests.rs +++ b/src/proxy/tests/client_masking_stress_adversarial_tests.rs @@ -31,6 +31,8 @@ fn new_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_security_tests.rs b/src/proxy/tests/client_security_tests.rs index 40d99ec..c5b971b 100644 --- a/src/proxy/tests/client_security_tests.rs +++ b/src/proxy/tests/client_security_tests.rs @@ -338,6 +338,8 @@ async fn relay_task_abort_releases_user_gate_and_ip_reservation() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -453,6 +455,8 @@ async fn relay_cutover_releases_user_gate_and_ip_reservation() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -578,6 +582,8 @@ async fn integration_route_cutover_and_quota_overlap_fails_closed_and_releases_s enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -749,6 +755,8 @@ async fn proxy_protocol_header_is_rejected_when_trust_list_is_empty() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -827,6 +835,8 @@ async fn proxy_protocol_header_from_untrusted_peer_range_is_rejected_under_load( enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -988,6 +998,8 @@ async fn short_tls_probe_is_masked_through_client_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1077,6 +1089,8 @@ async fn tls12_record_probe_is_masked_through_client_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1164,6 +1178,8 @@ async fn handle_client_stream_increments_connects_all_exactly_once() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1258,6 +1274,8 @@ async fn running_client_handler_increments_connects_all_exactly_once() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1349,6 +1367,8 @@ async fn idle_pooled_connection_closes_cleanly_in_generic_stream_path() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1421,6 +1441,8 @@ async fn idle_pooled_connection_closes_cleanly_in_client_handler_path() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1508,6 +1530,8 @@ async fn partial_tls_header_stall_triggers_handshake_timeout() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1834,6 +1858,8 @@ async fn valid_tls_path_does_not_fall_back_to_mask_backend() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1944,6 +1970,8 @@ async fn valid_tls_with_invalid_mtproto_falls_back_to_mask_backend() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -2052,6 +2080,8 @@ async fn client_handler_tls_bad_mtproto_is_forwarded_to_mask_backend() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -2175,6 +2205,8 @@ async fn alpn_mismatch_tls_probe_is_masked_through_client_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -2269,6 +2301,8 @@ async fn invalid_hmac_tls_probe_is_masked_through_client_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -2369,6 +2403,8 @@ async fn burst_invalid_tls_probes_are_masked_verbatim() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -3275,6 +3311,8 @@ async fn relay_connect_error_releases_user_and_ip_before_return() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -3837,6 +3875,8 @@ async fn untrusted_proxy_header_source_is_rejected() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -3908,6 +3948,8 @@ async fn empty_proxy_trusted_cidrs_rejects_proxy_header_by_default() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4006,6 +4048,8 @@ async fn oversized_tls_record_is_masked_in_generic_stream_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4110,6 +4154,8 @@ async fn oversized_tls_record_is_masked_in_client_handler_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4228,6 +4274,8 @@ async fn tls_record_len_min_minus_1_is_rejected_in_generic_stream_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4332,6 +4380,8 @@ async fn tls_record_len_min_minus_1_is_rejected_in_client_handler_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4439,6 +4489,8 @@ async fn tls_record_len_16384_is_accepted_in_generic_stream_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -4541,6 +4593,8 @@ async fn tls_record_len_16384_is_accepted_in_client_handler_pipeline() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_timing_profile_adversarial_tests.rs b/src/proxy/tests/client_timing_profile_adversarial_tests.rs index 54ccbe2..bc452a8 100644 --- a/src/proxy/tests/client_timing_profile_adversarial_tests.rs +++ b/src/proxy/tests/client_timing_profile_adversarial_tests.rs @@ -30,6 +30,8 @@ fn make_test_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_tls_clienthello_size_security_tests.rs b/src/proxy/tests/client_tls_clienthello_size_security_tests.rs index 442412d..a779c92 100644 --- a/src/proxy/tests/client_tls_clienthello_size_security_tests.rs +++ b/src/proxy/tests/client_tls_clienthello_size_security_tests.rs @@ -32,6 +32,8 @@ fn make_test_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_tls_clienthello_truncation_adversarial_tests.rs b/src/proxy/tests/client_tls_clienthello_truncation_adversarial_tests.rs index 1243b83..aa0b925 100644 --- a/src/proxy/tests/client_tls_clienthello_truncation_adversarial_tests.rs +++ b/src/proxy/tests/client_tls_clienthello_truncation_adversarial_tests.rs @@ -33,6 +33,8 @@ fn make_test_upstream_manager(stats: Arc) -> Arc { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/client_tls_mtproto_fallback_security_tests.rs b/src/proxy/tests/client_tls_mtproto_fallback_security_tests.rs index 74ab347..edea451 100644 --- a/src/proxy/tests/client_tls_mtproto_fallback_security_tests.rs +++ b/src/proxy/tests/client_tls_mtproto_fallback_security_tests.rs @@ -47,6 +47,8 @@ fn build_harness(secret_hex: &str, mask_port: u16) -> PipelineHarness { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/proxy/tests/direct_relay_security_tests.rs b/src/proxy/tests/direct_relay_security_tests.rs index 5972204..193ff7b 100644 --- a/src/proxy/tests/direct_relay_security_tests.rs +++ b/src/proxy/tests/direct_relay_security_tests.rs @@ -1299,6 +1299,8 @@ async fn direct_relay_abort_midflight_releases_route_gauge() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1407,6 +1409,8 @@ async fn direct_relay_cutover_midflight_releases_route_gauge() { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1530,6 +1534,8 @@ async fn direct_relay_cutover_storm_multi_session_keeps_generic_errors_and_relea enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, @@ -1764,6 +1770,8 @@ async fn negative_direct_relay_dc_connection_refused_fails_fast() { bindtodevice: None, }, selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 100, @@ -1856,6 +1864,8 @@ async fn adversarial_direct_relay_cutover_integrity() { bindtodevice: None, }, selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 100, diff --git a/src/proxy/tests/proxy_shared_state_isolation_tests.rs b/src/proxy/tests/proxy_shared_state_isolation_tests.rs index b174ee3..faa045f 100644 --- a/src/proxy/tests/proxy_shared_state_isolation_tests.rs +++ b/src/proxy/tests/proxy_shared_state_isolation_tests.rs @@ -59,6 +59,8 @@ fn new_client_harness() -> ClientHarness { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 1, diff --git a/src/transport/upstream.rs b/src/transport/upstream.rs index 2486b13..9aaca76 100644 --- a/src/transport/upstream.rs +++ b/src/transport/upstream.rs @@ -455,6 +455,87 @@ impl UpstreamManager { } } + fn resolve_probe_dc_families( + upstream: &UpstreamConfig, + ipv4_available: bool, + ipv6_available: bool, + ) -> (bool, bool) { + ( + upstream.ipv4.unwrap_or(ipv4_available), + upstream.ipv6.unwrap_or(ipv6_available), + ) + } + + fn resolve_runtime_dc_families( + upstream: &UpstreamConfig, + dc_preference: IpPreference, + ) -> (bool, bool) { + let (auto_ipv4, auto_ipv6) = match dc_preference { + IpPreference::PreferV4 => (true, false), + IpPreference::PreferV6 => (false, true), + IpPreference::BothWork | IpPreference::Unknown | IpPreference::Unavailable => { + (true, true) + } + }; + + ( + upstream.ipv4.unwrap_or(auto_ipv4), + upstream.ipv6.unwrap_or(auto_ipv6), + ) + } + + fn dc_table_addr(dc_idx: i16, ipv6: bool, port: u16) -> Option { + let arr_idx = UpstreamState::dc_array_idx(dc_idx)?; + let ip = if ipv6 { + TG_DATACENTERS_V6[arr_idx] + } else { + TG_DATACENTERS_V4[arr_idx] + }; + Some(SocketAddr::new(ip, port)) + } + + fn resolve_runtime_dc_target( + target: SocketAddr, + dc_idx: Option, + upstream: &UpstreamConfig, + dc_preference: IpPreference, + ) -> Result { + let (allow_ipv4, allow_ipv6) = Self::resolve_runtime_dc_families(upstream, dc_preference); + if (target.is_ipv4() && allow_ipv4) || (target.is_ipv6() && allow_ipv6) { + return Ok(target); + } + + if !allow_ipv4 && !allow_ipv6 { + return Err(ProxyError::Config(format!( + "Upstream DC family policy blocks all families for target {target}" + ))); + } + + let Some(dc_idx) = dc_idx else { + return Err(ProxyError::Config(format!( + "Upstream DC family policy cannot remap target {target} without dc_idx" + ))); + }; + + let remapped = if target.is_ipv4() { + if allow_ipv6 { + Self::dc_table_addr(dc_idx, true, target.port()) + } else { + None + } + } else if allow_ipv4 { + Self::dc_table_addr(dc_idx, false, target.port()) + } else { + None + }; + + remapped.ok_or_else(|| { + ProxyError::Config(format!( + "Upstream DC family policy rejected target {target} (dc_idx={dc_idx})" + )) + }) + } + #[cfg(unix)] fn resolve_interface_addrs(name: &str, want_ipv6: bool) -> Vec { use nix::ifaddrs::getifaddrs; @@ -728,18 +809,24 @@ impl UpstreamManager { .await .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?; - let mut upstream = { + let (mut upstream, bind_rr, dc_preference) = { let guard = self.upstreams.read().await; - guard[idx].config.clone() + let state = &guard[idx]; + let dc_preference = dc_idx + .and_then(UpstreamState::dc_array_idx) + .map(|dc_array_idx| state.dc_ip_pref[dc_array_idx]) + .unwrap_or(IpPreference::Unknown); + (state.config.clone(), Some(state.bind_rr.clone()), dc_preference) }; if let Some(s) = scope { upstream.selected_scope = s.to_string(); } - let bind_rr = { - let guard = self.upstreams.read().await; - guard.get(idx).map(|u| u.bind_rr.clone()) + let target = if dc_idx.is_some() { + Self::resolve_runtime_dc_target(target, dc_idx, &upstream, dc_preference)? + } else { + target }; let (stream, _) = self @@ -760,9 +847,14 @@ impl UpstreamManager { .await .ok_or_else(|| ProxyError::Config("No upstreams available".to_string()))?; - let mut upstream = { + let (mut upstream, bind_rr, dc_preference) = { let guard = self.upstreams.read().await; - guard[idx].config.clone() + let state = &guard[idx]; + let dc_preference = dc_idx + .and_then(UpstreamState::dc_array_idx) + .map(|dc_array_idx| state.dc_ip_pref[dc_array_idx]) + .unwrap_or(IpPreference::Unknown); + (state.config.clone(), Some(state.bind_rr.clone()), dc_preference) }; // Set scope for configuration copy @@ -770,9 +862,10 @@ impl UpstreamManager { upstream.selected_scope = s.to_string(); } - let bind_rr = { - let guard = self.upstreams.read().await; - guard.get(idx).map(|u| u.bind_rr.clone()) + let target = if dc_idx.is_some() { + Self::resolve_runtime_dc_target(target, dc_idx, &upstream, dc_preference)? + } else { + target }; let (stream, egress) = self @@ -1212,6 +1305,8 @@ impl UpstreamManager { let mut all_results = Vec::new(); for (upstream_idx, upstream_config, bind_rr) in &upstreams { + let (upstream_ipv4_enabled, upstream_ipv6_enabled) = + Self::resolve_probe_dc_families(upstream_config, ipv4_enabled, ipv6_enabled); let upstream_name = match &upstream_config.upstream_type { UpstreamType::Direct { interface, @@ -1244,7 +1339,7 @@ impl UpstreamManager { }; let mut v6_results = Vec::with_capacity(NUM_DCS); - if ipv6_enabled { + if upstream_ipv6_enabled { for dc_zero_idx in 0..NUM_DCS { let dc_v6 = TG_DATACENTERS_V6[dc_zero_idx]; let addr_v6 = SocketAddr::new(dc_v6, TG_DATACENTER_PORT); @@ -1295,13 +1390,17 @@ impl UpstreamManager { dc_idx: dc_zero_idx + 1, dc_addr: SocketAddr::new(dc_v6, TG_DATACENTER_PORT), rtt_ms: None, - error: Some("ipv6 disabled".to_string()), + error: Some(if ipv6_enabled { + "ipv6 disabled by upstream policy".to_string() + } else { + "ipv6 disabled".to_string() + }), }); } } let mut v4_results = Vec::with_capacity(NUM_DCS); - if ipv4_enabled { + if upstream_ipv4_enabled { for dc_zero_idx in 0..NUM_DCS { let dc_v4 = TG_DATACENTERS_V4[dc_zero_idx]; let addr_v4 = SocketAddr::new(dc_v4, TG_DATACENTER_PORT); @@ -1352,7 +1451,11 @@ impl UpstreamManager { dc_idx: dc_zero_idx + 1, dc_addr: SocketAddr::new(dc_v4, TG_DATACENTER_PORT), rtt_ms: None, - error: Some("ipv4 disabled".to_string()), + error: Some(if ipv4_enabled { + "ipv4 disabled by upstream policy".to_string() + } else { + "ipv4 disabled".to_string() + }), }); } } @@ -1372,7 +1475,9 @@ impl UpstreamManager { match addr_str.parse::() { Ok(addr) => { let is_v6 = addr.is_ipv6(); - if (is_v6 && !ipv6_enabled) || (!is_v6 && !ipv4_enabled) { + if (is_v6 && !upstream_ipv6_enabled) + || (!is_v6 && !upstream_ipv4_enabled) + { continue; } let result = tokio::time::timeout( @@ -1614,6 +1719,8 @@ impl UpstreamManager { let u = &guard[i]; (u.config.clone(), u.bind_rr.clone()) }; + let (upstream_ipv4_enabled, upstream_ipv6_enabled) = + Self::resolve_probe_dc_families(&config, ipv4_enabled, ipv6_enabled); let mut healthy_groups = 0usize; let mut latency_updates: Vec<(usize, f64)> = Vec::new(); @@ -1629,14 +1736,31 @@ impl UpstreamManager { continue; } + let filtered_endpoints: Vec = endpoints + .iter() + .copied() + .filter(|endpoint| { + if endpoint.is_ipv4() { + upstream_ipv4_enabled + } else { + upstream_ipv6_enabled + } + }) + .collect(); + + if filtered_endpoints.is_empty() { + continue; + } + let rotation_key = (i, group.dc_idx, is_primary); let start_idx = - *endpoint_rotation.entry(rotation_key).or_insert(0) % endpoints.len(); - let mut next_idx = (start_idx + 1) % endpoints.len(); + *endpoint_rotation.entry(rotation_key).or_insert(0) + % filtered_endpoints.len(); + let mut next_idx = (start_idx + 1) % filtered_endpoints.len(); - for step in 0..endpoints.len() { - let endpoint_idx = (start_idx + step) % endpoints.len(); - let endpoint = endpoints[endpoint_idx]; + for step in 0..filtered_endpoints.len() { + let endpoint_idx = (start_idx + step) % filtered_endpoints.len(); + let endpoint = filtered_endpoints[endpoint_idx]; let start = Instant::now(); let result = tokio::time::timeout( @@ -1655,7 +1779,7 @@ impl UpstreamManager { Ok(Ok(_stream)) => { group_ok = true; group_rtt_ms = Some(start.elapsed().as_secs_f64() * 1000.0); - next_idx = (endpoint_idx + 1) % endpoints.len(); + next_idx = (endpoint_idx + 1) % filtered_endpoints.len(); break; } Ok(Err(e)) => { @@ -1910,6 +2034,8 @@ mod tests { enabled: true, scopes: String::new(), selected_scope: String::new(), + ipv4: None, + ipv6: None, }], 1, 100,