ME Writer Rebinding - Lifecycle and Consistency fixes

This commit is contained in:
Alexey
2026-03-15 00:17:54 +03:00
parent dda31b3d2f
commit ac0698b772
7 changed files with 361 additions and 63 deletions

View File

@@ -62,6 +62,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut degraded_interval = true;
loop {
let interval = if degraded_interval {
@@ -71,7 +72,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
};
tokio::time::sleep(interval).await;
pool.prune_closed_writers().await;
reap_draining_writers(&pool).await;
reap_draining_writers(&pool, &mut drain_warn_next_allowed).await;
let v4_degraded = check_family(
IpFamily::V4,
&pool,
@@ -110,17 +111,47 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
}
}
async fn reap_draining_writers(pool: &Arc<MePool>) {
async fn reap_draining_writers(
pool: &Arc<MePool>,
warn_next_allowed: &mut HashMap<u64, Instant>,
) {
let now_epoch_secs = MePool::now_epoch_secs();
let now = Instant::now();
let drain_ttl_secs = pool.me_pool_drain_ttl_secs.load(std::sync::atomic::Ordering::Relaxed);
let writers = pool.writers.read().await.clone();
for writer in writers {
if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) {
continue;
}
if pool.registry.is_writer_empty(writer.id).await {
let is_empty = pool.registry.is_writer_empty(writer.id).await;
if is_empty {
pool.remove_writer_and_close_clients(writer.id).await;
continue;
}
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if drain_ttl_secs > 0
&& drain_started_at_epoch_secs != 0
&& now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs
&& should_emit_writer_warn(
warn_next_allowed,
writer.id,
now,
pool.warn_rate_limit_duration(),
)
{
warn!(
writer_id = writer.id,
writer_dc = writer.writer_dc,
endpoint = %writer.addr,
generation = writer.generation,
drain_ttl_secs,
force_close_secs = pool.me_pool_force_close_secs.load(std::sync::atomic::Ordering::Relaxed),
allow_drain_fallback = writer.allow_drain_fallback.load(std::sync::atomic::Ordering::Relaxed),
"ME draining writer remains non-empty past drain TTL"
);
}
let deadline_epoch_secs = writer
.drain_deadline_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
@@ -132,6 +163,23 @@ async fn reap_draining_writers(pool: &Arc<MePool>) {
}
}
fn should_emit_writer_warn(
next_allowed: &mut HashMap<u64, Instant>,
writer_id: u64,
now: Instant,
cooldown: Duration,
) -> bool {
let Some(ready_at) = next_allowed.get(&writer_id).copied() else {
next_allowed.insert(writer_id, now + cooldown);
return true;
};
if now >= ready_at {
next_allowed.insert(writer_id, now + cooldown);
return true;
}
false
}
async fn check_family(
family: IpFamily,
pool: &Arc<MePool>,