From c9a043d8d5cfc1c13e867b5da8d48a8df84b101b Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 21 Feb 2026 02:15:10 +0300 Subject: [PATCH 1/3] ME Frame too large Fixes Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 8 ++ src/config/types.rs | 11 +++ src/protocol/constants.rs | 17 +++- src/protocol/tls.rs | 10 +-- src/proxy/direct_relay.rs | 3 +- src/proxy/handshake.rs | 3 +- src/proxy/middle_relay.rs | 149 +++++++++++++++++++++++------------- src/stream/crypto_stream.rs | 38 +++++---- src/stream/frame_stream.rs | 6 +- src/stream/mod.rs | 2 +- src/stream/tls_stream.rs | 15 ++-- src/tls_front/cache.rs | 22 +++++- 12 files changed, 189 insertions(+), 95 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index f9d878a..b68c120 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -110,6 +110,14 @@ pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 { 30_000 } +pub(crate) fn default_crypto_pending_buffer() -> usize { + 256 * 1024 +} + +pub(crate) fn default_max_client_frame() -> usize { + 16 * 1024 * 1024 +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/types.rs b/src/config/types.rs index 6228cb8..41b7a3f 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -172,6 +172,15 @@ pub struct GeneralConfig { #[serde(default = "default_true")] pub me_keepalive_payload_random: bool, + /// Max pending ciphertext buffer per client writer (bytes). + /// Controls FakeTLS backpressure vs throughput. + #[serde(default = "default_crypto_pending_buffer")] + pub crypto_pending_buffer: usize, + + /// Maximum allowed client MTProto frame size (bytes). + #[serde(default = "default_max_client_frame")] + pub max_client_frame: usize, + /// Enable staggered warmup of extra ME writers. #[serde(default = "default_true")] pub me_warmup_stagger_enabled: bool, @@ -251,6 +260,8 @@ impl Default for GeneralConfig { log_level: LogLevel::Normal, disable_colors: false, links: LinksConfig::default(), + crypto_pending_buffer: default_crypto_pending_buffer(), + max_client_frame: default_max_client_frame(), } } } diff --git a/src/protocol/constants.rs b/src/protocol/constants.rs index 86cd2bd..826f2b2 100644 --- a/src/protocol/constants.rs +++ b/src/protocol/constants.rs @@ -1,6 +1,8 @@ //! Protocol constants and datacenter addresses use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +use crate::crypto::SecureRandom; use std::sync::LazyLock; // ============= Telegram Datacenters ============= @@ -151,7 +153,18 @@ pub const TLS_RECORD_ALERT: u8 = 0x15; /// Maximum TLS record size pub const MAX_TLS_RECORD_SIZE: usize = 16384; /// Maximum TLS chunk size (with overhead) -pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 24; +/// RFC 8446 §5.2 allows up to 16384 + 256 bytes of ciphertext +pub const MAX_TLS_CHUNK_SIZE: usize = 16384 + 256; + +/// Generate padding length for Secure Intermediate protocol. +/// Total (data + padding) must not be divisible by 4 per MTProto spec. +pub fn secure_padding_len(data_len: usize, rng: &SecureRandom) -> usize { + if data_len % 4 == 0 { + (rng.range(3) + 1) as usize // 1-3 + } else { + rng.range(4) as usize // 0-3 + } +} // ============= Timeouts ============= @@ -319,4 +332,4 @@ mod tests { assert_eq!(TG_DATACENTERS_V4.len(), 5); assert_eq!(TG_DATACENTERS_V6.len(), 5); } -} \ No newline at end of file +} diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs index d7afdee..5f890a5 100644 --- a/src/protocol/tls.rs +++ b/src/protocol/tls.rs @@ -376,13 +376,9 @@ pub fn build_server_hello( app_data_record.push(TLS_RECORD_APPLICATION); app_data_record.extend_from_slice(&TLS_VERSION); app_data_record.extend_from_slice(&(fake_cert_len as u16).to_be_bytes()); - if fake_cert_len > 17 { - app_data_record.extend_from_slice(&fake_cert[..fake_cert_len - 17]); - app_data_record.push(0x16); // inner content type marker - app_data_record.extend_from_slice(&rng.bytes(16)); // AEAD-like tag mimic - } else { - app_data_record.extend_from_slice(&fake_cert); - } + // Fill ApplicationData with fully random bytes of desired length to avoid + // deterministic DPI fingerprints (fixed inner content type markers). + app_data_record.extend_from_slice(&fake_cert); // Combine all records let mut response = Vec::with_capacity( diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 7b1ac1b..630937b 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -178,8 +178,9 @@ async fn do_tg_handshake_static( let (read_half, write_half) = stream.into_split(); + let max_pending = config.general.crypto_pending_buffer; Ok(( CryptoReader::new(read_half, tg_decryptor), - CryptoWriter::new(write_half, tg_encryptor), + CryptoWriter::new(write_half, tg_encryptor, max_pending), )) } diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index 8b61112..685d999 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -264,9 +264,10 @@ where "MTProto handshake successful" ); + let max_pending = config.general.crypto_pending_buffer; return HandshakeResult::Success(( CryptoReader::new(reader, decryptor), - CryptoWriter::new(writer, encryptor), + CryptoWriter::new(writer, encryptor, max_pending), success, )); } diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 09dd532..2a17e7e 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -2,12 +2,13 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tracing::{debug, info, trace}; +use tokio::sync::oneshot; +use tracing::{debug, info, trace, warn}; use crate::config::ProxyConfig; use crate::crypto::SecureRandom; use crate::error::{ProxyError, Result}; -use crate::protocol::constants::*; +use crate::protocol::constants::{*, secure_padding_len}; use crate::proxy::handshake::HandshakeSuccess; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; @@ -15,11 +16,11 @@ use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag}; pub(crate) async fn handle_via_middle_proxy( mut crypto_reader: CryptoReader, - mut crypto_writer: CryptoWriter, + crypto_writer: CryptoWriter, success: HandshakeSuccess, me_pool: Arc, stats: Arc, - _config: Arc, + config: Arc, _buffer_pool: Arc, local_addr: SocketAddr, rng: Arc, @@ -41,7 +42,7 @@ where "Routing via Middle-End" ); - let (conn_id, mut me_rx) = me_pool.registry().register().await; + let (conn_id, me_rx) = me_pool.registry().register().await; stats.increment_user_connects(&user); stats.increment_user_curr_connects(&user); @@ -56,59 +57,90 @@ where let translated_local_addr = me_pool.translate_our_addr(local_addr); - let result: Result<()> = loop { - tokio::select! { - client_frame = read_client_payload(&mut crypto_reader, proto_tag) => { - match client_frame { - Ok(Some((payload, quickack))) => { - trace!(conn_id, bytes = payload.len(), "C->ME frame"); - stats.add_user_octets_from(&user, payload.len() as u64); - let mut flags = proto_flags; - if quickack { - flags |= RPC_FLAG_QUICKACK; + let frame_limit = config.general.max_client_frame; + + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); + let mut me_rx_task = me_rx; + let stats_clone = stats.clone(); + let rng_clone = rng.clone(); + let user_clone = user.clone(); + let me_writer = tokio::spawn(async move { + let mut writer = crypto_writer; + loop { + tokio::select! { + msg = me_rx_task.recv() => { + match msg { + Some(MeResponse::Data { flags, data }) => { + trace!(conn_id, bytes = data.len(), flags, "ME->C data"); + stats_clone.add_user_octets_to(&user_clone, data.len() as u64); + write_client_payload(&mut writer, proto_tag, flags, &data, rng_clone.as_ref()).await?; } - if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) { - flags |= RPC_FLAG_NOT_ENCRYPTED; + Some(MeResponse::Ack(confirm)) => { + trace!(conn_id, confirm, "ME->C quickack"); + write_client_ack(&mut writer, proto_tag, confirm).await?; + } + Some(MeResponse::Close) => { + debug!(conn_id, "ME sent close"); + return Ok(()); + } + None => { + debug!(conn_id, "ME channel closed"); + return Err(ProxyError::Proxy("ME connection lost".into())); } - me_pool.send_proxy_req( - conn_id, - success.dc_idx, - peer, - translated_local_addr, - &payload, - flags, - ).await?; } - Ok(None) => { - debug!(conn_id, "Client EOF"); - let _ = me_pool.send_close(conn_id).await; - break Ok(()); - } - Err(e) => break Err(e), } - } - me_msg = me_rx.recv() => { - match me_msg { - Some(MeResponse::Data { flags, data }) => { - trace!(conn_id, bytes = data.len(), flags, "ME->C data"); - stats.add_user_octets_to(&user, data.len() as u64); - write_client_payload(&mut crypto_writer, proto_tag, flags, &data, rng.as_ref()).await?; - } - Some(MeResponse::Ack(confirm)) => { - trace!(conn_id, confirm, "ME->C quickack"); - write_client_ack(&mut crypto_writer, proto_tag, confirm).await?; - } - Some(MeResponse::Close) => { - debug!(conn_id, "ME sent close"); - break Ok(()); - } - None => { - debug!(conn_id, "ME channel closed"); - break Err(ProxyError::Proxy("ME connection lost".into())); - } + _ = &mut stop_rx => { + debug!(conn_id, "ME writer stop signal"); + return Ok(()); } } } + }); + + let mut main_result: Result<()> = Ok(()); + loop { + match read_client_payload(&mut crypto_reader, proto_tag, frame_limit, &user).await { + Ok(Some((payload, quickack))) => { + trace!(conn_id, bytes = payload.len(), "C->ME frame"); + stats.add_user_octets_from(&user, payload.len() as u64); + let mut flags = proto_flags; + if quickack { + flags |= RPC_FLAG_QUICKACK; + } + if payload.len() >= 8 && payload[..8].iter().all(|b| *b == 0) { + flags |= RPC_FLAG_NOT_ENCRYPTED; + } + if let Err(e) = me_pool.send_proxy_req( + conn_id, + success.dc_idx, + peer, + translated_local_addr, + &payload, + flags, + ).await { + main_result = Err(e); + break; + } + } + Ok(None) => { + debug!(conn_id, "Client EOF"); + let _ = me_pool.send_close(conn_id).await; + break; + } + Err(e) => { + main_result = Err(e); + break; + } + } + } + + let _ = stop_tx.send(()); + let writer_result = me_writer.await.unwrap_or_else(|e| Err(ProxyError::Proxy(format!("ME writer join error: {e}")))); + + let result = match (main_result, writer_result) { + (Ok(()), Ok(())) => Ok(()), + (Err(e), _) => Err(e), + (_, Err(e)) => Err(e), }; debug!(user = %user, conn_id, "ME relay cleanup"); @@ -120,6 +152,8 @@ where async fn read_client_payload( client_reader: &mut CryptoReader, proto_tag: ProtoTag, + max_frame: usize, + user: &str, ) -> Result, bool)>> where R: AsyncRead + Unpin + Send + 'static, @@ -162,8 +196,15 @@ where } }; - if len > 16 * 1024 * 1024 { - return Err(ProxyError::Proxy(format!("Frame too large: {len}"))); + if len > max_frame { + warn!( + user = %user, + raw_len = len, + raw_len_hex = format_args!("0x{:08x}", len), + proto = ?proto_tag, + "Frame too large — possible crypto desync or TLS record error" + ); + return Err(ProxyError::Proxy(format!("Frame too large: {len} (max {max_frame})"))); } let mut payload = vec![0u8; len]; @@ -237,7 +278,7 @@ where } ProtoTag::Intermediate | ProtoTag::Secure => { let padding_len = if proto_tag == ProtoTag::Secure { - (rng.bytes(1)[0] % 4) as usize + secure_padding_len(data.len(), rng) } else { 0 }; diff --git a/src/stream/crypto_stream.rs b/src/stream/crypto_stream.rs index 4705fe6..ebb6f43 100644 --- a/src/stream/crypto_stream.rs +++ b/src/stream/crypto_stream.rs @@ -34,7 +34,7 @@ //! └────────────────────────────────────────┘ //! //! Backpressure -//! - pending ciphertext buffer is bounded (MAX_PENDING_WRITE) +//! - pending ciphertext buffer is bounded (configurable per connection) //! - pending is full and upstream is pending //! -> poll_write returns Poll::Pending //! -> do not accept any plaintext @@ -62,10 +62,9 @@ use super::state::{StreamState, YieldBuffer}; // ============= Constants ============= -/// Maximum size for pending ciphertext buffer (bounded backpressure). -/// Reduced to 64KB to prevent bufferbloat on mobile networks. -/// 512KB was causing high latency on 3G/LTE connections. -const MAX_PENDING_WRITE: usize = 64 * 1024; +/// Default size for pending ciphertext buffer (bounded backpressure). +/// Actual limit is supplied at runtime from configuration. +const DEFAULT_MAX_PENDING_WRITE: usize = 64 * 1024; /// Default read buffer capacity (reader mostly decrypts in-place into caller buffer). const DEFAULT_READ_CAPACITY: usize = 16 * 1024; @@ -427,15 +426,22 @@ pub struct CryptoWriter { encryptor: AesCtr, state: CryptoWriterState, scratch: BytesMut, + max_pending_write: usize, } impl CryptoWriter { - pub fn new(upstream: W, encryptor: AesCtr) -> Self { + pub fn new(upstream: W, encryptor: AesCtr, max_pending_write: usize) -> Self { + let max_pending = if max_pending_write == 0 { + DEFAULT_MAX_PENDING_WRITE + } else { + max_pending_write + }; Self { upstream, encryptor, state: CryptoWriterState::Idle, scratch: BytesMut::with_capacity(16 * 1024), + max_pending_write: max_pending.max(4 * 1024), } } @@ -484,10 +490,10 @@ impl CryptoWriter { } /// Ensure we are in Flushing state and return mutable pending buffer. - fn ensure_pending<'a>(state: &'a mut CryptoWriterState) -> &'a mut PendingCiphertext { + fn ensure_pending<'a>(state: &'a mut CryptoWriterState, max_pending: usize) -> &'a mut PendingCiphertext { if matches!(state, CryptoWriterState::Idle) { *state = CryptoWriterState::Flushing { - pending: PendingCiphertext::new(MAX_PENDING_WRITE), + pending: PendingCiphertext::new(max_pending), }; } @@ -498,14 +504,14 @@ impl CryptoWriter { } /// Select how many plaintext bytes can be accepted in buffering path - fn select_to_accept_for_buffering(state: &CryptoWriterState, buf_len: usize) -> usize { + fn select_to_accept_for_buffering(state: &CryptoWriterState, buf_len: usize, max_pending: usize) -> usize { if buf_len == 0 { return 0; } match state { CryptoWriterState::Flushing { pending } => buf_len.min(pending.remaining_capacity()), - CryptoWriterState::Idle => buf_len.min(MAX_PENDING_WRITE), + CryptoWriterState::Idle => buf_len.min(max_pending), CryptoWriterState::Poisoned { .. } => 0, } } @@ -603,7 +609,7 @@ impl AsyncWrite for CryptoWriter { Poll::Pending => { // Upstream blocked. Apply ideal backpressure let to_accept = - Self::select_to_accept_for_buffering(&this.state, buf.len()); + Self::select_to_accept_for_buffering(&this.state, buf.len(), this.max_pending_write); if to_accept == 0 { trace!( @@ -618,7 +624,7 @@ impl AsyncWrite for CryptoWriter { // Disjoint borrows let encryptor = &mut this.encryptor; - let pending = Self::ensure_pending(&mut this.state); + let pending = Self::ensure_pending(&mut this.state, this.max_pending_write); if let Err(e) = pending.push_encrypted(encryptor, plaintext) { if e.kind() == ErrorKind::WouldBlock { @@ -635,7 +641,7 @@ impl AsyncWrite for CryptoWriter { // 2) Fast path: pending empty -> write-through debug_assert!(matches!(this.state, CryptoWriterState::Idle)); - let to_accept = buf.len().min(MAX_PENDING_WRITE); + let to_accept = buf.len().min(this.max_pending_write); let plaintext = &buf[..to_accept]; Self::encrypt_into_scratch(&mut this.encryptor, &mut this.scratch, plaintext); @@ -645,7 +651,7 @@ impl AsyncWrite for CryptoWriter { // Upstream blocked: buffer FULL ciphertext for accepted bytes. let ciphertext = std::mem::take(&mut this.scratch); - let pending = Self::ensure_pending(&mut this.state); + let pending = Self::ensure_pending(&mut this.state, this.max_pending_write); pending.replace_with(ciphertext); Poll::Ready(Ok(to_accept)) @@ -672,7 +678,7 @@ impl AsyncWrite for CryptoWriter { let remainder = this.scratch.split_off(n); this.scratch.clear(); - let pending = Self::ensure_pending(&mut this.state); + let pending = Self::ensure_pending(&mut this.state, this.max_pending_write); pending.replace_with(remainder); Poll::Ready(Ok(to_accept)) @@ -767,4 +773,4 @@ impl AsyncWrite for PassthroughStream { fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_shutdown(cx) } -} \ No newline at end of file +} diff --git a/src/stream/frame_stream.rs b/src/stream/frame_stream.rs index fd8c1b4..1ea6d1b 100644 --- a/src/stream/frame_stream.rs +++ b/src/stream/frame_stream.rs @@ -267,8 +267,8 @@ impl SecureIntermediateFrameWriter { return Ok(()); } - // Add random padding (0-3 bytes) - let padding_len = self.rng.range(4); + // Add padding so total length is never divisible by 4 (MTProto Secure) + let padding_len = secure_padding_len(data.len(), &self.rng); let padding = self.rng.bytes(padding_len); let total_len = data.len() + padding_len; @@ -585,4 +585,4 @@ mod tests { let (received, _) = reader.read_frame().await.unwrap(); assert_eq!(&received[..], &data[..]); } -} \ No newline at end of file +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a86b56f..ea30e5e 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -40,4 +40,4 @@ pub use frame_stream::{ SecureIntermediateFrameReader, SecureIntermediateFrameWriter, MtprotoFrameReader, MtprotoFrameWriter, FrameReaderKind, FrameWriterKind, -}; \ No newline at end of file +}; diff --git a/src/stream/tls_stream.rs b/src/stream/tls_stream.rs index 6a3c1d6..edf970d 100644 --- a/src/stream/tls_stream.rs +++ b/src/stream/tls_stream.rs @@ -25,7 +25,8 @@ //! - However, the on-the-wire record length can exceed 16384 because TLS 1.3 //! uses AEAD and can include tag/overhead/padding. //! - Telegram FakeTLS clients (notably iOS) may send Application Data records -//! with length up to 16384 + 24 bytes. We accept that as MAX_TLS_CHUNK_SIZE. +//! with length up to 16384 + 256 bytes (RFC 8446 §5.2). We accept that as +//! MAX_TLS_CHUNK_SIZE. //! //! If you reject those (e.g. validate length <= 16384), you will see errors like: //! "TLS record too large: 16408 bytes" @@ -52,9 +53,8 @@ use super::state::{StreamState, HeaderBuffer, YieldBuffer, WriteBuffer}; const TLS_HEADER_SIZE: usize = 5; /// Maximum TLS fragment size we emit for Application Data. -/// Real TLS 1.3 ciphertexts often add ~16-24 bytes AEAD overhead, so to mimic -/// on-the-wire record sizes we allow up to 16384 + 24 bytes of plaintext. -const MAX_TLS_PAYLOAD: usize = 16384 + 24; +/// Real TLS 1.3 allows up to 16384 + 256 bytes of ciphertext (incl. tag). +const MAX_TLS_PAYLOAD: usize = 16384 + 256; /// Maximum pending write buffer for one record remainder. /// Note: we never queue unlimited amount of data here; state holds at most one record. @@ -91,7 +91,7 @@ impl TlsRecordHeader { /// - We accept TLS 1.0 header version for ClientHello-like records (0x03 0x01), /// and TLS 1.2/1.3 style version bytes for the rest (we use TLS_VERSION = 0x03 0x03). /// - For Application Data, Telegram FakeTLS may send payload length up to - /// MAX_TLS_CHUNK_SIZE (16384 + 24). + /// MAX_TLS_CHUNK_SIZE (16384 + 256). /// - For other record types we keep stricter bounds to avoid memory abuse. fn validate(&self) -> Result<()> { // Version: accept TLS 1.0 header (ClientHello quirk) and TLS_VERSION (0x0303). @@ -105,7 +105,7 @@ impl TlsRecordHeader { let len = self.length as usize; // Length checks depend on record type. - // Telegram FakeTLS: ApplicationData length may be 16384 + 24. + // Telegram FakeTLS: ApplicationData length may be 16384 + 256. match self.record_type { TLS_RECORD_APPLICATION => { if len > MAX_TLS_CHUNK_SIZE { @@ -755,9 +755,6 @@ impl AsyncWrite for FakeTlsWriter { payload_size: chunk_size, }; - // Wake to retry flushing soon. - cx.waker().wake_by_ref(); - Poll::Ready(Ok(chunk_size)) } } diff --git a/src/tls_front/cache.rs b/src/tls_front/cache.rs index dccdf2a..103c2b1 100644 --- a/src/tls_front/cache.rs +++ b/src/tls_front/cache.rs @@ -72,7 +72,27 @@ impl TlsFrontCache { continue; } if let Ok(data) = tokio::fs::read(entry.path()).await { - if let Ok(cached) = serde_json::from_slice::(&data) { + if let Ok(mut cached) = serde_json::from_slice::(&data) { + if cached.domain.is_empty() + || cached.domain.len() > 255 + || !cached.domain.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-') + { + warn!(file = %name, "Skipping TLS cache entry with invalid domain"); + continue; + } + // fetched_at is skipped during deserialization; approximate with file mtime if available. + if let Ok(meta) = entry.metadata().await { + if let Ok(modified) = meta.modified() { + cached.fetched_at = modified; + } + } + // Drop entries older than 72h + if let Ok(age) = cached.fetched_at.elapsed() { + if age > Duration::from_secs(72 * 3600) { + warn!(domain = %cached.domain, "Skipping stale TLS cache entry (>72h)"); + continue; + } + } let domain = cached.domain.clone(); self.set(&domain, cached).await; loaded += 1; From 83fc9d6db3daf27def14a7e6acc9902c6377c0c0 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 21 Feb 2026 03:36:13 +0300 Subject: [PATCH 2/3] Middle-End Fixes Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 61 +++++++++++ src/config/types.rs | 75 +++++++++++++ src/main.rs | 3 + src/protocol/tls.rs | 164 +++++++++++++++++------------ src/proxy/handshake.rs | 36 +++++++ src/tls_front/emulator.rs | 28 ++++- src/transport/middle_proxy/pool.rs | 6 +- 7 files changed, 300 insertions(+), 73 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index b68c120..2dee3e0 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -118,6 +118,67 @@ pub(crate) fn default_max_client_frame() -> usize { 16 * 1024 * 1024 } +pub(crate) fn default_tls_new_session_tickets() -> u8 { + 0 +} + +pub(crate) fn default_server_hello_delay_min_ms() -> u64 { + 0 +} + +pub(crate) fn default_server_hello_delay_max_ms() -> u64 { + 0 +} + +pub(crate) fn default_alpn_enforce() -> bool { + true +} + +pub(crate) fn default_stun_servers() -> Vec { + vec![ + "stun.l.google.com:19302".to_string(), + "stun1.l.google.com:19302".to_string(), + "stun2.l.google.com:19302".to_string(), + "stun.stunprotocol.org:3478".to_string(), + "stun.voip.eutelia.it:3478".to_string(), + ] +} + +pub(crate) fn default_http_ip_detect_urls() -> Vec { + vec![ + "https://ifconfig.me/ip".to_string(), + "https://api.ipify.org".to_string(), + ] +} + +pub(crate) fn default_cache_public_ip_path() -> String { + "cache/public_ip.txt".to_string() +} + +pub(crate) fn default_proxy_secret_reload_secs() -> u64 { + 12 * 60 * 60 +} + +pub(crate) fn default_proxy_config_reload_secs() -> u64 { + 12 * 60 * 60 +} + +pub(crate) fn default_ntp_check() -> bool { + true +} + +pub(crate) fn default_ntp_servers() -> Vec { + vec!["pool.ntp.org".to_string()] +} + +pub(crate) fn default_fast_mode_min_tls_record() -> usize { + 0 +} + +pub(crate) fn default_degradation_min_unavailable_dc_groups() -> u8 { + 2 +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/types.rs b/src/config/types.rs index 41b7a3f..6c54598 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -96,6 +96,22 @@ pub struct NetworkConfig { #[serde(default)] pub multipath: bool, + + /// STUN servers list for public IP discovery. + #[serde(default = "default_stun_servers")] + pub stun_servers: Vec, + + /// Enable TCP STUN fallback when UDP is blocked. + #[serde(default)] + pub stun_tcp_fallback: bool, + + /// HTTP-based public IP detection endpoints (fallback after STUN). + #[serde(default = "default_http_ip_detect_urls")] + pub http_ip_detect_urls: Vec, + + /// Cache file path for detected public IP. + #[serde(default = "default_cache_public_ip_path")] + pub cache_public_ip_path: String, } impl Default for NetworkConfig { @@ -105,6 +121,10 @@ impl Default for NetworkConfig { ipv6: None, prefer: 4, multipath: false, + stun_servers: default_stun_servers(), + stun_tcp_fallback: true, + http_ip_detect_urls: default_http_ip_detect_urls(), + cache_public_ip_path: default_cache_public_ip_path(), } } } @@ -227,6 +247,34 @@ pub struct GeneralConfig { /// [general.links] — proxy link generation overrides. #[serde(default)] pub links: LinksConfig, + + /// Minimum TLS record size when fast_mode coalescing is enabled (0 = disabled). + #[serde(default = "default_fast_mode_min_tls_record")] + pub fast_mode_min_tls_record: usize, + + /// Automatically reload proxy-secret every N seconds. + #[serde(default = "default_proxy_secret_reload_secs")] + pub proxy_secret_auto_reload_secs: u64, + + /// Automatically reload proxy-multi.conf every N seconds. + #[serde(default = "default_proxy_config_reload_secs")] + pub proxy_config_auto_reload_secs: u64, + + /// Enable NTP drift check at startup. + #[serde(default = "default_ntp_check")] + pub ntp_check: bool, + + /// NTP servers for drift check. + #[serde(default = "default_ntp_servers")] + pub ntp_servers: Vec, + + /// Enable auto-degradation from ME to Direct-DC. + #[serde(default = "default_true")] + pub auto_degradation_enabled: bool, + + /// Minimum unavailable ME DC groups before degrading. + #[serde(default = "default_degradation_min_unavailable_dc_groups")] + pub degradation_min_unavailable_dc_groups: u8, } impl Default for GeneralConfig { @@ -262,6 +310,13 @@ impl Default for GeneralConfig { links: LinksConfig::default(), crypto_pending_buffer: default_crypto_pending_buffer(), max_client_frame: default_max_client_frame(), + fast_mode_min_tls_record: default_fast_mode_min_tls_record(), + proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(), + proxy_config_auto_reload_secs: default_proxy_config_reload_secs(), + ntp_check: default_ntp_check(), + ntp_servers: default_ntp_servers(), + auto_degradation_enabled: true, + degradation_min_unavailable_dc_groups: default_degradation_min_unavailable_dc_groups(), } } } @@ -406,6 +461,22 @@ pub struct AntiCensorshipConfig { /// Directory to store TLS front cache (on disk). #[serde(default = "default_tls_front_dir")] pub tls_front_dir: String, + + /// Minimum server_hello delay in milliseconds (anti-fingerprint). + #[serde(default = "default_server_hello_delay_min_ms")] + pub server_hello_delay_min_ms: u64, + + /// Maximum server_hello delay in milliseconds. + #[serde(default = "default_server_hello_delay_max_ms")] + pub server_hello_delay_max_ms: u64, + + /// Number of NewSessionTicket messages to emit post-handshake. + #[serde(default = "default_tls_new_session_tickets")] + pub tls_new_session_tickets: u8, + + /// Enforce ALPN echo of client preference. + #[serde(default = "default_alpn_enforce")] + pub alpn_enforce: bool, } impl Default for AntiCensorshipConfig { @@ -420,6 +491,10 @@ impl Default for AntiCensorshipConfig { fake_cert_len: default_fake_cert_len(), tls_emulation: false, tls_front_dir: default_tls_front_dir(), + server_hello_delay_min_ms: default_server_hello_delay_min_ms(), + server_hello_delay_max_ms: default_server_hello_delay_max_ms(), + tls_new_session_tickets: default_tls_new_session_tickets(), + alpn_enforce: default_alpn_enforce(), } } } diff --git a/src/main.rs b/src/main.rs index 8446403..0e635de 100644 --- a/src/main.rs +++ b/src/main.rs @@ -213,6 +213,9 @@ async fn main() -> std::result::Result<(), Box> { "Modes: classic={} secure={} tls={}", config.general.modes.classic, config.general.modes.secure, config.general.modes.tls ); + if config.general.modes.classic { + warn!("Classic mode is vulnerable to DPI detection; enable only for legacy clients"); + } info!("TLS domain: {}", config.censorship.tls_domain); if let Some(ref sock) = config.censorship.mask_unix_sock { info!("Mask: {} -> unix:{}", config.censorship.mask, sock); diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs index 5f890a5..a0d1d46 100644 --- a/src/protocol/tls.rs +++ b/src/protocol/tls.rs @@ -32,6 +32,7 @@ pub const TIME_SKEW_MAX: i64 = 10 * 60; // 10 minutes after mod extension_type { pub const KEY_SHARE: u16 = 0x0033; pub const SUPPORTED_VERSIONS: u16 = 0x002b; + pub const ALPN: u16 = 0x0010; } /// TLS Cipher Suites @@ -62,6 +63,7 @@ pub struct TlsValidation { // ============= TLS Extension Builder ============= /// Builder for TLS extensions with correct length calculation +#[derive(Clone)] struct TlsExtensionBuilder { extensions: Vec, } @@ -108,6 +110,27 @@ impl TlsExtensionBuilder { self } + + /// Add ALPN extension with a single selected protocol. + fn add_alpn(&mut self, proto: &[u8]) -> &mut Self { + // Extension type: ALPN (0x0010) + self.extensions.extend_from_slice(&extension_type::ALPN.to_be_bytes()); + + // ALPN extension format: + // extension_data length (2 bytes) + // protocols length (2 bytes) + // protocol name length (1 byte) + // protocol name bytes + let proto_len = proto.len() as u8; + let list_len: u16 = 1 + proto_len as u16; + let ext_len: u16 = 2 + list_len; + + self.extensions.extend_from_slice(&ext_len.to_be_bytes()); + self.extensions.extend_from_slice(&list_len.to_be_bytes()); + self.extensions.push(proto_len); + self.extensions.extend_from_slice(proto); + self + } /// Build final extensions with length prefix fn build(self) -> Vec { @@ -144,6 +167,8 @@ struct ServerHelloBuilder { compression: u8, /// Extensions extensions: TlsExtensionBuilder, + /// Selected ALPN protocol (if any) + alpn: Option>, } impl ServerHelloBuilder { @@ -154,6 +179,7 @@ impl ServerHelloBuilder { cipher_suite: cipher_suite::TLS_AES_128_GCM_SHA256, compression: 0x00, extensions: TlsExtensionBuilder::new(), + alpn: None, } } @@ -167,10 +193,19 @@ impl ServerHelloBuilder { self.extensions.add_supported_versions(0x0304); self } + + fn with_alpn(mut self, proto: Option>) -> Self { + self.alpn = proto; + self + } /// Build ServerHello message (without record header) fn build_message(&self) -> Vec { - let extensions = self.extensions.extensions.clone(); + let mut ext_builder = self.extensions.clone(); + if let Some(ref alpn) = self.alpn { + ext_builder.add_alpn(alpn); + } + let extensions = ext_builder.extensions.clone(); let extensions_len = extensions.len() as u16; // Calculate total length @@ -350,6 +385,8 @@ pub fn build_server_hello( session_id: &[u8], fake_cert_len: usize, rng: &SecureRandom, + alpn: Option>, + new_session_tickets: u8, ) -> Vec { const MIN_APP_DATA: usize = 64; const MAX_APP_DATA: usize = 16640; // RFC 8446 §5.2 upper bound @@ -360,6 +397,7 @@ pub fn build_server_hello( let server_hello = ServerHelloBuilder::new(session_id.to_vec()) .with_x25519_key(&x25519_key) .with_tls13_version() + .with_alpn(alpn) .build_record(); // Build Change Cipher Spec record @@ -380,13 +418,31 @@ pub fn build_server_hello( // deterministic DPI fingerprints (fixed inner content type markers). app_data_record.extend_from_slice(&fake_cert); + // Build optional NewSessionTicket records (TLS 1.3 handshake messages are encrypted; + // here we mimic with opaque ApplicationData records of plausible size). + let mut tickets = Vec::new(); + if new_session_tickets > 0 { + for _ in 0..new_session_tickets { + let ticket_len: usize = rng.range(48) + 48; // 48-95 bytes + let mut record = Vec::with_capacity(5 + ticket_len); + record.push(TLS_RECORD_APPLICATION); + record.extend_from_slice(&TLS_VERSION); + record.extend_from_slice(&(ticket_len as u16).to_be_bytes()); + record.extend_from_slice(&rng.bytes(ticket_len)); + tickets.push(record); + } + } + // Combine all records let mut response = Vec::with_capacity( - server_hello.len() + change_cipher_spec.len() + app_data_record.len() + server_hello.len() + change_cipher_spec.len() + app_data_record.len() + tickets.iter().map(|r| r.len()).sum::() ); response.extend_from_slice(&server_hello); response.extend_from_slice(&change_cipher_spec); response.extend_from_slice(&app_data_record); + for t in &tickets { + response.extend_from_slice(t); + } // Compute HMAC for the response let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len()); @@ -480,85 +536,53 @@ pub fn extract_sni_from_client_hello(handshake: &[u8]) -> Option { None } -/// Extract ALPN protocol list from TLS ClientHello. -pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Option> { - if handshake.len() < 43 || handshake[0] != TLS_RECORD_HANDSHAKE { - return None; - } - +/// Extract ALPN protocol list from ClientHello, return in offered order. +pub fn extract_alpn_from_client_hello(handshake: &[u8]) -> Vec> { let mut pos = 5; // after record header - if handshake.get(pos).copied()? != 0x01 { - return None; // not ClientHello + if handshake.get(pos) != Some(&0x01) { + return Vec::new(); } - - // Handshake length bytes - pos += 4; // type + len (3) - - // version (2) + random (32) - pos += 2 + 32; - if pos + 1 > handshake.len() { - return None; - } - - let session_id_len = *handshake.get(pos)? as usize; + pos += 4; // type + len + pos += 2 + 32; // version + random + if pos >= handshake.len() { return Vec::new(); } + let session_id_len = *handshake.get(pos).unwrap_or(&0) as usize; pos += 1 + session_id_len; - if pos + 2 > handshake.len() { - return None; - } - - let cipher_suites_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize; - pos += 2 + cipher_suites_len; - if pos + 1 > handshake.len() { - return None; - } - - let comp_len = *handshake.get(pos)? as usize; + if pos + 2 > handshake.len() { return Vec::new(); } + let cipher_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize; + pos += 2 + cipher_len; + if pos >= handshake.len() { return Vec::new(); } + let comp_len = *handshake.get(pos).unwrap_or(&0) as usize; pos += 1 + comp_len; - if pos + 2 > handshake.len() { - return None; - } - - let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize; + if pos + 2 > handshake.len() { return Vec::new(); } + let ext_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize; pos += 2; let ext_end = pos + ext_len; - if ext_end > handshake.len() { - return None; - } - + if ext_end > handshake.len() { return Vec::new(); } + let mut out = Vec::new(); while pos + 4 <= ext_end { - let etype = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]); - let elen = u16::from_be_bytes([handshake[pos + 2], handshake[pos + 3]]) as usize; + let etype = u16::from_be_bytes([handshake[pos], handshake[pos+1]]); + let elen = u16::from_be_bytes([handshake[pos+2], handshake[pos+3]]) as usize; pos += 4; - if pos + elen > ext_end { + if pos + elen > ext_end { break; } + if etype == extension_type::ALPN && elen >= 3 { + let list_len = u16::from_be_bytes([handshake[pos], handshake[pos+1]]) as usize; + let mut lp = pos + 2; + let list_end = (pos + 2).saturating_add(list_len).min(pos + elen); + while lp + 1 <= list_end { + let plen = handshake[lp] as usize; + lp += 1; + if lp + plen > list_end { break; } + out.push(handshake[lp..lp+plen].to_vec()); + lp += plen; + } break; } - - if etype == 0x0010 && elen >= 3 { - // ALPN - let list_len = u16::from_be_bytes([handshake[pos], handshake[pos + 1]]) as usize; - let mut alpn_pos = pos + 2; - let list_end = std::cmp::min(alpn_pos + list_len, pos + elen); - let mut protocols = Vec::new(); - while alpn_pos < list_end { - let proto_len = *handshake.get(alpn_pos)? as usize; - alpn_pos += 1; - if alpn_pos + proto_len > list_end { - break; - } - if let Ok(p) = std::str::from_utf8(&handshake[alpn_pos..alpn_pos + proto_len]) { - protocols.push(p.to_string()); - } - alpn_pos += proto_len; - } - return Some(protocols); - } - pos += elen; } - - None + out } + /// Check if bytes look like a TLS ClientHello pub fn is_tls_handshake(first_bytes: &[u8]) -> bool { if first_bytes.len() < 3 { @@ -737,7 +761,7 @@ mod tests { let session_id = vec![0xAA; 32]; let rng = SecureRandom::new(); - let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng); + let response = build_server_hello(secret, &client_digest, &session_id, 2048, &rng, None, 0); // Should have at least 3 records assert!(response.len() > 100); @@ -770,8 +794,8 @@ mod tests { let session_id = vec![0xAA; 32]; let rng = SecureRandom::new(); - let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng); - let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng); + let response1 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0); + let response2 = build_server_hello(secret, &client_digest, &session_id, 1024, &rng, None, 0); // Digest position should have non-zero data let digest1 = &response1[TLS_DIGEST_POS..TLS_DIGEST_POS + TLS_DIGEST_LEN]; diff --git a/src/proxy/handshake.rs b/src/proxy/handshake.rs index 685d999..8d48c8b 100644 --- a/src/proxy/handshake.rs +++ b/src/proxy/handshake.rs @@ -7,6 +7,7 @@ use tracing::{debug, warn, trace, info}; use zeroize::Zeroize; use crate::crypto::{sha256, AesCtr, SecureRandom}; +use rand::Rng; use crate::protocol::constants::*; use crate::protocol::tls; use crate::stream::{FakeTlsReader, FakeTlsWriter, CryptoReader, CryptoWriter}; @@ -119,6 +120,23 @@ where None }; + let alpn_list = if config.censorship.alpn_enforce { + tls::extract_alpn_from_client_hello(handshake) + } else { + Vec::new() + }; + let selected_alpn = if config.censorship.alpn_enforce { + if alpn_list.iter().any(|p| p == b"h2") { + Some(b"h2".to_vec()) + } else if alpn_list.iter().any(|p| p == b"http/1.1") { + Some(b"http/1.1".to_vec()) + } else { + None + } + } else { + None + }; + let response = if let Some(cached_entry) = cached { emulator::build_emulated_server_hello( secret, @@ -126,6 +144,8 @@ where &validation.session_id, &cached_entry, rng, + selected_alpn.clone(), + config.censorship.tls_new_session_tickets, ) } else { tls::build_server_hello( @@ -134,9 +154,25 @@ where &validation.session_id, config.censorship.fake_cert_len, rng, + selected_alpn.clone(), + config.censorship.tls_new_session_tickets, ) }; + // Optional anti-fingerprint delay before sending ServerHello. + if config.censorship.server_hello_delay_max_ms > 0 { + let min = config.censorship.server_hello_delay_min_ms; + let max = config.censorship.server_hello_delay_max_ms.max(min); + let delay_ms = if max == min { + max + } else { + rand::rng().random_range(min..=max) + }; + if delay_ms > 0 { + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + } + } + debug!(peer = %peer, response_len = response.len(), "Sending TLS ServerHello"); if let Err(e) = writer.write_all(&response).await { diff --git a/src/tls_front/emulator.rs b/src/tls_front/emulator.rs index 2bf6872..4d3e64d 100644 --- a/src/tls_front/emulator.rs +++ b/src/tls_front/emulator.rs @@ -34,6 +34,8 @@ pub fn build_emulated_server_hello( session_id: &[u8], cached: &CachedTlsData, rng: &SecureRandom, + alpn: Option>, + new_session_tickets: u8, ) -> Vec { // --- ServerHello --- let mut extensions = Vec::new(); @@ -48,6 +50,15 @@ pub fn build_emulated_server_hello( extensions.extend_from_slice(&0x002bu16.to_be_bytes()); extensions.extend_from_slice(&(2u16).to_be_bytes()); extensions.extend_from_slice(&0x0304u16.to_be_bytes()); + if let Some(alpn_proto) = &alpn { + extensions.extend_from_slice(&0x0010u16.to_be_bytes()); + let list_len: u16 = 1 + alpn_proto.len() as u16; + let ext_len: u16 = 2 + list_len; + extensions.extend_from_slice(&ext_len.to_be_bytes()); + extensions.extend_from_slice(&list_len.to_be_bytes()); + extensions.push(alpn_proto.len() as u8); + extensions.extend_from_slice(alpn_proto); + } let extensions_len = extensions.len() as u16; @@ -118,10 +129,25 @@ pub fn build_emulated_server_hello( } // --- Combine --- - let mut response = Vec::with_capacity(server_hello.len() + change_cipher_spec.len() + app_data.len()); + // Optional NewSessionTicket mimic records (opaque ApplicationData for fingerprint). + let mut tickets = Vec::new(); + if new_session_tickets > 0 { + for _ in 0..new_session_tickets { + let ticket_len: usize = rng.range(48) + 48; + let mut rec = Vec::with_capacity(5 + ticket_len); + rec.push(TLS_RECORD_APPLICATION); + rec.extend_from_slice(&TLS_VERSION); + rec.extend_from_slice(&(ticket_len as u16).to_be_bytes()); + rec.extend_from_slice(&rng.bytes(ticket_len)); + tickets.extend_from_slice(&rec); + } + } + + let mut response = Vec::with_capacity(server_hello.len() + change_cipher_spec.len() + app_data.len() + tickets.len()); response.extend_from_slice(&server_hello); response.extend_from_slice(&change_cipher_spec); response.extend_from_slice(&app_data); + response.extend_from_slice(&tickets); // --- HMAC --- let mut hmac_input = Vec::with_capacity(TLS_DIGEST_LEN + response.len()); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 84c526f..3572671 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -567,12 +567,14 @@ impl MePool { let cancel_keepalive = cancel_keepalive_token; tokio::spawn(async move { // Per-writer jittered start to avoid phase sync. - let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64); + let jitter_cap_ms = keepalive_interval.as_millis() / 2; + let effective_jitter_ms = keepalive_jitter.as_millis().min(jitter_cap_ms).max(1); + let initial_jitter_ms = rand::rng().random_range(0..=effective_jitter_ms as u64); tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await; loop { tokio::select! { _ = cancel_keepalive.cancelled() => break, - _ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {} + _ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=effective_jitter_ms as u64))) => {} } if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() { break; From 1bd495a224e2770d08a42e328cc10875acd2093b Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Sat, 21 Feb 2026 04:04:49 +0300 Subject: [PATCH 3/3] Fixed tests --- src/protocol/tls.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/protocol/tls.rs b/src/protocol/tls.rs index a0d1d46..93d111f 100644 --- a/src/protocol/tls.rs +++ b/src/protocol/tls.rs @@ -924,8 +924,12 @@ mod tests { alpn_data.push(2); alpn_data.extend_from_slice(b"h2"); let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test"); - let alpn = extract_alpn_from_client_hello(&ch).unwrap(); - assert_eq!(alpn, vec!["h2"]); + let alpn = extract_alpn_from_client_hello(&ch); + let alpn_str: Vec = alpn + .iter() + .map(|p| std::str::from_utf8(p).unwrap().to_string()) + .collect(); + assert_eq!(alpn_str, vec!["h2"]); } #[test] @@ -940,7 +944,11 @@ mod tests { alpn_data.push(2); alpn_data.extend_from_slice(b"h3"); let ch = build_client_hello_with_exts(vec![(0x0010, alpn_data)], "alpn.test"); - let alpn = extract_alpn_from_client_hello(&ch).unwrap(); - assert_eq!(alpn, vec!["h2", "spdy", "h3"]); + let alpn = extract_alpn_from_client_hello(&ch); + let alpn_str: Vec = alpn + .iter() + .map(|p| std::str::from_utf8(p).unwrap().to_string()) + .collect(); + assert_eq!(alpn_str, vec!["h2", "spdy", "h3"]); } }