ME Pool Updater + Soft-staged Reinit w/o Reconcile

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-23 16:04:19 +03:00
parent d08ddd718a
commit d8dcbbb61e
8 changed files with 336 additions and 60 deletions

View File

@@ -4,8 +4,10 @@ use std::sync::Arc;
use std::time::Duration;
use httpdate;
use tokio::sync::watch;
use tracing::{debug, info, warn};
use crate::config::ProxyConfig;
use crate::error::Result;
use super::MePool;
@@ -128,49 +130,126 @@ pub async fn fetch_proxy_config(url: &str) -> Result<ProxyConfigData> {
Ok(ProxyConfigData { map, default_dc })
}
pub async fn me_config_updater(pool: Arc<MePool>, rng: Arc<SecureRandom>, interval: Duration) {
let mut tick = tokio::time::interval(interval);
// skip immediate tick to avoid double-fetch right after startup
tick.tick().await;
async fn run_update_cycle(pool: &Arc<MePool>, rng: &Arc<SecureRandom>, cfg: &ProxyConfig) {
let mut maps_changed = false;
// Update proxy config v4
let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
if let Some(cfg_v4) = cfg_v4 {
let changed = pool.update_proxy_maps(cfg_v4.map.clone(), None).await;
if let Some(dc) = cfg_v4.default_dc {
pool.default_dc
.store(dc, std::sync::atomic::Ordering::Relaxed);
}
if changed {
maps_changed = true;
info!("ME config updated (v4)");
} else {
debug!("ME config v4 unchanged");
}
}
// Update proxy config v6 (optional)
let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
if let Some(cfg_v6) = cfg_v6 {
let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
if changed {
maps_changed = true;
info!("ME config updated (v6)");
} else {
debug!("ME config v6 unchanged");
}
}
if maps_changed {
let drain_timeout = if cfg.general.me_reinit_drain_timeout_secs == 0 {
None
} else {
Some(Duration::from_secs(cfg.general.me_reinit_drain_timeout_secs))
};
pool.zero_downtime_reinit_after_map_change(rng.as_ref(), drain_timeout)
.await;
}
pool.reset_stun_state();
// Update proxy-secret
match download_proxy_secret().await {
Ok(secret) => {
if pool.update_secret(secret).await {
info!("proxy-secret updated and pool reconnect scheduled");
}
}
Err(e) => warn!(error = %e, "proxy-secret update failed"),
}
}
pub async fn me_config_updater(
pool: Arc<MePool>,
rng: Arc<SecureRandom>,
mut config_rx: watch::Receiver<Arc<ProxyConfig>>,
) {
let mut update_every_secs = config_rx
.borrow()
.general
.effective_update_every_secs()
.max(1);
let mut update_every = Duration::from_secs(update_every_secs);
let mut next_tick = tokio::time::Instant::now() + update_every;
info!(update_every_secs, "ME config updater started");
loop {
tick.tick().await;
let sleep = tokio::time::sleep_until(next_tick);
tokio::pin!(sleep);
// Update proxy config v4
let cfg_v4 = retry_fetch("https://core.telegram.org/getProxyConfig").await;
if let Some(cfg) = cfg_v4 {
let changed = pool.update_proxy_maps(cfg.map.clone(), None).await;
if let Some(dc) = cfg.default_dc {
pool.default_dc.store(dc, std::sync::atomic::Ordering::Relaxed);
tokio::select! {
_ = &mut sleep => {
let cfg = config_rx.borrow().clone();
run_update_cycle(&pool, &rng, cfg.as_ref()).await;
let refreshed_secs = cfg.general.effective_update_every_secs().max(1);
if refreshed_secs != update_every_secs {
info!(
old_update_every_secs = update_every_secs,
new_update_every_secs = refreshed_secs,
"ME config updater interval changed"
);
update_every_secs = refreshed_secs;
update_every = Duration::from_secs(update_every_secs);
}
next_tick = tokio::time::Instant::now() + update_every;
}
if changed {
info!("ME config updated (v4), reconciling connections");
pool.reconcile_connections(&rng).await;
} else {
debug!("ME config v4 unchanged");
}
}
changed = config_rx.changed() => {
if changed.is_err() {
warn!("ME config updater stopped: config channel closed");
break;
}
let cfg = config_rx.borrow().clone();
let new_secs = cfg.general.effective_update_every_secs().max(1);
if new_secs == update_every_secs {
continue;
}
// Update proxy config v6 (optional)
let cfg_v6 = retry_fetch("https://core.telegram.org/getProxyConfigV6").await;
if let Some(cfg_v6) = cfg_v6 {
let changed = pool.update_proxy_maps(HashMap::new(), Some(cfg_v6.map)).await;
if changed {
info!("ME config updated (v6), reconciling connections");
pool.reconcile_connections(&rng).await;
} else {
debug!("ME config v6 unchanged");
}
}
pool.reset_stun_state();
// Update proxy-secret
match download_proxy_secret().await {
Ok(secret) => {
if pool.update_secret(secret).await {
info!("proxy-secret updated and pool reconnect scheduled");
if new_secs < update_every_secs {
info!(
old_update_every_secs = update_every_secs,
new_update_every_secs = new_secs,
"ME config updater interval decreased, running immediate refresh"
);
update_every_secs = new_secs;
update_every = Duration::from_secs(update_every_secs);
run_update_cycle(&pool, &rng, cfg.as_ref()).await;
next_tick = tokio::time::Instant::now() + update_every;
} else {
info!(
old_update_every_secs = update_every_secs,
new_update_every_secs = new_secs,
"ME config updater interval increased"
);
update_every_secs = new_secs;
update_every = Duration::from_secs(update_every_secs);
next_tick = tokio::time::Instant::now() + update_every;
}
}
Err(e) => warn!(error = %e, "proxy-secret update failed"),
}
}
}

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicUsize, Ordering};
@@ -178,7 +178,6 @@ impl MePool {
}
pub async fn reconcile_connections(self: &Arc<Self>, rng: &SecureRandom) {
use std::collections::HashSet;
let writers = self.writers.read().await;
let current: HashSet<SocketAddr> = writers
.iter()
@@ -210,6 +209,101 @@ impl MePool {
}
}
async fn desired_dc_endpoints(&self) -> HashMap<i32, HashSet<SocketAddr>> {
let mut out: HashMap<i32, HashSet<SocketAddr>> = HashMap::new();
if self.decision.ipv4_me {
let map_v4 = self.proxy_map_v4.read().await.clone();
for (dc, addrs) in map_v4 {
let entry = out.entry(dc.abs()).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
if self.decision.ipv6_me {
let map_v6 = self.proxy_map_v6.read().await.clone();
for (dc, addrs) in map_v6 {
let entry = out.entry(dc.abs()).or_default();
for (ip, port) in addrs {
entry.insert(SocketAddr::new(ip, port));
}
}
}
out
}
pub async fn zero_downtime_reinit_after_map_change(
self: &Arc<Self>,
rng: &SecureRandom,
drain_timeout: Option<Duration>,
) {
self.reconcile_connections(rng).await;
let desired_by_dc = self.desired_dc_endpoints().await;
if desired_by_dc.is_empty() {
warn!("ME endpoint map is empty after update; skipping stale writer drain");
return;
}
let writers = self.writers.read().await;
let active_writer_addrs: HashSet<SocketAddr> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.map(|w| w.addr)
.collect();
let mut missing_dc = Vec::<i32>::new();
for (dc, endpoints) in &desired_by_dc {
if endpoints.is_empty() {
continue;
}
if !endpoints.iter().any(|addr| active_writer_addrs.contains(addr)) {
missing_dc.push(*dc);
}
}
if !missing_dc.is_empty() {
missing_dc.sort_unstable();
warn!(
missing_dc = ?missing_dc,
"ME reinit coverage incomplete after map update; keeping stale writers"
);
return;
}
let desired_addrs: HashSet<SocketAddr> = desired_by_dc
.values()
.flat_map(|set| set.iter().copied())
.collect();
let stale_writer_ids: Vec<u64> = writers
.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.filter(|w| !desired_addrs.contains(&w.addr))
.map(|w| w.id)
.collect();
drop(writers);
if stale_writer_ids.is_empty() {
debug!("ME map update completed with no stale writers");
return;
}
let drain_timeout_secs = drain_timeout.map(|d| d.as_secs()).unwrap_or(0);
info!(
stale_writers = stale_writer_ids.len(),
drain_timeout_secs,
"ME map update covered; draining stale writers"
);
for writer_id in stale_writer_ids {
self.mark_writer_draining_with_timeout(writer_id, drain_timeout)
.await;
}
}
pub async fn update_proxy_maps(
&self,
new_v4: HashMap<i32, Vec<(IpAddr, u16)>>,
@@ -631,23 +725,40 @@ impl MePool {
self.registry.writer_lost(writer_id).await
}
pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
{
pub(crate) async fn mark_writer_draining_with_timeout(
self: &Arc<Self>,
writer_id: u64,
timeout: Option<Duration>,
) {
let timeout = timeout.filter(|d| !d.is_zero());
let found = {
let mut ws = self.writers.write().await;
if let Some(w) = ws.iter_mut().find(|w| w.id == writer_id) {
w.draining.store(true, Ordering::Relaxed);
true
} else {
false
}
};
if !found {
return;
}
let timeout_secs = timeout.map(|d| d.as_secs()).unwrap_or(0);
debug!(writer_id, timeout_secs, "ME writer marked draining");
let pool = Arc::downgrade(self);
tokio::spawn(async move {
let deadline = Instant::now() + Duration::from_secs(300);
let deadline = timeout.map(|t| Instant::now() + t);
loop {
if let Some(p) = pool.upgrade() {
if Instant::now() >= deadline {
warn!(writer_id, "Drain timeout, force-closing");
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
if let Some(deadline_at) = deadline {
if Instant::now() >= deadline_at {
warn!(writer_id, "Drain timeout, force-closing");
let _ = p.remove_writer_and_close_clients(writer_id).await;
break;
}
}
if p.registry.is_writer_empty(writer_id).await {
let _ = p.remove_writer_only(writer_id).await;
@@ -661,6 +772,11 @@ impl MePool {
});
}
pub(crate) async fn mark_writer_draining(self: &Arc<Self>, writer_id: u64) {
self.mark_writer_draining_with_timeout(writer_id, Some(Duration::from_secs(300)))
.await;
}
}
fn hex_dump(data: &[u8]) -> String {