mirror of https://github.com/telemt/telemt.git
ME Keepalives
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
parent
e340b716b2
commit
820ed8d346
|
|
@ -333,6 +333,7 @@ match crate::transport::middle_proxy::fetch_proxy_secret(proxy_secret_path).awai
|
||||||
cfg_v4.default_dc.or(cfg_v6.default_dc),
|
cfg_v4.default_dc.or(cfg_v6.default_dc),
|
||||||
decision.clone(),
|
decision.clone(),
|
||||||
rng.clone(),
|
rng.clone(),
|
||||||
|
stats.clone(),
|
||||||
config.general.me_keepalive_enabled,
|
config.general.me_keepalive_enabled,
|
||||||
config.general.me_keepalive_interval_secs,
|
config.general.me_keepalive_interval_secs,
|
||||||
config.general.me_keepalive_jitter_secs,
|
config.general.me_keepalive_jitter_secs,
|
||||||
|
|
|
||||||
|
|
@ -91,6 +91,22 @@ fn render_metrics(stats: &Stats) -> String {
|
||||||
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
|
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
|
||||||
let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts());
|
let _ = writeln!(out, "telemt_handshake_timeouts_total {}", stats.get_handshake_timeouts());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_keepalive_sent_total ME keepalive frames sent");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_keepalive_sent_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_keepalive_sent_total {}", stats.get_me_keepalive_sent());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_keepalive_failed_total ME keepalive send failures");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_keepalive_failed_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_keepalive_failed_total {}", stats.get_me_keepalive_failed());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_reconnect_attempts_total ME reconnect attempts");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_reconnect_attempts_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_reconnect_attempts_total {}", stats.get_me_reconnect_attempts());
|
||||||
|
|
||||||
|
let _ = writeln!(out, "# HELP telemt_me_reconnect_success_total ME reconnect successes");
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_reconnect_success_total counter");
|
||||||
|
let _ = writeln!(out, "telemt_me_reconnect_success_total {}", stats.get_me_reconnect_success());
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_total Per-user total connections");
|
||||||
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
let _ = writeln!(out, "# TYPE telemt_user_connections_total counter");
|
||||||
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
let _ = writeln!(out, "# HELP telemt_user_connections_current Per-user active connections");
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,10 @@ pub struct Stats {
|
||||||
connects_all: AtomicU64,
|
connects_all: AtomicU64,
|
||||||
connects_bad: AtomicU64,
|
connects_bad: AtomicU64,
|
||||||
handshake_timeouts: AtomicU64,
|
handshake_timeouts: AtomicU64,
|
||||||
|
me_keepalive_sent: AtomicU64,
|
||||||
|
me_keepalive_failed: AtomicU64,
|
||||||
|
me_reconnect_attempts: AtomicU64,
|
||||||
|
me_reconnect_success: AtomicU64,
|
||||||
user_stats: DashMap<String, UserStats>,
|
user_stats: DashMap<String, UserStats>,
|
||||||
start_time: parking_lot::RwLock<Option<Instant>>,
|
start_time: parking_lot::RwLock<Option<Instant>>,
|
||||||
}
|
}
|
||||||
|
|
@ -43,8 +47,16 @@ impl Stats {
|
||||||
pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_connects_all(&self) { self.connects_all.fetch_add(1, Ordering::Relaxed); }
|
||||||
pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_connects_bad(&self) { self.connects_bad.fetch_add(1, Ordering::Relaxed); }
|
||||||
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
|
pub fn increment_handshake_timeouts(&self) { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_keepalive_sent(&self) { self.me_keepalive_sent.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_keepalive_failed(&self) { self.me_keepalive_failed.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_reconnect_attempt(&self) { self.me_reconnect_attempts.fetch_add(1, Ordering::Relaxed); }
|
||||||
|
pub fn increment_me_reconnect_success(&self) { self.me_reconnect_success.fetch_add(1, Ordering::Relaxed); }
|
||||||
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
||||||
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_reconnect_attempts(&self) -> u64 { self.me_reconnect_attempts.load(Ordering::Relaxed) }
|
||||||
|
pub fn get_me_reconnect_success(&self) -> u64 { self.me_reconnect_success.load(Ordering::Relaxed) }
|
||||||
|
|
||||||
pub fn increment_user_connects(&self, user: &str) {
|
pub fn increment_user_connects(&self, user: &str) {
|
||||||
self.user_stats.entry(user.to_string()).or_default()
|
self.user_stats.entry(user.to_string()).or_default()
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,7 @@ async fn check_family(
|
||||||
match res {
|
match res {
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage");
|
info!(%addr, dc = %dc, ?family, "ME reconnected for DC coverage");
|
||||||
|
pool.stats.increment_me_reconnect_success();
|
||||||
backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64);
|
backoff.insert(key, pool.me_reconnect_backoff_base.as_millis() as u64);
|
||||||
let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM;
|
let jitter = pool.me_reconnect_backoff_base.as_millis() as u64 / JITTER_FRAC_NUM;
|
||||||
let wait = pool.me_reconnect_backoff_base
|
let wait = pool.me_reconnect_backoff_base
|
||||||
|
|
@ -107,11 +108,15 @@ async fn check_family(
|
||||||
success = true;
|
success = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed"),
|
Ok(Err(e)) => {
|
||||||
|
pool.stats.increment_me_reconnect_attempt();
|
||||||
|
debug!(%addr, dc = %dc, error = %e, ?family, "ME reconnect failed")
|
||||||
|
}
|
||||||
Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"),
|
Err(_) => debug!(%addr, dc = %dc, ?family, "ME reconnect timed out"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !success {
|
if !success {
|
||||||
|
pool.stats.increment_me_reconnect_attempt();
|
||||||
let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64));
|
let curr = *backoff.get(&key).unwrap_or(&(pool.me_reconnect_backoff_base.as_millis() as u64));
|
||||||
let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64);
|
let next_ms = (curr.saturating_mul(2)).min(pool.me_reconnect_backoff_cap.as_millis() as u64);
|
||||||
backoff.insert(key, next_ms);
|
backoff.insert(key, next_ms);
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,7 @@ pub struct MePool {
|
||||||
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
|
pub(super) nat_reflection_cache: Arc<Mutex<NatReflectionCache>>,
|
||||||
pub(super) writer_available: Arc<Notify>,
|
pub(super) writer_available: Arc<Notify>,
|
||||||
pub(super) conn_count: AtomicUsize,
|
pub(super) conn_count: AtomicUsize,
|
||||||
|
pub(super) stats: Arc<crate::stats::Stats>,
|
||||||
pool_size: usize,
|
pool_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -99,6 +100,7 @@ impl MePool {
|
||||||
default_dc: Option<i32>,
|
default_dc: Option<i32>,
|
||||||
decision: NetworkDecision,
|
decision: NetworkDecision,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
|
stats: Arc<crate::stats::Stats>,
|
||||||
me_keepalive_enabled: bool,
|
me_keepalive_enabled: bool,
|
||||||
me_keepalive_interval_secs: u64,
|
me_keepalive_interval_secs: u64,
|
||||||
me_keepalive_jitter_secs: u64,
|
me_keepalive_jitter_secs: u64,
|
||||||
|
|
@ -130,6 +132,7 @@ impl MePool {
|
||||||
stun_backoff_until: Arc::new(RwLock::new(None)),
|
stun_backoff_until: Arc::new(RwLock::new(None)),
|
||||||
me_one_retry,
|
me_one_retry,
|
||||||
me_one_timeout: Duration::from_millis(me_one_timeout_ms),
|
me_one_timeout: Duration::from_millis(me_one_timeout_ms),
|
||||||
|
stats,
|
||||||
me_keepalive_enabled,
|
me_keepalive_enabled,
|
||||||
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
|
me_keepalive_interval: Duration::from_secs(me_keepalive_interval_secs),
|
||||||
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
|
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
|
||||||
|
|
@ -440,7 +443,15 @@ impl MePool {
|
||||||
if keepalive_random {
|
if keepalive_random {
|
||||||
rand::rng().fill(&mut payload);
|
rand::rng().fill(&mut payload);
|
||||||
}
|
}
|
||||||
if rpc_writer.send_keepalive(payload).await.is_err() { break; }
|
match rpc_writer.send_keepalive(payload).await {
|
||||||
|
Ok(()) => {
|
||||||
|
stats.increment_me_keepalive_sent();
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
stats.increment_me_keepalive_failed();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Some(WriterCommand::Close) | None => break,
|
Some(WriterCommand::Close) | None => break,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue