Drafting ME Healthcheck

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-02-19 15:39:30 +03:00
parent 356d64371a
commit e340b716b2
6 changed files with 217 additions and 112 deletions

View File

@@ -21,9 +21,9 @@ use super::registry::{BoundConn, ConnMeta};
use super::codec::{RpcWriter, WriterCommand};
use super::reader::reader_loop;
use super::MeResponse;
const ME_ACTIVE_PING_SECS: u64 = 25;
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4;
#[derive(Clone)]
pub struct MeWriter {
@@ -54,6 +54,17 @@ pub struct MePool {
pub(super) stun_backoff_until: Arc<RwLock<Option<Instant>>>,
pub(super) me_one_retry: u8,
pub(super) me_one_timeout: Duration,
pub(super) me_keepalive_enabled: bool,
pub(super) me_keepalive_interval: Duration,
pub(super) me_keepalive_jitter: Duration,
pub(super) me_keepalive_payload_random: bool,
pub(super) me_warmup_stagger_enabled: bool,
pub(super) me_warmup_step_delay: Duration,
pub(super) me_warmup_step_jitter: Duration,
pub(super) me_reconnect_max_concurrent_per_dc: u32,
pub(super) me_reconnect_backoff_base: Duration,
pub(super) me_reconnect_backoff_cap: Duration,
pub(super) me_reconnect_fast_retry_count: u32,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32,
@@ -88,6 +99,17 @@ impl MePool {
default_dc: Option<i32>,
decision: NetworkDecision,
rng: Arc<SecureRandom>,
me_keepalive_enabled: bool,
me_keepalive_interval_secs: u64,
me_keepalive_jitter_secs: u64,
me_keepalive_payload_random: bool,
me_warmup_stagger_enabled: bool,
me_warmup_step_delay_ms: u64,
me_warmup_step_jitter_ms: u64,
me_reconnect_max_concurrent_per_dc: u32,
me_reconnect_backoff_base_ms: u64,
me_reconnect_backoff_cap_ms: u64,
me_reconnect_fast_retry_count: u32,
) -> Arc<Self> {
Arc::new(Self {
registry: Arc::new(ConnRegistry::new()),
@@ -108,6 +130,17 @@ impl MePool {
stun_backoff_until: Arc::new(RwLock::new(None)),
me_one_retry,
me_one_timeout: Duration::from_millis(me_one_timeout_ms),
me_keepalive_enabled,
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
me_keepalive_payload_random,
me_warmup_stagger_enabled,
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
me_reconnect_max_concurrent_per_dc,
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
me_reconnect_fast_retry_count,
pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
@@ -323,7 +356,24 @@ impl MePool {
return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into()));
}
// Additional connections up to pool_size total (round-robin across DCs)
// Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles.
if self.me_warmup_stagger_enabled {
let mut delay_ms = 0u64;
for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs {
if self.connection_count() >= pool_size {
break;
}
let addr = SocketAddr::new(*ip, *port);
let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
}
}
}
} else {
for (dc, addrs) in dc_addrs.iter() {
for (ip, port) in addrs {
if self.connection_count() >= pool_size {
@@ -338,6 +388,7 @@ impl MePool {
break;
}
}
}
if !self.decision.effective_multipath && self.connection_count() > 0 {
break;
@@ -364,6 +415,8 @@ impl MePool {
let degraded = Arc::new(AtomicBool::new(false));
let draining = Arc::new(AtomicBool::new(false));
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let tx_for_keepalive = tx.clone();
let keepalive_random = self.me_keepalive_payload_random;
let mut rpc_writer = RpcWriter {
writer: hs.wr,
key: hs.write_key,
@@ -382,6 +435,13 @@ impl MePool {
Some(WriterCommand::DataAndFlush(payload)) => {
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
}
Some(WriterCommand::Keepalive) => {
let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN];
if keepalive_random {
rand::rng().fill(&mut payload);
}
if rpc_writer.send_keepalive(payload).await.is_err() { break; }
}
Some(WriterCommand::Close) | None => break,
}
}
@@ -412,9 +472,14 @@ impl MePool {
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone();
let keepalive_enabled = self.me_keepalive_enabled;
let keepalive_interval = self.me_keepalive_interval;
let keepalive_jitter = self.me_keepalive_jitter;
let cancel_reader_token = cancel.clone();
let cancel_ping_token = cancel_ping.clone();
let cancel_keepalive_token = cancel.clone();
tokio::spawn(async move {
let cancel_reader = cancel.clone();
let res = reader_loop(
hs.rd,
hs.read_key,
@@ -427,7 +492,7 @@ impl MePool {
rtt_stats.clone(),
writer_id,
degraded.clone(),
cancel_reader.clone(),
cancel_reader_token.clone(),
)
.await;
if let Some(pool) = pool.upgrade() {
@@ -454,7 +519,7 @@ impl MePool {
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
tokio::select! {
_ = cancel_ping.cancelled() => {
_ = cancel_ping_token.cancelled() => {
break;
}
_ = tokio::time::sleep(Duration::from_secs(wait)) => {}
@@ -485,6 +550,25 @@ impl MePool {
}
});
if keepalive_enabled {
let tx_keepalive = tx_for_keepalive;
let cancel_keepalive = cancel_keepalive_token;
tokio::spawn(async move {
// Per-writer jittered start to avoid phase sync.
let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64);
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
loop {
tokio::select! {
_ = cancel_keepalive.cancelled() => break,
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {}
}
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
break;
}
}
});
}
Ok(())
}