Merge branch 'main' into improve-install-script

This commit is contained in:
Mirotin Artem 2026-03-07 19:40:09 +03:00 committed by GitHub
commit 178630e3bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 341 additions and 76 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "telemt"
version = "3.3.9"
version = "3.3.10"
edition = "2024"
[dependencies]

View File

@ -21,6 +21,13 @@ const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_PER_CORE: u16 = 64;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_PER_CORE: u16 = 64;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_ACTIVE_WRITERS_GLOBAL: u32 = 256;
const DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL: u32 = 256;
const DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY: usize = 512;
const DEFAULT_ME_ROUTE_CHANNEL_CAPACITY: usize = 512;
const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 128;
const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
@ -296,6 +303,34 @@ pub(crate) fn default_me_adaptive_floor_max_warm_writers_global() -> u32 {
DEFAULT_ME_ADAPTIVE_FLOOR_MAX_WARM_WRITERS_GLOBAL
}
/// Serde default: capacity of the per-ME writer command channel (messages).
pub(crate) fn default_me_writer_cmd_channel_capacity() -> usize {
    DEFAULT_ME_WRITER_CMD_CHANNEL_CAPACITY
}
/// Serde default: capacity of the per-connection ME response route channel (messages).
pub(crate) fn default_me_route_channel_capacity() -> usize {
    DEFAULT_ME_ROUTE_CHANNEL_CAPACITY
}
/// Serde default: capacity of the client-reader → ME-sender command queue (messages).
pub(crate) fn default_me_c2me_channel_capacity() -> usize {
    DEFAULT_ME_C2ME_CHANNEL_CAPACITY
}
/// Serde default: health-monitor poll interval (ms) while writer coverage is degraded.
pub(crate) fn default_me_health_interval_ms_unhealthy() -> u64 {
    DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY
}
/// Serde default: health-monitor poll interval (ms) while writer coverage is stable.
pub(crate) fn default_me_health_interval_ms_healthy() -> u64 {
    DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY
}
/// Serde default: poll interval (ms) for conditional-admission gate state checks.
pub(crate) fn default_me_admission_poll_ms() -> u64 {
    DEFAULT_ME_ADMISSION_POLL_MS
}
/// Serde default: cooldown (ms) between repetitive ME warning log lines.
pub(crate) fn default_me_warn_rate_limit_ms() -> u64 {
    DEFAULT_ME_WARN_RATE_LIMIT_MS
}
/// Serde default: number of retry attempts for an upstream connect.
pub(crate) fn default_upstream_connect_retry_attempts() -> u32 {
    DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS
}

View File

@ -91,6 +91,10 @@ pub struct HotFields {
pub me_route_backpressure_base_timeout_ms: u64,
pub me_route_backpressure_high_timeout_ms: u64,
pub me_route_backpressure_high_watermark_pct: u8,
pub me_health_interval_ms_unhealthy: u64,
pub me_health_interval_ms_healthy: u64,
pub me_admission_poll_ms: u64,
pub me_warn_rate_limit_ms: u64,
pub users: std::collections::HashMap<String, String>,
pub user_ad_tags: std::collections::HashMap<String, String>,
pub user_max_tcp_conns: std::collections::HashMap<String, usize>,
@ -192,6 +196,10 @@ impl HotFields {
me_route_backpressure_base_timeout_ms: cfg.general.me_route_backpressure_base_timeout_ms,
me_route_backpressure_high_timeout_ms: cfg.general.me_route_backpressure_high_timeout_ms,
me_route_backpressure_high_watermark_pct: cfg.general.me_route_backpressure_high_watermark_pct,
me_health_interval_ms_unhealthy: cfg.general.me_health_interval_ms_unhealthy,
me_health_interval_ms_healthy: cfg.general.me_health_interval_ms_healthy,
me_admission_poll_ms: cfg.general.me_admission_poll_ms,
me_warn_rate_limit_ms: cfg.general.me_warn_rate_limit_ms,
users: cfg.access.users.clone(),
user_ad_tags: cfg.access.user_ad_tags.clone(),
user_max_tcp_conns: cfg.access.user_max_tcp_conns.clone(),
@ -335,6 +343,10 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
new.general.me_route_backpressure_high_timeout_ms;
cfg.general.me_route_backpressure_high_watermark_pct =
new.general.me_route_backpressure_high_watermark_pct;
cfg.general.me_health_interval_ms_unhealthy = new.general.me_health_interval_ms_unhealthy;
cfg.general.me_health_interval_ms_healthy = new.general.me_health_interval_ms_healthy;
cfg.general.me_admission_poll_ms = new.general.me_admission_poll_ms;
cfg.general.me_warn_rate_limit_ms = new.general.me_warn_rate_limit_ms;
cfg.access.users = new.access.users.clone();
cfg.access.user_ad_tags = new.access.user_ad_tags.clone();
@ -796,12 +808,21 @@ fn log_changes(
!= new_hot.me_route_backpressure_high_timeout_ms
|| old_hot.me_route_backpressure_high_watermark_pct
!= new_hot.me_route_backpressure_high_watermark_pct
|| old_hot.me_health_interval_ms_unhealthy
!= new_hot.me_health_interval_ms_unhealthy
|| old_hot.me_health_interval_ms_healthy != new_hot.me_health_interval_ms_healthy
|| old_hot.me_admission_poll_ms != new_hot.me_admission_poll_ms
|| old_hot.me_warn_rate_limit_ms != new_hot.me_warn_rate_limit_ms
{
info!(
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%",
"config reload: me_route_backpressure: base={}ms high={}ms watermark={}%; me_health_interval: unhealthy={}ms healthy={}ms; me_admission_poll={}ms; me_warn_rate_limit={}ms",
new_hot.me_route_backpressure_base_timeout_ms,
new_hot.me_route_backpressure_high_timeout_ms,
new_hot.me_route_backpressure_high_watermark_pct,
new_hot.me_health_interval_ms_unhealthy,
new_hot.me_health_interval_ms_healthy,
new_hot.me_admission_poll_ms,
new_hot.me_warn_rate_limit_ms,
);
}

View File

@ -285,6 +285,48 @@ impl ProxyConfig {
));
}
if config.general.me_writer_cmd_channel_capacity == 0 {
return Err(ProxyError::Config(
"general.me_writer_cmd_channel_capacity must be > 0".to_string(),
));
}
if config.general.me_route_channel_capacity == 0 {
return Err(ProxyError::Config(
"general.me_route_channel_capacity must be > 0".to_string(),
));
}
if config.general.me_c2me_channel_capacity == 0 {
return Err(ProxyError::Config(
"general.me_c2me_channel_capacity must be > 0".to_string(),
));
}
if config.general.me_health_interval_ms_unhealthy == 0 {
return Err(ProxyError::Config(
"general.me_health_interval_ms_unhealthy must be > 0".to_string(),
));
}
if config.general.me_health_interval_ms_healthy == 0 {
return Err(ProxyError::Config(
"general.me_health_interval_ms_healthy must be > 0".to_string(),
));
}
if config.general.me_admission_poll_ms == 0 {
return Err(ProxyError::Config(
"general.me_admission_poll_ms must be > 0".to_string(),
));
}
if config.general.me_warn_rate_limit_ms == 0 {
return Err(ProxyError::Config(
"general.me_warn_rate_limit_ms must be > 0".to_string(),
));
}
if config.access.user_max_unique_ips_window_secs == 0 {
return Err(ProxyError::Config(
"access.user_max_unique_ips_window_secs must be > 0".to_string(),

View File

@ -420,6 +420,18 @@ pub struct GeneralConfig {
#[serde(default = "default_rpc_proxy_req_every")]
pub rpc_proxy_req_every: u64,
/// Capacity of per-ME writer command channel.
#[serde(default = "default_me_writer_cmd_channel_capacity")]
pub me_writer_cmd_channel_capacity: usize,
/// Capacity of per-connection ME response route channel.
#[serde(default = "default_me_route_channel_capacity")]
pub me_route_channel_capacity: usize,
/// Capacity of per-client command queue from client reader to ME sender task.
#[serde(default = "default_me_c2me_channel_capacity")]
pub me_c2me_channel_capacity: usize,
/// Max pending ciphertext buffer per client writer (bytes).
/// Controls FakeTLS backpressure vs throughput.
#[serde(default = "default_crypto_pending_buffer")]
@ -620,6 +632,22 @@ pub struct GeneralConfig {
#[serde(default = "default_me_route_backpressure_high_watermark_pct")]
pub me_route_backpressure_high_watermark_pct: u8,
/// Health monitor interval in milliseconds while writer coverage is degraded.
#[serde(default = "default_me_health_interval_ms_unhealthy")]
pub me_health_interval_ms_unhealthy: u64,
/// Health monitor interval in milliseconds while writer coverage is stable.
#[serde(default = "default_me_health_interval_ms_healthy")]
pub me_health_interval_ms_healthy: u64,
/// Poll interval in milliseconds for conditional-admission state checks.
#[serde(default = "default_me_admission_poll_ms")]
pub me_admission_poll_ms: u64,
/// Cooldown for repetitive ME warning logs in milliseconds.
#[serde(default = "default_me_warn_rate_limit_ms")]
pub me_warn_rate_limit_ms: u64,
/// ME route behavior when no writer is immediately available.
#[serde(default)]
pub me_route_no_writer_mode: MeRouteNoWriterMode,
@ -796,6 +824,9 @@ impl Default for GeneralConfig {
me_keepalive_jitter_secs: default_keepalive_jitter(),
me_keepalive_payload_random: default_true(),
rpc_proxy_req_every: default_rpc_proxy_req_every(),
me_writer_cmd_channel_capacity: default_me_writer_cmd_channel_capacity(),
me_route_channel_capacity: default_me_route_channel_capacity(),
me_c2me_channel_capacity: default_me_c2me_channel_capacity(),
me_warmup_stagger_enabled: default_true(),
me_warmup_step_delay_ms: default_warmup_step_delay_ms(),
me_warmup_step_jitter_ms: default_warmup_step_jitter_ms(),
@ -837,6 +868,10 @@ impl Default for GeneralConfig {
me_route_backpressure_base_timeout_ms: default_me_route_backpressure_base_timeout_ms(),
me_route_backpressure_high_timeout_ms: default_me_route_backpressure_high_timeout_ms(),
me_route_backpressure_high_watermark_pct: default_me_route_backpressure_high_watermark_pct(),
me_health_interval_ms_unhealthy: default_me_health_interval_ms_unhealthy(),
me_health_interval_ms_healthy: default_me_health_interval_ms_healthy(),
me_admission_poll_ms: default_me_admission_poll_ms(),
me_warn_rate_limit_ms: default_me_warn_rate_limit_ms(),
me_route_no_writer_mode: MeRouteNoWriterMode::default(),
me_route_no_writer_wait_ms: default_me_route_no_writer_wait_ms(),
me_route_inline_recovery_attempts: default_me_route_inline_recovery_attempts(),

View File

@ -1048,9 +1048,14 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
config.general.me_secret_atomic_snapshot,
config.general.me_deterministic_writer_sort,
config.general.me_socks_kdf_policy,
config.general.me_writer_cmd_channel_capacity,
config.general.me_route_channel_capacity,
config.general.me_route_backpressure_base_timeout_ms,
config.general.me_route_backpressure_high_timeout_ms,
config.general.me_route_backpressure_high_watermark_pct,
config.general.me_health_interval_ms_unhealthy,
config.general.me_health_interval_ms_healthy,
config.general.me_warn_rate_limit_ms,
config.general.me_route_no_writer_mode,
config.general.me_route_no_writer_wait_ms,
config.general.me_route_inline_recovery_attempts,
@ -1784,11 +1789,24 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let pool_for_gate = pool.clone();
let admission_tx_gate = admission_tx.clone();
let mut config_rx_gate = config_rx.clone();
let mut admission_poll_ms = config.general.me_admission_poll_ms.max(1);
tokio::spawn(async move {
let mut gate_open = initial_open;
let mut open_streak = if initial_open { 1u32 } else { 0u32 };
let mut close_streak = if initial_open { 0u32 } else { 1u32 };
loop {
tokio::select! {
changed = config_rx_gate.changed() => {
if changed.is_err() {
break;
}
let cfg = config_rx_gate.borrow_and_update().clone();
admission_poll_ms = cfg.general.me_admission_poll_ms.max(1);
continue;
}
_ = tokio::time::sleep(Duration::from_millis(admission_poll_ms)) => {}
}
let ready = pool_for_gate.admission_ready_conditional_cast().await;
if ready {
open_streak = open_streak.saturating_add(1);
@ -1813,7 +1831,6 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
);
}
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
});
} else {

View File

@ -27,7 +27,7 @@ enum C2MeCommand {
const DESYNC_DEDUP_WINDOW: Duration = Duration::from_secs(60);
const DESYNC_ERROR_CLASS: &str = "frame_too_large_crypto_desync";
const C2ME_CHANNEL_CAPACITY: usize = 1024;
const C2ME_CHANNEL_CAPACITY_FALLBACK: usize = 128;
const C2ME_SOFT_PRESSURE_MIN_FREE_SLOTS: usize = 64;
const C2ME_SENDER_FAIRNESS_BUDGET: usize = 32;
static DESYNC_DEDUP: OnceLock<Mutex<HashMap<u64, Instant>>> = OnceLock::new();
@ -271,7 +271,11 @@ where
let frame_limit = config.general.max_client_frame;
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(C2ME_CHANNEL_CAPACITY);
let c2me_channel_capacity = config
.general
.me_c2me_channel_capacity
.max(C2ME_CHANNEL_CAPACITY_FALLBACK);
let (c2me_tx, mut c2me_rx) = mpsc::channel::<C2MeCommand>(c2me_channel_capacity);
let me_pool_c2me = me_pool.clone();
let effective_tag = effective_tag;
let c2me_sender = tokio::spawn(async move {

View File

@ -325,6 +325,9 @@ async fn run_update_cycle(
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
cfg.general.me_adaptive_floor_max_active_writers_global,
cfg.general.me_adaptive_floor_max_warm_writers_global,
cfg.general.me_health_interval_ms_unhealthy,
cfg.general.me_health_interval_ms_healthy,
cfg.general.me_warn_rate_limit_ms,
);
let required_cfg_snapshots = cfg.general.me_config_stable_snapshots.max(1);
@ -546,6 +549,9 @@ pub async fn me_config_updater(
cfg.general.me_adaptive_floor_max_warm_writers_per_core,
cfg.general.me_adaptive_floor_max_active_writers_global,
cfg.general.me_adaptive_floor_max_warm_writers_global,
cfg.general.me_health_interval_ms_unhealthy,
cfg.general.me_health_interval_ms_healthy,
cfg.general.me_warn_rate_limit_ms,
);
let new_secs = cfg.general.effective_update_every_secs().max(1);
if new_secs == update_every_secs {

View File

@ -13,7 +13,6 @@ use crate::network::IpFamily;
use super::MePool;
const HEALTH_INTERVAL_SECS: u64 = 1;
const JITTER_FRAC_NUM: u64 = 2; // jitter up to 50% of backoff
#[allow(dead_code)]
const MAX_CONCURRENT_PER_DC_DEFAULT: usize = 1;
@ -62,11 +61,18 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut idle_refresh_next_attempt: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_idle_since: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut degraded_interval = true;
loop {
tokio::time::sleep(Duration::from_secs(HEALTH_INTERVAL_SECS)).await;
let interval = if degraded_interval {
pool.health_interval_unhealthy()
} else {
pool.health_interval_healthy()
};
tokio::time::sleep(interval).await;
pool.prune_closed_writers().await;
reap_draining_writers(&pool).await;
check_family(
let v4_degraded = check_family(
IpFamily::V4,
&pool,
&rng,
@ -80,9 +86,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
)
.await;
check_family(
let v6_degraded = check_family(
IpFamily::V6,
&pool,
&rng,
@ -96,8 +103,10 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
&mut idle_refresh_next_attempt,
&mut adaptive_idle_since,
&mut adaptive_recover_until,
&mut floor_warn_next_allowed,
)
.await;
degraded_interval = v4_degraded || v6_degraded;
}
}
@ -137,15 +146,18 @@ async fn check_family(
idle_refresh_next_attempt: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_idle_since: &mut HashMap<(i32, IpFamily), Instant>,
adaptive_recover_until: &mut HashMap<(i32, IpFamily), Instant>,
) {
floor_warn_next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
) -> bool {
let enabled = match family {
IpFamily::V4 => pool.decision.ipv4_me,
IpFamily::V6 => pool.decision.ipv6_me,
};
if !enabled {
return;
return false;
}
let mut family_degraded = false;
let mut dc_endpoints = HashMap::<i32, Vec<SocketAddr>>::new();
let map_guard = match family {
IpFamily::V4 => pool.proxy_map_v4.read().await,
@ -234,6 +246,7 @@ async fn check_family(
.sum::<usize>();
if endpoints.len() == 1 && pool.single_endpoint_outage_mode_enabled() && alive == 0 {
family_degraded = true;
if single_endpoint_outage.insert(key) {
pool.stats.increment_me_single_endpoint_outage_enter_total();
warn!(
@ -310,6 +323,7 @@ async fn check_family(
continue;
}
let missing = required - alive;
family_degraded = true;
let now = Instant::now();
if reconnect_budget == 0 {
@ -438,15 +452,23 @@ async fn check_family(
+ Duration::from_millis(rand::rng().random_range(0..=jitter.max(1)));
next_attempt.insert(key, now + wait);
if pool.is_runtime_ready() {
warn!(
dc = %dc,
?family,
alive = now_alive,
required,
endpoint_count = endpoints.len(),
backoff_ms = next_ms,
"DC writer floor is below required level, scheduled reconnect"
);
let warn_cooldown = pool.warn_rate_limit_duration();
if should_emit_rate_limited_warn(
floor_warn_next_allowed,
key,
now,
warn_cooldown,
) {
warn!(
dc = %dc,
?family,
alive = now_alive,
required,
endpoint_count = endpoints.len(),
backoff_ms = next_ms,
"DC writer floor is below required level, scheduled reconnect"
);
}
} else {
info!(
dc = %dc,
@ -463,6 +485,8 @@ async fn check_family(
*v = v.saturating_sub(1);
}
}
family_degraded
}
fn health_reconnect_budget(pool: &Arc<MePool>, dc_groups: usize) -> usize {
@ -474,6 +498,23 @@ fn health_reconnect_budget(pool: &Arc<MePool>, dc_groups: usize) -> usize {
.clamp(HEALTH_RECONNECT_BUDGET_MIN, HEALTH_RECONNECT_BUDGET_MAX)
}
/// Decide whether a rate-limited warning may be emitted for `key`.
///
/// Returns `true` — and arms a new cooldown (`next_allowed[key] = now + cooldown`) —
/// when the key has no armed cooldown or the previous one has elapsed.
/// Returns `false` while a previously armed cooldown is still in the future.
fn should_emit_rate_limited_warn(
    next_allowed: &mut HashMap<(i32, IpFamily), Instant>,
    key: (i32, IpFamily),
    now: Instant,
    cooldown: Duration,
) -> bool {
    // Suppress only while an armed cooldown has not yet expired.
    if let Some(&ready_at) = next_allowed.get(&key) {
        if now < ready_at {
            return false;
        }
    }
    // First warning for this key, or the cooldown elapsed: re-arm and allow.
    next_allowed.insert(key, now + cooldown);
    true
}
fn adaptive_floor_class_min(
pool: &Arc<MePool>,
endpoint_count: usize,

View File

@ -103,6 +103,7 @@ pub struct MePool {
pub(super) me_keepalive_jitter: Duration,
pub(super) me_keepalive_payload_random: bool,
pub(super) rpc_proxy_req_every_secs: AtomicU64,
pub(super) writer_cmd_channel_capacity: usize,
pub(super) me_warmup_stagger_enabled: bool,
pub(super) me_warmup_step_delay: Duration,
pub(super) me_warmup_step_jitter: Duration,
@ -181,8 +182,12 @@ pub struct MePool {
pub(super) me_route_no_writer_wait: Duration,
pub(super) me_route_inline_recovery_attempts: u32,
pub(super) me_route_inline_recovery_wait: Duration,
pub(super) me_health_interval_ms_unhealthy: AtomicU64,
pub(super) me_health_interval_ms_healthy: AtomicU64,
pub(super) me_warn_rate_limit_ms: AtomicU64,
pub(super) runtime_ready: AtomicBool,
pool_size: usize,
pub(super) preferred_endpoints_by_dc: Arc<RwLock<HashMap<i32, Vec<SocketAddr>>>>,
}
#[derive(Debug, Default)]
@ -270,16 +275,25 @@ impl MePool {
me_secret_atomic_snapshot: bool,
me_deterministic_writer_sort: bool,
me_socks_kdf_policy: MeSocksKdfPolicy,
me_writer_cmd_channel_capacity: usize,
me_route_channel_capacity: usize,
me_route_backpressure_base_timeout_ms: u64,
me_route_backpressure_high_timeout_ms: u64,
me_route_backpressure_high_watermark_pct: u8,
me_health_interval_ms_unhealthy: u64,
me_health_interval_ms_healthy: u64,
me_warn_rate_limit_ms: u64,
me_route_no_writer_mode: MeRouteNoWriterMode,
me_route_no_writer_wait_ms: u64,
me_route_inline_recovery_attempts: u32,
me_route_inline_recovery_wait_ms: u64,
) -> Arc<Self> {
let endpoint_dc_map = Self::build_endpoint_dc_map_from_maps(&proxy_map_v4, &proxy_map_v6);
let registry = Arc::new(ConnRegistry::new());
let preferred_endpoints_by_dc =
Self::build_preferred_endpoints_by_dc(&decision, &proxy_map_v4, &proxy_map_v6);
let registry = Arc::new(ConnRegistry::with_route_channel_capacity(
me_route_channel_capacity,
));
registry.update_route_backpressure_policy(
me_route_backpressure_base_timeout_ms,
me_route_backpressure_high_timeout_ms,
@ -326,6 +340,7 @@ impl MePool {
me_keepalive_jitter: Duration::from_secs(me_keepalive_jitter_secs),
me_keepalive_payload_random,
rpc_proxy_req_every_secs: AtomicU64::new(rpc_proxy_req_every_secs),
writer_cmd_channel_capacity: me_writer_cmd_channel_capacity.max(1),
me_warmup_stagger_enabled,
me_warmup_step_delay: Duration::from_millis(me_warmup_step_delay_ms),
me_warmup_step_jitter: Duration::from_millis(me_warmup_step_jitter_ms),
@ -440,7 +455,11 @@ impl MePool {
me_route_no_writer_wait: Duration::from_millis(me_route_no_writer_wait_ms),
me_route_inline_recovery_attempts,
me_route_inline_recovery_wait: Duration::from_millis(me_route_inline_recovery_wait_ms),
me_health_interval_ms_unhealthy: AtomicU64::new(me_health_interval_ms_unhealthy.max(1)),
me_health_interval_ms_healthy: AtomicU64::new(me_health_interval_ms_healthy.max(1)),
me_warn_rate_limit_ms: AtomicU64::new(me_warn_rate_limit_ms.max(1)),
runtime_ready: AtomicBool::new(false),
preferred_endpoints_by_dc: Arc::new(RwLock::new(preferred_endpoints_by_dc)),
})
}
@ -489,6 +508,9 @@ impl MePool {
adaptive_floor_max_warm_writers_per_core: u16,
adaptive_floor_max_active_writers_global: u32,
adaptive_floor_max_warm_writers_global: u32,
me_health_interval_ms_unhealthy: u64,
me_health_interval_ms_healthy: u64,
me_warn_rate_limit_ms: u64,
) {
self.hardswap.store(hardswap, Ordering::Relaxed);
self.me_pool_drain_ttl_secs
@ -564,6 +586,12 @@ impl MePool {
.store(adaptive_floor_max_active_writers_global, Ordering::Relaxed);
self.me_adaptive_floor_max_warm_writers_global
.store(adaptive_floor_max_warm_writers_global, Ordering::Relaxed);
self.me_health_interval_ms_unhealthy
.store(me_health_interval_ms_unhealthy.max(1), Ordering::Relaxed);
self.me_health_interval_ms_healthy
.store(me_health_interval_ms_healthy.max(1), Ordering::Relaxed);
self.me_warn_rate_limit_ms
.store(me_warn_rate_limit_ms.max(1), Ordering::Relaxed);
if previous_floor_mode != floor_mode {
self.stats.increment_me_floor_mode_switch_total();
match (previous_floor_mode, floor_mode) {
@ -1042,6 +1070,62 @@ impl MePool {
}
}
/// Precompute, for every DC present in either family map, the list of
/// socket addresses to try, honoring the network decision:
/// - multipath: both families, preferred family's endpoints first;
/// - single-path: only the preferred family, falling back to the other
///   family when the preferred one has no endpoints for that DC.
///
/// The result is sorted and deduplicated before storing.
fn build_preferred_endpoints_by_dc(
    decision: &NetworkDecision,
    map_v4: &HashMap<i32, Vec<(IpAddr, u16)>>,
    map_v6: &HashMap<i32, Vec<(IpAddr, u16)>>,
) -> HashMap<i32, Vec<SocketAddr>> {
    // Materialize one family's (ip, port) pairs for a DC as socket addresses.
    let family_addrs = |map: &HashMap<i32, Vec<(IpAddr, u16)>>, dc: i32| -> Vec<SocketAddr> {
        map.get(&dc)
            .into_iter()
            .flatten()
            .map(|&(ip, port)| SocketAddr::new(ip, port))
            .collect()
    };

    let mut dcs = HashSet::<i32>::new();
    dcs.extend(map_v4.keys().copied());
    dcs.extend(map_v6.keys().copied());

    let mut out = HashMap::<i32, Vec<SocketAddr>>::with_capacity(dcs.len());
    for dc in dcs {
        let v4 = family_addrs(map_v4, dc);
        let v6 = family_addrs(map_v6, dc);

        let mut selected = if decision.effective_multipath {
            // Both families, preferred family first.
            let (first, second) = if decision.prefer_ipv6() { (v6, v4) } else { (v4, v6) };
            let mut both = first;
            both.extend(second);
            both
        } else if decision.prefer_ipv6() {
            if v6.is_empty() { v4 } else { v6 }
        } else if v4.is_empty() {
            v6
        } else {
            v4
        };

        // NOTE(review): sorting makes the per-DC list deterministic, but it also
        // discards the family-preference ordering assembled above — confirm that
        // callers only membership-test or otherwise do not rely on preference order.
        selected.sort_unstable();
        selected.dedup();
        out.insert(dc, selected);
    }
    out
}
fn build_endpoint_dc_map_from_maps(
map_v4: &HashMap<i32, Vec<(IpAddr, u16)>>,
map_v6: &HashMap<i32, Vec<(IpAddr, u16)>>,
@ -1064,6 +1148,25 @@ impl MePool {
let map_v4 = self.proxy_map_v4.read().await.clone();
let map_v6 = self.proxy_map_v6.read().await.clone();
let rebuilt = Self::build_endpoint_dc_map_from_maps(&map_v4, &map_v6);
let preferred = Self::build_preferred_endpoints_by_dc(&self.decision, &map_v4, &map_v6);
*self.endpoint_dc_map.write().await = rebuilt;
*self.preferred_endpoints_by_dc.write().await = preferred;
}
/// Return a clone of the precomputed preferred endpoint list for `dc`,
/// or an empty vector when the DC is unknown.
pub(super) async fn preferred_endpoints_for_dc(&self, dc: i32) -> Vec<SocketAddr> {
    // Read lock is held only for the duration of this expression.
    self.preferred_endpoints_by_dc
        .read()
        .await
        .get(&dc)
        .map(Vec::clone)
        .unwrap_or_default()
}
/// Health-monitor tick interval while writer coverage is degraded.
/// Hot-reloadable via the atomic field; clamped to at least 1 ms.
pub(super) fn health_interval_unhealthy(&self) -> Duration {
    let ms = self.me_health_interval_ms_unhealthy.load(Ordering::Relaxed);
    Duration::from_millis(ms.max(1))
}
/// Health-monitor tick interval while writer coverage is stable.
/// Hot-reloadable via the atomic field; clamped to at least 1 ms.
pub(super) fn health_interval_healthy(&self) -> Duration {
    let ms = self.me_health_interval_ms_healthy.load(Ordering::Relaxed);
    Duration::from_millis(ms.max(1))
}
/// Cooldown between repetitive ME warning logs.
/// Hot-reloadable via the atomic field; clamped to at least 1 ms.
pub(super) fn warn_rate_limit_duration(&self) -> Duration {
    let ms = self.me_warn_rate_limit_ms.load(Ordering::Relaxed);
    Duration::from_millis(ms.max(1))
}
}

View File

@ -132,7 +132,7 @@ impl MePool {
let draining_started_at_epoch_secs = Arc::new(AtomicU64::new(0));
let drain_deadline_epoch_secs = Arc::new(AtomicU64::new(0));
let allow_drain_fallback = Arc::new(AtomicBool::new(false));
let (tx, mut rx) = mpsc::channel::<WriterCommand>(4096);
let (tx, mut rx) = mpsc::channel::<WriterCommand>(self.writer_cmd_channel_capacity);
let mut rpc_writer = RpcWriter {
writer: hs.wr,
key: hs.write_key,

View File

@ -9,7 +9,6 @@ use tokio::sync::mpsc::error::TrySendError;
use super::codec::WriterCommand;
use super::MeResponse;
const ROUTE_CHANNEL_CAPACITY: usize = 4096;
const ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS: u64 = 25;
const ROUTE_BACKPRESSURE_HIGH_TIMEOUT_MS: u64 = 120;
const ROUTE_BACKPRESSURE_HIGH_WATERMARK_PCT: u8 = 80;
@ -78,6 +77,7 @@ impl RegistryInner {
pub struct ConnRegistry {
inner: RwLock<RegistryInner>,
next_id: AtomicU64,
route_channel_capacity: usize,
route_backpressure_base_timeout_ms: AtomicU64,
route_backpressure_high_timeout_ms: AtomicU64,
route_backpressure_high_watermark_pct: AtomicU8,
@ -91,11 +91,12 @@ impl ConnRegistry {
.as_secs()
}
pub fn new() -> Self {
pub fn with_route_channel_capacity(route_channel_capacity: usize) -> Self {
let start = rand::random::<u64>() | 1;
Self {
inner: RwLock::new(RegistryInner::new()),
next_id: AtomicU64::new(start),
route_channel_capacity: route_channel_capacity.max(1),
route_backpressure_base_timeout_ms: AtomicU64::new(
ROUTE_BACKPRESSURE_BASE_TIMEOUT_MS,
),
@ -108,6 +109,11 @@ impl ConnRegistry {
}
}
/// Test-only constructor using the historical default route-channel capacity (4096).
#[cfg(test)]
pub fn new() -> Self {
    Self::with_route_channel_capacity(4096)
}
pub fn update_route_backpressure_policy(
&self,
base_timeout_ms: u64,
@ -127,7 +133,7 @@ impl ConnRegistry {
pub async fn register(&self) -> (u64, mpsc::Receiver<MeResponse>) {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(ROUTE_CHANNEL_CAPACITY);
let (tx, rx) = mpsc::channel(self.route_channel_capacity);
self.inner.write().await.map.insert(id, tx);
(id, rx)
}
@ -179,11 +185,11 @@ impl ConnRegistry {
.route_backpressure_high_watermark_pct
.load(Ordering::Relaxed)
.clamp(1, 100);
let used = ROUTE_CHANNEL_CAPACITY.saturating_sub(tx.capacity());
let used_pct = if ROUTE_CHANNEL_CAPACITY == 0 {
let used = self.route_channel_capacity.saturating_sub(tx.capacity());
let used_pct = if self.route_channel_capacity == 0 {
100
} else {
(used.saturating_mul(100) / ROUTE_CHANNEL_CAPACITY) as u8
(used.saturating_mul(100) / self.route_channel_capacity) as u8
};
let high_profile = used_pct >= high_watermark_pct;
let timeout_ms = if high_profile {

View File

@ -480,31 +480,7 @@ impl MePool {
}
async fn endpoint_candidates_for_target_dc(&self, routed_dc: i32) -> Vec<SocketAddr> {
let mut preferred = Vec::<SocketAddr>::new();
let mut seen = HashSet::<SocketAddr>::new();
for family in self.family_order() {
let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await,
};
let mut family_selected = Vec::<SocketAddr>::new();
if let Some(addrs) = map_guard.get(&routed_dc) {
for (ip, port) in addrs {
family_selected.push(SocketAddr::new(*ip, *port));
}
}
for addr in family_selected {
if seen.insert(addr) {
preferred.push(addr);
}
}
if !preferred.is_empty() && !self.decision.effective_multipath {
break;
}
}
preferred
self.preferred_endpoints_for_dc(routed_dc).await
}
async fn maybe_trigger_hybrid_recovery(
@ -591,28 +567,7 @@ impl MePool {
routed_dc: i32,
include_warm: bool,
) -> Vec<usize> {
let mut preferred = HashSet::<SocketAddr>::new();
for family in self.family_order() {
let map_guard = match family {
IpFamily::V4 => self.proxy_map_v4.read().await,
IpFamily::V6 => self.proxy_map_v6.read().await,
};
let mut family_selected = Vec::<SocketAddr>::new();
if let Some(v) = map_guard.get(&routed_dc) {
family_selected.extend(v.iter().map(|(ip, port)| SocketAddr::new(*ip, *port)));
}
for endpoint in family_selected {
preferred.insert(endpoint);
}
drop(map_guard);
if !preferred.is_empty() && !self.decision.effective_multipath {
break;
}
}
let preferred = self.preferred_endpoints_for_dc(routed_dc).await;
if preferred.is_empty() {
return Vec::new();
}
@ -622,7 +577,7 @@ impl MePool {
if !self.writer_eligible_for_selection(w, include_warm) {
continue;
}
if w.writer_dc == routed_dc && preferred.contains(&w.addr) {
if w.writer_dc == routed_dc && preferred.iter().any(|endpoint| *endpoint == w.addr) {
out.push(idx);
}
}