diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 9ac6c53..b73013a 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -153,6 +153,14 @@ pub(crate) fn default_middle_proxy_warm_standby() -> usize { DEFAULT_MIDDLE_PROXY_WARM_STANDBY } +pub(crate) fn default_me_init_retry_attempts() -> u32 { + 0 +} + +pub(crate) fn default_me2dc_fallback() -> bool { + true +} + pub(crate) fn default_keepalive_interval() -> u64 { 8 } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 6d88800..b03f83e 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -381,6 +381,14 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b warned = true; warn!("config reload: general.middle_proxy_pool_size changed; restart required"); } + if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts { + warned = true; + warn!("config reload: general.me_init_retry_attempts changed; restart required"); + } + if old.general.me2dc_fallback != new.general.me2dc_fallback { + warned = true; + warn!("config reload: general.me2dc_fallback changed; restart required"); + } if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled || old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs || old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs diff --git a/src/config/load.rs b/src/config/load.rs index 666c938..a2ee5f0 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -237,6 +237,12 @@ impl ProxyConfig { )); } + if config.general.me_init_retry_attempts > 1_000_000 { + return Err(ProxyError::Config( + "general.me_init_retry_attempts must be within [0, 1000000]".to_string(), + )); + } + if config.general.upstream_connect_retry_attempts == 0 { return Err(ProxyError::Config( "general.upstream_connect_retry_attempts must be > 0".to_string(), @@ -659,6 +665,14 @@ mod tests { cfg.general.me_reconnect_fast_retry_count, default_me_reconnect_fast_retry_count() ); + assert_eq!( + cfg.general.me_init_retry_attempts, + default_me_init_retry_attempts() + ); + assert_eq!( + cfg.general.me2dc_fallback, + default_me2dc_fallback() + ); assert_eq!( cfg.general.me_single_endpoint_shadow_writers, default_me_single_endpoint_shadow_writers() @@ -764,6 +778,11 @@ mod tests { general.me_reconnect_fast_retry_count, default_me_reconnect_fast_retry_count() ); + assert_eq!( + general.me_init_retry_attempts, + default_me_init_retry_attempts() + ); + assert_eq!(general.me2dc_fallback, default_me2dc_fallback()); assert_eq!( general.me_single_endpoint_shadow_writers, default_me_single_endpoint_shadow_writers() diff --git a/src/config/types.rs b/src/config/types.rs index 00260a8..5dc1c87 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -353,6 +353,15 @@ pub struct GeneralConfig { #[serde(default = "default_middle_proxy_warm_standby")] pub middle_proxy_warm_standby: usize, + /// Startup retries for Middle-End pool initialization before ME→Direct fallback. + /// 0 means unlimited retries. + #[serde(default = "default_me_init_retry_attempts")] + pub me_init_retry_attempts: u32, + + /// Allow fallback from Middle-End mode to direct DC when ME startup cannot be initialized. + #[serde(default = "default_me2dc_fallback")] + pub me2dc_fallback: bool, + /// Enable ME keepalive padding frames. #[serde(default = "default_true")] pub me_keepalive_enabled: bool, @@ -680,6 +689,8 @@ impl Default for GeneralConfig { stun_nat_probe_concurrency: default_stun_nat_probe_concurrency(), middle_proxy_pool_size: default_pool_size(), middle_proxy_warm_standby: default_middle_proxy_warm_standby(), + me_init_retry_attempts: default_me_init_retry_attempts(), + me2dc_fallback: default_me2dc_fallback(), me_keepalive_enabled: default_true(), me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_jitter_secs: default_keepalive_jitter(), diff --git a/src/main.rs b/src/main.rs index 6bff525..c4a9c37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -416,7 +416,7 @@ async fn main() -> std::result::Result<(), Box> { log_probe_result(&probe, &decision); let prefer_ipv6 = decision.prefer_ipv6(); - let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me); + let mut use_middle_proxy = config.general.use_middle_proxy; let beobachten = Arc::new(BeobachtenStore::new()); let rng = Arc::new(SecureRandom::new()); @@ -443,9 +443,17 @@ async fn main() -> std::result::Result<(), Box> { // Connection concurrency limit let max_connections = Arc::new(Semaphore::new(10_000)); + let me2dc_fallback = config.general.me2dc_fallback; + let me_init_retry_attempts = config.general.me_init_retry_attempts; if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { - warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); - use_middle_proxy = false; + if me2dc_fallback { + warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); + use_middle_proxy = false; + } else { + warn!( + "No usable IP family for Middle Proxy detected; me2dc_fallback=false, ME init retries stay active" + ); + } } // ===================================================================== @@ -475,142 +483,189 @@ async fn main() -> std::result::Result<(), Box> { // proxy-secret is from: https://core.telegram.org/getProxySecret // ============================================================= let proxy_secret_path = config.general.proxy_secret_path.as_deref(); - match crate::transport::middle_proxy::fetch_proxy_secret( - proxy_secret_path, - config.general.proxy_secret_len_max, - ) - .await - { - Ok(proxy_secret) => { - info!( - secret_len = proxy_secret.len(), - key_sig = format_args!( - "0x{:08x}", - if proxy_secret.len() >= 4 { - u32::from_le_bytes([ - proxy_secret[0], - proxy_secret[1], - proxy_secret[2], - proxy_secret[3], - ]) - } else { - 0 - } - ), - "Proxy-secret loaded" - ); + let pool_size = config.general.middle_proxy_pool_size.max(1); + let mut init_attempt: u32 = 0; + loop { + init_attempt = init_attempt.saturating_add(1); - // Load ME config (v4/v6) + default DC - let mut cfg_v4 = fetch_proxy_config( - "https://core.telegram.org/getProxyConfig", - ) - .await - .unwrap_or_default(); - let mut cfg_v6 = fetch_proxy_config( - "https://core.telegram.org/getProxyConfigV6", - ) - .await - .unwrap_or_default(); - - if cfg_v4.map.is_empty() { - cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone(); - } - if cfg_v6.map.is_empty() { - cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone(); - } - - let pool = MePool::new( - proxy_tag, - proxy_secret, - config.general.middle_proxy_nat_ip, - me_nat_probe, - None, - config.network.stun_servers.clone(), - config.general.stun_nat_probe_concurrency, - probe.detected_ipv6, - config.timeouts.me_one_retry, - config.timeouts.me_one_timeout_ms, - cfg_v4.map.clone(), - cfg_v6.map.clone(), - cfg_v4.default_dc.or(cfg_v6.default_dc), - decision.clone(), - Some(upstream_manager.clone()), - rng.clone(), - stats.clone(), - config.general.me_keepalive_enabled, - config.general.me_keepalive_interval_secs, - config.general.me_keepalive_jitter_secs, - config.general.me_keepalive_payload_random, - config.general.rpc_proxy_req_every, - config.general.me_warmup_stagger_enabled, - config.general.me_warmup_step_delay_ms, - config.general.me_warmup_step_jitter_ms, - config.general.me_reconnect_max_concurrent_per_dc, - config.general.me_reconnect_backoff_base_ms, - config.general.me_reconnect_backoff_cap_ms, - config.general.me_reconnect_fast_retry_count, - config.general.me_single_endpoint_shadow_writers, - config.general.me_single_endpoint_outage_mode_enabled, - config.general.me_single_endpoint_outage_disable_quarantine, - config.general.me_single_endpoint_outage_backoff_min_ms, - config.general.me_single_endpoint_outage_backoff_max_ms, - config.general.me_single_endpoint_shadow_rotate_every_secs, - config.general.me_floor_mode, - config.general.me_adaptive_floor_idle_secs, - config.general.me_adaptive_floor_min_writers_single_endpoint, - config.general.me_adaptive_floor_recover_grace_secs, - config.general.hardswap, - config.general.me_pool_drain_ttl_secs, - config.general.effective_me_pool_force_close_secs(), - config.general.me_pool_min_fresh_ratio, - config.general.me_hardswap_warmup_delay_min_ms, - config.general.me_hardswap_warmup_delay_max_ms, - config.general.me_hardswap_warmup_extra_passes, - config.general.me_hardswap_warmup_pass_backoff_base_ms, - config.general.me_bind_stale_mode, - config.general.me_bind_stale_ttl_secs, - config.general.me_secret_atomic_snapshot, - config.general.me_deterministic_writer_sort, - config.general.me_socks_kdf_policy, - config.general.me_route_backpressure_base_timeout_ms, - config.general.me_route_backpressure_high_timeout_ms, - config.general.me_route_backpressure_high_watermark_pct, - ); - - let pool_size = config.general.middle_proxy_pool_size.max(1); - loop { - match pool.init(pool_size, &rng).await { - Ok(()) => { - info!("Middle-End pool initialized successfully"); - - // Phase 4: Start health monitor - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, min_conns, - ) - .await; - }); - - break Some(pool); - } - Err(e) => { - warn!( - error = %e, - retry_in_secs = 2, - "ME pool is not ready yet; retrying startup initialization" - ); - pool.reset_stun_state(); - tokio::time::sleep(Duration::from_secs(2)).await; - } + let proxy_secret = match crate::transport::middle_proxy::fetch_proxy_secret( + proxy_secret_path, + config.general.proxy_secret_len_max, + ) + .await + { + Ok(proxy_secret) => proxy_secret, + Err(e) => { + let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; + if retries_limited && init_attempt >= me_init_retry_attempts { + error!( + error = %e, + attempt = init_attempt, + retry_limit = me_init_retry_attempts, + "ME startup retries exhausted while loading proxy-secret; falling back to direct mode" + ); + break None; } + + warn!( + error = %e, + attempt = init_attempt, + retry_limit = if me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "Failed to fetch proxy-secret; retrying ME startup" + ); + tokio::time::sleep(Duration::from_secs(2)).await; + continue; } + }; + + info!( + secret_len = proxy_secret.len(), + key_sig = format_args!( + "0x{:08x}", + if proxy_secret.len() >= 4 { + u32::from_le_bytes([ + proxy_secret[0], + proxy_secret[1], + proxy_secret[2], + proxy_secret[3], + ]) + } else { + 0 + } + ), + "Proxy-secret loaded" + ); + + // Load ME config (v4/v6) + default DC + let mut cfg_v4 = fetch_proxy_config( + "https://core.telegram.org/getProxyConfig", + ) + .await + .unwrap_or_default(); + let mut cfg_v6 = fetch_proxy_config( + "https://core.telegram.org/getProxyConfigV6", + ) + .await + .unwrap_or_default(); + + if cfg_v4.map.is_empty() { + cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone(); } - Err(e) => { - error!(error = %e, "Failed to fetch proxy-secret. Falling back to direct mode."); - None + if cfg_v6.map.is_empty() { + cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone(); + } + + let pool = MePool::new( + proxy_tag.clone(), + proxy_secret, + config.general.middle_proxy_nat_ip, + me_nat_probe, + None, + config.network.stun_servers.clone(), + config.general.stun_nat_probe_concurrency, + probe.detected_ipv6, + config.timeouts.me_one_retry, + config.timeouts.me_one_timeout_ms, + cfg_v4.map.clone(), + cfg_v6.map.clone(), + cfg_v4.default_dc.or(cfg_v6.default_dc), + decision.clone(), + Some(upstream_manager.clone()), + rng.clone(), + stats.clone(), + config.general.me_keepalive_enabled, + config.general.me_keepalive_interval_secs, + config.general.me_keepalive_jitter_secs, + config.general.me_keepalive_payload_random, + config.general.rpc_proxy_req_every, + config.general.me_warmup_stagger_enabled, + config.general.me_warmup_step_delay_ms, + config.general.me_warmup_step_jitter_ms, + config.general.me_reconnect_max_concurrent_per_dc, + config.general.me_reconnect_backoff_base_ms, + config.general.me_reconnect_backoff_cap_ms, + config.general.me_reconnect_fast_retry_count, + config.general.me_single_endpoint_shadow_writers, + config.general.me_single_endpoint_outage_mode_enabled, + config.general.me_single_endpoint_outage_disable_quarantine, + config.general.me_single_endpoint_outage_backoff_min_ms, + config.general.me_single_endpoint_outage_backoff_max_ms, + config.general.me_single_endpoint_shadow_rotate_every_secs, + config.general.me_floor_mode, + config.general.me_adaptive_floor_idle_secs, + config.general.me_adaptive_floor_min_writers_single_endpoint, + config.general.me_adaptive_floor_recover_grace_secs, + config.general.hardswap, + config.general.me_pool_drain_ttl_secs, + config.general.effective_me_pool_force_close_secs(), + config.general.me_pool_min_fresh_ratio, + config.general.me_hardswap_warmup_delay_min_ms, + config.general.me_hardswap_warmup_delay_max_ms, + config.general.me_hardswap_warmup_extra_passes, + config.general.me_hardswap_warmup_pass_backoff_base_ms, + config.general.me_bind_stale_mode, + config.general.me_bind_stale_ttl_secs, + config.general.me_secret_atomic_snapshot, + config.general.me_deterministic_writer_sort, + config.general.me_socks_kdf_policy, + config.general.me_route_backpressure_base_timeout_ms, + config.general.me_route_backpressure_high_timeout_ms, + config.general.me_route_backpressure_high_watermark_pct, + ); + + match pool.init(pool_size, &rng).await { + Ok(()) => { + info!( + attempt = init_attempt, + "Middle-End pool initialized successfully" + ); + + // Phase 4: Start health monitor + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + pool_clone, rng_clone, min_conns, + ) + .await; + }); + + break Some(pool); + } + Err(e) => { + let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; + if retries_limited && init_attempt >= me_init_retry_attempts { + error!( + error = %e, + attempt = init_attempt, + retry_limit = me_init_retry_attempts, + "ME pool init retries exhausted; falling back to direct mode" + ); + break None; + } + + warn!( + error = %e, + attempt = init_attempt, + retry_limit = if me_init_retry_attempts == 0 { + String::from("unlimited") + } else { + me_init_retry_attempts.to_string() + }, + me2dc_fallback = me2dc_fallback, + retry_in_secs = 2, + "ME pool is not ready yet; retrying startup initialization" + ); + pool.reset_stun_state(); + tokio::time::sleep(Duration::from_secs(2)).await; + } } } } else {