Merge branch 'main' into feat/shadowsocks-upstream

# Conflicts:
#	Cargo.lock
#	src/api/runtime_stats.rs
This commit is contained in:
Maxim Myalin
2026-03-20 15:22:36 +03:00
30 changed files with 1965 additions and 548 deletions

View File

@@ -298,6 +298,7 @@ async fn run_update_cycle(
pool.update_runtime_reinit_policy(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs,
@@ -530,6 +531,7 @@ pub async fn me_config_updater(
pool.update_runtime_reinit_policy(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_instadrain,
cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs,

View File

@@ -10,8 +10,10 @@ use tracing::{debug, info, warn};
use crate::config::MeFloorMode;
use crate::crypto::SecureRandom;
use crate::network::IpFamily;
use crate::stats::MeWriterTeardownReason;
use super::MePool;
use super::pool::MeWriter;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)]
@@ -30,6 +32,8 @@ const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS: u64 = 1;
const HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS: u64 = 1;
#[derive(Debug, Clone)]
struct DcFloorPlanEntry {
@@ -99,6 +103,8 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
)
.await;
let v6_degraded = check_family(
@@ -116,12 +122,63 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
)
.await;
degraded_interval = v4_degraded || v6_degraded;
}
}
pub async fn me_drain_timeout_enforcer(pool: Arc<MePool>) {
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut drain_soft_evict_next_allowed: HashMap<u64, Instant> = HashMap::new();
loop {
tokio::time::sleep(Duration::from_secs(
HEALTH_DRAIN_TIMEOUT_ENFORCER_INTERVAL_SECS,
))
.await;
reap_draining_writers(
&pool,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
)
.await;
}
}
fn draining_writer_timeout_expired(
pool: &MePool,
writer: &MeWriter,
now_epoch_secs: u64,
drain_ttl_secs: u64,
) -> bool {
if pool
.me_instadrain
.load(std::sync::atomic::Ordering::Relaxed)
{
return true;
}
let deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline_epoch_secs != 0 {
return now_epoch_secs >= deadline_epoch_secs;
}
if drain_ttl_secs == 0 {
return false;
}
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if drain_started_at_epoch_secs == 0 {
return false;
}
now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs
}
pub(super) async fn reap_draining_writers(
pool: &Arc<MePool>,
warn_next_allowed: &mut HashMap<u64, Instant>,
@@ -137,11 +194,16 @@ pub(super) async fn reap_draining_writers(
let activity = pool.registry.writer_activity_snapshot().await;
let mut draining_writers = Vec::new();
let mut empty_writer_ids = Vec::<u64>::new();
let mut timeout_expired_writer_ids = Vec::<u64>::new();
let mut force_close_writer_ids = Vec::<u64>::new();
for writer in writers {
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
continue;
}
if draining_writer_timeout_expired(pool, &writer, now_epoch_secs, drain_ttl_secs) {
timeout_expired_writer_ids.push(writer.id);
continue;
}
if activity
.bound_clients_by_writer
.get(&writer.id)
@@ -207,14 +269,6 @@ pub(super) async fn reap_draining_writers(
"ME draining writer remains non-empty past drain TTL"
);
}
let deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs {
warn!(writer_id = writer.id, "Drain timeout, force-closing");
force_close_writer_ids.push(writer.id);
active_draining_writer_ids.remove(&writer.id);
}
}
warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id));
@@ -299,11 +353,22 @@ pub(super) async fn reap_draining_writers(
}
}
let close_budget = health_drain_close_budget();
let mut closed_writer_ids = HashSet::<u64>::new();
for writer_id in timeout_expired_writer_ids {
if !closed_writer_ids.insert(writer_id) {
continue;
}
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapTimeoutExpired)
.await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
}
let requested_force_close = force_close_writer_ids.len();
let requested_empty_close = empty_writer_ids.len();
let requested_close_total = requested_force_close.saturating_add(requested_empty_close);
let mut closed_writer_ids = HashSet::<u64>::new();
let close_budget = health_drain_close_budget();
let mut closed_total = 0usize;
for writer_id in force_close_writer_ids {
if closed_total >= close_budget {
@@ -313,7 +378,8 @@ pub(super) async fn reap_draining_writers(
continue;
}
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapThresholdForce)
.await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1);
@@ -325,7 +391,8 @@ pub(super) async fn reap_draining_writers(
if !closed_writer_ids.insert(writer_id) {
continue;
}
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(writer_id, MeWriterTeardownReason::ReapEmpty)
.await;
pool.stats
.increment_me_draining_writers_reap_progress_total();
closed_total = closed_total.saturating_add(1);
@@ -396,6 +463,8 @@ async fn check_family(
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
drain_warn_next_allowed: &mut HashMap<u64, Instant>,
drain_soft_evict_next_allowed: &mut HashMap<u64, Instant>,
) -> bool {
let enabled = match family {
IpFamily::V4 => pool.decision.ipv4_me,
@@ -476,8 +545,15 @@ async fn check_family(
floor_plan.active_writers_current,
floor_plan.warm_writers_current,
);
let mut next_drain_reap_at = Instant::now();
for (dc, endpoints) in dc_endpoints {
if Instant::now() >= next_drain_reap_at {
reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed)
.await;
next_drain_reap_at = Instant::now()
+ Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS);
}
if endpoints.is_empty() {
continue;
}
@@ -621,6 +697,12 @@ async fn check_family(
let mut restored = 0usize;
for _ in 0..missing {
if Instant::now() >= next_drain_reap_at {
reap_draining_writers(pool, drain_warn_next_allowed, drain_soft_evict_next_allowed)
.await;
next_drain_reap_at = Instant::now()
+ Duration::from_secs(HEALTH_DRAIN_REAP_OPPORTUNISTIC_INTERVAL_SECS);
}
if reconnect_budget == 0 {
break;
}
@@ -1472,6 +1554,187 @@ async fn maybe_rotate_single_endpoint_shadow(
);
}
/// Last-resort safety net for draining writers stuck past their deadline.
///
/// Runs every `TICK_SECS` and force-closes any draining writer whose
/// `drain_deadline_epoch_secs` has been exceeded by more than a threshold.
///
/// Two thresholds:
/// - `SOFT_THRESHOLD_SECS` (60s): writers with no bound clients
/// - `HARD_THRESHOLD_SECS` (300s): writers WITH bound clients (unconditional)
///
/// Intentionally kept trivial and independent of pool config to minimise
/// the probability of panicking itself. Uses `SystemTime` directly
/// as a fallback clock source and timeouts on every lock acquisition
/// and writer removal so one stuck writer cannot block the rest.
pub async fn me_zombie_writer_watchdog(pool: Arc<MePool>) {
use std::time::{SystemTime, UNIX_EPOCH};
const TICK_SECS: u64 = 30;
const SOFT_THRESHOLD_SECS: u64 = 60;
const HARD_THRESHOLD_SECS: u64 = 300;
const LOCK_TIMEOUT_SECS: u64 = 5;
const REMOVE_TIMEOUT_SECS: u64 = 10;
const HARD_DETACH_TIMEOUT_STREAK: u8 = 3;
let mut removal_timeout_streak = HashMap::<u64, u8>::new();
loop {
tokio::time::sleep(Duration::from_secs(TICK_SECS)).await;
let now = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(_) => continue,
};
// Phase 1: collect zombie IDs under a short read-lock with timeout.
let zombie_ids_with_meta: Vec<(u64, bool)> = {
let Ok(ws) = tokio::time::timeout(
Duration::from_secs(LOCK_TIMEOUT_SECS),
pool.writers.read(),
)
.await
else {
warn!("zombie_watchdog: writers read-lock timeout, skipping tick");
continue;
};
ws.iter()
.filter(|w| w.draining.load(std::sync::atomic::Ordering::Relaxed))
.filter_map(|w| {
let deadline = w
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if deadline == 0 {
return None;
}
let overdue = now.saturating_sub(deadline);
if overdue == 0 {
return None;
}
let started = w
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
let drain_age = now.saturating_sub(started);
if drain_age > HARD_THRESHOLD_SECS {
return Some((w.id, true));
}
if overdue > SOFT_THRESHOLD_SECS {
return Some((w.id, false));
}
None
})
.collect()
};
// read lock released here
if zombie_ids_with_meta.is_empty() {
removal_timeout_streak.clear();
continue;
}
let mut active_zombie_ids = HashSet::<u64>::with_capacity(zombie_ids_with_meta.len());
for (writer_id, _) in &zombie_ids_with_meta {
active_zombie_ids.insert(*writer_id);
}
removal_timeout_streak.retain(|writer_id, _| active_zombie_ids.contains(writer_id));
warn!(
zombie_count = zombie_ids_with_meta.len(),
soft_threshold_secs = SOFT_THRESHOLD_SECS,
hard_threshold_secs = HARD_THRESHOLD_SECS,
"Zombie draining writers detected by watchdog, force-closing"
);
// Phase 2: remove each writer individually with a timeout.
// One stuck removal cannot block the rest.
for (writer_id, had_clients) in &zombie_ids_with_meta {
let result = tokio::time::timeout(
Duration::from_secs(REMOVE_TIMEOUT_SECS),
pool.remove_writer_and_close_clients(
*writer_id,
MeWriterTeardownReason::WatchdogStuckDraining,
),
)
.await;
match result {
Ok(true) => {
removal_timeout_streak.remove(writer_id);
pool.stats.increment_pool_force_close_total();
pool.stats
.increment_me_draining_writers_reap_progress_total();
info!(
writer_id,
had_clients,
"Zombie writer removed by watchdog"
);
}
Ok(false) => {
removal_timeout_streak.remove(writer_id);
debug!(
writer_id,
had_clients,
"Zombie writer watchdog removal became no-op"
);
}
Err(_) => {
pool.stats.increment_me_writer_teardown_timeout_total();
let streak = removal_timeout_streak
.entry(*writer_id)
.and_modify(|value| *value = value.saturating_add(1))
.or_insert(1);
warn!(
writer_id,
had_clients,
timeout_streak = *streak,
"Zombie writer removal timed out"
);
if *streak < HARD_DETACH_TIMEOUT_STREAK {
continue;
}
pool.stats.increment_me_writer_teardown_escalation_total();
let hard_detach = tokio::time::timeout(
Duration::from_secs(REMOVE_TIMEOUT_SECS),
pool.remove_draining_writer_hard_detach(
*writer_id,
MeWriterTeardownReason::WatchdogStuckDraining,
),
)
.await;
match hard_detach {
Ok(true) => {
removal_timeout_streak.remove(writer_id);
pool.stats.increment_pool_force_close_total();
pool.stats
.increment_me_draining_writers_reap_progress_total();
info!(
writer_id,
had_clients,
"Zombie writer hard-detached after repeated timeouts"
);
}
Ok(false) => {
removal_timeout_streak.remove(writer_id);
debug!(
writer_id,
had_clients,
"Zombie hard-detach skipped (writer already gone or no longer draining)"
);
}
Err(_) => {
pool.stats.increment_me_writer_teardown_timeout_total();
warn!(
writer_id,
had_clients,
"Zombie hard-detach timed out, will retry next tick"
);
}
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@@ -1548,6 +1811,7 @@ mod tests {
general.me_adaptive_floor_max_warm_writers_global,
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,

View File

@@ -81,6 +81,7 @@ async fn make_pool(
general.me_adaptive_floor_max_warm_writers_global,
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
@@ -213,7 +214,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(600).saturating_add(writer_id),
now_epoch_secs.saturating_sub(20),
1,
0,
)
@@ -230,7 +231,7 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
}
assert_eq!(writer_count(&pool).await, threshold as usize);
assert_eq!(sorted_writer_ids(&pool).await, vec![58, 59, 60]);
assert_eq!(sorted_writer_ids(&pool).await, vec![1, 2, 3]);
}
#[tokio::test]
@@ -315,7 +316,12 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
let ids = sorted_writer_ids(&pool).await;
for writer_id in ids.into_iter().take(3) {
let _ = pool.remove_writer_and_close_clients(writer_id).await;
let _ = pool
.remove_writer_and_close_clients(
writer_id,
crate::stats::MeWriterTeardownReason::ReapEmpty,
)
.await;
}
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;

View File

@@ -80,6 +80,7 @@ async fn make_pool(
general.me_adaptive_floor_max_warm_writers_global,
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,

View File

@@ -12,7 +12,9 @@ use super::codec::WriterCommand;
use super::health::{health_drain_close_budget, reap_draining_writers};
use super::pool::{MePool, MeWriter, WriterContour};
use super::registry::ConnMeta;
use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
use crate::config::{
GeneralConfig, MeBindStaleMode, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode,
};
use crate::crypto::SecureRandom;
use crate::network::probe::NetworkDecision;
use crate::stats::Stats;
@@ -74,6 +76,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.me_adaptive_floor_max_warm_writers_global,
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_instadrain,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
@@ -180,15 +183,23 @@ async fn current_writer_ids(pool: &Arc<MePool>) -> Vec<u64> {
async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let conn_ids =
insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await;
let conn_ids = insert_draining_writer(
&pool,
7,
now_epoch_secs.saturating_sub(180),
1,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.contains_key(&7));
let _ = pool.remove_writer_and_close_clients(7).await;
let _ = pool
.remove_writer_and_close_clients(7, crate::stats::MeWriterTeardownReason::ReapEmpty)
.await;
assert!(pool.registry.get_writer(conn_ids[0]).await.is_none());
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
@@ -331,17 +342,17 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() {
#[tokio::test]
async fn reap_draining_writers_limits_closes_per_health_tick() {
let pool = make_pool(128).await;
let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget();
let writer_total = close_budget.saturating_add(19);
let writer_total = close_budget.saturating_add(20);
for writer_id in 1..=writer_total as u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(20),
1,
now_epoch_secs.saturating_sub(1),
0,
)
.await;
}
@@ -364,8 +375,8 @@ async fn reap_draining_writers_backlog_drains_across_ticks() {
&pool,
writer_id,
now_epoch_secs.saturating_sub(20),
1,
now_epoch_secs.saturating_sub(1),
0,
0,
)
.await;
}
@@ -393,7 +404,7 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(200).saturating_add(writer_id),
now_epoch_secs.saturating_sub(20),
1,
0,
)
@@ -429,27 +440,27 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr
#[tokio::test]
async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() {
let pool = make_pool(128).await;
let pool = make_pool(1).await;
let now_epoch_secs = MePool::now_epoch_secs();
let close_budget = health_drain_close_budget();
for writer_id in 1..=close_budget as u64 {
for writer_id in 1..=close_budget.saturating_add(1) as u64 {
insert_draining_writer(
&pool,
writer_id,
now_epoch_secs.saturating_sub(20),
1,
now_epoch_secs.saturating_sub(1),
0,
)
.await;
}
let empty_writer_id = close_budget as u64 + 1;
let empty_writer_id = close_budget.saturating_add(2) as u64;
insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]);
assert_eq!(current_writer_ids(&pool).await, vec![1, empty_writer_id]);
}
#[tokio::test]
@@ -518,7 +529,12 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
let existing_writer_ids = current_writer_ids(&pool).await;
for writer_id in existing_writer_ids.into_iter().take(4) {
let _ = pool.remove_writer_and_close_clients(writer_id).await;
let _ = pool
.remove_writer_and_close_clients(
writer_id,
crate::stats::MeWriterTeardownReason::ReapEmpty,
)
.await;
}
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
@@ -571,7 +587,14 @@ async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() {
.store(1, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await;
insert_draining_writer(
&pool,
77,
now_epoch_secs.saturating_sub(240),
3,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
@@ -595,7 +618,14 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
.store(60_000, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await;
insert_draining_writer(
&pool,
88,
now_epoch_secs.saturating_sub(240),
3,
now_epoch_secs.saturating_add(3_600),
)
.await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
@@ -608,12 +638,40 @@ async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
}
#[tokio::test]
async fn reap_draining_writers_instadrain_removes_non_expired_writers_immediately() {
let pool = make_pool(0).await;
pool.me_instadrain.store(true, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 101, now_epoch_secs.saturating_sub(5), 1, 0).await;
insert_draining_writer(&pool, 102, now_epoch_secs.saturating_sub(4), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
}
#[test]
fn general_config_default_drain_threshold_remains_enabled() {
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 32);
assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
1
GeneralConfig::default().me_pool_drain_soft_evict_grace_secs,
10
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
2
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_budget_per_core,
16
);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_cooldown_ms,
1000
);
assert_eq!(GeneralConfig::default().me_bind_stale_mode, MeBindStaleMode::Never);
}

View File

@@ -30,7 +30,7 @@ mod health_adversarial_tests;
use bytes::Bytes;
pub use health::me_health_monitor;
pub use health::{me_drain_timeout_enforcer, me_health_monitor, me_zombie_writer_watchdog};
#[allow(unused_imports)]
pub use ping::{run_me_ping, format_sample_line, format_me_route, MePingReport, MePingSample, MePingFamily};
pub use pool::MePool;

View File

@@ -18,6 +18,8 @@ use crate::transport::UpstreamManager;
use super::ConnRegistry;
use super::codec::WriterCommand;
const ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS: u64 = 300;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct RefillDcKey {
pub dc: i32,
@@ -171,6 +173,7 @@ pub struct MePool {
pub(super) endpoint_quarantine: Arc<Mutex<HashMap<SocketAddr, Instant>>>,
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_instadrain: AtomicBool,
pub(super) me_pool_drain_threshold: AtomicU64,
pub(super) me_pool_drain_soft_evict_enabled: AtomicBool,
pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64,
@@ -228,6 +231,14 @@ impl MePool {
.as_secs()
}
fn normalize_force_close_secs(force_close_secs: u64) -> u64 {
if force_close_secs == 0 {
ME_FORCE_CLOSE_SAFETY_FALLBACK_SECS
} else {
force_close_secs
}
}
pub fn new(
proxy_tag: Option<Vec<u8>>,
proxy_secret: Vec<u8>,
@@ -279,6 +290,7 @@ impl MePool {
me_adaptive_floor_max_warm_writers_global: u32,
hardswap: bool,
me_pool_drain_ttl_secs: u64,
me_instadrain: bool,
me_pool_drain_threshold: u64,
me_pool_drain_soft_evict_enabled: bool,
me_pool_drain_soft_evict_grace_secs: u64,
@@ -462,6 +474,7 @@ impl MePool {
endpoint_quarantine: Arc::new(Mutex::new(HashMap::new())),
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_instadrain: AtomicBool::new(me_instadrain),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled),
me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs),
@@ -474,7 +487,9 @@ impl MePool {
me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new(
me_pool_drain_soft_evict_cooldown_ms.max(1),
),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_force_close_secs: AtomicU64::new(Self::normalize_force_close_secs(
me_pool_force_close_secs,
)),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
me_pool_min_fresh_ratio,
)),
@@ -524,6 +539,7 @@ impl MePool {
&self,
hardswap: bool,
drain_ttl_secs: u64,
instadrain: bool,
pool_drain_threshold: u64,
pool_drain_soft_evict_enabled: bool,
pool_drain_soft_evict_grace_secs: u64,
@@ -568,6 +584,7 @@ impl MePool {
self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs
.store(drain_ttl_secs, Ordering::Relaxed);
self.me_instadrain.store(instadrain, Ordering::Relaxed);
self.me_pool_drain_threshold
.store(pool_drain_threshold, Ordering::Relaxed);
self.me_pool_drain_soft_evict_enabled
@@ -582,8 +599,10 @@ impl MePool {
);
self.me_pool_drain_soft_evict_cooldown_ms
.store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed);
self.me_pool_force_close_secs
.store(force_close_secs, Ordering::Relaxed);
self.me_pool_force_close_secs.store(
Self::normalize_force_close_secs(force_close_secs),
Ordering::Relaxed,
);
self.me_pool_min_fresh_ratio_permille
.store(Self::ratio_to_permille(min_fresh_ratio), Ordering::Relaxed);
self.me_hardswap_warmup_delay_min_ms
@@ -728,12 +747,9 @@ impl MePool {
}
pub(super) fn force_close_timeout(&self) -> Option<Duration> {
let secs = self.me_pool_force_close_secs.load(Ordering::Relaxed);
if secs == 0 {
None
} else {
Some(Duration::from_secs(secs))
}
let secs =
Self::normalize_force_close_secs(self.me_pool_force_close_secs.load(Ordering::Relaxed));
Some(Duration::from_secs(secs))
}
pub(super) fn drain_soft_evict_enabled(&self) -> bool {

View File

@@ -74,9 +74,8 @@ impl MePool {
debug!(
%addr,
wait_ms = expiry.saturating_duration_since(now).as_millis(),
"All ME endpoints are quarantined for the DC group; retrying earliest one"
"All ME endpoints are quarantined for the DC group; waiting for quarantine expiry"
);
return vec![addr];
}
Vec::new()

View File

@@ -126,6 +126,7 @@ pub(crate) struct MeApiRuntimeSnapshot {
pub me_reconnect_backoff_cap_ms: u64,
pub me_reconnect_fast_retry_count: u32,
pub me_pool_drain_ttl_secs: u64,
pub me_instadrain: bool,
pub me_pool_drain_soft_evict_enabled: bool,
pub me_pool_drain_soft_evict_grace_secs: u64,
pub me_pool_drain_soft_evict_per_writer: u8,
@@ -583,6 +584,7 @@ impl MePool {
me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,
me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed),
me_instadrain: self.me_instadrain.load(Ordering::Relaxed),
me_pool_drain_soft_evict_enabled: self
.me_pool_drain_soft_evict_enabled
.load(Ordering::Relaxed),

View File

@@ -16,11 +16,13 @@ use crate::config::MeBindStaleMode;
use crate::crypto::SecureRandom;
use crate::error::{ProxyError, Result};
use crate::protocol::constants::{RPC_CLOSE_EXT_U32, RPC_PING_U32};
use crate::stats::{
MeWriterCleanupSideEffectStep, MeWriterTeardownMode, MeWriterTeardownReason,
};
use super::codec::{RpcWriter, WriterCommand};
use super::pool::{MePool, MeWriter, WriterContour};
use super::reader::reader_loop;
use super::registry::BoundConn;
use super::wire::build_proxy_req_payload;
const ME_ACTIVE_PING_SECS: u64 = 25;
@@ -28,6 +30,12 @@ const ME_ACTIVE_PING_JITTER_SECS: i64 = 5;
const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5;
const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700;
#[derive(Clone, Copy)]
enum WriterRemoveGuardMode {
Any,
DrainingOnly,
}
fn is_me_peer_closed_error(error: &ProxyError) -> bool {
matches!(error, ProxyError::Io(ioe) if ioe.kind() == ErrorKind::UnexpectedEof)
}
@@ -44,9 +52,16 @@ impl MePool {
for writer_id in closed_writer_ids {
if self.registry.is_writer_empty(writer_id).await {
let _ = self.remove_writer_only(writer_id).await;
let _ = self
.remove_writer_only(writer_id, MeWriterTeardownReason::PruneClosedWriter)
.await;
} else {
let _ = self.remove_writer_and_close_clients(writer_id).await;
let _ = self
.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::PruneClosedWriter,
)
.await;
}
}
}
@@ -143,6 +158,9 @@ impl MePool {
crc_mode: hs.crc_mode,
};
let cancel_wr = cancel.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_writer = cleanup_done.clone();
let pool_writer_task = Arc::downgrade(self);
tokio::spawn(async move {
loop {
tokio::select! {
@@ -160,6 +178,20 @@ impl MePool {
_ = cancel_wr.cancelled() => break,
}
}
if cleanup_for_writer
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
if let Some(pool) = pool_writer_task.upgrade() {
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::WriterTaskExit,
)
.await;
} else {
cancel_wr.cancel();
}
}
});
let writer = MeWriter {
id: writer_id,
@@ -196,7 +228,6 @@ impl MePool {
let cancel_ping = cancel.clone();
let tx_ping = tx.clone();
let ping_tracker_ping = ping_tracker.clone();
let cleanup_done = Arc::new(AtomicBool::new(false));
let cleanup_for_reader = cleanup_done.clone();
let cleanup_for_ping = cleanup_done.clone();
let keepalive_enabled = self.me_keepalive_enabled;
@@ -242,21 +273,29 @@ impl MePool {
stats_reader_close.increment_me_idle_close_by_peer_total();
info!(writer_id, "ME socket closed by peer on idle writer");
}
if let Some(pool) = pool.upgrade()
&& cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
if cleanup_for_reader
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
if let Some(pool) = pool.upgrade() {
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::ReaderExit,
)
.await;
} else {
// Fallback for shutdown races: make writer task exit quickly so stale
// channels are observable by periodic prune.
cancel_reader_token.cancel();
}
}
if let Err(e) = res {
if !idle_close_by_peer {
warn!(error = %e, "ME reader ended");
}
}
let mut ws = writers_arc.write().await;
ws.retain(|w| w.id != writer_id);
info!(remaining = ws.len(), "Dead ME writer removed from pool");
let remaining = writers_arc.read().await.len();
debug!(writer_id, remaining, "ME reader task finished");
});
let pool_ping = Arc::downgrade(self);
@@ -351,7 +390,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::PingSendFail,
)
.await;
}
break;
}
@@ -444,7 +487,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::SignalSendFail,
)
.await;
}
break;
}
@@ -478,7 +525,11 @@ impl MePool {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
pool.remove_writer_and_close_clients(writer_id).await;
pool.remove_writer_and_close_clients(
writer_id,
MeWriterTeardownReason::SignalSendFail,
)
.await;
}
break;
}
@@ -491,21 +542,83 @@ impl MePool {
Ok(())
}
pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
pub(crate) async fn remove_writer_and_close_clients(
self: &Arc<Self>,
writer_id: u64,
reason: MeWriterTeardownReason,
) -> bool {
// Full client cleanup now happens inside `registry.writer_lost` to keep
// writer reap/remove paths strictly non-blocking per connection.
let _ = self.remove_writer_only(writer_id).await;
self.remove_writer_with_mode(
writer_id,
reason,
MeWriterTeardownMode::Normal,
WriterRemoveGuardMode::Any,
)
.await
}
async fn remove_writer_only(self: &Arc<Self>, writer_id: u64) -> Vec<BoundConn> {
pub(super) async fn remove_draining_writer_hard_detach(
self: &Arc<Self>,
writer_id: u64,
reason: MeWriterTeardownReason,
) -> bool {
self.remove_writer_with_mode(
writer_id,
reason,
MeWriterTeardownMode::HardDetach,
WriterRemoveGuardMode::DrainingOnly,
)
.await
}
async fn remove_writer_only(
self: &Arc<Self>,
writer_id: u64,
reason: MeWriterTeardownReason,
) -> bool {
self.remove_writer_with_mode(
writer_id,
reason,
MeWriterTeardownMode::Normal,
WriterRemoveGuardMode::Any,
)
.await
}
// Authoritative teardown primitive shared by normal cleanup and watchdog path.
// Lock-order invariant:
// 1) mutate `writers` under pool write lock,
// 2) release pool lock,
// 3) run registry/metrics/refill side effects.
// `registry.writer_lost` must never run while `writers` lock is held.
async fn remove_writer_with_mode(
self: &Arc<Self>,
writer_id: u64,
reason: MeWriterTeardownReason,
mode: MeWriterTeardownMode,
guard_mode: WriterRemoveGuardMode,
) -> bool {
let started_at = Instant::now();
self.stats
.increment_me_writer_teardown_attempt_total(reason, mode);
let mut close_tx: Option<mpsc::Sender<WriterCommand>> = None;
let mut removed_addr: Option<SocketAddr> = None;
let mut removed_dc: Option<i32> = None;
let mut removed_uptime: Option<Duration> = None;
let mut trigger_refill = false;
let mut removed = false;
{
let mut ws = self.writers.write().await;
if let Some(pos) = ws.iter().position(|w| w.id == writer_id) {
if matches!(guard_mode, WriterRemoveGuardMode::DrainingOnly)
&& !ws[pos].draining.load(Ordering::Relaxed)
{
self.stats.increment_me_writer_teardown_noop_total();
self.stats
.observe_me_writer_teardown_duration(mode, started_at.elapsed());
return false;
}
let w = ws.remove(pos);
let was_draining = w.draining.load(Ordering::Relaxed);
if was_draining {
@@ -522,6 +635,7 @@ impl MePool {
}
close_tx = Some(w.tx.clone());
self.conn_count.fetch_sub(1, Ordering::Relaxed);
removed = true;
}
}
// State invariant:
@@ -529,7 +643,7 @@ impl MePool {
// - writer is removed from registry routing/binding maps via `writer_lost`.
// The close command below is only a best-effort accelerator for task shutdown.
// Cleanup progress must never depend on command-channel availability.
let conns = self.registry.writer_lost(writer_id).await;
let _ = self.registry.writer_lost(writer_id).await;
{
let mut tracker = self.ping_tracker.lock().await;
tracker.retain(|_, (_, wid)| *wid != writer_id);
@@ -542,6 +656,9 @@ impl MePool {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats
.increment_me_writer_close_signal_channel_full_total();
self.stats.increment_me_writer_cleanup_side_effect_failures_total(
MeWriterCleanupSideEffectStep::CloseSignalChannelFull,
);
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is full"
@@ -549,6 +666,9 @@ impl MePool {
}
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_close_signal_drop_total();
self.stats.increment_me_writer_cleanup_side_effect_failures_total(
MeWriterCleanupSideEffectStep::CloseSignalChannelClosed,
);
debug!(
writer_id,
"Skipping close signal for removed writer: command channel is closed"
@@ -556,16 +676,24 @@ impl MePool {
}
}
}
if trigger_refill
&& let Some(addr) = removed_addr
&& let Some(writer_dc) = removed_dc
{
if let Some(addr) = removed_addr {
if let Some(uptime) = removed_uptime {
self.maybe_quarantine_flapping_endpoint(addr, uptime).await;
}
self.trigger_immediate_refill_for_dc(addr, writer_dc);
if trigger_refill
&& let Some(writer_dc) = removed_dc
{
self.trigger_immediate_refill_for_dc(addr, writer_dc);
}
}
conns
if removed {
self.stats.increment_me_writer_teardown_success_total(mode);
} else {
self.stats.increment_me_writer_teardown_noop_total();
}
self.stats
.observe_me_writer_teardown_duration(mode, started_at.elapsed());
removed
}
pub(crate) async fn mark_writer_draining_with_timeout(

View File

@@ -14,6 +14,7 @@ use crate::config::{MeRouteNoWriterMode, MeWriterPickMode};
use crate::error::{ProxyError, Result};
use crate::network::IpFamily;
use crate::protocol::constants::{RPC_CLOSE_CONN_U32, RPC_CLOSE_EXT_U32};
use crate::stats::MeWriterTeardownReason;
use super::MePool;
use super::codec::WriterCommand;
@@ -134,7 +135,11 @@ impl MePool {
Ok(()) => return Ok(()),
Err(TimedSendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
self.remove_writer_and_close_clients(
current.writer_id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue;
}
Err(TimedSendError::Timeout(_)) => {
@@ -151,7 +156,11 @@ impl MePool {
}
Err(TrySendError::Closed(_)) => {
warn!(writer_id = current.writer_id, "ME writer channel closed");
self.remove_writer_and_close_clients(current.writer_id).await;
self.remove_writer_and_close_clients(
current.writer_id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue;
}
}
@@ -458,7 +467,11 @@ impl MePool {
Err(TrySendError::Closed(_)) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed");
self.remove_writer_and_close_clients(w.id).await;
self.remove_writer_and_close_clients(
w.id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
continue;
}
}
@@ -503,7 +516,11 @@ impl MePool {
Err(TimedSendError::Closed(_)) => {
self.stats.increment_me_writer_pick_closed_total(pick_mode);
warn!(writer_id = w.id, "ME writer channel closed (blocking)");
self.remove_writer_and_close_clients(w.id).await;
self.remove_writer_and_close_clients(
w.id,
MeWriterTeardownReason::RouteChannelClosed,
)
.await;
}
Err(TimedSendError::Timeout(_)) => {
self.stats.increment_me_writer_pick_full_total(pick_mode);
@@ -654,7 +671,11 @@ impl MePool {
}
Err(TrySendError::Closed(_)) => {
debug!("ME close write failed");
self.remove_writer_and_close_clients(w.writer_id).await;
self.remove_writer_and_close_clients(
w.writer_id,
MeWriterTeardownReason::CloseRpcChannelClosed,
)
.await;
}
}
} else {