diff --git a/src/config/load.rs b/src/config/load.rs index 4bbb73a..ae43f7a 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -334,6 +334,24 @@ impl ProxyConfig { )); } + let handshake_timeout_ms = config + .timeouts + .client_handshake + .checked_mul(1000) + .ok_or_else(|| { + ProxyError::Config( + "timeouts.client_handshake is too large to validate milliseconds budget" + .to_string(), + ) + })?; + + if config.censorship.server_hello_delay_max_ms >= handshake_timeout_ms { + return Err(ProxyError::Config( + "censorship.server_hello_delay_max_ms must be < timeouts.client_handshake * 1000" + .to_string(), + )); + } + if config.timeouts.relay_client_idle_soft_secs == 0 { return Err(ProxyError::Config( "timeouts.relay_client_idle_soft_secs must be > 0".to_string(), @@ -977,6 +995,10 @@ impl ProxyConfig { #[path = "load_idle_policy_tests.rs"] mod load_idle_policy_tests; +#[cfg(test)] +#[path = "load_security_tests.rs"] +mod load_security_tests; + #[cfg(test)] mod tests { use super::*; diff --git a/src/config/load_security_tests.rs b/src/config/load_security_tests.rs new file mode 100644 index 0000000..a1a35ac --- /dev/null +++ b/src/config/load_security_tests.rs @@ -0,0 +1,84 @@ +use super::*; +use std::fs; +use std::path::PathBuf; +use std::time::{SystemTime, UNIX_EPOCH}; + +fn write_temp_config(contents: &str) -> PathBuf { + let nonce = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time must be after unix epoch") + .as_nanos(); + let path = std::env::temp_dir().join(format!("telemt-load-security-{nonce}.toml")); + fs::write(&path, contents).expect("temp config write must succeed"); + path +} + +fn remove_temp_config(path: &PathBuf) { + let _ = fs::remove_file(path); +} + +#[test] +fn load_rejects_server_hello_delay_equal_to_handshake_timeout_budget() { + let path = write_temp_config( + r#" +[timeouts] +client_handshake = 1 + +[censorship] +server_hello_delay_max_ms = 1000 +"#, + ); + + let err = ProxyConfig::load(&path) + .expect_err("delay equal to handshake timeout must be rejected"); + let msg = err.to_string(); + assert!( + msg.contains("censorship.server_hello_delay_max_ms must be < timeouts.client_handshake * 1000"), + "error must explain delay { + if user == "u1" { + u1_success += 1; + } else { + u2_success += 1; + } + reservations.push(reservation); + } + Err(ProxyError::ConnectionLimitExceeded { .. }) => {} + Err(other) => panic!("unexpected error: {other}"), + } + } + + assert_eq!(u1_success, 8, "u1 must get exactly its own configured cap"); + assert_eq!(u2_success, 8, "u2 must get exactly its own configured cap"); + + drop(reservations); + ip_tracker.drain_cleanup_queue().await; + assert_eq!(stats.get_user_curr_connects("u1"), 0); + assert_eq!(stats.get_user_curr_connects("u2"), 0); +} + +#[tokio::test] +async fn client_limit_recovery_after_full_rejection_wave() { + let user = "recover-user"; + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 1).await; + + let mut config = ProxyConfig::default(); + config.access.user_max_tcp_conns.insert(user.to_string(), 1); + + let first_peer: SocketAddr = "198.51.100.50:38001".parse().unwrap(); + let reservation = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + first_peer, + ip_tracker.clone(), + ) + .await + .unwrap(); + + for i in 0..64u16 { + let peer = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(198, 51, 100, (i % 60 + 1) as u8)), + 38002 + i, + ); + let denied = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer, + ip_tracker.clone(), + ) + .await; + assert!(matches!(denied, Err(ProxyError::ConnectionLimitExceeded { .. }))); + } + + drop(reservation); + ip_tracker.drain_cleanup_queue().await; + assert_eq!(stats.get_user_curr_connects(user), 0); + + let recovery_peer: SocketAddr = "198.51.100.200:38999".parse().unwrap(); + let recovered = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + recovery_peer, + ip_tracker.clone(), + ) + .await; + assert!(recovered.is_ok(), "capacity must recover after prior holder drops"); +} + +#[tokio::test] +async fn client_dual_limit_cross_product_never_leaks_on_reject() { + let user = "dual-limit-user"; + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 2).await; + + let mut config = ProxyConfig::default(); + config.access.user_max_tcp_conns.insert(user.to_string(), 2); + + let p1: SocketAddr = "203.0.113.10:39001".parse().unwrap(); + let p2: SocketAddr = "203.0.113.11:39002".parse().unwrap(); + let r1 = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + p1, + ip_tracker.clone(), + ) + .await + .unwrap(); + let r2 = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + p2, + ip_tracker.clone(), + ) + .await + .unwrap(); + + for i in 0..32u16 { + let peer = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(203, 0, 113, (50 + i) as u8)), + 39010 + i, + ); + let denied = RunningClientHandler::acquire_user_connection_reservation_static( + user, + &config, + stats.clone(), + peer, + ip_tracker.clone(), + ) + .await; + assert!(matches!(denied, Err(ProxyError::ConnectionLimitExceeded { .. }))); + } + + assert_eq!(stats.get_user_curr_connects(user), 2); + drop((r1, r2)); + ip_tracker.drain_cleanup_queue().await; + assert_eq!(stats.get_user_curr_connects(user), 0); + assert_eq!(ip_tracker.get_active_ip_count(user).await, 0); +} + +#[tokio::test] +async fn client_check_user_limits_concurrent_churn_no_counter_drift() { + let user = "check-drift-user"; + let stats = Arc::new(Stats::new()); + let ip_tracker = Arc::new(UserIpTracker::new()); + ip_tracker.set_user_limit(user, 64).await; + + let mut config = ProxyConfig::default(); + config.access.user_max_tcp_conns.insert(user.to_string(), 64); + + let mut tasks = Vec::new(); + for i in 0..512u16 { + let stats = Arc::clone(&stats); + let ip_tracker = Arc::clone(&ip_tracker); + let config = config.clone(); + tasks.push(tokio::spawn(async move { + let peer = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(172, 20, (i / 255) as u8, (i % 255 + 1) as u8)), + 40000 + (i % 500), + ); + let _ = RunningClientHandler::check_user_limits_static( + user, + &config, + &stats, + peer, + &ip_tracker, + ) + .await; + })); + } + + for task in futures::future::join_all(tasks).await { + task.unwrap(); + } + + assert_eq!(stats.get_user_curr_connects(user), 0); + assert_eq!(ip_tracker.get_active_ip_count(user).await, 0); +} diff --git a/src/proxy/handshake_adversarial_tests.rs b/src/proxy/handshake_adversarial_tests.rs index f93d8ce..da93ef4 100644 --- a/src/proxy/handshake_adversarial_tests.rs +++ b/src/proxy/handshake_adversarial_tests.rs @@ -229,3 +229,239 @@ async fn mtproto_handshake_concurrent_flood_stability() { let _ = task.await.unwrap(); } } + +#[tokio::test] +async fn mtproto_replay_is_rejected_across_distinct_peers() { + let _guard = auth_probe_test_guard(); + clear_auth_probe_state_for_testing(); + + let secret_hex = "0123456789abcdeffedcba9876543210"; + let handshake = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2); + let config = test_config_with_secret_hex(secret_hex); + let replay_checker = ReplayChecker::new(128, Duration::from_secs(60)); + + let first_peer: SocketAddr = "198.51.100.10:41001".parse().unwrap(); + let second_peer: SocketAddr = "198.51.100.11:41002".parse().unwrap(); + + let first = handle_mtproto_handshake( + &handshake, + tokio::io::empty(), + tokio::io::sink(), + first_peer, + &config, + &replay_checker, + false, + None, + ) + .await; + assert!(matches!(first, HandshakeResult::Success(_))); + + let replay = handle_mtproto_handshake( + &handshake, + tokio::io::empty(), + tokio::io::sink(), + second_peer, + &config, + &replay_checker, + false, + None, + ) + .await; + assert!(matches!(replay, HandshakeResult::BadClient { .. })); +} + +#[tokio::test] +async fn mtproto_blackhat_mutation_corpus_never_panics_and_stays_fail_closed() { + let _guard = auth_probe_test_guard(); + clear_auth_probe_state_for_testing(); + + let secret_hex = "89abcdef012345670123456789abcdef"; + let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2); + let config = test_config_with_secret_hex(secret_hex); + let replay_checker = ReplayChecker::new(8192, Duration::from_secs(60)); + + for i in 0..512usize { + let mut mutated = base; + let pos = (SKIP_LEN + (i * 31) % (HANDSHAKE_LEN - SKIP_LEN)).min(HANDSHAKE_LEN - 1); + mutated[pos] ^= ((i as u8) | 1).rotate_left((i % 8) as u32); + let peer: SocketAddr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(198, 18, (i / 254) as u8, (i % 254 + 1) as u8)), + 42000 + (i % 1000) as u16, + ); + + let res = tokio::time::timeout( + Duration::from_millis(250), + handle_mtproto_handshake( + &mutated, + tokio::io::empty(), + tokio::io::sink(), + peer, + &config, + &replay_checker, + false, + None, + ), + ) + .await + .expect("fuzzed mutation must complete in bounded time"); + + assert!( + matches!(res, HandshakeResult::BadClient { .. } | HandshakeResult::Success(_)), + "mutation corpus must stay within explicit handshake outcomes" + ); + } +} + +#[tokio::test] +async fn auth_probe_success_clears_throttled_peer_state() { + let _guard = auth_probe_test_guard(); + clear_auth_probe_state_for_testing(); + + let target_ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 90)); + let now = Instant::now(); + for _ in 0..AUTH_PROBE_BACKOFF_START_FAILS { + auth_probe_record_failure(target_ip, now); + } + assert!(auth_probe_is_throttled(target_ip, now)); + + auth_probe_record_success(target_ip); + assert!( + !auth_probe_is_throttled(target_ip, now + Duration::from_millis(1)), + "successful auth must clear per-peer throttle state" + ); +} + +#[tokio::test] +async fn mtproto_invalid_storm_over_cap_keeps_probe_map_hard_bounded() { + let _guard = auth_probe_test_guard(); + clear_auth_probe_state_for_testing(); + + let secret_hex = "00112233445566778899aabbccddeeff"; + let mut invalid = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2); + invalid[SKIP_LEN + 3] ^= 0xff; + + let config = test_config_with_secret_hex(secret_hex); + let replay_checker = ReplayChecker::new(64, Duration::from_secs(60)); + + for i in 0..(AUTH_PROBE_TRACK_MAX_ENTRIES + 512) { + let peer: SocketAddr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(10, (i / 65535) as u8, ((i / 255) % 255) as u8, (i % 255 + 1) as u8)), + 43000 + (i % 20000) as u16, + ); + let res = handle_mtproto_handshake( + &invalid, + tokio::io::empty(), + tokio::io::sink(), + peer, + &config, + &replay_checker, + false, + None, + ) + .await; + assert!(matches!(res, HandshakeResult::BadClient { .. })); + } + + let tracked = AUTH_PROBE_STATE + .get() + .map(|state| state.len()) + .unwrap_or(0); + assert!( + tracked <= AUTH_PROBE_TRACK_MAX_ENTRIES, + "probe map must remain bounded under invalid storm: {tracked}" + ); +} + +#[tokio::test] +async fn mtproto_property_style_multi_bit_mutations_fail_closed_or_auth_only() { + let _guard = auth_probe_test_guard(); + clear_auth_probe_state_for_testing(); + + let secret_hex = "f0e1d2c3b4a5968778695a4b3c2d1e0f"; + let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2); + let config = test_config_with_secret_hex(secret_hex); + let replay_checker = ReplayChecker::new(10_000, Duration::from_secs(60)); + + let mut seed: u64 = 0xC0FF_EE12_3456_789A; + for i in 0..2_048usize { + let mut mutated = base; + for _ in 0..4 { + seed ^= seed << 7; + seed ^= seed >> 9; + seed ^= seed << 8; + let idx = SKIP_LEN + (seed as usize % (HANDSHAKE_LEN - SKIP_LEN)); + mutated[idx] ^= ((seed >> 11) as u8).wrapping_add(1); + } + + let peer: SocketAddr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(10, 123, (i / 254) as u8, (i % 254 + 1) as u8)), + 45000 + (i % 2000) as u16, + ); + + let outcome = tokio::time::timeout( + Duration::from_millis(250), + handle_mtproto_handshake( + &mutated, + tokio::io::empty(), + tokio::io::sink(), + peer, + &config, + &replay_checker, + false, + None, + ), + ) + .await + .expect("mutation iteration must complete in bounded time"); + + assert!( + matches!(outcome, HandshakeResult::BadClient { .. } | HandshakeResult::Success(_)), + "mutations must remain fail-closed/auth-only" + ); + } +} + +#[tokio::test] +#[ignore = "heavy soak; run manually"] +async fn mtproto_blackhat_20k_mutation_soak_never_panics() { + let _guard = auth_probe_test_guard(); + clear_auth_probe_state_for_testing(); + + let secret_hex = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + let base = make_valid_mtproto_handshake(secret_hex, ProtoTag::Secure, 2); + let config = test_config_with_secret_hex(secret_hex); + let replay_checker = ReplayChecker::new(50_000, Duration::from_secs(120)); + + let mut seed: u64 = 0xA5A5_5A5A_DEAD_BEEF; + for i in 0..20_000usize { + let mut mutated = base; + for _ in 0..3 { + seed ^= seed << 7; + seed ^= seed >> 9; + seed ^= seed << 8; + let idx = SKIP_LEN + (seed as usize % (HANDSHAKE_LEN - SKIP_LEN)); + mutated[idx] ^= ((seed >> 19) as u8).wrapping_add(1); + } + + let peer: SocketAddr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(172, 31, (i / 254) as u8, (i % 254 + 1) as u8)), + 47000 + (i % 15000) as u16, + ); + + let _ = tokio::time::timeout( + Duration::from_millis(250), + handle_mtproto_handshake( + &mutated, + tokio::io::empty(), + tokio::io::sink(), + peer, + &config, + &replay_checker, + false, + None, + ), + ) + .await + .expect("soak mutation must complete in bounded time"); + } +} diff --git a/src/proxy/masking.rs b/src/proxy/masking.rs index a7da35a..26f64cd 100644 --- a/src/proxy/masking.rs +++ b/src/proxy/masking.rs @@ -120,6 +120,37 @@ fn detect_client_type(data: &[u8]) -> &'static str { "unknown" } +fn build_mask_proxy_header( + version: u8, + peer: SocketAddr, + local_addr: SocketAddr, +) -> Option> { + match version { + 0 => None, + 2 => Some( + ProxyProtocolV2Builder::new() + .with_addrs(peer, local_addr) + .build(), + ), + _ => { + let header = match (peer, local_addr) { + (SocketAddr::V4(src), SocketAddr::V4(dst)) => { + ProxyProtocolV1Builder::new() + .tcp4(src.into(), dst.into()) + .build() + } + (SocketAddr::V6(src), SocketAddr::V6(dst)) => { + ProxyProtocolV1Builder::new() + .tcp6(src.into(), dst.into()) + .build() + } + _ => ProxyProtocolV1Builder::new().build(), + }; + Some(header) + } + } +} + /// Handle a bad client by forwarding to mask host pub async fn handle_bad_client( reader: R, @@ -162,23 +193,8 @@ where match connect_result { Ok(Ok(stream)) => { let (mask_read, mut mask_write) = stream.into_split(); - let proxy_header: Option> = match config.censorship.mask_proxy_protocol { - 0 => None, - version => { - let header = match version { - 2 => ProxyProtocolV2Builder::new().with_addrs(peer, local_addr).build(), - _ => match (peer, local_addr) { - (SocketAddr::V4(src), SocketAddr::V4(dst)) => - ProxyProtocolV1Builder::new().tcp4(src.into(), dst.into()).build(), - (SocketAddr::V6(src), SocketAddr::V6(dst)) => - ProxyProtocolV1Builder::new().tcp6(src.into(), dst.into()).build(), - _ => - ProxyProtocolV1Builder::new().build(), - }, - }; - Some(header) - } - }; + let proxy_header = + build_mask_proxy_header(config.censorship.mask_proxy_protocol, peer, local_addr); if let Some(header) = proxy_header { if !write_proxy_header_with_timeout(&mut mask_write, &header).await { wait_mask_outcome_budget(outcome_started).await; @@ -226,23 +242,8 @@ where let connect_result = timeout(MASK_TIMEOUT, TcpStream::connect(&mask_addr)).await; match connect_result { Ok(Ok(stream)) => { - let proxy_header: Option> = match config.censorship.mask_proxy_protocol { - 0 => None, - version => { - let header = match version { - 2 => ProxyProtocolV2Builder::new().with_addrs(peer, local_addr).build(), - _ => match (peer, local_addr) { - (SocketAddr::V4(src), SocketAddr::V4(dst)) => - ProxyProtocolV1Builder::new().tcp4(src.into(), dst.into()).build(), - (SocketAddr::V6(src), SocketAddr::V6(dst)) => - ProxyProtocolV1Builder::new().tcp6(src.into(), dst.into()).build(), - _ => - ProxyProtocolV1Builder::new().build(), - }, - }; - Some(header) - } - }; + let proxy_header = + build_mask_proxy_header(config.censorship.mask_proxy_protocol, peer, local_addr); let (mask_read, mut mask_write) = stream.into_split(); if let Some(header) = proxy_header { diff --git a/src/proxy/masking_adversarial_tests.rs b/src/proxy/masking_adversarial_tests.rs index 16b0047..955e8ec 100644 --- a/src/proxy/masking_adversarial_tests.rs +++ b/src/proxy/masking_adversarial_tests.rs @@ -4,7 +4,10 @@ use tokio::io::duplex; use tokio::net::TcpListener; use tokio::time::{Instant, Duration}; use crate::config::ProxyConfig; +use crate::proxy::relay::relay_bidirectional; +use crate::stats::Stats; use crate::stats::beobachten::BeobachtenStore; +use crate::stream::BufferPool; // ------------------------------------------------------------------ // Probing Indistinguishability (OWASP ASVS 5.1.7) @@ -211,3 +214,549 @@ async fn masking_ssrf_resolve_internal_ranges_blocked() { ); } } + +#[tokio::test] +async fn masking_unknown_proxy_protocol_version_falls_back_to_v1_unknown_header() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + + let mut header = [0u8; 15]; + stream.read_exact(&mut header).await.unwrap(); + assert_eq!(&header, b"PROXY UNKNOWN\r\n"); + + let mut payload = [0u8; 5]; + stream.read_exact(&mut payload).await.unwrap(); + assert_eq!(&payload, b"probe"); + }); + + let mut config = ProxyConfig::default(); + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + config.censorship.mask_proxy_protocol = 255; + + let peer: SocketAddr = "198.51.100.77:50001".parse().unwrap(); + let local_addr: SocketAddr = "[2001:db8::10]:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + let (client_reader, _client_writer) = duplex(128); + let (_client_visible_reader, client_visible_writer) = duplex(128); + + handle_bad_client( + client_reader, + client_visible_writer, + b"probe", + peer, + local_addr, + &config, + &beobachten, + ) + .await; + + accept_task.await.unwrap(); +} + +#[tokio::test] +async fn masking_zero_length_initial_data_does_not_hang_or_panic() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut one = [0u8; 1]; + let n = tokio::time::timeout(Duration::from_millis(150), stream.read(&mut one)) + .await + .unwrap() + .unwrap(); + assert_eq!(n, 0, "backend must observe clean EOF for empty initial payload"); + }); + + let mut config = ProxyConfig::default(); + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + + let peer: SocketAddr = "203.0.113.70:50002".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + let (client_reader, client_writer) = duplex(64); + drop(client_writer); + let (_client_visible_reader, client_visible_writer) = duplex(64); + + handle_bad_client( + client_reader, + client_visible_writer, + b"", + peer, + local, + &config, + &beobachten, + ) + .await; + + accept_task.await.unwrap(); +} + +#[tokio::test] +async fn masking_oversized_initial_payload_is_forwarded_verbatim() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + let payload = vec![0xA5u8; 32 * 1024]; + + let accept_task = tokio::spawn({ + let payload = payload.clone(); + async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut observed = vec![0u8; payload.len()]; + stream.read_exact(&mut observed).await.unwrap(); + assert_eq!(observed, payload, "large initial payload must stay byte-for-byte"); + } + }); + + let mut config = ProxyConfig::default(); + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + + let peer: SocketAddr = "203.0.113.71:50003".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + let (client_reader, _client_writer) = duplex(64); + let (_client_visible_reader, client_visible_writer) = duplex(64); + + handle_bad_client( + client_reader, + client_visible_writer, + &payload, + peer, + local, + &config, + &beobachten, + ) + .await; + + accept_task.await.unwrap(); +} + +#[tokio::test] +async fn masking_refused_backend_keeps_constantish_timing_floor_under_burst() { + let mut config = ProxyConfig::default(); + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = 1; + + let peer: SocketAddr = "203.0.113.72:50004".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + for _ in 0..16 { + let (client_reader, _client_writer) = duplex(128); + let (_client_visible_reader, client_visible_writer) = duplex(128); + let started = Instant::now(); + handle_bad_client( + client_reader, + client_visible_writer, + b"GET / HTTP/1.1\r\n", + peer, + local, + &config, + &beobachten, + ) + .await; + assert!( + started.elapsed() >= Duration::from_millis(30), + "refused-backend path must keep timing floor to reduce fingerprinting" + ); + } +} + +#[tokio::test] +async fn masking_backend_half_close_then_client_half_close_completes_without_hang() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut pre = [0u8; 4]; + stream.read_exact(&mut pre).await.unwrap(); + assert_eq!(&pre, b"PING"); + stream.write_all(b"PONG").await.unwrap(); + stream.shutdown().await.unwrap(); + }); + + let mut config = ProxyConfig::default(); + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + + let peer: SocketAddr = "203.0.113.73:50005".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + let (mut client_writer, client_reader) = duplex(256); + let (mut client_visible_reader, client_visible_writer) = duplex(256); + + let handle = tokio::spawn(async move { + handle_bad_client( + client_reader, + client_visible_writer, + b"PING", + peer, + local, + &config, + &beobachten, + ) + .await; + }); + + client_writer.shutdown().await.unwrap(); + + let mut got = [0u8; 4]; + client_visible_reader.read_exact(&mut got).await.unwrap(); + assert_eq!(&got, b"PONG"); + + timeout(Duration::from_secs(2), handle) + .await + .expect("masking task must terminate after bilateral half-close") + .unwrap(); + accept_task.await.unwrap(); +} + +#[tokio::test] +async fn chaos_burst_reconnect_storm_for_masking_and_relay_concurrently() { + const MASKING_SESSIONS: usize = 48; + const RELAY_SESSIONS: usize = 48; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + let backend_reply = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK".to_vec(); + + let backend_task = tokio::spawn({ + let backend_reply = backend_reply.clone(); + async move { + for _ in 0..MASKING_SESSIONS { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut req = [0u8; 32]; + stream.read_exact(&mut req).await.unwrap(); + assert!( + req.starts_with(b"GET /storm/"), + "masking backend must receive storm reconnect probes" + ); + stream.write_all(&backend_reply).await.unwrap(); + stream.shutdown().await.unwrap(); + } + } + }); + + let mut config = ProxyConfig::default(); + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + config.censorship.mask_proxy_protocol = 0; + + let config = Arc::new(config); + let beobachten = Arc::new(BeobachtenStore::new()); + let peer: SocketAddr = "198.51.100.200:55555".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + + let mut masking_tasks = Vec::with_capacity(MASKING_SESSIONS); + for i in 0..MASKING_SESSIONS { + let config = Arc::clone(&config); + let beobachten = Arc::clone(&beobachten); + let expected_reply = backend_reply.clone(); + masking_tasks.push(tokio::spawn(async move { + let mut probe = [0u8; 32]; + let template = format!("GET /storm/{i:04} HTTP/1.1\r\n\r\n"); + let bytes = template.as_bytes(); + probe[..bytes.len()].copy_from_slice(bytes); + + let (client_reader, client_writer) = duplex(256); + drop(client_writer); + let (mut client_visible_reader, client_visible_writer) = duplex(1024); + + let handle = tokio::spawn(async move { + handle_bad_client( + client_reader, + client_visible_writer, + &probe, + peer, + local, + &config, + &beobachten, + ) + .await; + }); + + let mut observed = vec![0u8; expected_reply.len()]; + client_visible_reader.read_exact(&mut observed).await.unwrap(); + assert_eq!(observed, expected_reply); + + timeout(Duration::from_secs(2), handle) + .await + .expect("masking reconnect task must complete") + .unwrap(); + })); + } + + let mut relay_tasks = Vec::with_capacity(RELAY_SESSIONS); + for i in 0..RELAY_SESSIONS { + relay_tasks.push(tokio::spawn(async move { + let stats = Arc::new(Stats::new()); + let (mut client_peer, relay_client) = duplex(4096); + let (relay_server, mut server_peer) = duplex(4096); + + let (client_reader, client_writer) = tokio::io::split(relay_client); + let (server_reader, server_writer) = tokio::io::split(relay_server); + + let relay_task = tokio::spawn(relay_bidirectional( + client_reader, + client_writer, + server_reader, + server_writer, + 1024, + 1024, + "chaos-storm-relay", + stats, + None, + Arc::new(BufferPool::new()), + )); + + let c2s = vec![(i as u8).wrapping_add(1); 64]; + client_peer.write_all(&c2s).await.unwrap(); + let mut c2s_seen = vec![0u8; c2s.len()]; + server_peer.read_exact(&mut c2s_seen).await.unwrap(); + assert_eq!(c2s_seen, c2s); + + let s2c = vec![(i as u8).wrapping_add(17); 96]; + server_peer.write_all(&s2c).await.unwrap(); + let mut s2c_seen = vec![0u8; s2c.len()]; + client_peer.read_exact(&mut s2c_seen).await.unwrap(); + assert_eq!(s2c_seen, s2c); + + drop(client_peer); + drop(server_peer); + timeout(Duration::from_secs(2), relay_task) + .await + .expect("relay reconnect task must complete") + .unwrap() + .unwrap(); + })); + } + + for task in masking_tasks { + timeout(Duration::from_secs(3), task) + .await + .expect("masking storm join must complete") + .unwrap(); + } + + for task in relay_tasks { + timeout(Duration::from_secs(3), task) + .await + .expect("relay storm join must complete") + .unwrap(); + } + + timeout(Duration::from_secs(3), backend_task) + .await + .expect("masking backend accept loop must complete") + .unwrap(); +} + +fn read_env_usize_or_default(name: &str, default: usize) -> usize { + match std::env::var(name) { + Ok(raw) => match raw.parse::() { + Ok(parsed) if parsed > 0 => parsed, + _ => default, + }, + Err(_) => default, + } +} + +#[tokio::test] +#[ignore = "heavy soak; run manually"] +async fn chaos_burst_reconnect_storm_for_masking_and_relay_multiwave_soak() { + let waves = read_env_usize_or_default("CHAOS_WAVES", 4); + let masking_per_wave = read_env_usize_or_default("CHAOS_MASKING_PER_WAVE", 160); + let relay_per_wave = read_env_usize_or_default("CHAOS_RELAY_PER_WAVE", 160); + let total_masking = waves * masking_per_wave; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + let backend_reply = b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\n\r\n".to_vec(); + + let backend_task = tokio::spawn({ + let backend_reply = backend_reply.clone(); + async move { + for _ in 0..total_masking { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut req = [0u8; 32]; + stream.read_exact(&mut req).await.unwrap(); + assert!( + req.starts_with(b"GET /storm/"), + "mask backend must only receive storm probes" + ); + stream.write_all(&backend_reply).await.unwrap(); + stream.shutdown().await.unwrap(); + } + } + }); + + let mut config = ProxyConfig::default(); + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = backend_addr.port(); + config.censorship.mask_proxy_protocol = 0; + + let config = Arc::new(config); + let beobachten = Arc::new(BeobachtenStore::new()); + let peer: SocketAddr = "198.51.100.201:56565".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + + for wave in 0..waves { + let mut masking_tasks = Vec::with_capacity(masking_per_wave); + for i in 0..masking_per_wave { + let config = Arc::clone(&config); + let beobachten = Arc::clone(&beobachten); + let expected_reply = backend_reply.clone(); + masking_tasks.push(tokio::spawn(async move { + let mut probe = [0u8; 32]; + let template = format!("GET /storm/{wave:02}-{i:03}\r\n\r\n"); + let bytes = template.as_bytes(); + probe[..bytes.len()].copy_from_slice(bytes); + + let (client_reader, client_writer) = duplex(256); + drop(client_writer); + let (mut client_visible_reader, client_visible_writer) = duplex(1024); + + let handle = tokio::spawn(async move { + handle_bad_client( + client_reader, + client_visible_writer, + &probe, + peer, + local, + &config, + &beobachten, + ) + .await; + }); + + let mut observed = vec![0u8; expected_reply.len()]; + client_visible_reader.read_exact(&mut observed).await.unwrap(); + assert_eq!(observed, expected_reply); + + timeout(Duration::from_secs(3), handle) + .await + .expect("masking storm task must complete") + .unwrap(); + })); + } + + let mut relay_tasks = Vec::with_capacity(relay_per_wave); + for i in 0..relay_per_wave { + relay_tasks.push(tokio::spawn(async move { + let stats = Arc::new(Stats::new()); + let (mut client_peer, relay_client) = duplex(4096); + let (relay_server, mut server_peer) = duplex(4096); + + let (client_reader, client_writer) = tokio::io::split(relay_client); + let (server_reader, server_writer) = tokio::io::split(relay_server); + + let relay_task = tokio::spawn(relay_bidirectional( + client_reader, + client_writer, + server_reader, + server_writer, + 1024, + 1024, + "chaos-multiwave-relay", + stats, + None, + Arc::new(BufferPool::new()), + )); + + let c2s = vec![(wave as u8).wrapping_add(i as u8).wrapping_add(1); 32]; + client_peer.write_all(&c2s).await.unwrap(); + let mut c2s_seen = vec![0u8; c2s.len()]; + server_peer.read_exact(&mut c2s_seen).await.unwrap(); + assert_eq!(c2s_seen, c2s); + + let s2c = vec![(wave as u8).wrapping_add(i as u8).wrapping_add(17); 48]; + server_peer.write_all(&s2c).await.unwrap(); + let mut s2c_seen = vec![0u8; s2c.len()]; + client_peer.read_exact(&mut s2c_seen).await.unwrap(); + assert_eq!(s2c_seen, s2c); + + drop(client_peer); + drop(server_peer); + timeout(Duration::from_secs(3), relay_task) + .await + .expect("relay storm task must complete") + .unwrap() + .unwrap(); + })); + } + + for task in masking_tasks { + timeout(Duration::from_secs(6), task) + .await + .expect("masking wave task join must complete") + .unwrap(); + } + + for task in relay_tasks { + timeout(Duration::from_secs(6), task) + .await + .expect("relay wave task join must complete") + .unwrap(); + } + } + + timeout(Duration::from_secs(8), backend_task) + .await + .expect("mask backend must complete all accepted storm sessions") + .unwrap(); +} + +#[tokio::test] +#[ignore = "heavy soak; run manually"] +async fn masking_timing_bucket_soak_refused_backend_stays_within_narrow_band() { + let mut config = ProxyConfig::default(); + config.censorship.mask = true; + config.censorship.mask_host = Some("127.0.0.1".to_string()); + config.censorship.mask_port = 1; + + let peer: SocketAddr = "203.0.113.74:50006".parse().unwrap(); + let local: SocketAddr = "127.0.0.1:443".parse().unwrap(); + let beobachten = BeobachtenStore::new(); + + let mut samples = Vec::with_capacity(128); + for _ in 0..128 { + let (client_reader, _client_writer) = duplex(128); + let (_client_visible_reader, client_visible_writer) = duplex(128); + let started = Instant::now(); + handle_bad_client( + client_reader, + client_visible_writer, + b"GET / HTTP/1.1\r\n", + peer, + local, + &config, + &beobachten, + ) + .await; + samples.push(started.elapsed().as_millis()); + } + + samples.sort_unstable(); + let p10 = samples[samples.len() / 10]; + let p90 = samples[(samples.len() * 9) / 10]; + assert!( + p90.saturating_sub(p10) <= 40, + "timing spread too wide for refused-backend masking path: p10={p10}ms p90={p90}ms" + ); +} diff --git a/src/proxy/masking_security_tests.rs b/src/proxy/masking_security_tests.rs index 893b3e5..3219408 100644 --- a/src/proxy/masking_security_tests.rs +++ b/src/proxy/masking_security_tests.rs @@ -130,6 +130,41 @@ fn detect_client_type_len_boundary_9_vs_10_bytes() { assert_eq!(detect_client_type(b"1234567890"), "unknown"); } +#[test] +fn build_mask_proxy_header_version_zero_disables_header() { + let peer: SocketAddr = "203.0.113.10:42424".parse().unwrap(); + let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap(); + + let header = build_mask_proxy_header(0, peer, local_addr); + assert!(header.is_none(), "version 0 must disable PROXY header"); +} + +#[test] +fn build_mask_proxy_header_v2_matches_builder_output() { + let peer: SocketAddr = "203.0.113.10:42424".parse().unwrap(); + let local_addr: SocketAddr = "127.0.0.1:443".parse().unwrap(); + + let expected = ProxyProtocolV2Builder::new() + .with_addrs(peer, local_addr) + .build(); + let actual = build_mask_proxy_header(2, peer, local_addr) + .expect("v2 mode must produce a header"); + + assert_eq!(actual, expected, "v2 header bytes must be deterministic"); +} + +#[test] +fn build_mask_proxy_header_v1_mixed_ip_family_uses_generic_unknown_form() { + let peer: SocketAddr = "203.0.113.10:42424".parse().unwrap(); + let local_addr: SocketAddr = "[2001:db8::1]:443".parse().unwrap(); + + let expected = ProxyProtocolV1Builder::new().build(); + let actual = build_mask_proxy_header(1, peer, local_addr) + .expect("v1 mode must produce a header"); + + assert_eq!(actual, expected, "mixed-family v1 must use UNKNOWN form"); +} + #[tokio::test] async fn beobachten_records_scanner_class_when_mask_is_disabled() { let mut config = ProxyConfig::default(); diff --git a/src/proxy/relay_adversarial_tests.rs b/src/proxy/relay_adversarial_tests.rs index 08de0b8..f87d82b 100644 --- a/src/proxy/relay_adversarial_tests.rs +++ b/src/proxy/relay_adversarial_tests.rs @@ -120,3 +120,91 @@ async fn relay_quota_mid_session_cutoff() { let n = sp_reader.read(&mut small_buf).await.unwrap(); assert_eq!(n, 0, "Server must see EOF after quota reached"); } + +#[tokio::test] +async fn relay_chaos_half_close_crossfire_terminates_without_hang() { + let stats = Arc::new(Stats::new()); + + let (mut client_peer, relay_client) = duplex(8192); + let (relay_server, mut server_peer) = duplex(8192); + + let (client_reader, client_writer) = tokio::io::split(relay_client); + let (server_reader, server_writer) = tokio::io::split(relay_server); + + let relay_task = tokio::spawn(relay_bidirectional( + client_reader, + client_writer, + server_reader, + server_writer, + 1024, + 1024, + "half-close-crossfire", + Arc::clone(&stats), + None, + Arc::new(BufferPool::new()), + )); + + client_peer.write_all(b"c2s-pre-half-close").await.unwrap(); + server_peer.write_all(b"s2c-pre-half-close").await.unwrap(); + + client_peer.shutdown().await.unwrap(); + tokio::time::sleep(Duration::from_millis(10)).await; + server_peer.shutdown().await.unwrap(); + + let done = timeout(Duration::from_secs(1), relay_task) + .await + .expect("relay must terminate after bilateral half-close") + .expect("relay task must not panic"); + assert!(done.is_ok(), "relay must terminate cleanly under half-close crossfire"); +} + +#[tokio::test] +#[ignore = "heavy soak; run manually"] +async fn relay_soak_bidirectional_temporal_jitter_5k_rounds() { + let stats = Arc::new(Stats::new()); + + let (mut client_peer, relay_client) = duplex(65536); + let (relay_server, mut server_peer) = duplex(65536); + + let (client_reader, client_writer) = tokio::io::split(relay_client); + let (server_reader, server_writer) = tokio::io::split(relay_server); + + let relay_task = tokio::spawn(relay_bidirectional( + client_reader, + client_writer, + server_reader, + server_writer, + 4096, + 4096, + "soak-jitter-user", + Arc::clone(&stats), + None, + Arc::new(BufferPool::new()), + )); + + for i in 0..5_000u32 { + let c = [((i as u8).wrapping_mul(13)).wrapping_add(1); 17]; + client_peer.write_all(&c).await.unwrap(); + let mut c_seen = [0u8; 17]; + server_peer.read_exact(&mut c_seen).await.unwrap(); + assert_eq!(c_seen, c); + + let s = [((i as u8).wrapping_mul(7)).wrapping_add(3); 23]; + server_peer.write_all(&s).await.unwrap(); + let mut s_seen = [0u8; 23]; + client_peer.read_exact(&mut s_seen).await.unwrap(); + assert_eq!(s_seen, s); + + if i % 10 == 0 { + tokio::time::sleep(Duration::from_millis((i % 3) as u64)).await; + } + } + + drop(client_peer); + drop(server_peer); + let done = timeout(Duration::from_secs(2), relay_task) + .await + .expect("relay must stop after soak peers close") + .expect("relay task must not panic"); + assert!(done.is_ok()); +}