From 41d786cc11c791a13c897e1b42489804c00b8dff Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 25 Mar 2026 16:29:35 +0300 Subject: [PATCH] Safety Gates Invariants + HybridAsyncPersistent + Watch + Runtime Snapshots + ME Writer Ping Tracker + Parallel Recovery + Backpressure Guardrails Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/hot_reload.rs | 3 + src/maestro/me_startup.rs | 2 + src/metrics.rs | 14 ++ src/stats/mod.rs | 10 + src/transport/middle_proxy/config_updater.rs | 47 ---- src/transport/middle_proxy/health.rs | 33 ++- src/transport/middle_proxy/pool.rs | 201 +++++++++++------- src/transport/middle_proxy/pool_config.rs | 2 +- src/transport/middle_proxy/pool_refill.rs | 23 ++ src/transport/middle_proxy/pool_writer.rs | 45 ++-- src/transport/middle_proxy/reader.rs | 12 +- src/transport/middle_proxy/registry.rs | 42 +++- src/transport/middle_proxy/send.rs | 122 ++++++++++- .../tests/health_adversarial_tests.rs | 2 + .../tests/health_integration_tests.rs | 2 + .../tests/health_regression_tests.rs | 2 + .../tests/pool_refill_security_tests.rs | 2 + .../tests/pool_writer_security_tests.rs | 2 + .../tests/send_adversarial_tests.rs | 2 + 19 files changed, 384 insertions(+), 184 deletions(-) diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 7f7499e..f8064dd 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -651,6 +651,9 @@ fn warn_non_hot_changes(old: &ProxyConfig, new: &ProxyConfig, non_hot_changed: b } if old.general.me_route_no_writer_mode != new.general.me_route_no_writer_mode || old.general.me_route_no_writer_wait_ms != new.general.me_route_no_writer_wait_ms + || old.general.me_route_hybrid_max_wait_ms != new.general.me_route_hybrid_max_wait_ms + || old.general.me_route_blocking_send_timeout_ms + != new.general.me_route_blocking_send_timeout_ms || old.general.me_route_inline_recovery_attempts != 
new.general.me_route_inline_recovery_attempts || old.general.me_route_inline_recovery_wait_ms diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index b1e605c..4e49e9e 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -277,6 +277,8 @@ pub(crate) async fn initialize_me_pool( config.general.me_warn_rate_limit_ms, config.general.me_route_no_writer_mode, config.general.me_route_no_writer_wait_ms, + config.general.me_route_hybrid_max_wait_ms, + config.general.me_route_blocking_send_timeout_ms, config.general.me_route_inline_recovery_attempts, config.general.me_route_inline_recovery_wait_ms, ); diff --git a/src/metrics.rs b/src/metrics.rs index f9475f6..c125ef5 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -2318,6 +2318,20 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp 0 } ); + let _ = writeln!( + out, + "# HELP telemt_me_hybrid_timeout_total ME hybrid route timeouts after bounded retry window" + ); + let _ = writeln!(out, "# TYPE telemt_me_hybrid_timeout_total counter"); + let _ = writeln!( + out, + "telemt_me_hybrid_timeout_total {}", + if me_allows_normal { + stats.get_me_hybrid_timeout_total() + } else { + 0 + } + ); let _ = writeln!( out, "# HELP telemt_me_async_recovery_trigger_total Async ME recovery trigger attempts from route path" diff --git a/src/stats/mod.rs b/src/stats/mod.rs index ff15d4f..9cba3e8 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -234,6 +234,7 @@ pub struct Stats { me_writer_restored_same_endpoint_total: AtomicU64, me_writer_restored_fallback_total: AtomicU64, me_no_writer_failfast_total: AtomicU64, + me_hybrid_timeout_total: AtomicU64, me_async_recovery_trigger_total: AtomicU64, me_inline_recovery_total: AtomicU64, ip_reservation_rollback_tcp_limit_total: AtomicU64, @@ -1203,6 +1204,12 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn increment_me_hybrid_timeout_total(&self) { + if self.telemetry_me_allows_normal() { + 
self.me_hybrid_timeout_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_me_async_recovery_trigger_total(&self) { if self.telemetry_me_allows_normal() { self.me_async_recovery_trigger_total @@ -1876,6 +1883,9 @@ impl Stats { pub fn get_me_no_writer_failfast_total(&self) -> u64 { self.me_no_writer_failfast_total.load(Ordering::Relaxed) } + pub fn get_me_hybrid_timeout_total(&self) -> u64 { + self.me_hybrid_timeout_total.load(Ordering::Relaxed) + } pub fn get_me_async_recovery_trigger_total(&self) -> u64 { self.me_async_recovery_trigger_total.load(Ordering::Relaxed) } diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index ba90c1a..ebe45fc 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -314,53 +314,6 @@ async fn run_update_cycle( reinit_tx: &mpsc::Sender, ) { let upstream = pool.upstream.clone(); - pool.update_runtime_reinit_policy( - cfg.general.hardswap, - cfg.general.me_pool_drain_ttl_secs, - cfg.general.me_instadrain, - cfg.general.me_pool_drain_threshold, - cfg.general.me_pool_drain_soft_evict_enabled, - cfg.general.me_pool_drain_soft_evict_grace_secs, - cfg.general.me_pool_drain_soft_evict_per_writer, - cfg.general.me_pool_drain_soft_evict_budget_per_core, - cfg.general.me_pool_drain_soft_evict_cooldown_ms, - cfg.general.effective_me_pool_force_close_secs(), - cfg.general.me_pool_min_fresh_ratio, - cfg.general.me_hardswap_warmup_delay_min_ms, - cfg.general.me_hardswap_warmup_delay_max_ms, - cfg.general.me_hardswap_warmup_extra_passes, - cfg.general.me_hardswap_warmup_pass_backoff_base_ms, - cfg.general.me_bind_stale_mode, - cfg.general.me_bind_stale_ttl_secs, - cfg.general.me_secret_atomic_snapshot, - cfg.general.me_deterministic_writer_sort, - cfg.general.me_writer_pick_mode, - cfg.general.me_writer_pick_sample_size, - cfg.general.me_single_endpoint_shadow_writers, - cfg.general.me_single_endpoint_outage_mode_enabled, - 
cfg.general.me_single_endpoint_outage_disable_quarantine, - cfg.general.me_single_endpoint_outage_backoff_min_ms, - cfg.general.me_single_endpoint_outage_backoff_max_ms, - cfg.general.me_single_endpoint_shadow_rotate_every_secs, - cfg.general.me_floor_mode, - cfg.general.me_adaptive_floor_idle_secs, - cfg.general.me_adaptive_floor_min_writers_single_endpoint, - cfg.general.me_adaptive_floor_min_writers_multi_endpoint, - cfg.general.me_adaptive_floor_recover_grace_secs, - cfg.general.me_adaptive_floor_writers_per_core_total, - cfg.general.me_adaptive_floor_cpu_cores_override, - cfg.general - .me_adaptive_floor_max_extra_writers_single_per_core, - cfg.general - .me_adaptive_floor_max_extra_writers_multi_per_core, - cfg.general.me_adaptive_floor_max_active_writers_per_core, - cfg.general.me_adaptive_floor_max_warm_writers_per_core, - cfg.general.me_adaptive_floor_max_active_writers_global, - cfg.general.me_adaptive_floor_max_warm_writers_global, - cfg.general.me_health_interval_ms_unhealthy, - cfg.general.me_health_interval_ms_healthy, - cfg.general.me_warn_rate_limit_ms, - ); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); let required_secret_snapshots = cfg.general.proxy_secret_stable_snapshots.max(1); diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 3e53f38..9e0933f 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use rand::RngExt; +use tokio::sync::Semaphore; use tracing::{debug, info, warn}; use crate::config::MeFloorMode; @@ -78,6 +79,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c }; tokio::time::sleep(interval).await; pool.prune_closed_writers().await; + pool.sweep_endpoint_quarantine().await; reap_draining_writers(&pool, &mut drain_warn_next_allowed).await; let v4_degraded = check_family( IpFamily::V4, @@ -365,7 +367,8 @@ async fn check_family( 
endpoints.sort_unstable(); endpoints.dedup(); } - let mut reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len()); + let reconnect_budget = health_reconnect_budget(pool, dc_endpoints.len()); + let reconnect_sem = Arc::new(Semaphore::new(reconnect_budget)); if pool.floor_mode() == MeFloorMode::Static { adaptive_idle_since.clear(); @@ -461,7 +464,7 @@ async fn check_family( required, outage_backoff, outage_next_attempt, - &mut reconnect_budget, + &reconnect_sem, ) .await; continue; @@ -521,7 +524,7 @@ async fn check_family( family_degraded = true; let now = Instant::now(); - if reconnect_budget == 0 { + if reconnect_sem.available_permits() == 0 { let base_ms = pool.me_reconnect_backoff_base.as_millis() as u64; let next_ms = (*backoff.get(&key).unwrap_or(&base_ms)).max(base_ms); let jitter = next_ms / JITTER_FRAC_NUM; @@ -567,10 +570,9 @@ async fn check_family( let mut restored = 0usize; for _ in 0..missing { - if reconnect_budget == 0 { + let Ok(reconnect_permit) = reconnect_sem.clone().try_acquire_owned() else { break; - } - reconnect_budget = reconnect_budget.saturating_sub(1); + }; if pool.active_contour_writer_count_total().await >= floor_plan.active_cap_effective_total { @@ -621,6 +623,7 @@ async fn check_family( debug!(dc = %dc, ?family, "ME reconnect timed out"); } } + drop(reconnect_permit); } let now_alive = alive + restored; @@ -1188,7 +1191,7 @@ async fn recover_single_endpoint_outage( required: usize, outage_backoff: &mut HashMap<(i32, IpFamily), u64>, outage_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, - reconnect_budget: &mut usize, + reconnect_sem: &Arc, ) { let now = Instant::now(); if let Some(ts) = outage_next_attempt.get(&key) @@ -1198,7 +1201,7 @@ async fn recover_single_endpoint_outage( } let (min_backoff_ms, max_backoff_ms) = pool.single_endpoint_outage_backoff_bounds_ms(); - if *reconnect_budget == 0 { + if reconnect_sem.available_permits() == 0 { outage_next_attempt.insert(key, now + 
Duration::from_millis(min_backoff_ms.max(250))); debug!( dc = %key.0, @@ -1209,7 +1212,17 @@ async fn recover_single_endpoint_outage( ); return; } - *reconnect_budget = (*reconnect_budget).saturating_sub(1); + let Ok(_reconnect_permit) = reconnect_sem.clone().try_acquire_owned() else { + outage_next_attempt.insert(key, now + Duration::from_millis(min_backoff_ms.max(250))); + debug!( + dc = %key.0, + family = ?key.1, + %endpoint, + required, + "Single-endpoint outage reconnect deferred by semaphore saturation" + ); + return; + }; pool.stats .increment_me_single_endpoint_outage_reconnect_attempt_total(); @@ -1687,6 +1700,8 @@ mod tests { general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ) diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 71ab257..5faa76d 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -8,7 +8,8 @@ use std::sync::atomic::{ }; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use tokio::sync::{Mutex, Notify, RwLock, mpsc}; +use arc_swap::ArcSwap; +use tokio::sync::{Mutex, RwLock, mpsc, watch}; use tokio_util::sync::CancellationToken; use crate::config::{ @@ -69,6 +70,10 @@ impl WriterContour { } pub(super) fn from_u8(value: u8) -> Self { + debug_assert!( + value <= Self::Draining as u8, + "Unexpected WriterContour discriminant: {value}" + ); match value { 0 => Self::Warm, 1 => Self::Active, @@ -87,6 +92,33 @@ pub(crate) enum MeFamilyRuntimeState { Recovering = 3, } +#[derive(Debug, Clone)] +pub(crate) struct FamilyHealthSnapshot { + pub(crate) state: MeFamilyRuntimeState, + pub(crate) state_since_epoch_secs: u64, + pub(crate) suppressed_until_epoch_secs: u64, + pub(crate) fail_streak: u32, + pub(crate) recover_success_streak: u32, +} + +impl 
FamilyHealthSnapshot { + fn new( + state: MeFamilyRuntimeState, + state_since_epoch_secs: u64, + suppressed_until_epoch_secs: u64, + fail_streak: u32, + recover_success_streak: u32, + ) -> Self { + Self { + state, + state_since_epoch_secs, + suppressed_until_epoch_secs, + fail_streak, + recover_success_streak, + } + } +} + impl MeFamilyRuntimeState { pub(crate) fn from_u8(value: u8) -> Self { match value { @@ -214,13 +246,11 @@ pub struct MePool { pub(super) endpoint_dc_map: Arc>>>, pub(super) default_dc: AtomicI32, pub(super) next_writer_id: AtomicU64, - pub(super) ping_tracker: Arc>>, - pub(super) ping_tracker_last_cleanup_epoch_ms: AtomicU64, pub(super) rtt_stats: Arc>>, pub(super) nat_reflection_cache: Arc>, pub(super) nat_reflection_singleflight_v4: Arc>, pub(super) nat_reflection_singleflight_v6: Arc>, - pub(super) writer_available: Arc, + pub(super) writer_epoch: watch::Sender, pub(super) refill_inflight: Arc>>, pub(super) refill_inflight_dc: Arc>>, pub(super) conn_count: AtomicUsize, @@ -259,21 +289,18 @@ pub struct MePool { pub(super) me_reader_route_data_wait_ms: Arc, pub(super) me_route_no_writer_mode: AtomicU8, pub(super) me_route_no_writer_wait: Duration, + pub(super) me_route_hybrid_max_wait: Duration, + pub(super) me_route_blocking_send_timeout: Option, + pub(super) me_route_last_success_epoch_ms: AtomicU64, + pub(super) me_route_hybrid_timeout_warn_epoch_ms: AtomicU64, + pub(super) me_async_recovery_last_trigger_epoch_ms: AtomicU64, pub(super) me_route_inline_recovery_attempts: u32, pub(super) me_route_inline_recovery_wait: Duration, pub(super) me_health_interval_ms_unhealthy: AtomicU64, pub(super) me_health_interval_ms_healthy: AtomicU64, pub(super) me_warn_rate_limit_ms: AtomicU64, - pub(super) me_family_v4_runtime_state: AtomicU8, - pub(super) me_family_v6_runtime_state: AtomicU8, - pub(super) me_family_v4_state_since_epoch_secs: AtomicU64, - pub(super) me_family_v6_state_since_epoch_secs: AtomicU64, - pub(super) 
me_family_v4_suppressed_until_epoch_secs: AtomicU64, - pub(super) me_family_v6_suppressed_until_epoch_secs: AtomicU64, - pub(super) me_family_v4_fail_streak: AtomicU32, - pub(super) me_family_v6_fail_streak: AtomicU32, - pub(super) me_family_v4_recover_success_streak: AtomicU32, - pub(super) me_family_v6_recover_success_streak: AtomicU32, + pub(super) family_health_v4: ArcSwap, + pub(super) family_health_v6: ArcSwap, pub(super) me_last_drain_gate_route_quorum_ok: AtomicBool, pub(super) me_last_drain_gate_redundancy_ok: AtomicBool, pub(super) me_last_drain_gate_block_reason: AtomicU8, @@ -396,6 +423,8 @@ impl MePool { me_warn_rate_limit_ms: u64, me_route_no_writer_mode: MeRouteNoWriterMode, me_route_no_writer_wait_ms: u64, + me_route_hybrid_max_wait_ms: u64, + me_route_blocking_send_timeout_ms: u64, me_route_inline_recovery_attempts: u32, me_route_inline_recovery_wait_ms: u64, ) -> Arc { @@ -410,6 +439,8 @@ impl MePool { me_route_backpressure_high_timeout_ms, me_route_backpressure_high_watermark_pct, ); + let (writer_epoch, _) = watch::channel(0u64); + let now_epoch_secs = Self::now_epoch_secs(); Arc::new(Self { registry, writers: Arc::new(RwLock::new(Vec::new())), @@ -527,13 +558,11 @@ impl MePool { endpoint_dc_map: Arc::new(RwLock::new(endpoint_dc_map)), default_dc: AtomicI32::new(default_dc.unwrap_or(2)), next_writer_id: AtomicU64::new(1), - ping_tracker: Arc::new(Mutex::new(HashMap::new())), - ping_tracker_last_cleanup_epoch_ms: AtomicU64::new(0), rtt_stats: Arc::new(Mutex::new(HashMap::new())), nat_reflection_cache: Arc::new(Mutex::new(NatReflectionCache::default())), nat_reflection_singleflight_v4: Arc::new(Mutex::new(())), nat_reflection_singleflight_v6: Arc::new(Mutex::new(())), - writer_available: Arc::new(Notify::new()), + writer_epoch, refill_inflight: Arc::new(Mutex::new(HashSet::new())), refill_inflight_dc: Arc::new(Mutex::new(HashSet::new())), conn_count: AtomicUsize::new(0), @@ -585,25 +614,40 @@ impl MePool { me_reader_route_data_wait_ms: 
Arc::new(AtomicU64::new(me_reader_route_data_wait_ms)), me_route_no_writer_mode: AtomicU8::new(me_route_no_writer_mode.as_u8()), me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), + me_route_hybrid_max_wait: Duration::from_millis(me_route_hybrid_max_wait_ms.max(50)), + me_route_blocking_send_timeout: if me_route_blocking_send_timeout_ms == 0 { + None + } else { + Some(Duration::from_millis( + me_route_blocking_send_timeout_ms.min(5_000), + )) + }, + me_route_last_success_epoch_ms: AtomicU64::new(0), + me_route_hybrid_timeout_warn_epoch_ms: AtomicU64::new(0), + me_async_recovery_last_trigger_epoch_ms: AtomicU64::new(0), me_route_inline_recovery_attempts, me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)), me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)), - me_family_v4_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8), - me_family_v6_runtime_state: AtomicU8::new(MeFamilyRuntimeState::Healthy as u8), - me_family_v4_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()), - me_family_v6_state_since_epoch_secs: AtomicU64::new(Self::now_epoch_secs()), - me_family_v4_suppressed_until_epoch_secs: AtomicU64::new(0), - me_family_v6_suppressed_until_epoch_secs: AtomicU64::new(0), - me_family_v4_fail_streak: AtomicU32::new(0), - me_family_v6_fail_streak: AtomicU32::new(0), - me_family_v4_recover_success_streak: AtomicU32::new(0), - me_family_v6_recover_success_streak: AtomicU32::new(0), + family_health_v4: ArcSwap::from_pointee(FamilyHealthSnapshot::new( + MeFamilyRuntimeState::Healthy, + now_epoch_secs, + 0, + 0, + 0, + )), + family_health_v6: ArcSwap::from_pointee(FamilyHealthSnapshot::new( + MeFamilyRuntimeState::Healthy, + now_epoch_secs, + 0, + 0, + 0, + )), me_last_drain_gate_route_quorum_ok: 
AtomicBool::new(false), me_last_drain_gate_redundancy_ok: AtomicBool::new(false), me_last_drain_gate_block_reason: AtomicU8::new(MeDrainGateReason::Open as u8), - me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(Self::now_epoch_secs()), + me_last_drain_gate_updated_at_epoch_secs: AtomicU64::new(now_epoch_secs), runtime_ready: AtomicBool::new(false), preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)), }) @@ -621,6 +665,19 @@ impl MePool { self.runtime_ready.load(Ordering::Relaxed) } + pub(super) fn now_epoch_millis() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 + } + + pub(super) fn notify_writer_epoch(&self) { + let _ = self.writer_epoch.send_modify(|epoch| { + *epoch = epoch.wrapping_add(1); + }); + } + #[allow(dead_code)] pub(super) fn set_family_runtime_state( &self, @@ -631,82 +688,51 @@ impl MePool { fail_streak: u32, recover_success_streak: u32, ) { + let snapshot = Arc::new(FamilyHealthSnapshot::new( + state, + state_since_epoch_secs, + suppressed_until_epoch_secs, + fail_streak, + recover_success_streak, + )); match family { - IpFamily::V4 => { - self.me_family_v4_runtime_state - .store(state as u8, Ordering::Relaxed); - self.me_family_v4_state_since_epoch_secs - .store(state_since_epoch_secs, Ordering::Relaxed); - self.me_family_v4_suppressed_until_epoch_secs - .store(suppressed_until_epoch_secs, Ordering::Relaxed); - self.me_family_v4_fail_streak - .store(fail_streak, Ordering::Relaxed); - self.me_family_v4_recover_success_streak - .store(recover_success_streak, Ordering::Relaxed); - } - IpFamily::V6 => { - self.me_family_v6_runtime_state - .store(state as u8, Ordering::Relaxed); - self.me_family_v6_state_since_epoch_secs - .store(state_since_epoch_secs, Ordering::Relaxed); - self.me_family_v6_suppressed_until_epoch_secs - .store(suppressed_until_epoch_secs, Ordering::Relaxed); - self.me_family_v6_fail_streak - .store(fail_streak, Ordering::Relaxed); - 
self.me_family_v6_recover_success_streak - .store(recover_success_streak, Ordering::Relaxed); - } + IpFamily::V4 => self.family_health_v4.store(snapshot), + IpFamily::V6 => self.family_health_v6.store(snapshot), } } pub(crate) fn family_runtime_state(&self, family: IpFamily) -> MeFamilyRuntimeState { match family { - IpFamily::V4 => MeFamilyRuntimeState::from_u8( - self.me_family_v4_runtime_state.load(Ordering::Relaxed), - ), - IpFamily::V6 => MeFamilyRuntimeState::from_u8( - self.me_family_v6_runtime_state.load(Ordering::Relaxed), - ), + IpFamily::V4 => self.family_health_v4.load().state, + IpFamily::V6 => self.family_health_v6.load().state, } } pub(crate) fn family_runtime_state_since_epoch_secs(&self, family: IpFamily) -> u64 { match family { - IpFamily::V4 => self - .me_family_v4_state_since_epoch_secs - .load(Ordering::Relaxed), - IpFamily::V6 => self - .me_family_v6_state_since_epoch_secs - .load(Ordering::Relaxed), + IpFamily::V4 => self.family_health_v4.load().state_since_epoch_secs, + IpFamily::V6 => self.family_health_v6.load().state_since_epoch_secs, } } pub(crate) fn family_suppressed_until_epoch_secs(&self, family: IpFamily) -> u64 { match family { - IpFamily::V4 => self - .me_family_v4_suppressed_until_epoch_secs - .load(Ordering::Relaxed), - IpFamily::V6 => self - .me_family_v6_suppressed_until_epoch_secs - .load(Ordering::Relaxed), + IpFamily::V4 => self.family_health_v4.load().suppressed_until_epoch_secs, + IpFamily::V6 => self.family_health_v6.load().suppressed_until_epoch_secs, } } pub(crate) fn family_fail_streak(&self, family: IpFamily) -> u32 { match family { - IpFamily::V4 => self.me_family_v4_fail_streak.load(Ordering::Relaxed), - IpFamily::V6 => self.me_family_v6_fail_streak.load(Ordering::Relaxed), + IpFamily::V4 => self.family_health_v4.load().fail_streak, + IpFamily::V6 => self.family_health_v6.load().fail_streak, } } pub(crate) fn family_recover_success_streak(&self, family: IpFamily) -> u32 { match family { - IpFamily::V4 => self - 
.me_family_v4_recover_success_streak - .load(Ordering::Relaxed), - IpFamily::V6 => self - .me_family_v6_recover_success_streak - .load(Ordering::Relaxed), + IpFamily::V4 => self.family_health_v4.load().recover_success_streak, + IpFamily::V6 => self.family_health_v6.load().recover_success_streak, } } @@ -818,6 +844,9 @@ impl MePool { self.me_instadrain.store(instadrain, Ordering::Relaxed); self.me_pool_drain_threshold .store(pool_drain_threshold, Ordering::Relaxed); + // Runtime soft-evict knobs are updated lock-free to keep control-plane + // writes non-blocking; readers observe a short eventual-consistency + // window by design. self.me_pool_drain_soft_evict_enabled .store(pool_drain_soft_evict_enabled, Ordering::Relaxed); self.me_pool_drain_soft_evict_grace_secs @@ -1574,6 +1603,22 @@ impl MePool { let preferred = Self::build_preferred_endpoints_by_dc(&self.decision, &map_v4, &map_v6); *self.endpoint_dc_map.write().await = rebuilt; *self.preferred_endpoints_by_dc.write().await = preferred; + let configured_endpoints = self + .endpoint_dc_map + .read() + .await + .keys() + .copied() + .collect::>(); + { + let mut quarantine = self.endpoint_quarantine.lock().await; + let now = Instant::now(); + quarantine.retain(|addr, expiry| *expiry > now && configured_endpoints.contains(addr)); + } + { + let mut kdf_fp = self.kdf_material_fingerprint.write().await; + kdf_fp.retain(|addr, _| configured_endpoints.contains(addr)); + } } pub(super) async fn preferred_endpoints_for_dc(&self, dc: i32) -> Vec { diff --git a/src/transport/middle_proxy/pool_config.rs b/src/transport/middle_proxy/pool_config.rs index 486fad0..ebbadd2 100644 --- a/src/transport/middle_proxy/pool_config.rs +++ b/src/transport/middle_proxy/pool_config.rs @@ -72,7 +72,7 @@ impl MePool { } if changed { self.rebuild_endpoint_dc_map().await; - self.writer_available.notify_waiters(); + self.notify_writer_epoch(); } if changed { SnapshotApplyOutcome::AppliedChanged diff --git 
a/src/transport/middle_proxy/pool_refill.rs b/src/transport/middle_proxy/pool_refill.rs index d93bcfe..fc5c996 100644 --- a/src/transport/middle_proxy/pool_refill.rs +++ b/src/transport/middle_proxy/pool_refill.rs @@ -13,8 +13,22 @@ use super::pool::{MePool, RefillDcKey, RefillEndpointKey, WriterContour}; const ME_FLAP_UPTIME_THRESHOLD_SECS: u64 = 20; const ME_FLAP_QUARANTINE_SECS: u64 = 25; +const ME_REFILL_TOTAL_ATTEMPT_CAP: u32 = 20; impl MePool { + pub(super) async fn sweep_endpoint_quarantine(&self) { + let configured = self + .endpoint_dc_map + .read() + .await + .keys() + .copied() + .collect::>(); + let now = Instant::now(); + let mut guard = self.endpoint_quarantine.lock().await; + guard.retain(|addr, expiry| *expiry > now && configured.contains(addr)); + } + pub(super) async fn maybe_quarantine_flapping_endpoint( &self, addr: SocketAddr, @@ -206,10 +220,15 @@ impl MePool { async fn refill_writer_after_loss(self: &Arc, addr: SocketAddr, writer_dc: i32) -> bool { let fast_retries = self.me_reconnect_fast_retry_count.max(1); + let mut total_attempts = 0u32; let same_endpoint_quarantined = self.is_endpoint_quarantined(addr).await; if !same_endpoint_quarantined { for attempt in 0..fast_retries { + if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP { + break; + } + total_attempts = total_attempts.saturating_add(1); self.stats.increment_me_reconnect_attempt(); match self .connect_one_for_dc(addr, writer_dc, self.rng.as_ref()) @@ -250,6 +269,10 @@ impl MePool { } for attempt in 0..fast_retries { + if total_attempts >= ME_REFILL_TOTAL_ATTEMPT_CAP { + break; + } + total_attempts = total_attempts.saturating_add(1); self.stats.increment_me_reconnect_attempt(); if self .connect_endpoints_round_robin(writer_dc, &dc_endpoints, self.rng.as_ref()) diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 22fb909..908b113 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ 
-1,5 +1,6 @@ use std::io::ErrorKind; use std::net::SocketAddr; +use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::time::{Duration, Instant}; @@ -25,6 +26,7 @@ const ME_ACTIVE_PING_SECS: u64 = 25; const ME_ACTIVE_PING_JITTER_SECS: i64 = 5; const ME_IDLE_KEEPALIVE_MAX_SECS: u64 = 5; const ME_RPC_PROXY_REQ_RESPONSE_WAIT_MS: u64 = 700; +const ME_PING_TRACKER_CLEANUP_EVERY: u32 = 32; #[derive(Clone, Copy)] enum WriterTeardownMode { @@ -197,11 +199,11 @@ impl MePool { self.registry.register_writer(writer_id, tx.clone()).await; self.registry.mark_writer_idle(writer_id).await; self.conn_count.fetch_add(1, Ordering::Relaxed); - self.writer_available.notify_one(); + self.notify_writer_epoch(); let reg = self.registry.clone(); let writers_arc = self.writers_arc(); - let ping_tracker = self.ping_tracker.clone(); + let ping_tracker = Arc::new(tokio::sync::Mutex::new(HashMap::::new())); let ping_tracker_reader = ping_tracker.clone(); let rtt_stats = self.rtt_stats.clone(); let stats_reader = self.stats.clone(); @@ -280,6 +282,7 @@ impl MePool { let pool_ping = Arc::downgrade(self); tokio::spawn(async move { let mut ping_id: i64 = rand::random::(); + let mut cleanup_tick: u32 = 0; let idle_interval_cap = Duration::from_secs(ME_IDLE_KEEPALIVE_MAX_SECS); // Per-writer jittered start to avoid phase sync. 
let startup_jitter = if keepalive_enabled { @@ -339,39 +342,16 @@ impl MePool { p.extend_from_slice(&sent_id.to_le_bytes()); { let mut tracker = ping_tracker_ping.lock().await; - let now_epoch_ms = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64; - let mut run_cleanup = false; - if let Some(pool) = pool_ping.upgrade() { - let last_cleanup_ms = pool - .ping_tracker_last_cleanup_epoch_ms - .load(Ordering::Relaxed); - if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000 - && pool - .ping_tracker_last_cleanup_epoch_ms - .compare_exchange( - last_cleanup_ms, - now_epoch_ms, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .is_ok() - { - run_cleanup = true; - } - } - - if run_cleanup { + cleanup_tick = cleanup_tick.wrapping_add(1); + if cleanup_tick.is_multiple_of(ME_PING_TRACKER_CLEANUP_EVERY) { let before = tracker.len(); - tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); + tracker.retain(|_, ts| ts.elapsed() < Duration::from_secs(120)); let expired = before.saturating_sub(tracker.len()); if expired > 0 { stats_ping.increment_me_keepalive_timeout_by(expired as u64); } } - tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); + tracker.insert(sent_id, std::time::Instant::now()); } ping_id = ping_id.wrapping_add(1); stats_ping.increment_me_keepalive_sent(); @@ -594,10 +574,6 @@ impl MePool { // The close command below is only a best-effort accelerator for task shutdown. // Cleanup progress must never depend on command-channel availability. 
let _ = self.registry.writer_lost(writer_id).await; - { - let mut tracker = self.ping_tracker.lock().await; - tracker.retain(|_, (_, wid)| *wid != writer_id); - } self.rtt_stats.lock().await.remove(&writer_id); if let Some(tx) = close_tx { let _ = tx.send(WriterCommand::Close).await; @@ -611,6 +587,9 @@ impl MePool { self.trigger_immediate_refill_for_dc(addr, writer_dc); } } + if removed { + self.notify_writer_epoch(); + } removed } diff --git a/src/transport/middle_proxy/reader.rs b/src/transport/middle_proxy/reader.rs index 46acd7e..aec55cd 100644 --- a/src/transport/middle_proxy/reader.rs +++ b/src/transport/middle_proxy/reader.rs @@ -32,10 +32,10 @@ pub(crate) async fn reader_loop( enc_leftover: BytesMut, mut dec: BytesMut, tx: mpsc::Sender, - ping_tracker: Arc>>, + ping_tracker: Arc>>, rtt_stats: Arc>>, stats: Arc, - _writer_id: u64, + writer_id: u64, degraded: Arc, writer_rtt_ema_ms_x10: Arc, reader_route_data_wait_ms: Arc, @@ -45,7 +45,7 @@ pub(crate) async fn reader_loop( let mut expected_seq: i32 = 0; loop { - let mut tmp = [0u8; 16_384]; + let mut tmp = [0u8; 65_536]; let n = tokio::select! 
{ res = rd.read(&mut tmp) => res.map_err(ProxyError::Io)?, _ = cancel.cancelled() => return Ok(()), @@ -203,13 +203,13 @@ pub(crate) async fn reader_loop( } else if pt == RPC_PONG_U32 && body.len() >= 8 { let ping_id = i64::from_le_bytes(body[0..8].try_into().unwrap()); stats.increment_me_keepalive_pong(); - if let Some((sent, wid)) = { + if let Some(sent) = { let mut guard = ping_tracker.lock().await; guard.remove(&ping_id) } { let rtt = sent.elapsed().as_secs_f64() * 1000.0; let mut stats = rtt_stats.lock().await; - let entry = stats.entry(wid).or_insert((rtt, rtt)); + let entry = stats.entry(writer_id).or_insert((rtt, rtt)); entry.1 = entry.1 * 0.8 + rtt * 0.2; if rtt < entry.0 { entry.0 = rtt; @@ -224,7 +224,7 @@ pub(crate) async fn reader_loop( Ordering::Relaxed, ); trace!( - writer_id = wid, + writer_id, rtt_ms = rtt, ema_ms = entry.1, base_ms = entry.0, diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 0a95e18..becd8c4 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -292,6 +292,12 @@ impl ConnRegistry { pub async fn bind_writer(&self, conn_id: u64, writer_id: u64, meta: ConnMeta) -> bool { let mut inner = self.inner.write().await; + // ROUTING IS THE SOURCE OF TRUTH: + // never keep/attach writer binding for a connection that is already + // absent from the routing table. + if !inner.map.contains_key(&conn_id) { + return false; + } if !inner.writers.contains_key(&writer_id) { return false; } @@ -382,9 +388,39 @@ impl ConnRegistry { } pub async fn get_writer(&self, conn_id: u64) -> Option<ConnWriter> { - let inner = self.inner.read().await; - let writer_id = inner.writer_for_conn.get(&conn_id).cloned()?; - let writer = inner.writers.get(&writer_id).cloned()?; + let mut inner = self.inner.write().await; + // ROUTING IS THE SOURCE OF TRUTH: + // stale bindings are ignored and lazily cleaned when routing no longer + // contains the connection.
+ if !inner.map.contains_key(&conn_id) { + inner.meta.remove(&conn_id); + if let Some(stale_writer_id) = inner.writer_for_conn.remove(&conn_id) + && let Some(conns) = inner.conns_for_writer.get_mut(&stale_writer_id) + { + conns.remove(&conn_id); + if conns.is_empty() { + inner + .writer_idle_since_epoch_secs + .insert(stale_writer_id, Self::now_epoch_secs()); + } + } + return None; + } + + let writer_id = inner.writer_for_conn.get(&conn_id).copied()?; + let Some(writer) = inner.writers.get(&writer_id).cloned() else { + inner.writer_for_conn.remove(&conn_id); + inner.meta.remove(&conn_id); + if let Some(conns) = inner.conns_for_writer.get_mut(&writer_id) { + conns.remove(&conn_id); + if conns.is_empty() { + inner + .writer_idle_since_epoch_secs + .insert(writer_id, Self::now_epoch_secs()); + } + } + return None; + }; Some(ConnWriter { writer_id, tx: writer, diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index b1cf54e..d38775f 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -26,6 +26,9 @@ use rand::seq::SliceRandom; const IDLE_WRITER_PENALTY_MID_SECS: u64 = 45; const IDLE_WRITER_PENALTY_HIGH_SECS: u64 = 55; const HYBRID_GLOBAL_BURST_PERIOD_ROUNDS: u32 = 4; +const HYBRID_RECENT_SUCCESS_WINDOW_MS: u64 = 120_000; +const HYBRID_TIMEOUT_WARN_RATE_LIMIT_MS: u64 = 5_000; +const HYBRID_RECOVERY_TRIGGER_MIN_INTERVAL_MS: u64 = 5_000; const PICK_PENALTY_WARM: u64 = 200; const PICK_PENALTY_DRAINING: u64 = 600; const PICK_PENALTY_STALE: u64 = 300; @@ -77,6 +80,7 @@ impl MePool { let mut async_recovery_triggered = false; let mut hybrid_recovery_round = 0u32; let mut hybrid_last_recovery_at: Option<Instant> = None; + let mut hybrid_total_deadline: Option<Instant> = None; let hybrid_wait_step = self.me_route_no_writer_wait.max(Duration::from_millis(50)); let mut hybrid_wait_current = hybrid_wait_step; @@ -92,9 +96,13 @@ impl MePool { .tx .try_send(WriterCommand::Data(current_payload.clone())) { - Ok(()) => return
Ok(()), + Ok(()) => { + self.note_hybrid_route_success(); + return Ok(()); + } Err(TrySendError::Full(cmd)) => { if current.tx.send(cmd).await.is_ok() { + self.note_hybrid_route_success(); return Ok(()); } warn!(writer_id = current.writer_id, "ME writer channel closed"); @@ -182,6 +190,15 @@ impl MePool { continue; } MeRouteNoWriterMode::HybridAsyncPersistent => { + let total_deadline = *hybrid_total_deadline.get_or_insert_with(|| { + Instant::now() + self.hybrid_total_wait_budget() + }); + if Instant::now() >= total_deadline { + self.on_hybrid_timeout(total_deadline, routed_dc); + return Err(ProxyError::Proxy( + "ME writer not available within hybrid timeout".into(), + )); + } if !unknown_target_dc { self.maybe_trigger_hybrid_recovery( routed_dc, @@ -292,6 +309,15 @@ impl MePool { } } MeRouteNoWriterMode::HybridAsyncPersistent => { + let total_deadline = *hybrid_total_deadline + .get_or_insert_with(|| Instant::now() + self.hybrid_total_wait_budget()); + if Instant::now() >= total_deadline { + self.on_hybrid_timeout(total_deadline, routed_dc); + return Err(ProxyError::Proxy( + "No ME writers available for target DC within hybrid timeout" + .into(), + )); + } if !unknown_target_dc { self.maybe_trigger_hybrid_recovery( routed_dc, @@ -423,6 +449,7 @@ impl MePool { "Selected stale ME writer for fallback bind" ); } + self.note_hybrid_route_success(); return Ok(()); } Err(TrySendError::Full(_)) => { @@ -453,7 +480,18 @@ impl MePool { .increment_me_writer_pick_blocking_fallback_total(); let effective_our_addr = SocketAddr::new(w.source_ip, our_addr.port()); let (payload, meta) = build_routed_payload(effective_our_addr); - match w.tx.clone().reserve_owned().await { + let reserve_result = if let Some(timeout) = self.me_route_blocking_send_timeout { + match tokio::time::timeout(timeout, w.tx.clone().reserve_owned()).await { + Ok(result) => result, + Err(_) => { + self.stats.increment_me_writer_pick_full_total(pick_mode); + continue; + } + } + } else { + 
w.tx.clone().reserve_owned().await + }; + match reserve_result { Ok(permit) => { if !self.registry.bind_writer(conn_id, w.id, meta).await { debug!( @@ -471,6 +509,7 @@ impl MePool { if w.generation < self.current_generation() { self.stats.increment_pool_stale_pick_total(); } + self.note_hybrid_route_success(); return Ok(()); } Err(_) => { @@ -483,7 +522,7 @@ impl MePool { } async fn wait_for_writer_until(&self, deadline: Instant) -> bool { - let waiter = self.writer_available.notified(); + let mut rx = self.writer_epoch.subscribe(); if !self.writers.read().await.is_empty() { return true; } @@ -492,13 +531,14 @@ impl MePool { return !self.writers.read().await.is_empty(); } let timeout = deadline.saturating_duration_since(now); - if tokio::time::timeout(timeout, waiter).await.is_ok() { - return true; + if tokio::time::timeout(timeout, rx.changed()).await.is_ok() { + return !self.writers.read().await.is_empty(); } !self.writers.read().await.is_empty() } async fn wait_for_candidate_until(&self, routed_dc: i32, deadline: Instant) -> bool { + let mut rx = self.writer_epoch.subscribe(); loop { if self.has_candidate_for_target_dc(routed_dc).await { return true; @@ -509,7 +549,6 @@ impl MePool { return self.has_candidate_for_target_dc(routed_dc).await; } - let waiter = self.writer_available.notified(); if self.has_candidate_for_target_dc(routed_dc).await { return true; } @@ -517,7 +556,7 @@ impl MePool { if remaining.is_zero() { return self.has_candidate_for_target_dc(routed_dc).await; } - if tokio::time::timeout(remaining, waiter).await.is_err() { + if tokio::time::timeout(remaining, rx.changed()).await.is_err() { return self.has_candidate_for_target_dc(routed_dc).await; } } @@ -587,6 +626,10 @@ impl MePool { hybrid_last_recovery_at: &mut Option, hybrid_wait_step: Duration, ) { + if !self.try_consume_hybrid_recovery_trigger_slot(HYBRID_RECOVERY_TRIGGER_MIN_INTERVAL_MS) + { + return; + } if let Some(last) = *hybrid_last_recovery_at && last.elapsed() < hybrid_wait_step { @@ 
-602,6 +645,71 @@ impl MePool { *hybrid_last_recovery_at = Some(Instant::now()); } + fn hybrid_total_wait_budget(&self) -> Duration { + let base = self.me_route_hybrid_max_wait.max(Duration::from_millis(50)); + let now_ms = Self::now_epoch_millis(); + let last_success_ms = self.me_route_last_success_epoch_ms.load(Ordering::Relaxed); + if last_success_ms != 0 + && now_ms.saturating_sub(last_success_ms) <= HYBRID_RECENT_SUCCESS_WINDOW_MS + { + return base.saturating_mul(2); + } + base + } + + fn note_hybrid_route_success(&self) { + self.me_route_last_success_epoch_ms + .store(Self::now_epoch_millis(), Ordering::Relaxed); + } + + fn on_hybrid_timeout(&self, deadline: Instant, routed_dc: i32) { + self.stats.increment_me_hybrid_timeout_total(); + let now_ms = Self::now_epoch_millis(); + let mut last_warn_ms = self + .me_route_hybrid_timeout_warn_epoch_ms + .load(Ordering::Relaxed); + while now_ms.saturating_sub(last_warn_ms) >= HYBRID_TIMEOUT_WARN_RATE_LIMIT_MS { + match self.me_route_hybrid_timeout_warn_epoch_ms.compare_exchange_weak( + last_warn_ms, + now_ms, + Ordering::AcqRel, + Ordering::Relaxed, + ) { + Ok(_) => { + warn!( + routed_dc, + budget_ms = self.hybrid_total_wait_budget().as_millis() as u64, + elapsed_ms = deadline.elapsed().as_millis() as u64, + "ME hybrid route timeout reached" + ); + break; + } + Err(actual) => last_warn_ms = actual, + } + } + } + + fn try_consume_hybrid_recovery_trigger_slot(&self, min_interval_ms: u64) -> bool { + let now_ms = Self::now_epoch_millis(); + let mut last_trigger_ms = self + .me_async_recovery_last_trigger_epoch_ms + .load(Ordering::Relaxed); + loop { + if now_ms.saturating_sub(last_trigger_ms) < min_interval_ms { + return false; + } + match self.me_async_recovery_last_trigger_epoch_ms.compare_exchange_weak( + last_trigger_ms, + now_ms, + Ordering::AcqRel, + Ordering::Relaxed, + ) { + Ok(_) => return true, + Err(actual) => last_trigger_ms = actual, + } + } + } + pub async fn send_close(self: &Arc, conn_id: u64) -> 
Result<()> { if let Some(w) = self.registry.get_writer(conn_id).await { let mut p = Vec::with_capacity(12); diff --git a/src/transport/middle_proxy/tests/health_adversarial_tests.rs b/src/transport/middle_proxy/tests/health_adversarial_tests.rs index 3444120..4bee91c 100644 --- a/src/transport/middle_proxy/tests/health_adversarial_tests.rs +++ b/src/transport/middle_proxy/tests/health_adversarial_tests.rs @@ -113,6 +113,8 @@ async fn make_pool( general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ); diff --git a/src/transport/middle_proxy/tests/health_integration_tests.rs b/src/transport/middle_proxy/tests/health_integration_tests.rs index b0d3a2a..0a6e110 100644 --- a/src/transport/middle_proxy/tests/health_integration_tests.rs +++ b/src/transport/middle_proxy/tests/health_integration_tests.rs @@ -111,6 +111,8 @@ async fn make_pool( general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ); diff --git a/src/transport/middle_proxy/tests/health_regression_tests.rs b/src/transport/middle_proxy/tests/health_regression_tests.rs index 55bf8f6..92398b4 100644 --- a/src/transport/middle_proxy/tests/health_regression_tests.rs +++ b/src/transport/middle_proxy/tests/health_regression_tests.rs @@ -106,6 +106,8 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc { general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ) diff --git 
a/src/transport/middle_proxy/tests/pool_refill_security_tests.rs b/src/transport/middle_proxy/tests/pool_refill_security_tests.rs index 2d1e23a..90c8382 100644 --- a/src/transport/middle_proxy/tests/pool_refill_security_tests.rs +++ b/src/transport/middle_proxy/tests/pool_refill_security_tests.rs @@ -95,6 +95,8 @@ async fn make_pool() -> Arc { general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ) diff --git a/src/transport/middle_proxy/tests/pool_writer_security_tests.rs b/src/transport/middle_proxy/tests/pool_writer_security_tests.rs index 7bfc061..fc5135b 100644 --- a/src/transport/middle_proxy/tests/pool_writer_security_tests.rs +++ b/src/transport/middle_proxy/tests/pool_writer_security_tests.rs @@ -100,6 +100,8 @@ async fn make_pool() -> Arc { general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ) diff --git a/src/transport/middle_proxy/tests/send_adversarial_tests.rs b/src/transport/middle_proxy/tests/send_adversarial_tests.rs index 80379a5..de52d18 100644 --- a/src/transport/middle_proxy/tests/send_adversarial_tests.rs +++ b/src/transport/middle_proxy/tests/send_adversarial_tests.rs @@ -106,6 +106,8 @@ async fn make_pool() -> (Arc, Arc) { general.me_warn_rate_limit_ms, general.me_route_no_writer_mode, general.me_route_no_writer_wait_ms, + general.me_route_hybrid_max_wait_ms, + general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, );