From 8066ea2163e8ae6720cce2151366a47b79357221 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:31:36 +0300 Subject: [PATCH 1/5] ME Pool Init fixes --- src/config/defaults.rs | 8 + src/config/hot_reload.rs | 6 + src/config/load.rs | 68 +++ src/config/types.rs | 10 + src/main.rs | 450 ++++++++++++------- src/transport/middle_proxy/config_updater.rs | 138 +++--- src/transport/middle_proxy/mod.rs | 6 +- 7 files changed, 461 insertions(+), 225 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 15be561..4b94be6 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -141,6 +141,14 @@ pub(crate) fn default_proxy_secret_path() -> Option { Some("proxy-secret".to_string()) } +pub(crate) fn default_proxy_config_v4_cache_path() -> Option { + Some("cache/proxy-config-v4.txt".to_string()) +} + +pub(crate) fn default_proxy_config_v6_cache_path() -> Option { + Some("cache/proxy-config-v6.txt".to_string()) +} + pub(crate) fn default_middle_proxy_nat_stun() -> Option { None } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 14939a0..97d5e4e 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -405,6 +405,12 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b warned = true; warn!("config reload: general.me2dc_fallback changed; restart required"); } + if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path + || old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path + { + warned = true; + warn!("config reload: general.proxy_config_*_cache_path changed; restart required"); + } if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled || old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs || old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs diff --git a/src/config/load.rs b/src/config/load.rs index 2954f04..9abab30 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -203,6 +203,22 @@ impl ProxyConfig { sanitize_ad_tag(&mut config.general.ad_tag); + if let Some(path) = &config.general.proxy_config_v4_cache_path + && path.trim().is_empty() + { + return Err(ProxyError::Config( + "general.proxy_config_v4_cache_path cannot be empty when provided".to_string(), + )); + } + + if let Some(path) = &config.general.proxy_config_v6_cache_path + && path.trim().is_empty() + { + return Err(ProxyError::Config( + "general.proxy_config_v6_cache_path cannot be empty when provided".to_string(), + )); + } + if let Some(update_every) = config.general.update_every { if update_every == 0 { return Err(ProxyError::Config( @@ -691,6 +707,14 @@ mod tests { cfg.general.me2dc_fallback, default_me2dc_fallback() ); + assert_eq!( + cfg.general.proxy_config_v4_cache_path, + default_proxy_config_v4_cache_path() + ); + assert_eq!( + cfg.general.proxy_config_v6_cache_path, + default_proxy_config_v6_cache_path() + ); assert_eq!( cfg.general.me_single_endpoint_shadow_writers, default_me_single_endpoint_shadow_writers() @@ -801,6 +825,14 @@ mod tests { default_me_init_retry_attempts() ); assert_eq!(general.me2dc_fallback, default_me2dc_fallback()); + assert_eq!( + general.proxy_config_v4_cache_path, + default_proxy_config_v4_cache_path() + ); + assert_eq!( + general.proxy_config_v6_cache_path, + default_proxy_config_v6_cache_path() + ); assert_eq!( general.me_single_endpoint_shadow_writers, default_me_single_endpoint_shadow_writers() @@ -1267,6 +1299,42 @@ mod tests { let _ = std::fs::remove_file(path); } + #[test] + fn proxy_config_cache_paths_empty_are_rejected() { + let toml = r#" + [general] + proxy_config_v4_cache_path = " " + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let dir = std::env::temp_dir(); + let path = dir.join("telemt_proxy_config_v4_cache_path_empty_test.toml"); + std::fs::write(&path, toml).unwrap(); + let err = ProxyConfig::load(&path).unwrap_err().to_string(); + assert!(err.contains("general.proxy_config_v4_cache_path cannot be empty")); + let _ = std::fs::remove_file(path); + + let toml_v6 = r#" + [general] + proxy_config_v6_cache_path = "" + + [censorship] + tls_domain = "example.com" + + [access.users] + user = "00000000000000000000000000000000" + "#; + let path_v6 = dir.join("telemt_proxy_config_v6_cache_path_empty_test.toml"); + std::fs::write(&path_v6, toml_v6).unwrap(); + let err_v6 = ProxyConfig::load(&path_v6).unwrap_err().to_string(); + assert!(err_v6.contains("general.proxy_config_v6_cache_path cannot be empty")); + let _ = std::fs::remove_file(path_v6); + } + #[test] fn me_hardswap_warmup_defaults_are_set() { let toml = r#" diff --git a/src/config/types.rs b/src/config/types.rs index a9eaeae..4c19f8a 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -343,6 +343,14 @@ pub struct GeneralConfig { #[serde(default = "default_proxy_secret_path")] pub proxy_secret_path: Option, + /// Optional path to cache raw getProxyConfig (IPv4) snapshot for startup fallback. + #[serde(default = "default_proxy_config_v4_cache_path")] + pub proxy_config_v4_cache_path: Option, + + /// Optional path to cache raw getProxyConfigV6 snapshot for startup fallback. + #[serde(default = "default_proxy_config_v6_cache_path")] + pub proxy_config_v6_cache_path: Option, + /// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags. #[serde(default)] pub ad_tag: Option, @@ -727,6 +735,8 @@ impl Default for GeneralConfig { use_middle_proxy: default_true(), ad_tag: None, proxy_secret_path: default_proxy_secret_path(), + proxy_config_v4_cache_path: default_proxy_config_v4_cache_path(), + proxy_config_v6_cache_path: default_proxy_config_v6_cache_path(), middle_proxy_nat_ip: None, middle_proxy_nat_probe: default_true(), middle_proxy_nat_stun: default_middle_proxy_nat_stun(), diff --git a/src/main.rs b/src/main.rs index fc7cf0e..1da8123 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,8 +41,9 @@ use crate::stats::telemetry::TelemetryPolicy; use crate::stats::{ReplayChecker, Stats}; use crate::stream::BufferPool; use crate::transport::middle_proxy::{ - MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, MeReinitTrigger, format_sample_line, - format_me_route, + MePool, ProxyConfigData, fetch_proxy_config_with_raw, format_me_route, format_sample_line, + load_proxy_config_cache, run_me_ping, save_proxy_config_cache, MePingFamily, MePingSample, + MeReinitTrigger, }; use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes}; use crate::tls_front::TlsFrontCache; @@ -172,6 +173,120 @@ async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result tokio::fs::write(path, payload).await } +async fn load_startup_proxy_config_snapshot( + url: &str, + cache_path: Option<&str>, + me2dc_fallback: bool, + label: &'static str, +) -> Option { + loop { + match fetch_proxy_config_with_raw(url).await { + Ok((cfg, raw)) => { + if !cfg.map.is_empty() { + if let Some(path) = cache_path + && let Err(e) = save_proxy_config_cache(path, &raw).await + { + warn!(error = %e, path, snapshot = label, "Failed to store startup proxy-config cache"); + } + return Some(cfg); + } + + warn!(snapshot = label, url, "Startup proxy-config is empty; trying disk cache"); + if let Some(path) = cache_path { + match load_proxy_config_cache(path).await { + Ok(cached) if !cached.map.is_empty() => { + info!( + snapshot = label, + path, + proxy_for_lines = cached.proxy_for_lines, + "Loaded startup proxy-config from disk cache" + ); + return Some(cached); + } + Ok(_) => { + warn!( + snapshot = label, + path, + "Startup proxy-config cache is empty; ignoring cache file" + ); + } + Err(cache_err) => { + debug!( + snapshot = label, + path, + error = %cache_err, + "Startup proxy-config cache unavailable" + ); + } + } + } + + if me2dc_fallback { + error!( + snapshot = label, + "Startup proxy-config unavailable and no saved config found; falling back to direct mode" + ); + return None; + } + + warn!( + snapshot = label, + retry_in_secs = 2, + "Startup proxy-config unavailable and no saved config found; retrying because me2dc_fallback=false" + ); + tokio::time::sleep(Duration::from_secs(2)).await; + } + Err(fetch_err) => { + if let Some(path) = cache_path { + match load_proxy_config_cache(path).await { + Ok(cached) if !cached.map.is_empty() => { + info!( + snapshot = label, + path, + proxy_for_lines = cached.proxy_for_lines, + "Loaded startup proxy-config from disk cache" + ); + return Some(cached); + } + Ok(_) => { + warn!( + snapshot = label, + path, + "Startup proxy-config cache is empty; ignoring cache file" + ); + } + Err(cache_err) => { + debug!( + snapshot = label, + path, + error = %cache_err, + "Startup proxy-config cache unavailable" + ); + } + } + } + + if me2dc_fallback { + error!( + snapshot = label, + error = %fetch_err, + "Startup proxy-config unavailable and no cached data; falling back to direct mode" + ); + return None; + } + + warn!( + snapshot = label, + error = %fetch_err, + retry_in_secs = 2, + "Startup proxy-config unavailable; retrying because me2dc_fallback=false" + ); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } +} + #[tokio::main] async fn main() -> std::result::Result<(), Box> { let (config_path, cli_silent, cli_log_level) = parse_cli(); @@ -484,193 +599,188 @@ async fn main() -> std::result::Result<(), Box> { // ============================================================= let proxy_secret_path = config.general.proxy_secret_path.as_deref(); let pool_size = config.general.middle_proxy_pool_size.max(1); - let mut init_attempt: u32 = 0; - loop { - init_attempt = init_attempt.saturating_add(1); - - let proxy_secret = match crate::transport::middle_proxy::fetch_proxy_secret( + let proxy_secret = loop { + match crate::transport::middle_proxy::fetch_proxy_secret( proxy_secret_path, config.general.proxy_secret_len_max, ) .await { - Ok(proxy_secret) => proxy_secret, + Ok(proxy_secret) => break Some(proxy_secret), Err(e) => { - let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; - if retries_limited && init_attempt >= me_init_retry_attempts { + if me2dc_fallback { error!( error = %e, - attempt = init_attempt, - retry_limit = me_init_retry_attempts, - "ME startup retries exhausted while loading proxy-secret; falling back to direct mode" + "ME startup failed: proxy-secret is unavailable and no saved secret found; falling back to direct mode" ); break None; } warn!( error = %e, - attempt = init_attempt, - retry_limit = if me_init_retry_attempts == 0 { - String::from("unlimited") - } else { - me_init_retry_attempts.to_string() - }, - me2dc_fallback = me2dc_fallback, retry_in_secs = 2, - "Failed to fetch proxy-secret; retrying ME startup" + "ME startup failed: proxy-secret is unavailable and no saved secret found; retrying because me2dc_fallback=false" ); tokio::time::sleep(Duration::from_secs(2)).await; - continue; - } - }; - - info!( - secret_len = proxy_secret.len(), - key_sig = format_args!( - "0x{:08x}", - if proxy_secret.len() >= 4 { - u32::from_le_bytes([ - proxy_secret[0], - proxy_secret[1], - proxy_secret[2], - proxy_secret[3], - ]) - } else { - 0 - } - ), - "Proxy-secret loaded" - ); - - // Load ME config (v4/v6) + default DC - let mut cfg_v4 = fetch_proxy_config( - "https://core.telegram.org/getProxyConfig", - ) - .await - .unwrap_or_default(); - let mut cfg_v6 = fetch_proxy_config( - "https://core.telegram.org/getProxyConfigV6", - ) - .await - .unwrap_or_default(); - - if cfg_v4.map.is_empty() { - cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone(); - } - if cfg_v6.map.is_empty() { - cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone(); - } - - let pool = MePool::new( - proxy_tag.clone(), - proxy_secret, - config.general.middle_proxy_nat_ip, - me_nat_probe, - None, - config.network.stun_servers.clone(), - config.general.stun_nat_probe_concurrency, - probe.detected_ipv6, - config.timeouts.me_one_retry, - config.timeouts.me_one_timeout_ms, - cfg_v4.map.clone(), - cfg_v6.map.clone(), - cfg_v4.default_dc.or(cfg_v6.default_dc), - decision.clone(), - Some(upstream_manager.clone()), - rng.clone(), - stats.clone(), - config.general.me_keepalive_enabled, - config.general.me_keepalive_interval_secs, - config.general.me_keepalive_jitter_secs, - config.general.me_keepalive_payload_random, - config.general.rpc_proxy_req_every, - config.general.me_warmup_stagger_enabled, - config.general.me_warmup_step_delay_ms, - config.general.me_warmup_step_jitter_ms, - config.general.me_reconnect_max_concurrent_per_dc, - config.general.me_reconnect_backoff_base_ms, - config.general.me_reconnect_backoff_cap_ms, - config.general.me_reconnect_fast_retry_count, - config.general.me_single_endpoint_shadow_writers, - config.general.me_single_endpoint_outage_mode_enabled, - config.general.me_single_endpoint_outage_disable_quarantine, - config.general.me_single_endpoint_outage_backoff_min_ms, - config.general.me_single_endpoint_outage_backoff_max_ms, - config.general.me_single_endpoint_shadow_rotate_every_secs, - config.general.me_floor_mode, - config.general.me_adaptive_floor_idle_secs, - config.general.me_adaptive_floor_min_writers_single_endpoint, - config.general.me_adaptive_floor_recover_grace_secs, - config.general.hardswap, - config.general.me_pool_drain_ttl_secs, - config.general.effective_me_pool_force_close_secs(), - config.general.me_pool_min_fresh_ratio, - config.general.me_hardswap_warmup_delay_min_ms, - config.general.me_hardswap_warmup_delay_max_ms, - config.general.me_hardswap_warmup_extra_passes, - config.general.me_hardswap_warmup_pass_backoff_base_ms, - config.general.me_bind_stale_mode, - config.general.me_bind_stale_ttl_secs, - config.general.me_secret_atomic_snapshot, - config.general.me_deterministic_writer_sort, - config.general.me_socks_kdf_policy, - config.general.me_route_backpressure_base_timeout_ms, - config.general.me_route_backpressure_high_timeout_ms, - config.general.me_route_backpressure_high_watermark_pct, - config.general.me_route_no_writer_mode, - config.general.me_route_no_writer_wait_ms, - config.general.me_route_inline_recovery_attempts, - config.general.me_route_inline_recovery_wait_ms, - ); - - match pool.init(pool_size, &rng).await { - Ok(()) => { - info!( - attempt = init_attempt, - "Middle-End pool initialized successfully" - ); - - // Phase 4: Start health monitor - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, min_conns, - ) - .await; - }); - - break Some(pool); - } - Err(e) => { - let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; - if retries_limited && init_attempt >= me_init_retry_attempts { - error!( - error = %e, - attempt = init_attempt, - retry_limit = me_init_retry_attempts, - "ME pool init retries exhausted; falling back to direct mode" - ); - break None; - } - - warn!( - error = %e, - attempt = init_attempt, - retry_limit = if me_init_retry_attempts == 0 { - String::from("unlimited") - } else { - me_init_retry_attempts.to_string() - }, - me2dc_fallback = me2dc_fallback, - retry_in_secs = 2, - "ME pool is not ready yet; retrying startup initialization" - ); - pool.reset_stun_state(); - tokio::time::sleep(Duration::from_secs(2)).await; } } + }; + match proxy_secret { + Some(proxy_secret) => { + info!( + secret_len = proxy_secret.len(), + key_sig = format_args!( + "0x{:08x}", + if proxy_secret.len() >= 4 { + u32::from_le_bytes([ + proxy_secret[0], + proxy_secret[1], + proxy_secret[2], + proxy_secret[3], + ]) + } else { + 0 + } + ), + "Proxy-secret loaded" + ); + + let cfg_v4 = load_startup_proxy_config_snapshot( + "https://core.telegram.org/getProxyConfig", + config.general.proxy_config_v4_cache_path.as_deref(), + me2dc_fallback, + "getProxyConfig", + ) + .await; + let cfg_v6 = load_startup_proxy_config_snapshot( + "https://core.telegram.org/getProxyConfigV6", + config.general.proxy_config_v6_cache_path.as_deref(), + me2dc_fallback, + "getProxyConfigV6", + ) + .await; + + if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) { + let pool = MePool::new( + proxy_tag.clone(), + proxy_secret, + config.general.middle_proxy_nat_ip, + me_nat_probe, + None, + config.network.stun_servers.clone(), + config.general.stun_nat_probe_concurrency, + probe.detected_ipv6, + config.timeouts.me_one_retry, + config.timeouts.me_one_timeout_ms, + cfg_v4.map.clone(), + cfg_v6.map.clone(), + cfg_v4.default_dc.or(cfg_v6.default_dc), + decision.clone(), + Some(upstream_manager.clone()), + rng.clone(), + stats.clone(), + config.general.me_keepalive_enabled, + config.general.me_keepalive_interval_secs, + config.general.me_keepalive_jitter_secs, + config.general.me_keepalive_payload_random, + config.general.rpc_proxy_req_every, + config.general.me_warmup_stagger_enabled, + config.general.me_warmup_step_delay_ms, + config.general.me_warmup_step_jitter_ms, + config.general.me_reconnect_max_concurrent_per_dc, + config.general.me_reconnect_backoff_base_ms, + config.general.me_reconnect_backoff_cap_ms, + config.general.me_reconnect_fast_retry_count, + config.general.me_single_endpoint_shadow_writers, + config.general.me_single_endpoint_outage_mode_enabled, + config.general.me_single_endpoint_outage_disable_quarantine, + config.general.me_single_endpoint_outage_backoff_min_ms, + config.general.me_single_endpoint_outage_backoff_max_ms, + config.general.me_single_endpoint_shadow_rotate_every_secs, + config.general.me_floor_mode, + config.general.me_adaptive_floor_idle_secs, + config.general.me_adaptive_floor_min_writers_single_endpoint, + config.general.me_adaptive_floor_recover_grace_secs, + config.general.hardswap, + config.general.me_pool_drain_ttl_secs, + config.general.effective_me_pool_force_close_secs(), + config.general.me_pool_min_fresh_ratio, + config.general.me_hardswap_warmup_delay_min_ms, + config.general.me_hardswap_warmup_delay_max_ms, + config.general.me_hardswap_warmup_extra_passes, + config.general.me_hardswap_warmup_pass_backoff_base_ms, + config.general.me_bind_stale_mode, + config.general.me_bind_stale_ttl_secs, + config.general.me_secret_atomic_snapshot, + config.general.me_deterministic_writer_sort, + config.general.me_socks_kdf_policy, + config.general.me_route_backpressure_base_timeout_ms, + config.general.me_route_backpressure_high_timeout_ms, + config.general.me_route_backpressure_high_watermark_pct, + config.general.me_route_no_writer_mode, + config.general.me_route_no_writer_wait_ms, + config.general.me_route_inline_recovery_attempts, + config.general.me_route_inline_recovery_wait_ms, + ); + + let mut init_attempt: u32 = 0; + loop { + init_attempt = init_attempt.saturating_add(1); + match pool.init(pool_size, &rng).await { + Ok(()) => { + info!( + attempt = init_attempt, + "Middle-End pool initialized successfully" + ); + + // Phase 4: Start health monitor + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_clone, rng_clone, min_conns, + ) + .await; + }); + + break Some(pool); + } + Err(e) => { + let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; + if retries_limited && init_attempt >= me_init_retry_attempts { + error!( + error = %e, + attempt = init_attempt, + retry_limit = me_init_retry_attempts, + "ME pool init retries exhausted; falling back to direct mode" + ); + break None; + } + + let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }; + warn!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool is not ready yet; retrying startup initialization" + ); + pool.reset_stun_state(); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } + } else { + None + } + } + None => None, } } else { None diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index a9c50ab..072c1f6 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::hash::{DefaultHasher, Hash, Hasher}; use std::net::IpAddr; +use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -42,6 +43,87 @@ pub struct ProxyConfigData { pub proxy_for_lines: u32, } +pub fn parse_proxy_config_text(text: &str, http_status: u16) -> ProxyConfigData { + let mut map: HashMap> = HashMap::new(); + let mut proxy_for_lines: u32 = 0; + for line in text.lines() { + if let Some((dc, ip, port)) = parse_proxy_line(line) { + map.entry(dc).or_default().push((ip, port)); + proxy_for_lines = proxy_for_lines.saturating_add(1); + } + } + + let default_dc = text.lines().find_map(|l| { + let t = l.trim(); + if let Some(rest) = t.strip_prefix("default") { + return rest.trim().trim_end_matches(';').parse::().ok(); + } + None + }); + + ProxyConfigData { + map, + default_dc, + http_status, + proxy_for_lines, + } +} + +pub async fn load_proxy_config_cache(path: &str) -> Result { + let text = tokio::fs::read_to_string(path).await.map_err(|e| { + crate::error::ProxyError::Proxy(format!("read proxy-config cache '{path}' failed: {e}")) + })?; + Ok(parse_proxy_config_text(&text, 200)) +} + +pub async fn save_proxy_config_cache(path: &str, raw_text: &str) -> Result<()> { + if let Some(parent) = Path::new(path).parent() + && !parent.as_os_str().is_empty() + { + tokio::fs::create_dir_all(parent).await.map_err(|e| { + crate::error::ProxyError::Proxy(format!( + "create proxy-config cache dir '{}' failed: {e}", + parent.display() + )) + })?; + } + + tokio::fs::write(path, raw_text).await.map_err(|e| { + crate::error::ProxyError::Proxy(format!("write proxy-config cache '{path}' failed: {e}")) + })?; + Ok(()) +} + +pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData, String)> { + let resp = reqwest::get(url) + .await + .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))? + ; + let http_status = resp.status().as_u16(); + + if let Some(date) = resp.headers().get(reqwest::header::DATE) + && let Ok(date_str) = date.to_str() + && let Ok(server_time) = httpdate::parse_http_date(date_str) + && let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| { + server_time.duration_since(SystemTime::now()).map_err(|_| e) + }) + { + let skew_secs = skew.as_secs(); + if skew_secs > 60 { + warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header"); + } else if skew_secs > 30 { + warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header"); + } + } + + let text = resp + .text() + .await + .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?; + let parsed = parse_proxy_config_text(&text, http_status); + Ok((parsed, text)) +} + #[derive(Debug, Default)] struct StableSnapshot { candidate_hash: Option, @@ -170,61 +252,9 @@ fn parse_proxy_line(line: &str) -> Option<(i32, IpAddr, u16)> { } pub async fn fetch_proxy_config(url: &str) -> Result { - let resp = reqwest::get(url) + fetch_proxy_config_with_raw(url) .await - .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))? - ; - let http_status = resp.status().as_u16(); - - if let Some(date) = resp.headers().get(reqwest::header::DATE) - && let Ok(date_str) = date.to_str() - && let Ok(server_time) = httpdate::parse_http_date(date_str) - && let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| { - server_time.duration_since(SystemTime::now()).map_err(|_| e) - }) - { - let skew_secs = skew.as_secs(); - if skew_secs > 60 { - warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header"); - } else if skew_secs > 30 { - warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header"); - } - } - - let text = resp - .text() - .await - .map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?; - - let mut map: HashMap> = HashMap::new(); - let mut proxy_for_lines: u32 = 0; - for line in text.lines() { - if let Some((dc, ip, port)) = parse_proxy_line(line) { - map.entry(dc).or_default().push((ip, port)); - proxy_for_lines = proxy_for_lines.saturating_add(1); - } - } - - let default_dc = text - .lines() - .find_map(|l| { - let t = l.trim(); - if let Some(rest) = t.strip_prefix("default") { - return rest - .trim() - .trim_end_matches(';') - .parse::() - .ok(); - } - None - }); - - Ok(ProxyConfigData { - map, - default_dc, - http_status, - proxy_for_lines, - }) + .map(|(parsed, _raw)| parsed) } fn snapshot_passes_guards( diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index e7c7957..131e215 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -30,7 +30,11 @@ pub use pool::MePool; pub use pool_nat::{stun_probe, detect_public_ip}; pub use registry::ConnRegistry; pub use secret::fetch_proxy_secret; -pub use config_updater::{fetch_proxy_config, me_config_updater}; +#[allow(unused_imports)] +pub use config_updater::{ + ProxyConfigData, fetch_proxy_config, fetch_proxy_config_with_raw, load_proxy_config_cache, + me_config_updater, save_proxy_config_cache, +}; pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task}; pub use wire::proto_flags_for_tag; From 3260746785d57284f0a0c24c6ed8b16f5879c706 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:48:09 +0300 Subject: [PATCH 2/5] Init + Uptime timers --- src/main.rs | 104 ++++++++++++++++++++++++- src/transport/middle_proxy/registry.rs | 5 ++ src/transport/middle_proxy/send.rs | 33 +++++++- 3 files changed, 139 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1da8123..fe001c3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use rand::Rng; use tokio::net::TcpListener; use tokio::signal; @@ -173,6 +173,74 @@ async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result tokio::fs::write(path, payload).await } +fn unit_label(value: u64, singular: &'static str, plural: &'static str) -> &'static str { + if value == 1 { singular } else { plural } +} + +fn format_uptime(total_secs: u64) -> String { + const SECS_PER_MINUTE: u64 = 60; + const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE; + const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR; + const SECS_PER_MONTH: u64 = 30 * SECS_PER_DAY; + const SECS_PER_YEAR: u64 = 365 * SECS_PER_DAY; + + let mut remaining = total_secs; + let years = remaining / SECS_PER_YEAR; + remaining %= SECS_PER_YEAR; + let months = remaining / SECS_PER_MONTH; + remaining %= SECS_PER_MONTH; + let days = remaining / SECS_PER_DAY; + remaining %= SECS_PER_DAY; + let hours = remaining / SECS_PER_HOUR; + remaining %= SECS_PER_HOUR; + let minutes = remaining / SECS_PER_MINUTE; + let seconds = remaining % SECS_PER_MINUTE; + + let mut parts = Vec::new(); + if years > 0 { + parts.push(format!( + "{} {}", + years, + unit_label(years, "year", "years") + )); + } + if total_secs >= SECS_PER_YEAR { + parts.push(format!( + "{} {}", + months, + unit_label(months, "month", "months") + )); + } + if total_secs >= SECS_PER_MONTH { + parts.push(format!( + "{} {}", + days, + unit_label(days, "day", "days") + )); + } + if total_secs >= SECS_PER_DAY { + parts.push(format!( + "{} {}", + hours, + unit_label(hours, "hour", "hours") + )); + } + if total_secs >= SECS_PER_HOUR { + parts.push(format!( + "{} {}", + minutes, + unit_label(minutes, "minute", "minutes") + )); + } + parts.push(format!( + "{} {}", + seconds, + unit_label(seconds, "second", "seconds") + )); + + format!("{} / {} seconds", parts.join(", "), total_secs) +} + async fn load_startup_proxy_config_snapshot( url: &str, cache_path: Option<&str>, @@ -289,6 +357,7 @@ async fn load_startup_proxy_config_snapshot( #[tokio::main] async fn main() -> std::result::Result<(), Box> { + let process_started_at = Instant::now(); let (config_path, cli_silent, cli_log_level) = parse_cli(); let mut config = match ProxyConfig::load(&config_path) { @@ -961,6 +1030,15 @@ async fn main() -> std::result::Result<(), Box> { } } + let initialized_secs = process_started_at.elapsed().as_secs(); + let second_suffix = if initialized_secs == 1 { "" } else { "s" }; + info!("================= Telegram Startup ================="); + info!( + " DC/ME Initialized in {} second{}", + initialized_secs, second_suffix + ); + info!("============================================================"); + // Background tasks let um_clone = upstream_manager.clone(); let decision_clone = decision.clone(); @@ -1514,7 +1592,29 @@ async fn main() -> std::result::Result<(), Box> { } match signal::ctrl_c().await { - Ok(()) => info!("Shutting down..."), + Ok(()) => { + let uptime_secs = process_started_at.elapsed().as_secs(); + info!("Uptime: {}", format_uptime(uptime_secs)); + info!("Shutting down..."); + if let Some(pool) = &me_pool { + match tokio::time::timeout( + Duration::from_secs(2), + pool.shutdown_send_close_conn_all(), + ) + .await + { + Ok(total) => { + info!( + close_conn_sent = total, + "ME shutdown: RPC_CLOSE_CONN broadcast completed" + ); + } + Err(_) => { + warn!("ME shutdown: RPC_CLOSE_CONN broadcast timed out"); + } + } + } + } Err(e) => error!("Signal error: {}", e), } diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 869030a..e4d0031 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -278,6 +278,11 @@ impl ConnRegistry { Some(ConnWriter { writer_id, tx: writer }) } + pub async fn active_conn_ids(&self) -> Vec { + let inner = self.inner.read().await; + inner.writer_for_conn.keys().copied().collect() + } + pub async fn writer_lost(&self, writer_id: u64) -> Vec { let mut inner = self.inner.write().await; inner.writers.remove(&writer_id); diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 8bd21ee..c6db028 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -11,7 +11,7 @@ use tracing::{debug, warn}; use crate::config::MeRouteNoWriterMode; use crate::error::{ProxyError, Result}; use crate::network::IpFamily; -use crate::protocol::constants::RPC_CLOSE_EXT_U32; +use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32}; use super::MePool; use super::codec::WriterCommand; @@ -476,6 +476,37 @@ impl MePool { Ok(()) } + pub async fn send_close_conn(self: &Arc, conn_id: u64) -> Result<()> { + if let Some(w) = self.registry.get_writer(conn_id).await { + let mut p = Vec::with_capacity(12); + p.extend_from_slice(&RPC_CLOSE_CONN_U32.to_le_bytes()); + p.extend_from_slice(&conn_id.to_le_bytes()); + match w.tx.try_send(WriterCommand::DataAndFlush(p)) { + Ok(()) => {} + Err(TrySendError::Full(cmd)) => { + let _ = tokio::time::timeout(Duration::from_millis(50), w.tx.send(cmd)).await; + } + Err(TrySendError::Closed(_)) => { + debug!(conn_id, "ME close_conn skipped: writer channel closed"); + } + } + } else { + debug!(conn_id, "ME close_conn skipped (writer missing)"); + } + + self.registry.unregister(conn_id).await; + Ok(()) + } + + pub async fn shutdown_send_close_conn_all(self: &Arc) -> usize { + let conn_ids = self.registry.active_conn_ids().await; + let total = conn_ids.len(); + for conn_id in conn_ids { + let _ = self.send_close_conn(conn_id).await; + } + total + } + pub fn connection_count(&self) -> usize { self.conn_count.load(Ordering::Relaxed) } From d62a6e041738d05073a1a0f874caeced4d568893 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 16:04:32 +0300 Subject: [PATCH 3/5] Shutdown Timer fixes --- src/main.rs | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index fe001c3..84e61a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -182,7 +182,7 @@ fn format_uptime(total_secs: u64) -> String { const SECS_PER_HOUR: u64 = 60 * SECS_PER_MINUTE; const SECS_PER_DAY: u64 = 24 * SECS_PER_HOUR; const SECS_PER_MONTH: u64 = 30 * SECS_PER_DAY; - const SECS_PER_YEAR: u64 = 365 * SECS_PER_DAY; + const SECS_PER_YEAR: u64 = 12 * SECS_PER_MONTH; let mut remaining = total_secs; let years = remaining / SECS_PER_YEAR; @@ -197,35 +197,35 @@ fn format_uptime(total_secs: u64) -> String { let seconds = remaining % SECS_PER_MINUTE; let mut parts = Vec::new(); - if years > 0 { + if total_secs > SECS_PER_YEAR { parts.push(format!( "{} {}", years, unit_label(years, "year", "years") )); } - if total_secs >= SECS_PER_YEAR { + if total_secs > SECS_PER_MONTH { parts.push(format!( "{} {}", months, unit_label(months, "month", "months") )); } - if total_secs >= SECS_PER_MONTH { + if total_secs > SECS_PER_DAY { parts.push(format!( "{} {}", days, unit_label(days, "day", "days") )); } - if total_secs >= SECS_PER_DAY { + if total_secs > SECS_PER_HOUR { parts.push(format!( "{} {}", hours, unit_label(hours, "hour", "hours") )); } - if total_secs >= SECS_PER_HOUR { + if total_secs > SECS_PER_MINUTE { parts.push(format!( "{} {}", minutes, @@ -1032,7 +1032,7 @@ async fn main() -> std::result::Result<(), Box> { let initialized_secs = process_started_at.elapsed().as_secs(); let second_suffix = if initialized_secs == 1 { "" } else { "s" }; - info!("================= Telegram Startup ================="); + info!("===================== Telegram Startup ====================="); info!( " DC/ME Initialized in {} second{}", initialized_secs, second_suffix @@ -1593,9 +1593,10 @@ async fn main() -> std::result::Result<(), Box> { match signal::ctrl_c().await { Ok(()) => { + let shutdown_started_at = Instant::now(); + info!("Shutting down..."); let uptime_secs = process_started_at.elapsed().as_secs(); info!("Uptime: {}", format_uptime(uptime_secs)); - info!("Shutting down..."); if let Some(pool) = &me_pool { match tokio::time::timeout( Duration::from_secs(2), @@ -1614,6 +1615,12 @@ async fn main() -> std::result::Result<(), Box> { } } } + let shutdown_secs = shutdown_started_at.elapsed().as_secs(); + info!( + "Shutdown completed successfully in {} {}.", + shutdown_secs, + unit_label(shutdown_secs, "second", "seconds") + ); } Err(e) => error!("Signal error: {}", e), } From 64130dd02e2c843f76b01707cfa26408bec96a02 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 16:13:40 +0300 Subject: [PATCH 4/5] MEP not ready only after 3 attempts --- src/main.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index 84e61a4..e10c18b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -629,6 +629,7 @@ async fn main() -> std::result::Result<(), Box> { let me2dc_fallback = config.general.me2dc_fallback; let me_init_retry_attempts = config.general.me_init_retry_attempts; + let me_init_warn_after_attempts: u32 = 3; if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { if me2dc_fallback { warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); @@ -832,14 +833,25 @@ async fn main() -> std::result::Result<(), Box> { } else { me_init_retry_attempts.to_string() }; - warn!( - error = %e, - attempt = init_attempt, - retry_limit = retry_limit, - me2dc_fallback = me2dc_fallback, - retry_in_secs = 2, - "ME pool is not ready yet; retrying startup initialization" - ); + if init_attempt >= me_init_warn_after_attempts { + warn!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool is not ready yet; retrying startup initialization" + ); + } else { + info!( + error = %e, + attempt = init_attempt, + retry_limit = retry_limit, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool startup warmup: retrying initialization" + ); + } pool.reset_stun_state(); tokio::time::sleep(Duration::from_secs(2)).await; } From a80be783450f68f2f56d111e5534ce3aa550fea4 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 5 Mar 2026 16:32:31 +0300 Subject: [PATCH 5/5] DC writer floor is below required only in runtime --- src/main.rs | 4 ++++ src/transport/middle_proxy/health.rs | 30 +++++++++++++++++++--------- src/transport/middle_proxy/pool.rs | 10 ++++++++++ 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index e10c18b..798790a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1051,6 +1051,10 @@ async fn main() -> std::result::Result<(), Box> { ); info!("============================================================"); + if let Some(ref pool) = me_pool { + pool.set_runtime_ready(true); + } + // Background tasks let um_clone = upstream_manager.clone(); let decision_clone = decision.clone(); diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index c9ad34c..1cc8d8a 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -295,15 +295,27 @@ async fn check_family( let wait = Duration::from_millis(next_ms) + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); next_attempt.insert(key, now + wait); - warn!( - dc = %dc, - ?family, - alive = now_alive, - required, - endpoint_count = endpoints.len(), - backoff_ms = next_ms, - "DC writer floor is below required level, scheduled reconnect" - ); + if pool.is_runtime_ready() { + warn!( + dc = %dc, + ?family, + alive = now_alive, + required, + endpoint_count = endpoints.len(), + backoff_ms = next_ms, + "DC writer floor is below required level, scheduled reconnect" + ); + } else { + info!( + dc = %dc, + ?family, + alive = now_alive, + required, + endpoint_count = endpoints.len(), + backoff_ms = next_ms, + "DC writer floor is below required level during startup, scheduled reconnect" + ); + } } if let Some(v) = inflight.get_mut(&key) { *v = v.saturating_sub(1); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index d553944..8cc078e 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -149,6 +149,7 @@ pub struct MePool { pub(super) me_route_no_writer_wait: Duration, pub(super) me_route_inline_recovery_attempts: u32, pub(super) me_route_inline_recovery_wait: Duration, + pub(super) runtime_ready: AtomicBool, pool_size: usize, } @@ -355,6 +356,7 @@ impl MePool { me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), me_route_inline_recovery_attempts, me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), + runtime_ready: AtomicBool::new(false), }) } @@ -362,6 +364,14 @@ impl MePool { self.active_generation.load(Ordering::Relaxed) } + pub fn set_runtime_ready(&self, ready: bool) { + self.runtime_ready.store(ready, Ordering::Relaxed); + } + + pub fn is_runtime_ready(&self) -> bool { + self.runtime_ready.load(Ordering::Relaxed) + } + pub fn update_runtime_reinit_policy( &self, hardswap: bool,