Add regression tests for ME writer drain and cleanup fixes

10 new tests covering:
- draining_writer_timeout_expired with drain_ttl_secs=0 (600s fallback)
- sync_pool_drain_active gauge correction
- reap_draining_writers gauge sync on every cycle
- Quarantine separation: draining removals quarantine but don't refill

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Maksim Sirotenko 2026-03-20 02:04:32 +03:00
parent 6b8d5aaf68
commit 25781d95ec
1 changed file with 289 additions and 0 deletions

View File

@ -653,3 +653,292 @@ fn general_config_default_drain_threshold_remains_enabled() {
1
);
}
// --- Fix 4: draining_writer_timeout_expired uses 600s fallback when drain_ttl_secs == 0 ---
#[tokio::test]
async fn draining_writer_timeout_expired_returns_true_after_600s_when_ttl_zero() {
    // With me_pool_drain_ttl_secs disabled (0), a writer whose drain began more
    // than 600 seconds ago must still be reaped via the hard fallback bound.
    // Before Fix 4 the expiry check returned false and the writer was stuck forever.
    let pool = make_pool(128).await;
    pool.me_pool_drain_ttl_secs.store(0, Ordering::Relaxed);
    // Drain started 601 seconds ago — one second past the 600s hard upper bound.
    let drain_started = MePool::now_epoch_secs().saturating_sub(601);
    insert_draining_writer(&pool, 5, drain_started, 1, 0).await;
    let mut warn_schedule = HashMap::new();
    let mut soft_evict_schedule = HashMap::new();
    reap_draining_writers(&pool, &mut warn_schedule, &mut soft_evict_schedule).await;
    assert!(
        current_writer_ids(&pool).await.is_empty(),
        "writer draining for 601s with ttl=0 must be reaped via 600s hard upper bound"
    );
}
#[tokio::test]
async fn draining_writer_timeout_not_expired_before_600s_when_ttl_zero() {
    // When drain_ttl_secs == 0 the only expiry path is the 600s hard upper
    // bound, so a writer that has been draining for just 599 seconds must
    // survive the reap tick untouched.
    let pool = make_pool(128).await;
    pool.me_pool_drain_ttl_secs.store(0, Ordering::Relaxed);
    // Drain started 599 seconds ago — just under the 600s hard upper bound.
    let drain_started = MePool::now_epoch_secs().saturating_sub(599);
    insert_draining_writer(&pool, 6, drain_started, 1, 0).await;
    let mut warn_schedule = HashMap::new();
    let mut soft_evict_schedule = HashMap::new();
    reap_draining_writers(&pool, &mut warn_schedule, &mut soft_evict_schedule).await;
    assert_eq!(
        current_writer_ids(&pool).await,
        vec![6],
        "writer draining for 599s with ttl=0 must not be reaped before the 600s bound"
    );
}
#[tokio::test]
async fn draining_writer_timeout_zero_ttl_not_triggered_when_drain_started_at_zero() {
    // drain_started_at_epoch_secs == 0 means "never started draining": such a
    // writer must never be considered expired, regardless of drain_ttl_secs.
    let pool = make_pool(128).await;
    pool.me_pool_drain_ttl_secs.store(0, Ordering::Relaxed);
    let now = MePool::now_epoch_secs();
    // Writer 7: drain_started_at_epoch_secs = 0 and drain_deadline_epoch_secs = 0.
    insert_draining_writer(&pool, 7, 0, 1, 0).await;
    // Writer 8 is a control: it is empty (no bound clients), so the reap pass
    // removes it — proving the tick actually ran.
    insert_draining_writer(&pool, 8, now.saturating_sub(200), 0, 0).await;
    let mut warn_schedule = HashMap::new();
    let mut soft_evict_schedule = HashMap::new();
    reap_draining_writers(&pool, &mut warn_schedule, &mut soft_evict_schedule).await;
    // Writer 8 (empty, no clients) is removed; writer 7 (started_at=0) is kept.
    assert_eq!(
        current_writer_ids(&pool).await,
        vec![7],
        "writer with drain_started_at=0 must never be expired"
    );
}
// --- Fix 5: sync_pool_drain_active corrects drifted gauge ---
#[test]
fn sync_pool_drain_active_overwrites_drifted_value() {
    // sync_pool_drain_active must be an unconditional overwrite: whatever the
    // gauge held before, the synced value wins. The "drifted" state is seeded
    // through sync_pool_drain_active itself, because increment_pool_drain_active
    // is gated behind the debug telemetry level (not enabled in test Stats objects).
    let stats = Arc::new(Stats::new());
    // First write simulates an inflated/drifted gauge; second write is the
    // correction to the real observed count. Both must read back exactly.
    for synced in [7, 2] {
        stats.sync_pool_drain_active(synced);
        assert_eq!(stats.get_pool_drain_active(), synced);
    }
}
#[test]
fn sync_pool_drain_active_to_zero_clears_gauge() {
    // Syncing to 0 must fully clear a previously non-zero gauge. The non-zero
    // seed goes through sync_pool_drain_active, the only ungated write path.
    let stats = Arc::new(Stats::new());
    for synced in [9, 0] {
        stats.sync_pool_drain_active(synced);
        assert_eq!(stats.get_pool_drain_active(), synced);
    }
}
// --- Fix 5 (reap integration): reap_draining_writers syncs gauge ---
#[tokio::test]
async fn reap_draining_writers_syncs_gauge_to_snapshot_draining_count() {
    // The reap tick must call sync_pool_drain_active with the draining count
    // taken at snapshot time — including writers removed during the same tick —
    // so that any drift accumulated before the tick is corrected.
    //
    // Snapshot contents: writer 10 (empty → removed), writers 20 and 30
    // (survive) = 3 total. The gauge must land on 3 no matter how badly it
    // drifted beforehand.
    let pool = make_pool(128).await;
    let now = MePool::now_epoch_secs();
    for (id, age_secs, clients) in [(10, 30, 0), (20, 20, 1), (30, 10, 1)] {
        insert_draining_writer(&pool, id, now.saturating_sub(age_secs), clients, 0).await;
    }
    // Drift the gauge far away from reality. sync_pool_drain_active is the
    // only ungated write path for the gauge in test builds.
    pool.stats.sync_pool_drain_active(99);
    assert_eq!(pool.stats.get_pool_drain_active(), 99);
    let mut warn_schedule = HashMap::new();
    let mut soft_evict_schedule = HashMap::new();
    reap_draining_writers(&pool, &mut warn_schedule, &mut soft_evict_schedule).await;
    // Writer 10 (empty) is gone; writers 20 and 30 remain.
    assert_eq!(
        current_writer_ids(&pool).await,
        vec![20, 30],
        "empty writer must be removed"
    );
    // Total draining writers at snapshot time: empty(1) + surviving(2) = 3.
    assert_eq!(
        pool.stats.get_pool_drain_active(),
        3,
        "gauge must be synced to observed draining count at snapshot time, overwriting drift"
    );
}
#[tokio::test]
async fn reap_draining_writers_syncs_gauge_when_all_writers_are_empty() {
    // If every draining writer is empty (no bound clients), all of them get
    // removed during the tick — yet the gauge is synced to the snapshot count
    // (both were observed as draining), not to 0, because the sync happens
    // before the removals.
    let pool = make_pool(128).await;
    let now = MePool::now_epoch_secs();
    for (id, age_secs) in [(1, 50), (2, 40)] {
        insert_draining_writer(&pool, id, now.saturating_sub(age_secs), 0, 0).await;
    }
    // Start from a drifted gauge value.
    pool.stats.sync_pool_drain_active(99);
    let mut warn_schedule = HashMap::new();
    let mut soft_evict_schedule = HashMap::new();
    reap_draining_writers(&pool, &mut warn_schedule, &mut soft_evict_schedule).await;
    assert!(current_writer_ids(&pool).await.is_empty(), "both empty writers must be removed");
    // Sync is called with the snapshot count: 2 observed draining writers, both empty.
    assert_eq!(
        pool.stats.get_pool_drain_active(),
        2,
        "gauge must be synced to snapshot count (2 observed draining writers)"
    );
}
// --- Fix 2: quarantine separation — draining removal triggers quarantine, not refill ---
#[tokio::test]
async fn draining_writer_removal_triggers_quarantine_but_not_refill() {
    // Removing a draining writer via remove_writer_and_close_clients must
    // quarantine the endpoint when the writer was flapping, but must never
    // kick off a refill — draining writers are retired intentionally.
    //
    // NOTE(review): this relies on insert_draining_writer deriving
    // created_at = Instant::now() - writer_id seconds; writer_id=5 yields 5s
    // of uptime, under ME_FLAP_UPTIME_THRESHOLD_SECS (20s), so quarantine
    // fires — confirm against the helper if it changes.
    let pool = make_pool(128).await;
    let now = MePool::now_epoch_secs();
    insert_draining_writer(&pool, 5, now.saturating_sub(10), 0, 0).await;
    pool.remove_writer_and_close_clients(5).await;
    // Give any spawned refill task a chance to run before asserting.
    tokio::task::yield_now().await;
    assert_eq!(
        pool.stats.get_me_endpoint_quarantine_total(),
        1,
        "draining writer removal with short uptime must trigger quarantine"
    );
    assert_eq!(
        pool.stats.get_me_writer_removed_unexpected_total(),
        0,
        "draining writer removal must not be counted as unexpected"
    );
    assert_eq!(
        pool.stats.get_me_refill_triggered_total(),
        0,
        "draining writer removal must not trigger refill"
    );
}
#[tokio::test]
async fn non_draining_writer_removal_triggers_both_quarantine_and_refill() {
    // A non-draining (active) writer removed via remove_writer_and_close_clients
    // must trigger BOTH quarantine (if flapping) AND a refill. This verifies the
    // non-draining path is unaffected by Fix 2.
    //
    // created_at = Instant::now() - 3s < ME_FLAP_UPTIME_THRESHOLD_SECS (20s)
    // → the flap/quarantine branch fires.
    let pool = make_pool(128).await;
    let (tx, _rx) = mpsc::channel::<WriterCommand>(8);
    // Build the writer by hand (rather than via insert_draining_writer) so that
    // draining=false exercises the active-removal path.
    let writer = MeWriter {
        id: 3,
        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4503),
        source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
        writer_dc: 2,
        generation: 1,
        contour: Arc::new(AtomicU8::new(WriterContour::Active.as_u8())),
        created_at: Instant::now() - Duration::from_secs(3),
        tx: tx.clone(),
        cancel: CancellationToken::new(),
        degraded: Arc::new(AtomicBool::new(false)),
        rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
        draining: Arc::new(AtomicBool::new(false)),
        draining_started_at_epoch_secs: Arc::new(AtomicU64::new(0)),
        drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
        allow_drain_fallback: Arc::new(AtomicBool::new(false)),
    };
    pool.writers.write().await.push(writer);
    pool.registry.register_writer(3, tx).await;
    pool.conn_count.fetch_add(1, Ordering::Relaxed);
    pool.remove_writer_and_close_clients(3).await;
    // Yield twice to allow the spawned refill task to execute.
    tokio::task::yield_now().await;
    tokio::task::yield_now().await;
    assert_eq!(
        pool.stats.get_me_endpoint_quarantine_total(),
        1,
        "non-draining writer with short uptime must trigger quarantine"
    );
    // The unexpected-removal counter is bumped on the same code path that
    // spawns the refill task, so a value of 1 confirms the refill path was
    // entered. me_refill_triggered_total itself is incremented inside the
    // spawned task only if a connection attempt succeeds, which cannot be
    // guaranteed in this test, so it is deliberately not asserted here.
    assert_eq!(
        pool.stats.get_me_writer_removed_unexpected_total(),
        1,
        "non-draining writer removal must be counted as unexpected"
    );
}
#[tokio::test]
async fn draining_writer_removal_with_long_uptime_no_quarantine_no_refill() {
    // A draining writer whose uptime exceeds ME_FLAP_UPTIME_THRESHOLD_SECS
    // (20s) is not flapping: its removal must trigger neither quarantine nor
    // refill. writer_id=25 → created_at = Instant::now() - 25s > 20s.
    let pool = make_pool(128).await;
    let now = MePool::now_epoch_secs();
    insert_draining_writer(&pool, 25, now.saturating_sub(30), 0, 0).await;
    pool.remove_writer_and_close_clients(25).await;
    tokio::task::yield_now().await;
    assert_eq!(
        pool.stats.get_me_endpoint_quarantine_total(),
        0,
        "draining writer with long uptime must not be quarantined"
    );
    assert_eq!(
        pool.stats.get_me_refill_triggered_total(),
        0,
        "draining writer removal must never trigger refill regardless of uptime"
    );
}