Authoritative Teardown + Orphan Watchdog + Force-Close Safery Policy

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-20 12:11:47 +03:00
parent ed4d1167dd
commit f61d25ebe0
No known key found for this signature in database
11 changed files with 477 additions and 211 deletions

View File

@ -199,8 +199,14 @@ update_every = 43200
hardswap = false
me_pool_drain_ttl_secs = 90
me_instadrain = false
me_pool_drain_threshold = 32
me_pool_drain_soft_evict_grace_secs = 10
me_pool_drain_soft_evict_per_writer = 2
me_pool_drain_soft_evict_budget_per_core = 16
me_pool_drain_soft_evict_cooldown_ms = 1000
me_bind_stale_mode = "never"
me_pool_min_fresh_ratio = 0.8
me_reinit_drain_timeout_secs = 120
me_reinit_drain_timeout_secs = 90
[network]
ipv4 = true

View File

@ -40,10 +40,10 @@ const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000;
const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250;
const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 10;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 2;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 16;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 1000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
@ -606,7 +606,7 @@ pub(crate) fn default_proxy_secret_len_max() -> usize {
}
pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 {
120
90
}
pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
@ -618,7 +618,7 @@ pub(crate) fn default_me_instadrain() -> bool {
}
pub(crate) fn default_me_pool_drain_threshold() -> u64 {
128
32
}
pub(crate) fn default_me_pool_drain_soft_evict_enabled() -> bool {

View File

@ -2037,6 +2037,45 @@ mod tests {
let _ = std::fs::remove_file(path);
}
#[test]
fn force_close_default_matches_drain_ttl() {
let toml = r#"
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_force_close_default_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 90);
assert_eq!(cfg.general.effective_me_pool_force_close_secs(), 90);
let _ = std::fs::remove_file(path);
}
#[test]
fn force_close_zero_uses_runtime_safety_fallback() {
let toml = r#"
[general]
me_reinit_drain_timeout_secs = 0
[censorship]
tls_domain = "example.com"
[access.users]
user = "00000000000000000000000000000000"
"#;
let dir = std::env::temp_dir();
let path = dir.join("telemt_force_close_zero_fallback_test.toml");
std::fs::write(&path, toml).unwrap();
let cfg = ProxyConfig::load(&path).unwrap();
assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 0);
assert_eq!(cfg.general.effective_me_pool_force_close_secs(), 300);
let _ = std::fs::remove_file(path);
}
#[test]
fn force_close_bumped_when_below_drain_ttl() {
let toml = r#"

View File

@ -135,8 +135,8 @@ impl MeSocksKdfPolicy {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum MeBindStaleMode {
Never,
#[default]
Never,
Ttl,
Always,
}
@ -855,7 +855,7 @@ pub struct GeneralConfig {
pub me_pool_min_fresh_ratio: f32,
/// Drain timeout in seconds for stale ME writers after endpoint map changes.
/// Set to 0 to keep stale writers draining indefinitely (no force-close).
/// Set to 0 to use the runtime safety fallback timeout.
#[serde(default = "default_me_reinit_drain_timeout_secs")]
pub me_reinit_drain_timeout_secs: u64,
@ -1068,8 +1068,13 @@ impl GeneralConfig {
/// Resolve force-close timeout for stale writers.
/// `me_reinit_drain_timeout_secs` remains backward-compatible alias.
/// A configured `0` uses the runtime safety fallback (300s).
pub fn effective_me_pool_force_close_secs(&self) -> u64 {
self.me_reinit_drain_timeout_secs
if self.me_reinit_drain_timeout_secs == 0 {
300
} else {
self.me_reinit_drain_timeout_secs
}
}
}

View File

@ -332,25 +332,76 @@ pub(crate) async fn initialize_me_pool(
"Middle-End pool initialized successfully"
);
let pool_health = pool_bg.clone();
let rng_health = rng_bg.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_health,
rng_health,
min_conns,
)
.await;
});
let pool_drain_enforcer = pool_bg.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
break;
// ── Supervised background tasks ──────────────────
// Each task runs inside a nested tokio::spawn so
// that a panic is caught via JoinHandle and the
// outer loop restarts the task automatically.
let pool_health = pool_bg.clone();
let rng_health = rng_bg.clone();
let min_conns = pool_size;
tokio::spawn(async move {
loop {
let p = pool_health.clone();
let r = rng_health.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
p, r, min_conns,
)
.await;
})
.await;
match res {
Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_health_monitor panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_drain_enforcer = pool_bg.clone();
tokio::spawn(async move {
loop {
let p = pool_drain_enforcer.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
})
.await;
match res {
Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_watchdog = pool_bg.clone();
tokio::spawn(async move {
loop {
let p = pool_watchdog.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
})
.await;
match res {
Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
// CRITICAL: keep the current-thread runtime
// alive. Without this, block_on() returns,
// the Runtime is dropped, and ALL spawned
// background tasks (health monitor, drain
// enforcer, zombie watchdog) are silently
// cancelled — causing the draining-writer
// leak that brought us here.
std::future::pending::<()>().await;
unreachable!();
}
Err(e) => {
startup_tracker_bg.set_me_last_error(Some(e.to_string())).await;
@ -408,23 +459,65 @@ pub(crate) async fn initialize_me_pool(
"Middle-End pool initialized successfully"
);
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
pool_clone, rng_clone, min_conns,
)
.await;
});
let pool_drain_enforcer = pool.clone();
tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(
pool_drain_enforcer,
)
.await;
});
// ── Supervised background tasks ──────────────────
let pool_clone = pool.clone();
let rng_clone = rng.clone();
let min_conns = pool_size;
tokio::spawn(async move {
loop {
let p = pool_clone.clone();
let r = rng_clone.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_health_monitor(
p, r, min_conns,
)
.await;
})
.await;
match res {
Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_health_monitor panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_drain_enforcer = pool.clone();
tokio::spawn(async move {
loop {
let p = pool_drain_enforcer.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
})
.await;
match res {
Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_watchdog = pool.clone();
tokio::spawn(async move {
loop {
let p = pool_watchdog.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
})
.await;
match res {
Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
break Some(pool);
}
Err(e) => {

View File

@ -1327,33 +1327,6 @@ async fn recover_single_endpoint_outage(
}
let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms();
let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine();
if !bypass_quarantine {
let quarantine_remaining = {
let mut guard = pool.endpoint_quarantine.lock().await;
let quarantine_now = Instant::now();
guard.retain(|_, expiry| *expiry > quarantine_now);
guard
.get(&endpoint)
.map(|expiry| expiry.saturating_duration_since(quarantine_now))
};
if let Some(remaining) = quarantine_remaining
&& !remaining.is_zero()
{
outage_next_attempt.insert(key, now + remaining);
debug!(
dc = %key.0,
family = ?key.1,
%endpoint,
required,
wait_ms = remaining.as_millis(),
"Single-endpoint outage reconnect deferred by endpoint quarantine"
);
return;
}
}
if *reconnect_budget == 0 {
outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250)));
debug!(
@ -1369,6 +1342,7 @@ async fn recover_single_endpoint_outage(
pool.stats
.increment_me_single_endpoint_outage_reconnect_attempt_total();
let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine();
let attempt_ok = if bypass_quarantine {
pool.stats
.increment_me_single_endpoint_quarantine_bypass_total();
@ -1576,6 +1550,170 @@ async fn maybe_rotate_single_endpoint_shadow(
);
}
/// Last-resort safety net for draining writers stuck past their deadline.
///
/// Runs every `TICK_SECS` and force-closes any draining writer whose
/// `drain_deadline_epoch_secs` has been exceeded by more than a threshold.
///
/// Two thresholds:
/// - `SOFT_THRESHOLD_SECS` (60s): writers with no bound clients
/// - `HARD_THRESHOLD_SECS` (300s): writers WITH bound clients (unconditional)
///
/// Intentionally kept trivial and independent of pool config to minimise
/// the probability of panicking itself. Uses `SystemTime` directly
/// as a fallback clock source and timeouts on every lock acquisition
/// and writer removal so one stuck writer cannot block the rest.
pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
use std::time::{SystemTime, UNIX_EPOCH};
const TICK_SECS: u64 = 30;
const SOFT_THRESHOLD_SECS: u64 = 60;
const HARD_THRESHOLD_SECS: u64 = 300;
const LOCK_TIMEOUT_SECS: u64 = 5;
const REMOVE_TIMEOUT_SECS: u64 = 10;
const HARD_DETACH_TIMEOUT_STREAK: u8 = 3;
let mut removal_timeout_streak = HashMap::<u64, u8>::new();
loop {
tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(_) => continue,
};
// Phase 1: collect zombie IDs under a short read-lock with timeout.
let zombie_ids_with_meta: Vec<(u64, bool)> = {
let Ok(ws) = tokio::time::timeout(
Duration::from_secs(LOCK_TIMEOUT_SECS),
pool.writers.read(),
)
.await
else {
warn!("zombie_watchdog: writers read-lock timeout, skipping tick");
continue;
};
ws.iter()
.filter(|w| w.draining.load(std::sync::atomic::Ordering::Relaxed))
.filter_map(|w| {
let deadline = w
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline == 0 {
return None;
}
let overdue = now.saturating_sub(deadline);
if overdue == 0 {
return None;
}
let started = w
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
let drain_age = now.saturating_sub(started);
if drain_age > HARD_THRESHOLD_SECS {
return Some((w.id, true));
}
if overdue > SOFT_THRESHOLD_SECS {
return Some((w.id, false));
}
None
})
.collect()
};
// read lock released here
if zombie_ids_with_meta.is_empty() {
removal_timeout_streak.clear();
continue;
}
let mut active_zombie_ids = HashSet::<u64>::with_capacity(zombie_ids_with_meta.len());
for (writer_id, _) in &zombie_ids_with_meta {
active_zombie_ids.insert(*writer_id);
}
removal_timeout_streak.retain(|writer_id, _| active_zombie_ids.contains(writer_id));
warn!(
zombie_count = zombie_ids_with_meta.len(),
soft_threshold_secs = SOFT_THRESHOLD_SECS,
hard_threshold_secs = HARD_THRESHOLD_SECS,
"Zombie draining writers detected by watchdog, force-closing"
);
// Phase 2: remove each writer individually with a timeout.
// One stuck removal cannot block the rest.
for (writer_id, had_clients) in &zombie_ids_with_meta {
let result = tokio::time::timeout(
Duration::from_secs(REMOVE_TIMEOUT_SECS),
pool.remove_writer_and_close_clients(*writer_id),
)
.await;
match result {
Ok(()) => {
removal_timeout_streak.remove(writer_id);
pool.stats.increment_pool_force_close_total();
pool.stats
.increment_me_draining_writers_reap_progress_total();
info!(
writer_id,
had_clients,
"Zombie writer removed by watchdog"
);
}
Err(_) => {
let streak = removal_timeout_streak
.entry(*writer_id)
.and_modify(|value| *value = value.saturating_add(1))
.or_insert(1);
warn!(
writer_id,
had_clients,
timeout_streak = *streak,
"Zombie writer removal timed out"
);
if *streak < HARD_DETACH_TIMEOUT_STREAK {
continue;
}
let hard_detach = tokio::time::timeout(
Duration::from_secs(REMOVE_TIMEOUT_SECS),
pool.remove_draining_writer_hard_detach(*writer_id),
)
.await;
match hard_detach {
Ok(true) => {
removal_timeout_streak.remove(writer_id);
pool.stats.increment_pool_force_close_total();
pool.stats
.increment_me_draining_writers_reap_progress_total();
info!(
writer_id,
had_clients,
"Zombie writer hard-detached after repeated timeouts"
);
}
Ok(false) => {
removal_timeout_streak.remove(writer_id);
debug!(
writer_id,
had_clients,
"Zombie hard-detach skipped (writer already gone or no longer draining)"
);
}
Err(_) => {
warn!(
writer_id,
had_clients,
"Zombie hard-detach timed out, will retry next tick"
);
}
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@ -1587,10 +1725,9 @@ mod tests {
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use super::{reap_draining_writers, recover_single_endpoint_outage};
use super::reap_draining_writers;
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use crate::network::probe::NetworkDecision;
use crate::stats::Stats;
use crate::transport::middle_proxy::codec::WriterCommand;
@ -1772,65 +1909,4 @@ mod tests {
assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
}
#[tokio::test]
async fn removing_draining_writer_still_quarantines_flapping_endpoint() {
let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs();
let writer_id = 11u64;
let writer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000 + writer_id as u16);
let conn_id =
insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(5)).await;
assert!(pool
.registry
.evict_bound_conn_if_writer(conn_id, writer_id)
.await);
pool.remove_writer_and_close_clients(writer_id).await;
assert!(pool.is_endpoint_quarantined(writer_addr).await);
}
#[tokio::test]
async fn single_endpoint_outage_respects_quarantine_when_bypass_disabled() {
let pool = make_pool(1).await;
pool.me_single_endpoint_outage_disable_quarantine
.store(false, Ordering::Relaxed);
let key = (2, IpFamily::V4);
let endpoint = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7443);
let quarantine_ttl = Duration::from_millis(200);
{
let mut guard = pool.endpoint_quarantine.lock().await;
guard.insert(endpoint, Instant::now() + quarantine_ttl);
}
let rng = Arc::new(SecureRandom::new());
let mut outage_backoff = HashMap::new();
let mut outage_next_attempt = HashMap::new();
let mut reconnect_budget = 1usize;
let started_at = Instant::now();
recover_single_endpoint_outage(
&pool,
&rng,
key,
endpoint,
1,
&mut outage_backoff,
&mut outage_next_attempt,
&mut reconnect_budget,
)
.await;
assert_eq!(reconnect_budget, 1);
assert_eq!(
pool.stats
.get_me_single_endpoint_outage_reconnect_attempt_total(),
0
);
assert_eq!(pool.stats.get_me_single_endpoint_quarantine_bypass_total(), 0);
let next_attempt = outage_next_attempt.get(&key).copied().unwrap();
assert!(next_attempt >= started_at + Duration::from_millis(120));
}
}

View File

@ -12,7 +12,9 @@ use super::codec::WriterCommand;
use super::health::{health_drain_close_budget, reap_draining_writers};
use super::pool::{MePool, MeWriter, WriterContour};
use super::registry::ConnMeta;
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::config::{
GeneralConfig, MeBindStaleMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode,
};
use crate::crypto::SecureRandom;
use crate::network::probe::NetworkDecision;
use crate::stats::Stats;
@ -646,10 +648,23 @@ async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediatel
#[test]
fn general_config_default_drain_threshold_remains_enabled() {
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 32);
assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
1
GeneralConfig::default().me_pool_drain_soft_evict_grace_secs,
10
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
2
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_budget_per_core,
16
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_cooldown_ms,
1000
);
assert_eq!(GeneralConfig::default().me_bind_stale_mode, MeBindStaleMode::Never);
}

View File

@ -30,7 +30,7 @@ mod health_adversarial_tests;
use bytes::Bytes;
pub use health::{me_drain_timeout_enforcer, me_health_monitor};
pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog};
#[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool;

View File

@ -18,6 +18,8 @@ use crate::transport::UpstreamManager;
use super::ConnRegistry;
use super::codec::WriterCommand;
const ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS: u64 = 300;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct RefillDcKey {
pub dc: i32,
@ -229,6 +231,14 @@ impl MePool {
.as_secs()
}
fn normalize_force_close_secs(force_close_secs: u64) -> u64 {
if force_close_secs == 0 {
ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS
} else {
force_close_secs
}
}
pub fn new(
proxy_tag: Option<Vec<u8>>,
proxy_secret: Vec<u8>,
@ -477,7 +487,9 @@ impl MePool {
me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new(
me_pool_drain_soft_evict_cooldown_ms.max(1),
),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_force_close_secs: AtomicU64::new(Self::normalize_force_close_secs(
me_pool_force_close_secs,
)),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
me_pool_min_fresh_ratio,
)),
@ -587,8 +599,10 @@ impl MePool {
);
self.me_pool_drain_soft_evict_cooldown_ms
.store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed);
self.me_pool_force_close_secs
.store(force_close_secs, Ordering::Relaxed);
self.me_pool_force_close_secs.store(
Self::normalize_force_close_secs(force_close_secs),
Ordering::Relaxed,
);
self.me_pool_min_fresh_ratio_permille
.store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed);
self.me_hardswap_warmup_delay_min_ms
@ -733,12 +747,9 @@ impl MePool {
}
pub(super) fn force_close_timeout(&self) -> Option<Duration> {
let secs = self.me_pool_force_close_secs.load(Ordering::Relaxed);
if secs == 0 {
None
} else {
Some(Duration::from_secs(secs))
}
let secs =
Self::normalize_force_close_secs(self.me_pool_force_close_secs.load(Ordering::Relaxed));
Some(Duration::from_secs(secs))
}
pub(super) fn drain_soft_evict_enabled(&self) -> bool {

View File

@ -49,43 +49,36 @@ impl MePool {
return Vec::new();
}
loop {
let mut guard = self.endpoint_quarantine.lock().await;
let now = Instant::now();
guard.retain(|_, expiry| *expiry > now);
let mut guard = self.endpoint_quarantine.lock().await;
let now = Instant::now();
guard.retain(|_, expiry| *expiry > now);
let mut ready = Vec::<SocketAddr>::with_capacity(endpoints.len());
let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None;
for addr in endpoints {
if let Some(expiry) = guard.get(addr).copied() {
match earliest_quarantine {
Some((_, current_expiry)) if current_expiry <= expiry => {}
_ => earliest_quarantine = Some((*addr, expiry)),
}
} else {
ready.push(*addr);
let mut ready = Vec::<SocketAddr>::with_capacity(endpoints.len());
let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None;
for addr in endpoints {
if let Some(expiry) = guard.get(addr).copied() {
match earliest_quarantine {
Some((_, current_expiry)) if current_expiry <= expiry => {}
_ => earliest_quarantine = Some((*addr, expiry)),
}
} else {
ready.push(*addr);
}
}
if !ready.is_empty() {
return ready;
}
if !ready.is_empty() {
return ready;
}
let Some((addr, expiry)) = earliest_quarantine else {
return Vec::new();
};
let remaining = expiry.saturating_duration_since(now);
if remaining.is_zero() {
return vec![addr];
}
drop(guard);
if let Some((addr, expiry)) = earliest_quarantine {
debug!(
%addr,
wait_ms = remaining.as_millis(),
"All ME endpoints quarantined; waiting for earliest to expire"
wait_ms = expiry.saturating_duration_since(now).as_millis(),
"All ME endpoints are quarantined for the DC group; waiting for quarantine expiry"
);
tokio::time::sleep(remaining).await;
}
Vec::new()
}
pub(super) async fn has_refill_inflight_for_dc_key(&self, key: RefillDcKey) -> bool {

View File

@ -20,7 +20,6 @@ use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
use super::codec::{RpcWriter, WriterCommand};
use super::pool::{MePool, MeWriter, WriterContour};
use super::reader::reader_loop;
use super::registry::BoundConn;
use super::wire::build_proxy_req_payload;
const ME_ACTIVE_PING_SECS: u64 = 25;
@ -28,6 +27,12 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
#[derive(Clone, Copy)]
enum WriterTeardownMode {
Any,
DrainingOnly,
}
fn is_me_peer_closed_error(error: &ProxyError) -> bool {
matches!(error, ProxyError::Io(ioe) if ioe.kind() == ErrorKind::UnexpectedEof)
}
@ -142,10 +147,10 @@ impl MePool {
seq_no: 0,
crc_mode: hs.crc_mode,
};
let cancel_wr = cancel.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_writer = cleanup_done.clone();
let pool_writer = Arc::downgrade(self);
let cancel_wr = cancel.clone();
let pool_writer_task = Arc::downgrade(self);
tokio::spawn(async move {
loop {
tokio::select! {
@ -163,15 +168,14 @@ impl MePool {
_ = cancel_wr.cancelled() => break,
}
}
cancel_wr.cancel();
if cleanup_for_writer
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
if let Some(pool) = pool_writer.upgrade() {
if let Some(pool) = pool_writer_task.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
} else {
debug!(writer_id, "ME writer cleanup skipped: pool dropped");
cancel_wr.cancel();
}
}
});
@ -255,7 +259,6 @@ impl MePool {
stats_reader_close.increment_me_idle_close_by_peer_total();
info!(writer_id, "ME socket closed by peer on idle writer");
}
cancel_reader_token.cancel();
if cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
@ -263,12 +266,9 @@ impl MePool {
if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
} else {
let remaining = writers_arc.read().await.len();
debug!(
writer_id,
remaining,
"ME reader cleanup skipped: pool dropped"
);
// Fallback for shutdown races: make writer task exit quickly so stale
// channels are observable by periodic prune.
cancel_reader_token.cancel();
}
}
if let Err(e) = res {
@ -276,6 +276,8 @@ impl MePool {
warn!(error = %e, "ME reader ended");
}
}
let remaining = writers_arc.read().await.len();
debug!(writer_id, remaining, "ME reader task finished");
});
let pool_ping = Arc::downgrade(self);
@ -365,13 +367,12 @@ impl MePool {
stats_ping.increment_me_keepalive_failed();
debug!("ME ping failed, removing dead writer");
cancel_ping.cancel();
if cleanup_for_ping
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
if let Some(pool) = pool_ping.upgrade()
&& cleanup_for_ping
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
if let Some(pool) = pool_ping.upgrade() {
pool.remove_writer_and_close_clients(writer_id).await;
}
pool.remove_writer_and_close_clients(writer_id).await;
}
break;
}
@ -514,18 +515,49 @@ impl MePool {
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
// Full client cleanup now happens inside `registry.writer_lost` to keep
// writer reap/remove paths strictly non-blocking per connection.
let _ = self.remove_writer_only(writer_id).await;
let _ = self
.remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
.await;
}
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
pub(super) async fn remove_draining_writer_hard_detach(
self: &Arc<Self>,
writer_id: u64,
) -> bool {
self.remove_writer_with_mode(writer_id, WriterTeardownMode::DrainingOnly)
.await
}
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> bool {
self.remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
.await
}
// Authoritative teardown primitive shared by normal cleanup and watchdog path.
// Lock-order invariant:
// 1) mutate `writers` under pool write lock,
// 2) release pool lock,
// 3) run registry/metrics/refill side effects.
// `registry.writer_lost` must never run while `writers` lock is held.
async fn remove_writer_with_mode(
self: &Arc<Self>,
writer_id: u64,
mode: WriterTeardownMode,
) -> bool {
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
let mut removed_addr: Option<SocketAddr> = None;
let mut removed_dc: Option<i32> = None;
let mut removed_uptime: Option<Duration> = None;
let mut trigger_refill = false;
let mut removed = false;
{
let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
if matches!(mode, WriterTeardownMode::DrainingOnly)
&& !ws[pos].draining.load(Ordering::Relaxed)
{
return false;
}
let w = ws.remove(pos);
let was_draining = w.draining.load(Ordering::Relaxed);
if was_draining {
@ -542,6 +574,7 @@ impl MePool {
}
close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed);
removed = true;
}
}
// State invariant:
@ -549,7 +582,7 @@ impl MePool {
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let conns = self.registry.writer_lost(writer_id).await;
let _ = self.registry.writer_lost(writer_id).await;
{
let mut tracker = self.ping_tracker.lock().await;
tracker.retain(|_, (_, wid)| *wid != writer_id);
@ -576,22 +609,17 @@ impl MePool {
}
}
}
// Quarantine flapping endpoints regardless of draining state —
// a rapidly dying endpoint is unstable whether it was draining or not.
if let Some(addr) = removed_addr {
if let Some(uptime) = removed_uptime {
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
}
if trigger_refill
&& let Some(writer_dc) = removed_dc
{
self.trigger_immediate_refill_for_dc(addr, writer_dc);
}
}
// Only trigger immediate refill for unexpected (non-draining) removals.
// Draining writers are intentionally being retired.
if trigger_refill
&& let Some(addr) = removed_addr
&& let Some(writer_dc) = removed_dc
{
self.trigger_immediate_refill_for_dc(addr, writer_dc);
}
conns
removed
}
pub(crate) async fn mark_writer_draining_with_timeout(