diff --git a/Cargo.toml b/Cargo.toml index 3311a82..a95eef7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.9" +version = "3.3.10" edition = "2024" [dependencies] diff --git a/src/config/defaults.rs b/src/config/defaults.rs index ce55394..798d881 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -21,6 +21,13 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256; const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256; +const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 512; +const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 512; +const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 128; +const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000; +const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000; +const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000; +const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; @@ -296,6 +303,34 @@ pub(crate) fn default_me_adaptive_floor_max_warm_writers_global() -> u32 { DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL } +pub(crate) fn default_me_writer_cmd_channel_capacity() -> usize { + DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY +} + +pub(crate) fn default_me_route_channel_capacity() -> usize { + DEFAULT_ME_ROUTE_CHANNEL_CAPACITY +} + +pub(crate) fn default_me_c2me_channel_capacity() -> usize { + DEFAULT_ME_C2ME_CHANNEL_CAPACITY +} + +pub(crate) fn default_me_health_interval_ms_unhealthy() -> u64 { + DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY +} + +pub(crate) fn default_me_health_interval_ms_healthy() -> u64 { + DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY +} + +pub(crate) fn default_me_admission_poll_ms() -> u64 { + DEFAULT_ME_ADMISSION_POLL_MS +} + 
+pub(crate) fn default_me_warn_rate_limit_ms() -> u64 { + DEFAULT_ME_WARN_RATE_LIMIT_MS +} + pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index a7ae60c..e7029a3 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -91,6 +91,10 @@ pub struct HotFields { pub me_route_backpressure_base_timeout_ms: u64, pub me_route_backpressure_high_timeout_ms: u64, pub me_route_backpressure_high_watermark_pct: u8, + pub me_health_interval_ms_unhealthy: u64, + pub me_health_interval_ms_healthy: u64, + pub me_admission_poll_ms: u64, + pub me_warn_rate_limit_ms: u64, pub users: std::collections::HashMap, pub user_ad_tags: std::collections::HashMap, pub user_max_tcp_conns: std::collections::HashMap, @@ -192,6 +196,10 @@ impl HotFields { me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms, me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms, me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct, + me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy, + me_health_interval_ms_healthy: cfg.general.me_health_interval_ms_healthy, + me_admission_poll_ms: cfg.general.me_admission_poll_ms, + me_warn_rate_limit_ms: cfg.general.me_warn_rate_limit_ms, users: cfg.access.users.clone(), user_ad_tags: cfg.access.user_ad_tags.clone(), user_max_tcp_conns: cfg.access.user_max_tcp_conns.clone(), @@ -335,6 +343,10 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { new.general.me_route_backpressure_high_timeout_ms; cfg.general.me_route_backpressure_high_watermark_pct = new.general.me_route_backpressure_high_watermark_pct; + cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy; + cfg.general.me_health_interval_ms_healthy = new.general.me_health_interval_ms_healthy; 
+ cfg.general.me_admission_poll_ms = new.general.me_admission_poll_ms; + cfg.general.me_warn_rate_limit_ms = new.general.me_warn_rate_limit_ms; cfg.access.users = new.access.users.clone(); cfg.access.user_ad_tags = new.access.user_ad_tags.clone(); @@ -796,12 +808,21 @@ fn log_changes( != new_hot.me_route_backpressure_high_timeout_ms || old_hot.me_route_backpressure_high_watermark_pct != new_hot.me_route_backpressure_high_watermark_pct + || old_hot.me_health_interval_ms_unhealthy + != new_hot.me_health_interval_ms_unhealthy + || old_hot.me_health_interval_ms_healthy != new_hot.me_health_interval_ms_healthy + || old_hot.me_admission_poll_ms != new_hot.me_admission_poll_ms + || old_hot.me_warn_rate_limit_ms != new_hot.me_warn_rate_limit_ms { info!( - "config reload: me_route_backpressure: base={}ms high={}ms watermark={}%", + "config reload: me_route_backpressure: base={}ms high={}ms watermark={}%; me_health_interval: unhealthy={}ms healthy={}ms; me_admission_poll={}ms; me_warn_rate_limit={}ms", new_hot.me_route_backpressure_base_timeout_ms, new_hot.me_route_backpressure_high_timeout_ms, new_hot.me_route_backpressure_high_watermark_pct, + new_hot.me_health_interval_ms_unhealthy, + new_hot.me_health_interval_ms_healthy, + new_hot.me_admission_poll_ms, + new_hot.me_warn_rate_limit_ms, ); } diff --git a/src/config/load.rs b/src/config/load.rs index 25a9994..c013b1a 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -285,6 +285,48 @@ impl ProxyConfig { )); } + if config.general.me_writer_cmd_channel_capacity == 0 { + return Err(ProxyError::Config( + "general.me_writer_cmd_channel_capacity must be > 0".to_string(), + )); + } + + if config.general.me_route_channel_capacity == 0 { + return Err(ProxyError::Config( + "general.me_route_channel_capacity must be > 0".to_string(), + )); + } + + if config.general.me_c2me_channel_capacity == 0 { + return Err(ProxyError::Config( + "general.me_c2me_channel_capacity must be > 0".to_string(), + )); + } + + if 
config.general.me_health_interval_ms_unhealthy == 0 { + return Err(ProxyError::Config( + "general.me_health_interval_ms_unhealthy must be > 0".to_string(), + )); + } + + if config.general.me_health_interval_ms_healthy == 0 { + return Err(ProxyError::Config( + "general.me_health_interval_ms_healthy must be > 0".to_string(), + )); + } + + if config.general.me_admission_poll_ms == 0 { + return Err(ProxyError::Config( + "general.me_admission_poll_ms must be > 0".to_string(), + )); + } + + if config.general.me_warn_rate_limit_ms == 0 { + return Err(ProxyError::Config( + "general.me_warn_rate_limit_ms must be > 0".to_string(), + )); + } + if config.access.user_max_unique_ips_window_secs == 0 { return Err(ProxyError::Config( "access.user_max_unique_ips_window_secs must be > 0".to_string(), diff --git a/src/config/types.rs b/src/config/types.rs index 0c89df8..b2be9cf 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -420,6 +420,18 @@ pub struct GeneralConfig { #[serde(default = "default_rpc_proxy_req_every")] pub rpc_proxy_req_every: u64, + /// Capacity of per-ME writer command channel. + #[serde(default = "default_me_writer_cmd_channel_capacity")] + pub me_writer_cmd_channel_capacity: usize, + + /// Capacity of per-connection ME response route channel. + #[serde(default = "default_me_route_channel_capacity")] + pub me_route_channel_capacity: usize, + + /// Capacity of per-client command queue from client reader to ME sender task. + #[serde(default = "default_me_c2me_channel_capacity")] + pub me_c2me_channel_capacity: usize, + /// Max pending ciphertext buffer per client writer (bytes). /// Controls FakeTLS backpressure vs throughput. #[serde(default = "default_crypto_pending_buffer")] @@ -620,6 +632,22 @@ pub struct GeneralConfig { #[serde(default = "default_me_route_backpressure_high_watermark_pct")] pub me_route_backpressure_high_watermark_pct: u8, + /// Health monitor interval in milliseconds while writer coverage is degraded. 
+ #[serde(default = "default_me_health_interval_ms_unhealthy")] + pub me_health_interval_ms_unhealthy: u64, + + /// Health monitor interval in milliseconds while writer coverage is stable. + #[serde(default = "default_me_health_interval_ms_healthy")] + pub me_health_interval_ms_healthy: u64, + + /// Poll interval in milliseconds for conditional-admission state checks. + #[serde(default = "default_me_admission_poll_ms")] + pub me_admission_poll_ms: u64, + + /// Cooldown for repetitive ME warning logs in milliseconds. + #[serde(default = "default_me_warn_rate_limit_ms")] + pub me_warn_rate_limit_ms: u64, + /// ME route behavior when no writer is immediately available. #[serde(default)] pub me_route_no_writer_mode: MeRouteNoWriterMode, @@ -796,6 +824,9 @@ impl Default for GeneralConfig { me_keepalive_jitter_secs: default_keepalive_jitter(), me_keepalive_payload_random: default_true(), rpc_proxy_req_every: default_rpc_proxy_req_every(), + me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(), + me_route_channel_capacity: default_me_route_channel_capacity(), + me_c2me_channel_capacity: default_me_c2me_channel_capacity(), me_warmup_stagger_enabled: default_true(), me_warmup_step_delay_ms: default_warmup_step_delay_ms(), me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(), @@ -837,6 +868,10 @@ impl Default for GeneralConfig { me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(), me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(), me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(), + me_health_interval_ms_unhealthy: default_me_health_interval_ms_unhealthy(), + me_health_interval_ms_healthy: default_me_health_interval_ms_healthy(), + me_admission_poll_ms: default_me_admission_poll_ms(), + me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(), me_route_no_writer_mode: MeRouteNoWriterMode::default(), me_route_no_writer_wait_ms: 
default_me_route_no_writer_wait_ms(), me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(), diff --git a/src/main.rs b/src/main.rs index 58c70cd..c1059c3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1048,9 +1048,14 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_secret_atomic_snapshot, config.general.me_deterministic_writer_sort, config.general.me_socks_kdf_policy, + config.general.me_writer_cmd_channel_capacity, + config.general.me_route_channel_capacity, config.general.me_route_backpressure_base_timeout_ms, config.general.me_route_backpressure_high_timeout_ms, config.general.me_route_backpressure_high_watermark_pct, + config.general.me_health_interval_ms_unhealthy, + config.general.me_health_interval_ms_healthy, + config.general.me_warn_rate_limit_ms, config.general.me_route_no_writer_mode, config.general.me_route_no_writer_wait_ms, config.general.me_route_inline_recovery_attempts, @@ -1784,11 +1789,24 @@ async fn main() -> std::result::Result<(), Box> { let pool_for_gate = pool.clone(); let admission_tx_gate = admission_tx.clone(); + let mut config_rx_gate = config_rx.clone(); + let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1); tokio::spawn(async move { let mut gate_open = initial_open; let mut open_streak = if initial_open { 1u32 } else { 0u32 }; let mut close_streak = if initial_open { 0u32 } else { 1u32 }; loop { + tokio::select! 
{ + changed = config_rx_gate.changed() => { + if changed.is_err() { + break; + } + let cfg = config_rx_gate.borrow_and_update().clone(); + admission_poll_ms = cfg.general.me_admission_poll_ms.max(1); + continue; + } + _ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {} + } let ready = pool_for_gate.admission_ready_conditional_cast().await; if ready { open_streak = open_streak.saturating_add(1); @@ -1813,7 +1831,6 @@ async fn main() -> std::result::Result<(), Box> { ); } } - tokio::time::sleep(Duration::from_millis(250)).await; } }); } else { diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index 707c8af..cae8273 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -27,7 +27,7 @@ enum C2MeCommand { const DESYNC_DEDUP_WINDOW: Duration = Duration::from_secs(60); const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync"; -const C2ME_CHANNEL_CAPACITY: usize = 1024; +const C2ME_CHANNEL_CAPACITY_FALLBACK: usize = 128; const C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS: usize = 64; const C2ME_SENDER_FAIRNESS_BUDGET: usize = 32; static DESYNC_DEDUP: OnceLock>> = OnceLock::new(); @@ -271,7 +271,11 @@ where let frame_limit = config.general.max_client_frame; - let (c2me_tx, mut c2me_rx) = mpsc::channel::(C2ME_CHANNEL_CAPACITY); + let c2me_channel_capacity = config + .general + .me_c2me_channel_capacity + .max(C2ME_CHANNEL_CAPACITY_FALLBACK); + let (c2me_tx, mut c2me_rx) = mpsc::channel::(c2me_channel_capacity); let me_pool_c2me = me_pool.clone(); let effective_tag = effective_tag; let c2me_sender = tokio::spawn(async move { diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index cccf381..1bcda14 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -325,6 +325,9 @@ async fn run_update_cycle( cfg.general.me_adaptive_floor_max_warm_writers_per_core, cfg.general.me_adaptive_floor_max_active_writers_global, 
cfg.general.me_adaptive_floor_max_warm_writers_global, + cfg.general.me_health_interval_ms_unhealthy, + cfg.general.me_health_interval_ms_healthy, + cfg.general.me_warn_rate_limit_ms, ); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); @@ -546,6 +549,9 @@ pub async fn me_config_updater( cfg.general.me_adaptive_floor_max_warm_writers_per_core, cfg.general.me_adaptive_floor_max_active_writers_global, cfg.general.me_adaptive_floor_max_warm_writers_global, + cfg.general.me_health_interval_ms_unhealthy, + cfg.general.me_health_interval_ms_healthy, + cfg.general.me_warn_rate_limit_ms, ); let new_secs = cfg.general.effective_update_every_secs().max(1); if new_secs == update_every_secs { diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index aefeead..b422dc6 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -13,7 +13,6 @@ use crate::network::IpFamily; use super::MePool; -const HEALTH_INTERVAL_SECS: u64 = 1; const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff #[allow(dead_code)] const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1; @@ -62,11 +61,18 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); + let mut degraded_interval = true; loop { - tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await; + let interval = if degraded_interval { + pool.health_interval_unhealthy() + } else { + pool.health_interval_healthy() + }; + tokio::time::sleep(interval).await; pool.prune_closed_writers().await; reap_draining_writers(&pool).await; - check_family( + let v4_degraded = check_family( IpFamily::V4, &pool, &rng, 
@@ -80,9 +86,10 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut idle_refresh_next_attempt, &mut adaptive_idle_since, &mut adaptive_recover_until, + &mut floor_warn_next_allowed, ) .await; - check_family( + let v6_degraded = check_family( IpFamily::V6, &pool, &rng, @@ -96,8 +103,10 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c &mut idle_refresh_next_attempt, &mut adaptive_idle_since, &mut adaptive_recover_until, + &mut floor_warn_next_allowed, ) .await; + degraded_interval = v4_degraded || v6_degraded; } } @@ -137,15 +146,18 @@ async fn check_family( idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>, adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, -) { + floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>, +) -> bool { let enabled = match family { IpFamily::V4 => pool.decision.ipv4_me, IpFamily::V6 => pool.decision.ipv6_me, }; if !enabled { - return; + return false; } + let mut family_degraded = false; + let mut dc_endpoints = HashMap::>::new(); let map_guard = match family { IpFamily::V4 => pool.proxy_map_v4.read().await, @@ -234,6 +246,7 @@ async fn check_family( .sum::(); if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 { + family_degraded = true; if single_endpoint_outage.insert(key) { pool.stats.increment_me_single_endpoint_outage_enter_total(); warn!( @@ -310,6 +323,7 @@ async fn check_family( continue; } let missing = required - alive; + family_degraded = true; let now = Instant::now(); if reconnect_budget == 0 { @@ -438,15 +452,23 @@ async fn check_family( + Duration::from_millis(rand::rng().random_range(0..=jitter.max(1))); next_attempt.insert(key, now + wait); if pool.is_runtime_ready() { - warn!( - dc = %dc, - ?family, - alive = now_alive, - required, - endpoint_count = endpoints.len(), - backoff_ms = next_ms, - "DC writer floor is below required level, scheduled reconnect" - ); + 
/// Decides whether a repetitive warning for `key` may be emitted at `now`.
///
/// `next_allowed` maps each key to the earliest instant at which the next
/// warning for that key is permitted. When the warning is allowed (no entry
/// yet, or `now` has reached the stored instant) the entry is advanced to
/// `now + cooldown` and `true` is returned; otherwise the map is left
/// untouched and `false` is returned.
///
/// The key type is generic so the same limiter works for any hashable key;
/// the existing `(i32, IpFamily)` call site is unaffected.
fn should_emit_rate_limited_warn<K>(
    next_allowed: &mut HashMap<K, Instant>,
    key: K,
    now: Instant,
    cooldown: Duration,
) -> bool
where
    K: Eq + std::hash::Hash,
{
    // Used only when `now + cooldown` is not representable as an `Instant`.
    const OVERFLOW_COOLDOWN_CAP: Duration = Duration::from_secs(10 * 365 * 24 * 60 * 60);

    // Still inside the cooldown window for this key: suppress.
    if next_allowed
        .get(&key)
        .is_some_and(|ready_at| now < *ready_at)
    {
        return false;
    }

    // The cooldown originates from a user-supplied millisecond value, so the
    // addition must not use the panicking `+` operator. An unrepresentable
    // deadline degrades to a long, finite suppression window instead.
    let ready_at = now
        .checked_add(cooldown)
        .or_else(|| now.checked_add(OVERFLOW_COOLDOWN_CAP))
        .unwrap_or(now);
    next_allowed.insert(key, ready_at);
    true
}
AtomicU64, + pub(super) me_warn_rate_limit_ms: AtomicU64, pub(super) runtime_ready: AtomicBool, pool_size: usize, + pub(super) preferred_endpoints_by_dc: Arc>>>, } #[derive(Debug, Default)] @@ -270,16 +275,25 @@ impl MePool { me_secret_atomic_snapshot: bool, me_deterministic_writer_sort: bool, me_socks_kdf_policy: MeSocksKdfPolicy, + me_writer_cmd_channel_capacity: usize, + me_route_channel_capacity: usize, me_route_backpressure_base_timeout_ms: u64, me_route_backpressure_high_timeout_ms: u64, me_route_backpressure_high_watermark_pct: u8, + me_health_interval_ms_unhealthy: u64, + me_health_interval_ms_healthy: u64, + me_warn_rate_limit_ms: u64, me_route_no_writer_mode: MeRouteNoWriterMode, me_route_no_writer_wait_ms: u64, me_route_inline_recovery_attempts: u32, me_route_inline_recovery_wait_ms: u64, ) -> Arc { let endpoint_dc_map = Self::build_endpoint_dc_map_from_maps(&proxy_map_v4, &proxy_map_v6); - let registry = Arc::new(ConnRegistry::new()); + let preferred_endpoints_by_dc = + Self::build_preferred_endpoints_by_dc(&decision, &proxy_map_v4, &proxy_map_v6); + let registry = Arc::new(ConnRegistry::with_route_channel_capacity( + me_route_channel_capacity, + )); registry.update_route_backpressure_policy( me_route_backpressure_base_timeout_ms, me_route_backpressure_high_timeout_ms, @@ -326,6 +340,7 @@ impl MePool { me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs), me_keepalive_payload_random, rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs), + writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1), me_warmup_stagger_enabled, me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms), me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms), @@ -440,7 +455,11 @@ impl MePool { me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms), me_route_inline_recovery_attempts, me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms), + 
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)), + me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)), + me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)), runtime_ready: AtomicBool::new(false), + preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)), }) } @@ -489,6 +508,9 @@ impl MePool { adaptive_floor_max_warm_writers_per_core: u16, adaptive_floor_max_active_writers_global: u32, adaptive_floor_max_warm_writers_global: u32, + me_health_interval_ms_unhealthy: u64, + me_health_interval_ms_healthy: u64, + me_warn_rate_limit_ms: u64, ) { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs @@ -564,6 +586,12 @@ impl MePool { .store(adaptive_floor_max_active_writers_global, Ordering::Relaxed); self.me_adaptive_floor_max_warm_writers_global .store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed); + self.me_health_interval_ms_unhealthy + .store(me_health_interval_ms_unhealthy.max(1), Ordering::Relaxed); + self.me_health_interval_ms_healthy + .store(me_health_interval_ms_healthy.max(1), Ordering::Relaxed); + self.me_warn_rate_limit_ms + .store(me_warn_rate_limit_ms.max(1), Ordering::Relaxed); if previous_floor_mode != floor_mode { self.stats.increment_me_floor_mode_switch_total(); match (previous_floor_mode, floor_mode) { @@ -1042,6 +1070,62 @@ impl MePool { } } + fn build_preferred_endpoints_by_dc( + decision: &NetworkDecision, + map_v4: &HashMap>, + map_v6: &HashMap>, + ) -> HashMap> { + let mut out = HashMap::>::new(); + let mut dcs = HashSet::::new(); + dcs.extend(map_v4.keys().copied()); + dcs.extend(map_v6.keys().copied()); + + for dc in dcs { + let v4 = map_v4 + .get(&dc) + .map(|items| { + items + .iter() + .map(|(ip, port)| SocketAddr::new(*ip, *port)) + .collect::>() + }) + .unwrap_or_default(); + let v6 = map_v6 + .get(&dc) + .map(|items| { + items + .iter() + .map(|(ip, port)| SocketAddr::new(*ip, *port)) + 
.collect::>() + }) + .unwrap_or_default(); + + let mut selected = if decision.effective_multipath { + let mut both = Vec::::with_capacity(v4.len().saturating_add(v6.len())); + if decision.prefer_ipv6() { + both.extend(v6.iter().copied()); + both.extend(v4.iter().copied()); + } else { + both.extend(v4.iter().copied()); + both.extend(v6.iter().copied()); + } + both + } else if decision.prefer_ipv6() { + if !v6.is_empty() { v6 } else { v4 } + } else if !v4.is_empty() { + v4 + } else { + v6 + }; + + selected.sort_unstable(); + selected.dedup(); + out.insert(dc, selected); + } + + out + } + fn build_endpoint_dc_map_from_maps( map_v4: &HashMap>, map_v6: &HashMap>, @@ -1064,6 +1148,25 @@ impl MePool { let map_v4 = self.proxy_map_v4.read().await.clone(); let map_v6 = self.proxy_map_v6.read().await.clone(); let rebuilt = Self::build_endpoint_dc_map_from_maps(&map_v4, &map_v6); + let preferred = Self::build_preferred_endpoints_by_dc(&self.decision, &map_v4, &map_v6); *self.endpoint_dc_map.write().await = rebuilt; + *self.preferred_endpoints_by_dc.write().await = preferred; + } + + pub(super) async fn preferred_endpoints_for_dc(&self, dc: i32) -> Vec { + let guard = self.preferred_endpoints_by_dc.read().await; + guard.get(&dc).cloned().unwrap_or_default() + } + + pub(super) fn health_interval_unhealthy(&self) -> Duration { + Duration::from_millis(self.me_health_interval_ms_unhealthy.load(Ordering::Relaxed).max(1)) + } + + pub(super) fn health_interval_healthy(&self) -> Duration { + Duration::from_millis(self.me_health_interval_ms_healthy.load(Ordering::Relaxed).max(1)) + } + + pub(super) fn warn_rate_limit_duration(&self) -> Duration { + Duration::from_millis(self.me_warn_rate_limit_ms.load(Ordering::Relaxed).max(1)) } } diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index 90f8d0a..036572a 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -132,7 +132,7 @@ impl MePool { 
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0)); let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0)); let allow_drain_fallback = Arc::new(AtomicBool::new(false)); - let (tx, mut rx) = mpsc::channel::(4096); + let (tx, mut rx) = mpsc::channel::(self.writer_cmd_channel_capacity); let mut rpc_writer = RpcWriter { writer: hs.wr, key: hs.write_key, diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index 0ee81e0..f2682d5 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -9,7 +9,6 @@ use tokio::sync::mpsc::error::TrySendError; use super::codec::WriterCommand; use super::MeResponse; -const ROUTE_CHANNEL_CAPACITY: usize = 4096; const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25; const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120; const ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT: u8 = 80; @@ -78,6 +77,7 @@ impl RegistryInner { pub struct ConnRegistry { inner: RwLock, next_id: AtomicU64, + route_channel_capacity: usize, route_backpressure_base_timeout_ms: AtomicU64, route_backpressure_high_timeout_ms: AtomicU64, route_backpressure_high_watermark_pct: AtomicU8, @@ -91,11 +91,12 @@ impl ConnRegistry { .as_secs() } - pub fn new() -> Self { + pub fn with_route_channel_capacity(route_channel_capacity: usize) -> Self { let start = rand::random::() | 1; Self { inner: RwLock::new(RegistryInner::new()), next_id: AtomicU64::new(start), + route_channel_capacity: route_channel_capacity.max(1), route_backpressure_base_timeout_ms: AtomicU64::new( ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS, ), @@ -108,6 +109,11 @@ impl ConnRegistry { } } + #[cfg(test)] + pub fn new() -> Self { + Self::with_route_channel_capacity(4096) + } + pub fn update_route_backpressure_policy( &self, base_timeout_ms: u64, @@ -127,7 +133,7 @@ impl ConnRegistry { pub async fn register(&self) -> (u64, mpsc::Receiver) { let id = self.next_id.fetch_add(1, Ordering::Relaxed); - let (tx, rx) = 
mpsc::channel(ROUTE_CHANNEL_CAPACITY); + let (tx, rx) = mpsc::channel(self.route_channel_capacity); self.inner.write().await.map.insert(id, tx); (id, rx) } @@ -179,11 +185,11 @@ impl ConnRegistry { .route_backpressure_high_watermark_pct .load(Ordering::Relaxed) .clamp(1, 100); - let used = ROUTE_CHANNEL_CAPACITY.saturating_sub(tx.capacity()); - let used_pct = if ROUTE_CHANNEL_CAPACITY == 0 { + let used = self.route_channel_capacity.saturating_sub(tx.capacity()); + let used_pct = if self.route_channel_capacity == 0 { 100 } else { - (used.saturating_mul(100) / ROUTE_CHANNEL_CAPACITY) as u8 + (used.saturating_mul(100) / self.route_channel_capacity) as u8 }; let high_profile = used_pct >= high_watermark_pct; let timeout_ms = if high_profile { diff --git a/src/transport/middle_proxy/send.rs b/src/transport/middle_proxy/send.rs index 07d39f6..ec199fd 100644 --- a/src/transport/middle_proxy/send.rs +++ b/src/transport/middle_proxy/send.rs @@ -480,31 +480,7 @@ impl MePool { } async fn endpoint_candidates_for_target_dc(&self, routed_dc: i32) -> Vec { - let mut preferred = Vec::::new(); - let mut seen = HashSet::::new(); - - for family in self.family_order() { - let map_guard = match family { - IpFamily::V4 => self.proxy_map_v4.read().await, - IpFamily::V6 => self.proxy_map_v6.read().await, - }; - let mut family_selected = Vec::::new(); - if let Some(addrs) = map_guard.get(&routed_dc) { - for (ip, port) in addrs { - family_selected.push(SocketAddr::new(*ip, *port)); - } - } - for addr in family_selected { - if seen.insert(addr) { - preferred.push(addr); - } - } - if !preferred.is_empty() && !self.decision.effective_multipath { - break; - } - } - - preferred + self.preferred_endpoints_for_dc(routed_dc).await } async fn maybe_trigger_hybrid_recovery( @@ -591,28 +567,7 @@ impl MePool { routed_dc: i32, include_warm: bool, ) -> Vec { - let mut preferred = HashSet::::new(); - - for family in self.family_order() { - let map_guard = match family { - IpFamily::V4 => 
self.proxy_map_v4.read().await, - IpFamily::V6 => self.proxy_map_v6.read().await, - }; - let mut family_selected = Vec::::new(); - if let Some(v) = map_guard.get(&routed_dc) { - family_selected.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port))); - } - for endpoint in family_selected { - preferred.insert(endpoint); - } - - drop(map_guard); - - if !preferred.is_empty() && !self.decision.effective_multipath { - break; - } - } - + let preferred = self.preferred_endpoints_for_dc(routed_dc).await; if preferred.is_empty() { return Vec::new(); } @@ -622,7 +577,7 @@ impl MePool { if !self.writer_eligible_for_selection(w, include_warm) { continue; } - if w.writer_dc == routed_dc && preferred.contains(&w.addr) { + if w.writer_dc == routed_dc && preferred.iter().any(|endpoint| *endpoint == w.addr) { out.push(idx); } }