ME Supervised Anti-Stuck

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-19 20:03:42 +03:00
parent 67dc1e8d18
commit 697eda6d16
No known key found for this signature in database
3 changed files with 172 additions and 38 deletions

View File

@ -332,25 +332,68 @@ pub(crate) async fn initialize_me_pool(
"Middle-End pool initialized successfully" "Middle-End pool initialized successfully"
); );
let pool_health = pool_bg.clone(); // ── Supervised background tasks ──────────────────
let rng_health = rng_bg.clone(); // Each task runs inside a nested tokio::spawn so
let min_conns = pool_size; // that a panic is caught via JoinHandle and the
tokio::spawn(async move { // outer loop restarts the task automatically.
crate::transport::middle_proxy::me_health_monitor( let pool_health = pool_bg.clone();
pool_health, let rng_health = rng_bg.clone();
rng_health, let min_conns = pool_size;
min_conns, tokio::spawn(async move {
) loop {
.await; let p = pool_health.clone();
}); let r = rng_health.clone();
let pool_drain_enforcer = pool_bg.clone(); let res = tokio::spawn(async move {
tokio::spawn(async move { crate::transport::middle_proxy::me_health_monitor(
crate::transport::middle_proxy::me_drain_timeout_enforcer( p, r, min_conns,
pool_drain_enforcer, )
) .await;
.await; })
}); .await;
break; match res {
Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_health_monitor panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_drain_enforcer = pool_bg.clone();
tokio::spawn(async move {
loop {
let p = pool_drain_enforcer.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
})
.await;
match res {
Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_watchdog = pool_bg.clone();
tokio::spawn(async move {
loop {
let p = pool_watchdog.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
})
.await;
match res {
Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
break;
} }
Err(e) => { Err(e) => {
startup_tracker_bg.set_me_last_error(Some(e.to_string())).await; startup_tracker_bg.set_me_last_error(Some(e.to_string())).await;
@ -408,23 +451,65 @@ pub(crate) async fn initialize_me_pool(
"Middle-End pool initialized successfully" "Middle-End pool initialized successfully"
); );
let pool_clone = pool.clone(); // ── Supervised background tasks ──────────────────
let rng_clone = rng.clone(); let pool_clone = pool.clone();
let min_conns = pool_size; let rng_clone = rng.clone();
tokio::spawn(async move { let min_conns = pool_size;
crate::transport::middle_proxy::me_health_monitor( tokio::spawn(async move {
pool_clone, rng_clone, min_conns, loop {
) let p = pool_clone.clone();
.await; let r = rng_clone.clone();
}); let res = tokio::spawn(async move {
let pool_drain_enforcer = pool.clone(); crate::transport::middle_proxy::me_health_monitor(
tokio::spawn(async move { p, r, min_conns,
crate::transport::middle_proxy::me_drain_timeout_enforcer( )
pool_drain_enforcer, .await;
) })
.await; .await;
}); match res {
Ok(()) => warn!("me_health_monitor exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_health_monitor panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_drain_enforcer = pool.clone();
tokio::spawn(async move {
loop {
let p = pool_drain_enforcer.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_drain_timeout_enforcer(p).await;
})
.await;
match res {
Ok(()) => warn!("me_drain_timeout_enforcer exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_drain_timeout_enforcer panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
let pool_watchdog = pool.clone();
tokio::spawn(async move {
loop {
let p = pool_watchdog.clone();
let res = tokio::spawn(async move {
crate::transport::middle_proxy::me_zombie_writer_watchdog(p).await;
})
.await;
match res {
Ok(()) => warn!("me_zombie_writer_watchdog exited unexpectedly, restarting"),
Err(e) => {
error!(error = %e, "me_zombie_writer_watchdog panicked, restarting in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
break Some(pool); break Some(pool);
} }
Err(e) => { Err(e) => {

View File

@ -1550,7 +1550,56 @@ async fn maybe_rotate_single_endpoint_shadow(
); );
} }
#[cfg(test)] /// Last-resort safety net for draining writers stuck past their deadline.
///
/// Runs every `TICK_SECS` and force-closes any writer whose
/// `drain_deadline_epoch_secs` has been exceeded by more than
/// `ZOMBIE_THRESHOLD_SECS`. Intentionally kept trivial to minimise
/// the probability of panicking itself.
pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
    const TICK_SECS: u64 = 30;
    const ZOMBIE_THRESHOLD_SECS: u64 = 300;
    loop {
        tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
        let now = MePool::now_epoch_secs();
        // Snapshot the IDs of over-deadline draining writers while the
        // read-lock is held, then release it before doing any closing.
        let stale: Vec<u64> = {
            let writers = pool.writers.read().await;
            writers
                .iter()
                .filter_map(|w| {
                    use std::sync::atomic::Ordering::Relaxed;
                    if !w.draining.load(Relaxed) {
                        return None;
                    }
                    let deadline = w.drain_deadline_epoch_secs.load(Relaxed);
                    // A zero deadline means "not set"; otherwise flag the
                    // writer once it has overshot by more than the threshold.
                    (deadline != 0
                        && now.saturating_sub(deadline) > ZOMBIE_THRESHOLD_SECS)
                        .then_some(w.id)
                })
                .collect()
        };
        if stale.is_empty() {
            continue;
        }
        warn!(
            zombie_count = stale.len(),
            threshold_secs = ZOMBIE_THRESHOLD_SECS,
            "Zombie draining writers detected by watchdog, force-closing"
        );
        for writer_id in stale {
            pool.remove_writer_and_close_clients(writer_id).await;
            pool.stats.increment_pool_force_close_total();
            pool.stats
                .increment_me_draining_writers_reap_progress_total();
        }
    }
}
#[cfg(test)]
mod tests { mod tests {
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};

View File

@ -30,7 +30,7 @@ mod health_adversarial_tests;
use bytes::Bytes; use bytes::Bytes;
pub use health::{me_drain_timeout_enforcer, me_health_monitor}; pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog};
#[allow(unused_imports)] #[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily}; pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool; pub use pool::MePool;