From c9271d90837c03d684f734d46ee7f0d5539d1a5d Mon Sep 17 00:00:00 2001 From: David Osipov Date: Tue, 17 Mar 2026 17:11:51 +0400 Subject: [PATCH] Add health monitoring tests for draining writers - Introduced adversarial tests to validate the behavior of the health monitoring system under various conditions, including the management of draining writers. - Implemented integration tests to ensure the health monitor correctly handles expired and empty draining writers. - Added regression tests to verify the functionality of the draining writers' cleanup process, ensuring it adheres to the defined thresholds and budgets. - Updated the module structure to include the new test files for better organization and maintainability. --- src/ip_tracker_regression_tests.rs | 450 +++++++++++++++++ src/main.rs | 2 + src/transport/middle_proxy/health.rs | 77 ++- .../middle_proxy/health_adversarial_tests.rs | 437 +++++++++++++++++ .../middle_proxy/health_integration_tests.rs | 227 +++++++++ .../middle_proxy/health_regression_tests.rs | 462 ++++++++++++++++++ src/transport/middle_proxy/mod.rs | 6 + 7 files changed, 1653 insertions(+), 8 deletions(-) create mode 100644 src/ip_tracker_regression_tests.rs create mode 100644 src/transport/middle_proxy/health_adversarial_tests.rs create mode 100644 src/transport/middle_proxy/health_integration_tests.rs create mode 100644 src/transport/middle_proxy/health_regression_tests.rs diff --git a/src/ip_tracker_regression_tests.rs b/src/ip_tracker_regression_tests.rs new file mode 100644 index 0000000..5d6b358 --- /dev/null +++ b/src/ip_tracker_regression_tests.rs @@ -0,0 +1,450 @@ +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::Arc; +use std::time::Duration; + +use crate::config::UserMaxUniqueIpsMode; +use crate::ip_tracker::UserIpTracker; + +fn ip_from_idx(idx: u32) -> IpAddr { + let a = 10u8; + let b = ((idx / 65_536) % 256) as u8; + let c = ((idx / 256) % 256) as u8; + let d = (idx % 256) as u8; + 
IpAddr::V4(Ipv4Addr::new(a, b, c, d)) +} + +#[tokio::test] +async fn active_window_enforces_large_unique_ip_burst() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("burst_user", 64).await; + tracker + .set_limit_policy(UserMaxUniqueIpsMode::ActiveWindow, 30) + .await; + + for idx in 0..64 { + assert!(tracker.check_and_add("burst_user", ip_from_idx(idx)).await.is_ok()); + } + assert!(tracker.check_and_add("burst_user", ip_from_idx(9_999)).await.is_err()); + assert_eq!(tracker.get_active_ip_count("burst_user").await, 64); +} + +#[tokio::test] +async fn global_limit_applies_across_many_users() { + let tracker = UserIpTracker::new(); + tracker.load_limits(3, &HashMap::new()).await; + + for user_idx in 0..150u32 { + let user = format!("u{}", user_idx); + assert!(tracker.check_and_add(&user, ip_from_idx(user_idx * 10)).await.is_ok()); + assert!(tracker + .check_and_add(&user, ip_from_idx(user_idx * 10 + 1)) + .await + .is_ok()); + assert!(tracker + .check_and_add(&user, ip_from_idx(user_idx * 10 + 2)) + .await + .is_ok()); + assert!(tracker + .check_and_add(&user, ip_from_idx(user_idx * 10 + 3)) + .await + .is_err()); + } + + assert_eq!(tracker.get_stats().await.len(), 150); +} + +#[tokio::test] +async fn user_zero_override_falls_back_to_global_limit() { + let tracker = UserIpTracker::new(); + let mut limits = HashMap::new(); + limits.insert("target".to_string(), 0); + tracker.load_limits(2, &limits).await; + + assert!(tracker.check_and_add("target", ip_from_idx(1)).await.is_ok()); + assert!(tracker.check_and_add("target", ip_from_idx(2)).await.is_ok()); + assert!(tracker.check_and_add("target", ip_from_idx(3)).await.is_err()); + assert_eq!(tracker.get_user_limit("target").await, Some(2)); +} + +#[tokio::test] +async fn remove_ip_is_idempotent_after_counter_reaches_zero() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("u", 2).await; + let ip = ip_from_idx(42); + + tracker.check_and_add("u", ip).await.unwrap(); + tracker.remove_ip("u", 
ip).await; + tracker.remove_ip("u", ip).await; + tracker.remove_ip("u", ip).await; + + assert_eq!(tracker.get_active_ip_count("u").await, 0); + assert!(!tracker.is_ip_active("u", ip).await); +} + +#[tokio::test] +async fn clear_user_ips_resets_active_and_recent() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("u", 10).await; + + for idx in 0..6 { + tracker.check_and_add("u", ip_from_idx(idx)).await.unwrap(); + } + + tracker.clear_user_ips("u").await; + + assert_eq!(tracker.get_active_ip_count("u").await, 0); + let counts = tracker + .get_recent_counts_for_users(&["u".to_string()]) + .await; + assert_eq!(counts.get("u").copied().unwrap_or(0), 0); +} + +#[tokio::test] +async fn clear_all_resets_multi_user_state() { + let tracker = UserIpTracker::new(); + + for user_idx in 0..80u32 { + let user = format!("u{}", user_idx); + for ip_idx in 0..3 { + tracker + .check_and_add(&user, ip_from_idx(user_idx * 100 + ip_idx)) + .await + .unwrap(); + } + } + + tracker.clear_all().await; + + assert!(tracker.get_stats().await.is_empty()); + let users = (0..80u32) + .map(|idx| format!("u{}", idx)) + .collect::>(); + let recent = tracker.get_recent_counts_for_users(&users).await; + assert!(recent.values().all(|count| *count == 0)); +} + +#[tokio::test] +async fn get_active_ips_for_users_are_sorted() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("user", 10).await; + + tracker + .check_and_add("user", IpAddr::V4(Ipv4Addr::new(10, 0, 0, 9))) + .await + .unwrap(); + tracker + .check_and_add("user", IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1))) + .await + .unwrap(); + tracker + .check_and_add("user", IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5))) + .await + .unwrap(); + + let map = tracker + .get_active_ips_for_users(&["user".to_string()]) + .await; + let ips = map.get("user").cloned().unwrap_or_default(); + + assert_eq!( + ips, + vec![ + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5)), + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 9)), + ] 
+ ); +} + +#[tokio::test] +async fn get_recent_ips_for_users_are_sorted() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("user", 10).await; + + tracker + .check_and_add("user", IpAddr::V4(Ipv4Addr::new(10, 1, 0, 9))) + .await + .unwrap(); + tracker + .check_and_add("user", IpAddr::V4(Ipv4Addr::new(10, 1, 0, 1))) + .await + .unwrap(); + tracker + .check_and_add("user", IpAddr::V4(Ipv4Addr::new(10, 1, 0, 5))) + .await + .unwrap(); + + let map = tracker + .get_recent_ips_for_users(&["user".to_string()]) + .await; + let ips = map.get("user").cloned().unwrap_or_default(); + + assert_eq!( + ips, + vec![ + IpAddr::V4(Ipv4Addr::new(10, 1, 0, 1)), + IpAddr::V4(Ipv4Addr::new(10, 1, 0, 5)), + IpAddr::V4(Ipv4Addr::new(10, 1, 0, 9)), + ] + ); +} + +#[tokio::test] +async fn time_window_expires_for_large_rotation() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("tw", 1).await; + tracker + .set_limit_policy(UserMaxUniqueIpsMode::TimeWindow, 1) + .await; + + tracker.check_and_add("tw", ip_from_idx(1)).await.unwrap(); + tracker.remove_ip("tw", ip_from_idx(1)).await; + assert!(tracker.check_and_add("tw", ip_from_idx(2)).await.is_err()); + + tokio::time::sleep(Duration::from_millis(1_100)).await; + assert!(tracker.check_and_add("tw", ip_from_idx(2)).await.is_ok()); +} + +#[tokio::test] +async fn combined_mode_blocks_recent_after_disconnect() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("cmb", 1).await; + tracker + .set_limit_policy(UserMaxUniqueIpsMode::Combined, 2) + .await; + + tracker.check_and_add("cmb", ip_from_idx(11)).await.unwrap(); + tracker.remove_ip("cmb", ip_from_idx(11)).await; + + assert!(tracker.check_and_add("cmb", ip_from_idx(12)).await.is_err()); +} + +#[tokio::test] +async fn load_limits_replaces_large_limit_map() { + let tracker = UserIpTracker::new(); + let mut first = HashMap::new(); + let mut second = HashMap::new(); + + for idx in 0..300usize { + first.insert(format!("u{}", idx), 2usize); + } + for idx in 
150..450usize { + second.insert(format!("u{}", idx), 4usize); + } + + tracker.load_limits(0, &first).await; + tracker.load_limits(0, &second).await; + + assert_eq!(tracker.get_user_limit("u20").await, None); + assert_eq!(tracker.get_user_limit("u200").await, Some(4)); + assert_eq!(tracker.get_user_limit("u420").await, Some(4)); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_same_user_unique_ip_pressure_stays_bounded() { + let tracker = Arc::new(UserIpTracker::new()); + tracker.set_user_limit("hot", 32).await; + tracker + .set_limit_policy(UserMaxUniqueIpsMode::ActiveWindow, 30) + .await; + + let mut handles = Vec::new(); + for worker in 0..16u32 { + let tracker_cloned = tracker.clone(); + handles.push(tokio::spawn(async move { + let base = worker * 200; + for step in 0..200u32 { + let _ = tracker_cloned + .check_and_add("hot", ip_from_idx(base + step)) + .await; + } + })); + } + + for handle in handles { + handle.await.unwrap(); + } + + assert!(tracker.get_active_ip_count("hot").await <= 32); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_many_users_isolate_limits() { + let tracker = Arc::new(UserIpTracker::new()); + tracker.load_limits(4, &HashMap::new()).await; + + let mut handles = Vec::new(); + for user_idx in 0..120u32 { + let tracker_cloned = tracker.clone(); + handles.push(tokio::spawn(async move { + let user = format!("u{}", user_idx); + for ip_idx in 0..10u32 { + let _ = tracker_cloned + .check_and_add(&user, ip_from_idx(user_idx * 1_000 + ip_idx)) + .await; + } + })); + } + + for handle in handles { + handle.await.unwrap(); + } + + let stats = tracker.get_stats().await; + assert_eq!(stats.len(), 120); + assert!(stats.iter().all(|(_, active, limit)| *active <= 4 && *limit == 4)); +} + +#[tokio::test] +async fn same_ip_reconnect_high_frequency_keeps_single_unique() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("same", 2).await; + let ip = ip_from_idx(9); + + 
for _ in 0..2_000 { + tracker.check_and_add("same", ip).await.unwrap(); + } + + assert_eq!(tracker.get_active_ip_count("same").await, 1); + assert!(tracker.is_ip_active("same", ip).await); +} + +#[tokio::test] +async fn format_stats_contains_expected_limited_and_unlimited_markers() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("limited", 2).await; + tracker.check_and_add("limited", ip_from_idx(1)).await.unwrap(); + tracker.check_and_add("open", ip_from_idx(2)).await.unwrap(); + + let text = tracker.format_stats().await; + + assert!(text.contains("limited")); + assert!(text.contains("open")); + assert!(text.contains("unlimited")); +} + +#[tokio::test] +async fn stats_report_global_default_for_users_without_override() { + let tracker = UserIpTracker::new(); + tracker.load_limits(5, &HashMap::new()).await; + + tracker.check_and_add("a", ip_from_idx(1)).await.unwrap(); + tracker.check_and_add("b", ip_from_idx(2)).await.unwrap(); + + let stats = tracker.get_stats().await; + assert!(stats.iter().any(|(user, _, limit)| user == "a" && *limit == 5)); + assert!(stats.iter().any(|(user, _, limit)| user == "b" && *limit == 5)); +} + +#[tokio::test] +async fn stress_cycle_add_remove_clear_preserves_empty_end_state() { + let tracker = UserIpTracker::new(); + + for cycle in 0..50u32 { + let user = format!("cycle{}", cycle); + tracker.set_user_limit(&user, 128).await; + + for ip_idx in 0..128u32 { + tracker + .check_and_add(&user, ip_from_idx(cycle * 10_000 + ip_idx)) + .await + .unwrap(); + } + + for ip_idx in 0..128u32 { + tracker + .remove_ip(&user, ip_from_idx(cycle * 10_000 + ip_idx)) + .await; + } + + tracker.clear_user_ips(&user).await; + } + + assert!(tracker.get_stats().await.is_empty()); +} + +#[tokio::test] +async fn remove_unknown_user_or_ip_does_not_corrupt_state() { + let tracker = UserIpTracker::new(); + + tracker.remove_ip("no_user", ip_from_idx(1)).await; + tracker.check_and_add("x", ip_from_idx(2)).await.unwrap(); + tracker.remove_ip("x", 
ip_from_idx(3)).await; + + assert_eq!(tracker.get_active_ip_count("x").await, 1); + assert!(tracker.is_ip_active("x", ip_from_idx(2)).await); +} + +#[tokio::test] +async fn active_and_recent_views_match_after_mixed_workload() { + let tracker = UserIpTracker::new(); + tracker.set_user_limit("mix", 16).await; + + for ip_idx in 0..12u32 { + tracker.check_and_add("mix", ip_from_idx(ip_idx)).await.unwrap(); + } + for ip_idx in 0..6u32 { + tracker.remove_ip("mix", ip_from_idx(ip_idx)).await; + } + + let active = tracker + .get_active_ips_for_users(&["mix".to_string()]) + .await + .get("mix") + .cloned() + .unwrap_or_default(); + let recent_count = tracker + .get_recent_counts_for_users(&["mix".to_string()]) + .await + .get("mix") + .copied() + .unwrap_or(0); + + assert_eq!(active.len(), 6); + assert!(recent_count >= active.len()); + assert!(recent_count <= 12); +} + +#[tokio::test] +async fn global_limit_switch_updates_enforcement_immediately() { + let tracker = UserIpTracker::new(); + tracker.load_limits(2, &HashMap::new()).await; + + assert!(tracker.check_and_add("u", ip_from_idx(1)).await.is_ok()); + assert!(tracker.check_and_add("u", ip_from_idx(2)).await.is_ok()); + assert!(tracker.check_and_add("u", ip_from_idx(3)).await.is_err()); + + tracker.clear_user_ips("u").await; + tracker.load_limits(4, &HashMap::new()).await; + + assert!(tracker.check_and_add("u", ip_from_idx(1)).await.is_ok()); + assert!(tracker.check_and_add("u", ip_from_idx(2)).await.is_ok()); + assert!(tracker.check_and_add("u", ip_from_idx(3)).await.is_ok()); + assert!(tracker.check_and_add("u", ip_from_idx(4)).await.is_ok()); + assert!(tracker.check_and_add("u", ip_from_idx(5)).await.is_err()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_reconnect_and_disconnect_preserves_non_negative_counts() { + let tracker = Arc::new(UserIpTracker::new()); + tracker.set_user_limit("cc", 8).await; + + let mut handles = Vec::new(); + for worker in 0..8u32 { + let 
tracker_cloned = tracker.clone(); + handles.push(tokio::spawn(async move { + let ip = ip_from_idx(50 + worker); + for _ in 0..500u32 { + let _ = tracker_cloned.check_and_add("cc", ip).await; + tracker_cloned.remove_ip("cc", ip).await; + } + })); + } + + for handle in handles { + handle.await.unwrap(); + } + + assert!(tracker.get_active_ip_count("cc").await <= 8); +} diff --git a/src/main.rs b/src/main.rs index 73ada8c..2cfbe28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,8 @@ mod config; mod crypto; mod error; mod ip_tracker; +#[cfg(test)] +mod ip_tracker_regression_tests; mod maestro; mod metrics; mod network; diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index e5f4260..8ac6839 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -25,6 +25,9 @@ const HEALTH_RECONNECT_BUDGET_PER_CORE: usize = 2; const HEALTH_RECONNECT_BUDGET_PER_DC: usize = 1; const HEALTH_RECONNECT_BUDGET_MIN: usize = 4; const HEALTH_RECONNECT_BUDGET_MAX: usize = 128; +const HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE: usize = 16; +const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16; +const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; #[derive(Debug, Clone)] struct DcFloorPlanEntry { @@ -111,7 +114,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c } } -async fn reap_draining_writers( +pub(super) async fn reap_draining_writers( pool: &Arc, warn_next_allowed: &mut HashMap, ) { @@ -122,14 +125,22 @@ async fn reap_draining_writers( .me_pool_drain_threshold .load(std::sync::atomic::Ordering::Relaxed); let writers = pool.writers.read().await.clone(); + let activity = pool.registry.writer_activity_snapshot().await; let mut draining_writers = Vec::new(); + let mut empty_writer_ids = Vec::::new(); + let mut force_close_writer_ids = Vec::::new(); for writer in writers { if !writer.draining.load(std::sync::atomic::Ordering::Relaxed) { continue; } - let is_empty = pool.registry.is_writer_empty(writer.id).await; - if 
is_empty { - pool.remove_writer_and_close_clients(writer.id).await; + if activity + .bound_clients_by_writer + .get(&writer.id) + .copied() + .unwrap_or(0) + == 0 + { + empty_writer_ids.push(writer.id); continue; } draining_writers.push(writer); @@ -156,12 +167,13 @@ async fn reap_draining_writers( "ME draining writer threshold exceeded, force-closing oldest draining writers" ); for writer in draining_writers.drain(..overflow) { - pool.stats.increment_pool_force_close_total(); - pool.remove_writer_and_close_clients(writer.id).await; + force_close_writer_ids.push(writer.id); } } + let mut active_draining_writer_ids = HashSet::with_capacity(draining_writers.len()); for writer in draining_writers { + active_draining_writer_ids.insert(writer.id); let drain_started_at_epoch_secs = writer .draining_started_at_epoch_secs .load(std::sync::atomic::Ordering::Relaxed); @@ -191,10 +203,59 @@ async fn reap_draining_writers( .load(std::sync::atomic::Ordering::Relaxed); if deadline_epoch_secs != 0 && now_epoch_secs >= deadline_epoch_secs { warn!(writer_id = writer.id, "Drain timeout, force-closing"); - pool.stats.increment_pool_force_close_total(); - pool.remove_writer_and_close_clients(writer.id).await; + force_close_writer_ids.push(writer.id); + active_draining_writer_ids.remove(&writer.id); } } + + warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id)); + + let close_budget = health_drain_close_budget(); + let requested_force_close = force_close_writer_ids.len(); + let requested_empty_close = empty_writer_ids.len(); + let requested_close_total = requested_force_close.saturating_add(requested_empty_close); + let mut closed_writer_ids = HashSet::::new(); + let mut closed_total = 0usize; + for writer_id in force_close_writer_ids { + if closed_total >= close_budget { + break; + } + if !closed_writer_ids.insert(writer_id) { + continue; + } + pool.stats.increment_pool_force_close_total(); + pool.remove_writer_and_close_clients(writer_id).await; + 
closed_total = closed_total.saturating_add(1); + } + for writer_id in empty_writer_ids { + if closed_total >= close_budget { + break; + } + if !closed_writer_ids.insert(writer_id) { + continue; + } + pool.remove_writer_and_close_clients(writer_id).await; + closed_total = closed_total.saturating_add(1); + } + + let pending_close_total = requested_close_total.saturating_sub(closed_total); + if pending_close_total > 0 { + warn!( + close_budget, + closed_total, + pending_close_total, + "ME draining close backlog deferred to next health cycle" + ); + } +} + +pub(super) fn health_drain_close_budget() -> usize { + let cpu_cores = std::thread::available_parallelism() + .map(std::num::NonZeroUsize::get) + .unwrap_or(1); + cpu_cores + .saturating_mul(HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE) + .clamp(HEALTH_DRAIN_CLOSE_BUDGET_MIN, HEALTH_DRAIN_CLOSE_BUDGET_MAX) } fn should_emit_writer_warn( diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs new file mode 100644 index 0000000..675005a --- /dev/null +++ b/src/transport/middle_proxy/health_adversarial_tests.rs @@ -0,0 +1,437 @@ +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +use super::codec::WriterCommand; +use super::health::{health_drain_close_budget, reap_draining_writers}; +use super::pool::{MePool, MeWriter, WriterContour}; +use super::registry::ConnMeta; +use super::me_health_monitor; +use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode}; +use crate::crypto::SecureRandom; +use crate::network::probe::NetworkDecision; +use crate::stats::Stats; + +async fn make_pool( + me_pool_drain_threshold: u64, + me_health_interval_ms_unhealthy: u64, + me_health_interval_ms_healthy: u64, +) -> 
(Arc, Arc) { + let general = GeneralConfig { + me_pool_drain_threshold, + me_health_interval_ms_unhealthy, + me_health_interval_ms_healthy, + ..GeneralConfig::default() + }; + + let rng = Arc::new(SecureRandom::new()); + let pool = MePool::new( + None, + vec![1u8; 32], + None, + false, + None, + Vec::new(), + 1, + None, + 12, + 1200, + HashMap::new(), + HashMap::new(), + None, + NetworkDecision::default(), + None, + rng.clone(), + Arc::new(Stats::default()), + general.me_keepalive_enabled, + general.me_keepalive_interval_secs, + general.me_keepalive_jitter_secs, + general.me_keepalive_payload_random, + general.rpc_proxy_req_every, + general.me_warmup_stagger_enabled, + general.me_warmup_step_delay_ms, + general.me_warmup_step_jitter_ms, + general.me_reconnect_max_concurrent_per_dc, + general.me_reconnect_backoff_base_ms, + general.me_reconnect_backoff_cap_ms, + general.me_reconnect_fast_retry_count, + general.me_single_endpoint_shadow_writers, + general.me_single_endpoint_outage_mode_enabled, + general.me_single_endpoint_outage_disable_quarantine, + general.me_single_endpoint_outage_backoff_min_ms, + general.me_single_endpoint_outage_backoff_max_ms, + general.me_single_endpoint_shadow_rotate_every_secs, + general.me_floor_mode, + general.me_adaptive_floor_idle_secs, + general.me_adaptive_floor_min_writers_single_endpoint, + general.me_adaptive_floor_min_writers_multi_endpoint, + general.me_adaptive_floor_recover_grace_secs, + general.me_adaptive_floor_writers_per_core_total, + general.me_adaptive_floor_cpu_cores_override, + general.me_adaptive_floor_max_extra_writers_single_per_core, + general.me_adaptive_floor_max_extra_writers_multi_per_core, + general.me_adaptive_floor_max_active_writers_per_core, + general.me_adaptive_floor_max_warm_writers_per_core, + general.me_adaptive_floor_max_active_writers_global, + general.me_adaptive_floor_max_warm_writers_global, + general.hardswap, + general.me_pool_drain_ttl_secs, + general.me_pool_drain_threshold, + 
general.effective_me_pool_force_close_secs(), + general.me_pool_min_fresh_ratio, + general.me_hardswap_warmup_delay_min_ms, + general.me_hardswap_warmup_delay_max_ms, + general.me_hardswap_warmup_extra_passes, + general.me_hardswap_warmup_pass_backoff_base_ms, + general.me_bind_stale_mode, + general.me_bind_stale_ttl_secs, + general.me_secret_atomic_snapshot, + general.me_deterministic_writer_sort, + MeWriterPickMode::default(), + general.me_writer_pick_sample_size, + MeSocksKdfPolicy::default(), + general.me_writer_cmd_channel_capacity, + general.me_route_channel_capacity, + general.me_route_backpressure_base_timeout_ms, + general.me_route_backpressure_high_timeout_ms, + general.me_route_backpressure_high_watermark_pct, + general.me_reader_route_data_wait_ms, + general.me_health_interval_ms_unhealthy, + general.me_health_interval_ms_healthy, + general.me_warn_rate_limit_ms, + MeRouteNoWriterMode::default(), + general.me_route_no_writer_wait_ms, + general.me_route_inline_recovery_attempts, + general.me_route_inline_recovery_wait_ms, + ); + + (pool, rng) +} + +async fn insert_draining_writer( + pool: &Arc, + writer_id: u64, + drain_started_at_epoch_secs: u64, + bound_clients: usize, + drain_deadline_epoch_secs: u64, +) { + let (tx, _writer_rx) = mpsc::channel::(8); + let writer = MeWriter { + id: writer_id, + addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6000 + writer_id as u16), + source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST), + writer_dc: 2, + generation: 1, + contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())), + created_at: Instant::now() - Duration::from_secs(writer_id), + tx: tx.clone(), + cancel: CancellationToken::new(), + degraded: Arc::new(AtomicBool::new(false)), + rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)), + draining: Arc::new(AtomicBool::new(true)), + draining_started_at_epoch_secs: Arc::new(AtomicU64::new(drain_started_at_epoch_secs)), + drain_deadline_epoch_secs: Arc::new(AtomicU64::new(drain_deadline_epoch_secs)), + 
allow_drain_fallback: Arc::new(AtomicBool::new(false)), + }; + + pool.writers.write().await.push(writer); + pool.registry.register_writer(writer_id, tx).await; + pool.conn_count.fetch_add(1, Ordering::Relaxed); + + for idx in 0..bound_clients { + let (conn_id, _rx) = pool.registry.register().await; + assert!( + pool.registry + .bind_writer( + conn_id, + writer_id, + ConnMeta { + target_dc: 2, + client_addr: SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 8000 + idx as u16, + ), + our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443), + proto_flags: 0, + }, + ) + .await + ); + } +} + +async fn writer_count(pool: &Arc) -> usize { + pool.writers.read().await.len() +} + +async fn sorted_writer_ids(pool: &Arc) -> Vec { + let mut ids = pool + .writers + .read() + .await + .iter() + .map(|writer| writer.id) + .collect::>(); + ids.sort_unstable(); + ids +} + +#[tokio::test] +async fn reap_draining_writers_clears_warn_state_when_pool_empty() { + let (pool, _rng) = make_pool(128, 1, 1).await; + let mut warn_next_allowed = HashMap::new(); + warn_next_allowed.insert(11, Instant::now() + Duration::from_secs(5)); + warn_next_allowed.insert(22, Instant::now() + Duration::from_secs(5)); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert!(warn_next_allowed.is_empty()); +} + +#[tokio::test] +async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycles() { + let threshold = 3u64; + let (pool, _rng) = make_pool(threshold, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + + for writer_id in 1..=60u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(600).saturating_add(writer_id), + 1, + 0, + ) + .await; + } + + let mut warn_next_allowed = HashMap::new(); + for _ in 0..64 { + reap_draining_writers(&pool, &mut warn_next_allowed).await; + if writer_count(&pool).await <= threshold as usize { + break; + } + } + + assert_eq!(writer_count(&pool).await, threshold as usize); + 
assert_eq!(sorted_writer_ids(&pool).await, vec![58, 59, 60]); +} + +#[tokio::test] +async fn reap_draining_writers_handles_large_empty_writer_population() { + let (pool, _rng) = make_pool(128, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let total = health_drain_close_budget().saturating_mul(3).saturating_add(27); + + for writer_id in 1..=total as u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(120), + 0, + 0, + ) + .await; + } + + let mut warn_next_allowed = HashMap::new(); + for _ in 0..24 { + if writer_count(&pool).await == 0 { + break; + } + reap_draining_writers(&pool, &mut warn_next_allowed).await; + } + + assert_eq!(writer_count(&pool).await, 0); +} + +#[tokio::test] +async fn reap_draining_writers_processes_mass_deadline_expiry_without_unbounded_growth() { + let (pool, _rng) = make_pool(128, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let total = health_drain_close_budget().saturating_mul(4).saturating_add(31); + + for writer_id in 1..=total as u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(180), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + } + + let mut warn_next_allowed = HashMap::new(); + for _ in 0..40 { + if writer_count(&pool).await == 0 { + break; + } + reap_draining_writers(&pool, &mut warn_next_allowed).await; + } + + assert_eq!(writer_count(&pool).await, 0); +} + +#[tokio::test] +async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_churn() { + let (pool, _rng) = make_pool(128, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let mut warn_next_allowed = HashMap::new(); + + for wave in 0..40u64 { + for offset in 0..8u64 { + insert_draining_writer( + &pool, + wave * 100 + offset, + now_epoch_secs.saturating_sub(400 + offset), + 1, + 0, + ) + .await; + } + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + assert!(warn_next_allowed.len() <= writer_count(&pool).await); + + 
let ids = sorted_writer_ids(&pool).await; + for writer_id in ids.into_iter().take(3) { + let _ = pool.remove_writer_and_close_clients(writer_id).await; + } + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + assert!(warn_next_allowed.len() <= writer_count(&pool).await); + } +} + +#[tokio::test] +async fn reap_draining_writers_budgeted_cleanup_never_increases_pool_size() { + let (pool, _rng) = make_pool(5, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + + for writer_id in 1..=200u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(240).saturating_add(writer_id), + 1, + 0, + ) + .await; + } + + let mut warn_next_allowed = HashMap::new(); + let mut previous = writer_count(&pool).await; + for _ in 0..32 { + reap_draining_writers(&pool, &mut warn_next_allowed).await; + let current = writer_count(&pool).await; + assert!(current <= previous); + previous = current; + } +} + +#[tokio::test] +async fn me_health_monitor_converges_to_threshold_under_live_injection_churn() { + let threshold = 7u64; + let (pool, rng) = make_pool(threshold, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + + for writer_id in 1..=40u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(300).saturating_add(writer_id), + 1, + 0, + ) + .await; + } + + let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); + + for wave in 0..8u64 { + for offset in 0..10u64 { + insert_draining_writer( + &pool, + 1000 + wave * 100 + offset, + now_epoch_secs.saturating_sub(120).saturating_add(offset), + 1, + 0, + ) + .await; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + + tokio::time::sleep(Duration::from_millis(120)).await; + monitor.abort(); + let _ = monitor.await; + + assert!(writer_count(&pool).await <= threshold as usize); +} + +#[tokio::test] +async fn me_health_monitor_drains_deadline_storm_with_budgeted_progress() { + let (pool, rng) = make_pool(128, 1, 1).await; + let 
now_epoch_secs = MePool::now_epoch_secs(); + + for writer_id in 1..=220u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(120), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + } + + let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); + tokio::time::sleep(Duration::from_millis(120)).await; + monitor.abort(); + let _ = monitor.await; + + assert_eq!(writer_count(&pool).await, 0); +} + +#[tokio::test] +async fn me_health_monitor_eliminates_mixed_empty_and_deadline_backlog() { + let threshold = 12u64; + let (pool, rng) = make_pool(threshold, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + + for writer_id in 1..=180u64 { + let bound_clients = if writer_id % 3 == 0 { 0 } else { 1 }; + let deadline = if writer_id % 2 == 0 { + now_epoch_secs.saturating_sub(1) + } else { + 0 + }; + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(250).saturating_add(writer_id), + bound_clients, + deadline, + ) + .await; + } + + let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); + tokio::time::sleep(Duration::from_millis(140)).await; + monitor.abort(); + let _ = monitor.await; + + assert!(writer_count(&pool).await <= threshold as usize); +} + +#[test] +fn health_drain_close_budget_is_within_expected_bounds() { + let budget = health_drain_close_budget(); + assert!((16..=256).contains(&budget)); +} diff --git a/src/transport/middle_proxy/health_integration_tests.rs b/src/transport/middle_proxy/health_integration_tests.rs new file mode 100644 index 0000000..70b6411 --- /dev/null +++ b/src/transport/middle_proxy/health_integration_tests.rs @@ -0,0 +1,227 @@ +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +use super::codec::WriterCommand; 
+use super::health::health_drain_close_budget;
+use super::pool::{MePool, MeWriter, WriterContour};
+use super::registry::ConnMeta;
+use super::me_health_monitor;
+use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode};
+use crate::crypto::SecureRandom;
+use crate::network::probe::NetworkDecision;
+use crate::stats::Stats;
+
+async fn make_pool(
+    me_pool_drain_threshold: u64,
+    me_health_interval_ms_unhealthy: u64,
+    me_health_interval_ms_healthy: u64,
+) -> (Arc<MePool>, Arc<SecureRandom>) {
+    let general = GeneralConfig {
+        me_pool_drain_threshold,
+        me_health_interval_ms_unhealthy,
+        me_health_interval_ms_healthy,
+        ..GeneralConfig::default()
+    };
+    let rng = Arc::new(SecureRandom::new());
+    let pool = MePool::new(
+        None,
+        vec![1u8; 32],
+        None,
+        false,
+        None,
+        Vec::new(),
+        1,
+        None,
+        12,
+        1200,
+        HashMap::new(),
+        HashMap::new(),
+        None,
+        NetworkDecision::default(),
+        None,
+        rng.clone(),
+        Arc::new(Stats::default()),
+        general.me_keepalive_enabled,
+        general.me_keepalive_interval_secs,
+        general.me_keepalive_jitter_secs,
+        general.me_keepalive_payload_random,
+        general.rpc_proxy_req_every,
+        general.me_warmup_stagger_enabled,
+        general.me_warmup_step_delay_ms,
+        general.me_warmup_step_jitter_ms,
+        general.me_reconnect_max_concurrent_per_dc,
+        general.me_reconnect_backoff_base_ms,
+        general.me_reconnect_backoff_cap_ms,
+        general.me_reconnect_fast_retry_count,
+        general.me_single_endpoint_shadow_writers,
+        general.me_single_endpoint_outage_mode_enabled,
+        general.me_single_endpoint_outage_disable_quarantine,
+        general.me_single_endpoint_outage_backoff_min_ms,
+        general.me_single_endpoint_outage_backoff_max_ms,
+        general.me_single_endpoint_shadow_rotate_every_secs,
+        general.me_floor_mode,
+        general.me_adaptive_floor_idle_secs,
+        general.me_adaptive_floor_min_writers_single_endpoint,
+        general.me_adaptive_floor_min_writers_multi_endpoint,
+        general.me_adaptive_floor_recover_grace_secs,
+        general.me_adaptive_floor_writers_per_core_total,
+        general.me_adaptive_floor_cpu_cores_override,
+        general.me_adaptive_floor_max_extra_writers_single_per_core,
+        general.me_adaptive_floor_max_extra_writers_multi_per_core,
+        general.me_adaptive_floor_max_active_writers_per_core,
+        general.me_adaptive_floor_max_warm_writers_per_core,
+        general.me_adaptive_floor_max_active_writers_global,
+        general.me_adaptive_floor_max_warm_writers_global,
+        general.hardswap,
+        general.me_pool_drain_ttl_secs,
+        general.me_pool_drain_threshold,
+        general.effective_me_pool_force_close_secs(),
+        general.me_pool_min_fresh_ratio,
+        general.me_hardswap_warmup_delay_min_ms,
+        general.me_hardswap_warmup_delay_max_ms,
+        general.me_hardswap_warmup_extra_passes,
+        general.me_hardswap_warmup_pass_backoff_base_ms,
+        general.me_bind_stale_mode,
+        general.me_bind_stale_ttl_secs,
+        general.me_secret_atomic_snapshot,
+        general.me_deterministic_writer_sort,
+        MeWriterPickMode::default(),
+        general.me_writer_pick_sample_size,
+        MeSocksKdfPolicy::default(),
+        general.me_writer_cmd_channel_capacity,
+        general.me_route_channel_capacity,
+        general.me_route_backpressure_base_timeout_ms,
+        general.me_route_backpressure_high_timeout_ms,
+        general.me_route_backpressure_high_watermark_pct,
+        general.me_reader_route_data_wait_ms,
+        general.me_health_interval_ms_unhealthy,
+        general.me_health_interval_ms_healthy,
+        general.me_warn_rate_limit_ms,
+        MeRouteNoWriterMode::default(),
+        general.me_route_no_writer_wait_ms,
+        general.me_route_inline_recovery_attempts,
+        general.me_route_inline_recovery_wait_ms,
+    );
+    (pool, rng)
+}
+
+async fn insert_draining_writer(
+    pool: &Arc<MePool>,
+    writer_id: u64,
+    drain_started_at_epoch_secs: u64,
+    bound_clients: usize,
+    drain_deadline_epoch_secs: u64,
+) {
+    let (tx, _writer_rx) = mpsc::channel::<WriterCommand>(8);
+    let writer = MeWriter {
+        id: writer_id,
+        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5500 + writer_id as u16),
+        source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
+        writer_dc: 2,
+        generation: 1,
+        contour: 
Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())), + created_at: Instant::now() - Duration::from_secs(writer_id), + tx: tx.clone(), + cancel: CancellationToken::new(), + degraded: Arc::new(AtomicBool::new(false)), + rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)), + draining: Arc::new(AtomicBool::new(true)), + draining_started_at_epoch_secs: Arc::new(AtomicU64::new(drain_started_at_epoch_secs)), + drain_deadline_epoch_secs: Arc::new(AtomicU64::new(drain_deadline_epoch_secs)), + allow_drain_fallback: Arc::new(AtomicBool::new(false)), + }; + pool.writers.write().await.push(writer); + pool.registry.register_writer(writer_id, tx).await; + pool.conn_count.fetch_add(1, Ordering::Relaxed); + for idx in 0..bound_clients { + let (conn_id, _rx) = pool.registry.register().await; + assert!( + pool.registry + .bind_writer( + conn_id, + writer_id, + ConnMeta { + target_dc: 2, + client_addr: SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + 7200 + idx as u16, + ), + our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443), + proto_flags: 0, + }, + ) + .await + ); + } +} + +#[tokio::test] +async fn me_health_monitor_drains_expired_backlog_over_multiple_cycles() { + let (pool, rng) = make_pool(128, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let writer_total = health_drain_close_budget().saturating_mul(2).saturating_add(9); + for writer_id in 1..=writer_total as u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(120), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + } + + let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); + tokio::time::sleep(Duration::from_millis(60)).await; + monitor.abort(); + let _ = monitor.await; + + assert!(pool.writers.read().await.is_empty()); +} + +#[tokio::test] +async fn me_health_monitor_cleans_empty_draining_writers_without_force_close() { + let (pool, rng) = make_pool(128, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + for writer_id in 
1..=24u64 { + insert_draining_writer(&pool, writer_id, now_epoch_secs.saturating_sub(60), 0, 0).await; + } + + let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); + tokio::time::sleep(Duration::from_millis(30)).await; + monitor.abort(); + let _ = monitor.await; + + assert!(pool.writers.read().await.is_empty()); +} + +#[tokio::test] +async fn me_health_monitor_converges_retry_like_threshold_backlog_to_empty() { + let threshold = 4u64; + let (pool, rng) = make_pool(threshold, 1, 1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let writer_total = threshold as usize + health_drain_close_budget().saturating_add(11); + for writer_id in 1..=writer_total as u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(300).saturating_add(writer_id), + 1, + 0, + ) + .await; + } + + let monitor = tokio::spawn(me_health_monitor(pool.clone(), rng, 0)); + tokio::time::sleep(Duration::from_millis(60)).await; + monitor.abort(); + let _ = monitor.await; + + assert!(pool.writers.read().await.is_empty()); +} diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs new file mode 100644 index 0000000..05a8e6a --- /dev/null +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -0,0 +1,462 @@ +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +use super::codec::WriterCommand; +use super::health::{health_drain_close_budget, reap_draining_writers}; +use super::pool::{MePool, MeWriter, WriterContour}; +use super::registry::ConnMeta; +use crate::config::{GeneralConfig, MeRouteNoWriterMode, MeSocksKdfPolicy, MeWriterPickMode}; +use crate::crypto::SecureRandom; +use crate::network::probe::NetworkDecision; +use crate::stats::Stats; + 
+async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
+    let general = GeneralConfig {
+        me_pool_drain_threshold,
+        ..GeneralConfig::default()
+    };
+
+    MePool::new(
+        None,
+        vec![1u8; 32],
+        None,
+        false,
+        None,
+        Vec::new(),
+        1,
+        None,
+        12,
+        1200,
+        HashMap::new(),
+        HashMap::new(),
+        None,
+        NetworkDecision::default(),
+        None,
+        Arc::new(SecureRandom::new()),
+        Arc::new(Stats::default()),
+        general.me_keepalive_enabled,
+        general.me_keepalive_interval_secs,
+        general.me_keepalive_jitter_secs,
+        general.me_keepalive_payload_random,
+        general.rpc_proxy_req_every,
+        general.me_warmup_stagger_enabled,
+        general.me_warmup_step_delay_ms,
+        general.me_warmup_step_jitter_ms,
+        general.me_reconnect_max_concurrent_per_dc,
+        general.me_reconnect_backoff_base_ms,
+        general.me_reconnect_backoff_cap_ms,
+        general.me_reconnect_fast_retry_count,
+        general.me_single_endpoint_shadow_writers,
+        general.me_single_endpoint_outage_mode_enabled,
+        general.me_single_endpoint_outage_disable_quarantine,
+        general.me_single_endpoint_outage_backoff_min_ms,
+        general.me_single_endpoint_outage_backoff_max_ms,
+        general.me_single_endpoint_shadow_rotate_every_secs,
+        general.me_floor_mode,
+        general.me_adaptive_floor_idle_secs,
+        general.me_adaptive_floor_min_writers_single_endpoint,
+        general.me_adaptive_floor_min_writers_multi_endpoint,
+        general.me_adaptive_floor_recover_grace_secs,
+        general.me_adaptive_floor_writers_per_core_total,
+        general.me_adaptive_floor_cpu_cores_override,
+        general.me_adaptive_floor_max_extra_writers_single_per_core,
+        general.me_adaptive_floor_max_extra_writers_multi_per_core,
+        general.me_adaptive_floor_max_active_writers_per_core,
+        general.me_adaptive_floor_max_warm_writers_per_core,
+        general.me_adaptive_floor_max_active_writers_global,
+        general.me_adaptive_floor_max_warm_writers_global,
+        general.hardswap,
+        general.me_pool_drain_ttl_secs,
+        general.me_pool_drain_threshold,
+        general.effective_me_pool_force_close_secs(),
+        
general.me_pool_min_fresh_ratio,
+        general.me_hardswap_warmup_delay_min_ms,
+        general.me_hardswap_warmup_delay_max_ms,
+        general.me_hardswap_warmup_extra_passes,
+        general.me_hardswap_warmup_pass_backoff_base_ms,
+        general.me_bind_stale_mode,
+        general.me_bind_stale_ttl_secs,
+        general.me_secret_atomic_snapshot,
+        general.me_deterministic_writer_sort,
+        MeWriterPickMode::default(),
+        general.me_writer_pick_sample_size,
+        MeSocksKdfPolicy::default(),
+        general.me_writer_cmd_channel_capacity,
+        general.me_route_channel_capacity,
+        general.me_route_backpressure_base_timeout_ms,
+        general.me_route_backpressure_high_timeout_ms,
+        general.me_route_backpressure_high_watermark_pct,
+        general.me_reader_route_data_wait_ms,
+        general.me_health_interval_ms_unhealthy,
+        general.me_health_interval_ms_healthy,
+        general.me_warn_rate_limit_ms,
+        MeRouteNoWriterMode::default(),
+        general.me_route_no_writer_wait_ms,
+        general.me_route_inline_recovery_attempts,
+        general.me_route_inline_recovery_wait_ms,
+    )
+}
+
+async fn insert_draining_writer(
+    pool: &Arc<MePool>,
+    writer_id: u64,
+    drain_started_at_epoch_secs: u64,
+    bound_clients: usize,
+    drain_deadline_epoch_secs: u64,
+) -> Vec<u64> {
+    let mut conn_ids = Vec::with_capacity(bound_clients);
+    let (tx, _writer_rx) = mpsc::channel::<WriterCommand>(8);
+    let writer = MeWriter {
+        id: writer_id,
+        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4500 + writer_id as u16),
+        source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
+        writer_dc: 2,
+        generation: 1,
+        contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
+        created_at: Instant::now() - Duration::from_secs(writer_id),
+        tx: tx.clone(),
+        cancel: CancellationToken::new(),
+        degraded: Arc::new(AtomicBool::new(false)),
+        rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
+        draining: Arc::new(AtomicBool::new(true)),
+        draining_started_at_epoch_secs: Arc::new(AtomicU64::new(drain_started_at_epoch_secs)),
+        drain_deadline_epoch_secs: Arc::new(AtomicU64::new(drain_deadline_epoch_secs)),
+        
allow_drain_fallback: Arc::new(AtomicBool::new(false)),
+    };
+    pool.writers.write().await.push(writer);
+    pool.registry.register_writer(writer_id, tx).await;
+    pool.conn_count.fetch_add(1, Ordering::Relaxed);
+    for idx in 0..bound_clients {
+        let (conn_id, _rx) = pool.registry.register().await;
+        assert!(
+            pool.registry
+                .bind_writer(
+                    conn_id,
+                    writer_id,
+                    ConnMeta {
+                        target_dc: 2,
+                        client_addr: SocketAddr::new(
+                            IpAddr::V4(Ipv4Addr::LOCALHOST),
+                            6200 + idx as u16,
+                        ),
+                        our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
+                        proto_flags: 0,
+                    },
+                )
+                .await
+        );
+        conn_ids.push(conn_id);
+    }
+    conn_ids
+}
+
+async fn current_writer_ids(pool: &Arc<MePool>) -> Vec<u64> {
+    let mut writer_ids = pool
+        .writers
+        .read()
+        .await
+        .iter()
+        .map(|writer| writer.id)
+        .collect::<Vec<u64>>();
+    writer_ids.sort_unstable();
+    writer_ids
+}
+
+#[tokio::test]
+async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
+    let pool = make_pool(128).await;
+    let now_epoch_secs = MePool::now_epoch_secs();
+    let conn_ids =
+        insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await;
+    let mut warn_next_allowed = HashMap::new();
+
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;
+    assert!(warn_next_allowed.contains_key(&7));
+
+    let _ = pool.remove_writer_and_close_clients(7).await;
+    assert!(pool.registry.get_writer(conn_ids[0]).await.is_none());
+
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;
+    assert!(!warn_next_allowed.contains_key(&7));
+}
+
+#[tokio::test]
+async fn reap_draining_writers_removes_empty_draining_writers() {
+    let pool = make_pool(128).await;
+    let now_epoch_secs = MePool::now_epoch_secs();
+    insert_draining_writer(&pool, 1, now_epoch_secs.saturating_sub(40), 0, 0).await;
+    insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(30), 0, 0).await;
+    insert_draining_writer(&pool, 3, now_epoch_secs.saturating_sub(20), 1, 0).await;
+    let mut warn_next_allowed = HashMap::new();
+
+    
reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert_eq!(current_writer_ids(&pool).await, vec![3]); +} + +#[tokio::test] +async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { + let pool = make_pool(2).await; + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer(&pool, 11, now_epoch_secs.saturating_sub(40), 1, 0).await; + insert_draining_writer(&pool, 22, now_epoch_secs.saturating_sub(30), 1, 0).await; + insert_draining_writer(&pool, 33, now_epoch_secs.saturating_sub(20), 1, 0).await; + insert_draining_writer(&pool, 44, now_epoch_secs.saturating_sub(10), 1, 0).await; + let mut warn_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert_eq!(current_writer_ids(&pool).await, vec![33, 44]); +} + +#[tokio::test] +async fn reap_draining_writers_deadline_force_close_applies_under_threshold() { + let pool = make_pool(128).await; + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer( + &pool, + 50, + now_epoch_secs.saturating_sub(15), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + let mut warn_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert!(current_writer_ids(&pool).await.is_empty()); +} + +#[tokio::test] +async fn reap_draining_writers_limits_closes_per_health_tick() { + let pool = make_pool(128).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let close_budget = health_drain_close_budget(); + let writer_total = close_budget.saturating_add(19); + for writer_id in 1..=writer_total as u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(20), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + } + let mut warn_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert_eq!(pool.writers.read().await.len(), writer_total - close_budget); +} + +#[tokio::test] +async fn 
reap_draining_writers_backlog_drains_across_ticks() { + let pool = make_pool(128).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let close_budget = health_drain_close_budget(); + let writer_total = close_budget.saturating_mul(2).saturating_add(7); + for writer_id in 1..=writer_total as u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(20), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + } + let mut warn_next_allowed = HashMap::new(); + + for _ in 0..8 { + if pool.writers.read().await.is_empty() { + break; + } + reap_draining_writers(&pool, &mut warn_next_allowed).await; + } + + assert!(pool.writers.read().await.is_empty()); +} + +#[tokio::test] +async fn reap_draining_writers_threshold_backlog_converges_to_threshold() { + let threshold = 5u64; + let pool = make_pool(threshold).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let close_budget = health_drain_close_budget(); + let writer_total = threshold as usize + close_budget.saturating_add(12); + for writer_id in 1..=writer_total as u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(200).saturating_add(writer_id), + 1, + 0, + ) + .await; + } + let mut warn_next_allowed = HashMap::new(); + + for _ in 0..16 { + reap_draining_writers(&pool, &mut warn_next_allowed).await; + if pool.writers.read().await.len() <= threshold as usize { + break; + } + } + + assert_eq!(pool.writers.read().await.len(), threshold as usize); +} + +#[tokio::test] +async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_writers() { + let pool = make_pool(0).await; + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer(&pool, 10, now_epoch_secs.saturating_sub(40), 1, 0).await; + insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(30), 1, 0).await; + insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(20), 1, 0).await; + let mut warn_next_allowed = HashMap::new(); + + 
reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert_eq!(current_writer_ids(&pool).await, vec![10, 20, 30]); +} + +#[tokio::test] +async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() { + let pool = make_pool(128).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let close_budget = health_drain_close_budget(); + for writer_id in 1..=close_budget as u64 { + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(20), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + } + let empty_writer_id = close_budget as u64 + 1; + insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await; + let mut warn_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]); +} + +#[tokio::test] +async fn reap_draining_writers_empty_cleanup_does_not_increment_force_close_metric() { + let pool = make_pool(128).await; + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer(&pool, 1, now_epoch_secs.saturating_sub(60), 0, 0).await; + insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(50), 0, 0).await; + let mut warn_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert!(current_writer_ids(&pool).await.is_empty()); + assert_eq!(pool.stats.get_pool_force_close_total(), 0); +} + +#[tokio::test] +async fn reap_draining_writers_handles_duplicate_force_close_requests_for_same_writer() { + let pool = make_pool(1).await; + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer( + &pool, + 10, + now_epoch_secs.saturating_sub(30), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + insert_draining_writer( + &pool, + 20, + now_epoch_secs.saturating_sub(20), + 1, + now_epoch_secs.saturating_sub(1), + ) + .await; + let mut warn_next_allowed = HashMap::new(); + + 
reap_draining_writers(&pool, &mut warn_next_allowed).await; + + assert!(current_writer_ids(&pool).await.is_empty()); +} + +#[tokio::test] +async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population_under_churn() { + let pool = make_pool(128).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let mut warn_next_allowed = HashMap::new(); + + for wave in 0..12u64 { + for offset in 0..9u64 { + insert_draining_writer( + &pool, + wave * 100 + offset, + now_epoch_secs.saturating_sub(120 + offset), + 1, + 0, + ) + .await; + } + reap_draining_writers(&pool, &mut warn_next_allowed).await; + assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); + + let existing_writer_ids = current_writer_ids(&pool).await; + for writer_id in existing_writer_ids.into_iter().take(4) { + let _ = pool.remove_writer_and_close_clients(writer_id).await; + } + reap_draining_writers(&pool, &mut warn_next_allowed).await; + assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); + } +} + +#[tokio::test] +async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_state() { + let pool = make_pool(6).await; + let now_epoch_secs = MePool::now_epoch_secs(); + let mut warn_next_allowed = HashMap::new(); + + for writer_id in 1..=18u64 { + let bound_clients = if writer_id % 3 == 0 { 0 } else { 1 }; + let deadline = if writer_id % 2 == 0 { + now_epoch_secs.saturating_sub(1) + } else { + 0 + }; + insert_draining_writer( + &pool, + writer_id, + now_epoch_secs.saturating_sub(300).saturating_add(writer_id), + bound_clients, + deadline, + ) + .await; + } + + for _ in 0..16 { + reap_draining_writers(&pool, &mut warn_next_allowed).await; + if pool.writers.read().await.len() <= 6 { + break; + } + } + + assert!(pool.writers.read().await.len() <= 6); + assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); +} + +#[test] +fn general_config_default_drain_threshold_remains_enabled() { + 
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128); +} diff --git a/src/transport/middle_proxy/mod.rs b/src/transport/middle_proxy/mod.rs index 92e222d..590c996 100644 --- a/src/transport/middle_proxy/mod.rs +++ b/src/transport/middle_proxy/mod.rs @@ -21,6 +21,12 @@ mod secret; mod selftest; mod wire; mod pool_status; +#[cfg(test)] +mod health_regression_tests; +#[cfg(test)] +mod health_integration_tests; +#[cfg(test)] +mod health_adversarial_tests; use bytes::Bytes;