diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 984c7b4..487f8db 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -101,6 +101,15 @@ fn beobachten_ttl(config: &ProxyConfig) -> Duration { Duration::from_secs(minutes.saturating_mul(60)) } +fn wrap_tls_application_record(payload: &[u8]) -> Vec { + let mut record = Vec::with_capacity(5 + payload.len()); + record.push(TLS_RECORD_APPLICATION); + record.extend_from_slice(&TLS_VERSION); + record.extend_from_slice(&(payload.len() as u16).to_be_bytes()); + record.extend_from_slice(payload); + record +} + fn record_beobachten_class( beobachten: &BeobachtenStore, config: &ProxyConfig, @@ -298,8 +307,14 @@ where // MTProto failed after TLS ServerHello was already sent. // Switch fallback relay back to raw transport so the mask // backend receives valid TLS records (not unwrapped payload). - let reader = reader.into_inner(); + let (reader, pending_plaintext) = reader.into_inner_with_pending_plaintext(); let writer = writer.into_inner(); + let pending_record = if pending_plaintext.is_empty() { + Vec::new() + } else { + wrap_tls_application_record(&pending_plaintext) + }; + let reader = tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader); stats.increment_connects_bad(); debug!( peer = %peer, @@ -719,8 +734,14 @@ impl RunningClientHandler { // MTProto failed after TLS ServerHello was already sent. // Switch fallback relay back to raw transport so the mask // backend receives valid TLS records (not unwrapped payload). - let reader = reader.into_inner(); + let (reader, pending_plaintext) = reader.into_inner_with_pending_plaintext(); let writer = writer.into_inner(); + let pending_record = if pending_plaintext.is_empty() { + Vec::new() + } else { + wrap_tls_application_record(&pending_plaintext) + }; + let reader = tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader); stats.increment_connects_bad(); debug!( peer = %peer, diff --git a/src/proxy/client_tls_mtproto_fallback_security_tests.rs b/src/proxy/client_tls_mtproto_fallback_security_tests.rs index 80393bb..9451016 100644 --- a/src/proxy/client_tls_mtproto_fallback_security_tests.rs +++ b/src/proxy/client_tls_mtproto_fallback_security_tests.rs @@ -110,6 +110,12 @@ fn wrap_tls_record(record_type: u8, payload: &[u8]) -> Vec { record } +fn wrap_invalid_mtproto_with_coalesced_tail(tail: &[u8]) -> Vec { + let mut payload = vec![0u8; HANDSHAKE_LEN]; + payload.extend_from_slice(tail); + wrap_tls_application_data(&payload) +} + async fn read_and_discard_tls_record_body(stream: &mut T, header: [u8; 5]) where T: tokio::io::AsyncRead + Unpin, @@ -1501,3 +1507,1406 @@ async fn tls_bad_mtproto_fallback_many_short_sessions_with_chaos_no_cross_leak() .unwrap() .unwrap(); } + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_small_is_forwarded_as_tls_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA1u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 300, 600, 0x31); + let coalesced_tail = b"coalesced-tail-small".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.210:56110".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_large_is_forwarded_as_tls_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA2u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 301, 600, 0x32); + let coalesced_tail = vec![0xAB; 4096]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.211:56111".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_keeps_order_before_following_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA3u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 302, 600, 0x33); + let coalesced_tail = b"coalesced-first".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + let following_record = wrap_tls_application_data(b"following-record"); + let expected_concat = [expected_tail_record.clone(), following_record.clone()].concat(); + + let expected_hello = client_hello.clone(); + let expected_records = expected_concat.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_records = vec![0u8; expected_records.len()]; + stream.read_exact(&mut got_records).await.unwrap(); + assert_eq!(got_records, expected_records); + }); + + let harness = build_harness("a3a3a3a3a3a3a3a3a3a3a3a3a3a3a3a3", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.212:56112".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.write_all(&following_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_fragmented_client_write_is_forwarded() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA4u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 303, 600, 0x34); + let coalesced_tail = vec![0xCD; 1536]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("a4a4a4a4a4a4a4a4a4a4a4a4a4a4a4a4", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.213:56113".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + let steps = [7usize, 3, 13, 5, 11, 2, 17, 19]; + let mut offset = 0usize; + let mut i = 0usize; + while offset < coalesced_record.len() { + let step = steps[i % steps.len()]; + let end = (offset + step).min(coalesced_record.len()); + client_side + .write_all(&coalesced_record[offset..end]) + .await + .unwrap(); + offset = end; + i += 1; + } + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_max_payload_is_forwarded() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA5u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 304, 600, 0x35); + let coalesced_tail = vec![0xEF; MAX_TLS_CHUNK_SIZE - HANDSHAKE_LEN]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("a5a5a5a5a5a5a5a5a5a5a5a5a5a5a5a5", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.214:56114".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(5), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_identical_following_record_must_not_duplicate_or_reorder() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xB1u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 400, 600, 0x21); + let tail = b"same-payload-record".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let tail_record = wrap_tls_application_data(&tail); + let expected = [tail_record.clone(), tail_record.clone()].concat(); + + let expected_hello = client_hello.clone(); + let expected_payload = expected.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got = vec![0u8; expected_payload.len()]; + stream.read_exact(&mut got).await.unwrap(); + assert_eq!(got, expected_payload); + + let mut tail = [0u8; 1]; + let n = stream.read(&mut tail).await.unwrap(); + assert_eq!(n, 0, "fallback stream must not emit extra bytes"); + }); + + let harness = build_harness("b1b1b1b1b1b1b1b1b1b1b1b1b1b1b1b1", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.220:56120".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.write_all(&tail_record).await.unwrap(); + client_side.shutdown().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_tls_header_looking_bytes_must_stay_payload() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xB2u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 401, 600, 0x22); + let mut tail = vec![0x16, 0x03, 0x03, 0x00, 0x10]; + tail.extend_from_slice(b"not-a-real-record-boundary"); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail_record = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.221:56121".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_client_half_close_must_not_truncate_prepended_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xB3u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 402, 600, 0x23); + let tail = vec![0xAA; 3072]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail_record = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + + let mut one = [0u8; 1]; + let n = stream.read(&mut one).await.unwrap(); + assert_eq!(n, 0, "backend must observe EOF after client half-close"); + }); + + let harness = build_harness("b3b3b3b3b3b3b3b3b3b3b3b3b3b3b3b3", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.222:56122".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.shutdown().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_multi_session_no_cross_bleed_under_churn() { + let sessions = 16usize; + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut expected = std::collections::HashMap::new(); + let secret = [0xB4u8; 16]; + for idx in 0..sessions { + let hello = make_valid_tls_client_hello(&secret, 450 + idx as u32, 600, 0x40 + idx as u8); + let tail = vec![idx as u8; 17 + idx]; + expected.insert(hello, wrap_tls_application_data(&tail)); + } + + let accept_task = tokio::spawn(async move { + let mut remaining = expected; + for _ in 0..sessions { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; 605]; + stream.read_exact(&mut got_hello).await.unwrap(); + let expected_tail = remaining + .remove(&got_hello) + .expect("unexpected hello or duplicated session routing"); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + } + assert!(remaining.is_empty(), "all sessions must map one-to-one"); + }); + + let mut tasks = Vec::with_capacity(sessions); + for idx in 0..sessions { + let harness = build_harness("b4b4b4b4b4b4b4b4b4b4b4b4b4b4b4b4", backend_addr.port()); + let hello = make_valid_tls_client_hello(&secret, 450 + idx as u32, 600, 0x40 + idx as u8); + let tail = vec![idx as u8; 17 + idx]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let peer: SocketAddr = format!("198.51.100.223:{}", 56200 + idx as u16) + .parse() + .unwrap(); + + tasks.push(tokio::spawn(async move { + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + peer, + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + for chunk in coalesced_record.chunks((idx % 7) + 1) { + client_side.write_all(chunk).await.unwrap(); + } + client_side.shutdown().await.unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); + })); + } + + for task in tasks { + task.await.unwrap(); + } + + tokio::time::timeout(Duration::from_secs(6), accept_task) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_single_byte_tail_is_preserved() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC1u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 500, 600, 0x11); + let tail = vec![0x7F]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("c1c1c1c1c1c1c1c1c1c1c1c1c1c1c1c1", backend_addr.port()); + let (server_side, mut client_side) = duplex(65536); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.230:56130".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_exact_tls_header_size_payload_is_preserved() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC2u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 501, 600, 0x12); + let tail = vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("c2c2c2c2c2c2c2c2c2c2c2c2c2c2c2c2", backend_addr.port()); + let (server_side, mut client_side) = duplex(65536); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.231:56131".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_all_zero_payload_is_preserved() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC3u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 502, 600, 0x13); + let tail = vec![0u8; 2048]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.232:56132".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_following_control_records_are_not_mutated() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC4u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 503, 600, 0x14); + let tail = b"tail-before-controls".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let tail_record = wrap_tls_application_data(&tail); + let ccs = wrap_tls_record(0x14, &[0x01]); + let alert = wrap_tls_record(0x15, &[0x01, 0x00]); + let app = wrap_tls_application_data(b"control-final-app"); + let expected = [tail_record, ccs.clone(), alert.clone(), app.clone()].concat(); + + let expected_hello = client_hello.clone(); + let expected_payload = expected.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_payload = vec![0u8; expected_payload.len()]; + stream.read_exact(&mut got_payload).await.unwrap(); + assert_eq!(got_payload, expected_payload); + }); + + let harness = build_harness("c4c4c4c4c4c4c4c4c4c4c4c4c4c4c4c4", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.233:56133".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.write_all(&ccs).await.unwrap(); + client_side.write_all(&alert).await.unwrap(); + client_side.write_all(&app).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_then_following_records_fragmented_chaos_stays_ordered() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC5u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 504, 600, 0x15); + let tail = vec![0xAC; 900]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let tail_record = wrap_tls_application_data(&tail); + let r1 = wrap_tls_application_data(b"r1"); + let r2 = wrap_tls_application_data(&vec![0xDD; 257]); + let expected = [tail_record, r1.clone(), r2.clone()].concat(); + + let expected_hello = client_hello.clone(); + let expected_payload = expected.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_payload = vec![0u8; expected_payload.len()]; + stream.read_exact(&mut got_payload).await.unwrap(); + assert_eq!(got_payload, expected_payload); + }); + + let harness = build_harness("c5c5c5c5c5c5c5c5c5c5c5c5c5c5c5c5", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.234:56134".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + let pattern = [3usize, 1, 5, 2, 7, 11, 13, 17, 19]; + let mut idx = 0usize; + for data in [&coalesced_record, &r1, &r2] { + let mut pos = 0usize; + while pos < data.len() { + let step = pattern[idx % pattern.len()]; + idx += 1; + let end = (pos + step).min(data.len()); + client_side.write_all(&data[pos..end]).await.unwrap(); + pos = end; + } + } + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_backend_response_integrity_after_fallback() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC6u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 505, 600, 0x16); + let tail = b"coalesced-request-body".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + let backend_response = b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\n\r\n".to_vec(); + + let expected_hello = client_hello.clone(); + let expected_resp = backend_response.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + + stream.write_all(&expected_resp).await.unwrap(); + }); + + let harness = build_harness("c6c6c6c6c6c6c6c6c6c6c6c6c6c6c6c6", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.235:56135".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + let mut observed = Vec::new(); + let mut buf = [0u8; 512]; + let mut found = false; + for _ in 0..32 { + let n = tokio::time::timeout(Duration::from_millis(200), client_side.read(&mut buf)) + .await + .unwrap() + .unwrap(); + if n == 0 { + break; + } + observed.extend_from_slice(&buf[..n]); + if observed + .windows(backend_response.len()) + .any(|w| w == backend_response.as_slice()) + { + found = true; + break; + } + } + assert!( + found, + "backend plaintext response must be observable on client stream after fallback" + ); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_connects_bad_increments_exactly_once() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC7u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 506, 600, 0x17); + let tail = b"count-bad-once".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let harness = build_harness("c7c7c7c7c7c7c7c7c7c7c7c7c7c7c7c7", backend_addr.port()); + let stats = harness.stats.clone(); + let bad_before = stats.get_connects_bad(); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.236:56136".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); + + let bad_after = stats.get_connects_bad(); + assert_eq!( + bad_after, + bad_before + 1, + "invalid MTProto after valid TLS must increment connects_bad exactly once" + ); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_parallel_32_sessions_no_cross_bleed() { + let sessions = 32usize; + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut expected = std::collections::HashMap::new(); + let secret = [0xC8u8; 16]; + for idx in 0..sessions { + let hello = make_valid_tls_client_hello(&secret, 550 + idx as u32, 600, 0x20 + idx as u8); + let tail = vec![idx as u8; 48 + (idx % 11)]; + expected.insert(hello, wrap_tls_application_data(&tail)); + } + + let accept_task = tokio::spawn(async move { + let mut remaining = expected; + for _ in 0..sessions { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; 605]; + stream.read_exact(&mut got_hello).await.unwrap(); + let expected_tail = remaining + .remove(&got_hello) + .expect("session mixup detected in parallel-32 blackhat test"); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + } + assert!(remaining.is_empty(), "all expected sessions must be consumed"); + }); + + let mut tasks = Vec::with_capacity(sessions); + for idx in 0..sessions { + let harness = build_harness("c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8", backend_addr.port()); + let hello = make_valid_tls_client_hello(&secret, 550 + idx as u32, 600, 0x20 + idx as u8); + let tail = vec![idx as u8; 48 + (idx % 11)]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let peer: SocketAddr = format!("198.51.100.237:{}", 56300 + idx as u16) + .parse() + .unwrap(); + + tasks.push(tokio::spawn(async move { + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + peer, + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + let chunk = (idx % 13) + 1; + for part in coalesced_record.chunks(chunk) { + client_side.write_all(part).await.unwrap(); + } + client_side.shutdown().await.unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); + })); + } + + for task in tasks { + task.await.unwrap(); + } + + tokio::time::timeout(Duration::from_secs(6), accept_task) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_repeated_tls_like_prefixes_are_preserved() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC9u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 507, 600, 0x18); + let mut tail = Vec::new(); + for _ in 0..64 { + tail.extend_from_slice(&[0x16, 0x03, 0x03, 0x00, 0x20]); + } + tail.extend_from_slice(b"suffix-data"); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("c9c9c9c9c9c9c9c9c9c9c9c9c9c9c9c9", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.238:56138".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_drop_after_write_still_delivers_prepended_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xCAu8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 508, 600, 0x19); + let tail = vec![0xBE; 1024]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("cacacacacacacacacacacacacacacaca", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.239:56139".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + drop(client_side); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_zero_following_record_after_coalesced_is_not_invented() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xCBu8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 509, 600, 0x1A); + let tail = b"terminal-tail".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + + let mut one = [0u8; 1]; + let n = stream.read(&mut one).await.unwrap(); + assert_eq!(n, 0, "no synthetic extra record must appear"); + }); + + let harness = build_harness("cbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcb", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.240:56140".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.shutdown().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} diff --git a/src/stream/tls_stream.rs b/src/stream/tls_stream.rs index fe28542..c87c350 100644 --- a/src/stream/tls_stream.rs +++ b/src/stream/tls_stream.rs @@ -250,6 +250,14 @@ impl FakeTlsReader { pub fn get_mut(&mut self) -> &mut R { &mut self.upstream } pub fn into_inner(self) -> R { self.upstream } + pub fn into_inner_with_pending_plaintext(mut self) -> (R, Vec) { + let pending = match std::mem::replace(&mut self.state, TlsReaderState::Idle) { + TlsReaderState::Yielding { buffer } => buffer.as_slice().to_vec(), + _ => Vec::new(), + }; + (self.upstream, pending) + } + pub fn is_poisoned(&self) -> bool { self.state.is_poisoned() } pub fn state_name(&self) -> &'static str { self.state.state_name() }