ME2DC Fallback + ME Init Retries

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-05 12:43:07 +03:00
parent 651f257a5d
commit ccfda10713
No known key found for this signature in database
5 changed files with 236 additions and 135 deletions

View File

@ -153,6 +153,14 @@ pub(crate) fn default_middle_proxy_warm_standby() -> usize {
DEFAULT_MIDDLE_PROXY_WARM_STANDBY DEFAULT_MIDDLE_PROXY_WARM_STANDBY
} }
pub(crate) fn default_me_init_retry_attempts() -> u32 {
0
}
pub(crate) fn default_me2dc_fallback() -> bool {
true
}
pub(crate) fn default_keepalive_interval() -> u64 { pub(crate) fn default_keepalive_interval() -> u64 {
8 8
} }

View File

@ -381,6 +381,14 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
warned = true; warned = true;
warn!("config reload: general.middle_proxy_pool_size changed; restart required"); warn!("config reload: general.middle_proxy_pool_size changed; restart required");
} }
if old.general.me_init_retry_attempts != new.general.me_init_retry_attempts {
warned = true;
warn!("config reload: general.me_init_retry_attempts changed; restart required");
}
if old.general.me2dc_fallback != new.general.me2dc_fallback {
warned = true;
warn!("config reload: general.me2dc_fallback changed; restart required");
}
if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled
|| old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs || old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs
|| old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs || old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs

View File

@ -237,6 +237,12 @@ impl ProxyConfig {
)); ));
} }
if config.general.me_init_retry_attempts > 1_000_000 {
return Err(ProxyError::Config(
"general.me_init_retry_attempts must be within [0, 1000000]".to_string(),
));
}
if config.general.upstream_connect_retry_attempts == 0 { if config.general.upstream_connect_retry_attempts == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.upstream_connect_retry_attempts must be > 0".to_string(), "general.upstream_connect_retry_attempts must be > 0".to_string(),
@ -659,6 +665,14 @@ mod tests {
cfg.general.me_reconnect_fast_retry_count, cfg.general.me_reconnect_fast_retry_count,
default_me_reconnect_fast_retry_count() default_me_reconnect_fast_retry_count()
); );
assert_eq!(
cfg.general.me_init_retry_attempts,
default_me_init_retry_attempts()
);
assert_eq!(
cfg.general.me2dc_fallback,
default_me2dc_fallback()
);
assert_eq!( assert_eq!(
cfg.general.me_single_endpoint_shadow_writers, cfg.general.me_single_endpoint_shadow_writers,
default_me_single_endpoint_shadow_writers() default_me_single_endpoint_shadow_writers()
@ -764,6 +778,11 @@ mod tests {
general.me_reconnect_fast_retry_count, general.me_reconnect_fast_retry_count,
default_me_reconnect_fast_retry_count() default_me_reconnect_fast_retry_count()
); );
assert_eq!(
general.me_init_retry_attempts,
default_me_init_retry_attempts()
);
assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
assert_eq!( assert_eq!(
general.me_single_endpoint_shadow_writers, general.me_single_endpoint_shadow_writers,
default_me_single_endpoint_shadow_writers() default_me_single_endpoint_shadow_writers()

View File

@ -353,6 +353,15 @@ pub struct GeneralConfig {
#[serde(default = "default_middle_proxy_warm_standby")] #[serde(default = "default_middle_proxy_warm_standby")]
pub middle_proxy_warm_standby: usize, pub middle_proxy_warm_standby: usize,
/// Startup retries for Middle-End pool initialization before ME→Direct fallback.
/// 0 means unlimited retries.
#[serde(default = "default_me_init_retry_attempts")]
pub me_init_retry_attempts: u32,
/// Allow fallback from Middle-End mode to direct DC when ME startup cannot be initialized.
#[serde(default = "default_me2dc_fallback")]
pub me2dc_fallback: bool,
/// Enable ME keepalive padding frames. /// Enable ME keepalive padding frames.
#[serde(default = "default_true")] #[serde(default = "default_true")]
pub me_keepalive_enabled: bool, pub me_keepalive_enabled: bool,
@ -680,6 +689,8 @@ impl Default for GeneralConfig {
stun_nat_probe_concurrency: default_stun_nat_probe_concurrency(), stun_nat_probe_concurrency: default_stun_nat_probe_concurrency(),
middle_proxy_pool_size: default_pool_size(), middle_proxy_pool_size: default_pool_size(),
middle_proxy_warm_standby: default_middle_proxy_warm_standby(), middle_proxy_warm_standby: default_middle_proxy_warm_standby(),
me_init_retry_attempts: default_me_init_retry_attempts(),
me2dc_fallback: default_me2dc_fallback(),
me_keepalive_enabled: default_true(), me_keepalive_enabled: default_true(),
me_keepalive_interval_secs: default_keepalive_interval(), me_keepalive_interval_secs: default_keepalive_interval(),
me_keepalive_jitter_secs: default_keepalive_jitter(), me_keepalive_jitter_secs: default_keepalive_jitter(),

View File

@ -416,7 +416,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
log_probe_result(&probe, &decision); log_probe_result(&probe, &decision);
let prefer_ipv6 = decision.prefer_ipv6(); let prefer_ipv6 = decision.prefer_ipv6();
let mut use_middle_proxy = config.general.use_middle_proxy && (decision.ipv4_me || decision.ipv6_me); let mut use_middle_proxy = config.general.use_middle_proxy;
let beobachten = Arc::new(BeobachtenStore::new()); let beobachten = Arc::new(BeobachtenStore::new());
let rng = Arc::new(SecureRandom::new()); let rng = Arc::new(SecureRandom::new());
@ -443,9 +443,17 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// Connection concurrency limit // Connection concurrency limit
let max_connections = Arc::new(Semaphore::new(10_000)); let max_connections = Arc::new(Semaphore::new(10_000));
let me2dc_fallback = config.general.me2dc_fallback;
let me_init_retry_attempts = config.general.me_init_retry_attempts;
if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me { if use_middle_proxy && !decision.ipv4_me && !decision.ipv6_me {
if me2dc_fallback {
warn!("No usable IP family for Middle Proxy detected; falling back to direct DC"); warn!("No usable IP family for Middle Proxy detected; falling back to direct DC");
use_middle_proxy = false; use_middle_proxy = false;
} else {
warn!(
"No usable IP family for Middle Proxy detected; me2dc_fallback=false, ME init retries stay active"
);
}
} }
// ===================================================================== // =====================================================================
@ -475,13 +483,47 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// proxy-secret is from: https://core.telegram.org/getProxySecret // proxy-secret is from: https://core.telegram.org/getProxySecret
// ============================================================= // =============================================================
let proxy_secret_path = config.general.proxy_secret_path.as_deref(); let proxy_secret_path = config.general.proxy_secret_path.as_deref();
match crate::transport::middle_proxy::fetch_proxy_secret( let pool_size = config.general.middle_proxy_pool_size.max(1);
let mut init_attempt: u32 = 0;
loop {
init_attempt = init_attempt.saturating_add(1);
let proxy_secret = match crate::transport::middle_proxy::fetch_proxy_secret(
proxy_secret_path, proxy_secret_path,
config.general.proxy_secret_len_max, config.general.proxy_secret_len_max,
) )
.await .await
{ {
Ok(proxy_secret) => { Ok(proxy_secret) => proxy_secret,
Err(e) => {
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0;
if retries_limited && init_attempt >= me_init_retry_attempts {
error!(
error = %e,
attempt = init_attempt,
retry_limit = me_init_retry_attempts,
"ME startup retries exhausted while loading proxy-secret; falling back to direct mode"
);
break None;
}
warn!(
error = %e,
attempt = init_attempt,
retry_limit = if me_init_retry_attempts == 0 {
String::from("unlimited")
} else {
me_init_retry_attempts.to_string()
},
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2,
"Failed to fetch proxy-secret; retrying ME startup"
);
tokio::time::sleep(Duration::from_secs(2)).await;
continue;
}
};
info!( info!(
secret_len = proxy_secret.len(), secret_len = proxy_secret.len(),
key_sig = format_args!( key_sig = format_args!(
@ -520,7 +562,7 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
} }
let pool = MePool::new( let pool = MePool::new(
proxy_tag, proxy_tag.clone(),
proxy_secret, proxy_secret,
config.general.middle_proxy_nat_ip, config.general.middle_proxy_nat_ip,
me_nat_probe, me_nat_probe,
@ -577,11 +619,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.general.me_route_backpressure_high_watermark_pct, config.general.me_route_backpressure_high_watermark_pct,
); );
let pool_size = config.general.middle_proxy_pool_size.max(1);
loop {
match pool.init(pool_size, &rng).await { match pool.init(pool_size, &rng).await {
Ok(()) => { Ok(()) => {
info!("Middle-End pool initialized successfully"); info!(
attempt = init_attempt,
"Middle-End pool initialized successfully"
);
// Phase 4: Start health monitor // Phase 4: Start health monitor
let pool_clone = pool.clone(); let pool_clone = pool.clone();
@ -597,8 +640,26 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
break Some(pool); break Some(pool);
} }
Err(e) => { Err(e) => {
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0;
if retries_limited && init_attempt >= me_init_retry_attempts {
error!(
error = %e,
attempt = init_attempt,
retry_limit = me_init_retry_attempts,
"ME pool init retries exhausted; falling back to direct mode"
);
break None;
}
warn!( warn!(
error = %e, error = %e,
attempt = init_attempt,
retry_limit = if me_init_retry_attempts == 0 {
String::from("unlimited")
} else {
me_init_retry_attempts.to_string()
},
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2, retry_in_secs = 2,
"ME pool is not ready yet; retrying startup initialization" "ME pool is not ready yet; retrying startup initialization"
); );
@ -607,12 +668,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
} }
} }
} }
}
Err(e) => {
error!(error = %e, "Failed to fetch proxy-secret. Falling back to direct mode.");
None
}
}
} else { } else {
None None
}; };