From d08ddd718a9715ea5837d1641bcb1d6d64931273 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Mon, 23 Feb 2026 15:28:02 +0300 Subject: [PATCH] Desync Full Forensics Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/cli.rs | 1 + src/config/defaults.rs | 4 + src/config/hot_reload.rs | 10 ++ src/config/types.rs | 6 + src/metrics.rs | 35 ++++++ src/proxy/middle_relay.rs | 226 ++++++++++++++++++++++++++++++++------ src/stats/mod.rs | 53 +++++++++ 7 files changed, 304 insertions(+), 31 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index cf98121..7e31f26 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -194,6 +194,7 @@ prefer_ipv6 = false fast_mode = true use_middle_proxy = false log_level = "normal" +desync_all_full = false [network] ipv4 = true diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 90dd6f9..5216e29 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -118,6 +118,10 @@ pub(crate) fn default_max_client_frame() -> usize { 16 * 1024 * 1024 } +pub(crate) fn default_desync_all_full() -> bool { + false +} + pub(crate) fn default_tls_new_session_tickets() -> u8 { 0 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 6c3b8ff..56cfa0f 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -10,6 +10,7 @@ //! | `general` | `ad_tag` | Passed on next connection | //! | `general` | `middle_proxy_pool_size` | Passed on next connection | //! | `general` | `me_keepalive_*` | Passed on next connection | +//! | `general` | `desync_all_full` | Passed on next connection | //! | `access` | All user/quota fields | Effective immediately | //! //! 
Fields that require re-binding sockets (`server.port`, `censorship.*`, @@ -34,6 +35,7 @@ pub struct HotFields { pub log_level: LogLevel, pub ad_tag: Option, pub middle_proxy_pool_size: usize, + pub desync_all_full: bool, pub me_keepalive_enabled: bool, pub me_keepalive_interval_secs: u64, pub me_keepalive_jitter_secs: u64, @@ -47,6 +49,7 @@ impl HotFields { log_level: cfg.general.log_level.clone(), ad_tag: cfg.general.ad_tag.clone(), middle_proxy_pool_size: cfg.general.middle_proxy_pool_size, + desync_all_full: cfg.general.desync_all_full, me_keepalive_enabled: cfg.general.me_keepalive_enabled, me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs, me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs, @@ -175,6 +178,13 @@ fn log_changes( ); } + if old_hot.desync_all_full != new_hot.desync_all_full { + info!( + "config reload: desync_all_full: {} → {}", + old_hot.desync_all_full, new_hot.desync_all_full, + ); + } + if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled || old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs || old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs diff --git a/src/config/types.rs b/src/config/types.rs index a303db8..39ba683 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -201,6 +201,11 @@ pub struct GeneralConfig { #[serde(default = "default_max_client_frame")] pub max_client_frame: usize, + /// Emit full crypto-desync forensic logs for every event. + /// When false, full forensic details are emitted once per key window. + #[serde(default = "default_desync_all_full")] + pub desync_all_full: bool, + /// Enable staggered warmup of extra ME writers. 
#[serde(default = "default_true")] pub me_warmup_stagger_enabled: bool, @@ -310,6 +315,7 @@ impl Default for GeneralConfig { links: LinksConfig::default(), crypto_pending_buffer: default_crypto_pending_buffer(), max_client_frame: default_max_client_frame(), + desync_all_full: default_desync_all_full(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(), proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(), proxy_config_auto_reload_secs: default_proxy_config_reload_secs(), diff --git a/src/metrics.rs b/src/metrics.rs index e00091f..326d333 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -140,6 +140,41 @@ fn render_metrics(stats: &Stats) -> String { let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter"); let _ = writeln!(out, "telemt_secure_padding_invalid_total {}", stats.get_secure_padding_invalid()); + let _ = writeln!(out, "# HELP telemt_desync_total Total crypto-desync detections"); + let _ = writeln!(out, "# TYPE telemt_desync_total counter"); + let _ = writeln!(out, "telemt_desync_total {}", stats.get_desync_total()); + + let _ = writeln!(out, "# HELP telemt_desync_full_logged_total Full forensic desync logs emitted"); + let _ = writeln!(out, "# TYPE telemt_desync_full_logged_total counter"); + let _ = writeln!(out, "telemt_desync_full_logged_total {}", stats.get_desync_full_logged()); + + let _ = writeln!(out, "# HELP telemt_desync_suppressed_total Suppressed desync forensic events"); + let _ = writeln!(out, "# TYPE telemt_desync_suppressed_total counter"); + let _ = writeln!(out, "telemt_desync_suppressed_total {}", stats.get_desync_suppressed()); + + let _ = writeln!(out, "# HELP telemt_desync_frames_bucket_total Desync count by frames_ok bucket"); + let _ = writeln!(out, "# TYPE telemt_desync_frames_bucket_total counter"); + let _ = writeln!( + out, + "telemt_desync_frames_bucket_total{{bucket=\"0\"}} {}", + stats.get_desync_frames_bucket_0() + ); + let _ = writeln!( + out, + 
"telemt_desync_frames_bucket_total{{bucket=\"1_2\"}} {}", + stats.get_desync_frames_bucket_1_2() + ); + let _ = writeln!( + out, + "telemt_desync_frames_bucket_total{{bucket=\"3_10\"}} {}", + stats.get_desync_frames_bucket_3_10() + ); + let _ = writeln!( + out, + "telemt_desync_frames_bucket_total{{bucket=\"gt_10\"}} {}", + stats.get_desync_frames_bucket_gt_10() + ); + let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections"); let _ = writeln!(out, "# TYPE telemt_user_connections_total counter"); let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections"); diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 3b98112..d55e5a2 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -1,5 +1,10 @@ -use std::net::SocketAddr; -use std::sync::Arc; +use std::collections::HashMap; +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; +use std::net::{IpAddr, SocketAddr}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, OnceLock}; +use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::sync::{mpsc, oneshot}; @@ -19,6 +24,148 @@ enum C2MeCommand { Close, } +const DESYNC_DEDUP_WINDOW: Duration = Duration::from_secs(60); +const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync"; +static DESYNC_DEDUP: OnceLock>> = OnceLock::new(); + +struct RelayForensicsState { + trace_id: u64, + conn_id: u64, + user: String, + peer: SocketAddr, + peer_hash: u64, + started_at: Instant, + bytes_c2me: u64, + bytes_me2c: Arc, + desync_all_full: bool, +} + +fn hash_value(value: &T) -> u64 { + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + hasher.finish() +} + +fn hash_ip(ip: IpAddr) -> u64 { + hash_value(&ip) +} + +fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool { + if all_full { + return true; + } + + let dedup = 
DESYNC_DEDUP.get_or_init(|| Mutex::new(HashMap::new())); + let mut guard = dedup.lock().expect("desync dedup mutex poisoned"); + guard.retain(|_, seen_at| now.duration_since(*seen_at) < DESYNC_DEDUP_WINDOW); + + match guard.get_mut(&key) { + Some(seen_at) => { + if now.duration_since(*seen_at) >= DESYNC_DEDUP_WINDOW { + *seen_at = now; + true + } else { + false + } + } + None => { + guard.insert(key, now); + true + } + } +} + +fn report_desync_frame_too_large( + state: &RelayForensicsState, + proto_tag: ProtoTag, + frame_counter: u64, + max_frame: usize, + len: usize, + raw_len_bytes: Option<[u8; 4]>, + stats: &Stats, +) -> ProxyError { + let len_buf = raw_len_bytes.unwrap_or((len as u32).to_le_bytes()); + let looks_like_tls = raw_len_bytes + .map(|b| b[0] == 0x16 && b[1] == 0x03) + .unwrap_or(false); + let looks_like_http = raw_len_bytes + .map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D')) + .unwrap_or(false); + let now = Instant::now(); + let dedup_key = hash_value(&( + state.user.as_str(), + state.peer_hash, + proto_tag, + DESYNC_ERROR_CLASS, + )); + let emit_full = should_emit_full_desync(dedup_key, state.desync_all_full, now); + let duration_ms = state.started_at.elapsed().as_millis() as u64; + let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed); + + stats.increment_desync_total(); + stats.observe_desync_frames_ok(frame_counter); + if emit_full { + stats.increment_desync_full_logged(); + warn!( + trace_id = format_args!("0x{:016x}", state.trace_id), + conn_id = state.conn_id, + user = %state.user, + peer_hash = format_args!("0x{:016x}", state.peer_hash), + proto = ?proto_tag, + mode = "middle_proxy", + is_tls = true, + duration_ms, + bytes_c2me = state.bytes_c2me, + bytes_me2c, + raw_len = len, + raw_len_hex = format_args!("0x{:08x}", len), + raw_bytes = format_args!( + "{:02x} {:02x} {:02x} {:02x}", + len_buf[0], len_buf[1], len_buf[2], len_buf[3] + ), + max_frame, + tls_like = looks_like_tls, + http_like = looks_like_http, + frames_ok = 
frame_counter, + dedup_window_secs = DESYNC_DEDUP_WINDOW.as_secs(), + desync_all_full = state.desync_all_full, + full_reason = if state.desync_all_full { "desync_all_full" } else { "first_in_dedup_window" }, + error_class = DESYNC_ERROR_CLASS, + "Frame too large — crypto desync forensics" + ); + debug!( + trace_id = format_args!("0x{:016x}", state.trace_id), + conn_id = state.conn_id, + user = %state.user, + peer = %state.peer, + "Frame too large forensic peer detail" + ); + } else { + stats.increment_desync_suppressed(); + debug!( + trace_id = format_args!("0x{:016x}", state.trace_id), + conn_id = state.conn_id, + user = %state.user, + peer_hash = format_args!("0x{:016x}", state.peer_hash), + proto = ?proto_tag, + duration_ms, + bytes_c2me = state.bytes_c2me, + bytes_me2c, + raw_len = len, + frames_ok = frame_counter, + dedup_window_secs = DESYNC_DEDUP_WINDOW.as_secs(), + error_class = DESYNC_ERROR_CLASS, + "Frame too large — crypto desync forensic suppressed" + ); + } + + ProxyError::Proxy(format!( + "Frame too large: {len} (max {max_frame}), frames_ok={frame_counter}, conn_id={}, trace_id=0x{:016x}", + state.conn_id, + state.trace_id + )) +} + pub(crate) async fn handle_via_middle_proxy( mut crypto_reader: CryptoReader, crypto_writer: CryptoWriter, @@ -48,14 +195,30 @@ where ); let (conn_id, me_rx) = me_pool.registry().register().await; + let trace_id = conn_id; + let bytes_me2c = Arc::new(AtomicU64::new(0)); + let mut forensics = RelayForensicsState { + trace_id, + conn_id, + user: user.clone(), + peer, + peer_hash: hash_ip(peer.ip()), + started_at: Instant::now(), + bytes_c2me: 0, + bytes_me2c: bytes_me2c.clone(), + desync_all_full: config.general.desync_all_full, + }; stats.increment_user_connects(&user); stats.increment_user_curr_connects(&user); let proto_flags = proto_flags_for_tag(proto_tag, me_pool.has_proxy_tag()); debug!( + trace_id = format_args!("0x{:016x}", trace_id), user = %user, conn_id, + peer_hash = format_args!("0x{:016x}", 
forensics.peer_hash), + desync_all_full = forensics.desync_all_full, proto_flags = format_args!("0x{:08x}", proto_flags), "ME relay started" ); @@ -93,6 +256,7 @@ where let stats_clone = stats.clone(); let rng_clone = rng.clone(); let user_clone = user.clone(); + let bytes_me2c_clone = bytes_me2c.clone(); let me_writer = tokio::spawn(async move { let mut writer = crypto_writer; let mut frame_buf = Vec::with_capacity(16 * 1024); @@ -102,6 +266,7 @@ where match msg { Some(MeResponse::Data { flags, data }) => { trace!(conn_id, bytes = data.len(), flags, "ME->C data"); + bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed); stats_clone.add_user_octets_to(&user_clone, data.len() as u64); write_client_payload( &mut writer, @@ -118,6 +283,7 @@ where match next { MeResponse::Data { flags, data } => { trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)"); + bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed); stats_clone.add_user_octets_to(&user_clone, data.len() as u64); write_client_payload( &mut writer, @@ -173,12 +339,15 @@ where &mut crypto_reader, proto_tag, frame_limit, - &user, + &forensics, &mut frame_counter, &stats, ).await { Ok(Some((payload, quickack))) => { trace!(conn_id, bytes = payload.len(), "C->ME frame"); + forensics.bytes_c2me = forensics + .bytes_c2me + .saturating_add(payload.len() as u64); stats.add_user_octets_from(&user, payload.len() as u64); let mut flags = proto_flags; if quickack { @@ -237,7 +406,16 @@ where (_, _, Err(e)) => Err(e), }; - debug!(user = %user, conn_id, "ME relay cleanup"); + debug!( + user = %user, + conn_id, + trace_id = format_args!("0x{:016x}", trace_id), + duration_ms = forensics.started_at.elapsed().as_millis() as u64, + bytes_c2me = forensics.bytes_c2me, + bytes_me2c = forensics.bytes_me2c.load(Ordering::Relaxed), + frames_ok = frame_counter, + "ME relay cleanup" + ); me_pool.registry().unregister(conn_id).await; stats.decrement_user_curr_connects(&user); result @@ -247,7 +425,7 @@ 
async fn read_client_payload( client_reader: &mut CryptoReader, proto_tag: ProtoTag, max_frame: usize, - user: &str, + forensics: &RelayForensicsState, frame_counter: &mut u64, stats: &Stats, ) -> Result, bool)>> @@ -302,7 +480,9 @@ where } if len < 4 && proto_tag != ProtoTag::Abridged { warn!( - user = %user, + trace_id = format_args!("0x{:016x}", forensics.trace_id), + conn_id = forensics.conn_id, + user = %forensics.user, len, proto = ?proto_tag, "Frame too small — corrupt or probe" @@ -311,31 +491,15 @@ where } if len > max_frame { - let len_buf = raw_len_bytes.unwrap_or((len as u32).to_le_bytes()); - let looks_like_tls = raw_len_bytes - .map(|b| b[0] == 0x16 && b[1] == 0x03) - .unwrap_or(false); - let looks_like_http = raw_len_bytes - .map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D')) - .unwrap_or(false); - warn!( - user = %user, - raw_len = len, - raw_len_hex = format_args!("0x{:08x}", len), - raw_bytes = format_args!( - "{:02x} {:02x} {:02x} {:02x}", - len_buf[0], len_buf[1], len_buf[2], len_buf[3] - ), - proto = ?proto_tag, - tls_like = looks_like_tls, - http_like = looks_like_http, - frames_ok = *frame_counter, - "Frame too large — crypto desync forensics" - ); - return Err(ProxyError::Proxy(format!( - "Frame too large: {len} (max {max_frame}), frames_ok={}", - *frame_counter - ))); + return Err(report_desync_frame_too_large( + forensics, + proto_tag, + *frame_counter, + max_frame, + len, + raw_len_bytes, + stats, + )); } let secure_payload_len = if proto_tag == ProtoTag::Secure { diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 307da6d..4c16d25 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -31,6 +31,13 @@ pub struct Stats { me_route_drop_channel_closed: AtomicU64, me_route_drop_queue_full: AtomicU64, secure_padding_invalid: AtomicU64, + desync_total: AtomicU64, + desync_full_logged: AtomicU64, + desync_suppressed: AtomicU64, + desync_frames_bucket_0: AtomicU64, + desync_frames_bucket_1_2: AtomicU64, + desync_frames_bucket_3_10: 
AtomicU64, + desync_frames_bucket_gt_10: AtomicU64, user_stats: DashMap, start_time: parking_lot::RwLock>, } @@ -76,6 +83,31 @@ impl Stats { pub fn increment_secure_padding_invalid(&self) { self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed); } + pub fn increment_desync_total(&self) { + self.desync_total.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_desync_full_logged(&self) { + self.desync_full_logged.fetch_add(1, Ordering::Relaxed); + } + pub fn increment_desync_suppressed(&self) { + self.desync_suppressed.fetch_add(1, Ordering::Relaxed); + } + pub fn observe_desync_frames_ok(&self, frames_ok: u64) { + match frames_ok { + 0 => { + self.desync_frames_bucket_0.fetch_add(1, Ordering::Relaxed); + } + 1..=2 => { + self.desync_frames_bucket_1_2.fetch_add(1, Ordering::Relaxed); + } + 3..=10 => { + self.desync_frames_bucket_3_10.fetch_add(1, Ordering::Relaxed); + } + _ => { + self.desync_frames_bucket_gt_10.fetch_add(1, Ordering::Relaxed); + } + } + } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } @@ -96,6 +128,27 @@ impl Stats { pub fn get_secure_padding_invalid(&self) -> u64 { self.secure_padding_invalid.load(Ordering::Relaxed) } + pub fn get_desync_total(&self) -> u64 { + self.desync_total.load(Ordering::Relaxed) + } + pub fn get_desync_full_logged(&self) -> u64 { + self.desync_full_logged.load(Ordering::Relaxed) + } + pub fn get_desync_suppressed(&self) -> u64 { + self.desync_suppressed.load(Ordering::Relaxed) + } + pub fn get_desync_frames_bucket_0(&self) -> u64 { + self.desync_frames_bucket_0.load(Ordering::Relaxed) + } + pub fn get_desync_frames_bucket_1_2(&self) -> u64 { + self.desync_frames_bucket_1_2.load(Ordering::Relaxed) + } + pub fn get_desync_frames_bucket_3_10(&self) -> u64 { + 
self.desync_frames_bucket_3_10.load(Ordering::Relaxed) + } + pub fn get_desync_frames_bucket_gt_10(&self) -> u64 { + self.desync_frames_bucket_gt_10.load(Ordering::Relaxed) + } pub fn increment_user_connects(&self, user: &str) { self.user_stats.entry(user.to_string()).or_default()