ME Soft Reinit tuning

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-02-24 18:19:39 +03:00
parent 2356ae5584
commit d2f08fb707
No known key found for this signature in database
6 changed files with 386 additions and 60 deletions

View File

@ -182,6 +182,26 @@ pub(crate) fn default_update_every_secs() -> u64 {
30 * 60
}
pub(crate) fn default_me_config_stable_snapshots() -> u8 {
2
}
pub(crate) fn default_me_config_apply_cooldown_secs() -> u64 {
300
}
pub(crate) fn default_proxy_secret_stable_snapshots() -> u8 {
2
}
pub(crate) fn default_proxy_secret_rotate_runtime() -> bool {
true
}
pub(crate) fn default_proxy_secret_len_max() -> usize {
256
}
pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 {
120
}

View File

@ -147,6 +147,24 @@ impl ProxyConfig {
}
}
if config.general.me_config_stable_snapshots == 0 {
return Err(ProxyError::Config(
"general.me_config_stable_snapshots must be > 0".to_string(),
));
}
if config.general.proxy_secret_stable_snapshots == 0 {
return Err(ProxyError::Config(
"general.proxy_secret_stable_snapshots must be > 0".to_string(),
));
}
if !(32..=4096).contains(&config.general.proxy_secret_len_max) {
return Err(ProxyError::Config(
"general.proxy_secret_len_max must be within [32, 4096]".to_string(),
));
}
if !(0.0..=1.0).contains(&config.general.me_pool_min_fresh_ratio) {
return Err(ProxyError::Config(
"general.me_pool_min_fresh_ratio must be within [0.0, 1.0]".to_string(),
@ -462,6 +480,66 @@ mod tests {
let _ = std::fs::remove_file(path);
}
#[test]
fn me_config_stable_snapshots_zero_is_rejected() {
let toml = r#"
[general]
me_config_stable_snapshots = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_me_config_stable_snapshots_zero_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.me_config_stable_snapshots must be > 0"));
let _ = std::fs::remove_file(path);
}
#[test]
fn proxy_secret_stable_snapshots_zero_is_rejected() {
let toml = r#"
[general]
proxy_secret_stable_snapshots = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_proxy_secret_stable_snapshots_zero_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.proxy_secret_stable_snapshots must be > 0"));
let _ = std::fs::remove_file(path);
}
#[test]
fn proxy_secret_len_max_out_of_range_is_rejected() {
let toml = r#"
[general]
proxy_secret_len_max = 16
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_proxy_secret_len_max_out_of_range_test.toml");
std::fs::write(&path, toml).unwrap();
let err = ProxyConfig::load(&path).unwrap_err().to_string();
assert!(err.contains("general.proxy_secret_len_max must be within [32, 4096]"));
let _ = std::fs::remove_file(path);
}
#[test]
fn me_pool_min_fresh_ratio_out_of_range_is_rejected() {
let toml = r#"

View File

@ -267,6 +267,26 @@ pub struct GeneralConfig {
#[serde(default)]
pub update_every: Option<u64>,
/// Number of identical getProxyConfig snapshots required before applying ME map updates.
#[serde(default = "default_me_config_stable_snapshots")]
pub me_config_stable_snapshots: u8,
/// Cooldown in seconds between applied ME map updates.
#[serde(default = "default_me_config_apply_cooldown_secs")]
pub me_config_apply_cooldown_secs: u64,
/// Number of identical getProxySecret snapshots required before runtime secret rotation.
#[serde(default = "default_proxy_secret_stable_snapshots")]
pub proxy_secret_stable_snapshots: u8,
/// Enable runtime proxy-secret rotation from getProxySecret.
#[serde(default = "default_proxy_secret_rotate_runtime")]
pub proxy_secret_rotate_runtime: bool,
/// Maximum allowed proxy-secret length in bytes for startup and runtime refresh.
#[serde(default = "default_proxy_secret_len_max")]
pub proxy_secret_len_max: usize,
/// Drain-TTL in seconds for stale ME writers after endpoint map changes.
/// During TTL, stale writers may be used only as fallback for new bindings.
#[serde(default = "default_me_pool_drain_ttl_secs")]
@ -346,6 +366,11 @@ impl Default for GeneralConfig {
hardswap: default_hardswap(),
fast_mode_min_tls_record: default_fast_mode_min_tls_record(),
update_every: Some(default_update_every_secs()),
me_config_stable_snapshots: default_me_config_stable_snapshots(),
me_config_apply_cooldown_secs: default_me_config_apply_cooldown_secs(),
proxy_secret_stable_snapshots: default_proxy_secret_stable_snapshots(),
proxy_secret_rotate_runtime: default_proxy_secret_rotate_runtime(),
proxy_secret_len_max: default_proxy_secret_len_max(),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(),
me_reinit_drain_timeout_secs: default_me_reinit_drain_timeout_secs(),

View File

@ -298,25 +298,30 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
// proxy-secret is from: https://core.telegram.org/getProxySecret
// =============================================================
let proxy_secret_path = config.general.proxy_secret_path.as_deref();
match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).await {
Ok(proxy_secret) => {
info!(
secret_len = proxy_secret.len(),
key_sig = format_args!(
"0x{:08x}",
if proxy_secret.len() >= 4 {
u32::from_le_bytes([
proxy_secret[0],
proxy_secret[1],
proxy_secret[2],
proxy_secret[3],
])
} else {
0
}
),
"Proxy-secret loaded"
);
match crate::transport::middle_proxy::fetch_proxy_secret(
proxy_secret_path,
config.general.proxy_secret_len_max,
)
.await
{
Ok(proxy_secret) => {
info!(
secret_len = proxy_secret.len(),
key_sig = format_args!(
"0x{:08x}",
if proxy_secret.len() >= 4 {
u32::from_le_bytes([
proxy_secret[0],
proxy_secret[1],
proxy_secret[2],
proxy_secret[3],
])
} else {
0
}
),
"Proxy-secret loaded"
);
// Load ME config (v4/v6) + default DC
let mut cfg_v4 = fetch_proxy_config(

View File

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
@ -11,7 +12,7 @@ use crate::config::ProxyConfig;
use crate::error::Result;
use super::MePool;
use super::secret::download_proxy_secret;
use super::secret::download_proxy_secret_with_max_len;
use crate::crypto::SecureRandom;
use std::time::SystemTime;
@ -39,6 +40,92 @@ pub struct ProxyConfigData {
pub default_dc: Option<i32>,
}
#[derive(Debug, Default)]
struct StableSnapshot {
candidate_hash: Option<u64>,
candidate_hits: u8,
applied_hash: Option<u64>,
}
impl StableSnapshot {
fn observe(&mut self, hash: u64) -> u8 {
if self.candidate_hash == Some(hash) {
self.candidate_hits = self.candidate_hits.saturating_add(1);
} else {
self.candidate_hash = Some(hash);
self.candidate_hits = 1;
}
self.candidate_hits
}
fn is_applied(&self, hash: u64) -> bool {
self.applied_hash == Some(hash)
}
fn mark_applied(&mut self, hash: u64) {
self.applied_hash = Some(hash);
}
}
#[derive(Debug, Default)]
struct UpdaterState {
config_v4: StableSnapshot,
config_v6: StableSnapshot,
secret: StableSnapshot,
last_map_apply_at: Option<tokio::time::Instant>,
}
fn hash_proxy_config(cfg: &ProxyConfigData) -> u64 {
let mut hasher = DefaultHasher::new();
cfg.default_dc.hash(&mut hasher);
let mut by_dc: Vec<(i32, Vec<(IpAddr, u16)>)> =
cfg.map.iter().map(|(dc, addrs)| (*dc, addrs.clone())).collect();
by_dc.sort_by_key(|(dc, _)| *dc);
for (dc, mut addrs) in by_dc {
dc.hash(&mut hasher);
addrs.sort_unstable();
for (ip, port) in addrs {
ip.hash(&mut hasher);
port.hash(&mut hasher);
}
}
hasher.finish()
}
fn hash_secret(secret: &[u8]) -> u64 {
let mut hasher = DefaultHasher::new();
secret.hash(&mut hasher);
hasher.finish()
}
fn map_apply_cooldown_ready(
last_applied: Option<tokio::time::Instant>,
cooldown: Duration,
) -> bool {
if cooldown.is_zero() {
return true;
}
match last_applied {
Some(ts) => ts.elapsed() >= cooldown,
None => true,
}
}
fn map_apply_cooldown_remaining_secs(
last_applied: tokio::time::Instant,
cooldown: Duration,
) -> u64 {
if cooldown.is_zero() {
return 0;
}
cooldown
.checked_sub(last_applied.elapsed())
.map(|d| d.as_secs())
.unwrap_or(0)
}
fn parse_host_port(s: &str) -> Option<(IpAddr, u16)> {
if let Some(bracket_end) = s.rfind(']')
&& s.starts_with('[')
@ -130,7 +217,12 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
Ok(ProxyConfigData { map, default_dc })
}
async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &ProxyConfig) {
async fn run_update_cycle(
pool: &Arc<MePool>,
rng: &Arc<SecureRandom>,
cfg: &ProxyConfig,
state: &mut UpdaterState,
) {
pool.update_runtime_reinit_policy(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
@ -138,33 +230,93 @@ async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &Pro
cfg.general.me_pool_min_fresh_ratio,
);
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
let required_secret_snapshots = cfg.general.proxy_secret_stable_snapshots.max(1);
let apply_cooldown = Duration::from_secs(cfg.general.me_config_apply_cooldown_secs);
let mut maps_changed = false;
// Update proxy config v4
let mut ready_v4: Option<(ProxyConfigData, u64)> = None;
let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
if let Some(cfg_v4) = cfg_v4 {
let changed = pool.update_proxy_maps(cfg_v4.map.clone(), None).await;
if let Some(dc) = cfg_v4.default_dc {
pool.default_dc
.store(dc, std::sync::atomic::Ordering::Relaxed);
}
if changed {
maps_changed = true;
info!("ME config updated (v4)");
let cfg_v4_hash = hash_proxy_config(&cfg_v4);
let stable_hits = state.config_v4.observe(cfg_v4_hash);
if stable_hits < required_cfg_snapshots {
debug!(
stable_hits,
required_cfg_snapshots,
snapshot = format_args!("0x{cfg_v4_hash:016x}"),
"ME config v4 candidate observed"
);
} else if state.config_v4.is_applied(cfg_v4_hash) {
debug!(
snapshot = format_args!("0x{cfg_v4_hash:016x}"),
"ME config v4 stable snapshot already applied"
);
} else {
debug!("ME config v4 unchanged");
ready_v4 = Some((cfg_v4, cfg_v4_hash));
}
}
// Update proxy config v6 (optional)
let mut ready_v6: Option<(ProxyConfigData, u64)> = None;
let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
if let Some(cfg_v6) = cfg_v6 {
let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
if changed {
maps_changed = true;
info!("ME config updated (v6)");
let cfg_v6_hash = hash_proxy_config(&cfg_v6);
let stable_hits = state.config_v6.observe(cfg_v6_hash);
if stable_hits < required_cfg_snapshots {
debug!(
stable_hits,
required_cfg_snapshots,
snapshot = format_args!("0x{cfg_v6_hash:016x}"),
"ME config v6 candidate observed"
);
} else if state.config_v6.is_applied(cfg_v6_hash) {
debug!(
snapshot = format_args!("0x{cfg_v6_hash:016x}"),
"ME config v6 stable snapshot already applied"
);
} else {
debug!("ME config v6 unchanged");
ready_v6 = Some((cfg_v6, cfg_v6_hash));
}
}
if ready_v4.is_some() || ready_v6.is_some() {
if map_apply_cooldown_ready(state.last_map_apply_at, apply_cooldown) {
let update_v4 = ready_v4
.as_ref()
.map(|(snapshot, _)| snapshot.map.clone())
.unwrap_or_default();
let update_v6 = ready_v6
.as_ref()
.map(|(snapshot, _)| snapshot.map.clone());
let changed = pool.update_proxy_maps(update_v4, update_v6).await;
if let Some((snapshot, hash)) = ready_v4 {
if let Some(dc) = snapshot.default_dc {
pool.default_dc
.store(dc, std::sync::atomic::Ordering::Relaxed);
}
state.config_v4.mark_applied(hash);
}
if let Some((_snapshot, hash)) = ready_v6 {
state.config_v6.mark_applied(hash);
}
state.last_map_apply_at = Some(tokio::time::Instant::now());
if changed {
maps_changed = true;
info!("ME config update applied after stable-gate");
} else {
debug!("ME config stable-gate applied with no map delta");
}
} else if let Some(last) = state.last_map_apply_at {
let wait_secs = map_apply_cooldown_remaining_secs(last, apply_cooldown);
debug!(
wait_secs,
"ME config stable snapshot deferred by cooldown"
);
}
}
@ -175,14 +327,37 @@ async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &Pro
pool.reset_stun_state();
// Update proxy-secret
match download_proxy_secret().await {
Ok(secret) => {
if pool.update_secret(secret).await {
info!("proxy-secret updated and pool reconnect scheduled");
if cfg.general.proxy_secret_rotate_runtime {
match download_proxy_secret_with_max_len(cfg.general.proxy_secret_len_max).await {
Ok(secret) => {
let secret_hash = hash_secret(&secret);
let stable_hits = state.secret.observe(secret_hash);
if stable_hits < required_secret_snapshots {
debug!(
stable_hits,
required_secret_snapshots,
snapshot = format_args!("0x{secret_hash:016x}"),
"proxy-secret candidate observed"
);
} else if state.secret.is_applied(secret_hash) {
debug!(
snapshot = format_args!("0x{secret_hash:016x}"),
"proxy-secret stable snapshot already applied"
);
} else {
let rotated = pool.update_secret(secret).await;
state.secret.mark_applied(secret_hash);
if rotated {
info!("proxy-secret rotated after stable-gate");
} else {
debug!("proxy-secret stable snapshot confirmed as unchanged");
}
}
}
Err(e) => warn!(error = %e, "proxy-secret update failed"),
}
Err(e) => warn!(error = %e, "proxy-secret update failed"),
} else {
debug!("proxy-secret runtime rotation disabled by config");
}
}
@ -191,6 +366,7 @@ pub async fn me_config_updater(
rng: Arc<SecureRandom>,
mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
) {
let mut state = UpdaterState::default();
let mut update_every_secs = config_rx
.borrow()
.general
@ -207,7 +383,7 @@ pub async fn me_config_updater(
tokio::select! {
_ = &mut sleep => {
let cfg = config_rx.borrow().clone();
run_update_cycle(&pool, &rng, cfg.as_ref()).await;
run_update_cycle(&pool, &rng, cfg.as_ref(), &mut state).await;
let refreshed_secs = cfg.general.effective_update_every_secs().max(1);
if refreshed_secs != update_every_secs {
info!(
@ -245,7 +421,7 @@ pub async fn me_config_updater(
);
update_every_secs = new_secs;
update_every = Duration::from_secs(update_every_secs);
run_update_cycle(&pool, &rng, cfg.as_ref()).await;
run_update_cycle(&pool, &rng, cfg.as_ref(), &mut state).await;
next_tick = tokio::time::Instant::now() + update_every;
} else {
info!(

View File

@ -4,12 +4,42 @@ use httpdate;
use crate::error::{ProxyError, Result};
pub const PROXY_SECRET_MIN_LEN: usize = 32;
pub(super) fn validate_proxy_secret_len(data_len: usize, max_len: usize) -> Result<()> {
if max_len < PROXY_SECRET_MIN_LEN {
return Err(ProxyError::Proxy(format!(
"proxy-secret max length is invalid: {} bytes (must be >= {})",
max_len,
PROXY_SECRET_MIN_LEN
)));
}
if data_len < PROXY_SECRET_MIN_LEN {
return Err(ProxyError::Proxy(format!(
"proxy-secret too short: {} bytes (need >= {})",
data_len,
PROXY_SECRET_MIN_LEN
)));
}
if data_len > max_len {
return Err(ProxyError::Proxy(format!(
"proxy-secret too long: {} bytes (limit = {})",
data_len,
max_len
)));
}
Ok(())
}
/// Fetch Telegram proxy-secret binary.
pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result<Vec<u8>> {
pub async fn fetch_proxy_secret(cache_path: Option<&str>, max_len: usize) -> Result<Vec<u8>> {
let cache = cache_path.unwrap_or("proxy-secret");
// 1) Try fresh download first.
match download_proxy_secret().await {
match download_proxy_secret_with_max_len(max_len).await {
Ok(data) => {
if let Err(e) = tokio::fs::write(cache, &data).await {
warn!(error = %e, "Failed to cache proxy-secret (non-fatal)");
@ -24,9 +54,9 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result<Vec<u8>> {
}
}
// 2) Fallback to cache/file regardless of age; require len>=32.
// 2) Fallback to cache/file regardless of age; require len in bounds.
match tokio::fs::read(cache).await {
Ok(data) if data.len() >= 32 => {
Ok(data) if validate_proxy_secret_len(data.len(), max_len).is_ok() => {
let age_hours = tokio::fs::metadata(cache)
.await
.ok()
@ -41,17 +71,14 @@ pub async fn fetch_proxy_secret(cache_path: Option<&str>) -> Result<Vec<u8>> {
);
Ok(data)
}
Ok(data) => Err(ProxyError::Proxy(format!(
"Cached proxy-secret too short: {} bytes (need >= 32)",
data.len()
))),
Ok(data) => validate_proxy_secret_len(data.len(), max_len).map(|_| data),
Err(e) => Err(ProxyError::Proxy(format!(
"Failed to read proxy-secret cache after download failure: {e}"
))),
}
}
pub async fn download_proxy_secret() -> Result<Vec<u8>> {
pub async fn download_proxy_secret_with_max_len(max_len: usize) -> Result<Vec<u8>> {
let resp = reqwest::get("https://core.telegram.org/getProxySecret")
.await
.map_err(|e| ProxyError::Proxy(format!("Failed to download proxy-secret: {e}")))?;
@ -84,12 +111,7 @@ pub async fn download_proxy_secret() -> Result<Vec<u8>> {
.map_err(|e| ProxyError::Proxy(format!("Read proxy-secret body: {e}")))?
.to_vec();
if data.len() < 32 {
return Err(ProxyError::Proxy(format!(
"proxy-secret too short: {} bytes (need >= 32)",
data.len()
)));
}
validate_proxy_secret_len(data.len(), max_len)?;
info!(len = data.len(), "Downloaded proxy-secret OK");
Ok(data)