diff --git a/src/config/defaults.rs b/src/config/defaults.rs index b9b0da1..6b80ede 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -182,6 +182,26 @@ pub(crate) fn default_update_every_secs() -> u64 { 30 * 60 } +pub(crate) fn default_me_config_stable_snapshots() -> u8 { + 2 +} + +pub(crate) fn default_me_config_apply_cooldown_secs() -> u64 { + 300 +} + +pub(crate) fn default_proxy_secret_stable_snapshots() -> u8 { + 2 +} + +pub(crate) fn default_proxy_secret_rotate_runtime() -> bool { + true +} + +pub(crate) fn default_proxy_secret_len_max() -> usize { + 256 +} + pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 { 120 } diff --git a/src/config/load.rs b/src/config/load.rs index 750e0dc..be34efa 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -147,6 +147,24 @@ impl ProxyConfig { } } + if config.general.me_config_stable_snapshots == 0 { + return Err(ProxyError::Config( + "general.me_config_stable_snapshots must be > 0".to_string(), + )); + } + + if config.general.proxy_secret_stable_snapshots == 0 { + return Err(ProxyError::Config( + "general.proxy_secret_stable_snapshots must be > 0".to_string(), + )); + } + + if !(32..=4096).contains(&config.general.proxy_secret_len_max) { + return Err(ProxyError::Config( + "general.proxy_secret_len_max must be within [32, 4096]".to_string(), + )); + } + if !(0.0..=1.0).contains(&config.general.me_pool_min_fresh_ratio) { return Err(ProxyError::Config( "general.me_pool_min_fresh_ratio must be within [0.0, 1.0]".to_string(), @@ -462,6 +480,66 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn me_config_stable_snapshots_zero_is_rejected() { + let toml = r#" + [general] + me_config_stable_snapshots = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_me_config_stable_snapshots_zero_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.me_config_stable_snapshots must be > 0")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn proxy_secret_stable_snapshots_zero_is_rejected() { + let toml = r#" + [general] + proxy_secret_stable_snapshots = 0 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_proxy_secret_stable_snapshots_zero_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.proxy_secret_stable_snapshots must be > 0")); + let _ = std::fs::remove_file(path); + } + + #[test] + fn proxy_secret_len_max_out_of_range_is_rejected() { + let toml = r#" + [general] + proxy_secret_len_max = 16 + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_proxy_secret_len_max_out_of_range_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.proxy_secret_len_max must be within [32, 4096]")); + let _ = std::fs::remove_file(path); + } + #[test] fn me_pool_min_fresh_ratio_out_of_range_is_rejected() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index c9ceea4..bd9697e 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -267,6 +267,26 @@ pub struct GeneralConfig { #[serde(default)] pub update_every: Option, + /// Number of identical getProxyConfig snapshots required before applying ME map updates. + #[serde(default = "default_me_config_stable_snapshots")] + pub me_config_stable_snapshots: u8, + + /// Cooldown in seconds between applied ME map updates. + #[serde(default = "default_me_config_apply_cooldown_secs")] + pub me_config_apply_cooldown_secs: u64, + + /// Number of identical getProxySecret snapshots required before runtime secret rotation. + #[serde(default = "default_proxy_secret_stable_snapshots")] + pub proxy_secret_stable_snapshots: u8, + + /// Enable runtime proxy-secret rotation from getProxySecret. + #[serde(default = "default_proxy_secret_rotate_runtime")] + pub proxy_secret_rotate_runtime: bool, + + /// Maximum allowed proxy-secret length in bytes for startup and runtime refresh. + #[serde(default = "default_proxy_secret_len_max")] + pub proxy_secret_len_max: usize, + /// Drain-TTL in seconds for stale ME writers after endpoint map changes. /// During TTL, stale writers may be used only as fallback for new bindings. #[serde(default = "default_me_pool_drain_ttl_secs")] @@ -346,6 +366,11 @@ impl Default for GeneralConfig { hardswap: default_hardswap(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(), update_every: Some(default_update_every_secs()), + me_config_stable_snapshots: default_me_config_stable_snapshots(), + me_config_apply_cooldown_secs: default_me_config_apply_cooldown_secs(), + proxy_secret_stable_snapshots: default_proxy_secret_stable_snapshots(), + proxy_secret_rotate_runtime: default_proxy_secret_rotate_runtime(), + proxy_secret_len_max: default_proxy_secret_len_max(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(), me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(), diff --git a/src/main.rs b/src/main.rs index 7264239..1c7b39c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -298,25 +298,30 @@ async fn main() -> std::result::Result<(), Box> { // proxy-secret is from: https://core.telegram.org/getProxySecret // ============================================================= let proxy_secret_path = config.general.proxy_secret_path.as_deref(); -match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await { - Ok(proxy_secret) => { - info!( - secret_len = proxy_secret.len(), - key_sig = format_args!( - "0x{:08x}", - if proxy_secret.len() >= 4 { - u32::from_le_bytes([ - proxy_secret[0], - proxy_secret[1], - proxy_secret[2], - proxy_secret[3], - ]) - } else { - 0 - } - ), - "Proxy-secret loaded" - ); + match crate::transport::middle_proxy::fetch_proxy_secret( + proxy_secret_path, + config.general.proxy_secret_len_max, + ) + .await + { + Ok(proxy_secret) => { + info!( + secret_len = proxy_secret.len(), + key_sig = format_args!( + "0x{:08x}", + if proxy_secret.len() >= 4 { + u32::from_le_bytes([ + proxy_secret[0], + proxy_secret[1], + proxy_secret[2], + proxy_secret[3], + ]) + } else { + 0 + } + ), + "Proxy-secret loaded" + ); // Load ME config (v4/v6) + default DC let mut cfg_v4 = fetch_proxy_config( diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 56d5b81..fc9ed3d 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::net::IpAddr; use std::sync::Arc; use std::time::Duration; @@ -11,7 +12,7 @@ use crate::config::ProxyConfig; use crate::error::Result; use super::MePool; -use super::secret::download_proxy_secret; +use super::secret::download_proxy_secret_with_max_len; use crate::crypto::SecureRandom; use std::time::SystemTime; @@ -39,6 +40,92 @@ pub struct ProxyConfigData { pub default_dc: Option, } +#[derive(Debug, Default)] +struct StableSnapshot { + candidate_hash: Option, + candidate_hits: u8, + applied_hash: Option, +} + +impl StableSnapshot { + fn observe(&mut self, hash: u64) -> u8 { + if self.candidate_hash == Some(hash) { + self.candidate_hits = self.candidate_hits.saturating_add(1); + } else { + self.candidate_hash = Some(hash); + self.candidate_hits = 1; + } + self.candidate_hits + } + + fn is_applied(&self, hash: u64) -> bool { + self.applied_hash == Some(hash) + } + + fn mark_applied(&mut self, hash: u64) { + self.applied_hash = Some(hash); + } +} + +#[derive(Debug, Default)] +struct UpdaterState { + config_v4: StableSnapshot, + config_v6: StableSnapshot, + secret: StableSnapshot, + last_map_apply_at: Option, +} + +fn hash_proxy_config(cfg: &ProxyConfigData) -> u64 { + let mut hasher = DefaultHasher::new(); + cfg.default_dc.hash(&mut hasher); + + let mut by_dc: Vec<(i32, Vec<(IpAddr, u16)>)> = + cfg.map.iter().map(|(dc, addrs)| (*dc, addrs.clone())).collect(); + by_dc.sort_by_key(|(dc, _)| *dc); + for (dc, mut addrs) in by_dc { + dc.hash(&mut hasher); + addrs.sort_unstable(); + for (ip, port) in addrs { + ip.hash(&mut hasher); + port.hash(&mut hasher); + } + } + + hasher.finish() +} + +fn hash_secret(secret: &[u8]) -> u64 { + let mut hasher = DefaultHasher::new(); + secret.hash(&mut hasher); + hasher.finish() +} + +fn map_apply_cooldown_ready( + last_applied: Option, + cooldown: Duration, +) -> bool { + if cooldown.is_zero() { + return true; + } + match last_applied { + Some(ts) => ts.elapsed() >= cooldown, + None => true, + } +} + +fn map_apply_cooldown_remaining_secs( + last_applied: tokio::time::Instant, + cooldown: Duration, +) -> u64 { + if cooldown.is_zero() { + return 0; + } + cooldown + .checked_sub(last_applied.elapsed()) + .map(|d| d.as_secs()) + .unwrap_or(0) +} + fn parse_host_port(s: &str) -> Option<(IpAddr, u16)> { if let Some(bracket_end) = s.rfind(']') && s.starts_with('[') @@ -130,7 +217,12 @@ pub async fn fetch_proxy_config(url: &str) -> Result { Ok(ProxyConfigData { map, default_dc }) } -async fn run_update_cycle(pool: &Arc, rng: &Arc, cfg: &ProxyConfig) { +async fn run_update_cycle( + pool: &Arc, + rng: &Arc, + cfg: &ProxyConfig, + state: &mut UpdaterState, +) { pool.update_runtime_reinit_policy( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, @@ -138,33 +230,93 @@ async fn run_update_cycle(pool: &Arc, rng: &Arc, cfg: &Pro cfg.general.me_pool_min_fresh_ratio, ); + let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); + let required_secret_snapshots = cfg.general.proxy_secret_stable_snapshots.max(1); + let apply_cooldown = Duration::from_secs(cfg.general.me_config_apply_cooldown_secs); let mut maps_changed = false; - // Update proxy config v4 + let mut ready_v4: Option<(ProxyConfigData, u64)> = None; let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await; if let Some(cfg_v4) = cfg_v4 { - let changed = pool.update_proxy_maps(cfg_v4.map.clone(), None).await; - if let Some(dc) = cfg_v4.default_dc { - pool.default_dc - .store(dc, std::sync::atomic::Ordering::Relaxed); - } - if changed { - maps_changed = true; - info!("ME config updated (v4)"); + let cfg_v4_hash = hash_proxy_config(&cfg_v4); + let stable_hits = state.config_v4.observe(cfg_v4_hash); + if stable_hits < required_cfg_snapshots { + debug!( + stable_hits, + required_cfg_snapshots, + snapshot = format_args!("0x{cfg_v4_hash:016x}"), + "ME config v4 candidate observed" + ); + } else if state.config_v4.is_applied(cfg_v4_hash) { + debug!( + snapshot = format_args!("0x{cfg_v4_hash:016x}"), + "ME config v4 stable snapshot already applied" + ); } else { - debug!("ME config v4 unchanged"); + ready_v4 = Some((cfg_v4, cfg_v4_hash)); } } - // Update proxy config v6 (optional) + let mut ready_v6: Option<(ProxyConfigData, u64)> = None; let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await; if let Some(cfg_v6) = cfg_v6 { - let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; - if changed { - maps_changed = true; - info!("ME config updated (v6)"); + let cfg_v6_hash = hash_proxy_config(&cfg_v6); + let stable_hits = state.config_v6.observe(cfg_v6_hash); + if stable_hits < required_cfg_snapshots { + debug!( + stable_hits, + required_cfg_snapshots, + snapshot = format_args!("0x{cfg_v6_hash:016x}"), + "ME config v6 candidate observed" + ); + } else if state.config_v6.is_applied(cfg_v6_hash) { + debug!( + snapshot = format_args!("0x{cfg_v6_hash:016x}"), + "ME config v6 stable snapshot already applied" + ); } else { - debug!("ME config v6 unchanged"); + ready_v6 = Some((cfg_v6, cfg_v6_hash)); + } + } + + if ready_v4.is_some() || ready_v6.is_some() { + if map_apply_cooldown_ready(state.last_map_apply_at, apply_cooldown) { + let update_v4 = ready_v4 + .as_ref() + .map(|(snapshot, _)| snapshot.map.clone()) + .unwrap_or_default(); + let update_v6 = ready_v6 + .as_ref() + .map(|(snapshot, _)| snapshot.map.clone()); + + let changed = pool.update_proxy_maps(update_v4, update_v6).await; + + if let Some((snapshot, hash)) = ready_v4 { + if let Some(dc) = snapshot.default_dc { + pool.default_dc + .store(dc, std::sync::atomic::Ordering::Relaxed); + } + state.config_v4.mark_applied(hash); + } + + if let Some((_snapshot, hash)) = ready_v6 { + state.config_v6.mark_applied(hash); + } + + state.last_map_apply_at = Some(tokio::time::Instant::now()); + + if changed { + maps_changed = true; + info!("ME config update applied after stable-gate"); + } else { + debug!("ME config stable-gate applied with no map delta"); + } + } else if let Some(last) = state.last_map_apply_at { + let wait_secs = map_apply_cooldown_remaining_secs(last, apply_cooldown); + debug!( + wait_secs, + "ME config stable snapshot deferred by cooldown" + ); } } @@ -175,14 +327,37 @@ async fn run_update_cycle(pool: &Arc, rng: &Arc, cfg: &Pro pool.reset_stun_state(); - // Update proxy-secret - match download_proxy_secret().await { - Ok(secret) => { - if pool.update_secret(secret).await { - info!("proxy-secret updated and pool reconnect scheduled"); + if cfg.general.proxy_secret_rotate_runtime { + match download_proxy_secret_with_max_len(cfg.general.proxy_secret_len_max).await { + Ok(secret) => { + let secret_hash = hash_secret(&secret); + let stable_hits = state.secret.observe(secret_hash); + if stable_hits < required_secret_snapshots { + debug!( + stable_hits, + required_secret_snapshots, + snapshot = format_args!("0x{secret_hash:016x}"), + "proxy-secret candidate observed" + ); + } else if state.secret.is_applied(secret_hash) { + debug!( + snapshot = format_args!("0x{secret_hash:016x}"), + "proxy-secret stable snapshot already applied" + ); + } else { + let rotated = pool.update_secret(secret).await; + state.secret.mark_applied(secret_hash); + if rotated { + info!("proxy-secret rotated after stable-gate"); + } else { + debug!("proxy-secret stable snapshot confirmed as unchanged"); + } + } } + Err(e) => warn!(error = %e, "proxy-secret update failed"), } - Err(e) => warn!(error = %e, "proxy-secret update failed"), + } else { + debug!("proxy-secret runtime rotation disabled by config"); } } @@ -191,6 +366,7 @@ pub async fn me_config_updater( rng: Arc, mut config_rx: watch::Receiver>, ) { + let mut state = UpdaterState::default(); let mut update_every_secs = config_rx .borrow() .general @@ -207,7 +383,7 @@ pub async fn me_config_updater( tokio::select! { _ = &mut sleep => { let cfg = config_rx.borrow().clone(); - run_update_cycle(&pool, &rng, cfg.as_ref()).await; + run_update_cycle(&pool, &rng, cfg.as_ref(), &mut state).await; let refreshed_secs = cfg.general.effective_update_every_secs().max(1); if refreshed_secs != update_every_secs { info!( @@ -245,7 +421,7 @@ pub async fn me_config_updater( ); update_every_secs = new_secs; update_every = Duration::from_secs(update_every_secs); - run_update_cycle(&pool, &rng, cfg.as_ref()).await; + run_update_cycle(&pool, &rng, cfg.as_ref(), &mut state).await; next_tick = tokio::time::Instant::now() + update_every; } else { info!( diff --git a/src/transport/middle_proxy/secret.rs b/src/transport/middle_proxy/secret.rs index 69a3198..4991d32 100644 --- a/src/transport/middle_proxy/secret.rs +++ b/src/transport/middle_proxy/secret.rs @@ -4,12 +4,42 @@ use httpdate; use crate::error::{ProxyError, Result}; +pub const PROXY_SECRET_MIN_LEN: usize = 32; + +pub(super) fn validate_proxy_secret_len(data_len: usize, max_len: usize) -> Result<()> { + if max_len < PROXY_SECRET_MIN_LEN { + return Err(ProxyError::Proxy(format!( + "proxy-secret max length is invalid: {} bytes (must be >= {})", + max_len, + PROXY_SECRET_MIN_LEN + ))); + } + + if data_len < PROXY_SECRET_MIN_LEN { + return Err(ProxyError::Proxy(format!( + "proxy-secret too short: {} bytes (need >= {})", + data_len, + PROXY_SECRET_MIN_LEN + ))); + } + + if data_len > max_len { + return Err(ProxyError::Proxy(format!( + "proxy-secret too long: {} bytes (limit = {})", + data_len, + max_len + ))); + } + + Ok(()) +} + /// Fetch Telegram proxy-secret binary. -pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result> { +pub async fn fetch_proxy_secret(cache_path: Option<&str>, max_len: usize) -> Result> { let cache = cache_path.unwrap_or("proxy-secret"); // 1) Try fresh download first. - match download_proxy_secret().await { + match download_proxy_secret_with_max_len(max_len).await { Ok(data) => { if let Err(e) = tokio::fs::write(cache, &data).await { warn!(error = %e, "Failed to cache proxy-secret (non-fatal)"); @@ -24,9 +54,9 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result> { } } - // 2) Fallback to cache/file regardless of age; require len>=32. + // 2) Fallback to cache/file regardless of age; require len in bounds. match tokio::fs::read(cache).await { - Ok(data) if data.len() >= 32 => { + Ok(data) if validate_proxy_secret_len(data.len(), max_len).is_ok() => { let age_hours = tokio::fs::metadata(cache) .await .ok() @@ -41,17 +71,14 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result> { ); Ok(data) } - Ok(data) => Err(ProxyError::Proxy(format!( - "Cached proxy-secret too short: {} bytes (need >= 32)", - data.len() - ))), + Ok(data) => validate_proxy_secret_len(data.len(), max_len).map(|_| data), Err(e) => Err(ProxyError::Proxy(format!( "Failed to read proxy-secret cache after download failure: {e}" ))), } } -pub async fn download_proxy_secret() -> Result> { +pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result> { let resp = reqwest::get("https://core.telegram.org/getProxySecret") .await .map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {e}")))?; @@ -84,12 +111,7 @@ pub async fn download_proxy_secret() -> Result> { .map_err(|e| ProxyError::Proxy(format!("Read proxy-secret body: {e}")))? .to_vec(); - if data.len() < 32 { - return Err(ProxyError::Proxy(format!( - "proxy-secret too short: {} bytes (need >= 32)", - data.len() - ))); - } + validate_proxy_secret_len(data.len(), max_len)?; info!(len = data.len(), "Downloaded proxy-secret OK"); Ok(data)