ME Pool Init fixes

This commit is contained in:
Alexey 2026-03-05 15:31:36 +03:00
parent 813f1df63e
commit 8066ea2163
No known key found for this signature in database
7 changed files with 461 additions and 225 deletions

View File

@ -141,6 +141,14 @@ pub(crate) fn default_proxy_secret_path() -> Option<String> {
Some("proxy-secret".to_string()) Some("proxy-secret".to_string())
} }
pub(crate) fn default_proxy_config_v4_cache_path() -> Option<String> {
Some("cache/proxy-config-v4.txt".to_string())
}
pub(crate) fn default_proxy_config_v6_cache_path() -> Option<String> {
Some("cache/proxy-config-v6.txt".to_string())
}
pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> { pub(crate) fn default_middle_proxy_nat_stun() -> Option<String> {
None None
} }

View File

@ -405,6 +405,12 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b
warned = true; warned = true;
warn!("config reload: general.me2dc_fallback changed; restart required"); warn!("config reload: general.me2dc_fallback changed; restart required");
} }
if old.general.proxy_config_v4_cache_path != new.general.proxy_config_v4_cache_path
|| old.general.proxy_config_v6_cache_path != new.general.proxy_config_v6_cache_path
{
warned = true;
warn!("config reload: general.proxy_config_*_cache_path changed; restart required");
}
if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled if old.general.me_keepalive_enabled != new.general.me_keepalive_enabled
|| old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs || old.general.me_keepalive_interval_secs != new.general.me_keepalive_interval_secs
|| old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs || old.general.me_keepalive_jitter_secs != new.general.me_keepalive_jitter_secs

View File

@ -203,6 +203,22 @@ impl ProxyConfig {
sanitize_ad_tag(&mut config.general.ad_tag); sanitize_ad_tag(&mut config.general.ad_tag);
if let Some(path) = &config.general.proxy_config_v4_cache_path
&& path.trim().is_empty()
{
return Err(ProxyError::Config(
"general.proxy_config_v4_cache_path cannot be empty when provided".to_string(),
));
}
if let Some(path) = &config.general.proxy_config_v6_cache_path
&& path.trim().is_empty()
{
return Err(ProxyError::Config(
"general.proxy_config_v6_cache_path cannot be empty when provided".to_string(),
));
}
if let Some(update_every) = config.general.update_every { if let Some(update_every) = config.general.update_every {
if update_every == 0 { if update_every == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
@ -691,6 +707,14 @@ mod tests {
cfg.general.me2dc_fallback, cfg.general.me2dc_fallback,
default_me2dc_fallback() default_me2dc_fallback()
); );
assert_eq!(
cfg.general.proxy_config_v4_cache_path,
default_proxy_config_v4_cache_path()
);
assert_eq!(
cfg.general.proxy_config_v6_cache_path,
default_proxy_config_v6_cache_path()
);
assert_eq!( assert_eq!(
cfg.general.me_single_endpoint_shadow_writers, cfg.general.me_single_endpoint_shadow_writers,
default_me_single_endpoint_shadow_writers() default_me_single_endpoint_shadow_writers()
@ -801,6 +825,14 @@ mod tests {
default_me_init_retry_attempts() default_me_init_retry_attempts()
); );
assert_eq!(general.me2dc_fallback, default_me2dc_fallback()); assert_eq!(general.me2dc_fallback, default_me2dc_fallback());
assert_eq!(
general.proxy_config_v4_cache_path,
default_proxy_config_v4_cache_path()
);
assert_eq!(
general.proxy_config_v6_cache_path,
default_proxy_config_v6_cache_path()
);
assert_eq!( assert_eq!(
general.me_single_endpoint_shadow_writers, general.me_single_endpoint_shadow_writers,
default_me_single_endpoint_shadow_writers() default_me_single_endpoint_shadow_writers()
@ -1267,6 +1299,42 @@ mod tests {
let _ = std::fs::remove_file(path); let _ = std::fs::remove_file(path);
} }
#[test]
fn proxy_config_cache_paths_empty_are_rejected() {
let toml = r#"
[general]
proxy_config_v4_cache_path = " "
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_proxy_config_v4_cache_path_empty_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.proxy_config_v4_cache_path cannot be empty"));
let _ = std::fs::remove_file(path);
let toml_v6 = r#"
[general]
proxy_config_v6_cache_path = ""
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let path_v6 = dir.join("telemt_proxy_config_v6_cache_path_empty_test.toml");
std::fs::write(&path_v6, toml_v6).unwrap();
let err_v6 = ProxyConfig::load(&path_v6).unwrap_err().to_string();
assert!(err_v6.contains("general.proxy_config_v6_cache_path cannot be empty"));
let _ = std::fs::remove_file(path_v6);
}
#[test] #[test]
fn me_hardswap_warmup_defaults_are_set() { fn me_hardswap_warmup_defaults_are_set() {
let toml = r#" let toml = r#"

View File

@ -343,6 +343,14 @@ pub struct GeneralConfig {
#[serde(default = "default_proxy_secret_path")] #[serde(default = "default_proxy_secret_path")]
pub proxy_secret_path: Option<String>, pub proxy_secret_path: Option<String>,
/// Optional path to cache raw getProxyConfig (IPv4) snapshot for startup fallback.
#[serde(default = "default_proxy_config_v4_cache_path")]
pub proxy_config_v4_cache_path: Option<String>,
/// Optional path to cache raw getProxyConfigV6 snapshot for startup fallback.
#[serde(default = "default_proxy_config_v6_cache_path")]
pub proxy_config_v6_cache_path: Option<String>,
/// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags. /// Global ad_tag (32 hex chars from @MTProxybot). Fallback when user has no per-user tag in access.user_ad_tags.
#[serde(default)] #[serde(default)]
pub ad_tag: Option<String>, pub ad_tag: Option<String>,
@ -727,6 +735,8 @@ impl Default for GeneralConfig {
use_middle_proxy: default_true(), use_middle_proxy: default_true(),
ad_tag: None, ad_tag: None,
proxy_secret_path: default_proxy_secret_path(), proxy_secret_path: default_proxy_secret_path(),
proxy_config_v4_cache_path: default_proxy_config_v4_cache_path(),
proxy_config_v6_cache_path: default_proxy_config_v6_cache_path(),
middle_proxy_nat_ip: None, middle_proxy_nat_ip: None,
middle_proxy_nat_probe: default_true(), middle_proxy_nat_probe: default_true(),
middle_proxy_nat_stun: default_middle_proxy_nat_stun(), middle_proxy_nat_stun: default_middle_proxy_nat_stun(),

View File

@ -41,8 +41,9 @@ use crate::stats::telemetry::TelemetryPolicy;
use crate::stats::{ReplayChecker, Stats}; use crate::stats::{ReplayChecker, Stats};
use crate::stream::BufferPool; use crate::stream::BufferPool;
use crate::transport::middle_proxy::{ use crate::transport::middle_proxy::{
MePool, fetch_proxy_config, run_me_ping, MePingFamily, MePingSample, MeReinitTrigger, format_sample_line, MePool, ProxyConfigData, fetch_proxy_config_with_raw, format_me_route, format_sample_line,
format_me_route, load_proxy_config_cache, run_me_ping, save_proxy_config_cache, MePingFamily, MePingSample,
MeReinitTrigger,
}; };
use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes}; use crate::transport::{ListenOptions, UpstreamManager, create_listener, find_listener_processes};
use crate::tls_front::TlsFrontCache; use crate::tls_front::TlsFrontCache;
@ -172,6 +173,120 @@ async fn write_beobachten_snapshot(path: &str, payload: &str) -> std::io::Result
tokio::fs::write(path, payload).await tokio::fs::write(path, payload).await
} }
async fn load_startup_proxy_config_snapshot(
url: &str,
cache_path: Option<&str>,
me2dc_fallback: bool,
label: &'static str,
) -> Option<ProxyConfigData> {
loop {
match fetch_proxy_config_with_raw(url).await {
Ok((cfg, raw)) => {
if !cfg.map.is_empty() {
if let Some(path) = cache_path
&& let Err(e) = save_proxy_config_cache(path, &raw).await
{
warn!(error = %e, path, snapshot = label, "Failed to store startup proxy-config cache");
}
return Some(cfg);
}
warn!(snapshot = label, url, "Startup proxy-config is empty; trying disk cache");
if let Some(path) = cache_path {
match load_proxy_config_cache(path).await {
Ok(cached) if !cached.map.is_empty() => {
info!(
snapshot = label,
path,
proxy_for_lines = cached.proxy_for_lines,
"Loaded startup proxy-config from disk cache"
);
return Some(cached);
}
Ok(_) => {
warn!(
snapshot = label,
path,
"Startup proxy-config cache is empty; ignoring cache file"
);
}
Err(cache_err) => {
debug!(
snapshot = label,
path,
error = %cache_err,
"Startup proxy-config cache unavailable"
);
}
}
}
if me2dc_fallback {
error!(
snapshot = label,
"Startup proxy-config unavailable and no saved config found; falling back to direct mode"
);
return None;
}
warn!(
snapshot = label,
retry_in_secs = 2,
"Startup proxy-config unavailable and no saved config found; retrying because me2dc_fallback=false"
);
tokio::time::sleep(Duration::from_secs(2)).await;
}
Err(fetch_err) => {
if let Some(path) = cache_path {
match load_proxy_config_cache(path).await {
Ok(cached) if !cached.map.is_empty() => {
info!(
snapshot = label,
path,
proxy_for_lines = cached.proxy_for_lines,
"Loaded startup proxy-config from disk cache"
);
return Some(cached);
}
Ok(_) => {
warn!(
snapshot = label,
path,
"Startup proxy-config cache is empty; ignoring cache file"
);
}
Err(cache_err) => {
debug!(
snapshot = label,
path,
error = %cache_err,
"Startup proxy-config cache unavailable"
);
}
}
}
if me2dc_fallback {
error!(
snapshot = label,
error = %fetch_err,
"Startup proxy-config unavailable and no cached data; falling back to direct mode"
);
return None;
}
warn!(
snapshot = label,
error = %fetch_err,
retry_in_secs = 2,
"Startup proxy-config unavailable; retrying because me2dc_fallback=false"
);
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
}
#[tokio::main] #[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> { async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let (config_path, cli_silent, cli_log_level) = parse_cli(); let (config_path, cli_silent, cli_log_level) = parse_cli();
@ -484,46 +599,34 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// ============================================================= // =============================================================
let proxy_secret_path = config.general.proxy_secret_path.as_deref(); let proxy_secret_path = config.general.proxy_secret_path.as_deref();
let pool_size = config.general.middle_proxy_pool_size.max(1); let pool_size = config.general.middle_proxy_pool_size.max(1);
let mut init_attempt: u32 = 0; let proxy_secret = loop {
loop { match crate::transport::middle_proxy::fetch_proxy_secret(
init_attempt = init_attempt.saturating_add(1);
let proxy_secret = match crate::transport::middle_proxy::fetch_proxy_secret(
proxy_secret_path, proxy_secret_path,
config.general.proxy_secret_len_max, config.general.proxy_secret_len_max,
) )
.await .await
{ {
Ok(proxy_secret) => proxy_secret, Ok(proxy_secret) => break Some(proxy_secret),
Err(e) => { Err(e) => {
let retries_limited = me2dc_fallback && me_init_retry_attempts > 0; if me2dc_fallback {
if retries_limited && init_attempt >= me_init_retry_attempts {
error!( error!(
error = %e, error = %e,
attempt = init_attempt, "ME startup failed: proxy-secret is unavailable and no saved secret found; falling back to direct mode"
retry_limit = me_init_retry_attempts,
"ME startup retries exhausted while loading proxy-secret; falling back to direct mode"
); );
break None; break None;
} }
warn!( warn!(
error = %e, error = %e,
attempt = init_attempt,
retry_limit = if me_init_retry_attempts == 0 {
String::from("unlimited")
} else {
me_init_retry_attempts.to_string()
},
me2dc_fallback = me2dc_fallback,
retry_in_secs = 2, retry_in_secs = 2,
"Failed to fetch proxy-secret; retrying ME startup" "ME startup failed: proxy-secret is unavailable and no saved secret found; retrying because me2dc_fallback=false"
); );
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
continue; }
} }
}; };
match proxy_secret {
Some(proxy_secret) => {
info!( info!(
secret_len = proxy_secret.len(), secret_len = proxy_secret.len(),
key_sig = format_args!( key_sig = format_args!(
@ -542,25 +645,22 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
"Proxy-secret loaded" "Proxy-secret loaded"
); );
// Load ME config (v4/v6) + default DC let cfg_v4 = load_startup_proxy_config_snapshot(
let mut cfg_v4 = fetch_proxy_config(
"https://core.telegram.org/getProxyConfig", "https://core.telegram.org/getProxyConfig",
config.general.proxy_config_v4_cache_path.as_deref(),
me2dc_fallback,
"getProxyConfig",
) )
.await .await;
.unwrap_or_default(); let cfg_v6 = load_startup_proxy_config_snapshot(
let mut cfg_v6 = fetch_proxy_config(
"https://core.telegram.org/getProxyConfigV6", "https://core.telegram.org/getProxyConfigV6",
config.general.proxy_config_v6_cache_path.as_deref(),
me2dc_fallback,
"getProxyConfigV6",
) )
.await .await;
.unwrap_or_default();
if cfg_v4.map.is_empty() {
cfg_v4.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V4.clone();
}
if cfg_v6.map.is_empty() {
cfg_v6.map = crate::protocol::constants::TG_MIDDLE_PROXIES_V6.clone();
}
if let (Some(cfg_v4), Some(cfg_v6)) = (cfg_v4, cfg_v6) {
let pool = MePool::new( let pool = MePool::new(
proxy_tag.clone(), proxy_tag.clone(),
proxy_secret, proxy_secret,
@ -623,6 +723,9 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.general.me_route_inline_recovery_wait_ms, config.general.me_route_inline_recovery_wait_ms,
); );
let mut init_attempt: u32 = 0;
loop {
init_attempt = init_attempt.saturating_add(1);
match pool.init(pool_size, &rng).await { match pool.init(pool_size, &rng).await {
Ok(()) => { Ok(()) => {
info!( info!(
@ -655,14 +758,15 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
break None; break None;
} }
warn!( let retry_limit = if !me2dc_fallback || me_init_retry_attempts == 0 {
error = %e,
attempt = init_attempt,
retry_limit = if me_init_retry_attempts == 0 {
String::from("unlimited") String::from("unlimited")
} else { } else {
me_init_retry_attempts.to_string() me_init_retry_attempts.to_string()
}, };
warn!(
error = %e,
attempt = init_attempt,
retry_limit = retry_limit,
me2dc_fallback = me2dc_fallback, me2dc_fallback = me2dc_fallback,
retry_in_secs = 2, retry_in_secs = 2,
"ME pool is not ready yet; retrying startup initialization" "ME pool is not ready yet; retrying startup initialization"
@ -674,6 +778,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
} }
} else { } else {
None None
}
}
None => None,
}
} else {
None
}; };
// If ME failed to initialize, force direct-only mode. // If ME failed to initialize, force direct-only mode.

View File

@ -1,6 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher}; use std::hash::{DefaultHasher, Hash, Hasher};
use std::net::IpAddr; use std::net::IpAddr;
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -42,6 +43,87 @@ pub struct ProxyConfigData {
pub proxy_for_lines: u32, pub proxy_for_lines: u32,
} }
pub fn parse_proxy_config_text(text: &str, http_status: u16) -> ProxyConfigData {
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
let mut proxy_for_lines: u32 = 0;
for line in text.lines() {
if let Some((dc, ip, port)) = parse_proxy_line(line) {
map.entry(dc).or_default().push((ip, port));
proxy_for_lines = proxy_for_lines.saturating_add(1);
}
}
let default_dc = text.lines().find_map(|l| {
let t = l.trim();
if let Some(rest) = t.strip_prefix("default") {
return rest.trim().trim_end_matches(';').parse::<i32>().ok();
}
None
});
ProxyConfigData {
map,
default_dc,
http_status,
proxy_for_lines,
}
}
pub async fn load_proxy_config_cache(path: &str) -> Result<ProxyConfigData> {
let text = tokio::fs::read_to_string(path).await.map_err(|e| {
crate::error::ProxyError::Proxy(format!("read proxy-config cache '{path}' failed: {e}"))
})?;
Ok(parse_proxy_config_text(&text, 200))
}
pub async fn save_proxy_config_cache(path: &str, raw_text: &str) -> Result<()> {
if let Some(parent) = Path::new(path).parent()
&& !parent.as_os_str().is_empty()
{
tokio::fs::create_dir_all(parent).await.map_err(|e| {
crate::error::ProxyError::Proxy(format!(
"create proxy-config cache dir '{}' failed: {e}",
parent.display()
))
})?;
}
tokio::fs::write(path, raw_text).await.map_err(|e| {
crate::error::ProxyError::Proxy(format!("write proxy-config cache '{path}' failed: {e}"))
})?;
Ok(())
}
pub async fn fetch_proxy_config_with_raw(url: &str) -> Result<(ProxyConfigData, String)> {
let resp = reqwest::get(url)
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))?
;
let http_status = resp.status().as_u16();
if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str()
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
})
{
let skew_secs = skew.as_secs();
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
}
}
let text = resp
.text()
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
let parsed = parse_proxy_config_text(&text, http_status);
Ok((parsed, text))
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct StableSnapshot { struct StableSnapshot {
candidate_hash: Option<u64>, candidate_hash: Option<u64>,
@ -170,61 +252,9 @@ fn parse_proxy_line(line: &str) -> Option<(i32, IpAddr, u16)> {
} }
pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> { pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
let resp = reqwest::get(url) fetch_proxy_config_with_raw(url)
.await .await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config GET failed: {e}")))? .map(|(parsed, _raw)| parsed)
;
let http_status = resp.status().as_u16();
if let Some(date) = resp.headers().get(reqwest::header::DATE)
&& let Ok(date_str) = date.to_str()
&& let Ok(server_time) = httpdate::parse_http_date(date_str)
&& let Ok(skew) = SystemTime::now().duration_since(server_time).or_else(|e| {
server_time.duration_since(SystemTime::now()).map_err(|_| e)
})
{
let skew_secs = skew.as_secs();
if skew_secs > 60 {
warn!(skew_secs, "Time skew >60s detected from fetch_proxy_config Date header");
} else if skew_secs > 30 {
warn!(skew_secs, "Time skew >30s detected from fetch_proxy_config Date header");
}
}
let text = resp
.text()
.await
.map_err(|e| crate::error::ProxyError::Proxy(format!("fetch_proxy_config read failed: {e}")))?;
let mut map: HashMap<i32, Vec<(IpAddr, u16)>> = HashMap::new();
let mut proxy_for_lines: u32 = 0;
for line in text.lines() {
if let Some((dc, ip, port)) = parse_proxy_line(line) {
map.entry(dc).or_default().push((ip, port));
proxy_for_lines = proxy_for_lines.saturating_add(1);
}
}
let default_dc = text
.lines()
.find_map(|l| {
let t = l.trim();
if let Some(rest) = t.strip_prefix("default") {
return rest
.trim()
.trim_end_matches(';')
.parse::<i32>()
.ok();
}
None
});
Ok(ProxyConfigData {
map,
default_dc,
http_status,
proxy_for_lines,
})
} }
fn snapshot_passes_guards( fn snapshot_passes_guards(

View File

@ -30,7 +30,11 @@ pub use pool::MePool;
pub use pool_nat::{stun_probe, detect_public_ip}; pub use pool_nat::{stun_probe, detect_public_ip};
pub use registry::ConnRegistry; pub use registry::ConnRegistry;
pub use secret::fetch_proxy_secret; pub use secret::fetch_proxy_secret;
pub use config_updater::{fetch_proxy_config, me_config_updater}; #[allow(unused_imports)]
pub use config_updater::{
ProxyConfigData, fetch_proxy_config, fetch_proxy_config_with_raw, load_proxy_config_cache,
me_config_updater, save_proxy_config_cache,
};
pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task}; pub use rotation::{MeReinitTrigger, me_reinit_scheduler, me_rotation_task};
pub use wire::proto_flags_for_tag; pub use wire::proto_flags_for_tag;