From e340b716b2f4751b223424e64f761ab52392ff16 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Thu, 19 Feb 2026 15:39:30 +0300 Subject: [PATCH] Drafting ME Healthcheck Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 24 +++++ src/config/types.rs | 55 +++++++++++ src/main.rs | 11 +++ src/transport/middle_proxy/codec.rs | 9 ++ src/transport/middle_proxy/health.rs | 136 ++++++--------------------- src/transport/middle_proxy/pool.rs | 94 +++++++++++++++++- 6 files changed, 217 insertions(+), 112 deletions(-) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index c11e738..19269a2 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -78,6 +78,30 @@ pub(crate) fn default_pool_size() -> usize { 2 } +pub(crate) fn default_keepalive_interval() -> u64 { + 25 +} + +pub(crate) fn default_keepalive_jitter() -> u64 { + 5 +} + +pub(crate) fn default_warmup_step_delay_ms() -> u64 { + 500 +} + +pub(crate) fn default_warmup_step_jitter_ms() -> u64 { + 300 +} + +pub(crate) fn default_reconnect_backoff_base_ms() -> u64 { + 500 +} + +pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 { + 30_000 +} + // Custom deserializer helpers #[derive(Deserialize)] diff --git a/src/config/types.rs b/src/config/types.rs index 1cdb1bf..9f6467a 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -155,6 +155,50 @@ pub struct GeneralConfig { #[serde(default)] pub middle_proxy_warm_standby: usize, + /// Enable ME keepalive padding frames. + #[serde(default = "default_true")] + pub me_keepalive_enabled: bool, + + /// Keepalive interval in seconds. + #[serde(default = "default_keepalive_interval")] + pub me_keepalive_interval_secs: u64, + + /// Keepalive jitter in seconds. + #[serde(default = "default_keepalive_jitter")] + pub me_keepalive_jitter_secs: u64, + + /// Keepalive payload randomized (4 bytes); otherwise zeros. + #[serde(default = "default_true")] + pub me_keepalive_payload_random: bool, + + /// Enable staggered warmup of extra ME writers. + #[serde(default = "default_true")] + pub me_warmup_stagger_enabled: bool, + + /// Base delay between warmup connections in ms. + #[serde(default = "default_warmup_step_delay_ms")] + pub me_warmup_step_delay_ms: u64, + + /// Jitter for warmup delay in ms. + #[serde(default = "default_warmup_step_jitter_ms")] + pub me_warmup_step_jitter_ms: u64, + + /// Max concurrent reconnect attempts per DC. + #[serde(default)] + pub me_reconnect_max_concurrent_per_dc: u32, + + /// Base backoff in ms for reconnect. + #[serde(default = "default_reconnect_backoff_base_ms")] + pub me_reconnect_backoff_base_ms: u64, + + /// Cap backoff in ms for reconnect. + #[serde(default = "default_reconnect_backoff_cap_ms")] + pub me_reconnect_backoff_cap_ms: u64, + + /// Fast retry attempts before backoff. + #[serde(default)] + pub me_reconnect_fast_retry_count: u32, + /// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected). #[serde(default)] pub stun_iface_mismatch_ignore: bool, @@ -190,6 +234,17 @@ impl Default for GeneralConfig { middle_proxy_nat_stun_servers: Vec::new(), middle_proxy_pool_size: default_pool_size(), middle_proxy_warm_standby: 0, + me_keepalive_enabled: true, + me_keepalive_interval_secs: default_keepalive_interval(), + me_keepalive_jitter_secs: default_keepalive_jitter(), + me_keepalive_payload_random: true, + me_warmup_stagger_enabled: true, + me_warmup_step_delay_ms: default_warmup_step_delay_ms(), + me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), + me_reconnect_max_concurrent_per_dc: 1, + me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(), + me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(), + me_reconnect_fast_retry_count: 1, stun_iface_mismatch_ignore: false, unknown_dc_log_path: default_unknown_dc_log_path(), log_level: LogLevel::Normal, diff --git a/src/main.rs b/src/main.rs index 33aefcb..e8883f2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -333,6 +333,17 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai cfg_v4.default_dc.or(cfg_v6.default_dc), decision.clone(), rng.clone(), + config.general.me_keepalive_enabled, + config.general.me_keepalive_interval_secs, + config.general.me_keepalive_jitter_secs, + config.general.me_keepalive_payload_random, + config.general.me_warmup_stagger_enabled, + config.general.me_warmup_step_delay_ms, + config.general.me_warmup_step_jitter_ms, + config.general.me_reconnect_max_concurrent_per_dc, + config.general.me_reconnect_backoff_base_ms, + config.general.me_reconnect_backoff_cap_ms, + config.general.me_reconnect_fast_retry_count, ); let pool_size = config.general.middle_proxy_pool_size.max(1); diff --git a/src/transport/middle_proxy/codec.rs b/src/transport/middle_proxy/codec.rs index 12efc45..82f0960 100644 --- a/src/transport/middle_proxy/codec.rs +++ b/src/transport/middle_proxy/codec.rs @@ -8,6 +8,7 @@ use crate::protocol::constants::*; pub(crate) enum WriterCommand { Data(Vec), DataAndFlush(Vec), + Keepalive, Close, } @@ -188,4 +189,12 @@ impl RpcWriter { self.send(payload).await?; self.writer.flush().await.map_err(ProxyError::Io) } + + pub(crate) async fn send_keepalive(&mut self, payload: [u8; 4]) -> Result<()> { + // Keepalive is a frame with fl == 4 and 4 bytes payload. + let mut frame = Vec::with_capacity(8); + frame.extend_from_slice(&4u32.to_le_bytes()); + frame.extend_from_slice(&payload); + self.send(&frame).await + } } diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 574ea73..e2cdad6 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -5,6 +5,7 @@ use std::time::{Duration, Instant}; use tracing::{debug, info, warn}; use rand::seq::SliceRandom; +use rand::Rng; use crate::crypto::SecureRandom; use crate::network::IpFamily; @@ -12,13 +13,11 @@ use crate::network::IpFamily; use super::MePool; const HEALTH_INTERVAL_SECS: u64 = 1; -const QUICK_RETRY_ATTEMPTS: u8 = 10; -const QUICK_RETRY_DELAY_MS: u64 = 2500; +const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); - let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); - let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new(); + let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); loop { tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; check_family( @@ -26,8 +25,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &pool, &rng, &mut backoff, - &mut last_attempt, - &mut inflight_single, + &mut next_attempt, ) .await; check_family( @@ -35,8 +33,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &pool, &rng, &mut backoff, - &mut last_attempt, - &mut inflight_single, + &mut next_attempt, ) .await; } @@ -47,8 +44,7 @@ async fn check_family( pool: &Arc, rng: &Arc, backoff: &mut HashMap<(i32, IpFamily), u64>, - last_attempt: &mut HashMap<(i32, IpFamily), Instant>, - inflight_single: &mut HashSet<(i32, IpFamily)>, + next_attempt: &mut HashMap<(i32, IpFamily), Instant>, ) { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, @@ -84,120 +80,46 @@ async fn check_family( for (dc, dc_addrs) in entries { let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a)); if has_coverage { - inflight_single.remove(&(dc, family)); continue; } - // Aggressive quick-retry burst: up to 10 attempts every 2.5s before falling back to exponential backoff. let key = (dc, family); - for attempt in 0..QUICK_RETRY_ATTEMPTS { - let mut shuffled = dc_addrs.clone(); - shuffled.shuffle(&mut rand::rng()); - let mut success = false; - for addr in &shuffled { - match pool.connect_one(*addr, rng.as_ref()).await { - Ok(()) => { - info!(%addr, dc = %dc, ?family, attempt, "ME reconnected (quick burst)"); - backoff.insert(key, HEALTH_INTERVAL_SECS); - last_attempt.insert(key, Instant::now()); - inflight_single.remove(&key); - success = true; - break; - } - Err(e) => debug!(%addr, dc = %dc, error = %e, attempt, ?family, "ME reconnect failed (quick)"), - } - } - if success { - continue; - } - tokio::time::sleep(Duration::from_millis(QUICK_RETRY_DELAY_MS)).await; - } - - let delay = *backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS); let now = Instant::now(); - if let Some(last) = last_attempt.get(&key) { - if now.duration_since(*last).as_secs() < delay { + if let Some(ts) = next_attempt.get(&key) { + if now < *ts { continue; } } - if dc_addrs.len() == 1 { - // Single ME address: fast retries then slower background retries. - if inflight_single.contains(&key) { - continue; - } - inflight_single.insert(key); - let addr = dc_addrs[0]; - let dc_id = dc; - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let timeout = pool.me_one_timeout; - let quick_attempts = pool.me_one_retry.max(1); - tokio::spawn(async move { - let mut success = false; - for _ in 0..quick_attempts { - let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await; - match res { - Ok(Ok(())) => { - info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage"); - success = true; - break; - } - Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"), - Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"), - } - tokio::time::sleep(Duration::from_millis(1000)).await; - } - if success { - return; - } - let timeout_ms = timeout.as_millis(); - warn!( - dc = %dc_id, - ?family, - attempts = quick_attempts, - timeout_ms, - "DC={} has no ME coverage: {} tries * {} ms... retry in 5 seconds...", - dc_id, - quick_attempts, - timeout_ms - ); - loop { - tokio::time::sleep(Duration::from_secs(5)).await; - let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await; - match res { - Ok(Ok(())) => { - info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage"); - break; - } - Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"), - Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"), - } - } - // will drop inflight flag in outer loop when coverage detected - }); - continue; - } - warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting (backoff)..."); let mut shuffled = dc_addrs.clone(); shuffled.shuffle(&mut rand::rng()); - let mut reconnected = false; + let mut success = false; for addr in shuffled { - match pool.connect_one(addr, rng.as_ref()).await { - Ok(()) => { + let res = tokio::time::timeout(pool.me_one_timeout, pool.connect_one(addr, rng.as_ref())).await; + match res { + Ok(Ok(())) => { info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage"); - backoff.insert(key, 30); - last_attempt.insert(key, now); - reconnected = true; + backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64); + let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM; + let wait = pool.me_reconnect_backoff_base + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + next_attempt.insert(key, now + wait); + success = true; break; } - Err(e) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"), + Ok(Err(e)) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"), + Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"), } } - if !reconnected { - let next = (*backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS)).saturating_mul(2).min(60); - backoff.insert(key, next); - last_attempt.insert(key, now); + if !success { + let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64)); + let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64); + backoff.insert(key, next_ms); + let jitter = next_ms / JITTER_FRAC_NUM; + let wait = Duration::from_millis(next_ms) + + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); + next_attempt.insert(key, now + wait); + warn!(dc = %dc, backoff_ms = next_ms, ?family, "DC has no ME coverage, scheduled reconnect"); } } } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index e860dc8..5106004 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -21,9 +21,9 @@ use super::registry::{BoundConn, ConnMeta}; use super::codec::{RpcWriter, WriterCommand}; use super::reader::reader_loop; use super::MeResponse; - const ME_ACTIVE_PING_SECS: u64 = 25; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; +const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4; #[derive(Clone)] pub struct MeWriter { @@ -54,6 +54,17 @@ pub struct MePool { pub(super) stun_backoff_until: Arc>>, pub(super) me_one_retry: u8, pub(super) me_one_timeout: Duration, + pub(super) me_keepalive_enabled: bool, + pub(super) me_keepalive_interval: Duration, + pub(super) me_keepalive_jitter: Duration, + pub(super) me_keepalive_payload_random: bool, + pub(super) me_warmup_stagger_enabled: bool, + pub(super) me_warmup_step_delay: Duration, + pub(super) me_warmup_step_jitter: Duration, + pub(super) me_reconnect_max_concurrent_per_dc: u32, + pub(super) me_reconnect_backoff_base: Duration, + pub(super) me_reconnect_backoff_cap: Duration, + pub(super) me_reconnect_fast_retry_count: u32, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, @@ -88,6 +99,17 @@ impl MePool { default_dc: Option, decision: NetworkDecision, rng: Arc, + me_keepalive_enabled: bool, + me_keepalive_interval_secs: u64, + me_keepalive_jitter_secs: u64, + me_keepalive_payload_random: bool, + me_warmup_stagger_enabled: bool, + me_warmup_step_delay_ms: u64, + me_warmup_step_jitter_ms: u64, + me_reconnect_max_concurrent_per_dc: u32, + me_reconnect_backoff_base_ms: u64, + me_reconnect_backoff_cap_ms: u64, + me_reconnect_fast_retry_count: u32, ) -> Arc { Arc::new(Self { registry: Arc::new(ConnRegistry::new()), @@ -108,6 +130,17 @@ impl MePool { stun_backoff_until: Arc::new(RwLock::new(None)), me_one_retry, me_one_timeout: Duration::from_millis(me_one_timeout_ms), + me_keepalive_enabled, + me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs), + me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), + me_keepalive_payload_random, + me_warmup_stagger_enabled, + me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), + me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), + me_reconnect_max_concurrent_per_dc, + me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms), + me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms), + me_reconnect_fast_retry_count, pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -323,7 +356,24 @@ impl MePool { return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into())); } - // Additional connections up to pool_size total (round-robin across DCs) + // Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles. + if self.me_warmup_stagger_enabled { + let mut delay_ms = 0u64; + for (dc, addrs) in dc_addrs.iter() { + for (ip, port) in addrs { + if self.connection_count() >= pool_size { + break; + } + let addr = SocketAddr::new(*ip, *port); + let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64); + delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter); + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + if let Err(e) = self.connect_one(addr, rng.as_ref()).await { + debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)"); + } + } + } + } else { for (dc, addrs) in dc_addrs.iter() { for (ip, port) in addrs { if self.connection_count() >= pool_size { @@ -338,6 +388,7 @@ impl MePool { break; } } + } if !self.decision.effective_multipath && self.connection_count() > 0 { break; @@ -364,6 +415,8 @@ impl MePool { let degraded = Arc::new(AtomicBool::new(false)); let draining = Arc::new(AtomicBool::new(false)); let (tx, mut rx) = mpsc::channel::(4096); + let tx_for_keepalive = tx.clone(); + let keepalive_random = self.me_keepalive_payload_random; let mut rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, @@ -382,6 +435,13 @@ impl MePool { Some(WriterCommand::DataAndFlush(payload)) => { if rpc_writer.send_and_flush(&payload).await.is_err() { break; } } + Some(WriterCommand::Keepalive) => { + let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN]; + if keepalive_random { + rand::rng().fill(&mut payload); + } + if rpc_writer.send_keepalive(payload).await.is_err() { break; } + } Some(WriterCommand::Close) | None => break, } } @@ -412,9 +472,14 @@ impl MePool { let cleanup_done = Arc::new(AtomicBool::new(false)); let cleanup_for_reader = cleanup_done.clone(); let cleanup_for_ping = cleanup_done.clone(); + let keepalive_enabled = self.me_keepalive_enabled; + let keepalive_interval = self.me_keepalive_interval; + let keepalive_jitter = self.me_keepalive_jitter; + let cancel_reader_token = cancel.clone(); + let cancel_ping_token = cancel_ping.clone(); + let cancel_keepalive_token = cancel.clone(); tokio::spawn(async move { - let cancel_reader = cancel.clone(); let res = reader_loop( hs.rd, hs.read_key, @@ -427,7 +492,7 @@ impl MePool { rtt_stats.clone(), writer_id, degraded.clone(), - cancel_reader.clone(), + cancel_reader_token.clone(), ) .await; if let Some(pool) = pool.upgrade() { @@ -454,7 +519,7 @@ impl MePool { .random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS); let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64; tokio::select! { - _ = cancel_ping.cancelled() => { + _ = cancel_ping_token.cancelled() => { break; } _ = tokio::time::sleep(Duration::from_secs(wait)) => {} @@ -485,6 +550,25 @@ impl MePool { } }); + if keepalive_enabled { + let tx_keepalive = tx_for_keepalive; + let cancel_keepalive = cancel_keepalive_token; + tokio::spawn(async move { + // Per-writer jittered start to avoid phase sync. + let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64); + tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await; + loop { + tokio::select! { + _ = cancel_keepalive.cancelled() => break, + _ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {} + } + if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() { + break; + } + } + }); + } + Ok(()) }