diff --git a/src/cli.rs b/src/cli.rs index 7e31f26..3525a22 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -195,6 +195,8 @@ fast_mode = true use_middle_proxy = false log_level = "normal" desync_all_full = false +update_every = 43200 +me_reinit_drain_timeout_secs = 300 [network] ipv4 = true diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 5216e29..6c3e60d 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -171,6 +171,14 @@ pub(crate) fn default_proxy_config_reload_secs() -> u64 { 12 * 60 * 60 } +pub(crate) fn default_update_every_secs() -> u64 { + 12 * 60 * 60 +} + +pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 { + 300 +} + pub(crate) fn default_ntp_check() -> bool { true } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 56cfa0f..5c7263f 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -11,6 +11,8 @@ //! | `general` | `middle_proxy_pool_size` | Passed on next connection | //! | `general` | `me_keepalive_*` | Passed on next connection | //! | `general` | `desync_all_full` | Applied immediately | +//! | `general` | `update_every` | Applied to ME updater immediately | +//! | `general` | `me_reinit_drain_timeout_secs`| Applied on next ME map update | //! | `access` | All user/quota fields | Effective immediately | //! //! Fields that require re-binding sockets (`server.port`, `censorship.*`, @@ -36,6 +38,8 @@ pub struct HotFields { pub ad_tag: Option, pub middle_proxy_pool_size: usize, pub desync_all_full: bool, + pub update_every_secs: u64, + pub me_reinit_drain_timeout_secs: u64, pub me_keepalive_enabled: bool, pub me_keepalive_interval_secs: u64, pub me_keepalive_jitter_secs: u64, @@ -50,6 +54,8 @@ impl HotFields { ad_tag: cfg.general.ad_tag.clone(), middle_proxy_pool_size: cfg.general.middle_proxy_pool_size, desync_all_full: cfg.general.desync_all_full, + update_every_secs: cfg.general.effective_update_every_secs(), + me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs, me_keepalive_enabled: cfg.general.me_keepalive_enabled, me_keepalive_interval_secs: cfg.general.me_keepalive_interval_secs, me_keepalive_jitter_secs: cfg.general.me_keepalive_jitter_secs, @@ -185,6 +191,20 @@ fn log_changes( ); } + if old_hot.update_every_secs != new_hot.update_every_secs { + info!( + "config reload: update_every(effective): {}s → {}s", + old_hot.update_every_secs, new_hot.update_every_secs, + ); + } + + if old_hot.me_reinit_drain_timeout_secs != new_hot.me_reinit_drain_timeout_secs { + info!( + "config reload: me_reinit_drain_timeout_secs: {}s → {}s", + old_hot.me_reinit_drain_timeout_secs, new_hot.me_reinit_drain_timeout_secs, + ); + } + if old_hot.me_keepalive_enabled != new_hot.me_keepalive_enabled || old_hot.me_keepalive_interval_secs != new_hot.me_keepalive_interval_secs || old_hot.me_keepalive_jitter_secs != new_hot.me_keepalive_jitter_secs diff --git a/src/config/load.rs b/src/config/load.rs index 827687a..5a8b8a5 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -117,6 +117,34 @@ impl ProxyConfig { let mut config: ProxyConfig = toml::from_str(&processed).map_err(|e| ProxyError::Config(e.to_string()))?; + if let Some(update_every) = config.general.update_every { + if update_every == 0 { + return Err(ProxyError::Config( + "general.update_every must be > 0".to_string(), + )); + } + } else { + let legacy_secret = config.general.proxy_secret_auto_reload_secs; + let legacy_config = config.general.proxy_config_auto_reload_secs; + let effective = legacy_secret.min(legacy_config); + if effective == 0 { + return Err(ProxyError::Config( + "legacy proxy_*_auto_reload_secs values must be > 0 when general.update_every is not set".to_string(), + )); + } + + if legacy_secret != default_proxy_secret_reload_secs() + || legacy_config != default_proxy_config_reload_secs() + { + warn!( + proxy_secret_auto_reload_secs = legacy_secret, + proxy_config_auto_reload_secs = legacy_config, + effective_update_every_secs = effective, + "proxy_*_auto_reload_secs are deprecated; set general.update_every" + ); + } + } + // Validate secrets. for (user, secret) in &config.access.users { if !secret.chars().all(|c| c.is_ascii_hexdigit()) || secret.len() != 32 { diff --git a/src/config/types.rs b/src/config/types.rs index 39ba683..54a20f3 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -257,11 +257,23 @@ pub struct GeneralConfig { #[serde(default = "default_fast_mode_min_tls_record")] pub fast_mode_min_tls_record: usize, - /// Automatically reload proxy-secret every N seconds. + /// Unified ME updater interval in seconds for getProxyConfig/getProxyConfigV6/getProxySecret. + /// When omitted, effective value falls back to legacy proxy_*_auto_reload_secs fields. + #[serde(default)] + pub update_every: Option, + + /// Drain timeout in seconds for stale ME writers after endpoint map changes. + /// Set to 0 to keep stale writers draining indefinitely (no force-close). + #[serde(default = "default_me_reinit_drain_timeout_secs")] + pub me_reinit_drain_timeout_secs: u64, + + /// Deprecated legacy setting; kept for backward compatibility fallback. + /// Use `update_every` instead. #[serde(default = "default_proxy_secret_reload_secs")] pub proxy_secret_auto_reload_secs: u64, - /// Automatically reload proxy-multi.conf every N seconds. + /// Deprecated legacy setting; kept for backward compatibility fallback. + /// Use `update_every` instead. #[serde(default = "default_proxy_config_reload_secs")] pub proxy_config_auto_reload_secs: u64, @@ -317,6 +329,8 @@ impl Default for GeneralConfig { max_client_frame: default_max_client_frame(), desync_all_full: default_desync_all_full(), fast_mode_min_tls_record: default_fast_mode_min_tls_record(), + update_every: Some(default_update_every_secs()), + me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(), proxy_secret_auto_reload_secs: default_proxy_secret_reload_secs(), proxy_config_auto_reload_secs: default_proxy_config_reload_secs(), ntp_check: default_ntp_check(), @@ -327,6 +341,13 @@ impl Default for GeneralConfig { } } +impl GeneralConfig { + pub fn effective_update_every_secs(&self) -> u64 { + self.update_every + .unwrap_or_else(|| self.proxy_secret_auto_reload_secs.min(self.proxy_config_auto_reload_secs)) + } +} + /// `[general.links]` — proxy link generation settings. #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct LinksConfig { diff --git a/src/main.rs b/src/main.rs index 61debb9..af1a069 100644 --- a/src/main.rs +++ b/src/main.rs @@ -392,18 +392,6 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai .await; }); - // Periodic updater: getProxyConfig + proxy-secret - let pool_clone2 = pool.clone(); - let rng_clone2 = rng.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_config_updater( - pool_clone2, - rng_clone2, - std::time::Duration::from_secs(12 * 3600), - ) - .await; - }); - Some(pool) } Err(e) => { @@ -702,6 +690,20 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai detected_ip_v6, ); + if let Some(ref pool) = me_pool { + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let config_rx_clone = config_rx.clone(); + tokio::spawn(async move { + crate::transport::middle_proxy::me_config_updater( + pool_clone, + rng_clone, + config_rx_clone, + ) + .await; + }); + } + let mut listeners = Vec::new(); for listener_conf in &config.server.listeners { diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index d2bb550..479a880 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -4,8 +4,10 @@ use std::sync::Arc; use std::time::Duration; use httpdate; +use tokio::sync::watch; use tracing::{debug, info, warn}; +use crate::config::ProxyConfig; use crate::error::Result; use super::MePool; @@ -128,49 +130,126 @@ pub async fn fetch_proxy_config(url: &str) -> Result { Ok(ProxyConfigData { map, default_dc }) } -pub async fn me_config_updater(pool: Arc, rng: Arc, interval: Duration) { - let mut tick = tokio::time::interval(interval); - // skip immediate tick to avoid double-fetch right after startup - tick.tick().await; +async fn run_update_cycle(pool: &Arc, rng: &Arc, cfg: &ProxyConfig) { + let mut maps_changed = false; + + // Update proxy config v4 + let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await; + if let Some(cfg_v4) = cfg_v4 { + let changed = pool.update_proxy_maps(cfg_v4.map.clone(), None).await; + if let Some(dc) = cfg_v4.default_dc { + pool.default_dc + .store(dc, std::sync::atomic::Ordering::Relaxed); + } + if changed { + maps_changed = true; + info!("ME config updated (v4)"); + } else { + debug!("ME config v4 unchanged"); + } + } + + // Update proxy config v6 (optional) + let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await; + if let Some(cfg_v6) = cfg_v6 { + let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; + if changed { + maps_changed = true; + info!("ME config updated (v6)"); + } else { + debug!("ME config v6 unchanged"); + } + } + + if maps_changed { + let drain_timeout = if cfg.general.me_reinit_drain_timeout_secs == 0 { + None + } else { + Some(Duration::from_secs(cfg.general.me_reinit_drain_timeout_secs)) + }; + pool.zero_downtime_reinit_after_map_change(rng.as_ref(), drain_timeout) + .await; + } + + pool.reset_stun_state(); + + // Update proxy-secret + match download_proxy_secret().await { + Ok(secret) => { + if pool.update_secret(secret).await { + info!("proxy-secret updated and pool reconnect scheduled"); + } + } + Err(e) => warn!(error = %e, "proxy-secret update failed"), + } +} + +pub async fn me_config_updater( + pool: Arc, + rng: Arc, + mut config_rx: watch::Receiver>, +) { + let mut update_every_secs = config_rx + .borrow() + .general + .effective_update_every_secs() + .max(1); + let mut update_every = Duration::from_secs(update_every_secs); + let mut next_tick = tokio::time::Instant::now() + update_every; + info!(update_every_secs, "ME config updater started"); + loop { - tick.tick().await; + let sleep = tokio::time::sleep_until(next_tick); + tokio::pin!(sleep); - // Update proxy config v4 - let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await; - if let Some(cfg) = cfg_v4 { - let changed = pool.update_proxy_maps(cfg.map.clone(), None).await; - if let Some(dc) = cfg.default_dc { - pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed); + tokio::select! { + _ = &mut sleep => { + let cfg = config_rx.borrow().clone(); + run_update_cycle(&pool, &rng, cfg.as_ref()).await; + let refreshed_secs = cfg.general.effective_update_every_secs().max(1); + if refreshed_secs != update_every_secs { + info!( + old_update_every_secs = update_every_secs, + new_update_every_secs = refreshed_secs, + "ME config updater interval changed" + ); + update_every_secs = refreshed_secs; + update_every = Duration::from_secs(update_every_secs); + } + next_tick = tokio::time::Instant::now() + update_every; } - if changed { - info!("ME config updated (v4), reconciling connections"); - pool.reconcile_connections(&rng).await; - } else { - debug!("ME config v4 unchanged"); - } - } + changed = config_rx.changed() => { + if changed.is_err() { + warn!("ME config updater stopped: config channel closed"); + break; + } + let cfg = config_rx.borrow().clone(); + let new_secs = cfg.general.effective_update_every_secs().max(1); + if new_secs == update_every_secs { + continue; + } - // Update proxy config v6 (optional) - let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await; - if let Some(cfg_v6) = cfg_v6 { - let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await; - if changed { - info!("ME config updated (v6), reconciling connections"); - pool.reconcile_connections(&rng).await; - } else { - debug!("ME config v6 unchanged"); - } - } - pool.reset_stun_state(); - - // Update proxy-secret - match download_proxy_secret().await { - Ok(secret) => { - if pool.update_secret(secret).await { - info!("proxy-secret updated and pool reconnect scheduled"); + if new_secs < update_every_secs { + info!( + old_update_every_secs = update_every_secs, + new_update_every_secs = new_secs, + "ME config updater interval decreased, running immediate refresh" + ); + update_every_secs = new_secs; + update_every = Duration::from_secs(update_every_secs); + run_update_cycle(&pool, &rng, cfg.as_ref()).await; + next_tick = tokio::time::Instant::now() + update_every; + } else { + info!( + old_update_every_secs = update_every_secs, + new_update_every_secs = new_secs, + "ME config updater interval increased" + ); + update_every_secs = new_secs; + update_every = Duration::from_secs(update_every_secs); + next_tick = tokio::time::Instant::now() + update_every; } } - Err(e) => warn!(error = %e, "proxy-secret update failed"), } } } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 8faeabf..858d4bf 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering}; @@ -178,7 +178,6 @@ impl MePool { } pub async fn reconcile_connections(self: &Arc, rng: &SecureRandom) { - use std::collections::HashSet; let writers = self.writers.read().await; let current: HashSet = writers .iter() @@ -210,6 +209,101 @@ impl MePool { } } + async fn desired_dc_endpoints(&self) -> HashMap> { + let mut out: HashMap> = HashMap::new(); + + if self.decision.ipv4_me { + let map_v4 = self.proxy_map_v4.read().await.clone(); + for (dc, addrs) in map_v4 { + let entry = out.entry(dc.abs()).or_default(); + for (ip, port) in addrs { + entry.insert(SocketAddr::new(ip, port)); + } + } + } + + if self.decision.ipv6_me { + let map_v6 = self.proxy_map_v6.read().await.clone(); + for (dc, addrs) in map_v6 { + let entry = out.entry(dc.abs()).or_default(); + for (ip, port) in addrs { + entry.insert(SocketAddr::new(ip, port)); + } + } + } + + out + } + + pub async fn zero_downtime_reinit_after_map_change( + self: &Arc, + rng: &SecureRandom, + drain_timeout: Option, + ) { + self.reconcile_connections(rng).await; + + let desired_by_dc = self.desired_dc_endpoints().await; + if desired_by_dc.is_empty() { + warn!("ME endpoint map is empty after update; skipping stale writer drain"); + return; + } + + let writers = self.writers.read().await; + let active_writer_addrs: HashSet = writers + .iter() + .filter(|w| !w.draining.load(Ordering::Relaxed)) + .map(|w| w.addr) + .collect(); + + let mut missing_dc = Vec::::new(); + for (dc, endpoints) in &desired_by_dc { + if endpoints.is_empty() { + continue; + } + if !endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) { + missing_dc.push(*dc); + } + } + + if !missing_dc.is_empty() { + missing_dc.sort_unstable(); + warn!( + missing_dc = ?missing_dc, + "ME reinit coverage incomplete after map update; keeping stale writers" + ); + return; + } + + let desired_addrs: HashSet = desired_by_dc + .values() + .flat_map(|set| set.iter().copied()) + .collect(); + + let stale_writer_ids: Vec = writers + .iter() + .filter(|w| !w.draining.load(Ordering::Relaxed)) + .filter(|w| !desired_addrs.contains(&w.addr)) + .map(|w| w.id) + .collect(); + drop(writers); + + if stale_writer_ids.is_empty() { + debug!("ME map update completed with no stale writers"); + return; + } + + let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0); + info!( + stale_writers = stale_writer_ids.len(), + drain_timeout_secs, + "ME map update covered; draining stale writers" + ); + for writer_id in stale_writer_ids { + self.mark_writer_draining_with_timeout(writer_id, drain_timeout) + .await; + } + } + pub async fn update_proxy_maps( &self, new_v4: HashMap>, @@ -631,23 +725,40 @@ impl MePool { self.registry.writer_lost(writer_id).await } - pub(crate) async fn mark_writer_draining(self: &Arc, writer_id: u64) { - { + pub(crate) async fn mark_writer_draining_with_timeout( + self: &Arc, + writer_id: u64, + timeout: Option, + ) { + let timeout = timeout.filter(|d| !d.is_zero()); + let found = { let mut ws = self.writers.write().await; if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) { w.draining.store(true, Ordering::Relaxed); + true + } else { + false } + }; + + if !found { + return; } + let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0); + debug!(writer_id, timeout_secs, "ME writer marked draining"); + let pool = Arc::downgrade(self); tokio::spawn(async move { - let deadline = Instant::now() + Duration::from_secs(300); + let deadline = timeout.map(|t| Instant::now() + t); loop { if let Some(p) = pool.upgrade() { - if Instant::now() >= deadline { - warn!(writer_id, "Drain timeout, force-closing"); - let _ = p.remove_writer_and_close_clients(writer_id).await; - break; + if let Some(deadline_at) = deadline { + if Instant::now() >= deadline_at { + warn!(writer_id, "Drain timeout, force-closing"); + let _ = p.remove_writer_and_close_clients(writer_id).await; + break; + } } if p.registry.is_writer_empty(writer_id).await { let _ = p.remove_writer_only(writer_id).await; @@ -661,6 +772,11 @@ impl MePool { }); } + pub(crate) async fn mark_writer_draining(self: &Arc, writer_id: u64) { + self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300))) + .await; + } + } fn hex_dump(data: &[u8]) -> String {