ME Adaptive Floor Planner

Co-Authored-By: brekotis <93345790+brekotis@users.noreply.github.com>
This commit is contained in:
Alexey
2026-03-07 02:50:11 +03:00
parent ddfe7c5cfa
commit ce9698d39b
14 changed files with 963 additions and 23 deletions

View File

@@ -111,7 +111,17 @@ pub struct MePool {
pub(super) me_floor_mode: AtomicU8,
pub(super) me_adaptive_floor_idle_secs: AtomicU64,
pub(super) me_adaptive_floor_min_writers_single_endpoint: AtomicU8,
pub(super) me_adaptive_floor_min_writers_multi_endpoint: AtomicU8,
pub(super) me_adaptive_floor_recover_grace_secs: AtomicU64,
pub(super) me_adaptive_floor_writers_per_core_total: AtomicU32,
pub(super) me_adaptive_floor_cpu_cores_override: AtomicU32,
pub(super) me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32,
pub(super) me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32,
pub(super) me_adaptive_floor_cpu_cores_detected: AtomicU32,
pub(super) me_adaptive_floor_cpu_cores_effective: AtomicU32,
pub(super) me_adaptive_floor_global_cap_raw: AtomicU64,
pub(super) me_adaptive_floor_global_cap_effective: AtomicU64,
pub(super) me_adaptive_floor_target_writers_total: AtomicU64,
pub(super) proxy_map_v4: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) proxy_map_v6: Arc<RwLock<HashMap<i32, Vec<(IpAddr, u16)>>>>,
pub(super) default_dc: AtomicI32,
@@ -217,7 +227,12 @@ impl MePool {
me_floor_mode: MeFloorMode,
me_adaptive_floor_idle_secs: u64,
me_adaptive_floor_min_writers_single_endpoint: u8,
me_adaptive_floor_min_writers_multi_endpoint: u8,
me_adaptive_floor_recover_grace_secs: u64,
me_adaptive_floor_writers_per_core_total: u16,
me_adaptive_floor_cpu_cores_override: u16,
me_adaptive_floor_max_extra_writers_single_per_core: u16,
me_adaptive_floor_max_extra_writers_multi_per_core: u16,
hardswap: bool,
me_pool_drain_ttl_secs: u64,
me_pool_force_close_secs: u64,
@@ -314,9 +329,29 @@ impl MePool {
me_adaptive_floor_min_writers_single_endpoint: AtomicU8::new(
me_adaptive_floor_min_writers_single_endpoint,
),
me_adaptive_floor_min_writers_multi_endpoint: AtomicU8::new(
me_adaptive_floor_min_writers_multi_endpoint,
),
me_adaptive_floor_recover_grace_secs: AtomicU64::new(
me_adaptive_floor_recover_grace_secs,
),
me_adaptive_floor_writers_per_core_total: AtomicU32::new(
me_adaptive_floor_writers_per_core_total as u32,
),
me_adaptive_floor_cpu_cores_override: AtomicU32::new(
me_adaptive_floor_cpu_cores_override as u32,
),
me_adaptive_floor_max_extra_writers_single_per_core: AtomicU32::new(
me_adaptive_floor_max_extra_writers_single_per_core as u32,
),
me_adaptive_floor_max_extra_writers_multi_per_core: AtomicU32::new(
me_adaptive_floor_max_extra_writers_multi_per_core as u32,
),
me_adaptive_floor_cpu_cores_detected: AtomicU32::new(1),
me_adaptive_floor_cpu_cores_effective: AtomicU32::new(1),
me_adaptive_floor_global_cap_raw: AtomicU64::new(0),
me_adaptive_floor_global_cap_effective: AtomicU64::new(0),
me_adaptive_floor_target_writers_total: AtomicU64::new(0),
pool_size: 2,
proxy_map_v4: Arc::new(RwLock::new(proxy_map_v4)),
proxy_map_v6: Arc::new(RwLock::new(proxy_map_v6)),
@@ -399,7 +434,12 @@ impl MePool {
floor_mode: MeFloorMode,
adaptive_floor_idle_secs: u64,
adaptive_floor_min_writers_single_endpoint: u8,
adaptive_floor_min_writers_multi_endpoint: u8,
adaptive_floor_recover_grace_secs: u64,
adaptive_floor_writers_per_core_total: u16,
adaptive_floor_cpu_cores_override: u16,
adaptive_floor_max_extra_writers_single_per_core: u16,
adaptive_floor_max_extra_writers_multi_per_core: u16,
) {
self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs
@@ -443,8 +483,24 @@ impl MePool {
.store(adaptive_floor_idle_secs, Ordering::Relaxed);
self.me_adaptive_floor_min_writers_single_endpoint
.store(adaptive_floor_min_writers_single_endpoint, Ordering::Relaxed);
self.me_adaptive_floor_min_writers_multi_endpoint
.store(adaptive_floor_min_writers_multi_endpoint, Ordering::Relaxed);
self.me_adaptive_floor_recover_grace_secs
.store(adaptive_floor_recover_grace_secs, Ordering::Relaxed);
self.me_adaptive_floor_writers_per_core_total
.store(adaptive_floor_writers_per_core_total as u32, Ordering::Relaxed);
self.me_adaptive_floor_cpu_cores_override
.store(adaptive_floor_cpu_cores_override as u32, Ordering::Relaxed);
self.me_adaptive_floor_max_extra_writers_single_per_core
.store(
adaptive_floor_max_extra_writers_single_per_core as u32,
Ordering::Relaxed,
);
self.me_adaptive_floor_max_extra_writers_multi_per_core
.store(
adaptive_floor_max_extra_writers_multi_per_core as u32,
Ordering::Relaxed,
);
if previous_floor_mode != floor_mode {
self.stats.increment_me_floor_mode_switch_total();
match (previous_floor_mode, floor_mode) {
@@ -515,6 +571,13 @@ impl MePool {
self.proxy_secret.read().await.key_selector
}
pub(super) async fn active_writer_count_total(&self) -> usize {
let ws = self.writers.read().await;
ws.iter()
.filter(|w| !w.draining.load(Ordering::Relaxed))
.count()
}
pub(super) async fn secret_snapshot(&self) -> SecretSnapshot {
self.proxy_secret.read().await.clone()
}
@@ -551,6 +614,82 @@ impl MePool {
)
}
pub(super) fn adaptive_floor_min_writers_multi_endpoint(&self) -> usize {
(self
.me_adaptive_floor_min_writers_multi_endpoint
.load(Ordering::Relaxed) as usize)
.max(1)
}
pub(super) fn adaptive_floor_writers_per_core_total(&self) -> usize {
(self
.me_adaptive_floor_writers_per_core_total
.load(Ordering::Relaxed) as usize)
.max(1)
}
pub(super) fn adaptive_floor_max_extra_single_per_core(&self) -> usize {
self.me_adaptive_floor_max_extra_writers_single_per_core
.load(Ordering::Relaxed) as usize
}
pub(super) fn adaptive_floor_max_extra_multi_per_core(&self) -> usize {
self.me_adaptive_floor_max_extra_writers_multi_per_core
.load(Ordering::Relaxed) as usize
}
pub(super) fn adaptive_floor_detected_cpu_cores(&self) -> usize {
std::thread::available_parallelism()
.map(|value| value.get())
.unwrap_or(1)
.max(1)
}
pub(super) fn adaptive_floor_effective_cpu_cores(&self) -> usize {
let detected = self.adaptive_floor_detected_cpu_cores();
let override_cores = self
.me_adaptive_floor_cpu_cores_override
.load(Ordering::Relaxed) as usize;
let effective = if override_cores == 0 {
detected
} else {
override_cores.max(1)
};
self.me_adaptive_floor_cpu_cores_detected
.store(detected as u32, Ordering::Relaxed);
self.me_adaptive_floor_cpu_cores_effective
.store(effective as u32, Ordering::Relaxed);
self.stats
.set_me_floor_cpu_cores_detected_gauge(detected as u64);
self.stats
.set_me_floor_cpu_cores_effective_gauge(effective as u64);
effective
}
pub(super) fn adaptive_floor_global_cap_raw(&self) -> usize {
let cores = self.adaptive_floor_effective_cpu_cores();
let cap = cores.saturating_mul(self.adaptive_floor_writers_per_core_total());
self.me_adaptive_floor_global_cap_raw
.store(cap as u64, Ordering::Relaxed);
self.stats.set_me_floor_global_cap_raw_gauge(cap as u64);
cap
}
pub(super) fn set_adaptive_floor_runtime_caps(
&self,
global_cap_effective: usize,
target_writers_total: usize,
) {
self.me_adaptive_floor_global_cap_effective
.store(global_cap_effective as u64, Ordering::Relaxed);
self.me_adaptive_floor_target_writers_total
.store(target_writers_total as u64, Ordering::Relaxed);
self.stats
.set_me_floor_global_cap_effective_gauge(global_cap_effective as u64);
self.stats
.set_me_floor_target_writers_total_gauge(target_writers_total as u64);
}
pub(super) fn required_writers_for_dc_with_floor_mode(
&self,
endpoint_count: usize,
@@ -560,13 +699,20 @@ impl MePool {
if !reduce_for_idle {
return base_required;
}
if endpoint_count != 1 || self.floor_mode() != MeFloorMode::Adaptive {
if self.floor_mode() != MeFloorMode::Adaptive {
return base_required;
}
let min_writers = (self
.me_adaptive_floor_min_writers_single_endpoint
.load(Ordering::Relaxed) as usize)
.max(1);
let min_writers = if endpoint_count == 1 {
(self
.me_adaptive_floor_min_writers_single_endpoint
.load(Ordering::Relaxed) as usize)
.max(1)
} else {
(self
.me_adaptive_floor_min_writers_multi_endpoint
.load(Ordering::Relaxed) as usize)
.max(1)
};
base_required.min(min_writers)
}