diff --git a/src/cli.rs b/src/cli.rs
index 5fbd7d5..b6e2d92 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -199,8 +199,14 @@ update_every = 43200
 hardswap = false
 me_pool_drain_ttl_secs = 90
 me_instadrain = false
+me_pool_drain_threshold = 32
+me_pool_drain_soft_evict_grace_secs = 10
+me_pool_drain_soft_evict_per_writer = 2
+me_pool_drain_soft_evict_budget_per_core = 16
+me_pool_drain_soft_evict_cooldown_ms = 1000
+me_bind_stale_mode = "never"
 me_pool_min_fresh_ratio = 0.8
-me_reinit_drain_timeout_secs = 120
+me_reinit_drain_timeout_secs = 90
 
 [network]
 ipv4 = true
diff --git a/src/config/defaults.rs b/src/config/defaults.rs
index 6d74c93..fea8305 100644
--- a/src/config/defaults.rs
+++ b/src/config/defaults.rs
@@ -40,10 +40,10 @@ const DEFAULT_ME_ROUTE_HYBRID_MAX_WAIT_MS: u64 = 3000;
 const DEFAULT_ME_ROUTE_BLOCKING_SEND_TIMEOUT_MS: u64 = 250;
 const DEFAULT_ME_C2ME_SEND_TIMEOUT_MS: u64 = 4000;
 const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
-const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
-const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
-const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
-const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
+const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 10;
+const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 2;
+const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 16;
+const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 1000;
 const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
 const DEFAULT_ACCEPT_PERMIT_TIMEOUT_MS: u64 = 250;
 const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
@@ -606,7 +606,7 @@ pub(crate) fn default_proxy_secret_len_max() -> usize {
 }
 
 pub(crate) fn default_me_reinit_drain_timeout_secs() -> u64 {
-    120
+    90
 }
 
 pub(crate) fn default_me_pool_drain_ttl_secs() -> u64 {
@@ -618,7 +618,7 @@ pub(crate) fn default_me_instadrain() -> bool {
 }
 
 pub(crate) fn default_me_pool_drain_threshold() -> u64 {
-    128
+    32
 }
 
 pub(crate) fn default_me_pool_drain_soft_evict_enabled() -> bool {
diff --git a/src/config/load.rs b/src/config/load.rs
index c296697..14799ed 100644
--- a/src/config/load.rs
+++ b/src/config/load.rs
@@ -2037,6 +2037,45 @@ mod tests {
         let _ = std::fs::remove_file(path);
     }
 
+    #[test]
+    fn force_close_default_matches_drain_ttl() {
+        let toml = r#"
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_force_close_default_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let cfg = ProxyConfig::load(&path).unwrap();
+        assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 90);
+        assert_eq!(cfg.general.effective_me_pool_force_close_secs(), 90);
+        let _ = std::fs::remove_file(path);
+    }
+
+    #[test]
+    fn force_close_zero_uses_runtime_safety_fallback() {
+        let toml = r#"
+            [general]
+            me_reinit_drain_timeout_secs = 0
+
+            [censorship]
+            tls_domain = "example.com"
+
+            [access.users]
+            user = "00000000000000000000000000000000"
+        "#;
+        let dir = std::env::temp_dir();
+        let path = dir.join("telemt_force_close_zero_fallback_test.toml");
+        std::fs::write(&path, toml).unwrap();
+        let cfg = ProxyConfig::load(&path).unwrap();
+        assert_eq!(cfg.general.me_reinit_drain_timeout_secs, 0);
+        assert_eq!(cfg.general.effective_me_pool_force_close_secs(), 300);
+        let _ = std::fs::remove_file(path);
+    }
+
     #[test]
     fn force_close_bumped_when_below_drain_ttl() {
         let toml = r#"
diff --git a/src/config/types.rs b/src/config/types.rs
index ecd051d..d018187 100644
--- a/src/config/types.rs
+++ b/src/config/types.rs
@@ -135,8 +135,8 @@ impl MeSocksKdfPolicy {
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
 #[serde(rename_all = "lowercase")]
 pub enum MeBindStaleMode {
-    Never,
     #[default]
+    Never,
     Ttl,
     Always,
 }
@@ -855,7 +855,7 @@ pub struct GeneralConfig {
     pub me_pool_min_fresh_ratio: f32,
 
     /// Drain timeout in seconds for stale ME writers after endpoint map changes.
-    /// Set to 0 to keep stale writers draining indefinitely (no force-close).
+    /// Set to 0 to use the runtime safety fallback timeout.
     #[serde(default = "default_me_reinit_drain_timeout_secs")]
     pub me_reinit_drain_timeout_secs: u64,
 
@@ -1068,8 +1068,13 @@ impl GeneralConfig {
     /// Resolve force-close timeout for stale writers.
     /// `me_reinit_drain_timeout_secs` remains backward-compatible alias.
+    /// A configured `0` uses the runtime safety fallback (300s).
     pub fn effective_me_pool_force_close_secs(&self) -> u64 {
-        self.me_reinit_drain_timeout_secs
+        if self.me_reinit_drain_timeout_secs == 0 {
+            300
+        } else {
+            self.me_reinit_drain_timeout_secs
+        }
     }
 }
diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs
index 0b1310a..eb45cc4 100644
--- a/src/maestro/me_startup.rs
+++ b/src/maestro/me_startup.rs
@@ -332,25 +332,76 @@ pub(crate) async fn initialize_me_pool(
                         "Middle-End pool initialized successfully"
                     );
 
-                    let pool_health = pool_bg.clone();
-                    let rng_health = rng_bg.clone();
-                    let min_conns = pool_size;
-                    tokio::spawn(async move {
-                        crate::transport::middle_proxy::me_health_monitor(
-                            pool_health,
-                            rng_health,
-                            min_conns,
-                        )
-                        .await;
-                    });
-                    let pool_drain_enforcer = pool_bg.clone();
-                    tokio::spawn(async move {
-                        crate::transport::middle_proxy::me_drain_timeout_enforcer(
-                            pool_drain_enforcer,
-                        )
-                        .await;
-                    });
-                    break;
+                    // ── Supervised background tasks ──────────────────
+                    // Each task runs inside a nested tokio::spawn so
+                    // that a panic is caught via JoinHandle and the
+                    // outer loop restarts the task automatically.
+                    let pool_health = pool_bg.clone();
+                    let rng_health = rng_bg.clone();
+                    let min_conns = pool_size;
+                    tokio::spawn(async move {
+                        loop {
+                            let p = pool_health.clone();
+                            let r = rng_health.clone();
+                            let res = tokio::spawn(async move {
+                                crate::transport::middle_proxy::me_health_monitor(
+                                    p, r, min_conns,
+                                )
+                                .await;
+                            })
+                            .await;
+                            match res {
+                                Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"),
+                                Err(e) => {
+                                    error!(error = %e, "me_health_monitor panicked, restarting in 1s");
+                                    tokio::time::sleep(Duration::from_secs(1)).await;
+                                }
+                            }
+                        }
+                    });
+                    let pool_drain_enforcer = pool_bg.clone();
+                    tokio::spawn(async move {
+                        loop {
+                            let p = pool_drain_enforcer.clone();
+                            let res = tokio::spawn(async move {
+                                crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
+                            })
+                            .await;
+                            match res {
+                                Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
+                                Err(e) => {
+                                    error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
+                                    tokio::time::sleep(Duration::from_secs(1)).await;
+                                }
+                            }
+                        }
+                    });
+                    let pool_watchdog = pool_bg.clone();
+                    tokio::spawn(async move {
+                        loop {
+                            let p = pool_watchdog.clone();
+                            let res = tokio::spawn(async move {
+                                crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
+                            })
+                            .await;
+                            match res {
+                                Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
+                                Err(e) => {
+                                    error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
+                                    tokio::time::sleep(Duration::from_secs(1)).await;
+                                }
+                            }
+                        }
+                    });
+                    // CRITICAL: keep the current-thread runtime
+                    // alive. Without this, block_on() returns,
+                    // the Runtime is dropped, and ALL spawned
+                    // background tasks (health monitor, drain
+                    // enforcer, zombie watchdog) are silently
+                    // cancelled — causing the draining-writer
+                    // leak that brought us here.
+                    std::future::pending::<()>().await;
+                    unreachable!();
                 }
                 Err(e) => {
                     startup_tracker_bg.set_me_last_error(Some(e.to_string())).await;
@@ -408,23 +459,65 @@ pub(crate) async fn initialize_me_pool(
                 "Middle-End pool initialized successfully"
             );
 
-            let pool_clone = pool.clone();
-            let rng_clone = rng.clone();
-            let min_conns = pool_size;
-            tokio::spawn(async move {
-                crate::transport::middle_proxy::me_health_monitor(
-                    pool_clone, rng_clone, min_conns,
-                )
-                .await;
-            });
-            let pool_drain_enforcer = pool.clone();
-            tokio::spawn(async move {
-                crate::transport::middle_proxy::me_drain_timeout_enforcer(
-                    pool_drain_enforcer,
-                )
-                .await;
-            });
-
+            // ── Supervised background tasks ──────────────────
+            let pool_clone = pool.clone();
+            let rng_clone = rng.clone();
+            let min_conns = pool_size;
+            tokio::spawn(async move {
+                loop {
+                    let p = pool_clone.clone();
+                    let r = rng_clone.clone();
+                    let res = tokio::spawn(async move {
+                        crate::transport::middle_proxy::me_health_monitor(
+                            p, r, min_conns,
+                        )
+                        .await;
+                    })
+                    .await;
+                    match res {
+                        Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"),
+                        Err(e) => {
+                            error!(error = %e, "me_health_monitor panicked, restarting in 1s");
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+            });
+            let pool_drain_enforcer = pool.clone();
+            tokio::spawn(async move {
+                loop {
+                    let p = pool_drain_enforcer.clone();
+                    let res = tokio::spawn(async move {
+                        crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
+                    })
+                    .await;
+                    match res {
+                        Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
+                        Err(e) => {
+                            error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+            });
+            let pool_watchdog = pool.clone();
+            tokio::spawn(async move {
+                loop {
+                    let p = pool_watchdog.clone();
+                    let res = tokio::spawn(async move {
+                        crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
+                    })
+                    .await;
+                    match res {
+                        Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
+                        Err(e) => {
+                            error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+            });
+
             break Some(pool);
         }
         Err(e) => {
diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs
index 5829de4..9d4cc70 100644
--- a/src/transport/middle_proxy/health.rs
+++ b/src/transport/middle_proxy/health.rs
@@ -1327,33 +1327,6 @@ async fn recover_single_endpoint_outage(
     }
 
     let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms();
-    let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine();
-    if !bypass_quarantine {
-        let quarantine_remaining = {
-            let mut guard = pool.endpoint_quarantine.lock().await;
-            let quarantine_now = Instant::now();
-            guard.retain(|_, expiry| *expiry > quarantine_now);
-            guard
-                .get(&endpoint)
-                .map(|expiry| expiry.saturating_duration_since(quarantine_now))
-        };
-
-        if let Some(remaining) = quarantine_remaining
-            && !remaining.is_zero()
-        {
-            outage_next_attempt.insert(key, now + remaining);
-            debug!(
-                dc = %key.0,
-                family = ?key.1,
-                %endpoint,
-                required,
-                wait_ms = remaining.as_millis(),
-                "Single-endpoint outage reconnect deferred by endpoint quarantine"
-            );
-            return;
-        }
-    }
-
     if *reconnect_budget == 0 {
         outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250)));
         debug!(
@@ -1369,6 +1342,7 @@
     pool.stats
         .increment_me_single_endpoint_outage_reconnect_attempt_total();
 
+    let bypass_quarantine = pool.single_endpoint_outage_disable_quarantine();
     let attempt_ok = if bypass_quarantine {
         pool.stats
             .increment_me_single_endpoint_quarantine_bypass_total();
@@ -1576,6 +1550,170 @@ async fn maybe_rotate_single_endpoint_shadow(
     );
 }
 
+/// Last-resort safety net for draining writers stuck past their deadline.
+///
+/// Runs every `TICK_SECS` and force-closes any draining writer whose
+/// `drain_deadline_epoch_secs` has been exceeded by more than a threshold.
+///
+/// Two thresholds:
+/// - `SOFT_THRESHOLD_SECS` (60s): writers with no bound clients
+/// - `HARD_THRESHOLD_SECS` (300s): writers WITH bound clients (unconditional)
+///
+/// Intentionally kept trivial and independent of pool config to minimise
+/// the probability of panicking itself. Uses `SystemTime` directly as a
+/// fallback clock source and applies timeouts to every lock acquisition
+/// and writer removal so one stuck writer cannot block the rest.
+pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
+    use std::time::{SystemTime, UNIX_EPOCH};
+
+    const TICK_SECS: u64 = 30;
+    const SOFT_THRESHOLD_SECS: u64 = 60;
+    const HARD_THRESHOLD_SECS: u64 = 300;
+    const LOCK_TIMEOUT_SECS: u64 = 5;
+    const REMOVE_TIMEOUT_SECS: u64 = 10;
+    const HARD_DETACH_TIMEOUT_STREAK: u8 = 3;
+
+    let mut removal_timeout_streak = HashMap::<u64, u8>::new();
+
+    loop {
+        tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
+
+        let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(d) => d.as_secs(),
+            Err(_) => continue,
+        };
+
+        // Phase 1: collect zombie IDs under a short read-lock with timeout.
+        let zombie_ids_with_meta: Vec<(u64, bool)> = {
+            let Ok(ws) = tokio::time::timeout(
+                Duration::from_secs(LOCK_TIMEOUT_SECS),
+                pool.writers.read(),
+            )
+            .await
+            else {
+                warn!("zombie_watchdog: writers read-lock timeout, skipping tick");
+                continue;
+            };
+            ws.iter()
+                .filter(|w| w.draining.load(std::sync::atomic::Ordering::Relaxed))
+                .filter_map(|w| {
+                    let deadline = w
+                        .drain_deadline_epoch_secs
+                        .load(std::sync::atomic::Ordering::Relaxed);
+                    if deadline == 0 {
+                        return None;
+                    }
+                    let overdue = now.saturating_sub(deadline);
+                    if overdue == 0 {
+                        return None;
+                    }
+                    let started = w
+                        .draining_started_at_epoch_secs
+                        .load(std::sync::atomic::Ordering::Relaxed);
+                    let drain_age = now.saturating_sub(started);
+                    if drain_age > HARD_THRESHOLD_SECS {
+                        return Some((w.id, true));
+                    }
+                    if overdue > SOFT_THRESHOLD_SECS {
+                        return Some((w.id, false));
+                    }
+                    None
+                })
+                .collect()
+        };
+        // read lock released here
+
+        if zombie_ids_with_meta.is_empty() {
+            removal_timeout_streak.clear();
+            continue;
+        }
+
+        let mut active_zombie_ids = HashSet::<u64>::with_capacity(zombie_ids_with_meta.len());
+        for (writer_id, _) in &zombie_ids_with_meta {
+            active_zombie_ids.insert(*writer_id);
+        }
+        removal_timeout_streak.retain(|writer_id, _| active_zombie_ids.contains(writer_id));
+
+        warn!(
+            zombie_count = zombie_ids_with_meta.len(),
+            soft_threshold_secs = SOFT_THRESHOLD_SECS,
+            hard_threshold_secs = HARD_THRESHOLD_SECS,
+            "Zombie draining writers detected by watchdog, force-closing"
+        );
+
+        // Phase 2: remove each writer individually with a timeout.
+        // One stuck removal cannot block the rest.
+        for (writer_id, had_clients) in &zombie_ids_with_meta {
+            let result = tokio::time::timeout(
+                Duration::from_secs(REMOVE_TIMEOUT_SECS),
+                pool.remove_writer_and_close_clients(*writer_id),
+            )
+            .await;
+            match result {
+                Ok(()) => {
+                    removal_timeout_streak.remove(writer_id);
+                    pool.stats.increment_pool_force_close_total();
+                    pool.stats
+                        .increment_me_draining_writers_reap_progress_total();
+                    info!(
+                        writer_id,
+                        had_clients,
+                        "Zombie writer removed by watchdog"
+                    );
+                }
+                Err(_) => {
+                    let streak = removal_timeout_streak
+                        .entry(*writer_id)
+                        .and_modify(|value| *value = value.saturating_add(1))
+                        .or_insert(1);
+                    warn!(
+                        writer_id,
+                        had_clients,
+                        timeout_streak = *streak,
+                        "Zombie writer removal timed out"
+                    );
+                    if *streak < HARD_DETACH_TIMEOUT_STREAK {
+                        continue;
+                    }
+
+                    let hard_detach = tokio::time::timeout(
+                        Duration::from_secs(REMOVE_TIMEOUT_SECS),
+                        pool.remove_draining_writer_hard_detach(*writer_id),
+                    )
+                    .await;
+                    match hard_detach {
+                        Ok(true) => {
+                            removal_timeout_streak.remove(writer_id);
+                            pool.stats.increment_pool_force_close_total();
+                            pool.stats
+                                .increment_me_draining_writers_reap_progress_total();
+                            info!(
+                                writer_id,
+                                had_clients,
+                                "Zombie writer hard-detached after repeated timeouts"
+                            );
+                        }
+                        Ok(false) => {
+                            removal_timeout_streak.remove(writer_id);
+                            debug!(
+                                writer_id,
+                                had_clients,
+                                "Zombie hard-detach skipped (writer already gone or no longer draining)"
+                            );
+                        }
+                        Err(_) => {
+                            warn!(
+                                writer_id,
+                                had_clients,
+                                "Zombie hard-detach timed out, will retry next tick"
+                            );
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
@@ -1587,10 +1725,9 @@
     use tokio::sync::mpsc;
     use tokio_util::sync::CancellationToken;
 
-    use super::{reap_draining_writers, recover_single_endpoint_outage};
+    use super::reap_draining_writers;
     use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
     use crate::crypto::SecureRandom;
-    use crate::network::IpFamily;
     use crate::network::probe::NetworkDecision;
     use crate::stats::Stats;
     use crate::transport::middle_proxy::codec::WriterCommand;
@@ -1772,65 +1909,4 @@
         assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);
         assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30);
     }
-
-    #[tokio::test]
-    async fn removing_draining_writer_still_quarantines_flapping_endpoint() {
-        let pool = make_pool(1).await;
-        let now_epoch_secs = MePool::now_epoch_secs();
-        let writer_id = 11u64;
-        let writer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000 + writer_id as u16);
-        let conn_id =
-            insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(5)).await;
-
-        assert!(pool
-            .registry
-            .evict_bound_conn_if_writer(conn_id, writer_id)
-            .await);
-        pool.remove_writer_and_close_clients(writer_id).await;
-
-        assert!(pool.is_endpoint_quarantined(writer_addr).await);
-    }
-
-    #[tokio::test]
-    async fn single_endpoint_outage_respects_quarantine_when_bypass_disabled() {
-        let pool = make_pool(1).await;
-        pool.me_single_endpoint_outage_disable_quarantine
-            .store(false, Ordering::Relaxed);
-
-        let key = (2, IpFamily::V4);
-        let endpoint = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7443);
-        let quarantine_ttl = Duration::from_millis(200);
-        {
-            let mut guard = pool.endpoint_quarantine.lock().await;
-            guard.insert(endpoint, Instant::now() + quarantine_ttl);
-        }
-
-        let rng = Arc::new(SecureRandom::new());
-        let mut outage_backoff = HashMap::new();
-        let mut outage_next_attempt = HashMap::new();
-        let mut reconnect_budget = 1usize;
-        let started_at = Instant::now();
-
-        recover_single_endpoint_outage(
-            &pool,
-            &rng,
-            key,
-            endpoint,
-            1,
-            &mut outage_backoff,
-            &mut outage_next_attempt,
-            &mut reconnect_budget,
-        )
-        .await;
-
-        assert_eq!(reconnect_budget, 1);
-        assert_eq!(
-            pool.stats
-                .get_me_single_endpoint_outage_reconnect_attempt_total(),
-            0
-        );
-        assert_eq!(pool.stats.get_me_single_endpoint_quarantine_bypass_total(), 0);
-        let next_attempt = outage_next_attempt.get(&key).copied().unwrap();
-        assert!(next_attempt >= started_at + Duration::from_millis(120));
-    }
 }
diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs
index bcdaf2e..230cd64 100644
--- a/src/transport/middle_proxy/health_regression_tests.rs
+++ b/src/transport/middle_proxy/health_regression_tests.rs
@@ -12,7 +12,9 @@ use super::codec::WriterCommand;
 use super::health::{health_drain_close_budget, reap_draining_writers};
 use super::pool::{MePool, MeWriter, WriterContour};
 use super::registry::ConnMeta;
-use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
+use crate::config::{
+    GeneralConfig, MeBindStaleMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode,
+};
 use crate::crypto::SecureRandom;
 use crate::network::probe::NetworkDecision;
 use crate::stats::Stats;
@@ -646,10 +648,23 @@ async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediatel
 
 #[test]
 fn general_config_default_drain_threshold_remains_enabled() {
-    assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);
+    assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 32);
     assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled);
     assert_eq!(
-        GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
-        1
+        GeneralConfig::default().me_pool_drain_soft_evict_grace_secs,
+        10
     );
+    assert_eq!(
+        GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
+        2
+    );
+    assert_eq!(
+        GeneralConfig::default().me_pool_drain_soft_evict_budget_per_core,
+        16
+    );
+    assert_eq!(
+        GeneralConfig::default().me_pool_drain_soft_evict_cooldown_ms,
+        1000
+    );
+    assert_eq!(GeneralConfig::default().me_bind_stale_mode, MeBindStaleMode::Never);
 }
diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs
index 26ded29..8c57717 100644
--- a/src/transport/middle_proxy/mod.rs
+++ b/src/transport/middle_proxy/mod.rs
@@ -30,7 +30,7 @@ mod health_adversarial_tests;
 
 use bytes::Bytes;
 
-pub use health::{me_drain_timeout_enforcer, me_health_monitor};
+pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog};
 #[allow(unused_imports)]
 pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
 pub use pool::MePool;
diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs
index 441d41d..f825058 100644
--- a/src/transport/middle_proxy/pool.rs
+++ b/src/transport/middle_proxy/pool.rs
@@ -18,6 +18,8 @@ use crate::transport::UpstreamManager;
 use super::ConnRegistry;
 use super::codec::WriterCommand;
 
+const ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS: u64 = 300;
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub(super) struct RefillDcKey {
     pub dc: i32,
@@ -229,6 +231,14 @@ impl MePool {
             .as_secs()
     }
 
+    fn normalize_force_close_secs(force_close_secs: u64) -> u64 {
+        if force_close_secs == 0 {
+            ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS
+        } else {
+            force_close_secs
+        }
+    }
+
     pub fn new(
         proxy_tag: Option<Vec<u8>>,
         proxy_secret: Vec<u8>,
@@ -477,7 +487,9 @@
             me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new(
                 me_pool_drain_soft_evict_cooldown_ms.max(1),
             ),
-            me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
+            me_pool_force_close_secs: AtomicU64::new(Self::normalize_force_close_secs(
+                me_pool_force_close_secs,
+            )),
             me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
                 me_pool_min_fresh_ratio,
             )),
@@ -587,8 +599,10 @@
         );
         self.me_pool_drain_soft_evict_cooldown_ms
             .store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed);
-        self.me_pool_force_close_secs
-            .store(force_close_secs, Ordering::Relaxed);
+        self.me_pool_force_close_secs.store(
+            Self::normalize_force_close_secs(force_close_secs),
+            Ordering::Relaxed,
+        );
         self.me_pool_min_fresh_ratio_permille
             .store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed);
         self.me_hardswap_warmup_delay_min_ms
@@ -733,12 +747,9 @@
     }
 
     pub(super) fn force_close_timeout(&self) -> Option<Duration> {
-        let secs = self.me_pool_force_close_secs.load(Ordering::Relaxed);
-        if secs == 0 {
-            None
-        } else {
-            Some(Duration::from_secs(secs))
-        }
+        let secs =
+            Self::normalize_force_close_secs(self.me_pool_force_close_secs.load(Ordering::Relaxed));
+        Some(Duration::from_secs(secs))
     }
 
     pub(super) fn drain_soft_evict_enabled(&self) -> bool {
diff --git a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs
index 43e2e6b..e4fb95f 100644
--- a/src/transport/middle_proxy/pool_refill.rs
+++ b/src/transport/middle_proxy/pool_refill.rs
@@ -49,43 +49,36 @@ impl MePool {
             return Vec::new();
         }
 
-        loop {
-            let mut guard = self.endpoint_quarantine.lock().await;
-            let now = Instant::now();
-            guard.retain(|_, expiry| *expiry > now);
+        let mut guard = self.endpoint_quarantine.lock().await;
+        let now = Instant::now();
+        guard.retain(|_, expiry| *expiry > now);
 
-            let mut ready = Vec::<SocketAddr>::with_capacity(endpoints.len());
-            let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None;
-            for addr in endpoints {
-                if let Some(expiry) = guard.get(addr).copied() {
-                    match earliest_quarantine {
-                        Some((_, current_expiry)) if current_expiry <= expiry => {}
-                        _ => earliest_quarantine = Some((*addr, expiry)),
-                    }
-                } else {
-                    ready.push(*addr);
+        let mut ready = Vec::<SocketAddr>::with_capacity(endpoints.len());
+        let mut earliest_quarantine: Option<(SocketAddr, Instant)> = None;
+        for addr in endpoints {
+            if let Some(expiry) = guard.get(addr).copied() {
+                match earliest_quarantine {
+                    Some((_, current_expiry)) if current_expiry <= expiry => {}
+                    _ => earliest_quarantine = Some((*addr, expiry)),
                 }
+            } else {
+                ready.push(*addr);
             }
         }
 
-            if !ready.is_empty() {
-                return ready;
-            }
+        if !ready.is_empty() {
+            return ready;
+        }
 
-            let Some((addr, expiry)) = earliest_quarantine else {
-                return Vec::new();
-            };
-            let remaining = expiry.saturating_duration_since(now);
-            if remaining.is_zero() {
-                return vec![addr];
-            }
-            drop(guard);
+        if let Some((addr, expiry)) = earliest_quarantine {
             debug!(
                 %addr,
-                wait_ms = remaining.as_millis(),
-                "All ME endpoints quarantined; waiting for earliest to expire"
+                wait_ms = expiry.saturating_duration_since(now).as_millis(),
+                "All ME endpoints are quarantined for the DC group; waiting for quarantine expiry"
             );
-            tokio::time::sleep(remaining).await;
         }
+
+        Vec::new()
     }
 
     pub(super) async fn has_refill_inflight_for_dc_key(&self, key: RefillDcKey) -> bool {
diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs
index e368ead..e3ea44d 100644
--- a/src/transport/middle_proxy/pool_writer.rs
+++ b/src/transport/middle_proxy/pool_writer.rs
@@ -20,7 +20,6 @@ use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
 use super::codec::{RpcWriter, WriterCommand};
 use super::pool::{MePool, MeWriter, WriterContour};
 use super::reader::reader_loop;
-use super::registry::BoundConn;
 use super::wire::build_proxy_req_payload;
 
 const ME_ACTIVE_PING_SECS: u64 = 25;
@@ -28,6 +27,12 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
 const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
 const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
 
+#[derive(Clone, Copy)]
+enum WriterTeardownMode {
+    Any,
+    DrainingOnly,
+}
+
 fn is_me_peer_closed_error(error: &ProxyError) -> bool {
     matches!(error, ProxyError::Io(ioe) if ioe.kind() == ErrorKind::UnexpectedEof)
 }
@@ -142,10 +147,10 @@ impl MePool {
             seq_no: 0,
             crc_mode: hs.crc_mode,
         };
+        let cancel_wr = cancel.clone();
         let cleanup_done = Arc::new(AtomicBool::new(false));
         let cleanup_for_writer = cleanup_done.clone();
-        let pool_writer = Arc::downgrade(self);
-        let cancel_wr = cancel.clone();
+        let pool_writer_task = Arc::downgrade(self);
         tokio::spawn(async move {
             loop {
                 tokio::select! {
@@ -163,15 +168,14 @@
                     _ = cancel_wr.cancelled() => break,
                 }
            }
-            cancel_wr.cancel();
             if cleanup_for_writer
                 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
                 .is_ok()
             {
-                if let Some(pool) = pool_writer.upgrade() {
+                if let Some(pool) = pool_writer_task.upgrade() {
                     pool.remove_writer_and_close_clients(writer_id).await;
                 } else {
-                    debug!(writer_id, "ME writer cleanup skipped: pool dropped");
+                    cancel_wr.cancel();
                 }
             }
         });
@@ -255,7 +259,6 @@
             stats_reader_close.increment_me_idle_close_by_peer_total();
             info!(writer_id, "ME socket closed by peer on idle writer");
         }
-        cancel_reader_token.cancel();
         if cleanup_for_reader
             .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
             .is_ok()
         {
@@ -263,12 +266,9 @@
             if let Some(pool) = pool.upgrade() {
                 pool.remove_writer_and_close_clients(writer_id).await;
             } else {
-                let remaining = writers_arc.read().await.len();
-                debug!(
-                    writer_id,
-                    remaining,
-                    "ME reader cleanup skipped: pool dropped"
-                );
+                // Fallback for shutdown races: make writer task exit quickly so stale
+                // channels are observable by periodic prune.
+                cancel_reader_token.cancel();
            }
         }
         if let Err(e) = res {
@@ -276,6 +276,8 @@
                 warn!(error = %e, "ME reader ended");
             }
         }
+        let remaining = writers_arc.read().await.len();
+        debug!(writer_id, remaining, "ME reader task finished");
     });
 
     let pool_ping = Arc::downgrade(self);
@@ -365,13 +367,12 @@
                     stats_ping.increment_me_keepalive_failed();
                     debug!("ME ping failed, removing dead writer");
                     cancel_ping.cancel();
-                    if cleanup_for_ping
-                        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
-                        .is_ok()
+                    if let Some(pool) = pool_ping.upgrade()
+                        && cleanup_for_ping
+                            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
+                            .is_ok()
                     {
-                        if let Some(pool) = pool_ping.upgrade() {
-                            pool.remove_writer_and_close_clients(writer_id).await;
-                        }
+                        pool.remove_writer_and_close_clients(writer_id).await;
                     }
                     break;
                 }
@@ -514,18 +515,49 @@ pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
         // Full client cleanup now happens inside `registry.writer_lost` to keep
         // writer reap/remove paths strictly non-blocking per connection.
-        let _ = self.remove_writer_only(writer_id).await;
+        let _ = self
+            .remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
+            .await;
     }
 
-    async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
+    pub(super) async fn remove_draining_writer_hard_detach(
+        self: &Arc<Self>,
+        writer_id: u64,
+    ) -> bool {
+        self.remove_writer_with_mode(writer_id, WriterTeardownMode::DrainingOnly)
+            .await
+    }
+
+    async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> bool {
+        self.remove_writer_with_mode(writer_id, WriterTeardownMode::Any)
+            .await
+    }
+
+    // Authoritative teardown primitive shared by normal cleanup and watchdog path.
+    // Lock-order invariant:
+    // 1) mutate `writers` under pool write lock,
+    // 2) release pool lock,
+    // 3) run registry/metrics/refill side effects.
+    // `registry.writer_lost` must never run while `writers` lock is held.
+    async fn remove_writer_with_mode(
+        self: &Arc<Self>,
+        writer_id: u64,
+        mode: WriterTeardownMode,
+    ) -> bool {
         let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
         let mut removed_addr: Option<SocketAddr> = None;
         let mut removed_dc: Option<i32> = None;
         let mut removed_uptime: Option<Duration> = None;
         let mut trigger_refill = false;
+        let mut removed = false;
 
         {
             let mut ws = self.writers.write().await;
             if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
+                if matches!(mode, WriterTeardownMode::DrainingOnly)
+                    && !ws[pos].draining.load(Ordering::Relaxed)
+                {
+                    return false;
+                }
                 let w = ws.remove(pos);
                 let was_draining = w.draining.load(Ordering::Relaxed);
                 if was_draining {
@@ -542,6 +574,7 @@
                 }
                 close_tx = Some(w.tx.clone());
                 self.conn_count.fetch_sub(1, Ordering::Relaxed);
+                removed = true;
             }
         }
         // State invariant:
@@ -549,7 +582,7 @@
         // - writer is removed from registry routing/binding maps via `writer_lost`.
         // The close command below is only a best-effort accelerator for task shutdown.
         // Cleanup progress must never depend on command-channel availability.
-        let conns = self.registry.writer_lost(writer_id).await;
+        let _ = self.registry.writer_lost(writer_id).await;
         {
             let mut tracker = self.ping_tracker.lock().await;
             tracker.retain(|_, (_, wid)| *wid != writer_id);
@@ -576,22 +609,17 @@
                 }
             }
         }
-        // Quarantine flapping endpoints regardless of draining state —
-        // a rapidly dying endpoint is unstable whether it was draining or not.
         if let Some(addr) = removed_addr {
             if let Some(uptime) = removed_uptime {
                 self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
             }
+            if trigger_refill
+                && let Some(writer_dc) = removed_dc
+            {
+                self.trigger_immediate_refill_for_dc(addr, writer_dc);
+            }
         }
-        // Only trigger immediate refill for unexpected (non-draining) removals.
-        // Draining writers are intentionally being retired.
-        if trigger_refill
-            && let Some(addr) = removed_addr
-            && let Some(writer_dc) = removed_dc
-        {
-            self.trigger_immediate_refill_for_dc(addr, writer_dc);
-        }
-        conns
+        removed
     }
 
     pub(crate) async fn mark_writer_draining_with_timeout(