diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 8ac6839..edc9598 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -124,12 +124,12 @@ pub(super) async fn reap_draining_writers( let drain_threshold = pool .me_pool_drain_threshold .load(std::sync::atomic::Ordering::Relaxed); - let writers = pool.writers.read().await.clone(); let activity = pool.registry.writer_activity_snapshot().await; - let mut draining_writers = Vec::new(); + let mut draining_writers = Vec::::new(); let mut empty_writer_ids = Vec::::new(); let mut force_close_writer_ids = Vec::::new(); - for writer in writers { + let writers = pool.writers.read().await; + for writer in writers.iter() { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { continue; } @@ -143,23 +143,38 @@ pub(super) async fn reap_draining_writers( empty_writer_ids.push(writer.id); continue; } - draining_writers.push(writer); + draining_writers.push(DrainingWriterSnapshot { + id: writer.id, + writer_dc: writer.writer_dc, + addr: writer.addr, + generation: writer.generation, + created_at: writer.created_at, + draining_started_at_epoch_secs: writer + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed), + drain_deadline_epoch_secs: writer + .drain_deadline_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed), + allow_drain_fallback: writer + .allow_drain_fallback + .load(std::sync::atomic::Ordering::Relaxed), + }); } + drop(writers); - if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { + let overflow = if drain_threshold > 0 && draining_writers.len() > drain_threshold as usize { + draining_writers.len().saturating_sub(drain_threshold as usize) + } else { + 0 + }; + + if overflow > 0 { draining_writers.sort_by(|left, right| { - let left_started = left - .draining_started_at_epoch_secs - .load(std::sync::atomic::Ordering::Relaxed); - let right_started = right - .draining_started_at_epoch_secs - .load(std::sync::atomic::Ordering::Relaxed); - left_started - .cmp(&right_started) + left.draining_started_at_epoch_secs + .cmp(&right.draining_started_at_epoch_secs) .then_with(|| left.created_at.cmp(&right.created_at)) .then_with(|| left.id.cmp(&right.id)) }); - let overflow = draining_writers.len().saturating_sub(drain_threshold as usize); warn!( draining_writers = draining_writers.len(), me_pool_drain_threshold = drain_threshold, @@ -174,12 +189,9 @@ pub(super) async fn reap_draining_writers( let mut active_draining_writer_ids = HashSet::with_capacity(draining_writers.len()); for writer in draining_writers { active_draining_writer_ids.insert(writer.id); - let drain_started_at_epoch_secs = writer - .draining_started_at_epoch_secs - .load(std::sync::atomic::Ordering::Relaxed); if drain_ttl_secs > 0 - && drain_started_at_epoch_secs != 0 - && now_epoch_secs.saturating_sub(drain_started_at_epoch_secs) > drain_ttl_secs + && writer.draining_started_at_epoch_secs != 0 + && now_epoch_secs.saturating_sub(writer.draining_started_at_epoch_secs) > drain_ttl_secs && should_emit_writer_warn( warn_next_allowed, writer.id, @@ -194,14 +206,12 @@ pub(super) async fn reap_draining_writers( generation = writer.generation, drain_ttl_secs, force_close_secs = pool.me_pool_force_close_secs.load(std::sync::atomic::Ordering::Relaxed), - allow_drain_fallback = writer.allow_drain_fallback.load(std::sync::atomic::Ordering::Relaxed), + allow_drain_fallback = writer.allow_drain_fallback, "ME draining writer remains non-empty past drain TTL" ); } - let deadline_epoch_secs = writer - .drain_deadline_epoch_secs - .load(std::sync::atomic::Ordering::Relaxed); - if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs { + if writer.drain_deadline_epoch_secs != 0 && now_epoch_secs >= writer.drain_deadline_epoch_secs + { warn!(writer_id = writer.id, "Drain timeout, force-closing"); force_close_writer_ids.push(writer.id); active_draining_writer_ids.remove(&writer.id); @@ -258,6 +268,18 @@ pub(super) fn health_drain_close_budget() -> usize { .clamp(HEALTH_DRAIN_CLOSE_BUDGET_MIN, HEALTH_DRAIN_CLOSE_BUDGET_MAX) } +#[derive(Debug, Clone)] +struct DrainingWriterSnapshot { + id: u64, + writer_dc: i32, + addr: SocketAddr, + generation: u64, + created_at: Instant, + draining_started_at_epoch_secs: u64, + drain_deadline_epoch_secs: u64, + allow_drain_fallback: bool, +} + fn should_emit_writer_warn( next_allowed: &mut HashMap, writer_id: u64, @@ -1391,6 +1413,15 @@ mod tests { me_pool_drain_threshold, ..GeneralConfig::default() }; + let mut proxy_map_v4 = HashMap::new(); + proxy_map_v4.insert( + 2, + vec![(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 10)), 443)], + ); + let decision = NetworkDecision { + ipv4_me: true, + ..NetworkDecision::default() + }; MePool::new( None, vec![1u8; 32], @@ -1402,10 +1433,10 @@ mod tests { None, 12, 1200, - HashMap::new(), + proxy_map_v4, HashMap::new(), None, - NetworkDecision::default(), + decision, None, Arc::new(SecureRandom::new()), Arc::new(Stats::default()), @@ -1516,8 +1547,55 @@ mod tests { conn_id } + async fn insert_live_writer(pool: &Arc, writer_id: u64, writer_dc: i32) { + let (tx, _writer_rx) = mpsc::channel::(8); + let writer = MeWriter { + id: writer_id, + addr: SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(203, 0, 113, (writer_id as u8).saturating_add(1))), + 4000 + writer_id as u16, + ), + source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST), + writer_dc, + generation: 2, + contour: Arc::new(AtomicU8::new(WriterContour::Active.as_u8())), + created_at: Instant::now(), + tx: tx.clone(), + cancel: CancellationToken::new(), + degraded: Arc::new(AtomicBool::new(false)), + rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)), + draining: Arc::new(AtomicBool::new(false)), + draining_started_at_epoch_secs: Arc::new(AtomicU64::new(0)), + drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)), + allow_drain_fallback: Arc::new(AtomicBool::new(false)), + }; + pool.writers.write().await.push(writer); + pool.registry.register_writer(writer_id, tx).await; + pool.conn_count.fetch_add(1, Ordering::Relaxed); + } + #[tokio::test] async fn reap_draining_writers_force_closes_oldest_over_threshold() { + let pool = make_pool(2).await; + insert_live_writer(&pool, 1, 2).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await; + let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await; + let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await; + let mut warn_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + let mut writer_ids: Vec = pool.writers.read().await.iter().map(|writer| writer.id).collect(); + writer_ids.sort_unstable(); + assert_eq!(writer_ids, vec![1, 20, 30]); + assert!(pool.registry.get_writer(conn_a).await.is_none()); + assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20); + assert_eq!(pool.registry.get_writer(conn_c).await.unwrap().writer_id, 30); + } + + #[tokio::test] + async fn reap_draining_writers_force_closes_overflow_without_replacement() { let pool = make_pool(2).await; let now_epoch_secs = MePool::now_epoch_secs(); let conn_a = insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(30)).await; @@ -1527,7 +1605,8 @@ mod tests { reap_draining_writers(&pool, &mut warn_next_allowed).await; - let writer_ids: Vec = pool.writers.read().await.iter().map(|writer| writer.id).collect(); + let mut writer_ids: Vec = pool.writers.read().await.iter().map(|writer| writer.id).collect(); + writer_ids.sort_unstable(); assert_eq!(writer_ids, vec![20, 30]); assert!(pool.registry.get_writer(conn_a).await.is_none()); assert_eq!(pool.registry.get_writer(conn_b).await.unwrap().writer_id, 20);