diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 0b1310a..65c5897 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -332,25 +332,68 @@ pub(crate) async fn initialize_me_pool( "Middle-End pool initialized successfully" ); - let pool_health = pool_bg.clone(); - let rng_health = rng_bg.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_health, - rng_health, - min_conns, - ) - .await; - }); - let pool_drain_enforcer = pool_bg.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_drain_timeout_enforcer( - pool_drain_enforcer, - ) - .await; - }); - break; + // ── Supervised background tasks ────────────────── + // Each task runs inside a nested tokio::spawn so + // that a panic is caught via JoinHandle and the + // outer loop restarts the task automatically. + let pool_health = pool_bg.clone(); + let rng_health = rng_bg.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + loop { + let p = pool_health.clone(); + let r = rng_health.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + p, r, min_conns, + ) + .await; + }) + .await; + match res { + Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_health_monitor panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + let pool_drain_enforcer = pool_bg.clone(); + tokio::spawn(async move { + loop { + let p = pool_drain_enforcer.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await; + }) + .await; + match res { + Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + let pool_watchdog = pool_bg.clone(); + tokio::spawn(async move { + loop { + let p = pool_watchdog.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await; + }) + .await; + match res { + Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"), + Err(e) => { + error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + }); + break; } Err(e) => { startup_tracker_bg.set_me_last_error(Some(e.to_string())).await; @@ -408,23 +451,65 @@ pub(crate) async fn initialize_me_pool( "Middle-End pool initialized successfully" ); - let pool_clone = pool.clone(); - let rng_clone = rng.clone(); - let min_conns = pool_size; - tokio::spawn(async move { - crate::transport::middle_proxy::me_health_monitor( - pool_clone, rng_clone, min_conns, - ) - .await; - }); - let pool_drain_enforcer = pool.clone(); - tokio::spawn(async move { - crate::transport::middle_proxy::me_drain_timeout_enforcer( - pool_drain_enforcer, - ) - .await; - }); - + // ── Supervised background tasks ────────────────── + let pool_clone = pool.clone(); + let rng_clone = rng.clone(); + let min_conns = pool_size; + tokio::spawn(async move { + loop { + let p = pool_clone.clone(); + let r = rng_clone.clone(); + let res = tokio::spawn(async move { + crate::transport::middle_proxy::me_health_monitor( + p, r, min_conns, + ) + .await; + }) + .await; + match res { + Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"), + Err(e) => 
{
+                            error!(error = %e, "me_health_monitor panicked, restarting in 1s");
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+            });
+            let pool_drain_enforcer = pool.clone();
+            tokio::spawn(async move {
+                loop {
+                    let p = pool_drain_enforcer.clone();
+                    let res = tokio::spawn(async move {
+                        crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
+                    })
+                    .await;
+                    match res {
+                        Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
+                        Err(e) => {
+                            error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+            });
+            let pool_watchdog = pool.clone();
+            tokio::spawn(async move {
+                loop {
+                    let p = pool_watchdog.clone();
+                    let res = tokio::spawn(async move {
+                        crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
+                    })
+                    .await;
+                    match res {
+                        Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
+                        Err(e) => {
+                            error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
+                            tokio::time::sleep(Duration::from_secs(1)).await;
+                        }
+                    }
+                }
+            });
+            break Some(pool);
 
        }
        Err(e) => {
diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs
index 8b62cff..ed69526 100644
--- a/src/transport/middle_proxy/health.rs
+++ b/src/transport/middle_proxy/health.rs
@@ -1550,7 +1550,56 @@ async fn maybe_rotate_single_endpoint_shadow(
     );
 }
 
-#[cfg(test)]
+/// Last-resort safety net for draining writers stuck past their deadline.
+///
+/// Runs every `TICK_SECS` seconds and force-closes any writer whose
+/// `drain_deadline_epoch_secs` has been exceeded by more than
+/// `ZOMBIE_THRESHOLD_SECS`. Intentionally kept trivial to minimise
+/// the probability of panicking itself.
+pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
+    const TICK_SECS: u64 = 30;
+    const ZOMBIE_THRESHOLD_SECS: u64 = 300;
+
+    loop {
+        tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
+
+        let now_epoch_secs = MePool::now_epoch_secs();
+
+        // Collect zombie IDs under a short read-lock.
+        let zombie_ids: Vec<_> = {
+            let ws = pool.writers.read().await;
+            ws.iter()
+                .filter(|w| w.draining.load(std::sync::atomic::Ordering::Relaxed))
+                .filter(|w| {
+                    let deadline = w
+                        .drain_deadline_epoch_secs
+                        .load(std::sync::atomic::Ordering::Relaxed);
+                    deadline != 0
+                        && now_epoch_secs.saturating_sub(deadline) > ZOMBIE_THRESHOLD_SECS
+                })
+                .map(|w| w.id)
+                .collect()
+        };
+
+        if zombie_ids.is_empty() {
+            continue;
+        }
+
+        warn!(
+            zombie_count = zombie_ids.len(),
+            threshold_secs = ZOMBIE_THRESHOLD_SECS,
+            "Zombie draining writers detected by watchdog, force-closing"
+        );
+        for writer_id in zombie_ids {
+            pool.remove_writer_and_close_clients(writer_id).await;
+            pool.stats.increment_pool_force_close_total();
+            pool.stats
+                .increment_me_draining_writers_reap_progress_total();
+        }
+    }
+}
+
+#[cfg(test)]
 mod tests {
     use std::collections::HashMap;
     use std::net::{IpAddr, Ipv4Addr, SocketAddr};
diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs
index 26ded29..8c57717 100644
--- a/src/transport/middle_proxy/mod.rs
+++ b/src/transport/middle_proxy/mod.rs
@@ -30,7 +30,7 @@ mod health_adversarial_tests;
 
 use bytes::Bytes;
 
-pub use health::{me_drain_timeout_enforcer, me_health_monitor};
+pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog};
 #[allow(unused_imports)]
 pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
 pub use pool::MePool;
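
For reference, here is a minimal, self-contained sketch of the restart-on-panic supervision pattern this patch applies to `me_health_monitor`, `me_drain_timeout_enforcer`, and `me_zombie_writer_watchdog`. The `spawn_supervised` helper and the toy worker are hypothetical and not part of the patch; the sketch only assumes a Tokio runtime (`macros`, `rt-multi-thread`, `time` features) and uses `eprintln!` where the patch uses the `tracing` macros.

```rust
// Illustrative sketch only: `spawn_supervised` is a hypothetical helper, not
// part of this patch. It shows the same nested-spawn pattern used above.
use std::future::Future;
use std::time::Duration;

/// Run `body` in a nested task; a panic surfaces as a `JoinError` on the
/// inner `JoinHandle` instead of unwinding through the supervisor, which
/// then backs off and respawns the worker.
fn spawn_supervised<F, Fut>(name: &'static str, backoff: Duration, mut body: F)
where
    F: FnMut() -> Fut + Send + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    tokio::spawn(async move {
        loop {
            match tokio::spawn(body()).await {
                // The worker returned: unexpected for a forever-loop, restart it.
                Ok(()) => eprintln!("{name}: exited unexpectedly, restarting"),
                // The worker panicked (or was cancelled): back off, then restart.
                Err(e) => {
                    eprintln!("{name}: failed ({e}), restarting after {backoff:?}");
                    tokio::time::sleep(backoff).await;
                }
            }
        }
    });
}

#[tokio::main]
async fn main() {
    // Toy worker that always panics shortly after starting, to exercise the
    // restart path a few times before the demo exits.
    spawn_supervised("demo_worker", Duration::from_secs(1), || async {
        tokio::time::sleep(Duration::from_millis(200)).await;
        panic!("simulated failure");
    });

    tokio::time::sleep(Duration::from_secs(5)).await;
}
```

The key point is that the panic unwinds only the inner task: the supervisor observes it as a `JoinError` (`is_panic()`) on the handle, sleeps, and respawns, so the outer loop itself never dies.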
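Likewise, the zombie-detection rule documented on `me_zombie_writer_watchdog` can be read as a small pure predicate. The free function and test below are illustrative only (hypothetical names, not in the patch) and simply mirror the watchdog's filter: draining, non-zero deadline, and deadline exceeded by strictly more than the threshold.

```rust
/// Hypothetical standalone version of the watchdog's filter: a writer is a
/// zombie only if it is draining, has recorded a non-zero drain deadline, and
/// that deadline was exceeded by strictly more than `threshold_secs`.
fn is_zombie_writer(
    draining: bool,
    drain_deadline_epoch_secs: u64,
    now_epoch_secs: u64,
    threshold_secs: u64,
) -> bool {
    draining
        && drain_deadline_epoch_secs != 0
        && now_epoch_secs.saturating_sub(drain_deadline_epoch_secs) > threshold_secs
}

#[cfg(test)]
mod is_zombie_writer_tests {
    use super::is_zombie_writer;

    #[test]
    fn applies_threshold_and_ignores_unset_deadlines() {
        // Deadline exceeded by 400s with a 300s threshold: reap it.
        assert!(is_zombie_writer(true, 1_000, 1_400, 300));
        // Exceeded by exactly the threshold: not yet a zombie (strict `>`).
        assert!(!is_zombie_writer(true, 1_000, 1_300, 300));
        // A zero deadline means none was recorded, so never reap on it.
        assert!(!is_zombie_writer(true, 0, 1_400, 300));
        // Writers that are not draining are never reaped by the watchdog.
        assert!(!is_zombie_writer(false, 1_000, 1_400, 300));
    }
}
```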