From 35a8f5b2e559e6a91963ad4e3f899c5a9ee604be Mon Sep 17 00:00:00 2001 From: David Osipov Date: Fri, 20 Mar 2026 17:56:37 +0400 Subject: [PATCH] Add method to retrieve inner reader with pending plaintext This commit introduces the `into_inner_with_pending_plaintext` method to the `FakeTlsReader` struct. This method allows users to extract the underlying reader along with any pending plaintext data that may have been buffered during the TLS reading process. The method handles the state transition and ensures that any buffered data is returned as a vector, facilitating easier management of plaintext data in TLS streams. --- src/proxy/client.rs | 25 +- ...ent_tls_mtproto_fallback_security_tests.rs | 1409 +++++++++++++++++ src/stream/tls_stream.rs | 8 + 3 files changed, 1440 insertions(+), 2 deletions(-) diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 984c7b4..487f8db 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -101,6 +101,15 @@ fn beobachten_ttl(config: &ProxyConfig) -> Duration { Duration::from_secs(minutes.saturating_mul(60)) } +fn wrap_tls_application_record(payload: &[u8]) -> Vec { + let mut record = Vec::with_capacity(5 + payload.len()); + record.push(TLS_RECORD_APPLICATION); + record.extend_from_slice(&TLS_VERSION); + record.extend_from_slice(&(payload.len() as u16).to_be_bytes()); + record.extend_from_slice(payload); + record +} + fn record_beobachten_class( beobachten: &BeobachtenStore, config: &ProxyConfig, @@ -298,8 +307,14 @@ where // MTProto failed after TLS ServerHello was already sent. // Switch fallback relay back to raw transport so the mask // backend receives valid TLS records (not unwrapped payload). - let reader = reader.into_inner(); + let (reader, pending_plaintext) = reader.into_inner_with_pending_plaintext(); let writer = writer.into_inner(); + let pending_record = if pending_plaintext.is_empty() { + Vec::new() + } else { + wrap_tls_application_record(&pending_plaintext) + }; + let reader = tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader); stats.increment_connects_bad(); debug!( peer = %peer, @@ -719,8 +734,14 @@ impl RunningClientHandler { // MTProto failed after TLS ServerHello was already sent. // Switch fallback relay back to raw transport so the mask // backend receives valid TLS records (not unwrapped payload). - let reader = reader.into_inner(); + let (reader, pending_plaintext) = reader.into_inner_with_pending_plaintext(); let writer = writer.into_inner(); + let pending_record = if pending_plaintext.is_empty() { + Vec::new() + } else { + wrap_tls_application_record(&pending_plaintext) + }; + let reader = tokio::io::AsyncReadExt::chain(std::io::Cursor::new(pending_record), reader); stats.increment_connects_bad(); debug!( peer = %peer, diff --git a/src/proxy/client_tls_mtproto_fallback_security_tests.rs b/src/proxy/client_tls_mtproto_fallback_security_tests.rs index 80393bb..9451016 100644 --- a/src/proxy/client_tls_mtproto_fallback_security_tests.rs +++ b/src/proxy/client_tls_mtproto_fallback_security_tests.rs @@ -110,6 +110,12 @@ fn wrap_tls_record(record_type: u8, payload: &[u8]) -> Vec { record } +fn wrap_invalid_mtproto_with_coalesced_tail(tail: &[u8]) -> Vec { + let mut payload = vec![0u8; HANDSHAKE_LEN]; + payload.extend_from_slice(tail); + wrap_tls_application_data(&payload) +} + async fn read_and_discard_tls_record_body(stream: &mut T, header: [u8; 5]) where T: tokio::io::AsyncRead + Unpin, @@ -1501,3 +1507,1406 @@ async fn tls_bad_mtproto_fallback_many_short_sessions_with_chaos_no_cross_leak() .unwrap() .unwrap(); } + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_small_is_forwarded_as_tls_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA1u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 300, 600, 0x31); + let coalesced_tail = b"coalesced-tail-small".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1a1", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.210:56110".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_large_is_forwarded_as_tls_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA2u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 301, 600, 0x32); + let coalesced_tail = vec![0xAB; 4096]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.211:56111".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_keeps_order_before_following_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA3u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 302, 600, 0x33); + let coalesced_tail = b"coalesced-first".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + let following_record = wrap_tls_application_data(b"following-record"); + let expected_concat = [expected_tail_record.clone(), following_record.clone()].concat(); + + let expected_hello = client_hello.clone(); + let expected_records = expected_concat.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_records = vec![0u8; expected_records.len()]; + stream.read_exact(&mut got_records).await.unwrap(); + assert_eq!(got_records, expected_records); + }); + + let harness = build_harness("a3a3a3a3a3a3a3a3a3a3a3a3a3a3a3a3", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.212:56112".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.write_all(&following_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_fragmented_client_write_is_forwarded() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA4u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 303, 600, 0x34); + let coalesced_tail = vec![0xCD; 1536]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("a4a4a4a4a4a4a4a4a4a4a4a4a4a4a4a4", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.213:56113".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + let steps = [7usize, 3, 13, 5, 11, 2, 17, 19]; + let mut offset = 0usize; + let mut i = 0usize; + while offset < coalesced_record.len() { + let step = steps[i % steps.len()]; + let end = (offset + step).min(coalesced_record.len()); + client_side + .write_all(&coalesced_record[offset..end]) + .await + .unwrap(); + offset = end; + i += 1; + } + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn tls_bad_mtproto_fallback_coalesced_tail_max_payload_is_forwarded() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xA5u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 304, 600, 0x35); + let coalesced_tail = vec![0xEF; MAX_TLS_CHUNK_SIZE - HANDSHAKE_LEN]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&coalesced_tail); + let expected_tail_record = wrap_tls_application_data(&coalesced_tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("a5a5a5a5a5a5a5a5a5a5a5a5a5a5a5a5", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.214:56114".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(5), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_identical_following_record_must_not_duplicate_or_reorder() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xB1u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 400, 600, 0x21); + let tail = b"same-payload-record".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let tail_record = wrap_tls_application_data(&tail); + let expected = [tail_record.clone(), tail_record.clone()].concat(); + + let expected_hello = client_hello.clone(); + let expected_payload = expected.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got = vec![0u8; expected_payload.len()]; + stream.read_exact(&mut got).await.unwrap(); + assert_eq!(got, expected_payload); + + let mut tail = [0u8; 1]; + let n = stream.read(&mut tail).await.unwrap(); + assert_eq!(n, 0, "fallback stream must not emit extra bytes"); + }); + + let harness = build_harness("b1b1b1b1b1b1b1b1b1b1b1b1b1b1b1b1", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.220:56120".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.write_all(&tail_record).await.unwrap(); + client_side.shutdown().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_tls_header_looking_bytes_must_stay_payload() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xB2u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 401, 600, 0x22); + let mut tail = vec![0x16, 0x03, 0x03, 0x00, 0x10]; + tail.extend_from_slice(b"not-a-real-record-boundary"); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail_record = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2b2", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.221:56121".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_client_half_close_must_not_truncate_prepended_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xB3u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 402, 600, 0x23); + let tail = vec![0xAA; 3072]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail_record = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let expected_tail = expected_tail_record.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + + let mut one = [0u8; 1]; + let n = stream.read(&mut one).await.unwrap(); + assert_eq!(n, 0, "backend must observe EOF after client half-close"); + }); + + let harness = build_harness("b3b3b3b3b3b3b3b3b3b3b3b3b3b3b3b3", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.222:56122".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.shutdown().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_multi_session_no_cross_bleed_under_churn() { + let sessions = 16usize; + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut expected = std::collections::HashMap::new(); + let secret = [0xB4u8; 16]; + for idx in 0..sessions { + let hello = make_valid_tls_client_hello(&secret, 450 + idx as u32, 600, 0x40 + idx as u8); + let tail = vec![idx as u8; 17 + idx]; + expected.insert(hello, wrap_tls_application_data(&tail)); + } + + let accept_task = tokio::spawn(async move { + let mut remaining = expected; + for _ in 0..sessions { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; 605]; + stream.read_exact(&mut got_hello).await.unwrap(); + let expected_tail = remaining + .remove(&got_hello) + .expect("unexpected hello or duplicated session routing"); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + } + assert!(remaining.is_empty(), "all sessions must map one-to-one"); + }); + + let mut tasks = Vec::with_capacity(sessions); + for idx in 0..sessions { + let harness = build_harness("b4b4b4b4b4b4b4b4b4b4b4b4b4b4b4b4", backend_addr.port()); + let hello = make_valid_tls_client_hello(&secret, 450 + idx as u32, 600, 0x40 + idx as u8); + let tail = vec![idx as u8; 17 + idx]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let peer: SocketAddr = format!("198.51.100.223:{}", 56200 + idx as u16) + .parse() + .unwrap(); + + tasks.push(tokio::spawn(async move { + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + peer, + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + for chunk in coalesced_record.chunks((idx % 7) + 1) { + client_side.write_all(chunk).await.unwrap(); + } + client_side.shutdown().await.unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); + })); + } + + for task in tasks { + task.await.unwrap(); + } + + tokio::time::timeout(Duration::from_secs(6), accept_task) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_single_byte_tail_is_preserved() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC1u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 500, 600, 0x11); + let tail = vec![0x7F]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("c1c1c1c1c1c1c1c1c1c1c1c1c1c1c1c1", backend_addr.port()); + let (server_side, mut client_side) = duplex(65536); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.230:56130".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_exact_tls_header_size_payload_is_preserved() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC2u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 501, 600, 0x12); + let tail = vec![0xAA, 0xBB, 0xCC, 0xDD, 0xEE]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("c2c2c2c2c2c2c2c2c2c2c2c2c2c2c2c2", backend_addr.port()); + let (server_side, mut client_side) = duplex(65536); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.231:56131".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_all_zero_payload_is_preserved() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC3u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 502, 600, 0x13); + let tail = vec![0u8; 2048]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3c3", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.232:56132".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_following_control_records_are_not_mutated() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC4u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 503, 600, 0x14); + let tail = b"tail-before-controls".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let tail_record = wrap_tls_application_data(&tail); + let ccs = wrap_tls_record(0x14, &[0x01]); + let alert = wrap_tls_record(0x15, &[0x01, 0x00]); + let app = wrap_tls_application_data(b"control-final-app"); + let expected = [tail_record, ccs.clone(), alert.clone(), app.clone()].concat(); + + let expected_hello = client_hello.clone(); + let expected_payload = expected.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_payload = vec![0u8; expected_payload.len()]; + stream.read_exact(&mut got_payload).await.unwrap(); + assert_eq!(got_payload, expected_payload); + }); + + let harness = build_harness("c4c4c4c4c4c4c4c4c4c4c4c4c4c4c4c4", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.233:56133".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.write_all(&ccs).await.unwrap(); + client_side.write_all(&alert).await.unwrap(); + client_side.write_all(&app).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_then_following_records_fragmented_chaos_stays_ordered() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC5u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 504, 600, 0x15); + let tail = vec![0xAC; 900]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let tail_record = wrap_tls_application_data(&tail); + let r1 = wrap_tls_application_data(b"r1"); + let r2 = wrap_tls_application_data(&vec![0xDD; 257]); + let expected = [tail_record, r1.clone(), r2.clone()].concat(); + + let expected_hello = client_hello.clone(); + let expected_payload = expected.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_payload = vec![0u8; expected_payload.len()]; + stream.read_exact(&mut got_payload).await.unwrap(); + assert_eq!(got_payload, expected_payload); + }); + + let harness = build_harness("c5c5c5c5c5c5c5c5c5c5c5c5c5c5c5c5", backend_addr.port()); + let (server_side, mut client_side) = duplex(262144); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.234:56134".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + let pattern = [3usize, 1, 5, 2, 7, 11, 13, 17, 19]; + let mut idx = 0usize; + for data in [&coalesced_record, &r1, &r2] { + let mut pos = 0usize; + while pos < data.len() { + let step = pattern[idx % pattern.len()]; + idx += 1; + let end = (pos + step).min(data.len()); + client_side.write_all(&data[pos..end]).await.unwrap(); + pos = end; + } + } + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_backend_response_integrity_after_fallback() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC6u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 505, 600, 0x16); + let tail = b"coalesced-request-body".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + let backend_response = b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\n\r\n".to_vec(); + + let expected_hello = client_hello.clone(); + let expected_resp = backend_response.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + + stream.write_all(&expected_resp).await.unwrap(); + }); + + let harness = build_harness("c6c6c6c6c6c6c6c6c6c6c6c6c6c6c6c6", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.235:56135".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + let mut observed = Vec::new(); + let mut buf = [0u8; 512]; + let mut found = false; + for _ in 0..32 { + let n = tokio::time::timeout(Duration::from_millis(200), client_side.read(&mut buf)) + .await + .unwrap() + .unwrap(); + if n == 0 { + break; + } + observed.extend_from_slice(&buf[..n]); + if observed + .windows(backend_response.len()) + .any(|w| w == backend_response.as_slice()) + { + found = true; + break; + } + } + assert!( + found, + "backend plaintext response must be observable on client stream after fallback" + ); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_connects_bad_increments_exactly_once() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC7u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 506, 600, 0x17); + let tail = b"count-bad-once".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let harness = build_harness("c7c7c7c7c7c7c7c7c7c7c7c7c7c7c7c7", backend_addr.port()); + let stats = harness.stats.clone(); + let bad_before = stats.get_connects_bad(); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.236:56136".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); + + let bad_after = stats.get_connects_bad(); + assert_eq!( + bad_after, + bad_before + 1, + "invalid MTProto after valid TLS must increment connects_bad exactly once" + ); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_parallel_32_sessions_no_cross_bleed() { + let sessions = 32usize; + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let mut expected = std::collections::HashMap::new(); + let secret = [0xC8u8; 16]; + for idx in 0..sessions { + let hello = make_valid_tls_client_hello(&secret, 550 + idx as u32, 600, 0x20 + idx as u8); + let tail = vec![idx as u8; 48 + (idx % 11)]; + expected.insert(hello, wrap_tls_application_data(&tail)); + } + + let accept_task = tokio::spawn(async move { + let mut remaining = expected; + for _ in 0..sessions { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; 605]; + stream.read_exact(&mut got_hello).await.unwrap(); + let expected_tail = remaining + .remove(&got_hello) + .expect("session mixup detected in parallel-32 blackhat test"); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + } + assert!(remaining.is_empty(), "all expected sessions must be consumed"); + }); + + let mut tasks = Vec::with_capacity(sessions); + for idx in 0..sessions { + let harness = build_harness("c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8", backend_addr.port()); + let hello = make_valid_tls_client_hello(&secret, 550 + idx as u32, 600, 0x20 + idx as u8); + let tail = vec![idx as u8; 48 + (idx % 11)]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let peer: SocketAddr = format!("198.51.100.237:{}", 56300 + idx as u16) + .parse() + .unwrap(); + + tasks.push(tokio::spawn(async move { + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + peer, + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + + let chunk = (idx % 13) + 1; + for part in coalesced_record.chunks(chunk) { + client_side.write_all(part).await.unwrap(); + } + client_side.shutdown().await.unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); + })); + } + + for task in tasks { + task.await.unwrap(); + } + + tokio::time::timeout(Duration::from_secs(6), accept_task) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_repeated_tls_like_prefixes_are_preserved() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xC9u8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 507, 600, 0x18); + let mut tail = Vec::new(); + for _ in 0..64 { + tail.extend_from_slice(&[0x16, 0x03, 0x03, 0x00, 0x20]); + } + tail.extend_from_slice(b"suffix-data"); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("c9c9c9c9c9c9c9c9c9c9c9c9c9c9c9c9", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.238:56138".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + drop(client_side); + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_drop_after_write_still_delivers_prepended_record() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xCAu8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 508, 600, 0x19); + let tail = vec![0xBE; 1024]; + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + }); + + let harness = build_harness("cacacacacacacacacacacacacacacaca", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.239:56139".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + drop(client_side); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} + +#[tokio::test] +async fn blackhat_coalesced_tail_zero_following_record_after_coalesced_is_not_invented() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let backend_addr = listener.local_addr().unwrap(); + + let secret = [0xCBu8; 16]; + let client_hello = make_valid_tls_client_hello(&secret, 509, 600, 0x1A); + let tail = b"terminal-tail".to_vec(); + let coalesced_record = wrap_invalid_mtproto_with_coalesced_tail(&tail); + let expected_tail = wrap_tls_application_data(&tail); + + let expected_hello = client_hello.clone(); + let accept_task = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut got_hello = vec![0u8; expected_hello.len()]; + stream.read_exact(&mut got_hello).await.unwrap(); + assert_eq!(got_hello, expected_hello); + + let mut got_tail = vec![0u8; expected_tail.len()]; + stream.read_exact(&mut got_tail).await.unwrap(); + assert_eq!(got_tail, expected_tail); + + let mut one = [0u8; 1]; + let n = stream.read(&mut one).await.unwrap(); + assert_eq!(n, 0, "no synthetic extra record must appear"); + }); + + let harness = build_harness("cbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcb", backend_addr.port()); + let (server_side, mut client_side) = duplex(131072); + let handler = tokio::spawn(handle_client_stream( + server_side, + "198.51.100.240:56140".parse().unwrap(), + harness.config, + harness.stats, + harness.upstream_manager, + harness.replay_checker, + harness.buffer_pool, + harness.rng, + None, + harness.route_runtime, + None, + harness.ip_tracker, + harness.beobachten, + false, + )); + + client_side.write_all(&client_hello).await.unwrap(); + let mut head = [0u8; 5]; + client_side.read_exact(&mut head).await.unwrap(); + assert_eq!(head[0], 0x16); + read_and_discard_tls_record_body(&mut client_side, head).await; + client_side.write_all(&coalesced_record).await.unwrap(); + client_side.shutdown().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(3), accept_task) + .await + .unwrap() + .unwrap(); + + let _ = tokio::time::timeout(Duration::from_secs(3), handler) + .await + .unwrap() + .unwrap(); +} diff --git a/src/stream/tls_stream.rs b/src/stream/tls_stream.rs index fe28542..c87c350 100644 --- a/src/stream/tls_stream.rs +++ b/src/stream/tls_stream.rs @@ -250,6 +250,14 @@ impl FakeTlsReader { pub fn get_mut(&mut self) -> &mut R { &mut self.upstream } pub fn into_inner(self) -> R { self.upstream } + pub fn into_inner_with_pending_plaintext(mut self) -> (R, Vec) { + let pending = match std::mem::replace(&mut self.state, TlsReaderState::Idle) { + TlsReaderState::Yielding { buffer } => buffer.as_slice().to_vec(), + _ => Vec::new(), + }; + (self.upstream, pending) + } + pub fn is_poisoned(&self) -> bool { self.state.is_poisoned() } pub fn state_name(&self) -> &'static str { self.state.state_name() }