mirror of https://github.com/telemt/telemt.git
Merge pull request #292 from telemt/flow-mep
ME Hardswap Generation stability + Dead-code deletion
This commit is contained in:
commit
fad4b652c4
|
|
@ -112,7 +112,18 @@ async fn check_family(
|
|||
|
||||
let max_concurrent = pool.me_reconnect_max_concurrent_per_dc.max(1) as usize;
|
||||
if *inflight.get(&key).unwrap_or(&0) >= max_concurrent {
|
||||
return;
|
||||
continue;
|
||||
}
|
||||
if pool.has_refill_inflight_for_endpoints(&endpoints).await {
|
||||
debug!(
|
||||
dc = %dc,
|
||||
?family,
|
||||
alive,
|
||||
required,
|
||||
endpoint_count = endpoints.len(),
|
||||
"Skipping health reconnect: immediate refill is already in flight for this DC group"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
*inflight.entry(key).or_insert(0) += 1;
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ pub struct MeWriter {
|
|||
pub id: u64,
|
||||
pub addr: SocketAddr,
|
||||
pub generation: u64,
|
||||
pub created_at: Instant,
|
||||
pub tx: mpsc::Sender<WriterCommand>,
|
||||
pub cancel: CancellationToken,
|
||||
pub degraded: Arc<AtomicBool>,
|
||||
|
|
@ -82,7 +83,9 @@ pub struct MePool {
|
|||
pub(super) conn_count: AtomicUsize,
|
||||
pub(super) stats: Arc<crate::stats::Stats>,
|
||||
pub(super) generation: AtomicU64,
|
||||
pub(super) pending_hardswap_generation: AtomicU64,
|
||||
pub(super) hardswap: AtomicBool,
|
||||
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
|
||||
pub(super) me_pool_drain_ttl_secs: AtomicU64,
|
||||
pub(super) me_pool_force_close_secs: AtomicU64,
|
||||
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
|
||||
|
|
@ -232,7 +235,9 @@ impl MePool {
|
|||
refill_inflight: Arc::new(Mutex::new(HashSet::new())),
|
||||
conn_count: AtomicUsize::new(0),
|
||||
generation: AtomicU64::new(1),
|
||||
pending_hardswap_generation: AtomicU64::new(0),
|
||||
hardswap: AtomicBool::new(hardswap),
|
||||
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
|
||||
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
|
||||
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
|
||||
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
|
||||
|
|
@ -252,10 +257,6 @@ impl MePool {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn has_proxy_tag(&self) -> bool {
|
||||
self.proxy_tag.is_some()
|
||||
}
|
||||
|
||||
pub fn current_generation(&self) -> u64 {
|
||||
self.generation.load(Ordering::Relaxed)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use std::collections::HashSet;
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
|
|
@ -9,19 +10,97 @@ use crate::crypto::SecureRandom;
|
|||
|
||||
use super::pool::MePool;
|
||||
|
||||
const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20;
|
||||
const ME_FLAP_QUARANTINE_SECS: u64 = 25;
|
||||
|
||||
impl MePool {
|
||||
pub(super) async fn maybe_quarantine_flapping_endpoint(
|
||||
&self,
|
||||
addr: SocketAddr,
|
||||
uptime: Duration,
|
||||
) {
|
||||
if uptime > Duration::from_secs(ME_FLAP_UPTIME_THRESHOLD_SECS) {
|
||||
return;
|
||||
}
|
||||
|
||||
let until = Instant::now() + Duration::from_secs(ME_FLAP_QUARANTINE_SECS);
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
guard.retain(|_, expiry| *expiry > Instant::now());
|
||||
guard.insert(addr, until);
|
||||
warn!(
|
||||
%addr,
|
||||
uptime_ms = uptime.as_millis(),
|
||||
quarantine_secs = ME_FLAP_QUARANTINE_SECS,
|
||||
"ME endpoint temporarily quarantined due to rapid writer flap"
|
||||
);
|
||||
}
|
||||
|
||||
async fn is_endpoint_quarantined(&self, addr: SocketAddr) -> bool {
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
let now = Instant::now();
|
||||
guard.retain(|_, expiry| *expiry > now);
|
||||
guard.contains_key(&addr)
|
||||
}
|
||||
|
||||
async fn connectable_endpoints(&self, endpoints: &[SocketAddr]) -> Vec<SocketAddr> {
|
||||
if endpoints.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let mut guard = self.endpoint_quarantine.lock().await;
|
||||
let now = Instant::now();
|
||||
guard.retain(|_, expiry| *expiry > now);
|
||||
|
||||
let mut ready = Vec::<SocketAddr>::with_capacity(endpoints.len());
|
||||
let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None;
|
||||
for addr in endpoints {
|
||||
if let Some(expiry) = guard.get(addr).copied() {
|
||||
match earliest_quarantine {
|
||||
Some((_, current_expiry)) if current_expiry <= expiry => {}
|
||||
_ => earliest_quarantine = Some((*addr, expiry)),
|
||||
}
|
||||
} else {
|
||||
ready.push(*addr);
|
||||
}
|
||||
}
|
||||
|
||||
if !ready.is_empty() {
|
||||
return ready;
|
||||
}
|
||||
|
||||
if let Some((addr, expiry)) = earliest_quarantine {
|
||||
debug!(
|
||||
%addr,
|
||||
wait_ms = expiry.saturating_duration_since(now).as_millis(),
|
||||
"All ME endpoints are quarantined for the DC group; retrying earliest one"
|
||||
);
|
||||
return vec![addr];
|
||||
}
|
||||
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
pub(super) async fn has_refill_inflight_for_endpoints(&self, endpoints: &[SocketAddr]) -> bool {
|
||||
if endpoints.is_empty() {
|
||||
return false;
|
||||
}
|
||||
let guard = self.refill_inflight.lock().await;
|
||||
endpoints.iter().any(|addr| guard.contains(addr))
|
||||
}
|
||||
|
||||
pub(super) async fn connect_endpoints_round_robin(
|
||||
self: &Arc<Self>,
|
||||
endpoints: &[SocketAddr],
|
||||
rng: &SecureRandom,
|
||||
) -> bool {
|
||||
if endpoints.is_empty() {
|
||||
let candidates = self.connectable_endpoints(endpoints).await;
|
||||
if candidates.is_empty() {
|
||||
return false;
|
||||
}
|
||||
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % endpoints.len();
|
||||
for offset in 0..endpoints.len() {
|
||||
let idx = (start + offset) % endpoints.len();
|
||||
let addr = endpoints[idx];
|
||||
let start = (self.rr.fetch_add(1, Ordering::Relaxed) as usize) % candidates.len();
|
||||
for offset in 0..candidates.len() {
|
||||
let idx = (start + offset) % candidates.len();
|
||||
let addr = candidates[idx];
|
||||
match self.connect_one(addr, rng).await {
|
||||
Ok(()) => return true,
|
||||
Err(e) => debug!(%addr, error = %e, "ME connect failed during round-robin warmup"),
|
||||
|
|
@ -83,7 +162,9 @@ impl MePool {
|
|||
|
||||
async fn refill_writer_after_loss(self: &Arc<Self>, addr: SocketAddr) -> bool {
|
||||
let fast_retries = self.me_reconnect_fast_retry_count.max(1);
|
||||
let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await;
|
||||
|
||||
if !same_endpoint_quarantined {
|
||||
for attempt in 0..fast_retries {
|
||||
self.stats.increment_me_reconnect_attempt();
|
||||
match self.connect_one(addr, self.rng.as_ref()).await {
|
||||
|
|
@ -107,6 +188,12 @@ impl MePool {
|
|||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
%addr,
|
||||
"Skipping immediate same-endpoint reconnect because endpoint is quarantined"
|
||||
);
|
||||
}
|
||||
|
||||
let dc_endpoints = self.endpoints_for_same_dc(addr).await;
|
||||
if dc_endpoints.is_empty() {
|
||||
|
|
|
|||
|
|
@ -266,8 +266,26 @@ impl MePool {
|
|||
}
|
||||
|
||||
let previous_generation = self.current_generation();
|
||||
let generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let hardswap = self.hardswap.load(Ordering::Relaxed);
|
||||
let generation = if hardswap {
|
||||
let pending_generation = self.pending_hardswap_generation.load(Ordering::Relaxed);
|
||||
if pending_generation != 0 && pending_generation >= previous_generation {
|
||||
debug!(
|
||||
previous_generation,
|
||||
generation = pending_generation,
|
||||
"ME hardswap continues with pending generation"
|
||||
);
|
||||
pending_generation
|
||||
} else {
|
||||
let next_generation = self.generation.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
self.pending_hardswap_generation
|
||||
.store(next_generation, Ordering::Relaxed);
|
||||
next_generation
|
||||
}
|
||||
} else {
|
||||
self.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
||||
self.generation.fetch_add(1, Ordering::Relaxed) + 1
|
||||
};
|
||||
|
||||
if hardswap {
|
||||
self.warmup_generation_for_all_dcs(rng, generation, &desired_by_dc)
|
||||
|
|
@ -354,6 +372,9 @@ impl MePool {
|
|||
drop(writers);
|
||||
|
||||
if stale_writer_ids.is_empty() {
|
||||
if hardswap {
|
||||
self.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
||||
}
|
||||
debug!("ME reinit cycle completed with no stale writers");
|
||||
return;
|
||||
}
|
||||
|
|
@ -375,6 +396,9 @@ impl MePool {
|
|||
self.mark_writer_draining_with_timeout(writer_id, drain_timeout, !hardswap)
|
||||
.await;
|
||||
}
|
||||
if hardswap {
|
||||
self.pending_hardswap_generation.store(0, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn zero_downtime_reinit_periodic(self: &Arc<Self>, rng: &SecureRandom) {
|
||||
|
|
|
|||
|
|
@ -89,6 +89,7 @@ impl MePool {
|
|||
id: writer_id,
|
||||
addr,
|
||||
generation,
|
||||
created_at: Instant::now(),
|
||||
tx: tx.clone(),
|
||||
cancel: cancel.clone(),
|
||||
degraded: degraded.clone(),
|
||||
|
|
@ -249,6 +250,7 @@ impl MePool {
|
|||
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
|
||||
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
|
||||
let mut removed_addr: Option<SocketAddr> = None;
|
||||
let mut removed_uptime: Option<Duration> = None;
|
||||
let mut trigger_refill = false;
|
||||
{
|
||||
let mut ws = self.writers.write().await;
|
||||
|
|
@ -261,6 +263,7 @@ impl MePool {
|
|||
self.stats.increment_me_writer_removed_total();
|
||||
w.cancel.cancel();
|
||||
removed_addr = Some(w.addr);
|
||||
removed_uptime = Some(w.created_at.elapsed());
|
||||
trigger_refill = !was_draining;
|
||||
if trigger_refill {
|
||||
self.stats.increment_me_writer_removed_unexpected_total();
|
||||
|
|
@ -275,6 +278,9 @@ impl MePool {
|
|||
if trigger_refill
|
||||
&& let Some(addr) = removed_addr
|
||||
{
|
||||
if let Some(uptime) = removed_uptime {
|
||||
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
|
||||
}
|
||||
self.trigger_immediate_refill(addr);
|
||||
}
|
||||
self.rtt_stats.lock().await.remove(&writer_id);
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Instant;
|
||||
|
|
@ -45,7 +46,10 @@ pub(crate) async fn reader_loop(
|
|||
_ = cancel.cancelled() => return Ok(()),
|
||||
};
|
||||
if n == 0 {
|
||||
return Ok(());
|
||||
return Err(ProxyError::Io(std::io::Error::new(
|
||||
ErrorKind::UnexpectedEof,
|
||||
"ME socket closed by peer",
|
||||
)));
|
||||
}
|
||||
raw.extend_from_slice(&tmp[..n]);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue