ME Adaptive Floor Drafts

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey 2026-03-03 03:37:00 +03:00
parent 4a0d88ad43
commit 225fc3e4ea
No known key found for this signature in database
4 changed files with 103 additions and 0 deletions

View File

@ -9,6 +9,9 @@ const DEFAULT_MIDDLE_PROXY_WARM_STANDBY: usize = 16;
const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8; const DEFAULT_ME_RECONNECT_MAX_CONCURRENT_PER_DC: u32 = 8;
const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16; const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16;
const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2; const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2;
const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90;
const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1;
const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 3;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 4;
const DEFAULT_LISTEN_ADDR_IPV6: &str = "::"; const DEFAULT_LISTEN_ADDR_IPV6: &str = "::";
@ -185,6 +188,18 @@ pub(crate) fn default_me_single_endpoint_shadow_rotate_every_secs() -> u64 {
900 900
} }
pub(crate) fn default_me_adaptive_floor_idle_secs() -> u64 {
DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS
}
pub(crate) fn default_me_adaptive_floor_min_writers_single_endpoint() -> u8 {
DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT
}
pub(crate) fn default_me_adaptive_floor_recover_grace_secs() -> u64 {
DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS
}
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 { pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
} }

View File

@ -261,6 +261,15 @@ impl ProxyConfig {
)); ));
} }
if config.general.me_adaptive_floor_min_writers_single_endpoint == 0
|| config.general.me_adaptive_floor_min_writers_single_endpoint > 32
{
return Err(ProxyError::Config(
"general.me_adaptive_floor_min_writers_single_endpoint must be within [1, 32]"
.to_string(),
));
}
if config.general.me_single_endpoint_outage_backoff_min_ms == 0 { if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
return Err(ProxyError::Config( return Err(ProxyError::Config(
"general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(), "general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
@ -642,6 +651,19 @@ mod tests {
cfg.general.me_single_endpoint_shadow_rotate_every_secs, cfg.general.me_single_endpoint_shadow_rotate_every_secs,
default_me_single_endpoint_shadow_rotate_every_secs() default_me_single_endpoint_shadow_rotate_every_secs()
); );
assert_eq!(cfg.general.me_floor_mode, MeFloorMode::default());
assert_eq!(
cfg.general.me_adaptive_floor_idle_secs,
default_me_adaptive_floor_idle_secs()
);
assert_eq!(
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
default_me_adaptive_floor_min_writers_single_endpoint()
);
assert_eq!(
cfg.general.me_adaptive_floor_recover_grace_secs,
default_me_adaptive_floor_recover_grace_secs()
);
assert_eq!( assert_eq!(
cfg.general.upstream_connect_retry_attempts, cfg.general.upstream_connect_retry_attempts,
default_upstream_connect_retry_attempts() default_upstream_connect_retry_attempts()
@ -704,6 +726,19 @@ mod tests {
general.me_single_endpoint_shadow_rotate_every_secs, general.me_single_endpoint_shadow_rotate_every_secs,
default_me_single_endpoint_shadow_rotate_every_secs() default_me_single_endpoint_shadow_rotate_every_secs()
); );
assert_eq!(general.me_floor_mode, MeFloorMode::default());
assert_eq!(
general.me_adaptive_floor_idle_secs,
default_me_adaptive_floor_idle_secs()
);
assert_eq!(
general.me_adaptive_floor_min_writers_single_endpoint,
default_me_adaptive_floor_min_writers_single_endpoint()
);
assert_eq!(
general.me_adaptive_floor_recover_grace_secs,
default_me_adaptive_floor_recover_grace_secs()
);
assert_eq!( assert_eq!(
general.upstream_connect_retry_attempts, general.upstream_connect_retry_attempts,
default_upstream_connect_retry_attempts() default_upstream_connect_retry_attempts()

View File

@ -158,6 +158,31 @@ impl MeBindStaleMode {
} }
} }
/// Middle-End writer floor policy mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum MeFloorMode {
#[default]
Static,
Adaptive,
}
impl MeFloorMode {
pub fn as_u8(self) -> u8 {
match self {
MeFloorMode::Static => 0,
MeFloorMode::Adaptive => 1,
}
}
pub fn from_u8(raw: u8) -> Self {
match raw {
1 => MeFloorMode::Adaptive,
_ => MeFloorMode::Static,
}
}
}
/// Telemetry controls for hot-path counters and ME diagnostics. /// Telemetry controls for hot-path counters and ME diagnostics.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TelemetryConfig { pub struct TelemetryConfig {
@ -419,6 +444,22 @@ pub struct GeneralConfig {
#[serde(default = "default_me_single_endpoint_shadow_rotate_every_secs")] #[serde(default = "default_me_single_endpoint_shadow_rotate_every_secs")]
pub me_single_endpoint_shadow_rotate_every_secs: u64, pub me_single_endpoint_shadow_rotate_every_secs: u64,
/// Floor policy mode for ME writer targets.
#[serde(default)]
pub me_floor_mode: MeFloorMode,
/// Idle time in seconds before adaptive floor can reduce single-endpoint writer target.
#[serde(default = "default_me_adaptive_floor_idle_secs")]
pub me_adaptive_floor_idle_secs: u64,
/// Minimum writer target for single-endpoint DC groups in adaptive floor mode.
#[serde(default = "default_me_adaptive_floor_min_writers_single_endpoint")]
pub me_adaptive_floor_min_writers_single_endpoint: u8,
/// Grace period in seconds to hold static floor after activity in adaptive mode.
#[serde(default = "default_me_adaptive_floor_recover_grace_secs")]
pub me_adaptive_floor_recover_grace_secs: u64,
/// Connect attempts for the selected upstream before returning error/fallback. /// Connect attempts for the selected upstream before returning error/fallback.
#[serde(default = "default_upstream_connect_retry_attempts")] #[serde(default = "default_upstream_connect_retry_attempts")]
pub upstream_connect_retry_attempts: u32, pub upstream_connect_retry_attempts: u32,
@ -634,6 +675,10 @@ impl Default for GeneralConfig {
me_single_endpoint_outage_backoff_min_ms: default_me_single_endpoint_outage_backoff_min_ms(), me_single_endpoint_outage_backoff_min_ms: default_me_single_endpoint_outage_backoff_min_ms(),
me_single_endpoint_outage_backoff_max_ms: default_me_single_endpoint_outage_backoff_max_ms(), me_single_endpoint_outage_backoff_max_ms: default_me_single_endpoint_outage_backoff_max_ms(),
me_single_endpoint_shadow_rotate_every_secs: default_me_single_endpoint_shadow_rotate_every_secs(), me_single_endpoint_shadow_rotate_every_secs: default_me_single_endpoint_shadow_rotate_every_secs(),
me_floor_mode: MeFloorMode::default(),
me_adaptive_floor_idle_secs: default_me_adaptive_floor_idle_secs(),
me_adaptive_floor_min_writers_single_endpoint: default_me_adaptive_floor_min_writers_single_endpoint(),
me_adaptive_floor_recover_grace_secs: default_me_adaptive_floor_recover_grace_secs(),
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(), upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(), upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(), upstream_unhealthy_fail_threshold: default_upstream_unhealthy_fail_threshold(),

View File

@ -282,6 +282,10 @@ async fn run_update_cycle(
cfg.general.me_single_endpoint_outage_backoff_min_ms, cfg.general.me_single_endpoint_outage_backoff_min_ms,
cfg.general.me_single_endpoint_outage_backoff_max_ms, cfg.general.me_single_endpoint_outage_backoff_max_ms,
cfg.general.me_single_endpoint_shadow_rotate_every_secs, cfg.general.me_single_endpoint_shadow_rotate_every_secs,
cfg.general.me_floor_mode,
cfg.general.me_adaptive_floor_idle_secs,
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
cfg.general.me_adaptive_floor_recover_grace_secs,
); );
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1); let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
@ -490,6 +494,10 @@ pub async fn me_config_updater(
cfg.general.me_single_endpoint_outage_backoff_min_ms, cfg.general.me_single_endpoint_outage_backoff_min_ms,
cfg.general.me_single_endpoint_outage_backoff_max_ms, cfg.general.me_single_endpoint_outage_backoff_max_ms,
cfg.general.me_single_endpoint_shadow_rotate_every_secs, cfg.general.me_single_endpoint_shadow_rotate_every_secs,
cfg.general.me_floor_mode,
cfg.general.me_adaptive_floor_idle_secs,
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
cfg.general.me_adaptive_floor_recover_grace_secs,
); );
let new_secs = cfg.general.effective_update_every_secs().max(1); let new_secs = cfg.general.effective_update_every_secs().max(1);
if new_secs == update_every_secs { if new_secs == update_every_secs {