mirror of https://github.com/telemt/telemt.git
ME Adaptive Floor Upper-Limit
This commit is contained in:
parent
5ac0ef1ffd
commit
5f77f83b48
|
|
@ -318,11 +318,21 @@ pub(super) struct MinimalMeRuntimeData {
|
||||||
pub(super) adaptive_floor_cpu_cores_override: u16,
|
pub(super) adaptive_floor_cpu_cores_override: u16,
|
||||||
pub(super) adaptive_floor_max_extra_writers_single_per_core: u16,
|
pub(super) adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16,
|
pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_max_active_writers_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_max_warm_writers_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_max_active_writers_global: u32,
|
||||||
|
pub(super) adaptive_floor_max_warm_writers_global: u32,
|
||||||
pub(super) adaptive_floor_cpu_cores_detected: u32,
|
pub(super) adaptive_floor_cpu_cores_detected: u32,
|
||||||
pub(super) adaptive_floor_cpu_cores_effective: u32,
|
pub(super) adaptive_floor_cpu_cores_effective: u32,
|
||||||
pub(super) adaptive_floor_global_cap_raw: u64,
|
pub(super) adaptive_floor_global_cap_raw: u64,
|
||||||
pub(super) adaptive_floor_global_cap_effective: u64,
|
pub(super) adaptive_floor_global_cap_effective: u64,
|
||||||
pub(super) adaptive_floor_target_writers_total: u64,
|
pub(super) adaptive_floor_target_writers_total: u64,
|
||||||
|
pub(super) adaptive_floor_active_cap_configured: u64,
|
||||||
|
pub(super) adaptive_floor_active_cap_effective: u64,
|
||||||
|
pub(super) adaptive_floor_warm_cap_configured: u64,
|
||||||
|
pub(super) adaptive_floor_warm_cap_effective: u64,
|
||||||
|
pub(super) adaptive_floor_active_writers_current: u64,
|
||||||
|
pub(super) adaptive_floor_warm_writers_current: u64,
|
||||||
pub(super) me_keepalive_enabled: bool,
|
pub(super) me_keepalive_enabled: bool,
|
||||||
pub(super) me_keepalive_interval_secs: u64,
|
pub(super) me_keepalive_interval_secs: u64,
|
||||||
pub(super) me_keepalive_jitter_secs: u64,
|
pub(super) me_keepalive_jitter_secs: u64,
|
||||||
|
|
|
||||||
|
|
@ -380,11 +380,25 @@ async fn get_minimal_payload_cached(
|
||||||
.adaptive_floor_max_extra_writers_single_per_core,
|
.adaptive_floor_max_extra_writers_single_per_core,
|
||||||
adaptive_floor_max_extra_writers_multi_per_core: runtime
|
adaptive_floor_max_extra_writers_multi_per_core: runtime
|
||||||
.adaptive_floor_max_extra_writers_multi_per_core,
|
.adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
|
adaptive_floor_max_active_writers_per_core: runtime
|
||||||
|
.adaptive_floor_max_active_writers_per_core,
|
||||||
|
adaptive_floor_max_warm_writers_per_core: runtime
|
||||||
|
.adaptive_floor_max_warm_writers_per_core,
|
||||||
|
adaptive_floor_max_active_writers_global: runtime
|
||||||
|
.adaptive_floor_max_active_writers_global,
|
||||||
|
adaptive_floor_max_warm_writers_global: runtime
|
||||||
|
.adaptive_floor_max_warm_writers_global,
|
||||||
adaptive_floor_cpu_cores_detected: runtime.adaptive_floor_cpu_cores_detected,
|
adaptive_floor_cpu_cores_detected: runtime.adaptive_floor_cpu_cores_detected,
|
||||||
adaptive_floor_cpu_cores_effective: runtime.adaptive_floor_cpu_cores_effective,
|
adaptive_floor_cpu_cores_effective: runtime.adaptive_floor_cpu_cores_effective,
|
||||||
adaptive_floor_global_cap_raw: runtime.adaptive_floor_global_cap_raw,
|
adaptive_floor_global_cap_raw: runtime.adaptive_floor_global_cap_raw,
|
||||||
adaptive_floor_global_cap_effective: runtime.adaptive_floor_global_cap_effective,
|
adaptive_floor_global_cap_effective: runtime.adaptive_floor_global_cap_effective,
|
||||||
adaptive_floor_target_writers_total: runtime.adaptive_floor_target_writers_total,
|
adaptive_floor_target_writers_total: runtime.adaptive_floor_target_writers_total,
|
||||||
|
adaptive_floor_active_cap_configured: runtime.adaptive_floor_active_cap_configured,
|
||||||
|
adaptive_floor_active_cap_effective: runtime.adaptive_floor_active_cap_effective,
|
||||||
|
adaptive_floor_warm_cap_configured: runtime.adaptive_floor_warm_cap_configured,
|
||||||
|
adaptive_floor_warm_cap_effective: runtime.adaptive_floor_warm_cap_effective,
|
||||||
|
adaptive_floor_active_writers_current: runtime.adaptive_floor_active_writers_current,
|
||||||
|
adaptive_floor_warm_writers_current: runtime.adaptive_floor_warm_writers_current,
|
||||||
me_keepalive_enabled: runtime.me_keepalive_enabled,
|
me_keepalive_enabled: runtime.me_keepalive_enabled,
|
||||||
me_keepalive_interval_secs: runtime.me_keepalive_interval_secs,
|
me_keepalive_interval_secs: runtime.me_keepalive_interval_secs,
|
||||||
me_keepalive_jitter_secs: runtime.me_keepalive_jitter_secs,
|
me_keepalive_jitter_secs: runtime.me_keepalive_jitter_secs,
|
||||||
|
|
|
||||||
|
|
@ -70,6 +70,10 @@ pub(super) struct EffectiveMiddleProxyLimits {
|
||||||
pub(super) adaptive_floor_cpu_cores_override: u16,
|
pub(super) adaptive_floor_cpu_cores_override: u16,
|
||||||
pub(super) adaptive_floor_max_extra_writers_single_per_core: u16,
|
pub(super) adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16,
|
pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_max_active_writers_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_max_warm_writers_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_max_active_writers_global: u32,
|
||||||
|
pub(super) adaptive_floor_max_warm_writers_global: u32,
|
||||||
pub(super) reconnect_max_concurrent_per_dc: u32,
|
pub(super) reconnect_max_concurrent_per_dc: u32,
|
||||||
pub(super) reconnect_backoff_base_ms: u64,
|
pub(super) reconnect_backoff_base_ms: u64,
|
||||||
pub(super) reconnect_backoff_cap_ms: u64,
|
pub(super) reconnect_backoff_cap_ms: u64,
|
||||||
|
|
@ -217,6 +221,18 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
|
||||||
adaptive_floor_max_extra_writers_multi_per_core: cfg
|
adaptive_floor_max_extra_writers_multi_per_core: cfg
|
||||||
.general
|
.general
|
||||||
.me_adaptive_floor_max_extra_writers_multi_per_core,
|
.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
|
adaptive_floor_max_active_writers_per_core: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_active_writers_per_core,
|
||||||
|
adaptive_floor_max_warm_writers_per_core: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_warm_writers_per_core,
|
||||||
|
adaptive_floor_max_active_writers_global: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_active_writers_global,
|
||||||
|
adaptive_floor_max_warm_writers_global: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_warm_writers_global,
|
||||||
reconnect_max_concurrent_per_dc: cfg.general.me_reconnect_max_concurrent_per_dc,
|
reconnect_max_concurrent_per_dc: cfg.general.me_reconnect_max_concurrent_per_dc,
|
||||||
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
|
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
|
||||||
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
|
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,10 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_WRITERS_PER_CORE_TOTAL: u16 = 48;
|
||||||
const DEFAULT_ME_ADAPTIVE_FLOOR_CPU_CORES_OVERRIDE: u16 = 0;
|
const DEFAULT_ME_ADAPTIVE_FLOOR_CPU_CORES_OVERRIDE: u16 = 0;
|
||||||
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_SINGLE_PER_CORE: u16 = 1;
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_SINGLE_PER_CORE: u16 = 1;
|
||||||
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE: u16 = 2;
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE: u16 = 2;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256;
|
||||||
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
|
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
|
||||||
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
|
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
|
||||||
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
|
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
|
||||||
|
|
@ -276,6 +280,22 @@ pub(crate) fn default_me_adaptive_floor_max_extra_writers_multi_per_core() -> u1
|
||||||
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE
|
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_max_active_writers_per_core() -> u16 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_max_warm_writers_per_core() -> u16 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_max_active_writers_global() -> u32 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_max_warm_writers_global() -> u32 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
||||||
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -84,6 +84,10 @@ pub struct HotFields {
|
||||||
pub me_adaptive_floor_cpu_cores_override: u16,
|
pub me_adaptive_floor_cpu_cores_override: u16,
|
||||||
pub me_adaptive_floor_max_extra_writers_single_per_core: u16,
|
pub me_adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
pub me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
pub me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
pub me_adaptive_floor_max_active_writers_per_core: u16,
|
||||||
|
pub me_adaptive_floor_max_warm_writers_per_core: u16,
|
||||||
|
pub me_adaptive_floor_max_active_writers_global: u32,
|
||||||
|
pub me_adaptive_floor_max_warm_writers_global: u32,
|
||||||
pub me_route_backpressure_base_timeout_ms: u64,
|
pub me_route_backpressure_base_timeout_ms: u64,
|
||||||
pub me_route_backpressure_high_timeout_ms: u64,
|
pub me_route_backpressure_high_timeout_ms: u64,
|
||||||
pub me_route_backpressure_high_watermark_pct: u8,
|
pub me_route_backpressure_high_watermark_pct: u8,
|
||||||
|
|
@ -173,6 +177,18 @@ impl HotFields {
|
||||||
me_adaptive_floor_max_extra_writers_multi_per_core: cfg
|
me_adaptive_floor_max_extra_writers_multi_per_core: cfg
|
||||||
.general
|
.general
|
||||||
.me_adaptive_floor_max_extra_writers_multi_per_core,
|
.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
|
me_adaptive_floor_max_active_writers_per_core: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_active_writers_per_core,
|
||||||
|
me_adaptive_floor_max_warm_writers_per_core: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_warm_writers_per_core,
|
||||||
|
me_adaptive_floor_max_active_writers_global: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_active_writers_global,
|
||||||
|
me_adaptive_floor_max_warm_writers_global: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_warm_writers_global,
|
||||||
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
|
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
|
||||||
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
|
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
|
||||||
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
|
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
|
||||||
|
|
@ -305,6 +321,14 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
||||||
new.general.me_adaptive_floor_max_extra_writers_single_per_core;
|
new.general.me_adaptive_floor_max_extra_writers_single_per_core;
|
||||||
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core =
|
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core =
|
||||||
new.general.me_adaptive_floor_max_extra_writers_multi_per_core;
|
new.general.me_adaptive_floor_max_extra_writers_multi_per_core;
|
||||||
|
cfg.general.me_adaptive_floor_max_active_writers_per_core =
|
||||||
|
new.general.me_adaptive_floor_max_active_writers_per_core;
|
||||||
|
cfg.general.me_adaptive_floor_max_warm_writers_per_core =
|
||||||
|
new.general.me_adaptive_floor_max_warm_writers_per_core;
|
||||||
|
cfg.general.me_adaptive_floor_max_active_writers_global =
|
||||||
|
new.general.me_adaptive_floor_max_active_writers_global;
|
||||||
|
cfg.general.me_adaptive_floor_max_warm_writers_global =
|
||||||
|
new.general.me_adaptive_floor_max_warm_writers_global;
|
||||||
cfg.general.me_route_backpressure_base_timeout_ms =
|
cfg.general.me_route_backpressure_base_timeout_ms =
|
||||||
new.general.me_route_backpressure_base_timeout_ms;
|
new.general.me_route_backpressure_base_timeout_ms;
|
||||||
cfg.general.me_route_backpressure_high_timeout_ms =
|
cfg.general.me_route_backpressure_high_timeout_ms =
|
||||||
|
|
@ -739,9 +763,17 @@ fn log_changes(
|
||||||
!= new_hot.me_adaptive_floor_max_extra_writers_single_per_core
|
!= new_hot.me_adaptive_floor_max_extra_writers_single_per_core
|
||||||
|| old_hot.me_adaptive_floor_max_extra_writers_multi_per_core
|
|| old_hot.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
!= new_hot.me_adaptive_floor_max_extra_writers_multi_per_core
|
!= new_hot.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
|
|| old_hot.me_adaptive_floor_max_active_writers_per_core
|
||||||
|
!= new_hot.me_adaptive_floor_max_active_writers_per_core
|
||||||
|
|| old_hot.me_adaptive_floor_max_warm_writers_per_core
|
||||||
|
!= new_hot.me_adaptive_floor_max_warm_writers_per_core
|
||||||
|
|| old_hot.me_adaptive_floor_max_active_writers_global
|
||||||
|
!= new_hot.me_adaptive_floor_max_active_writers_global
|
||||||
|
|| old_hot.me_adaptive_floor_max_warm_writers_global
|
||||||
|
!= new_hot.me_adaptive_floor_max_warm_writers_global
|
||||||
{
|
{
|
||||||
info!(
|
info!(
|
||||||
"config reload: me_floor: mode={:?} idle={}s min_single={} min_multi={} recover_grace={}s per_core_total={} cores_override={} extra_single_per_core={} extra_multi_per_core={}",
|
"config reload: me_floor: mode={:?} idle={}s min_single={} min_multi={} recover_grace={}s per_core_total={} cores_override={} extra_single_per_core={} extra_multi_per_core={} max_active_per_core={} max_warm_per_core={} max_active_global={} max_warm_global={}",
|
||||||
new_hot.me_floor_mode,
|
new_hot.me_floor_mode,
|
||||||
new_hot.me_adaptive_floor_idle_secs,
|
new_hot.me_adaptive_floor_idle_secs,
|
||||||
new_hot.me_adaptive_floor_min_writers_single_endpoint,
|
new_hot.me_adaptive_floor_min_writers_single_endpoint,
|
||||||
|
|
@ -751,6 +783,10 @@ fn log_changes(
|
||||||
new_hot.me_adaptive_floor_cpu_cores_override,
|
new_hot.me_adaptive_floor_cpu_cores_override,
|
||||||
new_hot.me_adaptive_floor_max_extra_writers_single_per_core,
|
new_hot.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
new_hot.me_adaptive_floor_max_extra_writers_multi_per_core,
|
new_hot.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
|
new_hot.me_adaptive_floor_max_active_writers_per_core,
|
||||||
|
new_hot.me_adaptive_floor_max_warm_writers_per_core,
|
||||||
|
new_hot.me_adaptive_floor_max_active_writers_global,
|
||||||
|
new_hot.me_adaptive_floor_max_warm_writers_global,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -327,6 +327,30 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.general.me_adaptive_floor_max_active_writers_per_core == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_adaptive_floor_max_active_writers_per_core must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_adaptive_floor_max_warm_writers_per_core == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_adaptive_floor_max_warm_writers_per_core must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_adaptive_floor_max_active_writers_global == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_adaptive_floor_max_active_writers_global must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_adaptive_floor_max_warm_writers_global == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_adaptive_floor_max_warm_writers_global must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
|
if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
|
"general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
|
||||||
|
|
@ -1238,6 +1262,46 @@ mod tests {
|
||||||
let _ = std::fs::remove_file(path);
|
let _ = std::fs::remove_file(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn me_adaptive_floor_max_active_writers_per_core_zero_is_rejected() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
me_adaptive_floor_max_active_writers_per_core = 0
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_me_adaptive_floor_max_active_per_core_zero_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||||
|
assert!(err.contains("general.me_adaptive_floor_max_active_writers_per_core must be > 0"));
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn me_adaptive_floor_max_warm_writers_global_zero_is_rejected() {
|
||||||
|
let toml = r#"
|
||||||
|
[general]
|
||||||
|
me_adaptive_floor_max_warm_writers_global = 0
|
||||||
|
|
||||||
|
[censorship]
|
||||||
|
tls_domain = "example.com"
|
||||||
|
|
||||||
|
[access.users]
|
||||||
|
user = "00000000000000000000000000000000"
|
||||||
|
"#;
|
||||||
|
let dir = std::env::temp_dir();
|
||||||
|
let path = dir.join("telemt_me_adaptive_floor_max_warm_global_zero_test.toml");
|
||||||
|
std::fs::write(&path, toml).unwrap();
|
||||||
|
let err = ProxyConfig::load(&path).unwrap_err().to_string();
|
||||||
|
assert!(err.contains("general.me_adaptive_floor_max_warm_writers_global must be > 0"));
|
||||||
|
let _ = std::fs::remove_file(path);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn upstream_connect_retry_attempts_zero_is_rejected() {
|
fn upstream_connect_retry_attempts_zero_is_rejected() {
|
||||||
let toml = r#"
|
let toml = r#"
|
||||||
|
|
|
||||||
|
|
@ -545,6 +545,22 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_me_adaptive_floor_max_extra_writers_multi_per_core")]
|
#[serde(default = "default_me_adaptive_floor_max_extra_writers_multi_per_core")]
|
||||||
pub me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
pub me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
|
||||||
|
/// Hard cap for active ME writers per logical CPU core.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_max_active_writers_per_core")]
|
||||||
|
pub me_adaptive_floor_max_active_writers_per_core: u16,
|
||||||
|
|
||||||
|
/// Hard cap for warm ME writers per logical CPU core.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_max_warm_writers_per_core")]
|
||||||
|
pub me_adaptive_floor_max_warm_writers_per_core: u16,
|
||||||
|
|
||||||
|
/// Hard global cap for active ME writers.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_max_active_writers_global")]
|
||||||
|
pub me_adaptive_floor_max_active_writers_global: u32,
|
||||||
|
|
||||||
|
/// Hard global cap for warm ME writers.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_max_warm_writers_global")]
|
||||||
|
pub me_adaptive_floor_max_warm_writers_global: u32,
|
||||||
|
|
||||||
/// Connect attempts for the selected upstream before returning error/fallback.
|
/// Connect attempts for the selected upstream before returning error/fallback.
|
||||||
#[serde(default = "default_upstream_connect_retry_attempts")]
|
#[serde(default = "default_upstream_connect_retry_attempts")]
|
||||||
pub upstream_connect_retry_attempts: u32,
|
pub upstream_connect_retry_attempts: u32,
|
||||||
|
|
@ -802,6 +818,10 @@ impl Default for GeneralConfig {
|
||||||
me_adaptive_floor_cpu_cores_override: default_me_adaptive_floor_cpu_cores_override(),
|
me_adaptive_floor_cpu_cores_override: default_me_adaptive_floor_cpu_cores_override(),
|
||||||
me_adaptive_floor_max_extra_writers_single_per_core: default_me_adaptive_floor_max_extra_writers_single_per_core(),
|
me_adaptive_floor_max_extra_writers_single_per_core: default_me_adaptive_floor_max_extra_writers_single_per_core(),
|
||||||
me_adaptive_floor_max_extra_writers_multi_per_core: default_me_adaptive_floor_max_extra_writers_multi_per_core(),
|
me_adaptive_floor_max_extra_writers_multi_per_core: default_me_adaptive_floor_max_extra_writers_multi_per_core(),
|
||||||
|
me_adaptive_floor_max_active_writers_per_core: default_me_adaptive_floor_max_active_writers_per_core(),
|
||||||
|
me_adaptive_floor_max_warm_writers_per_core: default_me_adaptive_floor_max_warm_writers_per_core(),
|
||||||
|
me_adaptive_floor_max_active_writers_global: default_me_adaptive_floor_max_active_writers_global(),
|
||||||
|
me_adaptive_floor_max_warm_writers_global: default_me_adaptive_floor_max_warm_writers_global(),
|
||||||
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
|
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
|
||||||
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
|
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
|
||||||
upstream_connect_budget_ms: default_upstream_connect_budget_ms(),
|
upstream_connect_budget_ms: default_upstream_connect_budget_ms(),
|
||||||
|
|
|
||||||
|
|
@ -1031,6 +1031,10 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
config.general.me_adaptive_floor_cpu_cores_override,
|
config.general.me_adaptive_floor_cpu_cores_override,
|
||||||
config.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
config.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
config.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
config.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
|
config.general.me_adaptive_floor_max_active_writers_per_core,
|
||||||
|
config.general.me_adaptive_floor_max_warm_writers_per_core,
|
||||||
|
config.general.me_adaptive_floor_max_active_writers_global,
|
||||||
|
config.general.me_adaptive_floor_max_warm_writers_global,
|
||||||
config.general.hardswap,
|
config.general.hardswap,
|
||||||
config.general.me_pool_drain_ttl_secs,
|
config.general.me_pool_drain_ttl_secs,
|
||||||
config.general.effective_me_pool_force_close_secs(),
|
config.general.effective_me_pool_force_close_secs(),
|
||||||
|
|
|
||||||
|
|
@ -1053,6 +1053,102 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_active_cap_configured Runtime configured active writer cap"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_active_cap_configured gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_active_cap_configured {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_active_cap_configured_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_active_cap_effective Runtime effective active writer cap"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_active_cap_effective gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_active_cap_effective {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_active_cap_effective_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_warm_cap_configured Runtime configured warm writer cap"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_warm_cap_configured gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_warm_cap_configured {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_warm_cap_configured_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_warm_cap_effective Runtime effective warm writer cap"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_warm_cap_effective gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_warm_cap_effective {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_warm_cap_effective_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writers_active_current Current non-draining active ME writers"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writers_active_current gauge");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writers_active_current {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_writers_active_current_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_writers_warm_current Current non-draining warm ME writers"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_writers_warm_current gauge");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_writers_warm_current {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_writers_warm_current_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
let _ = writeln!(
|
let _ = writeln!(
|
||||||
out,
|
out,
|
||||||
"# HELP telemt_me_floor_cap_block_total Reconnect attempts blocked by adaptive floor caps"
|
"# HELP telemt_me_floor_cap_block_total Reconnect attempts blocked by adaptive floor caps"
|
||||||
|
|
|
||||||
|
|
@ -80,6 +80,12 @@ pub struct Stats {
|
||||||
me_floor_global_cap_raw_gauge: AtomicU64,
|
me_floor_global_cap_raw_gauge: AtomicU64,
|
||||||
me_floor_global_cap_effective_gauge: AtomicU64,
|
me_floor_global_cap_effective_gauge: AtomicU64,
|
||||||
me_floor_target_writers_total_gauge: AtomicU64,
|
me_floor_target_writers_total_gauge: AtomicU64,
|
||||||
|
me_floor_active_cap_configured_gauge: AtomicU64,
|
||||||
|
me_floor_active_cap_effective_gauge: AtomicU64,
|
||||||
|
me_floor_warm_cap_configured_gauge: AtomicU64,
|
||||||
|
me_floor_warm_cap_effective_gauge: AtomicU64,
|
||||||
|
me_writers_active_current_gauge: AtomicU64,
|
||||||
|
me_writers_warm_current_gauge: AtomicU64,
|
||||||
me_floor_cap_block_total: AtomicU64,
|
me_floor_cap_block_total: AtomicU64,
|
||||||
me_floor_swap_idle_total: AtomicU64,
|
me_floor_swap_idle_total: AtomicU64,
|
||||||
me_floor_swap_idle_failed_total: AtomicU64,
|
me_floor_swap_idle_failed_total: AtomicU64,
|
||||||
|
|
@ -764,6 +770,42 @@ impl Stats {
|
||||||
.store(value, Ordering::Relaxed);
|
.store(value, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn set_me_floor_active_cap_configured_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_active_cap_configured_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_floor_active_cap_effective_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_active_cap_effective_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_floor_warm_cap_configured_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_warm_cap_configured_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_floor_warm_cap_effective_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_warm_cap_effective_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_writers_active_current_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_writers_active_current_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_writers_warm_current_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_writers_warm_current_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn increment_me_floor_cap_block_total(&self) {
|
pub fn increment_me_floor_cap_block_total(&self) {
|
||||||
if self.telemetry_me_allows_normal() {
|
if self.telemetry_me_allows_normal() {
|
||||||
self.me_floor_cap_block_total.fetch_add(1, Ordering::Relaxed);
|
self.me_floor_cap_block_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
@ -904,6 +946,30 @@ impl Stats {
|
||||||
self.me_floor_target_writers_total_gauge
|
self.me_floor_target_writers_total_gauge
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
pub fn get_me_floor_active_cap_configured_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_active_cap_configured_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_active_cap_effective_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_active_cap_effective_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_warm_cap_configured_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_warm_cap_configured_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_warm_cap_effective_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_warm_cap_effective_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writers_active_current_gauge(&self) -> u64 {
|
||||||
|
self.me_writers_active_current_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_writers_warm_current_gauge(&self) -> u64 {
|
||||||
|
self.me_writers_warm_current_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
pub fn get_me_floor_cap_block_total(&self) -> u64 {
|
pub fn get_me_floor_cap_block_total(&self) -> u64 {
|
||||||
self.me_floor_cap_block_total.load(Ordering::Relaxed)
|
self.me_floor_cap_block_total.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -321,6 +321,10 @@ async fn run_update_cycle(
|
||||||
cfg.general.me_adaptive_floor_cpu_cores_override,
|
cfg.general.me_adaptive_floor_cpu_cores_override,
|
||||||
cfg.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
cfg.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
|
cfg.general.me_adaptive_floor_max_active_writers_per_core,
|
||||||
|
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
|
||||||
|
cfg.general.me_adaptive_floor_max_active_writers_global,
|
||||||
|
cfg.general.me_adaptive_floor_max_warm_writers_global,
|
||||||
);
|
);
|
||||||
|
|
||||||
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
||||||
|
|
@ -538,6 +542,10 @@ pub async fn me_config_updater(
|
||||||
cfg.general.me_adaptive_floor_cpu_cores_override,
|
cfg.general.me_adaptive_floor_cpu_cores_override,
|
||||||
cfg.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
cfg.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
|
cfg.general.me_adaptive_floor_max_active_writers_per_core,
|
||||||
|
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
|
||||||
|
cfg.general.me_adaptive_floor_max_active_writers_global,
|
||||||
|
cfg.general.me_adaptive_floor_max_warm_writers_global,
|
||||||
);
|
);
|
||||||
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
||||||
if new_secs == update_every_secs {
|
if new_secs == update_every_secs {
|
||||||
|
|
|
||||||
|
|
@ -42,7 +42,12 @@ struct DcFloorPlanEntry {
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct FamilyFloorPlan {
|
struct FamilyFloorPlan {
|
||||||
by_dc: HashMap<i32, DcFloorPlanEntry>,
|
by_dc: HashMap<i32, DcFloorPlanEntry>,
|
||||||
global_cap_effective_total: usize,
|
active_cap_configured_total: usize,
|
||||||
|
active_cap_effective_total: usize,
|
||||||
|
warm_cap_configured_total: usize,
|
||||||
|
warm_cap_effective_total: usize,
|
||||||
|
active_writers_current: usize,
|
||||||
|
warm_writers_current: usize,
|
||||||
target_writers_total: usize,
|
target_writers_total: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -169,6 +174,14 @@ async fn check_family(
|
||||||
for writer in pool.writers.read().await.iter().filter(|w| {
|
for writer in pool.writers.read().await.iter().filter(|w| {
|
||||||
!w.draining.load(std::sync::atomic::Ordering::Relaxed)
|
!w.draining.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
}) {
|
}) {
|
||||||
|
if !matches!(
|
||||||
|
super::pool::WriterContour::from_u8(
|
||||||
|
writer.contour.load(std::sync::atomic::Ordering::Relaxed),
|
||||||
|
),
|
||||||
|
super::pool::WriterContour::Active
|
||||||
|
) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
let key = (writer.writer_dc, writer.addr);
|
let key = (writer.writer_dc, writer.addr);
|
||||||
*live_addr_counts.entry(key).or_insert(0) += 1;
|
*live_addr_counts.entry(key).or_insert(0) += 1;
|
||||||
live_writer_ids_by_addr
|
live_writer_ids_by_addr
|
||||||
|
|
@ -194,8 +207,13 @@ async fn check_family(
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
pool.set_adaptive_floor_runtime_caps(
|
pool.set_adaptive_floor_runtime_caps(
|
||||||
floor_plan.global_cap_effective_total,
|
floor_plan.active_cap_configured_total,
|
||||||
|
floor_plan.active_cap_effective_total,
|
||||||
|
floor_plan.warm_cap_configured_total,
|
||||||
|
floor_plan.warm_cap_effective_total,
|
||||||
floor_plan.target_writers_total,
|
floor_plan.target_writers_total,
|
||||||
|
floor_plan.active_writers_current,
|
||||||
|
floor_plan.warm_writers_current,
|
||||||
);
|
);
|
||||||
|
|
||||||
for (dc, endpoints) in dc_endpoints {
|
for (dc, endpoints) in dc_endpoints {
|
||||||
|
|
@ -344,8 +362,8 @@ async fn check_family(
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
reconnect_budget = reconnect_budget.saturating_sub(1);
|
reconnect_budget = reconnect_budget.saturating_sub(1);
|
||||||
if pool.floor_mode() == MeFloorMode::Adaptive
|
if pool.active_contour_writer_count_total().await
|
||||||
&& pool.active_writer_count_total().await >= floor_plan.global_cap_effective_total
|
>= floor_plan.active_cap_effective_total
|
||||||
{
|
{
|
||||||
let swapped = maybe_swap_idle_writer_for_cap(
|
let swapped = maybe_swap_idle_writer_for_cap(
|
||||||
pool,
|
pool,
|
||||||
|
|
@ -370,7 +388,7 @@ async fn check_family(
|
||||||
?family,
|
?family,
|
||||||
alive,
|
alive,
|
||||||
required,
|
required,
|
||||||
global_cap_effective_total = floor_plan.global_cap_effective_total,
|
active_cap_effective_total = floor_plan.active_cap_effective_total,
|
||||||
"Adaptive floor cap reached, reconnect attempt blocked"
|
"Adaptive floor cap reached, reconnect attempt blocked"
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
|
|
@ -518,6 +536,8 @@ async fn build_family_floor_plan(
|
||||||
let floor_mode = pool.floor_mode();
|
let floor_mode = pool.floor_mode();
|
||||||
let is_adaptive = floor_mode == MeFloorMode::Adaptive;
|
let is_adaptive = floor_mode == MeFloorMode::Adaptive;
|
||||||
let cpu_cores = pool.adaptive_floor_effective_cpu_cores().max(1);
|
let cpu_cores = pool.adaptive_floor_effective_cpu_cores().max(1);
|
||||||
|
let (active_writers_current, warm_writers_current, _) =
|
||||||
|
pool.non_draining_writer_counts_by_contour().await;
|
||||||
|
|
||||||
for (dc, endpoints) in dc_endpoints {
|
for (dc, endpoints) in dc_endpoints {
|
||||||
if endpoints.is_empty() {
|
if endpoints.is_empty() {
|
||||||
|
|
@ -576,9 +596,16 @@ async fn build_family_floor_plan(
|
||||||
}
|
}
|
||||||
|
|
||||||
if entries.is_empty() {
|
if entries.is_empty() {
|
||||||
|
let active_cap_configured_total = pool.adaptive_floor_active_cap_configured_total();
|
||||||
|
let warm_cap_configured_total = pool.adaptive_floor_warm_cap_configured_total();
|
||||||
return FamilyFloorPlan {
|
return FamilyFloorPlan {
|
||||||
by_dc,
|
by_dc,
|
||||||
global_cap_effective_total: 0,
|
active_cap_configured_total,
|
||||||
|
active_cap_effective_total: active_cap_configured_total,
|
||||||
|
warm_cap_configured_total,
|
||||||
|
warm_cap_effective_total: warm_cap_configured_total,
|
||||||
|
active_writers_current,
|
||||||
|
warm_writers_current,
|
||||||
target_writers_total: 0,
|
target_writers_total: 0,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
@ -588,20 +615,26 @@ async fn build_family_floor_plan(
|
||||||
.iter()
|
.iter()
|
||||||
.map(|entry| entry.target_required)
|
.map(|entry| entry.target_required)
|
||||||
.sum::<usize>();
|
.sum::<usize>();
|
||||||
let active_total = pool.active_writer_count_total().await;
|
let active_cap_configured_total = pool.adaptive_floor_active_cap_configured_total();
|
||||||
|
let warm_cap_configured_total = pool.adaptive_floor_warm_cap_configured_total();
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
by_dc.insert(entry.dc, entry);
|
by_dc.insert(entry.dc, entry);
|
||||||
}
|
}
|
||||||
return FamilyFloorPlan {
|
return FamilyFloorPlan {
|
||||||
by_dc,
|
by_dc,
|
||||||
global_cap_effective_total: active_total.max(target_total),
|
active_cap_configured_total,
|
||||||
|
active_cap_effective_total: active_cap_configured_total.max(target_total),
|
||||||
|
warm_cap_configured_total,
|
||||||
|
warm_cap_effective_total: warm_cap_configured_total,
|
||||||
|
active_writers_current,
|
||||||
|
warm_writers_current,
|
||||||
target_writers_total: target_total,
|
target_writers_total: target_total,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
let global_cap_raw = pool.adaptive_floor_global_cap_raw();
|
let active_cap_configured_total = pool.adaptive_floor_active_cap_configured_total();
|
||||||
let total_active = pool.active_writer_count_total().await;
|
let warm_cap_configured_total = pool.adaptive_floor_warm_cap_configured_total();
|
||||||
let other_active = total_active.saturating_sub(family_active_total);
|
let other_active = active_writers_current.saturating_sub(family_active_total);
|
||||||
let min_sum = entries
|
let min_sum = entries
|
||||||
.iter()
|
.iter()
|
||||||
.map(|entry| entry.min_required)
|
.map(|entry| entry.min_required)
|
||||||
|
|
@ -610,7 +643,7 @@ async fn build_family_floor_plan(
|
||||||
.iter()
|
.iter()
|
||||||
.map(|entry| entry.target_required)
|
.map(|entry| entry.target_required)
|
||||||
.sum::<usize>();
|
.sum::<usize>();
|
||||||
let family_cap = global_cap_raw
|
let family_cap = active_cap_configured_total
|
||||||
.saturating_sub(other_active)
|
.saturating_sub(other_active)
|
||||||
.max(min_sum);
|
.max(min_sum);
|
||||||
if target_sum > family_cap {
|
if target_sum > family_cap {
|
||||||
|
|
@ -645,11 +678,17 @@ async fn build_family_floor_plan(
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
by_dc.insert(entry.dc, entry);
|
by_dc.insert(entry.dc, entry);
|
||||||
}
|
}
|
||||||
let global_cap_effective_total = global_cap_raw.max(other_active.saturating_add(min_sum));
|
let active_cap_effective_total =
|
||||||
|
active_cap_configured_total.max(other_active.saturating_add(min_sum));
|
||||||
let target_writers_total = other_active.saturating_add(target_sum);
|
let target_writers_total = other_active.saturating_add(target_sum);
|
||||||
FamilyFloorPlan {
|
FamilyFloorPlan {
|
||||||
by_dc,
|
by_dc,
|
||||||
global_cap_effective_total,
|
active_cap_configured_total,
|
||||||
|
active_cap_effective_total,
|
||||||
|
warm_cap_configured_total,
|
||||||
|
warm_cap_effective_total: warm_cap_configured_total,
|
||||||
|
active_writers_current,
|
||||||
|
warm_writers_current,
|
||||||
target_writers_total,
|
target_writers_total,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -125,11 +125,21 @@ pub struct MePool {
|
||||||
pub(super) me_adaptive_floor_cpu_cores_override: AtomicU32,
|
pub(super) me_adaptive_floor_cpu_cores_override: AtomicU32,
|
||||||
pub(super) me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32,
|
pub(super) me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32,
|
||||||
pub(super) me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32,
|
pub(super) me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_max_active_writers_per_core: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_max_warm_writers_per_core: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_max_active_writers_global: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_max_warm_writers_global: AtomicU32,
|
||||||
pub(super) me_adaptive_floor_cpu_cores_detected: AtomicU32,
|
pub(super) me_adaptive_floor_cpu_cores_detected: AtomicU32,
|
||||||
pub(super) me_adaptive_floor_cpu_cores_effective: AtomicU32,
|
pub(super) me_adaptive_floor_cpu_cores_effective: AtomicU32,
|
||||||
pub(super) me_adaptive_floor_global_cap_raw: AtomicU64,
|
pub(super) me_adaptive_floor_global_cap_raw: AtomicU64,
|
||||||
pub(super) me_adaptive_floor_global_cap_effective: AtomicU64,
|
pub(super) me_adaptive_floor_global_cap_effective: AtomicU64,
|
||||||
pub(super) me_adaptive_floor_target_writers_total: AtomicU64,
|
pub(super) me_adaptive_floor_target_writers_total: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_active_cap_configured: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_active_cap_effective: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_warm_cap_configured: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_warm_cap_effective: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_active_writers_current: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_warm_writers_current: AtomicU64,
|
||||||
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||||
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||||
pub(super) endpoint_dc_map: Arc<RwLock<HashMap<SocketAddr, Option<i32>>>>,
|
pub(super) endpoint_dc_map: Arc<RwLock<HashMap<SocketAddr, Option<i32>>>>,
|
||||||
|
|
@ -243,6 +253,10 @@ impl MePool {
|
||||||
me_adaptive_floor_cpu_cores_override: u16,
|
me_adaptive_floor_cpu_cores_override: u16,
|
||||||
me_adaptive_floor_max_extra_writers_single_per_core: u16,
|
me_adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
me_adaptive_floor_max_active_writers_per_core: u16,
|
||||||
|
me_adaptive_floor_max_warm_writers_per_core: u16,
|
||||||
|
me_adaptive_floor_max_active_writers_global: u32,
|
||||||
|
me_adaptive_floor_max_warm_writers_global: u32,
|
||||||
hardswap: bool,
|
hardswap: bool,
|
||||||
me_pool_drain_ttl_secs: u64,
|
me_pool_drain_ttl_secs: u64,
|
||||||
me_pool_force_close_secs: u64,
|
me_pool_force_close_secs: u64,
|
||||||
|
|
@ -358,11 +372,29 @@ impl MePool {
|
||||||
me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32::new(
|
me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32::new(
|
||||||
me_adaptive_floor_max_extra_writers_multi_per_core as u32,
|
me_adaptive_floor_max_extra_writers_multi_per_core as u32,
|
||||||
),
|
),
|
||||||
|
me_adaptive_floor_max_active_writers_per_core: AtomicU32::new(
|
||||||
|
me_adaptive_floor_max_active_writers_per_core as u32,
|
||||||
|
),
|
||||||
|
me_adaptive_floor_max_warm_writers_per_core: AtomicU32::new(
|
||||||
|
me_adaptive_floor_max_warm_writers_per_core as u32,
|
||||||
|
),
|
||||||
|
me_adaptive_floor_max_active_writers_global: AtomicU32::new(
|
||||||
|
me_adaptive_floor_max_active_writers_global,
|
||||||
|
),
|
||||||
|
me_adaptive_floor_max_warm_writers_global: AtomicU32::new(
|
||||||
|
me_adaptive_floor_max_warm_writers_global,
|
||||||
|
),
|
||||||
me_adaptive_floor_cpu_cores_detected: AtomicU32::new(1),
|
me_adaptive_floor_cpu_cores_detected: AtomicU32::new(1),
|
||||||
me_adaptive_floor_cpu_cores_effective: AtomicU32::new(1),
|
me_adaptive_floor_cpu_cores_effective: AtomicU32::new(1),
|
||||||
me_adaptive_floor_global_cap_raw: AtomicU64::new(0),
|
me_adaptive_floor_global_cap_raw: AtomicU64::new(0),
|
||||||
me_adaptive_floor_global_cap_effective: AtomicU64::new(0),
|
me_adaptive_floor_global_cap_effective: AtomicU64::new(0),
|
||||||
me_adaptive_floor_target_writers_total: AtomicU64::new(0),
|
me_adaptive_floor_target_writers_total: AtomicU64::new(0),
|
||||||
|
me_adaptive_floor_active_cap_configured: AtomicU64::new(0),
|
||||||
|
me_adaptive_floor_active_cap_effective: AtomicU64::new(0),
|
||||||
|
me_adaptive_floor_warm_cap_configured: AtomicU64::new(0),
|
||||||
|
me_adaptive_floor_warm_cap_effective: AtomicU64::new(0),
|
||||||
|
me_adaptive_floor_active_writers_current: AtomicU64::new(0),
|
||||||
|
me_adaptive_floor_warm_writers_current: AtomicU64::new(0),
|
||||||
pool_size: 2,
|
pool_size: 2,
|
||||||
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
||||||
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
||||||
|
|
@ -453,6 +485,10 @@ impl MePool {
|
||||||
adaptive_floor_cpu_cores_override: u16,
|
adaptive_floor_cpu_cores_override: u16,
|
||||||
adaptive_floor_max_extra_writers_single_per_core: u16,
|
adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
adaptive_floor_max_extra_writers_multi_per_core: u16,
|
adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
adaptive_floor_max_active_writers_per_core: u16,
|
||||||
|
adaptive_floor_max_warm_writers_per_core: u16,
|
||||||
|
adaptive_floor_max_active_writers_global: u32,
|
||||||
|
adaptive_floor_max_warm_writers_global: u32,
|
||||||
) {
|
) {
|
||||||
self.hardswap.store(hardswap, Ordering::Relaxed);
|
self.hardswap.store(hardswap, Ordering::Relaxed);
|
||||||
self.me_pool_drain_ttl_secs
|
self.me_pool_drain_ttl_secs
|
||||||
|
|
@ -514,6 +550,20 @@ impl MePool {
|
||||||
adaptive_floor_max_extra_writers_multi_per_core as u32,
|
adaptive_floor_max_extra_writers_multi_per_core as u32,
|
||||||
Ordering::Relaxed,
|
Ordering::Relaxed,
|
||||||
);
|
);
|
||||||
|
self.me_adaptive_floor_max_active_writers_per_core
|
||||||
|
.store(
|
||||||
|
adaptive_floor_max_active_writers_per_core as u32,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
self.me_adaptive_floor_max_warm_writers_per_core
|
||||||
|
.store(
|
||||||
|
adaptive_floor_max_warm_writers_per_core as u32,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
self.me_adaptive_floor_max_active_writers_global
|
||||||
|
.store(adaptive_floor_max_active_writers_global, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_max_warm_writers_global
|
||||||
|
.store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed);
|
||||||
if previous_floor_mode != floor_mode {
|
if previous_floor_mode != floor_mode {
|
||||||
self.stats.increment_me_floor_mode_switch_total();
|
self.stats.increment_me_floor_mode_switch_total();
|
||||||
match (previous_floor_mode, floor_mode) {
|
match (previous_floor_mode, floor_mode) {
|
||||||
|
|
@ -584,11 +634,26 @@ impl MePool {
|
||||||
self.proxy_secret.read().await.key_selector
|
self.proxy_secret.read().await.key_selector
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn active_writer_count_total(&self) -> usize {
|
pub(super) async fn non_draining_writer_counts_by_contour(&self) -> (usize, usize, usize) {
|
||||||
let ws = self.writers.read().await;
|
let ws = self.writers.read().await;
|
||||||
ws.iter()
|
let mut active = 0usize;
|
||||||
.filter(|w| !w.draining.load(Ordering::Relaxed))
|
let mut warm = 0usize;
|
||||||
.count()
|
for writer in ws.iter() {
|
||||||
|
if writer.draining.load(Ordering::Relaxed) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
match WriterContour::from_u8(writer.contour.load(Ordering::Relaxed)) {
|
||||||
|
WriterContour::Active => active = active.saturating_add(1),
|
||||||
|
WriterContour::Warm => warm = warm.saturating_add(1),
|
||||||
|
WriterContour::Draining => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(active, warm, active.saturating_add(warm))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn active_contour_writer_count_total(&self) -> usize {
|
||||||
|
let (active, _, _) = self.non_draining_writer_counts_by_contour().await;
|
||||||
|
active
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn secret_snapshot(&self) -> SecretSnapshot {
|
pub(super) async fn secret_snapshot(&self) -> SecretSnapshot {
|
||||||
|
|
@ -634,13 +699,6 @@ impl MePool {
|
||||||
.max(1)
|
.max(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn adaptive_floor_writers_per_core_total(&self) -> usize {
|
|
||||||
(self
|
|
||||||
.me_adaptive_floor_writers_per_core_total
|
|
||||||
.load(Ordering::Relaxed) as usize)
|
|
||||||
.max(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn adaptive_floor_max_extra_single_per_core(&self) -> usize {
|
pub(super) fn adaptive_floor_max_extra_single_per_core(&self) -> usize {
|
||||||
self.me_adaptive_floor_max_extra_writers_single_per_core
|
self.me_adaptive_floor_max_extra_writers_single_per_core
|
||||||
.load(Ordering::Relaxed) as usize
|
.load(Ordering::Relaxed) as usize
|
||||||
|
|
@ -651,6 +709,34 @@ impl MePool {
|
||||||
.load(Ordering::Relaxed) as usize
|
.load(Ordering::Relaxed) as usize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_max_active_writers_per_core(&self) -> usize {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_max_active_writers_per_core
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_max_warm_writers_per_core(&self) -> usize {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_max_warm_writers_per_core
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_max_active_writers_global(&self) -> usize {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_max_active_writers_global
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_max_warm_writers_global(&self) -> usize {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_max_warm_writers_global
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) fn adaptive_floor_detected_cpu_cores(&self) -> usize {
|
pub(super) fn adaptive_floor_detected_cpu_cores(&self) -> usize {
|
||||||
std::thread::available_parallelism()
|
std::thread::available_parallelism()
|
||||||
.map(|value| value.get())
|
.map(|value| value.get())
|
||||||
|
|
@ -679,28 +765,126 @@ impl MePool {
|
||||||
effective
|
effective
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn adaptive_floor_global_cap_raw(&self) -> usize {
|
pub(super) fn adaptive_floor_active_cap_configured_total(&self) -> usize {
|
||||||
let cores = self.adaptive_floor_effective_cpu_cores();
|
let cores = self.adaptive_floor_effective_cpu_cores();
|
||||||
let cap = cores.saturating_mul(self.adaptive_floor_writers_per_core_total());
|
let per_core_cap = cores.saturating_mul(self.adaptive_floor_max_active_writers_per_core());
|
||||||
self.me_adaptive_floor_global_cap_raw
|
let configured = per_core_cap.min(self.adaptive_floor_max_active_writers_global());
|
||||||
.store(cap as u64, Ordering::Relaxed);
|
self.me_adaptive_floor_active_cap_configured
|
||||||
self.stats.set_me_floor_global_cap_raw_gauge(cap as u64);
|
.store(configured as u64, Ordering::Relaxed);
|
||||||
cap
|
self.stats
|
||||||
|
.set_me_floor_active_cap_configured_gauge(configured as u64);
|
||||||
|
configured
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_warm_cap_configured_total(&self) -> usize {
|
||||||
|
let cores = self.adaptive_floor_effective_cpu_cores();
|
||||||
|
let per_core_cap = cores.saturating_mul(self.adaptive_floor_max_warm_writers_per_core());
|
||||||
|
let configured = per_core_cap.min(self.adaptive_floor_max_warm_writers_global());
|
||||||
|
self.me_adaptive_floor_warm_cap_configured
|
||||||
|
.store(configured as u64, Ordering::Relaxed);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_warm_cap_configured_gauge(configured as u64);
|
||||||
|
configured
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn set_adaptive_floor_runtime_caps(
|
pub(super) fn set_adaptive_floor_runtime_caps(
|
||||||
&self,
|
&self,
|
||||||
global_cap_effective: usize,
|
active_cap_configured: usize,
|
||||||
|
active_cap_effective: usize,
|
||||||
|
warm_cap_configured: usize,
|
||||||
|
warm_cap_effective: usize,
|
||||||
target_writers_total: usize,
|
target_writers_total: usize,
|
||||||
|
active_writers_current: usize,
|
||||||
|
warm_writers_current: usize,
|
||||||
) {
|
) {
|
||||||
|
self.me_adaptive_floor_global_cap_raw
|
||||||
|
.store(active_cap_configured as u64, Ordering::Relaxed);
|
||||||
self.me_adaptive_floor_global_cap_effective
|
self.me_adaptive_floor_global_cap_effective
|
||||||
.store(global_cap_effective as u64, Ordering::Relaxed);
|
.store(active_cap_effective as u64, Ordering::Relaxed);
|
||||||
self.me_adaptive_floor_target_writers_total
|
self.me_adaptive_floor_target_writers_total
|
||||||
.store(target_writers_total as u64, Ordering::Relaxed);
|
.store(target_writers_total as u64, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_active_cap_configured
|
||||||
|
.store(active_cap_configured as u64, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_active_cap_effective
|
||||||
|
.store(active_cap_effective as u64, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_warm_cap_configured
|
||||||
|
.store(warm_cap_configured as u64, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_warm_cap_effective
|
||||||
|
.store(warm_cap_effective as u64, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_active_writers_current
|
||||||
|
.store(active_writers_current as u64, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_warm_writers_current
|
||||||
|
.store(warm_writers_current as u64, Ordering::Relaxed);
|
||||||
self.stats
|
self.stats
|
||||||
.set_me_floor_global_cap_effective_gauge(global_cap_effective as u64);
|
.set_me_floor_global_cap_raw_gauge(active_cap_configured as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_global_cap_effective_gauge(active_cap_effective as u64);
|
||||||
self.stats
|
self.stats
|
||||||
.set_me_floor_target_writers_total_gauge(target_writers_total as u64);
|
.set_me_floor_target_writers_total_gauge(target_writers_total as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_active_cap_configured_gauge(active_cap_configured as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_active_cap_effective_gauge(active_cap_effective as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_warm_cap_configured_gauge(warm_cap_configured as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_warm_cap_effective_gauge(warm_cap_effective as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_writers_active_current_gauge(active_writers_current as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_writers_warm_current_gauge(warm_writers_current as u64);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn active_coverage_required_total(&self) -> usize {
|
||||||
|
let mut endpoints_by_dc = HashMap::<i32, HashSet<SocketAddr>>::new();
|
||||||
|
|
||||||
|
if self.decision.ipv4_me {
|
||||||
|
let map = self.proxy_map_v4.read().await;
|
||||||
|
for (dc, addrs) in map.iter() {
|
||||||
|
let entry = endpoints_by_dc.entry(*dc).or_default();
|
||||||
|
for (ip, port) in addrs.iter().copied() {
|
||||||
|
entry.insert(SocketAddr::new(ip, port));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.decision.ipv6_me {
|
||||||
|
let map = self.proxy_map_v6.read().await;
|
||||||
|
for (dc, addrs) in map.iter() {
|
||||||
|
let entry = endpoints_by_dc.entry(*dc).or_default();
|
||||||
|
for (ip, port) in addrs.iter().copied() {
|
||||||
|
entry.insert(SocketAddr::new(ip, port));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoints_by_dc
|
||||||
|
.values()
|
||||||
|
.map(|endpoints| self.required_writers_for_dc_with_floor_mode(endpoints.len(), false))
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn can_open_writer_for_contour(
|
||||||
|
&self,
|
||||||
|
contour: WriterContour,
|
||||||
|
allow_coverage_override: bool,
|
||||||
|
) -> bool {
|
||||||
|
let (active_writers, warm_writers, _) = self.non_draining_writer_counts_by_contour().await;
|
||||||
|
match contour {
|
||||||
|
WriterContour::Active => {
|
||||||
|
let active_cap = self.adaptive_floor_active_cap_configured_total();
|
||||||
|
if active_writers < active_cap {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if !allow_coverage_override {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
let coverage_required = self.active_coverage_required_total().await;
|
||||||
|
active_writers < coverage_required
|
||||||
|
}
|
||||||
|
WriterContour::Warm => warm_writers < self.adaptive_floor_warm_cap_configured_total(),
|
||||||
|
WriterContour::Draining => true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn required_writers_for_dc_with_floor_mode(
|
pub(super) fn required_writers_for_dc_with_floor_mode(
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,7 @@ impl MePool {
|
||||||
target_writers,
|
target_writers,
|
||||||
rng_clone,
|
rng_clone,
|
||||||
connect_concurrency,
|
connect_concurrency,
|
||||||
|
true,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
@ -114,6 +115,7 @@ impl MePool {
|
||||||
target_writers,
|
target_writers,
|
||||||
rng_clone_local,
|
rng_clone_local,
|
||||||
connect_concurrency,
|
connect_concurrency,
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
@ -147,6 +149,7 @@ impl MePool {
|
||||||
target_writers: usize,
|
target_writers: usize,
|
||||||
rng: Arc<SecureRandom>,
|
rng: Arc<SecureRandom>,
|
||||||
connect_concurrency: usize,
|
connect_concurrency: usize,
|
||||||
|
allow_coverage_override: bool,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
if addrs.is_empty() {
|
if addrs.is_empty() {
|
||||||
return false;
|
return false;
|
||||||
|
|
@ -180,8 +183,16 @@ impl MePool {
|
||||||
let pool = Arc::clone(&self);
|
let pool = Arc::clone(&self);
|
||||||
let rng_clone = Arc::clone(&rng);
|
let rng_clone = Arc::clone(&rng);
|
||||||
let endpoints_clone = endpoints.clone();
|
let endpoints_clone = endpoints.clone();
|
||||||
|
let generation = self.current_generation();
|
||||||
join.spawn(async move {
|
join.spawn(async move {
|
||||||
pool.connect_endpoints_round_robin(dc, &endpoints_clone, rng_clone.as_ref())
|
pool.connect_endpoints_round_robin_with_generation_contour(
|
||||||
|
dc,
|
||||||
|
&endpoints_clone,
|
||||||
|
rng_clone.as_ref(),
|
||||||
|
generation,
|
||||||
|
super::pool::WriterContour::Active,
|
||||||
|
allow_coverage_override,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -212,12 +223,25 @@ impl MePool {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if !progress {
|
if !progress {
|
||||||
|
let active_writers_current = self.active_contour_writer_count_total().await;
|
||||||
|
let active_cap_configured = self.adaptive_floor_active_cap_configured_total();
|
||||||
|
if !allow_coverage_override && active_writers_current >= active_cap_configured {
|
||||||
|
info!(
|
||||||
|
dc = %dc,
|
||||||
|
alive = alive_after,
|
||||||
|
target_writers,
|
||||||
|
active_writers_current,
|
||||||
|
active_cap_configured,
|
||||||
|
"ME init saturation stopped by active writer cap"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
dc = %dc,
|
dc = %dc,
|
||||||
alive = alive_after,
|
alive = alive_after,
|
||||||
target_writers,
|
target_writers,
|
||||||
"All ME servers for DC failed at init"
|
"All ME servers for DC failed at init"
|
||||||
);
|
);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,7 @@ impl MePool {
|
||||||
rng,
|
rng,
|
||||||
self.current_generation(),
|
self.current_generation(),
|
||||||
WriterContour::Active,
|
WriterContour::Active,
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
@ -110,6 +111,7 @@ impl MePool {
|
||||||
rng: &SecureRandom,
|
rng: &SecureRandom,
|
||||||
generation: u64,
|
generation: u64,
|
||||||
contour: WriterContour,
|
contour: WriterContour,
|
||||||
|
allow_coverage_override: bool,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let candidates = self.connectable_endpoints(endpoints).await;
|
let candidates = self.connectable_endpoints(endpoints).await;
|
||||||
if candidates.is_empty() {
|
if candidates.is_empty() {
|
||||||
|
|
@ -120,7 +122,14 @@ impl MePool {
|
||||||
let idx = (start + offset) % candidates.len();
|
let idx = (start + offset) % candidates.len();
|
||||||
let addr = candidates[idx];
|
let addr = candidates[idx];
|
||||||
match self
|
match self
|
||||||
.connect_one_with_generation_contour_for_dc(addr, rng, generation, contour, dc)
|
.connect_one_with_generation_contour_for_dc_with_cap_policy(
|
||||||
|
addr,
|
||||||
|
rng,
|
||||||
|
generation,
|
||||||
|
contour,
|
||||||
|
dc,
|
||||||
|
allow_coverage_override,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(()) => return true,
|
Ok(()) => return true,
|
||||||
|
|
|
||||||
|
|
@ -249,6 +249,7 @@ impl MePool {
|
||||||
rng,
|
rng,
|
||||||
generation,
|
generation,
|
||||||
WriterContour::Warm,
|
WriterContour::Warm,
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
debug!(
|
debug!(
|
||||||
|
|
|
||||||
|
|
@ -82,11 +82,21 @@ pub(crate) struct MeApiRuntimeSnapshot {
|
||||||
pub adaptive_floor_cpu_cores_override: u16,
|
pub adaptive_floor_cpu_cores_override: u16,
|
||||||
pub adaptive_floor_max_extra_writers_single_per_core: u16,
|
pub adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
pub adaptive_floor_max_extra_writers_multi_per_core: u16,
|
pub adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
pub adaptive_floor_max_active_writers_per_core: u16,
|
||||||
|
pub adaptive_floor_max_warm_writers_per_core: u16,
|
||||||
|
pub adaptive_floor_max_active_writers_global: u32,
|
||||||
|
pub adaptive_floor_max_warm_writers_global: u32,
|
||||||
pub adaptive_floor_cpu_cores_detected: u32,
|
pub adaptive_floor_cpu_cores_detected: u32,
|
||||||
pub adaptive_floor_cpu_cores_effective: u32,
|
pub adaptive_floor_cpu_cores_effective: u32,
|
||||||
pub adaptive_floor_global_cap_raw: u64,
|
pub adaptive_floor_global_cap_raw: u64,
|
||||||
pub adaptive_floor_global_cap_effective: u64,
|
pub adaptive_floor_global_cap_effective: u64,
|
||||||
pub adaptive_floor_target_writers_total: u64,
|
pub adaptive_floor_target_writers_total: u64,
|
||||||
|
pub adaptive_floor_active_cap_configured: u64,
|
||||||
|
pub adaptive_floor_active_cap_effective: u64,
|
||||||
|
pub adaptive_floor_warm_cap_configured: u64,
|
||||||
|
pub adaptive_floor_warm_cap_effective: u64,
|
||||||
|
pub adaptive_floor_active_writers_current: u64,
|
||||||
|
pub adaptive_floor_warm_writers_current: u64,
|
||||||
pub me_keepalive_enabled: bool,
|
pub me_keepalive_enabled: bool,
|
||||||
pub me_keepalive_interval_secs: u64,
|
pub me_keepalive_interval_secs: u64,
|
||||||
pub me_keepalive_jitter_secs: u64,
|
pub me_keepalive_jitter_secs: u64,
|
||||||
|
|
@ -430,6 +440,18 @@ impl MePool {
|
||||||
adaptive_floor_max_extra_writers_multi_per_core: self
|
adaptive_floor_max_extra_writers_multi_per_core: self
|
||||||
.me_adaptive_floor_max_extra_writers_multi_per_core
|
.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
.load(Ordering::Relaxed) as u16,
|
.load(Ordering::Relaxed) as u16,
|
||||||
|
adaptive_floor_max_active_writers_per_core: self
|
||||||
|
.me_adaptive_floor_max_active_writers_per_core
|
||||||
|
.load(Ordering::Relaxed) as u16,
|
||||||
|
adaptive_floor_max_warm_writers_per_core: self
|
||||||
|
.me_adaptive_floor_max_warm_writers_per_core
|
||||||
|
.load(Ordering::Relaxed) as u16,
|
||||||
|
adaptive_floor_max_active_writers_global: self
|
||||||
|
.me_adaptive_floor_max_active_writers_global
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_max_warm_writers_global: self
|
||||||
|
.me_adaptive_floor_max_warm_writers_global
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
adaptive_floor_cpu_cores_detected: self
|
adaptive_floor_cpu_cores_detected: self
|
||||||
.me_adaptive_floor_cpu_cores_detected
|
.me_adaptive_floor_cpu_cores_detected
|
||||||
.load(Ordering::Relaxed),
|
.load(Ordering::Relaxed),
|
||||||
|
|
@ -445,6 +467,24 @@ impl MePool {
|
||||||
adaptive_floor_target_writers_total: self
|
adaptive_floor_target_writers_total: self
|
||||||
.me_adaptive_floor_target_writers_total
|
.me_adaptive_floor_target_writers_total
|
||||||
.load(Ordering::Relaxed),
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_active_cap_configured: self
|
||||||
|
.me_adaptive_floor_active_cap_configured
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_active_cap_effective: self
|
||||||
|
.me_adaptive_floor_active_cap_effective
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_warm_cap_configured: self
|
||||||
|
.me_adaptive_floor_warm_cap_configured
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_warm_cap_effective: self
|
||||||
|
.me_adaptive_floor_warm_cap_effective
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_active_writers_current: self
|
||||||
|
.me_adaptive_floor_active_writers_current
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_warm_writers_current: self
|
||||||
|
.me_adaptive_floor_warm_writers_current
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
me_keepalive_enabled: self.me_keepalive_enabled,
|
me_keepalive_enabled: self.me_keepalive_enabled,
|
||||||
me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(),
|
me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(),
|
||||||
me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(),
|
me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(),
|
||||||
|
|
|
||||||
|
|
@ -86,6 +86,35 @@ impl MePool {
|
||||||
contour: WriterContour,
|
contour: WriterContour,
|
||||||
writer_dc: i32,
|
writer_dc: i32,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
self.connect_one_with_generation_contour_for_dc_with_cap_policy(
|
||||||
|
addr,
|
||||||
|
rng,
|
||||||
|
generation,
|
||||||
|
contour,
|
||||||
|
writer_dc,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn connect_one_with_generation_contour_for_dc_with_cap_policy(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
addr: SocketAddr,
|
||||||
|
rng: &SecureRandom,
|
||||||
|
generation: u64,
|
||||||
|
contour: WriterContour,
|
||||||
|
writer_dc: i32,
|
||||||
|
allow_coverage_override: bool,
|
||||||
|
) -> Result<()> {
|
||||||
|
if !self
|
||||||
|
.can_open_writer_for_contour(contour, allow_coverage_override)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
return Err(ProxyError::Proxy(format!(
|
||||||
|
"ME {contour:?} writer cap reached"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
let secret_len = self.proxy_secret.read().await.secret.len();
|
let secret_len = self.proxy_secret.read().await.secret.len();
|
||||||
if secret_len < 32 {
|
if secret_len < 32 {
|
||||||
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
|
return Err(ProxyError::Proxy("proxy-secret too short for ME auth".into()));
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue