diff --git a/src/transport/middle_proxy/health_integration_tests.rs b/src/transport/middle_proxy/health_integration_tests.rs index 70b6411..476b549 100644 --- a/src/transport/middle_proxy/health_integration_tests.rs +++ b/src/transport/middle_proxy/health_integration_tests.rs @@ -161,6 +161,20 @@ async fn insert_draining_writer( } } +async fn wait_for_pool_empty(pool: &Arc, timeout: Duration) { + let start = Instant::now(); + loop { + if pool.writers.read().await.is_empty() { + return; + } + assert!( + start.elapsed() < timeout, + "timed out waiting for pool.writers to become empty" + ); + tokio::time::sleep(Duration::from_millis(5)).await; + } +} + #[tokio::test] async fn me_health_monitor_drains_expired_backlog_over_multiple_cycles() { let (pool, rng) = make_pool(128, 1, 1).await; @@ -178,7 +192,7 @@ async fn me_health_monitor_drains_expired_backlog_over_multiple_cycles() { } let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); - tokio::time::sleep(Duration::from_millis(60)).await; + wait_for_pool_empty(&pool, Duration::from_secs(1)).await; monitor.abort(); let _ = monitor.await; @@ -194,7 +208,7 @@ async fn me_health_monitor_cleans_empty_draining_writers_without_force_close() { } let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); - tokio::time::sleep(Duration::from_millis(30)).await; + wait_for_pool_empty(&pool, Duration::from_secs(1)).await; monitor.abort(); let _ = monitor.await; @@ -219,7 +233,7 @@ async fn me_health_monitor_converges_retry_like_threshold_backlog_to_empty() { } let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); - tokio::time::sleep(Duration::from_millis(60)).await; + wait_for_pool_empty(&pool, Duration::from_secs(1)).await; monitor.abort(); let _ = monitor.await;