Add health monitoring tests for draining writers

- Introduced adversarial tests to validate the behavior of the health monitoring system under various conditions, including the management of draining writers.
- Implemented integration tests to ensure the health monitor correctly handles expired and empty draining writers.
- Added regression tests to verify the functionality of the draining writers' cleanup process, ensuring it adheres to the defined thresholds and budgets.
- Updated the module structure to include the new test files for better organization and maintainability.
This commit is contained in:
David Osipov
2026-03-17 17:11:51 +04:00
parent 9c9ba4becd
commit c9271d9083
7 changed files with 1653 additions and 8 deletions

View File

@@ -25,6 +25,9 @@ const HEALTH_RECONNECT_BUDGET_PER_CORE: usize = 2;
const HEALTH_RECONNECT_BUDGET_PER_DC: usize = 1;
const HEALTH_RECONNECT_BUDGET_MIN: usize = 4;
const HEALTH_RECONNECT_BUDGET_MAX: usize = 128;
const HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256;
#[derive(Debug, Clone)]
struct DcFloorPlanEntry {
@@ -111,7 +114,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
}
}
async fn reap_draining_writers(
pub(super) async fn reap_draining_writers(
pool: &Arc<MePool>,
warn_next_allowed: &mut HashMap<u64, Instant>,
) {
@@ -122,14 +125,22 @@ async fn reap_draining_writers(
.me_pool_drain_threshold
.load(std::sync::atomic::Ordering::Relaxed);
let writers = pool.writers.read().await.clone();
let activity = pool.registry.writer_activity_snapshot().await;
let mut draining_writers = Vec::new();
let mut empty_writer_ids = Vec::<u64>::new();
let mut force_close_writer_ids = Vec::<u64>::new();
for writer in writers {
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
continue;
}
let is_empty = pool.registry.is_writer_empty(writer.id).await;
if is_empty {
pool.remove_writer_and_close_clients(writer.id).await;
if activity
.bound_clients_by_writer
.get(&writer.id)
.copied()
.unwrap_or(0)
== 0
{
empty_writer_ids.push(writer.id);
continue;
}
draining_writers.push(writer);
@@ -156,12 +167,13 @@ async fn reap_draining_writers(
"ME draining writer threshold exceeded, force-closing oldest draining writers"
);
for writer in draining_writers.drain(..overflow) {
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer.id).await;
force_close_writer_ids.push(writer.id);
}
}
let mut active_draining_writer_ids = HashSet::with_capacity(draining_writers.len());
for writer in draining_writers {
active_draining_writer_ids.insert(writer.id);
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
@@ -191,10 +203,59 @@ async fn reap_draining_writers(
.load(std::sync::atomic::Ordering::Relaxed);
if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs {
warn!(writer_id = writer.id, "Drain timeout, force-closing");
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer.id).await;
force_close_writer_ids.push(writer.id);
active_draining_writer_ids.remove(&writer.id);
}
}
warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id));
let close_budget = health_drain_close_budget();
let requested_force_close = force_close_writer_ids.len();
let requested_empty_close = empty_writer_ids.len();
let requested_close_total = requested_force_close.saturating_add(requested_empty_close);
let mut closed_writer_ids = HashSet::<u64>::new();
let mut closed_total = 0usize;
for writer_id in force_close_writer_ids {
if closed_total >= close_budget {
break;
}
if !closed_writer_ids.insert(writer_id) {
continue;
}
pool.stats.increment_pool_force_close_total();
pool.remove_writer_and_close_clients(writer_id).await;
closed_total = closed_total.saturating_add(1);
}
for writer_id in empty_writer_ids {
if closed_total >= close_budget {
break;
}
if !closed_writer_ids.insert(writer_id) {
continue;
}
pool.remove_writer_and_close_clients(writer_id).await;
closed_total = closed_total.saturating_add(1);
}
let pending_close_total = requested_close_total.saturating_sub(closed_total);
if pending_close_total > 0 {
warn!(
close_budget,
closed_total,
pending_close_total,
"ME draining close backlog deferred to next health cycle"
);
}
}
pub(super) fn health_drain_close_budget() -> usize {
let cpu_cores = std::thread::available_parallelism()
.map(std::num::NonZeroUsize::get)
.unwrap_or(1);
cpu_cores
.saturating_mul(HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE)
.clamp(HEALTH_DRAIN_CLOSE_BUDGET_MIN, HEALTH_DRAIN_CLOSE_BUDGET_MAX)
}
fn should_emit_writer_warn(