From 65da1f91ecf9ef7bec018f5fe630fdebee7faadd Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 30 Mar 2026 23:35:41 +0300 Subject: [PATCH] Drafting fixes for Apple/XNU Darwin Connectivity issues Co-Authored-By: Aleksandr Kalashnikov <33665156+sleep3r@users.noreply.github.com> --- src/api/runtime_zero.rs | 2 + src/cli.rs | 3 +- src/config/defaults.rs | 6 +- src/config/tests/load_idle_policy_tests.rs | 22 ++ src/config/types.rs | 7 + src/proxy/client.rs | 203 +++++++++++--- src/proxy/tests/client_security_tests.rs | 306 ++++++++++++++++++++- 7 files changed, 499 insertions(+), 50 deletions(-) diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index 160b27a..52f8d99 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -50,6 +50,7 @@ pub(super) struct RuntimeGatesData { #[derive(Serialize)] pub(super) struct EffectiveTimeoutLimits { + pub(super) client_first_byte_idle_secs: u64, pub(super) client_handshake_secs: u64, pub(super) tg_connect_secs: u64, pub(super) client_keepalive_secs: u64, @@ -227,6 +228,7 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD me_reinit_every_secs: cfg.general.effective_me_reinit_every_secs(), me_pool_force_close_secs: cfg.general.effective_me_pool_force_close_secs(), timeouts: EffectiveTimeoutLimits { + client_first_byte_idle_secs: cfg.timeouts.client_first_byte_idle_secs, client_handshake_secs: cfg.timeouts.client_handshake, tg_connect_secs: cfg.general.tg_connect, client_keepalive_secs: cfg.timeouts.client_keepalive, diff --git a/src/cli.rs b/src/cli.rs index dfea010..fd12176 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -610,7 +610,8 @@ ip = "0.0.0.0" ip = "::" [timeouts] -client_handshake = 15 +client_first_byte_idle_secs = 300 +client_handshake = 60 client_keepalive = 60 client_ack = 300 diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 4a8fd45..6297a3e 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -110,7 +110,11 @@ pub(crate) fn default_replay_window_secs() -> u64 { } pub(crate) fn default_handshake_timeout() -> u64 { - 30 + 60 +} + +pub(crate) fn default_client_first_byte_idle_secs() -> u64 { + 300 } pub(crate) fn default_relay_idle_policy_v2_enabled() -> bool { diff --git a/src/config/tests/load_idle_policy_tests.rs b/src/config/tests/load_idle_policy_tests.rs index c6a4e86..0767e8e 100644 --- a/src/config/tests/load_idle_policy_tests.rs +++ b/src/config/tests/load_idle_policy_tests.rs @@ -17,6 +17,28 @@ fn remove_temp_config(path: &PathBuf) { let _ = fs::remove_file(path); } +#[test] +fn default_timeouts_enable_apple_compatible_handshake_profile() { + let cfg = ProxyConfig::default(); + assert_eq!(cfg.timeouts.client_first_byte_idle_secs, 300); + assert_eq!(cfg.timeouts.client_handshake, 60); +} + +#[test] +fn load_accepts_zero_first_byte_idle_timeout_as_legacy_opt_out() { + let path = write_temp_config( + r#" +[timeouts] +client_first_byte_idle_secs = 0 +"#, + ); + + let cfg = ProxyConfig::load(&path).expect("config with zero first-byte idle timeout must load"); + assert_eq!(cfg.timeouts.client_first_byte_idle_secs, 0); + + remove_temp_config(&path); +} + #[test] fn load_rejects_relay_hard_idle_smaller_than_soft_idle_with_clear_error() { let path = write_temp_config( diff --git a/src/config/types.rs b/src/config/types.rs index 8234557..5f3342f 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -1319,6 +1319,12 @@ impl Default for ServerConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TimeoutsConfig { + /// Maximum idle wait in seconds for the first client byte before handshake parsing starts. + /// `0` disables the separate idle phase and keeps legacy timeout behavior. + #[serde(default = "default_client_first_byte_idle_secs")] + pub client_first_byte_idle_secs: u64, + + /// Maximum active handshake duration in seconds after the first client byte is received. #[serde(default = "default_handshake_timeout")] pub client_handshake: u64, @@ -1358,6 +1364,7 @@ pub struct TimeoutsConfig { impl Default for TimeoutsConfig { fn default() -> Self { Self { + client_first_byte_idle_secs: default_client_first_byte_idle_secs(), client_handshake: default_handshake_timeout(), relay_idle_policy_v2_enabled: default_relay_idle_policy_v2_enabled(), relay_client_idle_soft_secs: default_relay_client_idle_soft_secs(), diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 8ce3e96..d71411a 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -416,16 +416,68 @@ where debug!(peer = %real_peer, "New connection (generic stream)"); + let first_byte = if config.timeouts.client_first_byte_idle_secs == 0 { + None + } else { + let idle_timeout = Duration::from_secs(config.timeouts.client_first_byte_idle_secs); + let mut first_byte = [0u8; 1]; + match timeout(idle_timeout, stream.read(&mut first_byte)).await { + Ok(Ok(0)) => { + debug!(peer = %real_peer, "Connection closed before first client byte"); + return Ok(()); + } + Ok(Ok(_)) => Some(first_byte[0]), + Ok(Err(e)) + if matches!( + e.kind(), + std::io::ErrorKind::UnexpectedEof + | std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::NotConnected + ) => + { + debug!( + peer = %real_peer, + error = %e, + "Connection closed before first client byte" + ); + return Ok(()); + } + Ok(Err(e)) => { + debug!( + peer = %real_peer, + error = %e, + "Failed while waiting for first client byte" + ); + return Err(ProxyError::Io(e)); + } + Err(_) => { + debug!( + peer = %real_peer, + idle_secs = config.timeouts.client_first_byte_idle_secs, + "Closing idle pooled connection before first client byte" + ); + return Ok(()); + } + } + }; + let handshake_timeout = handshake_timeout_with_mask_grace(&config); let stats_for_timeout = stats.clone(); let config_for_timeout = config.clone(); let beobachten_for_timeout = beobachten.clone(); let peer_for_timeout = real_peer.ip(); - // Phase 1: handshake (with timeout) + // Phase 2: active handshake (with timeout after the first client byte) let outcome = match timeout(handshake_timeout, async { let mut first_bytes = [0u8; 5]; - stream.read_exact(&mut first_bytes).await?; + if let Some(first_byte) = first_byte { + first_bytes[0] = first_byte; + stream.read_exact(&mut first_bytes[1..]).await?; + } else { + stream.read_exact(&mut first_bytes).await?; + } let is_tls = tls::is_tls_handshake(&first_bytes[..3]); debug!(peer = %real_peer, is_tls = is_tls, "Handshake type detected"); @@ -736,36 +788,9 @@ impl RunningClientHandler { debug!(peer = %peer, error = %e, "Failed to configure client socket"); } - let handshake_timeout = handshake_timeout_with_mask_grace(&self.config); - let stats = self.stats.clone(); - let config_for_timeout = self.config.clone(); - let beobachten_for_timeout = self.beobachten.clone(); - let peer_for_timeout = peer.ip(); - - // Phase 1: handshake (with timeout) - let outcome = match timeout(handshake_timeout, self.do_handshake()).await { - Ok(Ok(outcome)) => outcome, - Ok(Err(e)) => { - debug!(peer = %peer, error = %e, "Handshake failed"); - record_handshake_failure_class( - &beobachten_for_timeout, - &config_for_timeout, - peer_for_timeout, - &e, - ); - return Err(e); - } - Err(_) => { - stats.increment_handshake_timeouts(); - debug!(peer = %peer, "Handshake timeout"); - record_beobachten_class( - &beobachten_for_timeout, - &config_for_timeout, - peer_for_timeout, - "other", - ); - return Err(ProxyError::TgHandshakeTimeout); - } + let outcome = match self.do_handshake().await? { + Some(outcome) => outcome, + None => return Ok(()), }; // Phase 2: relay (WITHOUT handshake timeout — relay has its own activity timeouts) @@ -774,7 +799,7 @@ impl RunningClientHandler { } } - async fn do_handshake(mut self) -> Result { + async fn do_handshake(mut self) -> Result> { let mut local_addr = self.stream.local_addr().map_err(ProxyError::Io)?; if self.proxy_protocol_enabled { @@ -849,19 +874,107 @@ impl RunningClientHandler { } } - let mut first_bytes = [0u8; 5]; - self.stream.read_exact(&mut first_bytes).await?; - - let is_tls = tls::is_tls_handshake(&first_bytes[..3]); - let peer = self.peer; - - debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); - - if is_tls { - self.handle_tls_client(first_bytes, local_addr).await + let first_byte = if self.config.timeouts.client_first_byte_idle_secs == 0 { + None } else { - self.handle_direct_client(first_bytes, local_addr).await - } + let idle_timeout = Duration::from_secs(self.config.timeouts.client_first_byte_idle_secs); + let mut first_byte = [0u8; 1]; + match timeout(idle_timeout, self.stream.read(&mut first_byte)).await { + Ok(Ok(0)) => { + debug!(peer = %self.peer, "Connection closed before first client byte"); + return Ok(None); + } + Ok(Ok(_)) => Some(first_byte[0]), + Ok(Err(e)) + if matches!( + e.kind(), + std::io::ErrorKind::UnexpectedEof + | std::io::ErrorKind::ConnectionReset + | std::io::ErrorKind::ConnectionAborted + | std::io::ErrorKind::BrokenPipe + | std::io::ErrorKind::NotConnected + ) => + { + debug!( + peer = %self.peer, + error = %e, + "Connection closed before first client byte" + ); + return Ok(None); + } + Ok(Err(e)) => { + debug!( + peer = %self.peer, + error = %e, + "Failed while waiting for first client byte" + ); + return Err(ProxyError::Io(e)); + } + Err(_) => { + debug!( + peer = %self.peer, + idle_secs = self.config.timeouts.client_first_byte_idle_secs, + "Closing idle pooled connection before first client byte" + ); + return Ok(None); + } + } + }; + + let handshake_timeout = handshake_timeout_with_mask_grace(&self.config); + let stats = self.stats.clone(); + let config_for_timeout = self.config.clone(); + let beobachten_for_timeout = self.beobachten.clone(); + let peer_for_timeout = self.peer.ip(); + let peer_for_log = self.peer; + + let outcome = match timeout(handshake_timeout, async { + let mut first_bytes = [0u8; 5]; + if let Some(first_byte) = first_byte { + first_bytes[0] = first_byte; + self.stream.read_exact(&mut first_bytes[1..]).await?; + } else { + self.stream.read_exact(&mut first_bytes).await?; + } + + let is_tls = tls::is_tls_handshake(&first_bytes[..3]); + let peer = self.peer; + + debug!(peer = %peer, is_tls = is_tls, "Handshake type detected"); + + if is_tls { + self.handle_tls_client(first_bytes, local_addr).await + } else { + self.handle_direct_client(first_bytes, local_addr).await + } + }) + .await + { + Ok(Ok(outcome)) => outcome, + Ok(Err(e)) => { + debug!(peer = %peer_for_log, error = %e, "Handshake failed"); + record_handshake_failure_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + &e, + ); + return Err(e); + } + Err(_) => { + stats.increment_handshake_timeouts(); + debug!(peer = %peer_for_log, "Handshake timeout"); + record_beobachten_class( + &beobachten_for_timeout, + &config_for_timeout, + peer_for_timeout, + "other", + ); + return Err(ProxyError::TgHandshakeTimeout); + } + }; + + Ok(Some(outcome)) } async fn handle_tls_client( diff --git a/src/proxy/tests/client_security_tests.rs b/src/proxy/tests/client_security_tests.rs index 7fc1afe..3a66a09 100644 --- a/src/proxy/tests/client_security_tests.rs +++ b/src/proxy/tests/client_security_tests.rs @@ -1,8 +1,10 @@ use super::*; use crate::config::{UpstreamConfig, UpstreamType}; -use crate::crypto::AesCtr; -use crate::crypto::sha256_hmac; -use crate::protocol::constants::ProtoTag; +use crate::crypto::{AesCtr, sha256, sha256_hmac}; +use crate::protocol::constants::{ + DC_IDX_POS, HANDSHAKE_LEN, IV_LEN, PREKEY_LEN, PROTO_TAG_POS, ProtoTag, SKIP_LEN, + TLS_RECORD_CHANGE_CIPHER, +}; use crate::protocol::tls; use crate::proxy::handshake::HandshakeSuccess; use crate::stream::{CryptoReader, CryptoWriter}; @@ -1319,6 +1321,163 @@ async fn running_client_handler_increments_connects_all_exactly_once() { ); } +#[tokio::test(start_paused = true)] +async fn idle_pooled_connection_closes_cleanly_in_generic_stream_path() { + let mut cfg = ProxyConfig::default(); + cfg.general.beobachten = false; + cfg.timeouts.client_first_byte_idle_secs = 1; + + let config = Arc::new(cfg); + let stats = Arc::new(Stats::new()); + let upstream_manager = Arc::new(UpstreamManager::new( + vec![UpstreamConfig { + upstream_type: UpstreamType::Direct { + interface: None, + bind_addresses: None, + }, + weight: 1, + enabled: true, + scopes: String::new(), + selected_scope: String::new(), + }], + 1, + 1, + 1, + 10, + 1, + false, + stats.clone(), + )); + let replay_checker = Arc::new(ReplayChecker::new(128, Duration::from_secs(60))); + let buffer_pool = Arc::new(BufferPool::new()); + let rng = Arc::new(SecureRandom::new()); + let route_runtime = Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)); + let ip_tracker = Arc::new(UserIpTracker::new()); + let beobachten = Arc::new(BeobachtenStore::new()); + + let (server_side, _client_side) = duplex(4096); + let peer: SocketAddr = "198.51.100.169:55200".parse().unwrap(); + + let handler = tokio::spawn(handle_client_stream( + server_side, + peer, + config, + stats.clone(), + upstream_manager, + replay_checker, + buffer_pool, + rng, + None, + route_runtime, + None, + ip_tracker, + beobachten, + false, + )); + + // Let the spawned handler arm the idle-phase timeout before advancing paused time. + tokio::task::yield_now().await; + tokio::time::advance(Duration::from_secs(2)).await; + tokio::task::yield_now().await; + + let result = tokio::time::timeout(Duration::from_secs(1), handler) + .await + .unwrap() + .unwrap(); + assert!(result.is_ok()); + assert_eq!(stats.get_handshake_timeouts(), 0); + assert_eq!(stats.get_connects_bad(), 0); +} + +#[tokio::test(start_paused = true)] +async fn idle_pooled_connection_closes_cleanly_in_client_handler_path() { + let front_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let front_addr = front_listener.local_addr().unwrap(); + + let mut cfg = ProxyConfig::default(); + cfg.general.beobachten = false; + cfg.timeouts.client_first_byte_idle_secs = 1; + + let config = Arc::new(cfg); + let stats = Arc::new(Stats::new()); + let upstream_manager = Arc::new(UpstreamManager::new( + vec![UpstreamConfig { + upstream_type: UpstreamType::Direct { + interface: None, + bind_addresses: None, + }, + weight: 1, + enabled: true, + scopes: String::new(), + selected_scope: String::new(), + }], + 1, + 1, + 1, + 10, + 1, + false, + stats.clone(), + )); + let replay_checker = Arc::new(ReplayChecker::new(128, Duration::from_secs(60))); + let buffer_pool = Arc::new(BufferPool::new()); + let rng = Arc::new(SecureRandom::new()); + let route_runtime = Arc::new(RouteRuntimeController::new(RelayRouteMode::Direct)); + let ip_tracker = Arc::new(UserIpTracker::new()); + let beobachten = Arc::new(BeobachtenStore::new()); + + let server_task = { + let config = config.clone(); + let stats = stats.clone(); + let upstream_manager = upstream_manager.clone(); + let replay_checker = replay_checker.clone(); + let buffer_pool = buffer_pool.clone(); + let rng = rng.clone(); + let route_runtime = route_runtime.clone(); + let ip_tracker = ip_tracker.clone(); + let beobachten = beobachten.clone(); + + tokio::spawn(async move { + let (stream, peer) = front_listener.accept().await.unwrap(); + let real_peer_report = Arc::new(std::sync::Mutex::new(None)); + ClientHandler::new( + stream, + peer, + config, + stats, + upstream_manager, + replay_checker, + buffer_pool, + rng, + None, + route_runtime, + None, + ip_tracker, + beobachten, + false, + real_peer_report, + ) + .run() + .await + }) + }; + + let _client = TcpStream::connect(front_addr).await.unwrap(); + + // Let the accepted connection reach the idle wait before advancing paused time. + tokio::task::yield_now().await; + tokio::time::advance(Duration::from_secs(2)).await; + tokio::task::yield_now().await; + + let result = tokio::time::timeout(Duration::from_secs(1), server_task) + .await + .unwrap() + .unwrap(); + assert!(result.is_ok()); + assert_eq!(stats.get_handshake_timeouts(), 0); + assert_eq!(stats.get_connects_bad(), 0); +} + #[tokio::test] async fn partial_tls_header_stall_triggers_handshake_timeout() { let mut cfg = ProxyConfig::default(); @@ -1487,6 +1646,147 @@ fn wrap_tls_application_data(payload: &[u8]) -> Vec { record } +fn wrap_tls_ccs_record() -> Vec { + let mut record = Vec::with_capacity(6); + record.push(TLS_RECORD_CHANGE_CIPHER); + record.extend_from_slice(&[0x03, 0x03]); + record.extend_from_slice(&1u16.to_be_bytes()); + record.push(0x01); + record +} + +fn make_valid_mtproto_handshake( + secret_hex: &str, + proto_tag: ProtoTag, + dc_idx: i16, +) -> [u8; HANDSHAKE_LEN] { + let secret = hex::decode(secret_hex).expect("secret hex must decode for mtproto test helper"); + + let mut handshake = [0x5Au8; HANDSHAKE_LEN]; + for (idx, b) in handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN] + .iter_mut() + .enumerate() + { + *b = (idx as u8).wrapping_add(1); + } + + let dec_prekey = &handshake[SKIP_LEN..SKIP_LEN + PREKEY_LEN]; + let dec_iv_bytes = &handshake[SKIP_LEN + PREKEY_LEN..SKIP_LEN + PREKEY_LEN + IV_LEN]; + + let mut dec_key_input = Vec::with_capacity(PREKEY_LEN + secret.len()); + dec_key_input.extend_from_slice(dec_prekey); + dec_key_input.extend_from_slice(&secret); + let dec_key = sha256(&dec_key_input); + + let mut dec_iv_arr = [0u8; IV_LEN]; + dec_iv_arr.copy_from_slice(dec_iv_bytes); + let dec_iv = u128::from_be_bytes(dec_iv_arr); + + let mut stream = AesCtr::new(&dec_key, dec_iv); + let keystream = stream.encrypt(&[0u8; HANDSHAKE_LEN]); + + let mut target_plain = [0u8; HANDSHAKE_LEN]; + target_plain[PROTO_TAG_POS..PROTO_TAG_POS + 4].copy_from_slice(&proto_tag.to_bytes()); + target_plain[DC_IDX_POS..DC_IDX_POS + 2].copy_from_slice(&dc_idx.to_le_bytes()); + + for idx in PROTO_TAG_POS..HANDSHAKE_LEN { + handshake[idx] = target_plain[idx] ^ keystream[idx]; + } + + handshake +} + +#[tokio::test] +async fn fragmented_tls_mtproto_with_interleaved_ccs_is_accepted() { + let secret_hex = "55555555555555555555555555555555"; + let secret = [0x55u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 0); + let mtproto_handshake = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2); + + let mut cfg = ProxyConfig::default(); + cfg.general.beobachten = false; + cfg.access.ignore_time_skew = true; + cfg.access + .users + .insert("user".to_string(), secret_hex.to_string()); + + let config = Arc::new(cfg); + let replay_checker = Arc::new(ReplayChecker::new(128, Duration::from_secs(60))); + let rng = SecureRandom::new(); + + let (server_side, mut client_side) = duplex(131072); + let peer: SocketAddr = "198.51.100.85:55007".parse().unwrap(); + let (read_half, write_half) = tokio::io::split(server_side); + + let (mut tls_reader, tls_writer, tls_user) = match handle_tls_handshake( + &client_hello, + read_half, + write_half, + peer, + &config, + &replay_checker, + &rng, + None, + ) + .await + { + HandshakeResult::Success(result) => result, + _ => panic!("expected successful TLS handshake"), + }; + + let mut tls_response_head = [0u8; 5]; + client_side + .read_exact(&mut tls_response_head) + .await + .unwrap(); + assert_eq!(tls_response_head[0], 0x16); + let tls_response_len = u16::from_be_bytes([tls_response_head[3], tls_response_head[4]]) as usize; + let mut tls_response_body = vec![0u8; tls_response_len]; + client_side + .read_exact(&mut tls_response_body) + .await + .unwrap(); + + client_side + .write_all(&wrap_tls_application_data(&mtproto_handshake[..13])) + .await + .unwrap(); + client_side.write_all(&wrap_tls_ccs_record()).await.unwrap(); + client_side + .write_all(&wrap_tls_application_data(&mtproto_handshake[13..37])) + .await + .unwrap(); + client_side.write_all(&wrap_tls_ccs_record()).await.unwrap(); + client_side + .write_all(&wrap_tls_application_data(&mtproto_handshake[37..])) + .await + .unwrap(); + + let mtproto_data = tls_reader.read_exact(HANDSHAKE_LEN).await.unwrap(); + assert_eq!(&mtproto_data[..], &mtproto_handshake); + + let mtproto_handshake: [u8; HANDSHAKE_LEN] = mtproto_data[..].try_into().unwrap(); + let (_, _, success) = match handle_mtproto_handshake( + &mtproto_handshake, + tls_reader, + tls_writer, + peer, + &config, + &replay_checker, + true, + Some(tls_user.as_str()), + ) + .await + { + HandshakeResult::Success(result) => result, + _ => panic!("expected successful MTProto handshake"), + }; + + assert_eq!(success.user, "user"); + assert_eq!(success.proto_tag, ProtoTag::Secure); + assert_eq!(success.dc_idx, 2); +} + #[tokio::test] async fn valid_tls_path_does_not_fall_back_to_mask_backend() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();