From 225fc3e4ea7e8fa835dad818c392468deb435735 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Tue, 3 Mar 2026 03:37:00 +0300 Subject: [PATCH] ME Adaptive Floor Drafts Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com> --- src/config/defaults.rs | 15 +++++++ src/config/load.rs | 35 +++++++++++++++ src/config/types.rs | 45 ++++++++++++++++++++ src/transport/middle_proxy/config_updater.rs | 8 ++++ 4 files changed, 103 insertions(+) diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 544e328..d92ae78 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -9,6 +9,9 @@ const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16; const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8; const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16; const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2; +const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90; +const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1; +const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4; const DEFAULT_LISTEN_ADDR_IPV6: &str = "::"; @@ -185,6 +188,18 @@ pub(crate) fn default_me_single_endpoint_shadow_rotate_every_secs() -> u64 { 900 } +pub(crate) fn default_me_adaptive_floor_idle_secs() -> u64 { + DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS +} + +pub(crate) fn default_me_adaptive_floor_min_writers_single_endpoint() -> u8 { + DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT +} + +pub(crate) fn default_me_adaptive_floor_recover_grace_secs() -> u64 { + DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS +} + pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS } diff --git a/src/config/load.rs b/src/config/load.rs index a027d58..27249c6 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -261,6 +261,15 @@ impl ProxyConfig { )); } + if config.general.me_adaptive_floor_min_writers_single_endpoint == 0 + || config.general.me_adaptive_floor_min_writers_single_endpoint > 32 + { + return Err(ProxyError::Config( + "general.me_adaptive_floor_min_writers_single_endpoint must be within [1, 32]" + .to_string(), + )); + } + if config.general.me_single_endpoint_outage_backoff_min_ms == 0 { return Err(ProxyError::Config( "general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(), @@ -642,6 +651,19 @@ mod tests { cfg.general.me_single_endpoint_shadow_rotate_every_secs, default_me_single_endpoint_shadow_rotate_every_secs() ); + assert_eq!(cfg.general.me_floor_mode, MeFloorMode::default()); + assert_eq!( + cfg.general.me_adaptive_floor_idle_secs, + default_me_adaptive_floor_idle_secs() + ); + assert_eq!( + cfg.general.me_adaptive_floor_min_writers_single_endpoint, + default_me_adaptive_floor_min_writers_single_endpoint() + ); + assert_eq!( + cfg.general.me_adaptive_floor_recover_grace_secs, + default_me_adaptive_floor_recover_grace_secs() + ); assert_eq!( cfg.general.upstream_connect_retry_attempts, default_upstream_connect_retry_attempts() @@ -704,6 +726,19 @@ mod tests { general.me_single_endpoint_shadow_rotate_every_secs, default_me_single_endpoint_shadow_rotate_every_secs() ); + assert_eq!(general.me_floor_mode, MeFloorMode::default()); + assert_eq!( + general.me_adaptive_floor_idle_secs, + default_me_adaptive_floor_idle_secs() + ); + assert_eq!( + general.me_adaptive_floor_min_writers_single_endpoint, + default_me_adaptive_floor_min_writers_single_endpoint() + ); + assert_eq!( + general.me_adaptive_floor_recover_grace_secs, + default_me_adaptive_floor_recover_grace_secs() + ); assert_eq!( general.upstream_connect_retry_attempts, default_upstream_connect_retry_attempts() diff --git a/src/config/types.rs b/src/config/types.rs index 324abf2..0d255ba 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -158,6 +158,31 @@ impl MeBindStaleMode { } } +/// Middle-End writer floor policy mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum MeFloorMode { + #[default] + Static, + Adaptive, +} + +impl MeFloorMode { + pub fn as_u8(self) -> u8 { + match self { + MeFloorMode::Static => 0, + MeFloorMode::Adaptive => 1, + } + } + + pub fn from_u8(raw: u8) -> Self { + match raw { + 1 => MeFloorMode::Adaptive, + _ => MeFloorMode::Static, + } + } +} + /// Telemetry controls for hot-path counters and ME diagnostics. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TelemetryConfig { @@ -419,6 +444,22 @@ pub struct GeneralConfig { #[serde(default = "default_me_single_endpoint_shadow_rotate_every_secs")] pub me_single_endpoint_shadow_rotate_every_secs: u64, + /// Floor policy mode for ME writer targets. + #[serde(default)] + pub me_floor_mode: MeFloorMode, + + /// Idle time in seconds before adaptive floor can reduce single-endpoint writer target. + #[serde(default = "default_me_adaptive_floor_idle_secs")] + pub me_adaptive_floor_idle_secs: u64, + + /// Minimum writer target for single-endpoint DC groups in adaptive floor mode. + #[serde(default = "default_me_adaptive_floor_min_writers_single_endpoint")] + pub me_adaptive_floor_min_writers_single_endpoint: u8, + + /// Grace period in seconds to hold static floor after activity in adaptive mode. + #[serde(default = "default_me_adaptive_floor_recover_grace_secs")] + pub me_adaptive_floor_recover_grace_secs: u64, + /// Connect attempts for the selected upstream before returning error/fallback. #[serde(default = "default_upstream_connect_retry_attempts")] pub upstream_connect_retry_attempts: u32, @@ -634,6 +675,10 @@ impl Default for GeneralConfig { me_single_endpoint_outage_backoff_min_ms: default_me_single_endpoint_outage_backoff_min_ms(), me_single_endpoint_outage_backoff_max_ms: default_me_single_endpoint_outage_backoff_max_ms(), me_single_endpoint_shadow_rotate_every_secs: default_me_single_endpoint_shadow_rotate_every_secs(), + me_floor_mode: MeFloorMode::default(), + me_adaptive_floor_idle_secs: default_me_adaptive_floor_idle_secs(), + me_adaptive_floor_min_writers_single_endpoint: default_me_adaptive_floor_min_writers_single_endpoint(), + me_adaptive_floor_recover_grace_secs: default_me_adaptive_floor_recover_grace_secs(), upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(), diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 6da14fa..a9c50ab 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -282,6 +282,10 @@ async fn run_update_cycle( cfg.general.me_single_endpoint_outage_backoff_min_ms, cfg.general.me_single_endpoint_outage_backoff_max_ms, cfg.general.me_single_endpoint_shadow_rotate_every_secs, + cfg.general.me_floor_mode, + cfg.general.me_adaptive_floor_idle_secs, + cfg.general.me_adaptive_floor_min_writers_single_endpoint, + cfg.general.me_adaptive_floor_recover_grace_secs, ); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); @@ -490,6 +494,10 @@ pub async fn me_config_updater( cfg.general.me_single_endpoint_outage_backoff_min_ms, cfg.general.me_single_endpoint_outage_backoff_max_ms, cfg.general.me_single_endpoint_shadow_rotate_every_secs, + cfg.general.me_floor_mode, + cfg.general.me_adaptive_floor_idle_secs, + cfg.general.me_adaptive_floor_min_writers_single_endpoint, + cfg.general.me_adaptive_floor_recover_grace_secs, ); let new_secs = cfg.general.effective_update_every_secs().max(1); if new_secs == update_every_secs {