mirror of https://github.com/telemt/telemt.git
commit
3440aa9fcd
|
|
@ -269,6 +269,10 @@ pub(super) struct DcStatus {
|
||||||
pub(super) available_endpoints: usize,
|
pub(super) available_endpoints: usize,
|
||||||
pub(super) available_pct: f64,
|
pub(super) available_pct: f64,
|
||||||
pub(super) required_writers: usize,
|
pub(super) required_writers: usize,
|
||||||
|
pub(super) floor_min: usize,
|
||||||
|
pub(super) floor_target: usize,
|
||||||
|
pub(super) floor_max: usize,
|
||||||
|
pub(super) floor_capped: bool,
|
||||||
pub(super) alive_writers: usize,
|
pub(super) alive_writers: usize,
|
||||||
pub(super) coverage_pct: f64,
|
pub(super) coverage_pct: f64,
|
||||||
pub(super) rtt_ms: Option<f64>,
|
pub(super) rtt_ms: Option<f64>,
|
||||||
|
|
@ -308,7 +312,17 @@ pub(super) struct MinimalMeRuntimeData {
|
||||||
pub(super) floor_mode: &'static str,
|
pub(super) floor_mode: &'static str,
|
||||||
pub(super) adaptive_floor_idle_secs: u64,
|
pub(super) adaptive_floor_idle_secs: u64,
|
||||||
pub(super) adaptive_floor_min_writers_single_endpoint: u8,
|
pub(super) adaptive_floor_min_writers_single_endpoint: u8,
|
||||||
|
pub(super) adaptive_floor_min_writers_multi_endpoint: u8,
|
||||||
pub(super) adaptive_floor_recover_grace_secs: u64,
|
pub(super) adaptive_floor_recover_grace_secs: u64,
|
||||||
|
pub(super) adaptive_floor_writers_per_core_total: u16,
|
||||||
|
pub(super) adaptive_floor_cpu_cores_override: u16,
|
||||||
|
pub(super) adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_cpu_cores_detected: u32,
|
||||||
|
pub(super) adaptive_floor_cpu_cores_effective: u32,
|
||||||
|
pub(super) adaptive_floor_global_cap_raw: u64,
|
||||||
|
pub(super) adaptive_floor_global_cap_effective: u64,
|
||||||
|
pub(super) adaptive_floor_target_writers_total: u64,
|
||||||
pub(super) me_keepalive_enabled: bool,
|
pub(super) me_keepalive_enabled: bool,
|
||||||
pub(super) me_keepalive_interval_secs: u64,
|
pub(super) me_keepalive_interval_secs: u64,
|
||||||
pub(super) me_keepalive_jitter_secs: u64,
|
pub(super) me_keepalive_jitter_secs: u64,
|
||||||
|
|
|
||||||
|
|
@ -349,6 +349,10 @@ async fn get_minimal_payload_cached(
|
||||||
available_endpoints: entry.available_endpoints,
|
available_endpoints: entry.available_endpoints,
|
||||||
available_pct: entry.available_pct,
|
available_pct: entry.available_pct,
|
||||||
required_writers: entry.required_writers,
|
required_writers: entry.required_writers,
|
||||||
|
floor_min: entry.floor_min,
|
||||||
|
floor_target: entry.floor_target,
|
||||||
|
floor_max: entry.floor_max,
|
||||||
|
floor_capped: entry.floor_capped,
|
||||||
alive_writers: entry.alive_writers,
|
alive_writers: entry.alive_writers,
|
||||||
coverage_pct: entry.coverage_pct,
|
coverage_pct: entry.coverage_pct,
|
||||||
rtt_ms: entry.rtt_ms,
|
rtt_ms: entry.rtt_ms,
|
||||||
|
|
@ -366,7 +370,21 @@ async fn get_minimal_payload_cached(
|
||||||
adaptive_floor_idle_secs: runtime.adaptive_floor_idle_secs,
|
adaptive_floor_idle_secs: runtime.adaptive_floor_idle_secs,
|
||||||
adaptive_floor_min_writers_single_endpoint: runtime
|
adaptive_floor_min_writers_single_endpoint: runtime
|
||||||
.adaptive_floor_min_writers_single_endpoint,
|
.adaptive_floor_min_writers_single_endpoint,
|
||||||
|
adaptive_floor_min_writers_multi_endpoint: runtime
|
||||||
|
.adaptive_floor_min_writers_multi_endpoint,
|
||||||
adaptive_floor_recover_grace_secs: runtime.adaptive_floor_recover_grace_secs,
|
adaptive_floor_recover_grace_secs: runtime.adaptive_floor_recover_grace_secs,
|
||||||
|
adaptive_floor_writers_per_core_total: runtime
|
||||||
|
.adaptive_floor_writers_per_core_total,
|
||||||
|
adaptive_floor_cpu_cores_override: runtime.adaptive_floor_cpu_cores_override,
|
||||||
|
adaptive_floor_max_extra_writers_single_per_core: runtime
|
||||||
|
.adaptive_floor_max_extra_writers_single_per_core,
|
||||||
|
adaptive_floor_max_extra_writers_multi_per_core: runtime
|
||||||
|
.adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
|
adaptive_floor_cpu_cores_detected: runtime.adaptive_floor_cpu_cores_detected,
|
||||||
|
adaptive_floor_cpu_cores_effective: runtime.adaptive_floor_cpu_cores_effective,
|
||||||
|
adaptive_floor_global_cap_raw: runtime.adaptive_floor_global_cap_raw,
|
||||||
|
adaptive_floor_global_cap_effective: runtime.adaptive_floor_global_cap_effective,
|
||||||
|
adaptive_floor_target_writers_total: runtime.adaptive_floor_target_writers_total,
|
||||||
me_keepalive_enabled: runtime.me_keepalive_enabled,
|
me_keepalive_enabled: runtime.me_keepalive_enabled,
|
||||||
me_keepalive_interval_secs: runtime.me_keepalive_interval_secs,
|
me_keepalive_interval_secs: runtime.me_keepalive_interval_secs,
|
||||||
me_keepalive_jitter_secs: runtime.me_keepalive_jitter_secs,
|
me_keepalive_jitter_secs: runtime.me_keepalive_jitter_secs,
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,12 @@ pub(super) struct EffectiveMiddleProxyLimits {
|
||||||
pub(super) floor_mode: &'static str,
|
pub(super) floor_mode: &'static str,
|
||||||
pub(super) adaptive_floor_idle_secs: u64,
|
pub(super) adaptive_floor_idle_secs: u64,
|
||||||
pub(super) adaptive_floor_min_writers_single_endpoint: u8,
|
pub(super) adaptive_floor_min_writers_single_endpoint: u8,
|
||||||
|
pub(super) adaptive_floor_min_writers_multi_endpoint: u8,
|
||||||
pub(super) adaptive_floor_recover_grace_secs: u64,
|
pub(super) adaptive_floor_recover_grace_secs: u64,
|
||||||
|
pub(super) adaptive_floor_writers_per_core_total: u16,
|
||||||
|
pub(super) adaptive_floor_cpu_cores_override: u16,
|
||||||
|
pub(super) adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
|
pub(super) adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
pub(super) reconnect_max_concurrent_per_dc: u32,
|
pub(super) reconnect_max_concurrent_per_dc: u32,
|
||||||
pub(super) reconnect_backoff_base_ms: u64,
|
pub(super) reconnect_backoff_base_ms: u64,
|
||||||
pub(super) reconnect_backoff_cap_ms: u64,
|
pub(super) reconnect_backoff_cap_ms: u64,
|
||||||
|
|
@ -183,7 +188,22 @@ pub(super) fn build_limits_effective_data(cfg: &ProxyConfig) -> EffectiveLimitsD
|
||||||
adaptive_floor_min_writers_single_endpoint: cfg
|
adaptive_floor_min_writers_single_endpoint: cfg
|
||||||
.general
|
.general
|
||||||
.me_adaptive_floor_min_writers_single_endpoint,
|
.me_adaptive_floor_min_writers_single_endpoint,
|
||||||
|
adaptive_floor_min_writers_multi_endpoint: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_min_writers_multi_endpoint,
|
||||||
adaptive_floor_recover_grace_secs: cfg.general.me_adaptive_floor_recover_grace_secs,
|
adaptive_floor_recover_grace_secs: cfg.general.me_adaptive_floor_recover_grace_secs,
|
||||||
|
adaptive_floor_writers_per_core_total: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_writers_per_core_total,
|
||||||
|
adaptive_floor_cpu_cores_override: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_cpu_cores_override,
|
||||||
|
adaptive_floor_max_extra_writers_single_per_core: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
|
adaptive_floor_max_extra_writers_multi_per_core: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
reconnect_max_concurrent_per_dc: cfg.general.me_reconnect_max_concurrent_per_dc,
|
reconnect_max_concurrent_per_dc: cfg.general.me_reconnect_max_concurrent_per_dc,
|
||||||
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
|
reconnect_backoff_base_ms: cfg.general.me_reconnect_backoff_base_ms,
|
||||||
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
|
reconnect_backoff_cap_ms: cfg.general.me_reconnect_backoff_cap_ms,
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,12 @@ const DEFAULT_ME_RECONNECT_FAST_RETRY_COUNT: u32 = 16;
|
||||||
const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2;
|
const DEFAULT_ME_SINGLE_ENDPOINT_SHADOW_WRITERS: u8 = 2;
|
||||||
const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90;
|
const DEFAULT_ME_ADAPTIVE_FLOOR_IDLE_SECS: u64 = 90;
|
||||||
const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1;
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT: u8 = 1;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_MULTI_ENDPOINT: u8 = 1;
|
||||||
const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180;
|
const DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS: u64 = 180;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_WRITERS_PER_CORE_TOTAL: u16 = 48;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_CPU_CORES_OVERRIDE: u16 = 0;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_SINGLE_PER_CORE: u16 = 1;
|
||||||
|
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE: u16 = 2;
|
||||||
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
|
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
|
||||||
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
|
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
|
||||||
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
|
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
|
||||||
|
|
@ -247,10 +252,30 @@ pub(crate) fn default_me_adaptive_floor_min_writers_single_endpoint() -> u8 {
|
||||||
DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT
|
DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_SINGLE_ENDPOINT
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_min_writers_multi_endpoint() -> u8 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_MIN_WRITERS_MULTI_ENDPOINT
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_me_adaptive_floor_recover_grace_secs() -> u64 {
|
pub(crate) fn default_me_adaptive_floor_recover_grace_secs() -> u64 {
|
||||||
DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS
|
DEFAULT_ME_ADAPTIVE_FLOOR_RECOVER_GRACE_SECS
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_writers_per_core_total() -> u16 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_WRITERS_PER_CORE_TOTAL
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_cpu_cores_override() -> u16 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_CPU_CORES_OVERRIDE
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_max_extra_writers_single_per_core() -> u16 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_SINGLE_PER_CORE
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_me_adaptive_floor_max_extra_writers_multi_per_core() -> u16 {
|
||||||
|
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_EXTRA_WRITERS_MULTI_PER_CORE
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
|
||||||
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,12 @@ pub struct HotFields {
|
||||||
pub me_floor_mode: MeFloorMode,
|
pub me_floor_mode: MeFloorMode,
|
||||||
pub me_adaptive_floor_idle_secs: u64,
|
pub me_adaptive_floor_idle_secs: u64,
|
||||||
pub me_adaptive_floor_min_writers_single_endpoint: u8,
|
pub me_adaptive_floor_min_writers_single_endpoint: u8,
|
||||||
|
pub me_adaptive_floor_min_writers_multi_endpoint: u8,
|
||||||
pub me_adaptive_floor_recover_grace_secs: u64,
|
pub me_adaptive_floor_recover_grace_secs: u64,
|
||||||
|
pub me_adaptive_floor_writers_per_core_total: u16,
|
||||||
|
pub me_adaptive_floor_cpu_cores_override: u16,
|
||||||
|
pub me_adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
|
pub me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
pub me_route_backpressure_base_timeout_ms: u64,
|
pub me_route_backpressure_base_timeout_ms: u64,
|
||||||
pub me_route_backpressure_high_timeout_ms: u64,
|
pub me_route_backpressure_high_timeout_ms: u64,
|
||||||
pub me_route_backpressure_high_watermark_pct: u8,
|
pub me_route_backpressure_high_watermark_pct: u8,
|
||||||
|
|
@ -150,9 +155,24 @@ impl HotFields {
|
||||||
me_adaptive_floor_min_writers_single_endpoint: cfg
|
me_adaptive_floor_min_writers_single_endpoint: cfg
|
||||||
.general
|
.general
|
||||||
.me_adaptive_floor_min_writers_single_endpoint,
|
.me_adaptive_floor_min_writers_single_endpoint,
|
||||||
|
me_adaptive_floor_min_writers_multi_endpoint: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_min_writers_multi_endpoint,
|
||||||
me_adaptive_floor_recover_grace_secs: cfg
|
me_adaptive_floor_recover_grace_secs: cfg
|
||||||
.general
|
.general
|
||||||
.me_adaptive_floor_recover_grace_secs,
|
.me_adaptive_floor_recover_grace_secs,
|
||||||
|
me_adaptive_floor_writers_per_core_total: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_writers_per_core_total,
|
||||||
|
me_adaptive_floor_cpu_cores_override: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_cpu_cores_override,
|
||||||
|
me_adaptive_floor_max_extra_writers_single_per_core: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
|
me_adaptive_floor_max_extra_writers_multi_per_core: cfg
|
||||||
|
.general
|
||||||
|
.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
|
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
|
||||||
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
|
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
|
||||||
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
|
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
|
||||||
|
|
@ -273,8 +293,18 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
|
||||||
cfg.general.me_adaptive_floor_idle_secs = new.general.me_adaptive_floor_idle_secs;
|
cfg.general.me_adaptive_floor_idle_secs = new.general.me_adaptive_floor_idle_secs;
|
||||||
cfg.general.me_adaptive_floor_min_writers_single_endpoint =
|
cfg.general.me_adaptive_floor_min_writers_single_endpoint =
|
||||||
new.general.me_adaptive_floor_min_writers_single_endpoint;
|
new.general.me_adaptive_floor_min_writers_single_endpoint;
|
||||||
|
cfg.general.me_adaptive_floor_min_writers_multi_endpoint =
|
||||||
|
new.general.me_adaptive_floor_min_writers_multi_endpoint;
|
||||||
cfg.general.me_adaptive_floor_recover_grace_secs =
|
cfg.general.me_adaptive_floor_recover_grace_secs =
|
||||||
new.general.me_adaptive_floor_recover_grace_secs;
|
new.general.me_adaptive_floor_recover_grace_secs;
|
||||||
|
cfg.general.me_adaptive_floor_writers_per_core_total =
|
||||||
|
new.general.me_adaptive_floor_writers_per_core_total;
|
||||||
|
cfg.general.me_adaptive_floor_cpu_cores_override =
|
||||||
|
new.general.me_adaptive_floor_cpu_cores_override;
|
||||||
|
cfg.general.me_adaptive_floor_max_extra_writers_single_per_core =
|
||||||
|
new.general.me_adaptive_floor_max_extra_writers_single_per_core;
|
||||||
|
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core =
|
||||||
|
new.general.me_adaptive_floor_max_extra_writers_multi_per_core;
|
||||||
cfg.general.me_route_backpressure_base_timeout_ms =
|
cfg.general.me_route_backpressure_base_timeout_ms =
|
||||||
new.general.me_route_backpressure_base_timeout_ms;
|
new.general.me_route_backpressure_base_timeout_ms;
|
||||||
cfg.general.me_route_backpressure_high_timeout_ms =
|
cfg.general.me_route_backpressure_high_timeout_ms =
|
||||||
|
|
@ -697,15 +727,30 @@ fn log_changes(
|
||||||
|| old_hot.me_adaptive_floor_idle_secs != new_hot.me_adaptive_floor_idle_secs
|
|| old_hot.me_adaptive_floor_idle_secs != new_hot.me_adaptive_floor_idle_secs
|
||||||
|| old_hot.me_adaptive_floor_min_writers_single_endpoint
|
|| old_hot.me_adaptive_floor_min_writers_single_endpoint
|
||||||
!= new_hot.me_adaptive_floor_min_writers_single_endpoint
|
!= new_hot.me_adaptive_floor_min_writers_single_endpoint
|
||||||
|
|| old_hot.me_adaptive_floor_min_writers_multi_endpoint
|
||||||
|
!= new_hot.me_adaptive_floor_min_writers_multi_endpoint
|
||||||
|| old_hot.me_adaptive_floor_recover_grace_secs
|
|| old_hot.me_adaptive_floor_recover_grace_secs
|
||||||
!= new_hot.me_adaptive_floor_recover_grace_secs
|
!= new_hot.me_adaptive_floor_recover_grace_secs
|
||||||
|
|| old_hot.me_adaptive_floor_writers_per_core_total
|
||||||
|
!= new_hot.me_adaptive_floor_writers_per_core_total
|
||||||
|
|| old_hot.me_adaptive_floor_cpu_cores_override
|
||||||
|
!= new_hot.me_adaptive_floor_cpu_cores_override
|
||||||
|
|| old_hot.me_adaptive_floor_max_extra_writers_single_per_core
|
||||||
|
!= new_hot.me_adaptive_floor_max_extra_writers_single_per_core
|
||||||
|
|| old_hot.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
|
!= new_hot.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
{
|
{
|
||||||
info!(
|
info!(
|
||||||
"config reload: me_floor: mode={:?} idle={}s min_single={} recover_grace={}s",
|
"config reload: me_floor: mode={:?} idle={}s min_single={} min_multi={} recover_grace={}s per_core_total={} cores_override={} extra_single_per_core={} extra_multi_per_core={}",
|
||||||
new_hot.me_floor_mode,
|
new_hot.me_floor_mode,
|
||||||
new_hot.me_adaptive_floor_idle_secs,
|
new_hot.me_adaptive_floor_idle_secs,
|
||||||
new_hot.me_adaptive_floor_min_writers_single_endpoint,
|
new_hot.me_adaptive_floor_min_writers_single_endpoint,
|
||||||
|
new_hot.me_adaptive_floor_min_writers_multi_endpoint,
|
||||||
new_hot.me_adaptive_floor_recover_grace_secs,
|
new_hot.me_adaptive_floor_recover_grace_secs,
|
||||||
|
new_hot.me_adaptive_floor_writers_per_core_total,
|
||||||
|
new_hot.me_adaptive_floor_cpu_cores_override,
|
||||||
|
new_hot.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
|
new_hot.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -312,6 +312,21 @@ impl ProxyConfig {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if config.general.me_adaptive_floor_min_writers_multi_endpoint == 0
|
||||||
|
|| config.general.me_adaptive_floor_min_writers_multi_endpoint > 32
|
||||||
|
{
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_adaptive_floor_min_writers_multi_endpoint must be within [1, 32]"
|
||||||
|
.to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.general.me_adaptive_floor_writers_per_core_total == 0 {
|
||||||
|
return Err(ProxyError::Config(
|
||||||
|
"general.me_adaptive_floor_writers_per_core_total must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
|
if config.general.me_single_endpoint_outage_backoff_min_ms == 0 {
|
||||||
return Err(ProxyError::Config(
|
return Err(ProxyError::Config(
|
||||||
"general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
|
"general.me_single_endpoint_outage_backoff_min_ms must be > 0".to_string(),
|
||||||
|
|
|
||||||
|
|
@ -520,10 +520,31 @@ pub struct GeneralConfig {
|
||||||
#[serde(default = "default_me_adaptive_floor_min_writers_single_endpoint")]
|
#[serde(default = "default_me_adaptive_floor_min_writers_single_endpoint")]
|
||||||
pub me_adaptive_floor_min_writers_single_endpoint: u8,
|
pub me_adaptive_floor_min_writers_single_endpoint: u8,
|
||||||
|
|
||||||
|
/// Minimum writer target for multi-endpoint DC groups in adaptive floor mode.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_min_writers_multi_endpoint")]
|
||||||
|
pub me_adaptive_floor_min_writers_multi_endpoint: u8,
|
||||||
|
|
||||||
/// Grace period in seconds to hold static floor after activity in adaptive mode.
|
/// Grace period in seconds to hold static floor after activity in adaptive mode.
|
||||||
#[serde(default = "default_me_adaptive_floor_recover_grace_secs")]
|
#[serde(default = "default_me_adaptive_floor_recover_grace_secs")]
|
||||||
pub me_adaptive_floor_recover_grace_secs: u64,
|
pub me_adaptive_floor_recover_grace_secs: u64,
|
||||||
|
|
||||||
|
/// Global ME writer budget per logical CPU core in adaptive mode.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_writers_per_core_total")]
|
||||||
|
pub me_adaptive_floor_writers_per_core_total: u16,
|
||||||
|
|
||||||
|
/// Override logical CPU core count for adaptive floor calculations.
|
||||||
|
/// Set to 0 to use runtime auto-detection.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_cpu_cores_override")]
|
||||||
|
pub me_adaptive_floor_cpu_cores_override: u16,
|
||||||
|
|
||||||
|
/// Per-core max extra writers above base required floor for single-endpoint DC groups.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_max_extra_writers_single_per_core")]
|
||||||
|
pub me_adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
|
|
||||||
|
/// Per-core max extra writers above base required floor for multi-endpoint DC groups.
|
||||||
|
#[serde(default = "default_me_adaptive_floor_max_extra_writers_multi_per_core")]
|
||||||
|
pub me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
|
||||||
/// Connect attempts for the selected upstream before returning error/fallback.
|
/// Connect attempts for the selected upstream before returning error/fallback.
|
||||||
#[serde(default = "default_upstream_connect_retry_attempts")]
|
#[serde(default = "default_upstream_connect_retry_attempts")]
|
||||||
pub upstream_connect_retry_attempts: u32,
|
pub upstream_connect_retry_attempts: u32,
|
||||||
|
|
@ -775,7 +796,12 @@ impl Default for GeneralConfig {
|
||||||
me_floor_mode: MeFloorMode::default(),
|
me_floor_mode: MeFloorMode::default(),
|
||||||
me_adaptive_floor_idle_secs: default_me_adaptive_floor_idle_secs(),
|
me_adaptive_floor_idle_secs: default_me_adaptive_floor_idle_secs(),
|
||||||
me_adaptive_floor_min_writers_single_endpoint: default_me_adaptive_floor_min_writers_single_endpoint(),
|
me_adaptive_floor_min_writers_single_endpoint: default_me_adaptive_floor_min_writers_single_endpoint(),
|
||||||
|
me_adaptive_floor_min_writers_multi_endpoint: default_me_adaptive_floor_min_writers_multi_endpoint(),
|
||||||
me_adaptive_floor_recover_grace_secs: default_me_adaptive_floor_recover_grace_secs(),
|
me_adaptive_floor_recover_grace_secs: default_me_adaptive_floor_recover_grace_secs(),
|
||||||
|
me_adaptive_floor_writers_per_core_total: default_me_adaptive_floor_writers_per_core_total(),
|
||||||
|
me_adaptive_floor_cpu_cores_override: default_me_adaptive_floor_cpu_cores_override(),
|
||||||
|
me_adaptive_floor_max_extra_writers_single_per_core: default_me_adaptive_floor_max_extra_writers_single_per_core(),
|
||||||
|
me_adaptive_floor_max_extra_writers_multi_per_core: default_me_adaptive_floor_max_extra_writers_multi_per_core(),
|
||||||
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
|
upstream_connect_retry_attempts: default_upstream_connect_retry_attempts(),
|
||||||
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
|
upstream_connect_retry_backoff_ms: default_upstream_connect_retry_backoff_ms(),
|
||||||
upstream_connect_budget_ms: default_upstream_connect_budget_ms(),
|
upstream_connect_budget_ms: default_upstream_connect_budget_ms(),
|
||||||
|
|
|
||||||
|
|
@ -786,7 +786,12 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
config.general.me_floor_mode,
|
config.general.me_floor_mode,
|
||||||
config.general.me_adaptive_floor_idle_secs,
|
config.general.me_adaptive_floor_idle_secs,
|
||||||
config.general.me_adaptive_floor_min_writers_single_endpoint,
|
config.general.me_adaptive_floor_min_writers_single_endpoint,
|
||||||
|
config.general.me_adaptive_floor_min_writers_multi_endpoint,
|
||||||
config.general.me_adaptive_floor_recover_grace_secs,
|
config.general.me_adaptive_floor_recover_grace_secs,
|
||||||
|
config.general.me_adaptive_floor_writers_per_core_total,
|
||||||
|
config.general.me_adaptive_floor_cpu_cores_override,
|
||||||
|
config.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
|
config.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
config.general.hardswap,
|
config.general.hardswap,
|
||||||
config.general.me_pool_drain_ttl_secs,
|
config.general.me_pool_drain_ttl_secs,
|
||||||
config.general.effective_me_pool_force_close_secs(),
|
config.general.effective_me_pool_force_close_secs(),
|
||||||
|
|
|
||||||
127
src/metrics.rs
127
src/metrics.rs
|
|
@ -968,6 +968,133 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_cpu_cores_detected Runtime detected logical CPU cores for adaptive floor"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_cpu_cores_detected gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_cpu_cores_detected {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_cpu_cores_detected_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_cpu_cores_effective Runtime effective logical CPU cores for adaptive floor"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_cpu_cores_effective gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_cpu_cores_effective {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_cpu_cores_effective_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_global_cap_raw Runtime raw global adaptive floor cap"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_global_cap_raw gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_global_cap_raw {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_global_cap_raw_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_global_cap_effective Runtime effective global adaptive floor cap"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_global_cap_effective gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_global_cap_effective {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_global_cap_effective_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_adaptive_floor_target_writers_total Runtime adaptive floor target writers total"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# TYPE telemt_me_adaptive_floor_target_writers_total gauge"
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_adaptive_floor_target_writers_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_target_writers_total_gauge()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_floor_cap_block_total Reconnect attempts blocked by adaptive floor caps"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_floor_cap_block_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_floor_cap_block_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_cap_block_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_floor_swap_idle_total Adaptive floor cap recovery via idle writer swap"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_floor_swap_idle_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_floor_swap_idle_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_swap_idle_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"# HELP telemt_me_floor_swap_idle_failed_total Failed idle swap attempts under adaptive floor caps"
|
||||||
|
);
|
||||||
|
let _ = writeln!(out, "# TYPE telemt_me_floor_swap_idle_failed_total counter");
|
||||||
|
let _ = writeln!(
|
||||||
|
out,
|
||||||
|
"telemt_me_floor_swap_idle_failed_total {}",
|
||||||
|
if me_allows_normal {
|
||||||
|
stats.get_me_floor_swap_idle_failed_total()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
|
let _ = writeln!(out, "# HELP telemt_secure_padding_invalid_total Invalid secure frame lengths");
|
||||||
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
|
let _ = writeln!(out, "# TYPE telemt_secure_padding_invalid_total counter");
|
||||||
|
|
|
||||||
|
|
@ -75,6 +75,14 @@ pub struct Stats {
|
||||||
me_floor_mode_switch_total: AtomicU64,
|
me_floor_mode_switch_total: AtomicU64,
|
||||||
me_floor_mode_switch_static_to_adaptive_total: AtomicU64,
|
me_floor_mode_switch_static_to_adaptive_total: AtomicU64,
|
||||||
me_floor_mode_switch_adaptive_to_static_total: AtomicU64,
|
me_floor_mode_switch_adaptive_to_static_total: AtomicU64,
|
||||||
|
me_floor_cpu_cores_detected_gauge: AtomicU64,
|
||||||
|
me_floor_cpu_cores_effective_gauge: AtomicU64,
|
||||||
|
me_floor_global_cap_raw_gauge: AtomicU64,
|
||||||
|
me_floor_global_cap_effective_gauge: AtomicU64,
|
||||||
|
me_floor_target_writers_total_gauge: AtomicU64,
|
||||||
|
me_floor_cap_block_total: AtomicU64,
|
||||||
|
me_floor_swap_idle_total: AtomicU64,
|
||||||
|
me_floor_swap_idle_failed_total: AtomicU64,
|
||||||
me_handshake_error_codes: DashMap<i32, AtomicU64>,
|
me_handshake_error_codes: DashMap<i32, AtomicU64>,
|
||||||
me_route_drop_no_conn: AtomicU64,
|
me_route_drop_no_conn: AtomicU64,
|
||||||
me_route_drop_channel_closed: AtomicU64,
|
me_route_drop_channel_closed: AtomicU64,
|
||||||
|
|
@ -676,6 +684,52 @@ impl Stats {
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub fn set_me_floor_cpu_cores_detected_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_cpu_cores_detected_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_floor_cpu_cores_effective_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_cpu_cores_effective_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_floor_global_cap_raw_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_global_cap_raw_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_floor_global_cap_effective_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_global_cap_effective_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn set_me_floor_target_writers_total_gauge(&self, value: u64) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_target_writers_total_gauge
|
||||||
|
.store(value, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_floor_cap_block_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_cap_block_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_floor_swap_idle_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_swap_idle_total.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn increment_me_floor_swap_idle_failed_total(&self) {
|
||||||
|
if self.telemetry_me_allows_normal() {
|
||||||
|
self.me_floor_swap_idle_failed_total
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
pub fn get_connects_all(&self) -> u64 { self.connects_all.load(Ordering::Relaxed) }
|
||||||
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
pub fn get_connects_bad(&self) -> u64 { self.connects_bad.load(Ordering::Relaxed) }
|
||||||
pub fn get_current_connections_direct(&self) -> u64 {
|
pub fn get_current_connections_direct(&self) -> u64 {
|
||||||
|
|
@ -781,6 +835,34 @@ impl Stats {
|
||||||
self.me_floor_mode_switch_adaptive_to_static_total
|
self.me_floor_mode_switch_adaptive_to_static_total
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
pub fn get_me_floor_cpu_cores_detected_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_cpu_cores_detected_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_cpu_cores_effective_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_cpu_cores_effective_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_global_cap_raw_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_global_cap_raw_gauge.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_global_cap_effective_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_global_cap_effective_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_target_writers_total_gauge(&self) -> u64 {
|
||||||
|
self.me_floor_target_writers_total_gauge
|
||||||
|
.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_cap_block_total(&self) -> u64 {
|
||||||
|
self.me_floor_cap_block_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_swap_idle_total(&self) -> u64 {
|
||||||
|
self.me_floor_swap_idle_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
pub fn get_me_floor_swap_idle_failed_total(&self) -> u64 {
|
||||||
|
self.me_floor_swap_idle_failed_total.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> {
|
pub fn get_me_handshake_error_code_counts(&self) -> Vec<(i32, u64)> {
|
||||||
let mut out: Vec<(i32, u64)> = self
|
let mut out: Vec<(i32, u64)> = self
|
||||||
.me_handshake_error_codes
|
.me_handshake_error_codes
|
||||||
|
|
|
||||||
|
|
@ -315,7 +315,12 @@ async fn run_update_cycle(
|
||||||
cfg.general.me_floor_mode,
|
cfg.general.me_floor_mode,
|
||||||
cfg.general.me_adaptive_floor_idle_secs,
|
cfg.general.me_adaptive_floor_idle_secs,
|
||||||
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
|
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
|
||||||
|
cfg.general.me_adaptive_floor_min_writers_multi_endpoint,
|
||||||
cfg.general.me_adaptive_floor_recover_grace_secs,
|
cfg.general.me_adaptive_floor_recover_grace_secs,
|
||||||
|
cfg.general.me_adaptive_floor_writers_per_core_total,
|
||||||
|
cfg.general.me_adaptive_floor_cpu_cores_override,
|
||||||
|
cfg.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
|
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
);
|
);
|
||||||
|
|
||||||
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
|
||||||
|
|
@ -527,7 +532,12 @@ pub async fn me_config_updater(
|
||||||
cfg.general.me_floor_mode,
|
cfg.general.me_floor_mode,
|
||||||
cfg.general.me_adaptive_floor_idle_secs,
|
cfg.general.me_adaptive_floor_idle_secs,
|
||||||
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
|
cfg.general.me_adaptive_floor_min_writers_single_endpoint,
|
||||||
|
cfg.general.me_adaptive_floor_min_writers_multi_endpoint,
|
||||||
cfg.general.me_adaptive_floor_recover_grace_secs,
|
cfg.general.me_adaptive_floor_recover_grace_secs,
|
||||||
|
cfg.general.me_adaptive_floor_writers_per_core_total,
|
||||||
|
cfg.general.me_adaptive_floor_cpu_cores_override,
|
||||||
|
cfg.general.me_adaptive_floor_max_extra_writers_single_per_core,
|
||||||
|
cfg.general.me_adaptive_floor_max_extra_writers_multi_per_core,
|
||||||
);
|
);
|
||||||
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
let new_secs = cfg.general.effective_update_every_secs().max(1);
|
||||||
if new_secs == update_every_secs {
|
if new_secs == update_every_secs {
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,25 @@ const IDLE_REFRESH_TRIGGER_JITTER_SECS: u64 = 5;
|
||||||
const IDLE_REFRESH_RETRY_SECS: u64 = 8;
|
const IDLE_REFRESH_RETRY_SECS: u64 = 8;
|
||||||
const IDLE_REFRESH_SUCCESS_GUARD_SECS: u64 = 5;
|
const IDLE_REFRESH_SUCCESS_GUARD_SECS: u64 = 5;
|
||||||
|
|
||||||
|
/// Writer-floor figures computed for a single DC within one IP family.
#[derive(Debug, Clone)]
struct DcFloorPlanEntry {
    /// DC identifier this entry belongs to.
    dc: i32,
    /// Endpoints configured for this DC.
    endpoints: Vec<SocketAddr>,
    /// Number of live writers currently counted across `endpoints`.
    alive: usize,
    /// Lower bound for the writer floor.
    min_required: usize,
    /// Writer count the plan aims for; kept within `min_required..=max_required`.
    target_required: usize,
    /// Upper bound for the writer floor.
    max_required: usize,
    /// Whether any writer of this DC has bound clients.
    has_bound_clients: bool,
    /// Set when `target_required` was lowered to fit under the family cap.
    floor_capped: bool,
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct FamilyFloorPlan {
|
||||||
|
by_dc: HashMap<i32, DcFloorPlanEntry>,
|
||||||
|
global_cap_effective_total: usize,
|
||||||
|
target_writers_total: usize,
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_connections: usize) {
|
||||||
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
let mut backoff: HashMap<(i32, IpFamily), u64> = HashMap::new();
|
||||||
let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
let mut next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
|
||||||
|
|
@ -129,22 +148,33 @@ async fn check_family(
|
||||||
.push(writer.id);
|
.push(writer.id);
|
||||||
}
|
}
|
||||||
let writer_idle_since = pool.registry.writer_idle_since_snapshot().await;
|
let writer_idle_since = pool.registry.writer_idle_since_snapshot().await;
|
||||||
|
let floor_plan = build_family_floor_plan(
|
||||||
|
pool,
|
||||||
|
family,
|
||||||
|
&dc_endpoints,
|
||||||
|
&live_addr_counts,
|
||||||
|
&live_writer_ids_by_addr,
|
||||||
|
adaptive_idle_since,
|
||||||
|
adaptive_recover_until,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
pool.set_adaptive_floor_runtime_caps(
|
||||||
|
floor_plan.global_cap_effective_total,
|
||||||
|
floor_plan.target_writers_total,
|
||||||
|
);
|
||||||
|
|
||||||
for (dc, endpoints) in dc_endpoints {
|
for (dc, endpoints) in dc_endpoints {
|
||||||
if endpoints.is_empty() {
|
if endpoints.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let key = (dc, family);
|
let key = (dc, family);
|
||||||
let reduce_for_idle = should_reduce_floor_for_idle(
|
let required = floor_plan
|
||||||
pool,
|
.by_dc
|
||||||
key,
|
.get(&dc)
|
||||||
&endpoints,
|
.map(|entry| entry.target_required)
|
||||||
&live_writer_ids_by_addr,
|
.unwrap_or_else(|| {
|
||||||
adaptive_idle_since,
|
pool.required_writers_for_dc_with_floor_mode(endpoints.len(), false)
|
||||||
adaptive_recover_until,
|
});
|
||||||
)
|
|
||||||
.await;
|
|
||||||
let required = pool.required_writers_for_dc_with_floor_mode(endpoints.len(), reduce_for_idle);
|
|
||||||
let alive = endpoints
|
let alive = endpoints
|
||||||
.iter()
|
.iter()
|
||||||
.map(|addr| *live_addr_counts.get(addr).unwrap_or(&0))
|
.map(|addr| *live_addr_counts.get(addr).unwrap_or(&0))
|
||||||
|
|
@ -251,6 +281,36 @@ async fn check_family(
|
||||||
|
|
||||||
let mut restored = 0usize;
|
let mut restored = 0usize;
|
||||||
for _ in 0..missing {
|
for _ in 0..missing {
|
||||||
|
if pool.floor_mode() == MeFloorMode::Adaptive
|
||||||
|
&& pool.active_writer_count_total().await >= floor_plan.global_cap_effective_total
|
||||||
|
{
|
||||||
|
let swapped = maybe_swap_idle_writer_for_cap(
|
||||||
|
pool,
|
||||||
|
rng,
|
||||||
|
dc,
|
||||||
|
family,
|
||||||
|
&endpoints,
|
||||||
|
&live_writer_ids_by_addr,
|
||||||
|
&writer_idle_since,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
if swapped {
|
||||||
|
pool.stats.increment_me_floor_swap_idle_total();
|
||||||
|
restored += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pool.stats.increment_me_floor_cap_block_total();
|
||||||
|
pool.stats.increment_me_floor_swap_idle_failed_total();
|
||||||
|
debug!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
alive,
|
||||||
|
required,
|
||||||
|
global_cap_effective_total = floor_plan.global_cap_effective_total,
|
||||||
|
"Adaptive floor cap reached, reconnect attempt blocked"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
let res = tokio::time::timeout(
|
let res = tokio::time::timeout(
|
||||||
pool.me_one_timeout,
|
pool.me_one_timeout,
|
||||||
pool.connect_endpoints_round_robin(&endpoints, rng.as_ref()),
|
pool.connect_endpoints_round_robin(&endpoints, rng.as_ref()),
|
||||||
|
|
@ -323,6 +383,280 @@ async fn check_family(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn adaptive_floor_class_min(
|
||||||
|
pool: &Arc<MePool>,
|
||||||
|
endpoint_count: usize,
|
||||||
|
base_required: usize,
|
||||||
|
) -> usize {
|
||||||
|
if endpoint_count <= 1 {
|
||||||
|
let min_single = (pool
|
||||||
|
.me_adaptive_floor_min_writers_single_endpoint
|
||||||
|
.load(std::sync::atomic::Ordering::Relaxed) as usize)
|
||||||
|
.max(1);
|
||||||
|
min_single.min(base_required.max(1))
|
||||||
|
} else {
|
||||||
|
pool.adaptive_floor_min_writers_multi_endpoint()
|
||||||
|
.min(base_required.max(1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn adaptive_floor_class_max(
|
||||||
|
pool: &Arc<MePool>,
|
||||||
|
endpoint_count: usize,
|
||||||
|
base_required: usize,
|
||||||
|
cpu_cores: usize,
|
||||||
|
) -> usize {
|
||||||
|
let extra_per_core = if endpoint_count <= 1 {
|
||||||
|
pool.adaptive_floor_max_extra_single_per_core()
|
||||||
|
} else {
|
||||||
|
pool.adaptive_floor_max_extra_multi_per_core()
|
||||||
|
};
|
||||||
|
base_required.saturating_add(cpu_cores.saturating_mul(extra_per_core))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Collects the live writer ids registered for each of `endpoints`.
///
/// Ids are returned in endpoint order, keeping each endpoint's own id order;
/// endpoints with no entry in `live_writer_ids_by_addr` contribute nothing.
fn list_writer_ids_for_endpoints(
    endpoints: &[SocketAddr],
    live_writer_ids_by_addr: &HashMap<SocketAddr, Vec<u64>>,
) -> Vec<u64> {
    endpoints
        .iter()
        .filter_map(|endpoint| live_writer_ids_by_addr.get(endpoint))
        .flat_map(|ids| ids.iter().copied())
        .collect()
}
|
||||||
|
|
||||||
|
async fn build_family_floor_plan(
|
||||||
|
pool: &Arc<MePool>,
|
||||||
|
family: IpFamily,
|
||||||
|
dc_endpoints: &HashMap<i32, Vec<SocketAddr>>,
|
||||||
|
live_addr_counts: &HashMap<SocketAddr, usize>,
|
||||||
|
live_writer_ids_by_addr: &HashMap<SocketAddr, Vec<u64>>,
|
||||||
|
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
|
) -> FamilyFloorPlan {
|
||||||
|
let mut entries = Vec::<DcFloorPlanEntry>::new();
|
||||||
|
let mut by_dc = HashMap::<i32, DcFloorPlanEntry>::new();
|
||||||
|
let mut family_active_total = 0usize;
|
||||||
|
|
||||||
|
let floor_mode = pool.floor_mode();
|
||||||
|
let is_adaptive = floor_mode == MeFloorMode::Adaptive;
|
||||||
|
let cpu_cores = pool.adaptive_floor_effective_cpu_cores().max(1);
|
||||||
|
|
||||||
|
for (dc, endpoints) in dc_endpoints {
|
||||||
|
if endpoints.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let key = (*dc, family);
|
||||||
|
let reduce_for_idle = should_reduce_floor_for_idle(
|
||||||
|
pool,
|
||||||
|
key,
|
||||||
|
endpoints,
|
||||||
|
live_writer_ids_by_addr,
|
||||||
|
adaptive_idle_since,
|
||||||
|
adaptive_recover_until,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let base_required = pool.required_writers_for_dc(endpoints.len()).max(1);
|
||||||
|
let min_required = if is_adaptive {
|
||||||
|
adaptive_floor_class_min(pool, endpoints.len(), base_required)
|
||||||
|
} else {
|
||||||
|
base_required
|
||||||
|
};
|
||||||
|
let mut max_required = if is_adaptive {
|
||||||
|
adaptive_floor_class_max(pool, endpoints.len(), base_required, cpu_cores)
|
||||||
|
} else {
|
||||||
|
base_required
|
||||||
|
};
|
||||||
|
if max_required < min_required {
|
||||||
|
max_required = min_required;
|
||||||
|
}
|
||||||
|
let desired_raw = if is_adaptive && reduce_for_idle {
|
||||||
|
min_required
|
||||||
|
} else {
|
||||||
|
base_required
|
||||||
|
};
|
||||||
|
let target_required = desired_raw.clamp(min_required, max_required);
|
||||||
|
let alive = endpoints
|
||||||
|
.iter()
|
||||||
|
.map(|endpoint| live_addr_counts.get(endpoint).copied().unwrap_or(0))
|
||||||
|
.sum::<usize>();
|
||||||
|
family_active_total = family_active_total.saturating_add(alive);
|
||||||
|
let writer_ids = list_writer_ids_for_endpoints(endpoints, live_writer_ids_by_addr);
|
||||||
|
let has_bound_clients = has_bound_clients_on_endpoint(pool, &writer_ids).await;
|
||||||
|
|
||||||
|
entries.push(DcFloorPlanEntry {
|
||||||
|
dc: *dc,
|
||||||
|
endpoints: endpoints.clone(),
|
||||||
|
alive,
|
||||||
|
min_required,
|
||||||
|
target_required,
|
||||||
|
max_required,
|
||||||
|
has_bound_clients,
|
||||||
|
floor_capped: false,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if entries.is_empty() {
|
||||||
|
return FamilyFloorPlan {
|
||||||
|
by_dc,
|
||||||
|
global_cap_effective_total: 0,
|
||||||
|
target_writers_total: 0,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if !is_adaptive {
|
||||||
|
let target_total = entries
|
||||||
|
.iter()
|
||||||
|
.map(|entry| entry.target_required)
|
||||||
|
.sum::<usize>();
|
||||||
|
let active_total = pool.active_writer_count_total().await;
|
||||||
|
for entry in entries {
|
||||||
|
by_dc.insert(entry.dc, entry);
|
||||||
|
}
|
||||||
|
return FamilyFloorPlan {
|
||||||
|
by_dc,
|
||||||
|
global_cap_effective_total: active_total.max(target_total),
|
||||||
|
target_writers_total: target_total,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let global_cap_raw = pool.adaptive_floor_global_cap_raw();
|
||||||
|
let total_active = pool.active_writer_count_total().await;
|
||||||
|
let other_active = total_active.saturating_sub(family_active_total);
|
||||||
|
let min_sum = entries
|
||||||
|
.iter()
|
||||||
|
.map(|entry| entry.min_required)
|
||||||
|
.sum::<usize>();
|
||||||
|
let mut target_sum = entries
|
||||||
|
.iter()
|
||||||
|
.map(|entry| entry.target_required)
|
||||||
|
.sum::<usize>();
|
||||||
|
let family_cap = global_cap_raw
|
||||||
|
.saturating_sub(other_active)
|
||||||
|
.max(min_sum);
|
||||||
|
if target_sum > family_cap {
|
||||||
|
entries.sort_by_key(|entry| {
|
||||||
|
(
|
||||||
|
entry.has_bound_clients,
|
||||||
|
std::cmp::Reverse(entry.target_required.saturating_sub(entry.min_required)),
|
||||||
|
std::cmp::Reverse(entry.alive),
|
||||||
|
entry.dc.abs(),
|
||||||
|
entry.dc,
|
||||||
|
entry.endpoints.len(),
|
||||||
|
entry.max_required,
|
||||||
|
)
|
||||||
|
});
|
||||||
|
let mut changed = true;
|
||||||
|
while target_sum > family_cap && changed {
|
||||||
|
changed = false;
|
||||||
|
for entry in &mut entries {
|
||||||
|
if target_sum <= family_cap {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if entry.target_required > entry.min_required {
|
||||||
|
entry.target_required -= 1;
|
||||||
|
entry.floor_capped = true;
|
||||||
|
target_sum -= 1;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for entry in entries {
|
||||||
|
by_dc.insert(entry.dc, entry);
|
||||||
|
}
|
||||||
|
let global_cap_effective_total = global_cap_raw.max(other_active.saturating_add(min_sum));
|
||||||
|
let target_writers_total = other_active.saturating_add(target_sum);
|
||||||
|
FamilyFloorPlan {
|
||||||
|
by_dc,
|
||||||
|
global_cap_effective_total,
|
||||||
|
target_writers_total,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn maybe_swap_idle_writer_for_cap(
|
||||||
|
pool: &Arc<MePool>,
|
||||||
|
rng: &Arc<SecureRandom>,
|
||||||
|
dc: i32,
|
||||||
|
family: IpFamily,
|
||||||
|
endpoints: &[SocketAddr],
|
||||||
|
live_writer_ids_by_addr: &HashMap<SocketAddr, Vec<u64>>,
|
||||||
|
writer_idle_since: &HashMap<u64, u64>,
|
||||||
|
) -> bool {
|
||||||
|
let now_epoch_secs = MePool::now_epoch_secs();
|
||||||
|
let mut candidate: Option<(u64, SocketAddr, u64)> = None;
|
||||||
|
for endpoint in endpoints {
|
||||||
|
let Some(writer_ids) = live_writer_ids_by_addr.get(endpoint) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
for writer_id in writer_ids {
|
||||||
|
if !pool.registry.is_writer_empty(*writer_id).await {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let Some(idle_since_epoch_secs) = writer_idle_since.get(writer_id).copied() else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let idle_age_secs = now_epoch_secs.saturating_sub(idle_since_epoch_secs);
|
||||||
|
if candidate
|
||||||
|
.as_ref()
|
||||||
|
.map(|(_, _, age)| idle_age_secs > *age)
|
||||||
|
.unwrap_or(true)
|
||||||
|
{
|
||||||
|
candidate = Some((*writer_id, *endpoint, idle_age_secs));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some((old_writer_id, endpoint, idle_age_secs)) = candidate else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
let connected = match tokio::time::timeout(pool.me_one_timeout, pool.connect_one(endpoint, rng.as_ref())).await {
|
||||||
|
Ok(Ok(())) => true,
|
||||||
|
Ok(Err(error)) => {
|
||||||
|
debug!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
%endpoint,
|
||||||
|
old_writer_id,
|
||||||
|
idle_age_secs,
|
||||||
|
%error,
|
||||||
|
"Adaptive floor cap swap connect failed"
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
debug!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
%endpoint,
|
||||||
|
old_writer_id,
|
||||||
|
idle_age_secs,
|
||||||
|
"Adaptive floor cap swap connect timed out"
|
||||||
|
);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if !connected {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.mark_writer_draining_with_timeout(old_writer_id, pool.force_close_timeout(), false)
|
||||||
|
.await;
|
||||||
|
info!(
|
||||||
|
dc = %dc,
|
||||||
|
?family,
|
||||||
|
%endpoint,
|
||||||
|
old_writer_id,
|
||||||
|
idle_age_secs,
|
||||||
|
"Adaptive floor cap swap: idle writer rotated"
|
||||||
|
);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
async fn maybe_refresh_idle_writer_for_dc(
|
async fn maybe_refresh_idle_writer_for_dc(
|
||||||
pool: &Arc<MePool>,
|
pool: &Arc<MePool>,
|
||||||
rng: &Arc<SecureRandom>,
|
rng: &Arc<SecureRandom>,
|
||||||
|
|
@ -438,19 +772,15 @@ async fn should_reduce_floor_for_idle(
|
||||||
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
if endpoints.len() != 1 || pool.floor_mode() != MeFloorMode::Adaptive {
|
if pool.floor_mode() != MeFloorMode::Adaptive {
|
||||||
adaptive_idle_since.remove(&key);
|
adaptive_idle_since.remove(&key);
|
||||||
adaptive_recover_until.remove(&key);
|
adaptive_recover_until.remove(&key);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let endpoint = endpoints[0];
|
let writer_ids = list_writer_ids_for_endpoints(endpoints, live_writer_ids_by_addr);
|
||||||
let writer_ids = live_writer_ids_by_addr
|
let has_bound_clients = has_bound_clients_on_endpoint(pool, &writer_ids).await;
|
||||||
.get(&endpoint)
|
|
||||||
.map(Vec::as_slice)
|
|
||||||
.unwrap_or(&[]);
|
|
||||||
let has_bound_clients = has_bound_clients_on_endpoint(pool, writer_ids).await;
|
|
||||||
if has_bound_clients {
|
if has_bound_clients {
|
||||||
adaptive_idle_since.remove(&key);
|
adaptive_idle_since.remove(&key);
|
||||||
adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
|
adaptive_recover_until.insert(key, now + pool.adaptive_floor_recover_grace_duration());
|
||||||
|
|
|
||||||
|
|
@ -111,7 +111,17 @@ pub struct MePool {
|
||||||
pub(super) me_floor_mode: AtomicU8,
|
pub(super) me_floor_mode: AtomicU8,
|
||||||
pub(super) me_adaptive_floor_idle_secs: AtomicU64,
|
pub(super) me_adaptive_floor_idle_secs: AtomicU64,
|
||||||
pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8,
|
pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8,
|
||||||
|
pub(super) me_adaptive_floor_min_writers_multi_endpoint: AtomicU8,
|
||||||
pub(super) me_adaptive_floor_recover_grace_secs: AtomicU64,
|
pub(super) me_adaptive_floor_recover_grace_secs: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_writers_per_core_total: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_cpu_cores_override: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_cpu_cores_detected: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_cpu_cores_effective: AtomicU32,
|
||||||
|
pub(super) me_adaptive_floor_global_cap_raw: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_global_cap_effective: AtomicU64,
|
||||||
|
pub(super) me_adaptive_floor_target_writers_total: AtomicU64,
|
||||||
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||||
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
|
||||||
pub(super) default_dc: AtomicI32,
|
pub(super) default_dc: AtomicI32,
|
||||||
|
|
@ -217,7 +227,12 @@ impl MePool {
|
||||||
me_floor_mode: MeFloorMode,
|
me_floor_mode: MeFloorMode,
|
||||||
me_adaptive_floor_idle_secs: u64,
|
me_adaptive_floor_idle_secs: u64,
|
||||||
me_adaptive_floor_min_writers_single_endpoint: u8,
|
me_adaptive_floor_min_writers_single_endpoint: u8,
|
||||||
|
me_adaptive_floor_min_writers_multi_endpoint: u8,
|
||||||
me_adaptive_floor_recover_grace_secs: u64,
|
me_adaptive_floor_recover_grace_secs: u64,
|
||||||
|
me_adaptive_floor_writers_per_core_total: u16,
|
||||||
|
me_adaptive_floor_cpu_cores_override: u16,
|
||||||
|
me_adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
|
me_adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
hardswap: bool,
|
hardswap: bool,
|
||||||
me_pool_drain_ttl_secs: u64,
|
me_pool_drain_ttl_secs: u64,
|
||||||
me_pool_force_close_secs: u64,
|
me_pool_force_close_secs: u64,
|
||||||
|
|
@ -314,9 +329,29 @@ impl MePool {
|
||||||
me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new(
|
me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new(
|
||||||
me_adaptive_floor_min_writers_single_endpoint,
|
me_adaptive_floor_min_writers_single_endpoint,
|
||||||
),
|
),
|
||||||
|
me_adaptive_floor_min_writers_multi_endpoint: AtomicU8::new(
|
||||||
|
me_adaptive_floor_min_writers_multi_endpoint,
|
||||||
|
),
|
||||||
me_adaptive_floor_recover_grace_secs: AtomicU64::new(
|
me_adaptive_floor_recover_grace_secs: AtomicU64::new(
|
||||||
me_adaptive_floor_recover_grace_secs,
|
me_adaptive_floor_recover_grace_secs,
|
||||||
),
|
),
|
||||||
|
me_adaptive_floor_writers_per_core_total: AtomicU32::new(
|
||||||
|
me_adaptive_floor_writers_per_core_total as u32,
|
||||||
|
),
|
||||||
|
me_adaptive_floor_cpu_cores_override: AtomicU32::new(
|
||||||
|
me_adaptive_floor_cpu_cores_override as u32,
|
||||||
|
),
|
||||||
|
me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32::new(
|
||||||
|
me_adaptive_floor_max_extra_writers_single_per_core as u32,
|
||||||
|
),
|
||||||
|
me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32::new(
|
||||||
|
me_adaptive_floor_max_extra_writers_multi_per_core as u32,
|
||||||
|
),
|
||||||
|
me_adaptive_floor_cpu_cores_detected: AtomicU32::new(1),
|
||||||
|
me_adaptive_floor_cpu_cores_effective: AtomicU32::new(1),
|
||||||
|
me_adaptive_floor_global_cap_raw: AtomicU64::new(0),
|
||||||
|
me_adaptive_floor_global_cap_effective: AtomicU64::new(0),
|
||||||
|
me_adaptive_floor_target_writers_total: AtomicU64::new(0),
|
||||||
pool_size: 2,
|
pool_size: 2,
|
||||||
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
|
||||||
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
|
||||||
|
|
@ -399,7 +434,12 @@ impl MePool {
|
||||||
floor_mode: MeFloorMode,
|
floor_mode: MeFloorMode,
|
||||||
adaptive_floor_idle_secs: u64,
|
adaptive_floor_idle_secs: u64,
|
||||||
adaptive_floor_min_writers_single_endpoint: u8,
|
adaptive_floor_min_writers_single_endpoint: u8,
|
||||||
|
adaptive_floor_min_writers_multi_endpoint: u8,
|
||||||
adaptive_floor_recover_grace_secs: u64,
|
adaptive_floor_recover_grace_secs: u64,
|
||||||
|
adaptive_floor_writers_per_core_total: u16,
|
||||||
|
adaptive_floor_cpu_cores_override: u16,
|
||||||
|
adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
|
adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
) {
|
) {
|
||||||
self.hardswap.store(hardswap, Ordering::Relaxed);
|
self.hardswap.store(hardswap, Ordering::Relaxed);
|
||||||
self.me_pool_drain_ttl_secs
|
self.me_pool_drain_ttl_secs
|
||||||
|
|
@ -443,8 +483,24 @@ impl MePool {
|
||||||
.store(adaptive_floor_idle_secs, Ordering::Relaxed);
|
.store(adaptive_floor_idle_secs, Ordering::Relaxed);
|
||||||
self.me_adaptive_floor_min_writers_single_endpoint
|
self.me_adaptive_floor_min_writers_single_endpoint
|
||||||
.store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed);
|
.store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_min_writers_multi_endpoint
|
||||||
|
.store(adaptive_floor_min_writers_multi_endpoint, Ordering::Relaxed);
|
||||||
self.me_adaptive_floor_recover_grace_secs
|
self.me_adaptive_floor_recover_grace_secs
|
||||||
.store(adaptive_floor_recover_grace_secs, Ordering::Relaxed);
|
.store(adaptive_floor_recover_grace_secs, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_writers_per_core_total
|
||||||
|
.store(adaptive_floor_writers_per_core_total as u32, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_cpu_cores_override
|
||||||
|
.store(adaptive_floor_cpu_cores_override as u32, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_max_extra_writers_single_per_core
|
||||||
|
.store(
|
||||||
|
adaptive_floor_max_extra_writers_single_per_core as u32,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
|
self.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
|
.store(
|
||||||
|
adaptive_floor_max_extra_writers_multi_per_core as u32,
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
if previous_floor_mode != floor_mode {
|
if previous_floor_mode != floor_mode {
|
||||||
self.stats.increment_me_floor_mode_switch_total();
|
self.stats.increment_me_floor_mode_switch_total();
|
||||||
match (previous_floor_mode, floor_mode) {
|
match (previous_floor_mode, floor_mode) {
|
||||||
|
|
@ -515,6 +571,13 @@ impl MePool {
|
||||||
self.proxy_secret.read().await.key_selector
|
self.proxy_secret.read().await.key_selector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) async fn active_writer_count_total(&self) -> usize {
|
||||||
|
let ws = self.writers.read().await;
|
||||||
|
ws.iter()
|
||||||
|
.filter(|w| !w.draining.load(Ordering::Relaxed))
|
||||||
|
.count()
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) async fn secret_snapshot(&self) -> SecretSnapshot {
|
pub(super) async fn secret_snapshot(&self) -> SecretSnapshot {
|
||||||
self.proxy_secret.read().await.clone()
|
self.proxy_secret.read().await.clone()
|
||||||
}
|
}
|
||||||
|
|
@ -551,6 +614,82 @@ impl MePool {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_min_writers_multi_endpoint
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_writers_per_core_total(&self) -> usize {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_writers_per_core_total
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_max_extra_single_per_core(&self) -> usize {
|
||||||
|
self.me_adaptive_floor_max_extra_writers_single_per_core
|
||||||
|
.load(Ordering::Relaxed) as usize
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_max_extra_multi_per_core(&self) -> usize {
|
||||||
|
self.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
|
.load(Ordering::Relaxed) as usize
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the parallelism reported by `std::thread::available_parallelism`,
/// falling back to 1 when the query fails. The result is always at least 1.
pub(super) fn adaptive_floor_detected_cpu_cores(&self) -> usize {
    match std::thread::available_parallelism() {
        Ok(parallelism) => parallelism.get().max(1),
        Err(_) => 1,
    }
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_effective_cpu_cores(&self) -> usize {
|
||||||
|
let detected = self.adaptive_floor_detected_cpu_cores();
|
||||||
|
let override_cores = self
|
||||||
|
.me_adaptive_floor_cpu_cores_override
|
||||||
|
.load(Ordering::Relaxed) as usize;
|
||||||
|
let effective = if override_cores == 0 {
|
||||||
|
detected
|
||||||
|
} else {
|
||||||
|
override_cores.max(1)
|
||||||
|
};
|
||||||
|
self.me_adaptive_floor_cpu_cores_detected
|
||||||
|
.store(detected as u32, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_cpu_cores_effective
|
||||||
|
.store(effective as u32, Ordering::Relaxed);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_cpu_cores_detected_gauge(detected as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_cpu_cores_effective_gauge(effective as u64);
|
||||||
|
effective
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn adaptive_floor_global_cap_raw(&self) -> usize {
|
||||||
|
let cores = self.adaptive_floor_effective_cpu_cores();
|
||||||
|
let cap = cores.saturating_mul(self.adaptive_floor_writers_per_core_total());
|
||||||
|
self.me_adaptive_floor_global_cap_raw
|
||||||
|
.store(cap as u64, Ordering::Relaxed);
|
||||||
|
self.stats.set_me_floor_global_cap_raw_gauge(cap as u64);
|
||||||
|
cap
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn set_adaptive_floor_runtime_caps(
|
||||||
|
&self,
|
||||||
|
global_cap_effective: usize,
|
||||||
|
target_writers_total: usize,
|
||||||
|
) {
|
||||||
|
self.me_adaptive_floor_global_cap_effective
|
||||||
|
.store(global_cap_effective as u64, Ordering::Relaxed);
|
||||||
|
self.me_adaptive_floor_target_writers_total
|
||||||
|
.store(target_writers_total as u64, Ordering::Relaxed);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_global_cap_effective_gauge(global_cap_effective as u64);
|
||||||
|
self.stats
|
||||||
|
.set_me_floor_target_writers_total_gauge(target_writers_total as u64);
|
||||||
|
}
|
||||||
|
|
||||||
pub(super) fn required_writers_for_dc_with_floor_mode(
|
pub(super) fn required_writers_for_dc_with_floor_mode(
|
||||||
&self,
|
&self,
|
||||||
endpoint_count: usize,
|
endpoint_count: usize,
|
||||||
|
|
@ -560,13 +699,20 @@ impl MePool {
|
||||||
if !reduce_for_idle {
|
if !reduce_for_idle {
|
||||||
return base_required;
|
return base_required;
|
||||||
}
|
}
|
||||||
if endpoint_count != 1 || self.floor_mode() != MeFloorMode::Adaptive {
|
if self.floor_mode() != MeFloorMode::Adaptive {
|
||||||
return base_required;
|
return base_required;
|
||||||
}
|
}
|
||||||
let min_writers = (self
|
let min_writers = if endpoint_count == 1 {
|
||||||
|
(self
|
||||||
.me_adaptive_floor_min_writers_single_endpoint
|
.me_adaptive_floor_min_writers_single_endpoint
|
||||||
.load(Ordering::Relaxed) as usize)
|
.load(Ordering::Relaxed) as usize)
|
||||||
.max(1);
|
.max(1)
|
||||||
|
} else {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_min_writers_multi_endpoint
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
};
|
||||||
base_required.min(min_writers)
|
base_required.min(min_writers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,10 @@ pub(crate) struct MeApiDcStatusSnapshot {
|
||||||
pub available_endpoints: usize,
|
pub available_endpoints: usize,
|
||||||
pub available_pct: f64,
|
pub available_pct: f64,
|
||||||
pub required_writers: usize,
|
pub required_writers: usize,
|
||||||
|
pub floor_min: usize,
|
||||||
|
pub floor_target: usize,
|
||||||
|
pub floor_max: usize,
|
||||||
|
pub floor_capped: bool,
|
||||||
pub alive_writers: usize,
|
pub alive_writers: usize,
|
||||||
pub coverage_pct: f64,
|
pub coverage_pct: f64,
|
||||||
pub rtt_ms: Option<f64>,
|
pub rtt_ms: Option<f64>,
|
||||||
|
|
@ -72,7 +76,17 @@ pub(crate) struct MeApiRuntimeSnapshot {
|
||||||
pub floor_mode: &'static str,
|
pub floor_mode: &'static str,
|
||||||
pub adaptive_floor_idle_secs: u64,
|
pub adaptive_floor_idle_secs: u64,
|
||||||
pub adaptive_floor_min_writers_single_endpoint: u8,
|
pub adaptive_floor_min_writers_single_endpoint: u8,
|
||||||
|
pub adaptive_floor_min_writers_multi_endpoint: u8,
|
||||||
pub adaptive_floor_recover_grace_secs: u64,
|
pub adaptive_floor_recover_grace_secs: u64,
|
||||||
|
pub adaptive_floor_writers_per_core_total: u16,
|
||||||
|
pub adaptive_floor_cpu_cores_override: u16,
|
||||||
|
pub adaptive_floor_max_extra_writers_single_per_core: u16,
|
||||||
|
pub adaptive_floor_max_extra_writers_multi_per_core: u16,
|
||||||
|
pub adaptive_floor_cpu_cores_detected: u32,
|
||||||
|
pub adaptive_floor_cpu_cores_effective: u32,
|
||||||
|
pub adaptive_floor_global_cap_raw: u64,
|
||||||
|
pub adaptive_floor_global_cap_effective: u64,
|
||||||
|
pub adaptive_floor_target_writers_total: u64,
|
||||||
pub me_keepalive_enabled: bool,
|
pub me_keepalive_enabled: bool,
|
||||||
pub me_keepalive_interval_secs: u64,
|
pub me_keepalive_interval_secs: u64,
|
||||||
pub me_keepalive_jitter_secs: u64,
|
pub me_keepalive_jitter_secs: u64,
|
||||||
|
|
@ -275,14 +289,43 @@ impl MePool {
|
||||||
let mut dcs = Vec::<MeApiDcStatusSnapshot>::with_capacity(endpoints_by_dc.len());
|
let mut dcs = Vec::<MeApiDcStatusSnapshot>::with_capacity(endpoints_by_dc.len());
|
||||||
let mut available_endpoints = 0usize;
|
let mut available_endpoints = 0usize;
|
||||||
let mut alive_writers = 0usize;
|
let mut alive_writers = 0usize;
|
||||||
|
let floor_mode = self.floor_mode();
|
||||||
|
let adaptive_cpu_cores = (self
|
||||||
|
.me_adaptive_floor_cpu_cores_effective
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1);
|
||||||
for (dc, endpoints) in endpoints_by_dc {
|
for (dc, endpoints) in endpoints_by_dc {
|
||||||
let endpoint_count = endpoints.len();
|
let endpoint_count = endpoints.len();
|
||||||
let dc_available_endpoints = endpoints
|
let dc_available_endpoints = endpoints
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|endpoint| live_writers_by_endpoint.contains_key(endpoint))
|
.filter(|endpoint| live_writers_by_endpoint.contains_key(endpoint))
|
||||||
.count();
|
.count();
|
||||||
|
let base_required = self.required_writers_for_dc(endpoint_count);
|
||||||
let dc_required_writers =
|
let dc_required_writers =
|
||||||
self.required_writers_for_dc_with_floor_mode(endpoint_count, false);
|
self.required_writers_for_dc_with_floor_mode(endpoint_count, false);
|
||||||
|
let floor_min = if endpoint_count <= 1 {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_min_writers_single_endpoint
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
.min(base_required.max(1))
|
||||||
|
} else {
|
||||||
|
(self
|
||||||
|
.me_adaptive_floor_min_writers_multi_endpoint
|
||||||
|
.load(Ordering::Relaxed) as usize)
|
||||||
|
.max(1)
|
||||||
|
.min(base_required.max(1))
|
||||||
|
};
|
||||||
|
let extra_per_core = if endpoint_count <= 1 {
|
||||||
|
self.me_adaptive_floor_max_extra_writers_single_per_core
|
||||||
|
.load(Ordering::Relaxed) as usize
|
||||||
|
} else {
|
||||||
|
self.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
|
.load(Ordering::Relaxed) as usize
|
||||||
|
};
|
||||||
|
let floor_max = base_required.saturating_add(adaptive_cpu_cores.saturating_mul(extra_per_core));
|
||||||
|
let floor_capped = matches!(floor_mode, MeFloorMode::Adaptive)
|
||||||
|
&& dc_required_writers < base_required;
|
||||||
let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0);
|
let dc_alive_writers = live_writers_by_dc.get(&dc).copied().unwrap_or(0);
|
||||||
let dc_load = activity
|
let dc_load = activity
|
||||||
.active_sessions_by_target_dc
|
.active_sessions_by_target_dc
|
||||||
|
|
@ -302,6 +345,10 @@ impl MePool {
|
||||||
available_endpoints: dc_available_endpoints,
|
available_endpoints: dc_available_endpoints,
|
||||||
available_pct: ratio_pct(dc_available_endpoints, endpoint_count),
|
available_pct: ratio_pct(dc_available_endpoints, endpoint_count),
|
||||||
required_writers: dc_required_writers,
|
required_writers: dc_required_writers,
|
||||||
|
floor_min,
|
||||||
|
floor_target: dc_required_writers,
|
||||||
|
floor_max,
|
||||||
|
floor_capped,
|
||||||
alive_writers: dc_alive_writers,
|
alive_writers: dc_alive_writers,
|
||||||
coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers),
|
coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers),
|
||||||
rtt_ms: dc_rtt_ms,
|
rtt_ms: dc_rtt_ms,
|
||||||
|
|
@ -378,9 +425,39 @@ impl MePool {
|
||||||
adaptive_floor_min_writers_single_endpoint: self
|
adaptive_floor_min_writers_single_endpoint: self
|
||||||
.me_adaptive_floor_min_writers_single_endpoint
|
.me_adaptive_floor_min_writers_single_endpoint
|
||||||
.load(Ordering::Relaxed),
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_min_writers_multi_endpoint: self
|
||||||
|
.me_adaptive_floor_min_writers_multi_endpoint
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
adaptive_floor_recover_grace_secs: self
|
adaptive_floor_recover_grace_secs: self
|
||||||
.me_adaptive_floor_recover_grace_secs
|
.me_adaptive_floor_recover_grace_secs
|
||||||
.load(Ordering::Relaxed),
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_writers_per_core_total: self
|
||||||
|
.me_adaptive_floor_writers_per_core_total
|
||||||
|
.load(Ordering::Relaxed) as u16,
|
||||||
|
adaptive_floor_cpu_cores_override: self
|
||||||
|
.me_adaptive_floor_cpu_cores_override
|
||||||
|
.load(Ordering::Relaxed) as u16,
|
||||||
|
adaptive_floor_max_extra_writers_single_per_core: self
|
||||||
|
.me_adaptive_floor_max_extra_writers_single_per_core
|
||||||
|
.load(Ordering::Relaxed) as u16,
|
||||||
|
adaptive_floor_max_extra_writers_multi_per_core: self
|
||||||
|
.me_adaptive_floor_max_extra_writers_multi_per_core
|
||||||
|
.load(Ordering::Relaxed) as u16,
|
||||||
|
adaptive_floor_cpu_cores_detected: self
|
||||||
|
.me_adaptive_floor_cpu_cores_detected
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_cpu_cores_effective: self
|
||||||
|
.me_adaptive_floor_cpu_cores_effective
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_global_cap_raw: self
|
||||||
|
.me_adaptive_floor_global_cap_raw
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_global_cap_effective: self
|
||||||
|
.me_adaptive_floor_global_cap_effective
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
|
adaptive_floor_target_writers_total: self
|
||||||
|
.me_adaptive_floor_target_writers_total
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
me_keepalive_enabled: self.me_keepalive_enabled,
|
me_keepalive_enabled: self.me_keepalive_enabled,
|
||||||
me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(),
|
me_keepalive_interval_secs: self.me_keepalive_interval.as_secs(),
|
||||||
me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(),
|
me_keepalive_jitter_secs: self.me_keepalive_jitter.as_secs(),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue