CPU/RAM improvements + removing hot-path obstacles

This commit is contained in:
Alexey
2026-03-07 19:33:48 +03:00
parent 1bd249b0a9
commit d2baa8e721
12 changed files with 340 additions and 75 deletions

View File

@@ -325,6 +325,9 @@ async fn run_update_cycle(
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
cfg.general.me_adaptive_floor_max_active_writers_global,
cfg.general.me_adaptive_floor_max_warm_writers_global,
cfg.general.me_health_interval_ms_unhealthy,
cfg.general.me_health_interval_ms_healthy,
cfg.general.me_warn_rate_limit_ms,
);
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
@@ -546,6 +549,9 @@ pub async fn me_config_updater(
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
cfg.general.me_adaptive_floor_max_active_writers_global,
cfg.general.me_adaptive_floor_max_warm_writers_global,
cfg.general.me_health_interval_ms_unhealthy,
cfg.general.me_health_interval_ms_healthy,
cfg.general.me_warn_rate_limit_ms,
);
let new_secs = cfg.general.effective_update_every_secs().max(1);
if new_secs == update_every_secs {

View File

@@ -13,7 +13,6 @@ use crate::network::IpFamily;
use super::MePool;
const HEALTH_INTERVAL_SECS: u64 = 1;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)]
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
@@ -62,11 +61,18 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut degraded_interval = true;
loop {
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
let interval = if degraded_interval {
pool.health_interval_unhealthy()
} else {
pool.health_interval_healthy()
};
tokio::time::sleep(interval).await;
pool.prune_closed_writers().await;
reap_draining_writers(&pool).await;
check_family(
let v4_degraded = check_family(
IpFamily::V4,
&pool,
&rng,
@@ -80,9 +86,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
)
.await;
check_family(
let v6_degraded = check_family(
IpFamily::V6,
&pool,
&rng,
@@ -96,8 +103,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
)
.await;
degraded_interval = v4_degraded || v6_degraded;
}
}
@@ -137,15 +146,18 @@ async fn check_family(
idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
) {
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
) -> bool {
let enabled = match family {
IpFamily::V4 => pool.decision.ipv4_me,
IpFamily::V6 => pool.decision.ipv6_me,
};
if !enabled {
return;
return false;
}
let mut family_degraded = false;
let mut dc_endpoints = HashMap::<i32, Vec<SocketAddr>>::new();
let map_guard = match family {
IpFamily::V4 => pool.proxy_map_v4.read().await,
@@ -234,6 +246,7 @@ async fn check_family(
.sum::<usize>();
if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 {
family_degraded = true;
if single_endpoint_outage.insert(key) {
pool.stats.increment_me_single_endpoint_outage_enter_total();
warn!(
@@ -310,6 +323,7 @@ async fn check_family(
continue;
}
let missing = required - alive;
family_degraded = true;
let now = Instant::now();
if reconnect_budget == 0 {
@@ -438,15 +452,23 @@ async fn check_family(
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait);
if pool.is_runtime_ready() {
warn!(
dc = %dc,
?family,
alive = now_alive,
required,
endpoint_count = endpoints.len(),
backoff_ms = next_ms,
"DC writer floor is below required level, scheduled reconnect"
);
let warn_cooldown = pool.warn_rate_limit_duration();
if should_emit_rate_limited_warn(
floor_warn_next_allowed,
key,
now,
warn_cooldown,
) {
warn!(
dc = %dc,
?family,
alive = now_alive,
required,
endpoint_count = endpoints.len(),
backoff_ms = next_ms,
"DC writer floor is below required level, scheduled reconnect"
);
}
} else {
info!(
dc = %dc,
@@ -463,6 +485,8 @@ async fn check_family(
*v = v.saturating_sub(1);
}
}
family_degraded
}
fn health_reconnect_budget(pool: &Arc<MePool>, dc_groups: usize) -> usize {
@@ -474,6 +498,23 @@ fn health_reconnect_budget(pool: &Arc<MePool>, dc_groups: usize) -> usize {
.clamp(HEALTH_RECONNECT_BUDGET_MIN, HEALTH_RECONNECT_BUDGET_MAX)
}
fn should_emit_rate_limited_warn(
next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
key: (i32, IpFamily),
now: Instant,
cooldown: Duration,
) -> bool {
let Some(ready_at) = next_allowed.get(&key).copied() else {
next_allowed.insert(key, now + cooldown);
return true;
};
if now >= ready_at {
next_allowed.insert(key, now + cooldown);
return true;
}
false
}
fn adaptive_floor_class_min(
pool: &Arc<MePool>,
endpoint_count: usize,

View File

@@ -103,6 +103,7 @@ pub struct MePool {
pub(super) me_keepalive_jitter: Duration,
pub(super) me_keepalive_payload_random: bool,
pub(super) rpc_proxy_req_every_secs: AtomicU64,
pub(super) writer_cmd_channel_capacity: usize,
pub(super) me_warmup_stagger_enabled: bool,
pub(super) me_warmup_step_delay: Duration,
pub(super) me_warmup_step_jitter: Duration,
@@ -181,8 +182,12 @@ pub struct MePool {
pub(super) me_route_no_writer_wait: Duration,
pub(super) me_route_inline_recovery_attempts: u32,
pub(super) me_route_inline_recovery_wait: Duration,
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
pub(super) me_health_interval_ms_healthy: AtomicU64,
pub(super) me_warn_rate_limit_ms: AtomicU64,
pub(super) runtime_ready: AtomicBool,
pool_size: usize,
pub(super) preferred_endpoints_by_dc: Arc<RwLock<HashMap<i32, Vec<SocketAddr>>>>,
}
#[derive(Debug, Default)]
@@ -270,16 +275,25 @@ impl MePool {
me_secret_atomic_snapshot: bool,
me_deterministic_writer_sort: bool,
me_socks_kdf_policy: MeSocksKdfPolicy,
me_writer_cmd_channel_capacity: usize,
me_route_channel_capacity: usize,
me_route_backpressure_base_timeout_ms: u64,
me_route_backpressure_high_timeout_ms: u64,
me_route_backpressure_high_watermark_pct: u8,
me_health_interval_ms_unhealthy: u64,
me_health_interval_ms_healthy: u64,
me_warn_rate_limit_ms: u64,
me_route_no_writer_mode: MeRouteNoWriterMode,
me_route_no_writer_wait_ms: u64,
me_route_inline_recovery_attempts: u32,
me_route_inline_recovery_wait_ms: u64,
) -> Arc<Self> {
let endpoint_dc_map = Self::build_endpoint_dc_map_from_maps(&proxy_map_v4, &proxy_map_v6);
let registry = Arc::new(ConnRegistry::new());
let preferred_endpoints_by_dc =
Self::build_preferred_endpoints_by_dc(&decision, &proxy_map_v4, &proxy_map_v6);
let registry = Arc::new(ConnRegistry::with_route_channel_capacity(
me_route_channel_capacity,
));
registry.update_route_backpressure_policy(
me_route_backpressure_base_timeout_ms,
me_route_backpressure_high_timeout_ms,
@@ -326,6 +340,7 @@ impl MePool {
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
me_keepalive_payload_random,
rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs),
writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1),
me_warmup_stagger_enabled,
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
@@ -440,7 +455,11 @@ impl MePool {
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
me_route_inline_recovery_attempts,
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
runtime_ready: AtomicBool::new(false),
preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)),
})
}
@@ -489,6 +508,9 @@ impl MePool {
adaptive_floor_max_warm_writers_per_core: u16,
adaptive_floor_max_active_writers_global: u32,
adaptive_floor_max_warm_writers_global: u32,
me_health_interval_ms_unhealthy: u64,
me_health_interval_ms_healthy: u64,
me_warn_rate_limit_ms: u64,
) {
self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs
@@ -564,6 +586,12 @@ impl MePool {
.store(adaptive_floor_max_active_writers_global, Ordering::Relaxed);
self.me_adaptive_floor_max_warm_writers_global
.store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed);
self.me_health_interval_ms_unhealthy
.store(me_health_interval_ms_unhealthy.max(1), Ordering::Relaxed);
self.me_health_interval_ms_healthy
.store(me_health_interval_ms_healthy.max(1), Ordering::Relaxed);
self.me_warn_rate_limit_ms
.store(me_warn_rate_limit_ms.max(1), Ordering::Relaxed);
if previous_floor_mode != floor_mode {
self.stats.increment_me_floor_mode_switch_total();
match (previous_floor_mode, floor_mode) {
@@ -1042,6 +1070,62 @@ impl MePool {
}
}
fn build_preferred_endpoints_by_dc(
decision: &NetworkDecision,
map_v4: &HashMap<i32, Vec<(IpAddr, u16)>>,
map_v6: &HashMap<i32, Vec<(IpAddr, u16)>>,
) -> HashMap<i32, Vec<SocketAddr>> {
let mut out = HashMap::<i32, Vec<SocketAddr>>::new();
let mut dcs = HashSet::<i32>::new();
dcs.extend(map_v4.keys().copied());
dcs.extend(map_v6.keys().copied());
for dc in dcs {
let v4 = map_v4
.get(&dc)
.map(|items| {
items
.iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let v6 = map_v6
.get(&dc)
.map(|items| {
items
.iter()
.map(|(ip, port)| SocketAddr::new(*ip, *port))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let mut selected = if decision.effective_multipath {
let mut both = Vec::<SocketAddr>::with_capacity(v4.len().saturating_add(v6.len()));
if decision.prefer_ipv6() {
both.extend(v6.iter().copied());
both.extend(v4.iter().copied());
} else {
both.extend(v4.iter().copied());
both.extend(v6.iter().copied());
}
both
} else if decision.prefer_ipv6() {
if !v6.is_empty() { v6 } else { v4 }
} else if !v4.is_empty() {
v4
} else {
v6
};
selected.sort_unstable();
selected.dedup();
out.insert(dc, selected);
}
out
}
fn build_endpoint_dc_map_from_maps(
map_v4: &HashMap<i32, Vec<(IpAddr, u16)>>,
map_v6: &HashMap<i32, Vec<(IpAddr, u16)>>,
@@ -1064,6 +1148,25 @@ impl MePool {
let map_v4 = self.proxy_map_v4.read().await.clone();
let map_v6 = self.proxy_map_v6.read().await.clone();
let rebuilt = Self::build_endpoint_dc_map_from_maps(&map_v4, &map_v6);
let preferred = Self::build_preferred_endpoints_by_dc(&self.decision, &map_v4, &map_v6);
*self.endpoint_dc_map.write().await = rebuilt;
*self.preferred_endpoints_by_dc.write().await = preferred;
}
pub(super) async fn preferred_endpoints_for_dc(&self, dc: i32) -> Vec<SocketAddr> {
let guard = self.preferred_endpoints_by_dc.read().await;
guard.get(&dc).cloned().unwrap_or_default()
}
pub(super) fn health_interval_unhealthy(&self) -> Duration {
Duration::from_millis(self.me_health_interval_ms_unhealthy.load(Ordering::Relaxed).max(1))
}
pub(super) fn health_interval_healthy(&self) -> Duration {
Duration::from_millis(self.me_health_interval_ms_healthy.load(Ordering::Relaxed).max(1))
}
pub(super) fn warn_rate_limit_duration(&self) -> Duration {
Duration::from_millis(self.me_warn_rate_limit_ms.load(Ordering::Relaxed).max(1))
}
}

View File

@@ -132,7 +132,7 @@ impl MePool {
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0));
let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0));
let allow_drain_fallback = Arc::new(AtomicBool::new(false));
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let (tx, mut rx) = mpsc::channel::<WriterCommand>(self.writer_cmd_channel_capacity);
let mut rpc_writer = RpcWriter {
writer: hs.wr,
key: hs.write_key,

View File

@@ -9,7 +9,6 @@ use tokio::sync::mpsc::error::TrySendError;
use super::codec::WriterCommand;
use super::MeResponse;
const ROUTE_CHANNEL_CAPACITY: usize = 4096;
const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25;
const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120;
const ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT: u8 = 80;
@@ -78,6 +77,7 @@ impl RegistryInner {
pub struct ConnRegistry {
inner: RwLock<RegistryInner>,
next_id: AtomicU64,
route_channel_capacity: usize,
route_backpressure_base_timeout_ms: AtomicU64,
route_backpressure_high_timeout_ms: AtomicU64,
route_backpressure_high_watermark_pct: AtomicU8,
@@ -91,11 +91,12 @@ impl ConnRegistry {
.as_secs()
}
pub fn new() -> Self {
pub fn with_route_channel_capacity(route_channel_capacity: usize) -> Self {
let start = rand::random::<u64>() | 1;
Self {
inner: RwLock::new(RegistryInner::new()),
next_id: AtomicU64::new(start),
route_channel_capacity: route_channel_capacity.max(1),
route_backpressure_base_timeout_ms: AtomicU64::new(
ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS,
),
@@ -108,6 +109,11 @@ impl ConnRegistry {
}
}
#[cfg(test)]
pub fn new() -> Self {
Self::with_route_channel_capacity(4096)
}
pub fn update_route_backpressure_policy(
&self,
base_timeout_ms: u64,
@@ -127,7 +133,7 @@ impl ConnRegistry {
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(ROUTE_CHANNEL_CAPACITY);
let (tx, rx) = mpsc::channel(self.route_channel_capacity);
self.inner.write().await.map.insert(id, tx);
(id, rx)
}
@@ -179,11 +185,11 @@ impl ConnRegistry {
.route_backpressure_high_watermark_pct
.load(Ordering::Relaxed)
.clamp(1, 100);
let used = ROUTE_CHANNEL_CAPACITY.saturating_sub(tx.capacity());
let used_pct = if ROUTE_CHANNEL_CAPACITY == 0 {
let used = self.route_channel_capacity.saturating_sub(tx.capacity());
let used_pct = if self.route_channel_capacity == 0 {
100
} else {
(used.saturating_mul(100) / ROUTE_CHANNEL_CAPACITY) as u8
(used.saturating_mul(100) / self.route_channel_capacity) as u8
};
let high_profile = used_pct >= high_watermark_pct;
let timeout_ms = if high_profile {

View File

@@ -480,31 +480,7 @@ impl MePool {
}
async fn endpoint_candidates_for_target_dc(&self, routed_dc: i32) -> Vec<SocketAddr> {
let mut preferred = Vec::<SocketAddr>::new();
let mut seen = HashSet::<SocketAddr>::new();
for family in self.family_order() {
let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await,
};
let mut family_selected = Vec::<SocketAddr>::new();
if let Some(addrs) = map_guard.get(&routed_dc) {
for (ip, port) in addrs {
family_selected.push(SocketAddr::new(*ip, *port));
}
}
for addr in family_selected {
if seen.insert(addr) {
preferred.push(addr);
}
}
if !preferred.is_empty() && !self.decision.effective_multipath {
break;
}
}
preferred
self.preferred_endpoints_for_dc(routed_dc).await
}
async fn maybe_trigger_hybrid_recovery(
@@ -591,28 +567,7 @@ impl MePool {
routed_dc: i32,
include_warm: bool,
) -> Vec<usize> {
let mut preferred = HashSet::<SocketAddr>::new();
for family in self.family_order() {
let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await,
};
let mut family_selected = Vec::<SocketAddr>::new();
if let Some(v) = map_guard.get(&routed_dc) {
family_selected.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
for endpoint in family_selected {
preferred.insert(endpoint);
}
drop(map_guard);
if !preferred.is_empty() && !self.decision.effective_multipath {
break;
}
}
let preferred = self.preferred_endpoints_for_dc(routed_dc).await;
if preferred.is_empty() {
return Vec::new();
}
@@ -622,7 +577,7 @@ impl MePool {
if !self.writer_eligible_for_selection(w, include_warm) {
continue;
}
if w.writer_dc == routed_dc && preferred.contains(&w.addr) {
if w.writer_dc == routed_dc && preferred.iter().any(|endpoint| *endpoint == w.addr) {
out.push(idx);
}
}