mirror of https://github.com/telemt/telemt.git
Merge pull request #165 from telemt/flow
ME Healthcheck + Keepalives + Concurrency
This commit is contained in:
commit
bf2da8f5d8
|
|
@ -78,6 +78,30 @@ pub(crate) fn default_pool_size() -> usize {
|
||||||
2
|
2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Default ME keepalive interval, in seconds.
///
/// Referenced by the `#[serde(default = "default_keepalive_interval")]`
/// attribute on `GeneralConfig::me_keepalive_interval_secs` and by
/// `GeneralConfig::default()`.
pub(crate) fn default_keepalive_interval() -> u64 {
    25
}
|
||||||
|
|
||||||
|
/// Default ME keepalive jitter, in seconds.
///
/// Referenced by the `#[serde(default = "default_keepalive_jitter")]`
/// attribute on `GeneralConfig::me_keepalive_jitter_secs` and by
/// `GeneralConfig::default()`.
pub(crate) fn default_keepalive_jitter() -> u64 {
    5
}
|
||||||
|
|
||||||
|
/// Default base delay between staggered warmup connections, in milliseconds.
///
/// Referenced by the `#[serde(default = "default_warmup_step_delay_ms")]`
/// attribute on `GeneralConfig::me_warmup_step_delay_ms` and by
/// `GeneralConfig::default()`.
pub(crate) fn default_warmup_step_delay_ms() -> u64 {
    500
}
|
||||||
|
|
||||||
|
/// Default jitter added to the warmup step delay, in milliseconds.
///
/// Referenced by the `#[serde(default = "default_warmup_step_jitter_ms")]`
/// attribute on `GeneralConfig::me_warmup_step_jitter_ms` and by
/// `GeneralConfig::default()`.
pub(crate) fn default_warmup_step_jitter_ms() -> u64 {
    300
}
|
||||||
|
|
||||||
|
/// Default base backoff for ME reconnects, in milliseconds.
///
/// Referenced by the `#[serde(default = "default_reconnect_backoff_base_ms")]`
/// attribute on `GeneralConfig::me_reconnect_backoff_base_ms` and by
/// `GeneralConfig::default()`.
pub(crate) fn default_reconnect_backoff_base_ms() -> u64 {
    500
}
|
||||||
|
|
||||||
|
/// Default upper bound for the ME reconnect backoff, in milliseconds.
///
/// Referenced by the `#[serde(default = "default_reconnect_backoff_cap_ms")]`
/// attribute on `GeneralConfig::me_reconnect_backoff_cap_ms` and by
/// `GeneralConfig::default()`.
pub(crate) fn default_reconnect_backoff_cap_ms() -> u64 {
    30_000
}
|
||||||
|
|
||||||
// Custom deserializer helpers
|
// Custom deserializer helpers
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
|
|
|
||||||
|
|
@ -155,6 +155,50 @@ pub struct GeneralConfig {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub middle_proxy_warm_standby: usize,
|
pub middle_proxy_warm_standby: usize,
|
||||||
|
|
||||||
|
/// Enable ME keepalive padding frames.
|
||||||
|
#[serde(default = "default_true")]
|
||||||
|
pub me_keepalive_enabled: bool,
|
||||||
|
|
||||||
|
/// Keepalive interval in seconds.
|
||||||
|
#[serde(default = "default_keepalive_interval")]
|
||||||
|
pub me_keepalive_interval_secs: u64,
|
||||||
|
|
||||||
|
/// Keepalive jitter in seconds.
|
||||||
|
#[serde(default = "default_keepalive_jitter")]
|
||||||
|
pub me_keepalive_jitter_secs: u64,
|
||||||
|
|
||||||
|
/// Keepalive payload randomized (4 bytes); otherwise zeros.
|
||||||
|
#[serde(default = "default_true")]
|
||||||
|
pub me_keepalive_payload_random: bool,
|
||||||
|
|
||||||
|
/// Enable staggered warmup of extra ME writers.
|
||||||
|
#[serde(default = "default_true")]
|
||||||
|
pub me_warmup_stagger_enabled: bool,
|
||||||
|
|
||||||
|
/// Base delay between warmup connections in ms.
|
||||||
|
#[serde(default = "default_warmup_step_delay_ms")]
|
||||||
|
pub me_warmup_step_delay_ms: u64,
|
||||||
|
|
||||||
|
/// Jitter for warmup delay in ms.
|
||||||
|
#[serde(default = "default_warmup_step_jitter_ms")]
|
||||||
|
pub me_warmup_step_jitter_ms: u64,
|
||||||
|
|
||||||
|
/// Max concurrent reconnect attempts per DC.
|
||||||
|
#[serde(default)]
|
||||||
|
pub me_reconnect_max_concurrent_per_dc: u32,
|
||||||
|
|
||||||
|
/// Base backoff in ms for reconnect.
|
||||||
|
#[serde(default = "default_reconnect_backoff_base_ms")]
|
||||||
|
pub me_reconnect_backoff_base_ms: u64,
|
||||||
|
|
||||||
|
/// Cap backoff in ms for reconnect.
|
||||||
|
#[serde(default = "default_reconnect_backoff_cap_ms")]
|
||||||
|
pub me_reconnect_backoff_cap_ms: u64,
|
||||||
|
|
||||||
|
/// Fast retry attempts before backoff.
|
||||||
|
#[serde(default)]
|
||||||
|
pub me_reconnect_fast_retry_count: u32,
|
||||||
|
|
||||||
/// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected).
|
/// Ignore STUN/interface IP mismatch (keep using Middle Proxy even if NAT detected).
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub stun_iface_mismatch_ignore: bool,
|
pub stun_iface_mismatch_ignore: bool,
|
||||||
|
|
@ -190,6 +234,17 @@ impl Default for GeneralConfig {
|
||||||
middle_proxy_nat_stun_servers: Vec::new(),
|
middle_proxy_nat_stun_servers: Vec::new(),
|
||||||
middle_proxy_pool_size: default_pool_size(),
|
middle_proxy_pool_size: default_pool_size(),
|
||||||
middle_proxy_warm_standby: 0,
|
middle_proxy_warm_standby: 0,
|
||||||
|
me_keepalive_enabled: true,
|
||||||
|
me_keepalive_interval_secs: default_keepalive_interval(),
|
||||||
|
me_keepalive_jitter_secs: default_keepalive_jitter(),
|
||||||
|
me_keepalive_payload_random: true,
|
||||||
|
me_warmup_stagger_enabled: true,
|
||||||
|
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
|
||||||
|
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
|
||||||
|
me_reconnect_max_concurrent_per_dc: 1,
|
||||||
|
me_reconnect_backoff_base_ms: default_reconnect_backoff_base_ms(),
|
||||||
|
me_reconnect_backoff_cap_ms: default_reconnect_backoff_cap_ms(),
|
||||||
|
me_reconnect_fast_retry_count: 1,
|
||||||
stun_iface_mismatch_ignore: false,
|
stun_iface_mismatch_ignore: false,
|
||||||
unknown_dc_log_path: default_unknown_dc_log_path(),
|
unknown_dc_log_path: default_unknown_dc_log_path(),
|
||||||
log_level: LogLevel::Normal,
|
log_level: LogLevel::Normal,
|
||||||
|
|
|
||||||
12
src/main.rs
12
src/main.rs
|
|
@ -333,6 +333,18 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
||||||
cfg_v4.default_dc.or(cfg_v6.default_dc),
|
cfg_v4.default_dc.or(cfg_v6.default_dc),
|
||||||
decision.clone(),
|
decision.clone(),
|
||||||
rng.clone(),
|
rng.clone(),
|
||||||
|
stats.clone(),
|
||||||
|
config.general.me_keepalive_enabled,
|
||||||
|
config.general.me_keepalive_interval_secs,
|
||||||
|
config.general.me_keepalive_jitter_secs,
|
||||||
|
config.general.me_keepalive_payload_random,
|
||||||
|
config.general.me_warmup_stagger_enabled,
|
||||||
|
config.general.me_warmup_step_delay_ms,
|
||||||
|
config.general.me_warmup_step_jitter_ms,
|
||||||
|
config.general.me_reconnect_max_concurrent_per_dc,
|
||||||
|
config.general.me_reconnect_backoff_base_ms,
|
||||||
|
config.general.me_reconnect_backoff_cap_ms,
|
||||||
|
config.general.me_reconnect_fast_retry_count,
|
||||||
);
|
);
|
||||||
|
|
||||||
let pool_size = config.general.middle_proxy_pool_size.max(1);
|
let pool_size = config.general.middle_proxy_pool_size.max(1);
|
||||||
|
|
|
||||||
|
|
@ -91,6 +91,22 @@ fn render_metrics(stats: &Stats) -> String {
|
||||||
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
|
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
|
||||||
let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts());
|
let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success());
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
||||||
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,10 @@ pub struct Stats {
|
||||||
connects_all: AtomicU64,
|
connects_all: AtomicU64,
|
||||||
connects_bad: AtomicU64,
|
connects_bad: AtomicU64,
|
||||||
handshake_timeouts: AtomicU64,
|
handshake_timeouts: AtomicU64,
|
||||||
|
me_keepalive_sent: AtomicU64,
|
||||||
|
me_keepalive_failed: AtomicU64,
|
||||||
|
me_reconnect_attempts: AtomicU64,
|
||||||
|
me_reconnect_success: AtomicU64,
|
||||||
user_stats: DashMap<String, UserStats>,
|
user_stats: DashMap<String, UserStats>,
|
||||||
start_time: parking_lot::RwLock<Option<Instant>>,
|
start_time: parking_lot::RwLock<Option<Instant>>,
|
||||||
}
|
}
|
||||||
|
|
@ -43,8 +47,16 @@ impl Stats {
|
||||||
pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); }
|
||||||
pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); }
|
||||||
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
/// Adds one to the counter of ME keepalive frames sent.
pub fn increment_me_keepalive_sent(&self) {
    self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed);
}
|
||||||
|
/// Adds one to the counter of ME keepalive send failures.
pub fn increment_me_keepalive_failed(&self) {
    self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed);
}
|
||||||
|
/// Adds one to the counter of ME reconnect attempts.
pub fn increment_me_reconnect_attempt(&self) {
    self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed);
}
|
||||||
|
/// Adds one to the counter of successful ME reconnects.
pub fn increment_me_reconnect_success(&self) {
    self.me_reconnect_success.fetch_add(1, Ordering::Relaxed);
}
|
||||||
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
||||||
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
||||||
|
/// Returns the current count of ME keepalive frames sent.
pub fn get_me_keepalive_sent(&self) -> u64 {
    self.me_keepalive_sent.load(Ordering::Relaxed)
}
|
||||||
|
/// Returns the current count of ME keepalive send failures.
pub fn get_me_keepalive_failed(&self) -> u64 {
    self.me_keepalive_failed.load(Ordering::Relaxed)
}
|
||||||
|
/// Returns the current count of ME reconnect attempts.
pub fn get_me_reconnect_attempts(&self) -> u64 {
    self.me_reconnect_attempts.load(Ordering::Relaxed)
}
|
||||||
|
/// Returns the current count of successful ME reconnects.
pub fn get_me_reconnect_success(&self) -> u64 {
    self.me_reconnect_success.load(Ordering::Relaxed)
}
|
||||||
|
|
||||||
pub fn increment_user_connects(&self, user: &str) {
|
pub fn increment_user_connects(&self, user: &str) {
|
||||||
self.user_stats.entry(user.to_string()).or_default()
|
self.user_stats.entry(user.to_string()).or_default()
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ use crate::protocol::constants::*;
|
||||||
/// Commands sent over the per-writer channel to the task that owns an `RpcWriter`.
pub(crate) enum WriterCommand {
    /// Payload to write.
    // NOTE(review): presumably written without an explicit flush; the handler
    // for this variant is not visible here — confirm.
    Data(Vec<u8>),
    /// Payload to write and then flush (handled via `RpcWriter::send_and_flush`).
    DataAndFlush(Vec<u8>),
    /// Request to emit one keepalive frame (handled via `RpcWriter::send_keepalive`).
    Keepalive,
    /// Request to stop the writer loop.
    Close,
}
|
||||||
|
|
||||||
|
|
@ -188,4 +189,12 @@ impl RpcWriter {
|
||||||
self.send(payload).await?;
|
self.send(payload).await?;
|
||||||
self.writer.flush().await.map_err(ProxyError::Io)
|
self.writer.flush().await.map_err(ProxyError::Io)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sends one keepalive frame through `send`.
///
/// The frame handed to `send` is 8 bytes long: the value `4` encoded as a
/// little-endian `u32`, followed by the 4-byte `payload`.
///
/// # Errors
/// Propagates whatever error `send` returns.
pub(crate) async fn send_keepalive(&mut self, payload: [u8; 4]) -> Result<()> {
    // Keepalive is a frame with fl == 4 and 4 bytes payload.
    // NOTE(review): this uses `send`, whereas `send_and_flush` follows `send`
    // with an explicit `writer.flush()`. Confirm the keepalive cannot sit in a
    // write buffer until the next flushed frame.
    let mut frame = Vec::with_capacity(8);
    frame.extend_from_slice(&4u32.to_le_bytes());
    frame.extend_from_slice(&payload);
    self.send(&frame).await
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
|
use rand::Rng;
|
||||||
|
|
||||||
use crate::crypto::SecureRandom;
|
use crate::crypto::SecureRandom;
|
||||||
use crate::network::IpFamily;
|
use crate::network::IpFamily;
|
||||||
|
|
@ -12,13 +13,13 @@ use crate::network::IpFamily;
|
||||||
use super::MePool;
|
use super::MePool;
|
||||||
|
|
||||||
const HEALTH_INTERVAL_SECS: u64 = 1;
|
const HEALTH_INTERVAL_SECS: u64 = 1;
|
||||||
const QUICK_RETRY_ATTEMPTS: u8 = 10;
|
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
|
||||||
const QUICK_RETRY_DELAY_MS: u64 = 2500;
|
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
|
||||||
|
|
||||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
||||||
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
||||||
let mut last_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
let mut inflight_single: HashSet<(i32, IpFamily)> = HashSet::new();
|
let mut inflight: HashMap<(i32, IpFamily), usize> = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
|
||||||
check_family(
|
check_family(
|
||||||
|
|
@ -26,8 +27,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||||
&pool,
|
&pool,
|
||||||
&rng,
|
&rng,
|
||||||
&mut backoff,
|
&mut backoff,
|
||||||
&mut last_attempt,
|
&mut next_attempt,
|
||||||
&mut inflight_single,
|
&mut inflight,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
check_family(
|
check_family(
|
||||||
|
|
@ -35,8 +36,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
|
||||||
&pool,
|
&pool,
|
||||||
&rng,
|
&rng,
|
||||||
&mut backoff,
|
&mut backoff,
|
||||||
&mut last_attempt,
|
&mut next_attempt,
|
||||||
&mut inflight_single,
|
&mut inflight,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
@ -47,8 +48,8 @@ async fn check_family(
|
||||||
pool: &Arc<MePool>,
|
pool: &Arc<MePool>,
|
||||||
rng: &Arc<SecureRandom>,
|
rng: &Arc<SecureRandom>,
|
||||||
backoff: &mut HashMap<(i32, IpFamily), u64>,
|
backoff: &mut HashMap<(i32, IpFamily), u64>,
|
||||||
last_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
inflight_single: &mut HashSet<(i32, IpFamily)>,
|
inflight: &mut HashMap<(i32, IpFamily), usize>,
|
||||||
) {
|
) {
|
||||||
let enabled = match family {
|
let enabled = match family {
|
||||||
IpFamily::V4 => pool.decision.ipv4_me,
|
IpFamily::V4 => pool.decision.ipv4_me,
|
||||||
|
|
@ -84,120 +85,60 @@ async fn check_family(
|
||||||
for (dc, dc_addrs) in entries {
|
for (dc, dc_addrs) in entries {
|
||||||
let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a));
|
let has_coverage = dc_addrs.iter().any(|a| writer_addrs.contains(a));
|
||||||
if has_coverage {
|
if has_coverage {
|
||||||
inflight_single.remove(&(dc, family));
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aggressive quick-retry burst: up to 10 attempts every 2.5s before falling back to exponential backoff.
|
|
||||||
let key = (dc, family);
|
let key = (dc, family);
|
||||||
for attempt in 0..QUICK_RETRY_ATTEMPTS {
|
let now = Instant::now();
|
||||||
let mut shuffled = dc_addrs.clone();
|
if let Some(ts) = next_attempt.get(&key) {
|
||||||
shuffled.shuffle(&mut rand::rng());
|
if now < *ts {
|
||||||
let mut success = false;
|
|
||||||
for addr in &shuffled {
|
|
||||||
match pool.connect_one(*addr, rng.as_ref()).await {
|
|
||||||
Ok(()) => {
|
|
||||||
info!(%addr, dc = %dc, ?family, attempt, "ME reconnected (quick burst)");
|
|
||||||
backoff.insert(key, HEALTH_INTERVAL_SECS);
|
|
||||||
last_attempt.insert(key, Instant::now());
|
|
||||||
inflight_single.remove(&key);
|
|
||||||
success = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(e) => debug!(%addr, dc = %dc, error = %e, attempt, ?family, "ME reconnect failed (quick)"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if success {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
tokio::time::sleep(Duration::from_millis(QUICK_RETRY_DELAY_MS)).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let delay = *backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS);
|
let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize;
|
||||||
let now = Instant::now();
|
if *inflight.get(&key).unwrap_or(&0) >= max_concurrent {
|
||||||
if let Some(last) = last_attempt.get(&key) {
|
|
||||||
if now.duration_since(*last).as_secs() < delay {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if dc_addrs.len() == 1 {
|
|
||||||
// Single ME address: fast retries then slower background retries.
|
|
||||||
if inflight_single.contains(&key) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
inflight_single.insert(key);
|
|
||||||
let addr = dc_addrs[0];
|
|
||||||
let dc_id = dc;
|
|
||||||
let pool_clone = pool.clone();
|
|
||||||
let rng_clone = rng.clone();
|
|
||||||
let timeout = pool.me_one_timeout;
|
|
||||||
let quick_attempts = pool.me_one_retry.max(1);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut success = false;
|
|
||||||
for _ in 0..quick_attempts {
|
|
||||||
let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await;
|
|
||||||
match res {
|
|
||||||
Ok(Ok(())) => {
|
|
||||||
info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage");
|
|
||||||
success = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"),
|
|
||||||
Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"),
|
|
||||||
}
|
|
||||||
tokio::time::sleep(Duration::from_millis(1000)).await;
|
|
||||||
}
|
|
||||||
if success {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let timeout_ms = timeout.as_millis();
|
*inflight.entry(key).or_insert(0) += 1;
|
||||||
warn!(
|
|
||||||
dc = %dc_id,
|
|
||||||
?family,
|
|
||||||
attempts = quick_attempts,
|
|
||||||
timeout_ms,
|
|
||||||
"DC={} has no ME coverage: {} tries * {} ms... retry in 5 seconds...",
|
|
||||||
dc_id,
|
|
||||||
quick_attempts,
|
|
||||||
timeout_ms
|
|
||||||
);
|
|
||||||
loop {
|
|
||||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
|
||||||
let res = tokio::time::timeout(timeout, pool_clone.connect_one(addr, rng_clone.as_ref())).await;
|
|
||||||
match res {
|
|
||||||
Ok(Ok(())) => {
|
|
||||||
info!(%addr, dc = %dc_id, ?family, "ME reconnected for DC coverage");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Ok(Err(e)) => debug!(%addr, dc = %dc_id, error = %e, ?family, "ME reconnect failed"),
|
|
||||||
Err(_) => debug!(%addr, dc = %dc_id, ?family, "ME reconnect timed out"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// will drop inflight flag in outer loop when coverage detected
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
warn!(dc = %dc, delay, ?family, "DC has no ME coverage, reconnecting (backoff)...");
|
|
||||||
let mut shuffled = dc_addrs.clone();
|
let mut shuffled = dc_addrs.clone();
|
||||||
shuffled.shuffle(&mut rand::rng());
|
shuffled.shuffle(&mut rand::rng());
|
||||||
let mut reconnected = false;
|
let mut success = false;
|
||||||
for addr in shuffled {
|
for addr in shuffled {
|
||||||
match pool.connect_one(addr, rng.as_ref()).await {
|
let res = tokio::time::timeout(pool.me_one_timeout, pool.connect_one(addr, rng.as_ref())).await;
|
||||||
Ok(()) => {
|
match res {
|
||||||
|
Ok(Ok(())) => {
|
||||||
info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage");
|
info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage");
|
||||||
backoff.insert(key, 30);
|
pool.stats.increment_me_reconnect_success();
|
||||||
last_attempt.insert(key, now);
|
backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64);
|
||||||
reconnected = true;
|
let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM;
|
||||||
|
let wait = pool.me_reconnect_backoff_base
|
||||||
|
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
||||||
|
next_attempt.insert(key, now + wait);
|
||||||
|
success = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(e) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"),
|
Ok(Err(e)) => {
|
||||||
|
pool.stats.increment_me_reconnect_attempt();
|
||||||
|
debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed")
|
||||||
|
}
|
||||||
|
Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !reconnected {
|
if !success {
|
||||||
let next = (*backoff.get(&key).unwrap_or(&HEALTH_INTERVAL_SECS)).saturating_mul(2).min(60);
|
pool.stats.increment_me_reconnect_attempt();
|
||||||
backoff.insert(key, next);
|
let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64));
|
||||||
last_attempt.insert(key, now);
|
let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64);
|
||||||
|
backoff.insert(key, next_ms);
|
||||||
|
let jitter = next_ms / JITTER_FRAC_NUM;
|
||||||
|
let wait = Duration::from_millis(next_ms)
|
||||||
|
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
|
||||||
|
next_attempt.insert(key, now + wait);
|
||||||
|
warn!(dc = %dc, backoff_ms = next_ms, ?family, "DC has no ME coverage, scheduled reconnect");
|
||||||
|
}
|
||||||
|
if let Some(v) = inflight.get_mut(&key) {
|
||||||
|
*v = v.saturating_sub(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,9 +21,9 @@ use super::registry::{BoundConn, ConnMeta};
|
||||||
use super::codec::{RpcWriter, WriterCommand};
|
use super::codec::{RpcWriter, WriterCommand};
|
||||||
use super::reader::reader_loop;
|
use super::reader::reader_loop;
|
||||||
use super::MeResponse;
|
use super::MeResponse;
|
||||||
|
|
||||||
const ME_ACTIVE_PING_SECS: u64 = 25;
|
const ME_ACTIVE_PING_SECS: u64 = 25;
|
||||||
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
|
const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
|
||||||
|
const ME_KEEPALIVE_PAYLOAD_LEN: usize = 4;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MeWriter {
|
pub struct MeWriter {
|
||||||
|
|
@ -54,6 +54,17 @@ pub struct MePool {
|
||||||
pub(super) stun_backoff_until: Arc<RwLock<Option<Instant>>>,
|
pub(super) stun_backoff_until: Arc<RwLock<Option<Instant>>>,
|
||||||
pub(super) me_one_retry: u8,
|
pub(super) me_one_retry: u8,
|
||||||
pub(super) me_one_timeout: Duration,
|
pub(super) me_one_timeout: Duration,
|
||||||
|
pub(super) me_keepalive_enabled: bool,
|
||||||
|
pub(super) me_keepalive_interval: Duration,
|
||||||
|
pub(super) me_keepalive_jitter: Duration,
|
||||||
|
pub(super) me_keepalive_payload_random: bool,
|
||||||
|
pub(super) me_warmup_stagger_enabled: bool,
|
||||||
|
pub(super) me_warmup_step_delay: Duration,
|
||||||
|
pub(super) me_warmup_step_jitter: Duration,
|
||||||
|
pub(super) me_reconnect_max_concurrent_per_dc: u32,
|
||||||
|
pub(super) me_reconnect_backoff_base: Duration,
|
||||||
|
pub(super) me_reconnect_backoff_cap: Duration,
|
||||||
|
pub(super) me_reconnect_fast_retry_count: u32,
|
||||||
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||||
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||||
pub(super) default_dc: AtomicI32,
|
pub(super) default_dc: AtomicI32,
|
||||||
|
|
@ -63,6 +74,7 @@ pub struct MePool {
|
||||||
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
|
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
|
||||||
pub(super) writer_available: Arc<Notify>,
|
pub(super) writer_available: Arc<Notify>,
|
||||||
pub(super) conn_count: AtomicUsize,
|
pub(super) conn_count: AtomicUsize,
|
||||||
|
pub(super) stats: Arc<crate::stats::Stats>,
|
||||||
pool_size: usize,
|
pool_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -88,6 +100,18 @@ impl MePool {
|
||||||
default_dc: Option<i32>,
|
default_dc: Option<i32>,
|
||||||
decision: NetworkDecision,
|
decision: NetworkDecision,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
|
stats: Arc<crate::stats::Stats>,
|
||||||
|
me_keepalive_enabled: bool,
|
||||||
|
me_keepalive_interval_secs: u64,
|
||||||
|
me_keepalive_jitter_secs: u64,
|
||||||
|
me_keepalive_payload_random: bool,
|
||||||
|
me_warmup_stagger_enabled: bool,
|
||||||
|
me_warmup_step_delay_ms: u64,
|
||||||
|
me_warmup_step_jitter_ms: u64,
|
||||||
|
me_reconnect_max_concurrent_per_dc: u32,
|
||||||
|
me_reconnect_backoff_base_ms: u64,
|
||||||
|
me_reconnect_backoff_cap_ms: u64,
|
||||||
|
me_reconnect_fast_retry_count: u32,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
Arc::new(Self {
|
Arc::new(Self {
|
||||||
registry: Arc::new(ConnRegistry::new()),
|
registry: Arc::new(ConnRegistry::new()),
|
||||||
|
|
@ -108,6 +132,18 @@ impl MePool {
|
||||||
stun_backoff_until: Arc::new(RwLock::new(None)),
|
stun_backoff_until: Arc::new(RwLock::new(None)),
|
||||||
me_one_retry,
|
me_one_retry,
|
||||||
me_one_timeout: Duration::from_millis(me_one_timeout_ms),
|
me_one_timeout: Duration::from_millis(me_one_timeout_ms),
|
||||||
|
stats,
|
||||||
|
me_keepalive_enabled,
|
||||||
|
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
|
||||||
|
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
|
||||||
|
me_keepalive_payload_random,
|
||||||
|
me_warmup_stagger_enabled,
|
||||||
|
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
|
||||||
|
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
|
||||||
|
me_reconnect_max_concurrent_per_dc,
|
||||||
|
me_reconnect_backoff_base: Duration::from_millis(me_reconnect_backoff_base_ms),
|
||||||
|
me_reconnect_backoff_cap: Duration::from_millis(me_reconnect_backoff_cap_ms),
|
||||||
|
me_reconnect_fast_retry_count,
|
||||||
pool_size: 2,
|
pool_size: 2,
|
||||||
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
||||||
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
||||||
|
|
@ -323,7 +359,24 @@ impl MePool {
|
||||||
return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into()));
|
return Err(ProxyError::Proxy("Too many ME DC init failures, falling back to direct".into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Additional connections up to pool_size total (round-robin across DCs)
|
// Additional connections up to pool_size total (round-robin across DCs), staggered to de-phase lifecycles.
|
||||||
|
if self.me_warmup_stagger_enabled {
|
||||||
|
let mut delay_ms = 0u64;
|
||||||
|
for (dc, addrs) in dc_addrs.iter() {
|
||||||
|
for (ip, port) in addrs {
|
||||||
|
if self.connection_count() >= pool_size {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let addr = SocketAddr::new(*ip, *port);
|
||||||
|
let jitter = rand::rng().random_range(0..=self.me_warmup_step_jitter.as_millis() as u64);
|
||||||
|
delay_ms = delay_ms.saturating_add(self.me_warmup_step_delay.as_millis() as u64 + jitter);
|
||||||
|
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
|
||||||
|
if let Err(e) = self.connect_one(addr, rng.as_ref()).await {
|
||||||
|
debug!(%addr, dc = %dc, error = %e, "Extra ME connect failed (staggered)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
for (dc, addrs) in dc_addrs.iter() {
|
for (dc, addrs) in dc_addrs.iter() {
|
||||||
for (ip, port) in addrs {
|
for (ip, port) in addrs {
|
||||||
if self.connection_count() >= pool_size {
|
if self.connection_count() >= pool_size {
|
||||||
|
|
@ -338,6 +391,7 @@ impl MePool {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !self.decision.effective_multipath && self.connection_count() > 0 {
|
if !self.decision.effective_multipath && self.connection_count() > 0 {
|
||||||
break;
|
break;
|
||||||
|
|
@ -364,6 +418,9 @@ impl MePool {
|
||||||
let degraded = Arc::new(AtomicBool::new(false));
|
let degraded = Arc::new(AtomicBool::new(false));
|
||||||
let draining = Arc::new(AtomicBool::new(false));
|
let draining = Arc::new(AtomicBool::new(false));
|
||||||
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
|
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
|
||||||
|
let tx_for_keepalive = tx.clone();
|
||||||
|
let keepalive_random = self.me_keepalive_payload_random;
|
||||||
|
let stats = self.stats.clone();
|
||||||
let mut rpc_writer = RpcWriter {
|
let mut rpc_writer = RpcWriter {
|
||||||
writer: hs.wr,
|
writer: hs.wr,
|
||||||
key: hs.write_key,
|
key: hs.write_key,
|
||||||
|
|
@ -382,6 +439,21 @@ impl MePool {
|
||||||
Some(WriterCommand::DataAndFlush(payload)) => {
|
Some(WriterCommand::DataAndFlush(payload)) => {
|
||||||
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
|
if rpc_writer.send_and_flush(&payload).await.is_err() { break; }
|
||||||
}
|
}
|
||||||
|
Some(WriterCommand::Keepalive) => {
|
||||||
|
let mut payload = [0u8; ME_KEEPALIVE_PAYLOAD_LEN];
|
||||||
|
if keepalive_random {
|
||||||
|
rand::rng().fill(&mut payload);
|
||||||
|
}
|
||||||
|
match rpc_writer.send_keepalive(payload).await {
|
||||||
|
Ok(()) => {
|
||||||
|
stats.increment_me_keepalive_sent();
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
stats.increment_me_keepalive_failed();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Some(WriterCommand::Close) | None => break,
|
Some(WriterCommand::Close) | None => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -412,9 +484,14 @@ impl MePool {
|
||||||
let cleanup_done = Arc::new(AtomicBool::new(false));
|
let cleanup_done = Arc::new(AtomicBool::new(false));
|
||||||
let cleanup_for_reader = cleanup_done.clone();
|
let cleanup_for_reader = cleanup_done.clone();
|
||||||
let cleanup_for_ping = cleanup_done.clone();
|
let cleanup_for_ping = cleanup_done.clone();
|
||||||
|
let keepalive_enabled = self.me_keepalive_enabled;
|
||||||
|
let keepalive_interval = self.me_keepalive_interval;
|
||||||
|
let keepalive_jitter = self.me_keepalive_jitter;
|
||||||
|
let cancel_reader_token = cancel.clone();
|
||||||
|
let cancel_ping_token = cancel_ping.clone();
|
||||||
|
let cancel_keepalive_token = cancel.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let cancel_reader = cancel.clone();
|
|
||||||
let res = reader_loop(
|
let res = reader_loop(
|
||||||
hs.rd,
|
hs.rd,
|
||||||
hs.read_key,
|
hs.read_key,
|
||||||
|
|
@ -427,7 +504,7 @@ impl MePool {
|
||||||
rtt_stats.clone(),
|
rtt_stats.clone(),
|
||||||
writer_id,
|
writer_id,
|
||||||
degraded.clone(),
|
degraded.clone(),
|
||||||
cancel_reader.clone(),
|
cancel_reader_token.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
if let Some(pool) = pool.upgrade() {
|
if let Some(pool) = pool.upgrade() {
|
||||||
|
|
@ -454,7 +531,7 @@ impl MePool {
|
||||||
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
|
.random_range(-ME_ACTIVE_PING_JITTER_SECS..=ME_ACTIVE_PING_JITTER_SECS);
|
||||||
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
|
let wait = (ME_ACTIVE_PING_SECS as i64 + jitter).max(5) as u64;
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancel_ping.cancelled() => {
|
_ = cancel_ping_token.cancelled() => {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ = tokio::time::sleep(Duration::from_secs(wait)) => {}
|
_ = tokio::time::sleep(Duration::from_secs(wait)) => {}
|
||||||
|
|
@ -485,6 +562,25 @@ impl MePool {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if keepalive_enabled {
|
||||||
|
let tx_keepalive = tx_for_keepalive;
|
||||||
|
let cancel_keepalive = cancel_keepalive_token;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Per-writer jittered start to avoid phase sync.
|
||||||
|
let initial_jitter_ms = rand::rng().random_range(0..=keepalive_jitter.as_millis().max(1) as u64);
|
||||||
|
tokio::time::sleep(Duration::from_millis(initial_jitter_ms)).await;
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancel_keepalive.cancelled() => break,
|
||||||
|
_ = tokio::time::sleep(keepalive_interval + Duration::from_millis(rand::rng().random_range(0..=keepalive_jitter.as_millis() as u64))) => {}
|
||||||
|
}
|
||||||
|
if tx_keepalive.send(WriterCommand::Keepalive).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue