diff --git a/src/cli.rs b/src/cli.rs
index cf98121..3525a22 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -194,6 +194,9 @@ prefer_ipv6 = false
 fast_mode = true
 use_middle_proxy = false
 log_level = "normal"
+desync_all_full = false
+update_every = 43200
+me_reinit_drain_timeout_secs = 300
 
 [network]
 ipv4 = true
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 90dd6f9..01cdcb0 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -118,6 +118,10 @@ pub(crate) fn default_max_client_frame() -> usize {
     16 * 1024 * 1024
 }
 
+pub(crate) fn default_desync_all_full() -> bool {
+    false
+}
+
 pub(crate) fn default_tls_new_session_tickets() -> u8 {
     0
 }
@@ -167,6 +171,14 @@ pub(crate) fn default_proxy_config_reload_secs() -> u64 {
     12 * 60 * 60
 }
 
+pub(crate) fn default_update_every_secs() -> u64 {
+    2 * 60 * 60
+}
+
+pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 {
+    300
+}
+
 pub(crate) fn default_ntp_check() -> bool {
     true
 }
diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs
index 6c3b8ff..5c7263f 100644
--- a/src/config/hot_reload.rs
+++ b/src/config/hot_reload.rs
@@ -10,6 +10,9 @@
 //! | `general` | `ad_tag`                       | Passed on next connection         |
 //! | `general` | `middle_proxy_pool_size`       | Passed on next connection         |
 //! | `general` | `me_keepalive_*`               | Passed on next connection         |
+//! | `general` | `desync_all_full`              | Applied immediately               |
+//! | `general` | `update_every`                 | Applied to ME updater immediately |
+//! | `general` | `me_reinit_drain_timeout_secs` | Applied on next ME map update     |
 //! | `access`  | All user/quota fields          | Effective immediately             |
 //!
 //! Fields that require re-binding sockets (`server.port`, `censorship.*`,
@@ -34,6 +37,9 @@ pub struct HotFields {
     pub log_level: LogLevel,
     pub ad_tag: Option<String>,
     pub middle_proxy_pool_size: usize,
+    pub desync_all_full: bool,
+    pub update_every_secs: u64,
+    pub me_reinit_drain_timeout_secs: u64,
     pub me_keepalive_enabled: bool,
     pub me_keepalive_interval_secs: u64,
     pub me_keepalive_jitter_secs: u64,
@@ -47,6 +53,9 @@ impl HotFields {
             log_level: cfg.general.log_level.clone(),
             ad_tag: cfg.general.ad_tag.clone(),
             middle_proxy_pool_size: cfg.general.middle_proxy_pool_size,
+            desync_all_full: cfg.general.desync_all_full,
+            update_every_secs: cfg.general.effective_update_every_secs(),
+            me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs,
             me_keepalive_enabled: cfg.general.me_keepalive_enabled,
             me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs,
             me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs,
@@ -175,6 +184,27 @@ fn log_changes(
         );
     }
 
+    if old_hot.desync_all_full != new_hot.desync_all_full {
+        info!(
+            "config reload: desync_all_full: {} → {}",
+            old_hot.desync_all_full, new_hot.desync_all_full,
+        );
+    }
+
+    if old_hot.update_every_secs != new_hot.update_every_secs {
+        info!(
+            "config reload: update_every(effective): {}s → {}s",
+            old_hot.update_every_secs, new_hot.update_every_secs,
+        );
+    }
+
+    if old_hot.me_reinit_drain_timeout_secs != new_hot.me_reinit_drain_timeout_secs {
+        info!(
+            "config reload: me_reinit_drain_timeout_secs: {}s → {}s",
+            old_hot.me_reinit_drain_timeout_secs, new_hot.me_reinit_drain_timeout_secs,
+        );
+    }
+
     if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled
         || old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs
         || old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs
diff --git a/src/config/load.rs b/src/config/load.rs
index 827687a..fa61539 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -117,6 +117,34 @@ impl ProxyConfig {
         let mut config: ProxyConfig = toml::from_str(&processed)
             .map_err(|e| ProxyError::Config(e.to_string()))?;
 
+        if let Some(update_every) = config.general.update_every {
+            if update_every == 0 {
+                return Err(ProxyError::Config(
+                    "general.update_every must be > 0".to_string(),
+                ));
+            }
+        } else {
+            let legacy_secret = config.general.proxy_secret_auto_reload_secs;
+            let legacy_config = config.general.proxy_config_auto_reload_secs;
+            let effective = legacy_secret.min(legacy_config);
+            if effective == 0 {
+                return Err(ProxyError::Config(
+                    "legacy proxy_*_auto_reload_secs values must be > 0 when general.update_every is not set".to_string(),
+                ));
+            }
+
+            if legacy_secret != default_proxy_secret_reload_secs()
+                || legacy_config != default_proxy_config_reload_secs()
+            {
+                warn!(
+                    proxy_secret_auto_reload_secs = legacy_secret,
+                    proxy_config_auto_reload_secs = legacy_config,
+                    effective_update_every_secs = effective,
+                    "proxy_*_auto_reload_secs are deprecated; set general.update_every"
+                );
+            }
+        }
+
         // Validate secrets.
         for (user, secret) in &config.access.users {
             if !secret.chars().all(|c| c.is_ascii_hexdigit()) || secret.len() != 32 {
@@ -347,4 +375,68 @@ mod tests {
             .unwrap_or(false));
         let _ = std::fs::remove_file(path);
     }
+
+    #[test]
+    fn update_every_overrides_legacy_fields() {
+        let toml = r#"
+            [general]
+            update_every = 123
+            proxy_secret_auto_reload_secs = 700
+            proxy_config_auto_reload_secs = 800
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_update_every_override_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let cfg = ProxyConfig::load(&path).unwrap();
+        assert_eq!(cfg.general.effective_update_every_secs(), 123);
+        let _ = std::fs::remove_file(path);
+    }
+
+    #[test]
+    fn update_every_fallback_to_legacy_min() {
+        let toml = r#"
+            [general]
+            proxy_secret_auto_reload_secs = 600
+            proxy_config_auto_reload_secs = 120
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_update_every_legacy_min_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let cfg = ProxyConfig::load(&path).unwrap();
+        assert_eq!(cfg.general.update_every, None);
+        assert_eq!(cfg.general.effective_update_every_secs(), 120);
+        let _ = std::fs::remove_file(path);
+    }
+
+    #[test]
+    fn update_every_zero_is_rejected() {
+        let toml = r#"
+            [general]
+            update_every = 0
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_update_every_zero_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let err = ProxyConfig::load(&path).unwrap_err().to_string();
+        assert!(err.contains("general.update_every must be > 0"));
+        let _ = std::fs::remove_file(path);
+    }
 }
#[serde(default = "default_true")] pub me_warmup_stagger_enabled: bool, @@ -252,11 +257,23 @@ pub struct GeneralConfig { #[serde(default = "default_fast_mode_min_tls_record")] pub fast_mode_min_tls_record: usize, - /// Automatically reload proxy-secret every N seconds. + /// Unified ME updater interval in seconds for getProxyConfig/getProxyConfigV6/getProxySecret. + /// When omitted, effective value falls back to legacy proxy_*_auto_reload_secs fields. + #[serde(default)] + pub update_every: Option, + + /// Drain timeout in seconds for stale ME writers after endpoint map changes. + /// Set to 0 to keep stale writers draining indefinitely (no force-close). + #[serde(default = "default_me_reinit_drain_timeout_secs")] + pub me_reinit_drain_timeout_secs: u64, + + /// Deprecated legacy setting; kept for backward compatibility fallback. + /// Use `update_every` instead. #[serde(default = "default_proxy_secret_reload_secs")] pub proxy_secret_auto_reload_secs: u64, - /// Automatically reload proxy-multi.conf every N seconds. + /// Deprecated legacy setting; kept for backward compatibility fallback. + /// Use `update_every` instead. #[serde(default = "default_proxy_config_reload_secs")] pub proxy_config_auto_reload_secs: u64, @@ -310,7 +327,10 @@ impl Default for GeneralConfig { links: LinksConfig::default(), crypto_pending_buffer: default_crypto_pending_buffer(), max_client_frame: default_max_client_frame(), + desync_all_full: default_desync_all_full(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(), + update_every: Some(default_update_every_secs()), + me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(), proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(), proxy_config_auto_reload_secs: default_proxy_config_reload_secs(), ntp_check: default_ntp_check(), @@ -321,6 +341,15 @@ impl Default for GeneralConfig { } } +impl GeneralConfig { + /// Resolve the active updater interval for ME infrastructure refresh tasks. + /// `update_every` has priority, otherwise legacy proxy_*_auto_reload_secs are used. + pub fn effective_update_every_secs(&self) -> u64 { + self.update_every + .unwrap_or_else(|| self.proxy_secret_auto_reload_secs.min(self.proxy_config_auto_reload_secs)) + } +} + /// `[general.links]` — proxy link generation settings. 
diff --git a/src/main.rs b/src/main.rs
index 61debb9..af1a069 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -392,18 +392,6 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await {
         .await;
     });
 
-    // Periodic updater: getProxyConfig + proxy-secret
-    let pool_clone2 = pool.clone();
-    let rng_clone2 = rng.clone();
-    tokio::spawn(async move {
-        crate::transport::middle_proxy::me_config_updater(
-            pool_clone2,
-            rng_clone2,
-            std::time::Duration::from_secs(12 * 3600),
-        )
-        .await;
-    });
-
     Some(pool)
 }
 Err(e) => {
@@ -702,6 +690,20 @@
         detected_ip_v6,
     );
 
+    if let Some(ref pool) = me_pool {
+        let pool_clone = pool.clone();
+        let rng_clone = rng.clone();
+        let config_rx_clone = config_rx.clone();
+        tokio::spawn(async move {
+            crate::transport::middle_proxy::me_config_updater(
+                pool_clone,
+                rng_clone,
+                config_rx_clone,
+            )
+            .await;
+        });
+    }
+
     let mut listeners = Vec::new();
 
     for listener_conf in &config.server.listeners {
diff --git a/src/metrics.rs b/src/metrics.rs
index e00091f..326d333 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -140,6 +140,41 @@ fn render_metrics(stats: &Stats) -> String {
     let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
     let _ = writeln!(out, "telemt_secure_padding_invalid_total {}", stats.get_secure_padding_invalid());
 
+    let _ = writeln!(out, "# HELP telemt_desync_total Total crypto-desync detections");
+    let _ = writeln!(out, "# TYPE telemt_desync_total counter");
+    let _ = writeln!(out, "telemt_desync_total {}", stats.get_desync_total());
+
+    let _ = writeln!(out, "# HELP telemt_desync_full_logged_total Full forensic desync logs emitted");
+    let _ = writeln!(out, "# TYPE telemt_desync_full_logged_total counter");
+    let _ = writeln!(out, "telemt_desync_full_logged_total {}", stats.get_desync_full_logged());
+
+    let _ = writeln!(out, "# HELP telemt_desync_suppressed_total Suppressed desync forensic events");
+    let _ = writeln!(out, "# TYPE telemt_desync_suppressed_total counter");
+    let _ = writeln!(out, "telemt_desync_suppressed_total {}", stats.get_desync_suppressed());
+
+    let _ = writeln!(out, "# HELP telemt_desync_frames_bucket_total Desync count by frames_ok bucket");
+    let _ = writeln!(out, "# TYPE telemt_desync_frames_bucket_total counter");
+    let _ = writeln!(
+        out,
+        "telemt_desync_frames_bucket_total{{bucket=\"0\"}} {}",
+        stats.get_desync_frames_bucket_0()
+    );
+    let _ = writeln!(
+        out,
+        "telemt_desync_frames_bucket_total{{bucket=\"1_2\"}} {}",
+        stats.get_desync_frames_bucket_1_2()
+    );
+    let _ = writeln!(
+        out,
+        "telemt_desync_frames_bucket_total{{bucket=\"3_10\"}} {}",
+        stats.get_desync_frames_bucket_3_10()
+    );
+    let _ = writeln!(
+        out,
+        "telemt_desync_frames_bucket_total{{bucket=\"gt_10\"}} {}",
+        stats.get_desync_frames_bucket_gt_10()
+    );
+
     let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
     let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
     let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
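The new desync series render as three plain counters plus one counter family labeled by `bucket`. A smoke-test sketch of the exposition format (hypothetical: it assumes `Stats` implements `Default` with zeroed counters and sits in the same module as `render_metrics`):

```rust
#[cfg(test)]
mod desync_metrics_tests {
    use super::*;

    #[test]
    fn renders_desync_series() {
        let stats = Stats::default(); // assumption: Default zero-initializes all counters
        stats.increment_desync_total();
        stats.observe_desync_frames_ok(5); // lands in the 3_10 bucket

        let out = render_metrics(&stats);
        assert!(out.contains("telemt_desync_total 1"));
        assert!(out.contains("telemt_desync_frames_bucket_total{bucket=\"3_10\"} 1"));
        assert!(out.contains("telemt_desync_frames_bucket_total{bucket=\"gt_10\"} 0"));
    }
}
```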
diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs
index 3b98112..d55e5a2 100644
--- a/src/proxy/middle_relay.rs
+++ b/src/proxy/middle_relay.rs
@@ -1,5 +1,10 @@
-use std::net::SocketAddr;
-use std::sync::Arc;
+use std::collections::HashMap;
+use std::collections::hash_map::DefaultHasher;
+use std::hash::{Hash, Hasher};
+use std::net::{IpAddr, SocketAddr};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex, OnceLock};
+use std::time::{Duration, Instant};
 
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio::sync::{mpsc, oneshot};
@@ -19,6 +24,148 @@ enum C2MeCommand {
     Close,
 }
 
+const DESYNC_DEDUP_WINDOW: Duration = Duration::from_secs(60);
+const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync";
+static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
+
+struct RelayForensicsState {
+    trace_id: u64,
+    conn_id: u64,
+    user: String,
+    peer: SocketAddr,
+    peer_hash: u64,
+    started_at: Instant,
+    bytes_c2me: u64,
+    bytes_me2c: Arc<AtomicU64>,
+    desync_all_full: bool,
+}
+
+fn hash_value<T: Hash>(value: &T) -> u64 {
+    let mut hasher = DefaultHasher::new();
+    value.hash(&mut hasher);
+    hasher.finish()
+}
+
+fn hash_ip(ip: IpAddr) -> u64 {
+    hash_value(&ip)
+}
+
+fn should_emit_full_desync(key: u64, all_full: bool, now: Instant) -> bool {
+    if all_full {
+        return true;
+    }
+
+    let dedup = DESYNC_DEDUP.get_or_init(|| Mutex::new(HashMap::new()));
+    let mut guard = dedup.lock().expect("desync dedup mutex poisoned");
+    guard.retain(|_, seen_at| now.duration_since(*seen_at) < DESYNC_DEDUP_WINDOW);
+
+    match guard.get_mut(&key) {
+        Some(seen_at) => {
+            if now.duration_since(*seen_at) >= DESYNC_DEDUP_WINDOW {
+                *seen_at = now;
+                true
+            } else {
+                false
+            }
+        }
+        None => {
+            guard.insert(key, now);
+            true
+        }
+    }
+}
+
+fn report_desync_frame_too_large(
+    state: &RelayForensicsState,
+    proto_tag: ProtoTag,
+    frame_counter: u64,
+    max_frame: usize,
+    len: usize,
+    raw_len_bytes: Option<[u8; 4]>,
+    stats: &Stats,
+) -> ProxyError {
+    let len_buf = raw_len_bytes.unwrap_or((len as u32).to_le_bytes());
+    let looks_like_tls = raw_len_bytes
+        .map(|b| b[0] == 0x16 && b[1] == 0x03)
+        .unwrap_or(false);
+    let looks_like_http = raw_len_bytes
+        .map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D'))
+        .unwrap_or(false);
+    let now = Instant::now();
+    let dedup_key = hash_value(&(
+        state.user.as_str(),
+        state.peer_hash,
+        proto_tag,
+        DESYNC_ERROR_CLASS,
+    ));
+    let emit_full = should_emit_full_desync(dedup_key, state.desync_all_full, now);
+    let duration_ms = state.started_at.elapsed().as_millis() as u64;
+    let bytes_me2c = state.bytes_me2c.load(Ordering::Relaxed);
+
+    stats.increment_desync_total();
+    stats.observe_desync_frames_ok(frame_counter);
+
+    if emit_full {
+        stats.increment_desync_full_logged();
+        warn!(
+            trace_id = format_args!("0x{:016x}", state.trace_id),
+            conn_id = state.conn_id,
+            user = %state.user,
+            peer_hash = format_args!("0x{:016x}", state.peer_hash),
+            proto = ?proto_tag,
+            mode = "middle_proxy",
+            is_tls = true,
+            duration_ms,
+            bytes_c2me = state.bytes_c2me,
+            bytes_me2c,
+            raw_len = len,
+            raw_len_hex = format_args!("0x{:08x}", len),
+            raw_bytes = format_args!(
+                "{:02x} {:02x} {:02x} {:02x}",
+                len_buf[0], len_buf[1], len_buf[2], len_buf[3]
+            ),
+            max_frame,
+            tls_like = looks_like_tls,
+            http_like = looks_like_http,
+            frames_ok = frame_counter,
+            dedup_window_secs = DESYNC_DEDUP_WINDOW.as_secs(),
+            desync_all_full = state.desync_all_full,
+            full_reason = if state.desync_all_full { "desync_all_full" } else { "first_in_dedup_window" },
+            error_class = DESYNC_ERROR_CLASS,
+            "Frame too large — crypto desync forensics"
+        );
+        debug!(
+            trace_id = format_args!("0x{:016x}", state.trace_id),
+            conn_id = state.conn_id,
+            user = %state.user,
+            peer = %state.peer,
+            "Frame too large forensic peer detail"
+        );
+    } else {
+        stats.increment_desync_suppressed();
+        debug!(
+            trace_id = format_args!("0x{:016x}", state.trace_id),
+            conn_id = state.conn_id,
+            user = %state.user,
+            peer_hash = format_args!("0x{:016x}", state.peer_hash),
+            proto = ?proto_tag,
+            duration_ms,
+            bytes_c2me = state.bytes_c2me,
+            bytes_me2c,
+            raw_len = len,
+            frames_ok = frame_counter,
+            dedup_window_secs = DESYNC_DEDUP_WINDOW.as_secs(),
+            error_class = DESYNC_ERROR_CLASS,
+            "Frame too large — crypto desync forensic suppressed"
+        );
+    }
+
+    ProxyError::Proxy(format!(
+        "Frame too large: {len} (max {max_frame}), frames_ok={frame_counter}, conn_id={}, trace_id=0x{:016x}",
+        state.conn_id,
+        state.trace_id
+    ))
+}
+
 pub(crate) async fn handle_via_middle_proxy(
     mut crypto_reader: CryptoReader,
     crypto_writer: CryptoWriter,
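The dedup map is process-global and keyed by a hash of `(user, peer_hash, proto, error_class)`: the first desync per key in any 60-second window gets the full forensic record, repeats only bump the suppressed counter, and `desync_all_full = true` bypasses the map entirely. A behavioural sketch (test-style; the key below is arbitrary, and because the map is a global `OnceLock` it must not collide with keys used by other tests):

```rust
#[cfg(test)]
mod desync_dedup_tests {
    use super::*;

    #[test]
    fn window_semantics() {
        let key = hash_value(&("dedup-test-user", 42u64));
        let t0 = Instant::now();

        assert!(should_emit_full_desync(key, false, t0));  // first hit: full log
        assert!(!should_emit_full_desync(key, false, t0)); // repeat inside window: suppressed
        assert!(should_emit_full_desync(key, true, t0));   // all_full bypasses the dedup map

        // After the window elapses, the entry is evicted and full logging resumes.
        assert!(should_emit_full_desync(key, false, t0 + DESYNC_DEDUP_WINDOW));
    }
}
```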
format_args!("0x{:016x}", state.trace_id), + conn_id = state.conn_id, + user = %state.user, + peer_hash = format_args!("0x{:016x}", state.peer_hash), + proto = ?proto_tag, + duration_ms, + bytes_c2me = state.bytes_c2me, + bytes_me2c, + raw_len = len, + frames_ok = frame_counter, + dedup_window_secs = DESYNC_DEDUP_WINDOW.as_secs(), + error_class = DESYNC_ERROR_CLASS, + "Frame too large — crypto desync forensic suppressed" + ); + } + + ProxyError::Proxy(format!( + "Frame too large: {len} (max {max_frame}), frames_ok={frame_counter}, conn_id={}, trace_id=0x{:016x}", + state.conn_id, + state.trace_id + )) +} + pub(crate) async fn handle_via_middle_proxy( mut crypto_reader: CryptoReader, crypto_writer: CryptoWriter, @@ -48,14 +195,30 @@ where ); let (conn_id, me_rx) = me_pool.registry().register().await; + let trace_id = conn_id; + let bytes_me2c = Arc::new(AtomicU64::new(0)); + let mut forensics = RelayForensicsState { + trace_id, + conn_id, + user: user.clone(), + peer, + peer_hash: hash_ip(peer.ip()), + started_at: Instant::now(), + bytes_c2me: 0, + bytes_me2c: bytes_me2c.clone(), + desync_all_full: config.general.desync_all_full, + }; stats.increment_user_connects(&user); stats.increment_user_curr_connects(&user); let proto_flags = proto_flags_for_tag(proto_tag, me_pool.has_proxy_tag()); debug!( + trace_id = format_args!("0x{:016x}", trace_id), user = %user, conn_id, + peer_hash = format_args!("0x{:016x}", forensics.peer_hash), + desync_all_full = forensics.desync_all_full, proto_flags = format_args!("0x{:08x}", proto_flags), "ME relay started" ); @@ -93,6 +256,7 @@ where let stats_clone = stats.clone(); let rng_clone = rng.clone(); let user_clone = user.clone(); + let bytes_me2c_clone = bytes_me2c.clone(); let me_writer = tokio::spawn(async move { let mut writer = crypto_writer; let mut frame_buf = Vec::with_capacity(16 * 1024); @@ -102,6 +266,7 @@ where match msg { Some(MeResponse::Data { flags, data }) => { trace!(conn_id, bytes = data.len(), flags, "ME->C data"); + bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed); stats_clone.add_user_octets_to(&user_clone, data.len() as u64); write_client_payload( &mut writer, @@ -118,6 +283,7 @@ where match next { MeResponse::Data { flags, data } => { trace!(conn_id, bytes = data.len(), flags, "ME->C data (batched)"); + bytes_me2c_clone.fetch_add(data.len() as u64, Ordering::Relaxed); stats_clone.add_user_octets_to(&user_clone, data.len() as u64); write_client_payload( &mut writer, @@ -173,12 +339,15 @@ where &mut crypto_reader, proto_tag, frame_limit, - &user, + &forensics, &mut frame_counter, &stats, ).await { Ok(Some((payload, quickack))) => { trace!(conn_id, bytes = payload.len(), "C->ME frame"); + forensics.bytes_c2me = forensics + .bytes_c2me + .saturating_add(payload.len() as u64); stats.add_user_octets_from(&user, payload.len() as u64); let mut flags = proto_flags; if quickack { @@ -237,7 +406,16 @@ where (_, _, Err(e)) => Err(e), }; - debug!(user = %user, conn_id, "ME relay cleanup"); + debug!( + user = %user, + conn_id, + trace_id = format_args!("0x{:016x}", trace_id), + duration_ms = forensics.started_at.elapsed().as_millis() as u64, + bytes_c2me = forensics.bytes_c2me, + bytes_me2c = forensics.bytes_me2c.load(Ordering::Relaxed), + frames_ok = frame_counter, + "ME relay cleanup" + ); me_pool.registry().unregister(conn_id).await; stats.decrement_user_curr_connects(&user); result @@ -247,7 +425,7 @@ async fn read_client_payload( client_reader: &mut CryptoReader, proto_tag: ProtoTag, max_frame: usize, - user: &str, + forensics: 
@@ -247,7 +425,7 @@ async fn read_client_payload(
     client_reader: &mut CryptoReader,
     proto_tag: ProtoTag,
     max_frame: usize,
-    user: &str,
+    forensics: &RelayForensicsState,
     frame_counter: &mut u64,
     stats: &Stats,
 ) -> Result<Option<(Vec<u8>, bool)>>
@@ -302,7 +480,9 @@ where
     }
     if len < 4 && proto_tag != ProtoTag::Abridged {
         warn!(
-            user = %user,
+            trace_id = format_args!("0x{:016x}", forensics.trace_id),
+            conn_id = forensics.conn_id,
+            user = %forensics.user,
             len,
             proto = ?proto_tag,
             "Frame too small — corrupt or probe"
@@ -311,31 +491,15 @@ where
     }
 
    if len > max_frame {
-        let len_buf = raw_len_bytes.unwrap_or((len as u32).to_le_bytes());
-        let looks_like_tls = raw_len_bytes
-            .map(|b| b[0] == 0x16 && b[1] == 0x03)
-            .unwrap_or(false);
-        let looks_like_http = raw_len_bytes
-            .map(|b| matches!(b[0], b'G' | b'P' | b'H' | b'C' | b'D'))
-            .unwrap_or(false);
-        warn!(
-            user = %user,
-            raw_len = len,
-            raw_len_hex = format_args!("0x{:08x}", len),
-            raw_bytes = format_args!(
-                "{:02x} {:02x} {:02x} {:02x}",
-                len_buf[0], len_buf[1], len_buf[2], len_buf[3]
-            ),
-            proto = ?proto_tag,
-            tls_like = looks_like_tls,
-            http_like = looks_like_http,
-            frames_ok = *frame_counter,
-            "Frame too large — crypto desync forensics"
-        );
-        return Err(ProxyError::Proxy(format!(
-            "Frame too large: {len} (max {max_frame}), frames_ok={}",
-            *frame_counter
-        )));
+        return Err(report_desync_frame_too_large(
+            forensics,
+            proto_tag,
+            *frame_counter,
+            max_frame,
+            len,
+            raw_len_bytes,
+            stats,
+        ));
     }
 
     let secure_payload_len = if proto_tag == ProtoTag::Secure {
diff --git a/src/stats/mod.rs b/src/stats/mod.rs
index 307da6d..4c16d25 100644
--- a/src/stats/mod.rs
+++ b/src/stats/mod.rs
@@ -31,6 +31,13 @@ pub struct Stats {
     me_route_drop_channel_closed: AtomicU64,
     me_route_drop_queue_full: AtomicU64,
     secure_padding_invalid: AtomicU64,
+    desync_total: AtomicU64,
+    desync_full_logged: AtomicU64,
+    desync_suppressed: AtomicU64,
+    desync_frames_bucket_0: AtomicU64,
+    desync_frames_bucket_1_2: AtomicU64,
+    desync_frames_bucket_3_10: AtomicU64,
+    desync_frames_bucket_gt_10: AtomicU64,
     user_stats: DashMap<String, UserStats>,
     start_time: parking_lot::RwLock<Option<std::time::Instant>>,
 }
@@ -76,6 +83,31 @@ impl Stats {
     pub fn increment_secure_padding_invalid(&self) {
         self.secure_padding_invalid.fetch_add(1, Ordering::Relaxed);
     }
+    pub fn increment_desync_total(&self) {
+        self.desync_total.fetch_add(1, Ordering::Relaxed);
+    }
+    pub fn increment_desync_full_logged(&self) {
+        self.desync_full_logged.fetch_add(1, Ordering::Relaxed);
+    }
+    pub fn increment_desync_suppressed(&self) {
+        self.desync_suppressed.fetch_add(1, Ordering::Relaxed);
+    }
+    pub fn observe_desync_frames_ok(&self, frames_ok: u64) {
+        match frames_ok {
+            0 => {
+                self.desync_frames_bucket_0.fetch_add(1, Ordering::Relaxed);
+            }
+            1..=2 => {
+                self.desync_frames_bucket_1_2.fetch_add(1, Ordering::Relaxed);
+            }
+            3..=10 => {
+                self.desync_frames_bucket_3_10.fetch_add(1, Ordering::Relaxed);
+            }
+            _ => {
+                self.desync_frames_bucket_gt_10.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+    }
     pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
     pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
     pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
@@ -96,6 +128,27 @@
     pub fn get_secure_padding_invalid(&self) -> u64 {
         self.secure_padding_invalid.load(Ordering::Relaxed)
     }
+    pub fn get_desync_total(&self) -> u64 {
+        self.desync_total.load(Ordering::Relaxed)
+    }
+    pub fn get_desync_full_logged(&self) -> u64 {
+        self.desync_full_logged.load(Ordering::Relaxed)
+    }
+    pub fn get_desync_suppressed(&self) -> u64 {
+        self.desync_suppressed.load(Ordering::Relaxed)
+    }
+    pub fn get_desync_frames_bucket_0(&self) -> u64 {
+        self.desync_frames_bucket_0.load(Ordering::Relaxed)
+    }
+    pub fn get_desync_frames_bucket_1_2(&self) -> u64 {
+        self.desync_frames_bucket_1_2.load(Ordering::Relaxed)
+    }
+    pub fn get_desync_frames_bucket_3_10(&self) -> u64 {
+        self.desync_frames_bucket_3_10.load(Ordering::Relaxed)
+    }
+    pub fn get_desync_frames_bucket_gt_10(&self) -> u64 {
+        self.desync_frames_bucket_gt_10.load(Ordering::Relaxed)
+    }
 
     pub fn increment_user_connects(&self, user: &str) {
         self.user_stats.entry(user.to_string()).or_default()
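Bucket boundaries are inclusive (`1..=2`, `3..=10`), so `frames_ok = 10` still counts as `3_10` and only 11 and above reaches `gt_10`. That matters when reading the metrics: a spike in the `0` bucket points at handshake-time corruption, while `gt_10` suggests mid-stream desync. An edge-case sketch (again assuming a hypothetical `Stats: Default`):

```rust
#[cfg(test)]
mod desync_bucket_tests {
    use super::*;

    #[test]
    fn bucket_edges() {
        let stats = Stats::default(); // assumption: Default zero-initializes all counters
        for frames_ok in [0u64, 1, 2, 3, 10, 11] {
            stats.observe_desync_frames_ok(frames_ok);
        }
        assert_eq!(stats.get_desync_frames_bucket_0(), 1);     // 0
        assert_eq!(stats.get_desync_frames_bucket_1_2(), 2);   // 1, 2
        assert_eq!(stats.get_desync_frames_bucket_3_10(), 2);  // 3, 10
        assert_eq!(stats.get_desync_frames_bucket_gt_10(), 1); // 11
    }
}
```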
diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs
index d2bb550..479a880 100644
--- a/src/transport/middle_proxy/config_updater.rs
+++ b/src/transport/middle_proxy/config_updater.rs
@@ -4,8 +4,10 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use httpdate;
+use tokio::sync::watch;
 use tracing::{debug, info, warn};
 
+use crate::config::ProxyConfig;
 use crate::error::Result;
 
 use super::MePool;
@@ -128,49 +130,126 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
     Ok(ProxyConfigData { map, default_dc })
 }
 
-pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interval: Duration) {
-    let mut tick = tokio::time::interval(interval);
-    // skip immediate tick to avoid double-fetch right after startup
-    tick.tick().await;
+async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &ProxyConfig) {
+    let mut maps_changed = false;
+
+    // Update proxy config v4
+    let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
+    if let Some(cfg_v4) = cfg_v4 {
+        let changed = pool.update_proxy_maps(cfg_v4.map.clone(), None).await;
+        if let Some(dc) = cfg_v4.default_dc {
+            pool.default_dc
+                .store(dc, std::sync::atomic::Ordering::Relaxed);
+        }
+        if changed {
+            maps_changed = true;
+            info!("ME config updated (v4)");
+        } else {
+            debug!("ME config v4 unchanged");
+        }
+    }
+
+    // Update proxy config v6 (optional)
+    let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
+    if let Some(cfg_v6) = cfg_v6 {
+        let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
+        if changed {
+            maps_changed = true;
+            info!("ME config updated (v6)");
+        } else {
+            debug!("ME config v6 unchanged");
+        }
+    }
+
+    if maps_changed {
+        let drain_timeout = if cfg.general.me_reinit_drain_timeout_secs == 0 {
+            None
+        } else {
+            Some(Duration::from_secs(cfg.general.me_reinit_drain_timeout_secs))
+        };
+        pool.zero_downtime_reinit_after_map_change(rng.as_ref(), drain_timeout)
+            .await;
+    }
+
+    pool.reset_stun_state();
+
+    // Update proxy-secret
+    match download_proxy_secret().await {
+        Ok(secret) => {
+            if pool.update_secret(secret).await {
+                info!("proxy-secret updated and pool reconnect scheduled");
+            }
+        }
+        Err(e) => warn!(error = %e, "proxy-secret update failed"),
+    }
+}
+
+pub async fn me_config_updater(
+    pool: Arc<MePool>,
+    rng: Arc<SecureRandom>,
+    mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
+) {
+    let mut update_every_secs = config_rx
+        .borrow()
+        .general
+        .effective_update_every_secs()
+        .max(1);
+    let mut update_every = Duration::from_secs(update_every_secs);
+    let mut next_tick = tokio::time::Instant::now() + update_every;
+    info!(update_every_secs, "ME config updater started");
+
     loop {
-        tick.tick().await;
+        let sleep = tokio::time::sleep_until(next_tick);
+        tokio::pin!(sleep);
 
-        // Update proxy config v4
-        let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
-        if let Some(cfg) = cfg_v4 {
-            let changed = pool.update_proxy_maps(cfg.map.clone(), None).await;
-            if let Some(dc) = cfg.default_dc {
-                pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed);
+        tokio::select! {
+            _ = &mut sleep => {
+                let cfg = config_rx.borrow().clone();
+                run_update_cycle(&pool, &rng, cfg.as_ref()).await;
+                let refreshed_secs = cfg.general.effective_update_every_secs().max(1);
+                if refreshed_secs != update_every_secs {
+                    info!(
+                        old_update_every_secs = update_every_secs,
+                        new_update_every_secs = refreshed_secs,
+                        "ME config updater interval changed"
+                    );
+                    update_every_secs = refreshed_secs;
+                    update_every = Duration::from_secs(update_every_secs);
+                }
+                next_tick = tokio::time::Instant::now() + update_every;
             }
-            if changed {
-                info!("ME config updated (v4), reconciling connections");
-                pool.reconcile_connections(&rng).await;
-            } else {
-                debug!("ME config v4 unchanged");
-            }
-        }
+            changed = config_rx.changed() => {
+                if changed.is_err() {
+                    warn!("ME config updater stopped: config channel closed");
+                    break;
+                }
+                let cfg = config_rx.borrow().clone();
+                let new_secs = cfg.general.effective_update_every_secs().max(1);
+                if new_secs == update_every_secs {
+                    continue;
+                }
 
-        // Update proxy config v6 (optional)
-        let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
-        if let Some(cfg_v6) = cfg_v6 {
-            let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
-            if changed {
-                info!("ME config updated (v6), reconciling connections");
-                pool.reconcile_connections(&rng).await;
-            } else {
-                debug!("ME config v6 unchanged");
-            }
-        }
-        pool.reset_stun_state();
-
-        // Update proxy-secret
-        match download_proxy_secret().await {
-            Ok(secret) => {
-                if pool.update_secret(secret).await {
-                    info!("proxy-secret updated and pool reconnect scheduled");
+                if new_secs < update_every_secs {
+                    info!(
+                        old_update_every_secs = update_every_secs,
+                        new_update_every_secs = new_secs,
+                        "ME config updater interval decreased, running immediate refresh"
+                    );
+                    update_every_secs = new_secs;
+                    update_every = Duration::from_secs(update_every_secs);
+                    run_update_cycle(&pool, &rng, cfg.as_ref()).await;
+                    next_tick = tokio::time::Instant::now() + update_every;
+                } else {
+                    info!(
+                        old_update_every_secs = update_every_secs,
+                        new_update_every_secs = new_secs,
+                        "ME config updater interval increased"
+                    );
+                    update_every_secs = new_secs;
+                    update_every = Duration::from_secs(update_every_secs);
+                    next_tick = tokio::time::Instant::now() + update_every;
+                }
+            }
         }
-            Err(e) => warn!(error = %e, "proxy-secret update failed"),
-        }
     }
 }
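The rewritten loop makes interval changes effective without waiting out the old timer: a decrease triggers an immediate refresh, an increase only pushes the next tick out, and a closed config channel shuts the task down. The timer-plus-watch pattern in isolation (a sketch with the refresh work stubbed out; `interval_rx` is a stand-in for the real `watch::Receiver<Arc<ProxyConfig>>`):

```rust
use std::time::Duration;
use tokio::sync::watch;

async fn updater_skeleton(mut interval_rx: watch::Receiver<u64>) {
    let mut secs = (*interval_rx.borrow()).max(1);
    let mut next_tick = tokio::time::Instant::now() + Duration::from_secs(secs);
    loop {
        let sleep = tokio::time::sleep_until(next_tick);
        tokio::pin!(sleep);
        tokio::select! {
            _ = &mut sleep => {
                // run_update_cycle(...) would run here.
                next_tick = tokio::time::Instant::now() + Duration::from_secs(secs);
            }
            changed = interval_rx.changed() => {
                if changed.is_err() {
                    break; // config channel closed: stop the updater
                }
                let new_secs = (*interval_rx.borrow()).max(1);
                if new_secs < secs {
                    secs = new_secs;
                    // Shorter interval: refresh immediately instead of waiting,
                    // then schedule the next tick from now.
                    next_tick = tokio::time::Instant::now() + Duration::from_secs(secs);
                } else if new_secs > secs {
                    secs = new_secs;
                    next_tick = tokio::time::Instant::now() + Duration::from_secs(secs);
                }
            }
        }
    }
}
```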
diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs
index 8faeabf..bd7c9cc 100644
--- a/src/transport/middle_proxy/pool.rs
+++ b/src/transport/middle_proxy/pool.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::net::{IpAddr, Ipv6Addr, SocketAddr};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering};
@@ -178,7 +178,6 @@ impl MePool {
     }
 
     pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) {
-        use std::collections::HashSet;
         let writers = self.writers.read().await;
         let current: HashSet<SocketAddr> = writers
             .iter()
@@ -210,6 +209,103 @@
         }
     }
 
+    async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> {
+        let mut out: HashMap<i32, HashSet<SocketAddr>> = HashMap::new();
+
+        if self.decision.ipv4_me {
+            let map_v4 = self.proxy_map_v4.read().await.clone();
+            for (dc, addrs) in map_v4 {
+                let entry = out.entry(dc.abs()).or_default();
+                for (ip, port) in addrs {
+                    entry.insert(SocketAddr::new(ip, port));
+                }
+            }
+        }
+
+        if self.decision.ipv6_me {
+            let map_v6 = self.proxy_map_v6.read().await.clone();
+            for (dc, addrs) in map_v6 {
+                let entry = out.entry(dc.abs()).or_default();
+                for (ip, port) in addrs {
+                    entry.insert(SocketAddr::new(ip, port));
+                }
+            }
+        }
+
+        out
+    }
+
+    pub async fn zero_downtime_reinit_after_map_change(
+        self: &Arc<Self>,
+        rng: &SecureRandom,
+        drain_timeout: Option<Duration>,
+    ) {
+        // Stage 1: prewarm writers for new endpoint maps before draining old ones.
+        self.reconcile_connections(rng).await;
+
+        let desired_by_dc = self.desired_dc_endpoints().await;
+        if desired_by_dc.is_empty() {
+            warn!("ME endpoint map is empty after update; skipping stale writer drain");
+            return;
+        }
+
+        let writers = self.writers.read().await;
+        let active_writer_addrs: HashSet<SocketAddr> = writers
+            .iter()
+            .filter(|w| !w.draining.load(Ordering::Relaxed))
+            .map(|w| w.addr)
+            .collect();
+
+        let mut missing_dc = Vec::<i32>::new();
+        for (dc, endpoints) in &desired_by_dc {
+            if endpoints.is_empty() {
+                continue;
+            }
+            if !endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) {
+                missing_dc.push(*dc);
+            }
+        }
+
+        if !missing_dc.is_empty() {
+            missing_dc.sort_unstable();
+            warn!(
+                missing_dc = ?missing_dc,
+                // Keep stale writers alive when fresh coverage is incomplete.
+                "ME reinit coverage incomplete after map update; keeping stale writers"
+            );
+            return;
+        }
+
+        let desired_addrs: HashSet<SocketAddr> = desired_by_dc
+            .values()
+            .flat_map(|set| set.iter().copied())
+            .collect();
+
+        let stale_writer_ids: Vec<u64> = writers
+            .iter()
+            .filter(|w| !w.draining.load(Ordering::Relaxed))
+            .filter(|w| !desired_addrs.contains(&w.addr))
+            .map(|w| w.id)
+            .collect();
+        drop(writers);
+
+        if stale_writer_ids.is_empty() {
+            debug!("ME map update completed with no stale writers");
+            return;
+        }
+
+        let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0);
+        info!(
+            stale_writers = stale_writer_ids.len(),
+            drain_timeout_secs,
+            "ME map update covered; draining stale writers"
+        );
+        for writer_id in stale_writer_ids {
+            self.mark_writer_draining_with_timeout(writer_id, drain_timeout)
+                .await;
+        }
+    }
+
     pub async fn update_proxy_maps(
         &self,
         new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
@@ -631,23 +727,40 @@ impl MePool {
         self.registry.writer_lost(writer_id).await
     }
 
-    pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
-        {
+    pub(crate) async fn mark_writer_draining_with_timeout(
+        self: &Arc<Self>,
+        writer_id: u64,
+        timeout: Option<Duration>,
+    ) {
+        let timeout = timeout.filter(|d| !d.is_zero());
+        let found = {
             let mut ws = self.writers.write().await;
             if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) {
                 w.draining.store(true, Ordering::Relaxed);
+                true
+            } else {
+                false
             }
+        };
+
+        if !found {
+            return;
         }
 
+        let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0);
+        debug!(writer_id, timeout_secs, "ME writer marked draining");
+
         let pool = Arc::downgrade(self);
         tokio::spawn(async move {
-            let deadline = Instant::now() + Duration::from_secs(300);
+            let deadline = timeout.map(|t| Instant::now() + t);
             loop {
                 if let Some(p) = pool.upgrade() {
-                    if Instant::now() >= deadline {
-                        warn!(writer_id, "Drain timeout, force-closing");
-                        let _ = p.remove_writer_and_close_clients(writer_id).await;
-                        break;
+                    if let Some(deadline_at) = deadline {
+                        if Instant::now() >= deadline_at {
+                            warn!(writer_id, "Drain timeout, force-closing");
+                            let _ = p.remove_writer_and_close_clients(writer_id).await;
+                            break;
+                        }
                     }
                     if p.registry.is_writer_empty(writer_id).await {
                         let _ = p.remove_writer_only(writer_id).await;
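Two details in the drain path are easy to miss: a zero timeout is normalized away by `timeout.filter(|d| !d.is_zero())`, which disables the force-close deadline entirely, and the spawned task holds only a `Weak` pool reference, so dropping the pool ends the drain loop instead of leaking it. The config-to-deadline mapping in isolation (sketch; `drain_timeout_from_config` is a hypothetical helper, the diff inlines this logic in `run_update_cycle`):

```rust
use std::time::Duration;

// 0 means "drain forever, never force-close"; anything else is a hard cap.
fn drain_timeout_from_config(me_reinit_drain_timeout_secs: u64) -> Option<Duration> {
    if me_reinit_drain_timeout_secs == 0 {
        None
    } else {
        Some(Duration::from_secs(me_reinit_drain_timeout_secs))
    }
}

fn main() {
    assert_eq!(drain_timeout_from_config(0), None);
    assert_eq!(drain_timeout_from_config(300), Some(Duration::from_secs(300)));
}
```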
@@ -661,6 +774,11 @@ impl MePool {
         });
     }
 
+    pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
+        self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300)))
+            .await;
+    }
+
 }
 
 fn hex_dump(data: &[u8]) -> String {
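The gate that decides whether stale writers may drain reads as a pure predicate over the endpoint maps: every DC that still has desired endpoints must keep at least one non-draining writer on one of them, otherwise the drain is skipped. A self-contained sketch of that invariant (`coverage_complete` is a hypothetical standalone version of the check inside `zero_downtime_reinit_after_map_change`):

```rust
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

fn coverage_complete(
    desired_by_dc: &HashMap<i32, HashSet<SocketAddr>>,
    active_writer_addrs: &HashSet<SocketAddr>,
) -> bool {
    desired_by_dc
        .iter()
        .filter(|(_, endpoints)| !endpoints.is_empty())
        .all(|(_, endpoints)| endpoints.iter().any(|a| active_writer_addrs.contains(a)))
}

fn main() {
    let dc1: SocketAddr = "10.0.0.1:8888".parse().unwrap();
    let dc2: SocketAddr = "10.0.0.2:8888".parse().unwrap();
    let desired = HashMap::from([(1, HashSet::from([dc1])), (2, HashSet::from([dc2]))]);

    // Both DCs covered by fresh writers: stale writers may drain.
    assert!(coverage_complete(&desired, &HashSet::from([dc1, dc2])));
    // DC 2 lost all fresh coverage: keep stale writers, skip the drain.
    assert!(!coverage_complete(&desired, &HashSet::from([dc1])));
}
```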