diff --git a/src/api/model.rs b/src/api/model.rs index 2f6c58e..88c6ddc 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -269,6 +269,10 @@ pub(super) struct DcStatus { pub(super) available_endpoints: usize, pub(super) available_pct: f64, pub(super) required_writers: usize, + pub(super) floor_min: usize, + pub(super) floor_target: usize, + pub(super) floor_max: usize, + pub(super) floor_capped: bool, pub(super) alive_writers: usize, pub(super) coverage_pct: f64, pub(super) rtt_ms: Option, @@ -308,7 +312,17 @@ pub(super) struct MinimalMeRuntimeData { pub(super) floor_mode: &'static str, pub(super) adaptive_floor_idle_secs: u64, pub(super) adaptive_floor_min_writers_single_endpoint: u8, + pub(super) adaptive_floor_min_writers_multi_endpoint: u8, pub(super) adaptive_floor_recover_grace_secs: u64, + pub(super) adaptive_floor_writers_per_core_total: u16, + pub(super) adaptive_floor_cpu_cores_override: u16, + pub(super) adaptive_floor_max_extra_writers_single_per_core: u16, + pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16, + pub(super) adaptive_floor_cpu_cores_detected: u32, + pub(super) adaptive_floor_cpu_cores_effective: u32, + pub(super) adaptive_floor_global_cap_raw: u64, + pub(super) adaptive_floor_global_cap_effective: u64, + pub(super) adaptive_floor_target_writers_total: u64, pub(super) me_keepalive_enabled: bool, pub(super) me_keepalive_interval_secs: u64, pub(super) me_keepalive_jitter_secs: u64, diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index 3019636..c69f817 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -349,6 +349,10 @@ async fn get_minimal_payload_cached( available_endpoints: entry.available_endpoints, available_pct: entry.available_pct, required_writers: entry.required_writers, + floor_min: entry.floor_min, + floor_target: entry.floor_target, + floor_max: entry.floor_max, + floor_capped: entry.floor_capped, alive_writers: entry.alive_writers, coverage_pct: entry.coverage_pct, rtt_ms: entry.rtt_ms, @@ -366,7 +370,21 @@ async fn get_minimal_payload_cached( adaptive_floor_idle_secs: runtime.adaptive_floor_idle_secs, adaptive_floor_min_writers_single_endpoint: runtime .adaptive_floor_min_writers_single_endpoint, + adaptive_floor_min_writers_multi_endpoint: runtime + .adaptive_floor_min_writers_multi_endpoint, adaptive_floor_recover_grace_secs: runtime.adaptive_floor_recover_grace_secs, + adaptive_floor_writers_per_core_total: runtime + .adaptive_floor_writers_per_core_total, + adaptive_floor_cpu_cores_override: runtime.adaptive_floor_cpu_cores_override, + adaptive_floor_max_extra_writers_single_per_core: runtime + .adaptive_floor_max_extra_writers_single_per_core, + adaptive_floor_max_extra_writers_multi_per_core: runtime + .adaptive_floor_max_extra_writers_multi_per_core, + adaptive_floor_cpu_cores_detected: runtime.adaptive_floor_cpu_cores_detected, + adaptive_floor_cpu_cores_effective: runtime.adaptive_floor_cpu_cores_effective, + adaptive_floor_global_cap_raw: runtime.adaptive_floor_global_cap_raw, + adaptive_floor_global_cap_effective: runtime.adaptive_floor_global_cap_effective, + adaptive_floor_target_writers_total: runtime.adaptive_floor_target_writers_total, me_keepalive_enabled: runtime.me_keepalive_enabled, me_keepalive_interval_secs: runtime.me_keepalive_interval_secs, me_keepalive_jitter_secs: runtime.me_keepalive_jitter_secs, diff --git a/src/api/runtime_zero.rs b/src/api/runtime_zero.rs index 2c50020..61b6844 100644 --- a/src/api/runtime_zero.rs +++ b/src/api/runtime_zero.rs @@ -60,7 +60,12 @@ pub(super) struct EffectiveMiddleProxyLimits { pub(super) floor_mode: &'static str, pub(super) adaptive_floor_idle_secs: u64, pub(super) adaptive_floor_min_writers_single_endpoint: u8, + pub(super) adaptive_floor_min_writers_multi_endpoint: u8, pub(super) adaptive_floor_recover_grace_secs: u64, + pub(super) adaptive_floor_writers_per_core_total: u16, + pub(super) adaptive_floor_cpu_cores_override: u16, + pub(super) adaptive_floor_max_extra_writers_single_per_core: u16, + pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16, pub(super) reconnect_max_concurrent_per_dc: u32, pub(super) reconnect_backoff_base_ms: u64, pub(super) reconnect_backoff_cap_ms: u64, @@ -183,7 +188,22 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD adaptive_floor_min_writers_single_endpoint: cfg .general .me_adaptive_floor_min_writers_single_endpoint, + adaptive_floor_min_writers_multi_endpoint: cfg + .general + .me_adaptive_floor_min_writers_multi_endpoint, adaptive_floor_recover_grace_secs: cfg.general.me_adaptive_floor_recover_grace_secs, + adaptive_floor_writers_per_core_total: cfg + .general + .me_adaptive_floor_writers_per_core_total, + adaptive_floor_cpu_cores_override: cfg + .general + .me_adaptive_floor_cpu_cores_override, + adaptive_floor_max_extra_writers_single_per_core: cfg + .general + .me_adaptive_floor_max_extra_writers_single_per_core, + adaptive_floor_max_extra_writers_multi_per_core: cfg + .general + .me_adaptive_floor_max_extra_writers_multi_per_core, reconnect_max_concurrent_per_dc: cfg.general.me_reconnect_max_concurrent_per_dc, reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms, reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms, diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 3ba146c..465cef1 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -11,7 +11,12 @@ const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16; const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2; const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90; const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1; +const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_MULTI_ENDPOINT: u8 = 1; const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180; +const DEFAULT_ME_ADAPTIVE_FLOOR_WRITERS_PER_CORE_TOTAL: u16 = 48; +const DEFAULT_ME_ADAPTIVE_FLOOR_CPU_CORES_OVERRIDE: u16 = 0; +const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_SINGLE_PER_CORE: u16 = 1; +const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE: u16 = 2; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; @@ -247,10 +252,30 @@ pub(crate) fn default_me_adaptive_floor_min_writers_single_endpoint() -> u8 { DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT } +pub(crate) fn default_me_adaptive_floor_min_writers_multi_endpoint() -> u8 { + DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_MULTI_ENDPOINT +} + pub(crate) fn default_me_adaptive_floor_recover_grace_secs() -> u64 { DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS } +pub(crate) fn default_me_adaptive_floor_writers_per_core_total() -> u16 { + DEFAULT_ME_ADAPTIVE_FLOOR_WRITERS_PER_CORE_TOTAL +} + +pub(crate) fn default_me_adaptive_floor_cpu_cores_override() -> u16 { + DEFAULT_ME_ADAPTIVE_FLOOR_CPU_CORES_OVERRIDE +} + +pub(crate) fn default_me_adaptive_floor_max_extra_writers_single_per_core() -> u16 { + DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_SINGLE_PER_CORE +} + +pub(crate) fn default_me_adaptive_floor_max_extra_writers_multi_per_core() -> u16 { + DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE +} + pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index c39cafa..a24f9d5 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -78,7 +78,12 @@ pub struct HotFields { pub me_floor_mode: MeFloorMode, pub me_adaptive_floor_idle_secs: u64, pub me_adaptive_floor_min_writers_single_endpoint: u8, + pub me_adaptive_floor_min_writers_multi_endpoint: u8, pub me_adaptive_floor_recover_grace_secs: u64, + pub me_adaptive_floor_writers_per_core_total: u16, + pub me_adaptive_floor_cpu_cores_override: u16, + pub me_adaptive_floor_max_extra_writers_single_per_core: u16, + pub me_adaptive_floor_max_extra_writers_multi_per_core: u16, pub me_route_backpressure_base_timeout_ms: u64, pub me_route_backpressure_high_timeout_ms: u64, pub me_route_backpressure_high_watermark_pct: u8, @@ -150,9 +155,24 @@ impl HotFields { me_adaptive_floor_min_writers_single_endpoint: cfg .general .me_adaptive_floor_min_writers_single_endpoint, + me_adaptive_floor_min_writers_multi_endpoint: cfg + .general + .me_adaptive_floor_min_writers_multi_endpoint, me_adaptive_floor_recover_grace_secs: cfg .general .me_adaptive_floor_recover_grace_secs, + me_adaptive_floor_writers_per_core_total: cfg + .general + .me_adaptive_floor_writers_per_core_total, + me_adaptive_floor_cpu_cores_override: cfg + .general + .me_adaptive_floor_cpu_cores_override, + me_adaptive_floor_max_extra_writers_single_per_core: cfg + .general + .me_adaptive_floor_max_extra_writers_single_per_core, + me_adaptive_floor_max_extra_writers_multi_per_core: cfg + .general + .me_adaptive_floor_max_extra_writers_multi_per_core, me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms, me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms, me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct, @@ -273,8 +293,18 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.general.me_adaptive_floor_idle_secs = new.general.me_adaptive_floor_idle_secs; cfg.general.me_adaptive_floor_min_writers_single_endpoint = new.general.me_adaptive_floor_min_writers_single_endpoint; + cfg.general.me_adaptive_floor_min_writers_multi_endpoint = + new.general.me_adaptive_floor_min_writers_multi_endpoint; cfg.general.me_adaptive_floor_recover_grace_secs = new.general.me_adaptive_floor_recover_grace_secs; + cfg.general.me_adaptive_floor_writers_per_core_total = + new.general.me_adaptive_floor_writers_per_core_total; + cfg.general.me_adaptive_floor_cpu_cores_override = + new.general.me_adaptive_floor_cpu_cores_override; + cfg.general.me_adaptive_floor_max_extra_writers_single_per_core = + new.general.me_adaptive_floor_max_extra_writers_single_per_core; + cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core = + new.general.me_adaptive_floor_max_extra_writers_multi_per_core; cfg.general.me_route_backpressure_base_timeout_ms = new.general.me_route_backpressure_base_timeout_ms; cfg.general.me_route_backpressure_high_timeout_ms = @@ -697,15 +727,30 @@ fn log_changes( || old_hot.me_adaptive_floor_idle_secs != new_hot.me_adaptive_floor_idle_secs || old_hot.me_adaptive_floor_min_writers_single_endpoint != new_hot.me_adaptive_floor_min_writers_single_endpoint + || old_hot.me_adaptive_floor_min_writers_multi_endpoint + != new_hot.me_adaptive_floor_min_writers_multi_endpoint || old_hot.me_adaptive_floor_recover_grace_secs != new_hot.me_adaptive_floor_recover_grace_secs + || old_hot.me_adaptive_floor_writers_per_core_total + != new_hot.me_adaptive_floor_writers_per_core_total + || old_hot.me_adaptive_floor_cpu_cores_override + != new_hot.me_adaptive_floor_cpu_cores_override + || old_hot.me_adaptive_floor_max_extra_writers_single_per_core + != new_hot.me_adaptive_floor_max_extra_writers_single_per_core + || old_hot.me_adaptive_floor_max_extra_writers_multi_per_core + != new_hot.me_adaptive_floor_max_extra_writers_multi_per_core { info!( - "config reload: me_floor: mode={:?} idle={}s min_single={} recover_grace={}s", + "config reload: me_floor: mode={:?} idle={}s min_single={} min_multi={} recover_grace={}s per_core_total={} cores_override={} extra_single_per_core={} extra_multi_per_core={}", new_hot.me_floor_mode, new_hot.me_adaptive_floor_idle_secs, new_hot.me_adaptive_floor_min_writers_single_endpoint, + new_hot.me_adaptive_floor_min_writers_multi_endpoint, new_hot.me_adaptive_floor_recover_grace_secs, + new_hot.me_adaptive_floor_writers_per_core_total, + new_hot.me_adaptive_floor_cpu_cores_override, + new_hot.me_adaptive_floor_max_extra_writers_single_per_core, + new_hot.me_adaptive_floor_max_extra_writers_multi_per_core, ); } diff --git a/src/config/load.rs b/src/config/load.rs index 6ce7b65..e6dc728 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -312,6 +312,21 @@ impl ProxyConfig { )); } + if config.general.me_adaptive_floor_min_writers_multi_endpoint == 0 + || config.general.me_adaptive_floor_min_writers_multi_endpoint > 32 + { + return Err(ProxyError::Config( + "general.me_adaptive_floor_min_writers_multi_endpoint must be within [1, 32]" + .to_string(), + )); + } + + if config.general.me_adaptive_floor_writers_per_core_total == 0 { + return Err(ProxyError::Config( + "general.me_adaptive_floor_writers_per_core_total must be > 0".to_string(), + )); + } + if config.general.me_single_endpoint_outage_backoff_min_ms == 0 { return Err(ProxyError::Config( "general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(), diff --git a/src/config/types.rs b/src/config/types.rs index 4a33b7c..5a0dbb2 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -520,10 +520,31 @@ pub struct GeneralConfig { #[serde(default = "default_me_adaptive_floor_min_writers_single_endpoint")] pub me_adaptive_floor_min_writers_single_endpoint: u8, + /// Minimum writer target for multi-endpoint DC groups in adaptive floor mode. + #[serde(default = "default_me_adaptive_floor_min_writers_multi_endpoint")] + pub me_adaptive_floor_min_writers_multi_endpoint: u8, + /// Grace period in seconds to hold static floor after activity in adaptive mode. #[serde(default = "default_me_adaptive_floor_recover_grace_secs")] pub me_adaptive_floor_recover_grace_secs: u64, + /// Global ME writer budget per logical CPU core in adaptive mode. + #[serde(default = "default_me_adaptive_floor_writers_per_core_total")] + pub me_adaptive_floor_writers_per_core_total: u16, + + /// Override logical CPU core count for adaptive floor calculations. + /// Set to 0 to use runtime auto-detection. + #[serde(default = "default_me_adaptive_floor_cpu_cores_override")] + pub me_adaptive_floor_cpu_cores_override: u16, + + /// Per-core max extra writers above base required floor for single-endpoint DC groups. + #[serde(default = "default_me_adaptive_floor_max_extra_writers_single_per_core")] + pub me_adaptive_floor_max_extra_writers_single_per_core: u16, + + /// Per-core max extra writers above base required floor for multi-endpoint DC groups. + #[serde(default = "default_me_adaptive_floor_max_extra_writers_multi_per_core")] + pub me_adaptive_floor_max_extra_writers_multi_per_core: u16, + /// Connect attempts for the selected upstream before returning error/fallback. #[serde(default = "default_upstream_connect_retry_attempts")] pub upstream_connect_retry_attempts: u32, @@ -775,7 +796,12 @@ impl Default for GeneralConfig { me_floor_mode: MeFloorMode::default(), me_adaptive_floor_idle_secs: default_me_adaptive_floor_idle_secs(), me_adaptive_floor_min_writers_single_endpoint: default_me_adaptive_floor_min_writers_single_endpoint(), + me_adaptive_floor_min_writers_multi_endpoint: default_me_adaptive_floor_min_writers_multi_endpoint(), me_adaptive_floor_recover_grace_secs: default_me_adaptive_floor_recover_grace_secs(), + me_adaptive_floor_writers_per_core_total: default_me_adaptive_floor_writers_per_core_total(), + me_adaptive_floor_cpu_cores_override: default_me_adaptive_floor_cpu_cores_override(), + me_adaptive_floor_max_extra_writers_single_per_core: default_me_adaptive_floor_max_extra_writers_single_per_core(), + me_adaptive_floor_max_extra_writers_multi_per_core: default_me_adaptive_floor_max_extra_writers_multi_per_core(), upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), upstream_connect_budget_ms: default_upstream_connect_budget_ms(), diff --git a/src/main.rs b/src/main.rs index ee5aaad..ca24d0e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -786,7 +786,12 @@ async fn main() -> std::result::Result<(), Box> { config.general.me_floor_mode, config.general.me_adaptive_floor_idle_secs, config.general.me_adaptive_floor_min_writers_single_endpoint, + config.general.me_adaptive_floor_min_writers_multi_endpoint, config.general.me_adaptive_floor_recover_grace_secs, + config.general.me_adaptive_floor_writers_per_core_total, + config.general.me_adaptive_floor_cpu_cores_override, + config.general.me_adaptive_floor_max_extra_writers_single_per_core, + config.general.me_adaptive_floor_max_extra_writers_multi_per_core, config.general.hardswap, config.general.me_pool_drain_ttl_secs, config.general.effective_me_pool_force_close_secs(), diff --git a/src/metrics.rs b/src/metrics.rs index 0ccec94..633a884 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -968,6 +968,133 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp 0 } ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_cpu_cores_detected Runtime detected logical CPU cores for adaptive floor" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_cpu_cores_detected gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_cpu_cores_detected {}", + if me_allows_normal { + stats.get_me_floor_cpu_cores_detected_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_cpu_cores_effective Runtime effective logical CPU cores for adaptive floor" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_cpu_cores_effective gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_cpu_cores_effective {}", + if me_allows_normal { + stats.get_me_floor_cpu_cores_effective_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_global_cap_raw Runtime raw global adaptive floor cap" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_global_cap_raw gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_global_cap_raw {}", + if me_allows_normal { + stats.get_me_floor_global_cap_raw_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_global_cap_effective Runtime effective global adaptive floor cap" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_global_cap_effective gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_global_cap_effective {}", + if me_allows_normal { + stats.get_me_floor_global_cap_effective_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_adaptive_floor_target_writers_total Runtime adaptive floor target writers total" + ); + let _ = writeln!( + out, + "# TYPE telemt_me_adaptive_floor_target_writers_total gauge" + ); + let _ = writeln!( + out, + "telemt_me_adaptive_floor_target_writers_total {}", + if me_allows_normal { + stats.get_me_floor_target_writers_total_gauge() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_floor_cap_block_total Reconnect attempts blocked by adaptive floor caps" + ); + let _ = writeln!(out, "# TYPE telemt_me_floor_cap_block_total counter"); + let _ = writeln!( + out, + "telemt_me_floor_cap_block_total {}", + if me_allows_normal { + stats.get_me_floor_cap_block_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_floor_swap_idle_total Adaptive floor cap recovery via idle writer swap" + ); + let _ = writeln!(out, "# TYPE telemt_me_floor_swap_idle_total counter"); + let _ = writeln!( + out, + "telemt_me_floor_swap_idle_total {}", + if me_allows_normal { + stats.get_me_floor_swap_idle_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_me_floor_swap_idle_failed_total Failed idle swap attempts under adaptive floor caps" + ); + let _ = writeln!(out, "# TYPE telemt_me_floor_swap_idle_failed_total counter"); + let _ = writeln!( + out, + "telemt_me_floor_swap_idle_failed_total {}", + if me_allows_normal { + stats.get_me_floor_swap_idle_failed_total() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths"); let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter"); diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 4b59367..b51c941 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -75,6 +75,14 @@ pub struct Stats { me_floor_mode_switch_total: AtomicU64, me_floor_mode_switch_static_to_adaptive_total: AtomicU64, me_floor_mode_switch_adaptive_to_static_total: AtomicU64, + me_floor_cpu_cores_detected_gauge: AtomicU64, + me_floor_cpu_cores_effective_gauge: AtomicU64, + me_floor_global_cap_raw_gauge: AtomicU64, + me_floor_global_cap_effective_gauge: AtomicU64, + me_floor_target_writers_total_gauge: AtomicU64, + me_floor_cap_block_total: AtomicU64, + me_floor_swap_idle_total: AtomicU64, + me_floor_swap_idle_failed_total: AtomicU64, me_handshake_error_codes: DashMap, me_route_drop_no_conn: AtomicU64, me_route_drop_channel_closed: AtomicU64, @@ -676,6 +684,52 @@ impl Stats { .fetch_add(1, Ordering::Relaxed); } } + pub fn set_me_floor_cpu_cores_detected_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_cpu_cores_detected_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_floor_cpu_cores_effective_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_cpu_cores_effective_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_floor_global_cap_raw_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_global_cap_raw_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_floor_global_cap_effective_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_global_cap_effective_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn set_me_floor_target_writers_total_gauge(&self, value: u64) { + if self.telemetry_me_allows_normal() { + self.me_floor_target_writers_total_gauge + .store(value, Ordering::Relaxed); + } + } + pub fn increment_me_floor_cap_block_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_floor_cap_block_total.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_floor_swap_idle_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_floor_swap_idle_total.fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_me_floor_swap_idle_failed_total(&self) { + if self.telemetry_me_allows_normal() { + self.me_floor_swap_idle_failed_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) } pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) } pub fn get_current_connections_direct(&self) -> u64 { @@ -781,6 +835,34 @@ impl Stats { self.me_floor_mode_switch_adaptive_to_static_total .load(Ordering::Relaxed) } + pub fn get_me_floor_cpu_cores_detected_gauge(&self) -> u64 { + self.me_floor_cpu_cores_detected_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_floor_cpu_cores_effective_gauge(&self) -> u64 { + self.me_floor_cpu_cores_effective_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_floor_global_cap_raw_gauge(&self) -> u64 { + self.me_floor_global_cap_raw_gauge.load(Ordering::Relaxed) + } + pub fn get_me_floor_global_cap_effective_gauge(&self) -> u64 { + self.me_floor_global_cap_effective_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_floor_target_writers_total_gauge(&self) -> u64 { + self.me_floor_target_writers_total_gauge + .load(Ordering::Relaxed) + } + pub fn get_me_floor_cap_block_total(&self) -> u64 { + self.me_floor_cap_block_total.load(Ordering::Relaxed) + } + pub fn get_me_floor_swap_idle_total(&self) -> u64 { + self.me_floor_swap_idle_total.load(Ordering::Relaxed) + } + pub fn get_me_floor_swap_idle_failed_total(&self) -> u64 { + self.me_floor_swap_idle_failed_total.load(Ordering::Relaxed) + } pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> { let mut out: Vec<(i32, u64)> = self .me_handshake_error_codes diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 072c1f6..4bc3ff7 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -315,7 +315,12 @@ async fn run_update_cycle( cfg.general.me_floor_mode, cfg.general.me_adaptive_floor_idle_secs, cfg.general.me_adaptive_floor_min_writers_single_endpoint, + cfg.general.me_adaptive_floor_min_writers_multi_endpoint, cfg.general.me_adaptive_floor_recover_grace_secs, + cfg.general.me_adaptive_floor_writers_per_core_total, + cfg.general.me_adaptive_floor_cpu_cores_override, + cfg.general.me_adaptive_floor_max_extra_writers_single_per_core, + cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core, ); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); @@ -527,7 +532,12 @@ pub async fn me_config_updater( cfg.general.me_floor_mode, cfg.general.me_adaptive_floor_idle_secs, cfg.general.me_adaptive_floor_min_writers_single_endpoint, + cfg.general.me_adaptive_floor_min_writers_multi_endpoint, cfg.general.me_adaptive_floor_recover_grace_secs, + cfg.general.me_adaptive_floor_writers_per_core_total, + cfg.general.me_adaptive_floor_cpu_cores_override, + cfg.general.me_adaptive_floor_max_extra_writers_single_per_core, + cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core, ); let new_secs = cfg.general.effective_update_every_secs().max(1); if new_secs == update_every_secs { diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 9a54e32..a594a01 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -23,6 +23,25 @@ const IDLE_REFRESH_TRIGGER_JITTER_SECS: u64 = 5; const IDLE_REFRESH_RETRY_SECS: u64 = 8; const IDLE_REFRESH_SUCCESS_GUARD_SECS: u64 = 5; +#[derive(Debug, Clone)] +struct DcFloorPlanEntry { + dc: i32, + endpoints: Vec, + alive: usize, + min_required: usize, + target_required: usize, + max_required: usize, + has_bound_clients: bool, + floor_capped: bool, +} + +#[derive(Debug, Clone)] +struct FamilyFloorPlan { + by_dc: HashMap, + global_cap_effective_total: usize, + target_writers_total: usize, +} + pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_connections: usize) { let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new(); let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new(); @@ -129,22 +148,33 @@ async fn check_family( .push(writer.id); } let writer_idle_since = pool.registry.writer_idle_since_snapshot().await; + let floor_plan = build_family_floor_plan( + pool, + family, + &dc_endpoints, + &live_addr_counts, + &live_writer_ids_by_addr, + adaptive_idle_since, + adaptive_recover_until, + ) + .await; + pool.set_adaptive_floor_runtime_caps( + floor_plan.global_cap_effective_total, + floor_plan.target_writers_total, + ); for (dc, endpoints) in dc_endpoints { if endpoints.is_empty() { continue; } let key = (dc, family); - let reduce_for_idle = should_reduce_floor_for_idle( - pool, - key, - &endpoints, - &live_writer_ids_by_addr, - adaptive_idle_since, - adaptive_recover_until, - ) - .await; - let required = pool.required_writers_for_dc_with_floor_mode(endpoints.len(), reduce_for_idle); + let required = floor_plan + .by_dc + .get(&dc) + .map(|entry| entry.target_required) + .unwrap_or_else(|| { + pool.required_writers_for_dc_with_floor_mode(endpoints.len(), false) + }); let alive = endpoints .iter() .map(|addr| *live_addr_counts.get(addr).unwrap_or(&0)) @@ -251,6 +281,36 @@ async fn check_family( let mut restored = 0usize; for _ in 0..missing { + if pool.floor_mode() == MeFloorMode::Adaptive + && pool.active_writer_count_total().await >= floor_plan.global_cap_effective_total + { + let swapped = maybe_swap_idle_writer_for_cap( + pool, + rng, + dc, + family, + &endpoints, + &live_writer_ids_by_addr, + &writer_idle_since, + ) + .await; + if swapped { + pool.stats.increment_me_floor_swap_idle_total(); + restored += 1; + continue; + } + pool.stats.increment_me_floor_cap_block_total(); + pool.stats.increment_me_floor_swap_idle_failed_total(); + debug!( + dc = %dc, + ?family, + alive, + required, + global_cap_effective_total = floor_plan.global_cap_effective_total, + "Adaptive floor cap reached, reconnect attempt blocked" + ); + break; + } let res = tokio::time::timeout( pool.me_one_timeout, pool.connect_endpoints_round_robin(&endpoints, rng.as_ref()), @@ -323,6 +383,280 @@ async fn check_family( } } +fn adaptive_floor_class_min( + pool: &Arc, + endpoint_count: usize, + base_required: usize, +) -> usize { + if endpoint_count <= 1 { + let min_single = (pool + .me_adaptive_floor_min_writers_single_endpoint + .load(std::sync::atomic::Ordering::Relaxed) as usize) + .max(1); + min_single.min(base_required.max(1)) + } else { + pool.adaptive_floor_min_writers_multi_endpoint() + .min(base_required.max(1)) + } +} + +fn adaptive_floor_class_max( + pool: &Arc, + endpoint_count: usize, + base_required: usize, + cpu_cores: usize, +) -> usize { + let extra_per_core = if endpoint_count <= 1 { + pool.adaptive_floor_max_extra_single_per_core() + } else { + pool.adaptive_floor_max_extra_multi_per_core() + }; + base_required.saturating_add(cpu_cores.saturating_mul(extra_per_core)) +} + +fn list_writer_ids_for_endpoints( + endpoints: &[SocketAddr], + live_writer_ids_by_addr: &HashMap>, +) -> Vec { + let mut out = Vec::::new(); + for endpoint in endpoints { + if let Some(ids) = live_writer_ids_by_addr.get(endpoint) { + out.extend(ids.iter().copied()); + } + } + out +} + +async fn build_family_floor_plan( + pool: &Arc, + family: IpFamily, + dc_endpoints: &HashMap>, + live_addr_counts: &HashMap, + live_writer_ids_by_addr: &HashMap>, + adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, + adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, +) -> FamilyFloorPlan { + let mut entries = Vec::::new(); + let mut by_dc = HashMap::::new(); + let mut family_active_total = 0usize; + + let floor_mode = pool.floor_mode(); + let is_adaptive = floor_mode == MeFloorMode::Adaptive; + let cpu_cores = pool.adaptive_floor_effective_cpu_cores().max(1); + + for (dc, endpoints) in dc_endpoints { + if endpoints.is_empty() { + continue; + } + let key = (*dc, family); + let reduce_for_idle = should_reduce_floor_for_idle( + pool, + key, + endpoints, + live_writer_ids_by_addr, + adaptive_idle_since, + adaptive_recover_until, + ) + .await; + let base_required = pool.required_writers_for_dc(endpoints.len()).max(1); + let min_required = if is_adaptive { + adaptive_floor_class_min(pool, endpoints.len(), base_required) + } else { + base_required + }; + let mut max_required = if is_adaptive { + adaptive_floor_class_max(pool, endpoints.len(), base_required, cpu_cores) + } else { + base_required + }; + if max_required < min_required { + max_required = min_required; + } + let desired_raw = if is_adaptive && reduce_for_idle { + min_required + } else { + base_required + }; + let target_required = desired_raw.clamp(min_required, max_required); + let alive = endpoints + .iter() + .map(|endpoint| live_addr_counts.get(endpoint).copied().unwrap_or(0)) + .sum::(); + family_active_total = family_active_total.saturating_add(alive); + let writer_ids = list_writer_ids_for_endpoints(endpoints, live_writer_ids_by_addr); + let has_bound_clients = has_bound_clients_on_endpoint(pool, &writer_ids).await; + + entries.push(DcFloorPlanEntry { + dc: *dc, + endpoints: endpoints.clone(), + alive, + min_required, + target_required, + max_required, + has_bound_clients, + floor_capped: false, + }); + } + + if entries.is_empty() { + return FamilyFloorPlan { + by_dc, + global_cap_effective_total: 0, + target_writers_total: 0, + }; + } + + if !is_adaptive { + let target_total = entries + .iter() + .map(|entry| entry.target_required) + .sum::(); + let active_total = pool.active_writer_count_total().await; + for entry in entries { + by_dc.insert(entry.dc, entry); + } + return FamilyFloorPlan { + by_dc, + global_cap_effective_total: active_total.max(target_total), + target_writers_total: target_total, + }; + } + + let global_cap_raw = pool.adaptive_floor_global_cap_raw(); + let total_active = pool.active_writer_count_total().await; + let other_active = total_active.saturating_sub(family_active_total); + let min_sum = entries + .iter() + .map(|entry| entry.min_required) + .sum::(); + let mut target_sum = entries + .iter() + .map(|entry| entry.target_required) + .sum::(); + let family_cap = global_cap_raw + .saturating_sub(other_active) + .max(min_sum); + if target_sum > family_cap { + entries.sort_by_key(|entry| { + ( + entry.has_bound_clients, + std::cmp::Reverse(entry.target_required.saturating_sub(entry.min_required)), + std::cmp::Reverse(entry.alive), + entry.dc.abs(), + entry.dc, + entry.endpoints.len(), + entry.max_required, + ) + }); + let mut changed = true; + while target_sum > family_cap && changed { + changed = false; + for entry in &mut entries { + if target_sum <= family_cap { + break; + } + if entry.target_required > entry.min_required { + entry.target_required -= 1; + entry.floor_capped = true; + target_sum -= 1; + changed = true; + } + } + } + } + + for entry in entries { + by_dc.insert(entry.dc, entry); + } + let global_cap_effective_total = global_cap_raw.max(other_active.saturating_add(min_sum)); + let target_writers_total = other_active.saturating_add(target_sum); + FamilyFloorPlan { + by_dc, + global_cap_effective_total, + target_writers_total, + } +} + +async fn maybe_swap_idle_writer_for_cap( + pool: &Arc, + rng: &Arc, + dc: i32, + family: IpFamily, + endpoints: &[SocketAddr], + live_writer_ids_by_addr: &HashMap>, + writer_idle_since: &HashMap, +) -> bool { + let now_epoch_secs = MePool::now_epoch_secs(); + let mut candidate: Option<(u64, SocketAddr, u64)> = None; + for endpoint in endpoints { + let Some(writer_ids) = live_writer_ids_by_addr.get(endpoint) else { + continue; + }; + for writer_id in writer_ids { + if !pool.registry.is_writer_empty(*writer_id).await { + continue; + } + let Some(idle_since_epoch_secs) = writer_idle_since.get(writer_id).copied() else { + continue; + }; + let idle_age_secs = now_epoch_secs.saturating_sub(idle_since_epoch_secs); + if candidate + .as_ref() + .map(|(_, _, age)| idle_age_secs > *age) + .unwrap_or(true) + { + candidate = Some((*writer_id, *endpoint, idle_age_secs)); + } + } + } + + let Some((old_writer_id, endpoint, idle_age_secs)) = candidate else { + return false; + }; + + let connected = match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await { + Ok(Ok(())) => true, + Ok(Err(error)) => { + debug!( + dc = %dc, + ?family, + %endpoint, + old_writer_id, + idle_age_secs, + %error, + "Adaptive floor cap swap connect failed" + ); + false + } + Err(_) => { + debug!( + dc = %dc, + ?family, + %endpoint, + old_writer_id, + idle_age_secs, + "Adaptive floor cap swap connect timed out" + ); + false + } + }; + if !connected { + return false; + } + + pool.mark_writer_draining_with_timeout(old_writer_id, pool.force_close_timeout(), false) + .await; + info!( + dc = %dc, + ?family, + %endpoint, + old_writer_id, + idle_age_secs, + "Adaptive floor cap swap: idle writer rotated" + ); + true +} + async fn maybe_refresh_idle_writer_for_dc( pool: &Arc, rng: &Arc, @@ -438,19 +772,15 @@ async fn should_reduce_floor_for_idle( adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>, adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>, ) -> bool { - if endpoints.len() != 1 || pool.floor_mode() != MeFloorMode::Adaptive { + if pool.floor_mode() != MeFloorMode::Adaptive { adaptive_idle_since.remove(&key); adaptive_recover_until.remove(&key); return false; } let now = Instant::now(); - let endpoint = endpoints[0]; - let writer_ids = live_writer_ids_by_addr - .get(&endpoint) - .map(Vec::as_slice) - .unwrap_or(&[]); - let has_bound_clients = has_bound_clients_on_endpoint(pool, writer_ids).await; + let writer_ids = list_writer_ids_for_endpoints(endpoints, live_writer_ids_by_addr); + let has_bound_clients = has_bound_clients_on_endpoint(pool, &writer_ids).await; if has_bound_clients { adaptive_idle_since.remove(&key); adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration()); diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index b0ae394..1145823 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -111,7 +111,17 @@ pub struct MePool { pub(super) me_floor_mode: AtomicU8, pub(super) me_adaptive_floor_idle_secs: AtomicU64, pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8, + pub(super) me_adaptive_floor_min_writers_multi_endpoint: AtomicU8, pub(super) me_adaptive_floor_recover_grace_secs: AtomicU64, + pub(super) me_adaptive_floor_writers_per_core_total: AtomicU32, + pub(super) me_adaptive_floor_cpu_cores_override: AtomicU32, + pub(super) me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32, + pub(super) me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32, + pub(super) me_adaptive_floor_cpu_cores_detected: AtomicU32, + pub(super) me_adaptive_floor_cpu_cores_effective: AtomicU32, + pub(super) me_adaptive_floor_global_cap_raw: AtomicU64, + pub(super) me_adaptive_floor_global_cap_effective: AtomicU64, + pub(super) me_adaptive_floor_target_writers_total: AtomicU64, pub(super) proxy_map_v4: Arc>>>, pub(super) proxy_map_v6: Arc>>>, pub(super) default_dc: AtomicI32, @@ -217,7 +227,12 @@ impl MePool { me_floor_mode: MeFloorMode, me_adaptive_floor_idle_secs: u64, me_adaptive_floor_min_writers_single_endpoint: u8, + me_adaptive_floor_min_writers_multi_endpoint: u8, me_adaptive_floor_recover_grace_secs: u64, + me_adaptive_floor_writers_per_core_total: u16, + me_adaptive_floor_cpu_cores_override: u16, + me_adaptive_floor_max_extra_writers_single_per_core: u16, + me_adaptive_floor_max_extra_writers_multi_per_core: u16, hardswap: bool, me_pool_drain_ttl_secs: u64, me_pool_force_close_secs: u64, @@ -314,9 +329,29 @@ impl MePool { me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new( me_adaptive_floor_min_writers_single_endpoint, ), + me_adaptive_floor_min_writers_multi_endpoint: AtomicU8::new( + me_adaptive_floor_min_writers_multi_endpoint, + ), me_adaptive_floor_recover_grace_secs: AtomicU64::new( me_adaptive_floor_recover_grace_secs, ), + me_adaptive_floor_writers_per_core_total: AtomicU32::new( + me_adaptive_floor_writers_per_core_total as u32, + ), + me_adaptive_floor_cpu_cores_override: AtomicU32::new( + me_adaptive_floor_cpu_cores_override as u32, + ), + me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32::new( + me_adaptive_floor_max_extra_writers_single_per_core as u32, + ), + me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32::new( + me_adaptive_floor_max_extra_writers_multi_per_core as u32, + ), + me_adaptive_floor_cpu_cores_detected: AtomicU32::new(1), + me_adaptive_floor_cpu_cores_effective: AtomicU32::new(1), + me_adaptive_floor_global_cap_raw: AtomicU64::new(0), + me_adaptive_floor_global_cap_effective: AtomicU64::new(0), + me_adaptive_floor_target_writers_total: AtomicU64::new(0), pool_size: 2, proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)), proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)), @@ -399,7 +434,12 @@ impl MePool { floor_mode: MeFloorMode, adaptive_floor_idle_secs: u64, adaptive_floor_min_writers_single_endpoint: u8, + adaptive_floor_min_writers_multi_endpoint: u8, adaptive_floor_recover_grace_secs: u64, + adaptive_floor_writers_per_core_total: u16, + adaptive_floor_cpu_cores_override: u16, + adaptive_floor_max_extra_writers_single_per_core: u16, + adaptive_floor_max_extra_writers_multi_per_core: u16, ) { self.hardswap.store(hardswap, Ordering::Relaxed); self.me_pool_drain_ttl_secs @@ -443,8 +483,24 @@ impl MePool { .store(adaptive_floor_idle_secs, Ordering::Relaxed); self.me_adaptive_floor_min_writers_single_endpoint .store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed); + self.me_adaptive_floor_min_writers_multi_endpoint + .store(adaptive_floor_min_writers_multi_endpoint, Ordering::Relaxed); self.me_adaptive_floor_recover_grace_secs .store(adaptive_floor_recover_grace_secs, Ordering::Relaxed); + self.me_adaptive_floor_writers_per_core_total + .store(adaptive_floor_writers_per_core_total as u32, Ordering::Relaxed); + self.me_adaptive_floor_cpu_cores_override + .store(adaptive_floor_cpu_cores_override as u32, Ordering::Relaxed); + self.me_adaptive_floor_max_extra_writers_single_per_core + .store( + adaptive_floor_max_extra_writers_single_per_core as u32, + Ordering::Relaxed, + ); + self.me_adaptive_floor_max_extra_writers_multi_per_core + .store( + adaptive_floor_max_extra_writers_multi_per_core as u32, + Ordering::Relaxed, + ); if previous_floor_mode != floor_mode { self.stats.increment_me_floor_mode_switch_total(); match (previous_floor_mode, floor_mode) { @@ -515,6 +571,13 @@ impl MePool { self.proxy_secret.read().await.key_selector } + pub(super) async fn active_writer_count_total(&self) -> usize { + let ws = self.writers.read().await; + ws.iter() + .filter(|w| !w.draining.load(Ordering::Relaxed)) + .count() + } + pub(super) async fn secret_snapshot(&self) -> SecretSnapshot { self.proxy_secret.read().await.clone() } @@ -551,6 +614,82 @@ impl MePool { ) } + pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize { + (self + .me_adaptive_floor_min_writers_multi_endpoint + .load(Ordering::Relaxed) as usize) + .max(1) + } + + pub(super) fn adaptive_floor_writers_per_core_total(&self) -> usize { + (self + .me_adaptive_floor_writers_per_core_total + .load(Ordering::Relaxed) as usize) + .max(1) + } + + pub(super) fn adaptive_floor_max_extra_single_per_core(&self) -> usize { + self.me_adaptive_floor_max_extra_writers_single_per_core + .load(Ordering::Relaxed) as usize + } + + pub(super) fn adaptive_floor_max_extra_multi_per_core(&self) -> usize { + self.me_adaptive_floor_max_extra_writers_multi_per_core + .load(Ordering::Relaxed) as usize + } + + pub(super) fn adaptive_floor_detected_cpu_cores(&self) -> usize { + std::thread::available_parallelism() + .map(|value| value.get()) + .unwrap_or(1) + .max(1) + } + + pub(super) fn adaptive_floor_effective_cpu_cores(&self) -> usize { + let detected = self.adaptive_floor_detected_cpu_cores(); + let override_cores = self + .me_adaptive_floor_cpu_cores_override + .load(Ordering::Relaxed) as usize; + let effective = if override_cores == 0 { + detected + } else { + override_cores.max(1) + }; + self.me_adaptive_floor_cpu_cores_detected + .store(detected as u32, Ordering::Relaxed); + self.me_adaptive_floor_cpu_cores_effective + .store(effective as u32, Ordering::Relaxed); + self.stats + .set_me_floor_cpu_cores_detected_gauge(detected as u64); + self.stats + .set_me_floor_cpu_cores_effective_gauge(effective as u64); + effective + } + + pub(super) fn adaptive_floor_global_cap_raw(&self) -> usize { + let cores = self.adaptive_floor_effective_cpu_cores(); + let cap = cores.saturating_mul(self.adaptive_floor_writers_per_core_total()); + self.me_adaptive_floor_global_cap_raw + .store(cap as u64, Ordering::Relaxed); + self.stats.set_me_floor_global_cap_raw_gauge(cap as u64); + cap + } + + pub(super) fn set_adaptive_floor_runtime_caps( + &self, + global_cap_effective: usize, + target_writers_total: usize, + ) { + self.me_adaptive_floor_global_cap_effective + .store(global_cap_effective as u64, Ordering::Relaxed); + self.me_adaptive_floor_target_writers_total + .store(target_writers_total as u64, Ordering::Relaxed); + self.stats + .set_me_floor_global_cap_effective_gauge(global_cap_effective as u64); + self.stats + .set_me_floor_target_writers_total_gauge(target_writers_total as u64); + } + pub(super) fn required_writers_for_dc_with_floor_mode( &self, endpoint_count: usize, @@ -560,13 +699,20 @@ impl MePool { if !reduce_for_idle { return base_required; } - if endpoint_count != 1 || self.floor_mode() != MeFloorMode::Adaptive { + if self.floor_mode() != MeFloorMode::Adaptive { return base_required; } - let min_writers = (self - .me_adaptive_floor_min_writers_single_endpoint - .load(Ordering::Relaxed) as usize) - .max(1); + let min_writers = if endpoint_count == 1 { + (self + .me_adaptive_floor_min_writers_single_endpoint + .load(Ordering::Relaxed) as usize) + .max(1) + } else { + (self + .me_adaptive_floor_min_writers_multi_endpoint + .load(Ordering::Relaxed) as usize) + .max(1) + }; base_required.min(min_writers) } diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index d9898b1..46346b5 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -28,6 +28,10 @@ pub(crate) struct MeApiDcStatusSnapshot { pub available_endpoints: usize, pub available_pct: f64, pub required_writers: usize, + pub floor_min: usize, + pub floor_target: usize, + pub floor_max: usize, + pub floor_capped: bool, pub alive_writers: usize, pub coverage_pct: f64, pub rtt_ms: Option, @@ -72,7 +76,17 @@ pub(crate) struct MeApiRuntimeSnapshot { pub floor_mode: &'static str, pub adaptive_floor_idle_secs: u64, pub adaptive_floor_min_writers_single_endpoint: u8, + pub adaptive_floor_min_writers_multi_endpoint: u8, pub adaptive_floor_recover_grace_secs: u64, + pub adaptive_floor_writers_per_core_total: u16, + pub adaptive_floor_cpu_cores_override: u16, + pub adaptive_floor_max_extra_writers_single_per_core: u16, + pub adaptive_floor_max_extra_writers_multi_per_core: u16, + pub adaptive_floor_cpu_cores_detected: u32, + pub adaptive_floor_cpu_cores_effective: u32, + pub adaptive_floor_global_cap_raw: u64, + pub adaptive_floor_global_cap_effective: u64, + pub adaptive_floor_target_writers_total: u64, pub me_keepalive_enabled: bool, pub me_keepalive_interval_secs: u64, pub me_keepalive_jitter_secs: u64, @@ -275,14 +289,43 @@ impl MePool { let mut dcs = Vec::::with_capacity(endpoints_by_dc.len()); let mut available_endpoints = 0usize; let mut alive_writers = 0usize; + let floor_mode = self.floor_mode(); + let adaptive_cpu_cores = (self + .me_adaptive_floor_cpu_cores_effective + .load(Ordering::Relaxed) as usize) + .max(1); for (dc, endpoints) in endpoints_by_dc { let endpoint_count = endpoints.len(); let dc_available_endpoints = endpoints .iter() .filter(|endpoint| live_writers_by_endpoint.contains_key(endpoint)) .count(); + let base_required = self.required_writers_for_dc(endpoint_count); let dc_required_writers = self.required_writers_for_dc_with_floor_mode(endpoint_count, false); + let floor_min = if endpoint_count <= 1 { + (self + .me_adaptive_floor_min_writers_single_endpoint + .load(Ordering::Relaxed) as usize) + .max(1) + .min(base_required.max(1)) + } else { + (self + .me_adaptive_floor_min_writers_multi_endpoint + .load(Ordering::Relaxed) as usize) + .max(1) + .min(base_required.max(1)) + }; + let extra_per_core = if endpoint_count <= 1 { + self.me_adaptive_floor_max_extra_writers_single_per_core + .load(Ordering::Relaxed) as usize + } else { + self.me_adaptive_floor_max_extra_writers_multi_per_core + .load(Ordering::Relaxed) as usize + }; + let floor_max = base_required.saturating_add(adaptive_cpu_cores.saturating_mul(extra_per_core)); + let floor_capped = matches!(floor_mode, MeFloorMode::Adaptive) + && dc_required_writers < base_required; let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0); let dc_load = activity .active_sessions_by_target_dc @@ -302,6 +345,10 @@ impl MePool { available_endpoints: dc_available_endpoints, available_pct: ratio_pct(dc_available_endpoints, endpoint_count), required_writers: dc_required_writers, + floor_min, + floor_target: dc_required_writers, + floor_max, + floor_capped, alive_writers: dc_alive_writers, coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers), rtt_ms: dc_rtt_ms, @@ -378,9 +425,39 @@ impl MePool { adaptive_floor_min_writers_single_endpoint: self .me_adaptive_floor_min_writers_single_endpoint .load(Ordering::Relaxed), + adaptive_floor_min_writers_multi_endpoint: self + .me_adaptive_floor_min_writers_multi_endpoint + .load(Ordering::Relaxed), adaptive_floor_recover_grace_secs: self .me_adaptive_floor_recover_grace_secs .load(Ordering::Relaxed), + adaptive_floor_writers_per_core_total: self + .me_adaptive_floor_writers_per_core_total + .load(Ordering::Relaxed) as u16, + adaptive_floor_cpu_cores_override: self + .me_adaptive_floor_cpu_cores_override + .load(Ordering::Relaxed) as u16, + adaptive_floor_max_extra_writers_single_per_core: self + .me_adaptive_floor_max_extra_writers_single_per_core + .load(Ordering::Relaxed) as u16, + adaptive_floor_max_extra_writers_multi_per_core: self + .me_adaptive_floor_max_extra_writers_multi_per_core + .load(Ordering::Relaxed) as u16, + adaptive_floor_cpu_cores_detected: self + .me_adaptive_floor_cpu_cores_detected + .load(Ordering::Relaxed), + adaptive_floor_cpu_cores_effective: self + .me_adaptive_floor_cpu_cores_effective + .load(Ordering::Relaxed), + adaptive_floor_global_cap_raw: self + .me_adaptive_floor_global_cap_raw + .load(Ordering::Relaxed), + adaptive_floor_global_cap_effective: self + .me_adaptive_floor_global_cap_effective + .load(Ordering::Relaxed), + adaptive_floor_target_writers_total: self + .me_adaptive_floor_target_writers_total + .load(Ordering::Relaxed), me_keepalive_enabled: self.me_keepalive_enabled, me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(), me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(),