mirror of
https://github.com/telemt/telemt.git
synced 2026-04-17 10:34:11 +03:00
ME Hardswap Generation stability
Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
@@ -2,6 +2,7 @@ use std::collections::HashSet;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
@@ -9,19 +10,97 @@ use crate::crypto::SecureRandom;
|
||||
|
||||
use super::pool::MePool;
|
||||
|
||||
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
|
||||
const ME_FLAP_QUARANTINE_SECS: u64 = 25;
|
||||
|
||||
impl MePool {
|
||||
pub(super) async fn maybe_quarantine_flapping_endpoint(
|
||||
&self,
|
||||
addr: SocketAddr,
|
||||
uptime: Duration,
|
||||
) {
|
||||
if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) {
|
||||
return;
|
||||
}
|
||||
|
||||
let until = Instant::now() + Duration::from_secs(ME_FLAP_QUARANTINE_SECS);
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
guard.retain(|_, expiry| *expiry > Instant::now());
|
||||
guard.insert(addr, until);
|
||||
warn!(
|
||||
%addr,
|
||||
uptime_ms = uptime.as_millis(),
|
||||
quarantine_secs = ME_FLAP_QUARANTINE_SECS,
|
||||
"ME endpoint temporarily quarantined due to rapid writer flap"
|
||||
);
|
||||
}
|
||||
|
||||
async fn is_endpoint_quarantined(&self, addr: SocketAddr) -> bool {
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
let now = Instant::now();
|
||||
guard.retain(|_, expiry| *expiry > now);
|
||||
guard.contains_key(&addr)
|
||||
}
|
||||
|
||||
async fn connectable_endpoints(&self, endpoints: &[SocketAddr]) -> Vec<SocketAddr> {
|
||||
if endpoints.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
let now = Instant::now();
|
||||
guard.retain(|_, expiry| *expiry > now);
|
||||
|
||||
let mut ready = Vec::<SocketAddr>::with_capacity(endpoints.len());
|
||||
let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None;
|
||||
for addr in endpoints {
|
||||
if let Some(expiry) = guard.get(addr).copied() {
|
||||
match earliest_quarantine {
|
||||
Some((_, current_expiry)) if current_expiry <= expiry => {}
|
||||
_ => earliest_quarantine = Some((*addr, expiry)),
|
||||
}
|
||||
} else {
|
||||
ready.push(*addr);
|
||||
}
|
||||
}
|
||||
|
||||
if !ready.is_empty() {
|
||||
return ready;
|
||||
}
|
||||
|
||||
if let Some((addr, expiry)) = earliest_quarantine {
|
||||
debug!(
|
||||
%addr,
|
||||
wait_ms = expiry.saturating_duration_since(now).as_millis(),
|
||||
"All ME endpoints are quarantined for the DC group; retrying earliest one"
|
||||
);
|
||||
return vec![addr];
|
||||
}
|
||||
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
pub(super) async fn has_refill_inflight_for_endpoints(&self, endpoints: &[SocketAddr]) -> bool {
|
||||
if endpoints.is_empty() {
|
||||
return false;
|
||||
}
|
||||
let guard = self.refill_inflight.lock().await;
|
||||
endpoints.iter().any(|addr| guard.contains(addr))
|
||||
}
|
||||
|
||||
pub(super) async fn connect_endpoints_round_robin(
|
||||
self: &Arc<Self>,
|
||||
endpoints: &[SocketAddr],
|
||||
rng: &SecureRandom,
|
||||
) -> bool {
|
||||
if endpoints.is_empty() {
|
||||
let candidates = self.connectable_endpoints(endpoints).await;
|
||||
if candidates.is_empty() {
|
||||
return false;
|
||||
}
|
||||
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % endpoints.len();
|
||||
for offset in 0..endpoints.len() {
|
||||
let idx = (start + offset) % endpoints.len();
|
||||
let addr = endpoints[idx];
|
||||
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len();
|
||||
for offset in 0..candidates.len() {
|
||||
let idx = (start + offset) % candidates.len();
|
||||
let addr = candidates[idx];
|
||||
match self.connect_one(addr, rng).await {
|
||||
Ok(()) => return true,
|
||||
Err(e) => debug!(%addr, error = %e, "ME connect failed during round-robin warmup"),
|
||||
@@ -83,29 +162,37 @@ impl MePool {
|
||||
|
||||
async fn refill_writer_after_loss(self: &Arc<Self>, addr: SocketAddr) -> bool {
|
||||
let fast_retries = self.me_reconnect_fast_retry_count.max(1);
|
||||
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
|
||||
|
||||
for attempt in 0..fast_retries {
|
||||
self.stats.increment_me_reconnect_attempt();
|
||||
match self.connect_one(addr, self.rng.as_ref()).await {
|
||||
Ok(()) => {
|
||||
self.stats.increment_me_reconnect_success();
|
||||
self.stats.increment_me_writer_restored_same_endpoint_total();
|
||||
info!(
|
||||
%addr,
|
||||
attempt = attempt + 1,
|
||||
"ME writer restored on the same endpoint"
|
||||
);
|
||||
return true;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
%addr,
|
||||
attempt = attempt + 1,
|
||||
error = %e,
|
||||
"ME immediate same-endpoint reconnect failed"
|
||||
);
|
||||
if !same_endpoint_quarantined {
|
||||
for attempt in 0..fast_retries {
|
||||
self.stats.increment_me_reconnect_attempt();
|
||||
match self.connect_one(addr, self.rng.as_ref()).await {
|
||||
Ok(()) => {
|
||||
self.stats.increment_me_reconnect_success();
|
||||
self.stats.increment_me_writer_restored_same_endpoint_total();
|
||||
info!(
|
||||
%addr,
|
||||
attempt = attempt + 1,
|
||||
"ME writer restored on the same endpoint"
|
||||
);
|
||||
return true;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(
|
||||
%addr,
|
||||
attempt = attempt + 1,
|
||||
error = %e,
|
||||
"ME immediate same-endpoint reconnect failed"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
%addr,
|
||||
"Skipping immediate same-endpoint reconnect because endpoint is quarantined"
|
||||
);
|
||||
}
|
||||
|
||||
let dc_endpoints = self.endpoints_for_same_dc(addr).await;
|
||||
|
||||
Reference in New Issue
Block a user