Adaptive Buffers + Session Eviction Method

This commit is contained in:
Alexey 2026-03-18 10:49:02 +03:00
parent 5dd0c47f14
commit 3739f38440
No known key found for this signature in database
27 changed files with 1479 additions and 50 deletions

View File

@ -195,6 +195,8 @@ pub(super) struct ZeroPoolData {
pub(super) pool_swap_total: u64,
pub(super) pool_drain_active: u64,
pub(super) pool_force_close_total: u64,
pub(super) pool_drain_soft_evict_total: u64,
pub(super) pool_drain_soft_evict_writer_total: u64,
pub(super) pool_stale_pick_total: u64,
pub(super) writer_removed_total: u64,
pub(super) writer_removed_unexpected_total: u64,
@ -360,6 +362,11 @@ pub(super) struct MinimalMeRuntimeData {
pub(super) me_reconnect_backoff_cap_ms: u64,
pub(super) me_reconnect_fast_retry_count: u32,
pub(super) me_pool_drain_ttl_secs: u64,
pub(super) me_pool_drain_soft_evict_enabled: bool,
pub(super) me_pool_drain_soft_evict_grace_secs: u64,
pub(super) me_pool_drain_soft_evict_per_writer: u8,
pub(super) me_pool_drain_soft_evict_budget_per_core: u16,
pub(super) me_pool_drain_soft_evict_cooldown_ms: u64,
pub(super) me_pool_force_close_secs: u64,
pub(super) me_pool_min_fresh_ratio: f32,
pub(super) me_bind_stale_mode: &'static str,

View File

@ -96,6 +96,8 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
pool_swap_total: stats.get_pool_swap_total(),
pool_drain_active: stats.get_pool_drain_active(),
pool_force_close_total: stats.get_pool_force_close_total(),
pool_drain_soft_evict_total: stats.get_pool_drain_soft_evict_total(),
pool_drain_soft_evict_writer_total: stats.get_pool_drain_soft_evict_writer_total(),
pool_stale_pick_total: stats.get_pool_stale_pick_total(),
writer_removed_total: stats.get_me_writer_removed_total(),
writer_removed_unexpected_total: stats.get_me_writer_removed_unexpected_total(),
@ -427,6 +429,11 @@ async fn get_minimal_payload_cached(
me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms,
me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs,
me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled,
me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs,
me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer,
me_pool_drain_soft_evict_budget_per_core: runtime.me_pool_drain_soft_evict_budget_per_core,
me_pool_drain_soft_evict_cooldown_ms: runtime.me_pool_drain_soft_evict_cooldown_ms,
me_pool_force_close_secs: runtime.me_pool_force_close_secs,
me_pool_min_fresh_ratio: runtime.me_pool_min_fresh_ratio,
me_bind_stale_mode: runtime.me_bind_stale_mode,

View File

@ -27,8 +27,8 @@ const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 1024;
const DEFAULT_ME_READER_ROUTE_DATA_WAIT_MS: u64 = 2;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 1500;
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = false;
const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500;
const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true;
const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024;
const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024;
const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3;
@ -36,6 +36,11 @@ const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000;
const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000;
const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000;
const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8;
const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000;
const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30;
const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2;
const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5;
@ -85,11 +90,11 @@ pub(crate) fn default_connect_timeout() -> u64 {
}
pub(crate) fn default_keepalive() -> u64 {
60
15
}
pub(crate) fn default_ack_timeout() -> u64 {
300
90
}
pub(crate) fn default_me_one_retry() -> u8 {
12
@ -592,6 +597,26 @@ pub(crate) fn default_me_pool_drain_threshold() -> u64 {
128
}
pub(crate) fn default_me_pool_drain_soft_evict_enabled() -> bool {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED
}
pub(crate) fn default_me_pool_drain_soft_evict_grace_secs() -> u64 {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS
}
pub(crate) fn default_me_pool_drain_soft_evict_per_writer() -> u8 {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER
}
pub(crate) fn default_me_pool_drain_soft_evict_budget_per_core() -> u16 {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE
}
pub(crate) fn default_me_pool_drain_soft_evict_cooldown_ms() -> u64 {
DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS
}
pub(crate) fn default_me_bind_stale_ttl_secs() -> u64 {
default_me_pool_drain_ttl_secs()
}

View File

@ -56,6 +56,11 @@ pub struct HotFields {
pub hardswap: bool,
pub me_pool_drain_ttl_secs: u64,
pub me_pool_drain_threshold: u64,
pub me_pool_drain_soft_evict_enabled: bool,
pub me_pool_drain_soft_evict_grace_secs: u64,
pub me_pool_drain_soft_evict_per_writer: u8,
pub me_pool_drain_soft_evict_budget_per_core: u16,
pub me_pool_drain_soft_evict_cooldown_ms: u64,
pub me_pool_min_fresh_ratio: f32,
pub me_reinit_drain_timeout_secs: u64,
pub me_hardswap_warmup_delay_min_ms: u64,
@ -138,6 +143,15 @@ impl HotFields {
hardswap: cfg.general.hardswap,
me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs,
me_pool_drain_threshold: cfg.general.me_pool_drain_threshold,
me_pool_drain_soft_evict_enabled: cfg.general.me_pool_drain_soft_evict_enabled,
me_pool_drain_soft_evict_grace_secs: cfg.general.me_pool_drain_soft_evict_grace_secs,
me_pool_drain_soft_evict_per_writer: cfg.general.me_pool_drain_soft_evict_per_writer,
me_pool_drain_soft_evict_budget_per_core: cfg
.general
.me_pool_drain_soft_evict_budget_per_core,
me_pool_drain_soft_evict_cooldown_ms: cfg
.general
.me_pool_drain_soft_evict_cooldown_ms,
me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio,
me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs,
me_hardswap_warmup_delay_min_ms: cfg.general.me_hardswap_warmup_delay_min_ms,
@ -455,6 +469,15 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig {
cfg.general.hardswap = new.general.hardswap;
cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs;
cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold;
cfg.general.me_pool_drain_soft_evict_enabled = new.general.me_pool_drain_soft_evict_enabled;
cfg.general.me_pool_drain_soft_evict_grace_secs =
new.general.me_pool_drain_soft_evict_grace_secs;
cfg.general.me_pool_drain_soft_evict_per_writer =
new.general.me_pool_drain_soft_evict_per_writer;
cfg.general.me_pool_drain_soft_evict_budget_per_core =
new.general.me_pool_drain_soft_evict_budget_per_core;
cfg.general.me_pool_drain_soft_evict_cooldown_ms =
new.general.me_pool_drain_soft_evict_cooldown_ms;
cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio;
cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs;
cfg.general.me_hardswap_warmup_delay_min_ms = new.general.me_hardswap_warmup_delay_min_ms;
@ -835,6 +858,25 @@ fn log_changes(
old_hot.me_pool_drain_threshold, new_hot.me_pool_drain_threshold,
);
}
if old_hot.me_pool_drain_soft_evict_enabled != new_hot.me_pool_drain_soft_evict_enabled
|| old_hot.me_pool_drain_soft_evict_grace_secs
!= new_hot.me_pool_drain_soft_evict_grace_secs
|| old_hot.me_pool_drain_soft_evict_per_writer
!= new_hot.me_pool_drain_soft_evict_per_writer
|| old_hot.me_pool_drain_soft_evict_budget_per_core
!= new_hot.me_pool_drain_soft_evict_budget_per_core
|| old_hot.me_pool_drain_soft_evict_cooldown_ms
!= new_hot.me_pool_drain_soft_evict_cooldown_ms
{
info!(
"config reload: me_pool_drain_soft_evict: enabled={} grace={}s per_writer={} budget_per_core={} cooldown={}ms",
new_hot.me_pool_drain_soft_evict_enabled,
new_hot.me_pool_drain_soft_evict_grace_secs,
new_hot.me_pool_drain_soft_evict_per_writer,
new_hot.me_pool_drain_soft_evict_budget_per_core,
new_hot.me_pool_drain_soft_evict_cooldown_ms
);
}
if (old_hot.me_pool_min_fresh_ratio - new_hot.me_pool_min_fresh_ratio).abs() > f32::EPSILON {
info!(

View File

@ -406,6 +406,35 @@ impl ProxyConfig {
));
}
if config.general.me_pool_drain_soft_evict_grace_secs > 3600 {
return Err(ProxyError::Config(
"general.me_pool_drain_soft_evict_grace_secs must be within [0, 3600]".to_string(),
));
}
if config.general.me_pool_drain_soft_evict_per_writer == 0
|| config.general.me_pool_drain_soft_evict_per_writer > 16
{
return Err(ProxyError::Config(
"general.me_pool_drain_soft_evict_per_writer must be within [1, 16]".to_string(),
));
}
if config.general.me_pool_drain_soft_evict_budget_per_core == 0
|| config.general.me_pool_drain_soft_evict_budget_per_core > 64
{
return Err(ProxyError::Config(
"general.me_pool_drain_soft_evict_budget_per_core must be within [1, 64]"
.to_string(),
));
}
if config.general.me_pool_drain_soft_evict_cooldown_ms == 0 {
return Err(ProxyError::Config(
"general.me_pool_drain_soft_evict_cooldown_ms must be > 0".to_string(),
));
}
if config.access.user_max_unique_ips_window_secs == 0 {
return Err(ProxyError::Config(
"access.user_max_unique_ips_window_secs must be > 0".to_string(),

View File

@ -803,6 +803,26 @@ pub struct GeneralConfig {
#[serde(default = "default_me_pool_drain_threshold")]
pub me_pool_drain_threshold: u64,
/// Enable staged client eviction for draining ME writers that remain non-empty past TTL.
#[serde(default = "default_me_pool_drain_soft_evict_enabled")]
pub me_pool_drain_soft_evict_enabled: bool,
/// Extra grace in seconds after drain TTL before soft-eviction stage starts.
#[serde(default = "default_me_pool_drain_soft_evict_grace_secs")]
pub me_pool_drain_soft_evict_grace_secs: u64,
/// Maximum number of client sessions to evict from one draining writer per health tick.
#[serde(default = "default_me_pool_drain_soft_evict_per_writer")]
pub me_pool_drain_soft_evict_per_writer: u8,
/// Soft-eviction budget per CPU core for one health tick.
#[serde(default = "default_me_pool_drain_soft_evict_budget_per_core")]
pub me_pool_drain_soft_evict_budget_per_core: u16,
/// Cooldown for repetitive soft-eviction on the same writer in milliseconds.
#[serde(default = "default_me_pool_drain_soft_evict_cooldown_ms")]
pub me_pool_drain_soft_evict_cooldown_ms: u64,
/// Policy for new binds on stale draining writers.
#[serde(default)]
pub me_bind_stale_mode: MeBindStaleMode,
@ -984,6 +1004,13 @@ impl Default for GeneralConfig {
proxy_secret_len_max: default_proxy_secret_len_max(),
me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(),
me_pool_drain_threshold: default_me_pool_drain_threshold(),
me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(),
me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(),
me_pool_drain_soft_evict_per_writer: default_me_pool_drain_soft_evict_per_writer(),
me_pool_drain_soft_evict_budget_per_core:
default_me_pool_drain_soft_evict_budget_per_core(),
me_pool_drain_soft_evict_cooldown_ms:
default_me_pool_drain_soft_evict_cooldown_ms(),
me_bind_stale_mode: MeBindStaleMode::default(),
me_bind_stale_ttl_secs: default_me_bind_stale_ttl_secs(),
me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(),

View File

@ -238,6 +238,11 @@ pub(crate) async fn initialize_me_pool(
config.general.hardswap,
config.general.me_pool_drain_ttl_secs,
config.general.me_pool_drain_threshold,
config.general.me_pool_drain_soft_evict_enabled,
config.general.me_pool_drain_soft_evict_grace_secs,
config.general.me_pool_drain_soft_evict_per_writer,
config.general.me_pool_drain_soft_evict_budget_per_core,
config.general.me_pool_drain_soft_evict_cooldown_ms,
config.general.effective_me_pool_force_close_secs(),
config.general.me_pool_min_fresh_ratio,
config.general.me_hardswap_warmup_delay_min_ms,

View File

@ -476,7 +476,7 @@ pub async fn run() -> std::result::Result<(), Box<dyn std::error::Error>> {
Duration::from_secs(config.access.replay_window_secs),
));
let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096));
let buffer_pool = Arc::new(BufferPool::with_config(64 * 1024, 4096));
connectivity::run_startup_connectivity(
&config,

View File

@ -292,6 +292,109 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
"telemt_connections_bad_total {}",
if core_enabled { stats.get_connects_bad() } else { 0 }
);
let _ = writeln!(out, "# HELP telemt_connections_current Current active connections");
let _ = writeln!(out, "# TYPE telemt_connections_current gauge");
let _ = writeln!(
out,
"telemt_connections_current {}",
if core_enabled {
stats.get_current_connections_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_connections_direct_current Current active direct connections");
let _ = writeln!(out, "# TYPE telemt_connections_direct_current gauge");
let _ = writeln!(
out,
"telemt_connections_direct_current {}",
if core_enabled {
stats.get_current_connections_direct()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_connections_me_current Current active middle-end connections");
let _ = writeln!(out, "# TYPE telemt_connections_me_current gauge");
let _ = writeln!(
out,
"telemt_connections_me_current {}",
if core_enabled {
stats.get_current_connections_me()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_relay_adaptive_promotions_total Adaptive relay tier promotions"
);
let _ = writeln!(out, "# TYPE telemt_relay_adaptive_promotions_total counter");
let _ = writeln!(
out,
"telemt_relay_adaptive_promotions_total {}",
if core_enabled {
stats.get_relay_adaptive_promotions_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_relay_adaptive_demotions_total Adaptive relay tier demotions"
);
let _ = writeln!(out, "# TYPE telemt_relay_adaptive_demotions_total counter");
let _ = writeln!(
out,
"telemt_relay_adaptive_demotions_total {}",
if core_enabled {
stats.get_relay_adaptive_demotions_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_relay_adaptive_hard_promotions_total Adaptive relay hard promotions triggered by write pressure"
);
let _ = writeln!(
out,
"# TYPE telemt_relay_adaptive_hard_promotions_total counter"
);
let _ = writeln!(
out,
"telemt_relay_adaptive_hard_promotions_total {}",
if core_enabled {
stats.get_relay_adaptive_hard_promotions_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_reconnect_evict_total Reconnect-driven session evictions");
let _ = writeln!(out, "# TYPE telemt_reconnect_evict_total counter");
let _ = writeln!(
out,
"telemt_reconnect_evict_total {}",
if core_enabled {
stats.get_reconnect_evict_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_reconnect_stale_close_total Sessions closed because they became stale after reconnect"
);
let _ = writeln!(out, "# TYPE telemt_reconnect_stale_close_total counter");
let _ = writeln!(
out,
"telemt_reconnect_stale_close_total {}",
if core_enabled {
stats.get_reconnect_stale_close_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_handshake_timeouts_total Handshake timeouts");
let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
@ -1547,6 +1650,36 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
}
);
let _ = writeln!(
out,
"# HELP telemt_pool_drain_soft_evict_total Soft-evicted client sessions on stuck draining writers"
);
let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_total counter");
let _ = writeln!(
out,
"telemt_pool_drain_soft_evict_total {}",
if me_allows_normal {
stats.get_pool_drain_soft_evict_total()
} else {
0
}
);
let _ = writeln!(
out,
"# HELP telemt_pool_drain_soft_evict_writer_total Draining writers with at least one soft eviction"
);
let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_writer_total counter");
let _ = writeln!(
out,
"telemt_pool_drain_soft_evict_writer_total {}",
if me_allows_normal {
stats.get_pool_drain_soft_evict_writer_total()
} else {
0
}
);
let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds");
let _ = writeln!(out, "# TYPE telemt_pool_stale_pick_total counter");
let _ = writeln!(
@ -1864,6 +1997,8 @@ mod tests {
stats.increment_connects_all();
stats.increment_connects_all();
stats.increment_connects_bad();
stats.increment_current_connections_direct();
stats.increment_current_connections_me();
stats.increment_handshake_timeouts();
stats.increment_upstream_connect_attempt_total();
stats.increment_upstream_connect_attempt_total();
@ -1895,6 +2030,9 @@ mod tests {
assert!(output.contains("telemt_connections_total 2"));
assert!(output.contains("telemt_connections_bad_total 1"));
assert!(output.contains("telemt_connections_current 2"));
assert!(output.contains("telemt_connections_direct_current 1"));
assert!(output.contains("telemt_connections_me_current 1"));
assert!(output.contains("telemt_handshake_timeouts_total 1"));
assert!(output.contains("telemt_upstream_connect_attempt_total 2"));
assert!(output.contains("telemt_upstream_connect_success_total 1"));
@ -1937,6 +2075,9 @@ mod tests {
let output = render_metrics(&stats, &config, &tracker).await;
assert!(output.contains("telemt_connections_total 0"));
assert!(output.contains("telemt_connections_bad_total 0"));
assert!(output.contains("telemt_connections_current 0"));
assert!(output.contains("telemt_connections_direct_current 0"));
assert!(output.contains("telemt_connections_me_current 0"));
assert!(output.contains("telemt_handshake_timeouts_total 0"));
assert!(output.contains("telemt_user_unique_ips_current{user="));
assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
@ -1970,11 +2111,21 @@ mod tests {
assert!(output.contains("# TYPE telemt_uptime_seconds gauge"));
assert!(output.contains("# TYPE telemt_connections_total counter"));
assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
assert!(output.contains("# TYPE telemt_connections_current gauge"));
assert!(output.contains("# TYPE telemt_connections_direct_current gauge"));
assert!(output.contains("# TYPE telemt_connections_me_current gauge"));
assert!(output.contains("# TYPE telemt_relay_adaptive_promotions_total counter"));
assert!(output.contains("# TYPE telemt_relay_adaptive_demotions_total counter"));
assert!(output.contains("# TYPE telemt_relay_adaptive_hard_promotions_total counter"));
assert!(output.contains("# TYPE telemt_reconnect_evict_total counter"));
assert!(output.contains("# TYPE telemt_reconnect_stale_close_total counter"));
assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter"));
assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
assert!(output.contains(
"# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
));

View File

@ -0,0 +1,383 @@
use dashmap::DashMap;
use std::cmp::max;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
const EMA_ALPHA: f64 = 0.2;
const PROFILE_TTL: Duration = Duration::from_secs(300);
const THROUGHPUT_UP_BPS: f64 = 8_000_000.0;
const THROUGHPUT_DOWN_BPS: f64 = 2_000_000.0;
const RATIO_CONFIRM_THRESHOLD: f64 = 1.12;
const TIER1_HOLD_TICKS: u32 = 8;
const TIER2_HOLD_TICKS: u32 = 4;
const QUIET_DEMOTE_TICKS: u32 = 480;
const HARD_COOLDOWN_TICKS: u32 = 20;
const HARD_PENDING_THRESHOLD: u32 = 3;
const HARD_PARTIAL_RATIO_THRESHOLD: f64 = 0.25;
const DIRECT_C2S_CAP_BYTES: usize = 128 * 1024;
const DIRECT_S2C_CAP_BYTES: usize = 512 * 1024;
const ME_FRAMES_CAP: usize = 96;
const ME_BYTES_CAP: usize = 384 * 1024;
const ME_DELAY_MIN_US: u64 = 150;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AdaptiveTier {
Base = 0,
Tier1 = 1,
Tier2 = 2,
Tier3 = 3,
}
impl AdaptiveTier {
pub fn promote(self) -> Self {
match self {
Self::Base => Self::Tier1,
Self::Tier1 => Self::Tier2,
Self::Tier2 => Self::Tier3,
Self::Tier3 => Self::Tier3,
}
}
pub fn demote(self) -> Self {
match self {
Self::Base => Self::Base,
Self::Tier1 => Self::Base,
Self::Tier2 => Self::Tier1,
Self::Tier3 => Self::Tier2,
}
}
fn ratio(self) -> (usize, usize) {
match self {
Self::Base => (1, 1),
Self::Tier1 => (5, 4),
Self::Tier2 => (3, 2),
Self::Tier3 => (2, 1),
}
}
pub fn as_u8(self) -> u8 {
self as u8
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TierTransitionReason {
SoftConfirmed,
HardPressure,
QuietDemotion,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TierTransition {
pub from: AdaptiveTier,
pub to: AdaptiveTier,
pub reason: TierTransitionReason,
}
#[derive(Debug, Clone, Copy, Default)]
pub struct RelaySignalSample {
pub c2s_bytes: u64,
pub s2c_requested_bytes: u64,
pub s2c_written_bytes: u64,
pub s2c_write_ops: u64,
pub s2c_partial_writes: u64,
pub s2c_consecutive_pending_writes: u32,
}
#[derive(Debug, Clone, Copy)]
pub struct SessionAdaptiveController {
tier: AdaptiveTier,
max_tier_seen: AdaptiveTier,
throughput_ema_bps: f64,
incoming_ema_bps: f64,
outgoing_ema_bps: f64,
tier1_hold_ticks: u32,
tier2_hold_ticks: u32,
quiet_ticks: u32,
hard_cooldown_ticks: u32,
}
impl SessionAdaptiveController {
pub fn new(initial_tier: AdaptiveTier) -> Self {
Self {
tier: initial_tier,
max_tier_seen: initial_tier,
throughput_ema_bps: 0.0,
incoming_ema_bps: 0.0,
outgoing_ema_bps: 0.0,
tier1_hold_ticks: 0,
tier2_hold_ticks: 0,
quiet_ticks: 0,
hard_cooldown_ticks: 0,
}
}
pub fn max_tier_seen(&self) -> AdaptiveTier {
self.max_tier_seen
}
pub fn observe(&mut self, sample: RelaySignalSample, tick_secs: f64) -> Option<TierTransition> {
if tick_secs <= f64::EPSILON {
return None;
}
if self.hard_cooldown_ticks > 0 {
self.hard_cooldown_ticks -= 1;
}
let c2s_bps = (sample.c2s_bytes as f64 * 8.0) / tick_secs;
let incoming_bps = (sample.s2c_requested_bytes as f64 * 8.0) / tick_secs;
let outgoing_bps = (sample.s2c_written_bytes as f64 * 8.0) / tick_secs;
let throughput = c2s_bps.max(outgoing_bps);
self.throughput_ema_bps = ema(self.throughput_ema_bps, throughput);
self.incoming_ema_bps = ema(self.incoming_ema_bps, incoming_bps);
self.outgoing_ema_bps = ema(self.outgoing_ema_bps, outgoing_bps);
let tier1_now = self.throughput_ema_bps >= THROUGHPUT_UP_BPS;
if tier1_now {
self.tier1_hold_ticks = self.tier1_hold_ticks.saturating_add(1);
} else {
self.tier1_hold_ticks = 0;
}
let ratio = if self.outgoing_ema_bps <= f64::EPSILON {
0.0
} else {
self.incoming_ema_bps / self.outgoing_ema_bps
};
let tier2_now = ratio >= RATIO_CONFIRM_THRESHOLD;
if tier2_now {
self.tier2_hold_ticks = self.tier2_hold_ticks.saturating_add(1);
} else {
self.tier2_hold_ticks = 0;
}
let partial_ratio = if sample.s2c_write_ops == 0 {
0.0
} else {
sample.s2c_partial_writes as f64 / sample.s2c_write_ops as f64
};
let hard_now = sample.s2c_consecutive_pending_writes >= HARD_PENDING_THRESHOLD
|| partial_ratio >= HARD_PARTIAL_RATIO_THRESHOLD;
if hard_now && self.hard_cooldown_ticks == 0 {
return self.promote(TierTransitionReason::HardPressure, HARD_COOLDOWN_TICKS);
}
if self.tier1_hold_ticks >= TIER1_HOLD_TICKS && self.tier2_hold_ticks >= TIER2_HOLD_TICKS {
return self.promote(TierTransitionReason::SoftConfirmed, 0);
}
let demote_candidate = self.throughput_ema_bps < THROUGHPUT_DOWN_BPS && !tier2_now && !hard_now;
if demote_candidate {
self.quiet_ticks = self.quiet_ticks.saturating_add(1);
if self.quiet_ticks >= QUIET_DEMOTE_TICKS {
self.quiet_ticks = 0;
return self.demote(TierTransitionReason::QuietDemotion);
}
} else {
self.quiet_ticks = 0;
}
None
}
fn promote(
&mut self,
reason: TierTransitionReason,
hard_cooldown_ticks: u32,
) -> Option<TierTransition> {
let from = self.tier;
let to = from.promote();
if from == to {
return None;
}
self.tier = to;
self.max_tier_seen = max(self.max_tier_seen, to);
self.hard_cooldown_ticks = hard_cooldown_ticks;
self.tier1_hold_ticks = 0;
self.tier2_hold_ticks = 0;
self.quiet_ticks = 0;
Some(TierTransition { from, to, reason })
}
fn demote(&mut self, reason: TierTransitionReason) -> Option<TierTransition> {
let from = self.tier;
let to = from.demote();
if from == to {
return None;
}
self.tier = to;
self.tier1_hold_ticks = 0;
self.tier2_hold_ticks = 0;
Some(TierTransition { from, to, reason })
}
}
#[derive(Debug, Clone, Copy)]
struct UserAdaptiveProfile {
tier: AdaptiveTier,
seen_at: Instant,
}
fn profiles() -> &'static DashMap<String, UserAdaptiveProfile> {
static USER_PROFILES: OnceLock<DashMap<String, UserAdaptiveProfile>> = OnceLock::new();
USER_PROFILES.get_or_init(DashMap::new)
}
pub fn seed_tier_for_user(user: &str) -> AdaptiveTier {
let now = Instant::now();
if let Some(entry) = profiles().get(user) {
let value = entry.value();
if now.duration_since(value.seen_at) <= PROFILE_TTL {
return value.tier;
}
}
AdaptiveTier::Base
}
pub fn record_user_tier(user: &str, tier: AdaptiveTier) {
let now = Instant::now();
if let Some(mut entry) = profiles().get_mut(user) {
let existing = *entry;
let effective = if now.duration_since(existing.seen_at) > PROFILE_TTL {
tier
} else {
max(existing.tier, tier)
};
*entry = UserAdaptiveProfile {
tier: effective,
seen_at: now,
};
return;
}
profiles().insert(
user.to_string(),
UserAdaptiveProfile { tier, seen_at: now },
);
}
pub fn direct_copy_buffers_for_tier(
tier: AdaptiveTier,
base_c2s: usize,
base_s2c: usize,
) -> (usize, usize) {
let (num, den) = tier.ratio();
(
scale(base_c2s, num, den, DIRECT_C2S_CAP_BYTES),
scale(base_s2c, num, den, DIRECT_S2C_CAP_BYTES),
)
}
pub fn me_flush_policy_for_tier(
tier: AdaptiveTier,
base_frames: usize,
base_bytes: usize,
base_delay: Duration,
) -> (usize, usize, Duration) {
let (num, den) = tier.ratio();
let frames = scale(base_frames, num, den, ME_FRAMES_CAP).max(1);
let bytes = scale(base_bytes, num, den, ME_BYTES_CAP).max(4096);
let delay_us = base_delay.as_micros() as u64;
let adjusted_delay_us = match tier {
AdaptiveTier::Base => delay_us,
AdaptiveTier::Tier1 => (delay_us.saturating_mul(7)).saturating_div(10),
AdaptiveTier::Tier2 => delay_us.saturating_div(2),
AdaptiveTier::Tier3 => (delay_us.saturating_mul(3)).saturating_div(10),
}
.max(ME_DELAY_MIN_US)
.min(delay_us.max(ME_DELAY_MIN_US));
(frames, bytes, Duration::from_micros(adjusted_delay_us))
}
fn ema(prev: f64, value: f64) -> f64 {
if prev <= f64::EPSILON {
value
} else {
(prev * (1.0 - EMA_ALPHA)) + (value * EMA_ALPHA)
}
}
fn scale(base: usize, numerator: usize, denominator: usize, cap: usize) -> usize {
let scaled = base
.saturating_mul(numerator)
.saturating_div(denominator.max(1));
scaled.min(cap).max(1)
}
#[cfg(test)]
mod tests {
use super::*;
fn sample(
c2s_bytes: u64,
s2c_requested_bytes: u64,
s2c_written_bytes: u64,
s2c_write_ops: u64,
s2c_partial_writes: u64,
s2c_consecutive_pending_writes: u32,
) -> RelaySignalSample {
RelaySignalSample {
c2s_bytes,
s2c_requested_bytes,
s2c_written_bytes,
s2c_write_ops,
s2c_partial_writes,
s2c_consecutive_pending_writes,
}
}
#[test]
fn test_soft_promotion_requires_tier1_and_tier2() {
let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Base);
let tick_secs = 0.25;
let mut promoted = None;
for _ in 0..8 {
promoted = ctrl.observe(
sample(
300_000, // ~9.6 Mbps
320_000, // incoming > outgoing to confirm tier2
250_000,
10,
0,
0,
),
tick_secs,
);
}
let transition = promoted.expect("expected soft promotion");
assert_eq!(transition.from, AdaptiveTier::Base);
assert_eq!(transition.to, AdaptiveTier::Tier1);
assert_eq!(transition.reason, TierTransitionReason::SoftConfirmed);
}
#[test]
fn test_hard_promotion_on_pending_pressure() {
let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Base);
let transition = ctrl
.observe(
sample(10_000, 20_000, 10_000, 4, 1, 3),
0.25,
)
.expect("expected hard promotion");
assert_eq!(transition.reason, TierTransitionReason::HardPressure);
assert_eq!(transition.to, AdaptiveTier::Tier1);
}
#[test]
fn test_quiet_demotion_is_slow_and_stepwise() {
let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Tier2);
let mut demotion = None;
for _ in 0..QUIET_DEMOTE_TICKS {
demotion = ctrl.observe(sample(1, 1, 1, 1, 0, 0), 0.25);
}
let transition = demotion.expect("expected quiet demotion");
assert_eq!(transition.from, AdaptiveTier::Tier2);
assert_eq!(transition.to, AdaptiveTier::Tier1);
assert_eq!(transition.reason, TierTransitionReason::QuietDemotion);
}
}

View File

@ -40,6 +40,7 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle
use crate::proxy::masking::handle_bad_client;
use crate::proxy::middle_relay::handle_via_middle_proxy;
use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController};
use crate::proxy::session_eviction::register_session;
fn beobachten_ttl(config: &ProxyConfig) -> Duration {
Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60))
@ -731,6 +732,17 @@ impl RunningClientHandler {
return Err(e);
}
let registration = register_session(&user, success.dc_idx);
if registration.replaced_existing {
stats.increment_reconnect_evict_total();
warn!(
user = %user,
dc = success.dc_idx,
"Reconnect detected: replacing active session for user+dc"
);
}
let session_lease = registration.lease;
let route_snapshot = route_runtime.snapshot();
let session_id = rng.u64();
let relay_result = if config.general.use_middle_proxy
@ -750,6 +762,7 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
session_lease.clone(),
)
.await
} else {
@ -766,6 +779,7 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
session_lease.clone(),
)
.await
}
@ -783,6 +797,7 @@ impl RunningClientHandler {
route_runtime.subscribe(),
route_snapshot,
session_id,
session_lease.clone(),
)
.await
};

View File

@ -18,6 +18,8 @@ use crate::proxy::route_mode::{
RelayRouteMode, RouteCutoverState, ROUTE_SWITCH_ERROR_MSG, affected_cutover_state,
cutover_stagger_delay,
};
use crate::proxy::adaptive_buffers;
use crate::proxy::session_eviction::SessionLease;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::UpstreamManager;
@ -34,6 +36,7 @@ pub(crate) async fn handle_via_direct<R, W>(
mut route_rx: watch::Receiver<RouteCutoverState>,
route_snapshot: RouteCutoverState,
session_id: u64,
session_lease: SessionLease,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@ -67,16 +70,26 @@ where
stats.increment_user_curr_connects(user);
stats.increment_current_connections_direct();
let seed_tier = adaptive_buffers::seed_tier_for_user(user);
let (c2s_copy_buf, s2c_copy_buf) = adaptive_buffers::direct_copy_buffers_for_tier(
seed_tier,
config.general.direct_relay_copy_buf_c2s_bytes,
config.general.direct_relay_copy_buf_s2c_bytes,
);
let relay_result = relay_bidirectional(
client_reader,
client_writer,
tg_reader,
tg_writer,
config.general.direct_relay_copy_buf_c2s_bytes,
config.general.direct_relay_copy_buf_s2c_bytes,
c2s_copy_buf,
s2c_copy_buf,
user,
success.dc_idx,
Arc::clone(&stats),
buffer_pool,
session_lease,
seed_tier,
);
tokio::pin!(relay_result);
let relay_result = loop {

View File

@ -20,6 +20,8 @@ use crate::proxy::route_mode::{
RelayRouteMode, RouteCutoverState, ROUTE_SWITCH_ERROR_MSG, affected_cutover_state,
cutover_stagger_delay,
};
use crate::proxy::adaptive_buffers::{self, AdaptiveTier};
use crate::proxy::session_eviction::SessionLease;
use crate::stats::Stats;
use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag};
@ -59,8 +61,8 @@ struct MeD2cFlushPolicy {
}
impl MeD2cFlushPolicy {
fn from_config(config: &ProxyConfig) -> Self {
Self {
fn from_config(config: &ProxyConfig, tier: AdaptiveTier) -> Self {
let base = Self {
max_frames: config
.general
.me_d2c_flush_batch_max_frames
@ -71,6 +73,18 @@ impl MeD2cFlushPolicy {
.max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN),
max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us),
ack_flush_immediate: config.general.me_d2c_ack_flush_immediate,
};
let (max_frames, max_bytes, max_delay) = adaptive_buffers::me_flush_policy_for_tier(
tier,
base.max_frames,
base.max_bytes,
base.max_delay,
);
Self {
max_frames,
max_bytes,
max_delay,
ack_flush_immediate: base.ack_flush_immediate,
}
}
}
@ -235,6 +249,7 @@ pub(crate) async fn handle_via_middle_proxy<R, W>(
mut route_rx: watch::Receiver<RouteCutoverState>,
route_snapshot: RouteCutoverState,
session_id: u64,
session_lease: SessionLease,
) -> Result<()>
where
R: AsyncRead + Unpin + Send + 'static,
@ -244,6 +259,7 @@ where
let peer = success.peer;
let proto_tag = success.proto_tag;
let pool_generation = me_pool.current_generation();
let seed_tier = adaptive_buffers::seed_tier_for_user(&user);
debug!(
user = %user,
@ -295,6 +311,15 @@ where
return Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string()));
}
if session_lease.is_stale() {
stats.increment_reconnect_stale_close_total();
let _ = me_pool.send_close(conn_id).await;
me_pool.registry().unregister(conn_id).await;
stats.decrement_current_connections_me();
stats.decrement_user_curr_connects(&user);
return Err(ProxyError::Proxy("Session evicted by reconnect".to_string()));
}
// Per-user ad_tag from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable)
let user_tag: Option<Vec<u8>> = config
.access
@ -368,7 +393,7 @@ where
let rng_clone = rng.clone();
let user_clone = user.clone();
let bytes_me2c_clone = bytes_me2c.clone();
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config);
let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config, seed_tier);
let me_writer = tokio::spawn(async move {
let mut writer = crypto_writer;
let mut frame_buf = Vec::with_capacity(16 * 1024);
@ -528,6 +553,12 @@ where
let mut frame_counter: u64 = 0;
let mut route_watch_open = true;
loop {
if session_lease.is_stale() {
stats.increment_reconnect_stale_close_total();
let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await;
main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string()));
break;
}
if let Some(cutover) = affected_cutover_state(
&route_rx,
RelayRouteMode::Middle,
@ -636,6 +667,7 @@ where
frames_ok = frame_counter,
"ME relay cleanup"
);
adaptive_buffers::record_user_tier(&user, seed_tier);
me_pool.registry().unregister(conn_id).await;
stats.decrement_current_connections_me();
stats.decrement_user_curr_connects(&user);

View File

@ -1,5 +1,6 @@
//! Proxy Defs
pub mod adaptive_buffers;
pub mod client;
pub mod direct_relay;
pub mod handshake;
@ -7,6 +8,7 @@ pub mod masking;
pub mod middle_relay;
pub mod route_mode;
pub mod relay;
pub mod session_eviction;
pub use client::ClientHandler;
#[allow(unused_imports)]

View File

@ -63,6 +63,10 @@ use tokio::io::{
use tokio::time::Instant;
use tracing::{debug, trace, warn};
use crate::error::Result;
use crate::proxy::adaptive_buffers::{
self, AdaptiveTier, RelaySignalSample, SessionAdaptiveController, TierTransitionReason,
};
use crate::proxy::session_eviction::SessionLease;
use crate::stats::Stats;
use crate::stream::BufferPool;
@ -79,6 +83,7 @@ const ACTIVITY_TIMEOUT: Duration = Duration::from_secs(1800);
/// 10 seconds gives responsive timeout detection (±10s accuracy)
/// without measurable overhead from atomic reads.
const WATCHDOG_INTERVAL: Duration = Duration::from_secs(10);
const ADAPTIVE_TICK: Duration = Duration::from_millis(250);
// ============= CombinedStream =============
@ -155,6 +160,16 @@ struct SharedCounters {
s2c_ops: AtomicU64,
/// Milliseconds since relay epoch of last I/O activity
last_activity_ms: AtomicU64,
/// Bytes requested to write to client (S→C direction).
s2c_requested_bytes: AtomicU64,
/// Total write operations for S→C direction.
s2c_write_ops: AtomicU64,
/// Number of partial writes to client.
s2c_partial_writes: AtomicU64,
/// Number of times S→C poll_write returned Pending.
s2c_pending_writes: AtomicU64,
/// Consecutive pending writes in S→C direction.
s2c_consecutive_pending_writes: AtomicU64,
}
impl SharedCounters {
@ -165,6 +180,11 @@ impl SharedCounters {
c2s_ops: AtomicU64::new(0),
s2c_ops: AtomicU64::new(0),
last_activity_ms: AtomicU64::new(0),
s2c_requested_bytes: AtomicU64::new(0),
s2c_write_ops: AtomicU64::new(0),
s2c_partial_writes: AtomicU64::new(0),
s2c_pending_writes: AtomicU64::new(0),
s2c_consecutive_pending_writes: AtomicU64::new(0),
}
}
@ -259,9 +279,21 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
this.counters
.s2c_requested_bytes
.fetch_add(buf.len() as u64, Ordering::Relaxed);
match Pin::new(&mut this.inner).poll_write(cx, buf) {
Poll::Ready(Ok(n)) => {
this.counters.s2c_write_ops.fetch_add(1, Ordering::Relaxed);
this.counters
.s2c_consecutive_pending_writes
.store(0, Ordering::Relaxed);
if n < buf.len() {
this.counters
.s2c_partial_writes
.fetch_add(1, Ordering::Relaxed);
}
if n > 0 {
// S→C: data written to client
this.counters.s2c_bytes.fetch_add(n as u64, Ordering::Relaxed);
@ -275,6 +307,15 @@ impl<S: AsyncWrite + Unpin> AsyncWrite for StatsIo<S> {
}
Poll::Ready(Ok(n))
}
Poll::Pending => {
this.counters
.s2c_pending_writes
.fetch_add(1, Ordering::Relaxed);
this.counters
.s2c_consecutive_pending_writes
.fetch_add(1, Ordering::Relaxed);
Poll::Pending
}
other => other,
}
}
@ -316,8 +357,11 @@ pub async fn relay_bidirectional<CR, CW, SR, SW>(
c2s_buf_size: usize,
s2c_buf_size: usize,
user: &str,
dc_idx: i16,
stats: Arc<Stats>,
_buffer_pool: Arc<BufferPool>,
session_lease: SessionLease,
seed_tier: AdaptiveTier,
) -> Result<()>
where
CR: AsyncRead + Unpin + Send + 'static,
@ -345,13 +389,33 @@ where
// ── Watchdog: activity timeout + periodic rate logging ──────────
let wd_counters = Arc::clone(&counters);
let wd_user = user_owned.clone();
let wd_dc = dc_idx;
let wd_stats = Arc::clone(&stats);
let wd_session = session_lease.clone();
let watchdog = async {
let mut prev_c2s: u64 = 0;
let mut prev_s2c: u64 = 0;
let mut prev_c2s_log: u64 = 0;
let mut prev_s2c_log: u64 = 0;
let mut prev_c2s_sample: u64 = 0;
let mut prev_s2c_requested_sample: u64 = 0;
let mut prev_s2c_written_sample: u64 = 0;
let mut prev_s2c_write_ops_sample: u64 = 0;
let mut prev_s2c_partial_sample: u64 = 0;
let mut accumulated_log = Duration::ZERO;
let mut adaptive = SessionAdaptiveController::new(seed_tier);
loop {
tokio::time::sleep(WATCHDOG_INTERVAL).await;
tokio::time::sleep(ADAPTIVE_TICK).await;
if wd_session.is_stale() {
wd_stats.increment_reconnect_stale_close_total();
warn!(
user = %wd_user,
dc = wd_dc,
"Session evicted by reconnect"
);
return;
}
let now = Instant::now();
let idle = wd_counters.idle_duration(now, epoch);
@ -370,11 +434,80 @@ where
return; // Causes select! to cancel copy_bidirectional
}
let c2s_total = wd_counters.c2s_bytes.load(Ordering::Relaxed);
let s2c_requested_total = wd_counters
.s2c_requested_bytes
.load(Ordering::Relaxed);
let s2c_written_total = wd_counters.s2c_bytes.load(Ordering::Relaxed);
let s2c_write_ops_total = wd_counters
.s2c_write_ops
.load(Ordering::Relaxed);
let s2c_partial_total = wd_counters
.s2c_partial_writes
.load(Ordering::Relaxed);
let consecutive_pending = wd_counters
.s2c_consecutive_pending_writes
.load(Ordering::Relaxed) as u32;
let sample = RelaySignalSample {
c2s_bytes: c2s_total.saturating_sub(prev_c2s_sample),
s2c_requested_bytes: s2c_requested_total
.saturating_sub(prev_s2c_requested_sample),
s2c_written_bytes: s2c_written_total
.saturating_sub(prev_s2c_written_sample),
s2c_write_ops: s2c_write_ops_total
.saturating_sub(prev_s2c_write_ops_sample),
s2c_partial_writes: s2c_partial_total
.saturating_sub(prev_s2c_partial_sample),
s2c_consecutive_pending_writes: consecutive_pending,
};
if let Some(transition) = adaptive.observe(sample, ADAPTIVE_TICK.as_secs_f64()) {
match transition.reason {
TierTransitionReason::SoftConfirmed => {
wd_stats.increment_relay_adaptive_promotions_total();
}
TierTransitionReason::HardPressure => {
wd_stats.increment_relay_adaptive_promotions_total();
wd_stats.increment_relay_adaptive_hard_promotions_total();
}
TierTransitionReason::QuietDemotion => {
wd_stats.increment_relay_adaptive_demotions_total();
}
}
adaptive_buffers::record_user_tier(&wd_user, adaptive.max_tier_seen());
debug!(
user = %wd_user,
dc = wd_dc,
from_tier = transition.from.as_u8(),
to_tier = transition.to.as_u8(),
reason = ?transition.reason,
throughput_ema_bps = sample
.c2s_bytes
.max(sample.s2c_written_bytes)
.saturating_mul(8)
.saturating_mul(4),
"Adaptive relay tier transition"
);
}
prev_c2s_sample = c2s_total;
prev_s2c_requested_sample = s2c_requested_total;
prev_s2c_written_sample = s2c_written_total;
prev_s2c_write_ops_sample = s2c_write_ops_total;
prev_s2c_partial_sample = s2c_partial_total;
accumulated_log = accumulated_log.saturating_add(ADAPTIVE_TICK);
if accumulated_log < WATCHDOG_INTERVAL {
continue;
}
accumulated_log = Duration::ZERO;
// ── Periodic rate logging ───────────────────────────────
let c2s = wd_counters.c2s_bytes.load(Ordering::Relaxed);
let s2c = wd_counters.s2c_bytes.load(Ordering::Relaxed);
let c2s_delta = c2s - prev_c2s;
let s2c_delta = s2c - prev_s2c;
let c2s_delta = c2s.saturating_sub(prev_c2s_log);
let s2c_delta = s2c.saturating_sub(prev_s2c_log);
if c2s_delta > 0 || s2c_delta > 0 {
let secs = WATCHDOG_INTERVAL.as_secs_f64();
@ -388,8 +521,8 @@ where
);
}
prev_c2s = c2s;
prev_s2c = s2c;
prev_c2s_log = c2s;
prev_s2c_log = s2c;
}
};
@ -424,6 +557,7 @@ where
let c2s_ops = counters.c2s_ops.load(Ordering::Relaxed);
let s2c_ops = counters.s2c_ops.load(Ordering::Relaxed);
let duration = epoch.elapsed();
adaptive_buffers::record_user_tier(&user_owned, seed_tier);
match copy_result {
Some(Ok((c2s, s2c))) => {

View File

@ -0,0 +1,46 @@
/// Session eviction is intentionally disabled in runtime.
///
/// The initial `user+dc` single-lease model caused valid parallel client
/// connections to evict each other. Keep the API shape for compatibility,
/// but make it a no-op until a safer policy is introduced.
#[derive(Debug, Clone, Default)]
pub struct SessionLease;
impl SessionLease {
pub fn is_stale(&self) -> bool {
false
}
#[allow(dead_code)]
pub fn release(&self) {}
}
pub struct RegistrationResult {
pub lease: SessionLease,
pub replaced_existing: bool,
}
pub fn register_session(_user: &str, _dc_idx: i16) -> RegistrationResult {
RegistrationResult {
lease: SessionLease,
replaced_existing: false,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_session_eviction_disabled_behavior() {
let first = register_session("alice", 2);
let second = register_session("alice", 2);
assert!(!first.replaced_existing);
assert!(!second.replaced_existing);
assert!(!first.lease.is_stale());
assert!(!second.lease.is_stale());
first.lease.release();
second.lease.release();
}
}

View File

@ -120,6 +120,8 @@ pub struct Stats {
pool_swap_total: AtomicU64,
pool_drain_active: AtomicU64,
pool_force_close_total: AtomicU64,
pool_drain_soft_evict_total: AtomicU64,
pool_drain_soft_evict_writer_total: AtomicU64,
pool_stale_pick_total: AtomicU64,
me_writer_removed_total: AtomicU64,
me_writer_removed_unexpected_total: AtomicU64,
@ -133,6 +135,11 @@ pub struct Stats {
me_inline_recovery_total: AtomicU64,
ip_reservation_rollback_tcp_limit_total: AtomicU64,
ip_reservation_rollback_quota_limit_total: AtomicU64,
relay_adaptive_promotions_total: AtomicU64,
relay_adaptive_demotions_total: AtomicU64,
relay_adaptive_hard_promotions_total: AtomicU64,
reconnect_evict_total: AtomicU64,
reconnect_stale_close_total: AtomicU64,
telemetry_core_enabled: AtomicBool,
telemetry_user_enabled: AtomicBool,
telemetry_me_level: AtomicU8,
@ -285,6 +292,36 @@ impl Stats {
pub fn decrement_current_connections_me(&self) {
Self::decrement_atomic_saturating(&self.current_connections_me);
}
pub fn increment_relay_adaptive_promotions_total(&self) {
if self.telemetry_core_enabled() {
self.relay_adaptive_promotions_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_relay_adaptive_demotions_total(&self) {
if self.telemetry_core_enabled() {
self.relay_adaptive_demotions_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_relay_adaptive_hard_promotions_total(&self) {
if self.telemetry_core_enabled() {
self.relay_adaptive_hard_promotions_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_reconnect_evict_total(&self) {
if self.telemetry_core_enabled() {
self.reconnect_evict_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_reconnect_stale_close_total(&self) {
if self.telemetry_core_enabled() {
self.reconnect_stale_close_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_handshake_timeouts(&self) {
if self.telemetry_core_enabled() {
self.handshake_timeouts.fetch_add(1, Ordering::Relaxed);
@ -680,6 +717,18 @@ impl Stats {
self.pool_force_close_total.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_pool_drain_soft_evict_total(&self) {
if self.telemetry_me_allows_normal() {
self.pool_drain_soft_evict_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_pool_drain_soft_evict_writer_total(&self) {
if self.telemetry_me_allows_normal() {
self.pool_drain_soft_evict_writer_total
.fetch_add(1, Ordering::Relaxed);
}
}
pub fn increment_pool_stale_pick_total(&self) {
if self.telemetry_me_allows_normal() {
self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed);
@ -933,6 +982,22 @@ impl Stats {
self.get_current_connections_direct()
.saturating_add(self.get_current_connections_me())
}
pub fn get_relay_adaptive_promotions_total(&self) -> u64 {
self.relay_adaptive_promotions_total.load(Ordering::Relaxed)
}
pub fn get_relay_adaptive_demotions_total(&self) -> u64 {
self.relay_adaptive_demotions_total.load(Ordering::Relaxed)
}
pub fn get_relay_adaptive_hard_promotions_total(&self) -> u64 {
self.relay_adaptive_hard_promotions_total
.load(Ordering::Relaxed)
}
pub fn get_reconnect_evict_total(&self) -> u64 {
self.reconnect_evict_total.load(Ordering::Relaxed)
}
pub fn get_reconnect_stale_close_total(&self) -> u64 {
self.reconnect_stale_close_total.load(Ordering::Relaxed)
}
pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) }
pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) }
pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) }
@ -1185,6 +1250,12 @@ impl Stats {
pub fn get_pool_force_close_total(&self) -> u64 {
self.pool_force_close_total.load(Ordering::Relaxed)
}
pub fn get_pool_drain_soft_evict_total(&self) -> u64 {
self.pool_drain_soft_evict_total.load(Ordering::Relaxed)
}
pub fn get_pool_drain_soft_evict_writer_total(&self) -> u64 {
self.pool_drain_soft_evict_writer_total.load(Ordering::Relaxed)
}
pub fn get_pool_stale_pick_total(&self) -> u64 {
self.pool_stale_pick_total.load(Ordering::Relaxed)
}
@ -1258,6 +1329,9 @@ impl Stats {
}
pub fn decrement_user_curr_connects(&self, user: &str) {
if !self.telemetry_user_enabled() {
return;
}
self.maybe_cleanup_user_stats();
if let Some(stats) = self.user_stats.get(user) {
Self::touch_user_stats(stats.value());

View File

@ -14,8 +14,7 @@ use std::sync::Arc;
// ============= Configuration =============
/// Default buffer size
/// CHANGED: Reduced from 64KB to 16KB to match TLS record size and prevent bufferbloat.
pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024;
pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
/// Default maximum number of pooled buffers
pub const DEFAULT_MAX_BUFFERS: usize = 1024;

View File

@ -299,6 +299,11 @@ async fn run_update_cycle(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs,
cfg.general.me_pool_drain_soft_evict_per_writer,
cfg.general.me_pool_drain_soft_evict_budget_per_core,
cfg.general.me_pool_drain_soft_evict_cooldown_ms,
cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio,
cfg.general.me_hardswap_warmup_delay_min_ms,
@ -526,6 +531,11 @@ pub async fn me_config_updater(
cfg.general.hardswap,
cfg.general.me_pool_drain_ttl_secs,
cfg.general.me_pool_drain_threshold,
cfg.general.me_pool_drain_soft_evict_enabled,
cfg.general.me_pool_drain_soft_evict_grace_secs,
cfg.general.me_pool_drain_soft_evict_per_writer,
cfg.general.me_pool_drain_soft_evict_budget_per_core,
cfg.general.me_pool_drain_soft_evict_cooldown_ms,
cfg.general.effective_me_pool_force_close_secs(),
cfg.general.me_pool_min_fresh_ratio,
cfg.general.me_hardswap_warmup_delay_min_ms,

View File

@ -28,6 +28,8 @@ const HEALTH_RECONNECT_BUDGET_MAX: usize = 128;
const HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16;
const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8;
const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256;
#[derive(Debug, Clone)]
struct DcFloorPlanEntry {
@ -66,6 +68,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new();
let mut drain_warn_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut drain_soft_evict_next_allowed: HashMap<u64, Instant> = HashMap::new();
let mut degraded_interval = true;
loop {
let interval = if degraded_interval {
@ -75,7 +78,12 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
};
tokio::time::sleep(interval).await;
pool.prune_closed_writers().await;
reap_draining_writers(&pool, &mut drain_warn_next_allowed).await;
reap_draining_writers(
&pool,
&mut drain_warn_next_allowed,
&mut drain_soft_evict_next_allowed,
)
.await;
let v4_degraded = check_family(
IpFamily::V4,
&pool,
@ -117,6 +125,7 @@ pub async fn me_health_monitor(pool: Arc<MePool>, rng: Arc<SecureRandom>, _min_c
pub(super) async fn reap_draining_writers(
pool: &Arc<MePool>,
warn_next_allowed: &mut HashMap<u64, Instant>,
soft_evict_next_allowed: &mut HashMap<u64, Instant>,
) {
let now_epoch_secs = MePool::now_epoch_secs();
let now = Instant::now();
@ -172,7 +181,7 @@ pub(super) async fn reap_draining_writers(
}
let mut active_draining_writer_ids = HashSet::with_capacity(draining_writers.len());
for writer in draining_writers {
for writer in &draining_writers {
active_draining_writer_ids.insert(writer.id);
let drain_started_at_epoch_secs = writer
.draining_started_at_epoch_secs
@ -209,6 +218,86 @@ pub(super) async fn reap_draining_writers(
}
warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id));
soft_evict_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id));
if pool.drain_soft_evict_enabled() && drain_ttl_secs > 0 && !draining_writers.is_empty() {
let mut force_close_ids = HashSet::<u64>::with_capacity(force_close_writer_ids.len());
for writer_id in &force_close_writer_ids {
force_close_ids.insert(*writer_id);
}
let soft_grace_secs = pool.drain_soft_evict_grace_secs();
let soft_trigger_age_secs = drain_ttl_secs.saturating_add(soft_grace_secs);
let per_writer_limit = pool.drain_soft_evict_per_writer();
let soft_budget = health_drain_soft_evict_budget(pool);
let soft_cooldown = pool.drain_soft_evict_cooldown();
let mut soft_evicted_total = 0usize;
for writer in &draining_writers {
if soft_evicted_total >= soft_budget {
break;
}
if force_close_ids.contains(&writer.id) {
continue;
}
if pool.writer_accepts_new_binding(writer) {
continue;
}
let started_epoch_secs = writer
.draining_started_at_epoch_secs
.load(std::sync::atomic::Ordering::Relaxed);
if started_epoch_secs == 0
|| now_epoch_secs.saturating_sub(started_epoch_secs) < soft_trigger_age_secs
{
continue;
}
if !should_emit_writer_warn(
soft_evict_next_allowed,
writer.id,
now,
soft_cooldown,
) {
continue;
}
let remaining_budget = soft_budget.saturating_sub(soft_evicted_total);
let limit = per_writer_limit.min(remaining_budget);
if limit == 0 {
break;
}
let conn_ids = pool
.registry
.bound_conn_ids_for_writer_limited(writer.id, limit)
.await;
if conn_ids.is_empty() {
continue;
}
let mut evicted_for_writer = 0usize;
for conn_id in conn_ids {
if pool.registry.evict_bound_conn_if_writer(conn_id, writer.id).await {
evicted_for_writer = evicted_for_writer.saturating_add(1);
soft_evicted_total = soft_evicted_total.saturating_add(1);
pool.stats.increment_pool_drain_soft_evict_total();
if soft_evicted_total >= soft_budget {
break;
}
}
}
if evicted_for_writer > 0 {
pool.stats.increment_pool_drain_soft_evict_writer_total();
info!(
writer_id = writer.id,
writer_dc = writer.writer_dc,
endpoint = %writer.addr,
drained_connections = evicted_for_writer,
soft_budget,
soft_trigger_age_secs,
"ME draining writer soft-evicted bound clients"
);
}
}
}
let close_budget = health_drain_close_budget();
let requested_force_close = force_close_writer_ids.len();
@ -258,6 +347,19 @@ pub(super) fn health_drain_close_budget() -> usize {
.clamp(HEALTH_DRAIN_CLOSE_BUDGET_MIN, HEALTH_DRAIN_CLOSE_BUDGET_MAX)
}
pub(super) fn health_drain_soft_evict_budget(pool: &MePool) -> usize {
let cpu_cores = std::thread::available_parallelism()
.map(std::num::NonZeroUsize::get)
.unwrap_or(1);
let per_core = pool.drain_soft_evict_budget_per_core();
cpu_cores
.saturating_mul(per_core)
.clamp(
HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN,
HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX,
)
}
fn should_emit_writer_warn(
next_allowed: &mut HashMap<u64, Instant>,
writer_id: u64,
@ -1443,6 +1545,11 @@ mod tests {
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
general.me_pool_drain_soft_evict_per_writer,
general.me_pool_drain_soft_evict_budget_per_core,
general.me_pool_drain_soft_evict_cooldown_ms,
general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio,
general.me_hardswap_warmup_delay_min_ms,
@ -1524,8 +1631,9 @@ mod tests {
let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await;
let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
let writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
assert_eq!(writer_ids, vec![20, 30]);
@ -1542,8 +1650,9 @@ mod tests {
let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await;
let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
let writer_ids: Vec<u64> = pool.writers.read().await.iter().map(|writer| writer.id).collect();
assert_eq!(writer_ids, vec![10, 20, 30]);

View File

@ -82,6 +82,11 @@ async fn make_pool(
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
general.me_pool_drain_soft_evict_per_writer,
general.me_pool_drain_soft_evict_budget_per_core,
general.me_pool_drain_soft_evict_cooldown_ms,
general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio,
general.me_hardswap_warmup_delay_min_ms,
@ -185,10 +190,11 @@ async fn sorted_writer_ids(pool: &Arc<MePool>) -> Vec<u64> {
async fn reap_draining_writers_clears_warn_state_when_pool_empty() {
let (pool, _rng) = make_pool(128, 1, 1).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
warn_next_allowed.insert(11, Instant::now() + Duration::from_secs(5));
warn_next_allowed.insert(22, Instant::now() + Duration::from_secs(5));
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.is_empty());
}
@ -197,6 +203,8 @@ async fn reap_draining_writers_clears_warn_state_when_pool_empty() {
async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycles() {
let threshold = 3u64;
let (pool, _rng) = make_pool(threshold, 1, 1).await;
pool.me_pool_drain_soft_evict_enabled
.store(false, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
for writer_id in 1..=60u64 {
@ -211,8 +219,9 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..64 {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
if writer_count(&pool).await <= threshold as usize {
break;
}
@ -240,11 +249,12 @@ async fn reap_draining_writers_handles_large_empty_writer_population() {
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..24 {
if writer_count(&pool).await == 0 {
break;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
}
assert_eq!(writer_count(&pool).await, 0);
@ -268,11 +278,12 @@ async fn reap_draining_writers_processes_mass_deadline_expiry_without_unbounded_
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..40 {
if writer_count(&pool).await == 0 {
break;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
}
assert_eq!(writer_count(&pool).await, 0);
@ -283,6 +294,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
let (pool, _rng) = make_pool(128, 1, 1).await;
let now_epoch_secs = MePool::now_epoch_secs();
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for wave in 0..40u64 {
for offset in 0..8u64 {
@ -296,7 +308,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
.await;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= writer_count(&pool).await);
let ids = sorted_writer_ids(&pool).await;
@ -304,7 +316,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
let _ = pool.remove_writer_and_close_clients(writer_id).await;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= writer_count(&pool).await);
}
}
@ -326,9 +338,10 @@ async fn reap_draining_writers_budgeted_cleanup_never_increases_pool_size() {
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
let mut previous = writer_count(&pool).await;
for _ in 0..32 {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
let current = writer_count(&pool).await;
assert!(current <= previous);
previous = current;

View File

@ -81,6 +81,11 @@ async fn make_pool(
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
general.me_pool_drain_soft_evict_per_writer,
general.me_pool_drain_soft_evict_budget_per_core,
general.me_pool_drain_soft_evict_cooldown_ms,
general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio,
general.me_hardswap_warmup_delay_min_ms,

View File

@ -39,7 +39,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
NetworkDecision::default(),
None,
Arc::new(SecureRandom::new()),
Arc::new(Stats::default()),
Arc::new(Stats::new()),
general.me_keepalive_enabled,
general.me_keepalive_interval_secs,
general.me_keepalive_jitter_secs,
@ -74,6 +74,11 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
general.hardswap,
general.me_pool_drain_ttl_secs,
general.me_pool_drain_threshold,
general.me_pool_drain_soft_evict_enabled,
general.me_pool_drain_soft_evict_grace_secs,
general.me_pool_drain_soft_evict_per_writer,
general.me_pool_drain_soft_evict_budget_per_core,
general.me_pool_drain_soft_evict_cooldown_ms,
general.effective_me_pool_force_close_secs(),
general.me_pool_min_fresh_ratio,
general.me_hardswap_warmup_delay_min_ms,
@ -175,14 +180,15 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
let conn_ids =
insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.contains_key(&7));
let _ = pool.remove_writer_and_close_clients(7).await;
assert!(pool.registry.get_writer(conn_ids[0]).await.is_none());
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(!warn_next_allowed.contains_key(&7));
}
@ -194,8 +200,9 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(30), 0, 0).await;
insert_draining_writer(&pool, 3, now_epoch_secs.saturating_sub(20), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![3]);
}
@ -209,8 +216,9 @@ async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
insert_draining_writer(&pool, 33, now_epoch_secs.saturating_sub(20), 1, 0).await;
insert_draining_writer(&pool, 44, now_epoch_secs.saturating_sub(10), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![33, 44]);
}
@ -228,8 +236,9 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() {
)
.await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
}
@ -251,8 +260,9 @@ async fn reap_draining_writers_limits_closes_per_health_tick() {
.await;
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(pool.writers.read().await.len(), writer_total - close_budget);
}
@ -274,12 +284,13 @@ async fn reap_draining_writers_backlog_drains_across_ticks() {
.await;
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..8 {
if pool.writers.read().await.is_empty() {
break;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
}
assert!(pool.writers.read().await.is_empty());
@ -303,9 +314,10 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() {
.await;
}
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for _ in 0..16 {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
if pool.writers.read().await.len() <= threshold as usize {
break;
}
@ -322,8 +334,9 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr
insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(30), 1, 0).await;
insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(20), 1, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![10, 20, 30]);
}
@ -346,8 +359,9 @@ async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() {
let empty_writer_id = close_budget as u64 + 1;
insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]);
}
@ -359,8 +373,9 @@ async fn reap_draining_writers_empty_cleanup_does_not_increment_force_close_metr
insert_draining_writer(&pool, 1, now_epoch_secs.saturating_sub(60), 0, 0).await;
insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(50), 0, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
assert_eq!(pool.stats.get_pool_force_close_total(), 0);
@ -387,8 +402,9 @@ async fn reap_draining_writers_handles_duplicate_force_close_requests_for_same_w
)
.await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(current_writer_ids(&pool).await.is_empty());
}
@ -398,6 +414,7 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
let pool = make_pool(128).await;
let now_epoch_secs = MePool::now_epoch_secs();
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for wave in 0..12u64 {
for offset in 0..9u64 {
@ -410,14 +427,14 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
)
.await;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
let existing_writer_ids = current_writer_ids(&pool).await;
for writer_id in existing_writer_ids.into_iter().take(4) {
let _ = pool.remove_writer_and_close_clients(writer_id).await;
}
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
}
}
@ -427,6 +444,7 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
let pool = make_pool(6).await;
let now_epoch_secs = MePool::now_epoch_secs();
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
for writer_id in 1..=18u64 {
let bound_clients = if writer_id % 3 == 0 { 0 } else { 1 };
@ -446,7 +464,7 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
}
for _ in 0..16 {
reap_draining_writers(&pool, &mut warn_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
if pool.writers.read().await.len() <= 6 {
break;
}
@ -456,7 +474,60 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
}
#[tokio::test]
async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() {
let pool = make_pool(128).await;
pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_cooldown_ms
.store(1, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
let activity = pool.registry.writer_activity_snapshot().await;
assert_eq!(activity.bound_clients_by_writer.get(&77), Some(&2));
assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1);
assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
assert_eq!(current_writer_ids(&pool).await, vec![77]);
}
#[tokio::test]
async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
let pool = make_pool(128).await;
pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed);
pool.me_pool_drain_soft_evict_cooldown_ms
.store(60_000, Ordering::Relaxed);
let now_epoch_secs = MePool::now_epoch_secs();
insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await;
let mut warn_next_allowed = HashMap::new();
let mut soft_evict_next_allowed = HashMap::new();
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
let activity = pool.registry.writer_activity_snapshot().await;
assert_eq!(activity.bound_clients_by_writer.get(&88), Some(&2));
assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1);
assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
}
#[test]
fn general_config_default_drain_threshold_remains_enabled() {
assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);
assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled);
assert_eq!(
GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
1
);
}

View File

@ -172,6 +172,11 @@ pub struct MePool {
pub(super) kdf_material_fingerprint: Arc<RwLock<HashMap<SocketAddr, (u64, u16)>>>,
pub(super) me_pool_drain_ttl_secs: AtomicU64,
pub(super) me_pool_drain_threshold: AtomicU64,
pub(super) me_pool_drain_soft_evict_enabled: AtomicBool,
pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64,
pub(super) me_pool_drain_soft_evict_per_writer: AtomicU8,
pub(super) me_pool_drain_soft_evict_budget_per_core: AtomicU32,
pub(super) me_pool_drain_soft_evict_cooldown_ms: AtomicU64,
pub(super) me_pool_force_close_secs: AtomicU64,
pub(super) me_pool_min_fresh_ratio_permille: AtomicU32,
pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64,
@ -273,6 +278,11 @@ impl MePool {
hardswap: bool,
me_pool_drain_ttl_secs: u64,
me_pool_drain_threshold: u64,
me_pool_drain_soft_evict_enabled: bool,
me_pool_drain_soft_evict_grace_secs: u64,
me_pool_drain_soft_evict_per_writer: u8,
me_pool_drain_soft_evict_budget_per_core: u16,
me_pool_drain_soft_evict_cooldown_ms: u64,
me_pool_force_close_secs: u64,
me_pool_min_fresh_ratio: f32,
me_hardswap_warmup_delay_min_ms: u64,
@ -449,6 +459,17 @@ impl MePool {
kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())),
me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs),
me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold),
me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled),
me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs),
me_pool_drain_soft_evict_per_writer: AtomicU8::new(
me_pool_drain_soft_evict_per_writer.max(1),
),
me_pool_drain_soft_evict_budget_per_core: AtomicU32::new(
me_pool_drain_soft_evict_budget_per_core.max(1) as u32,
),
me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new(
me_pool_drain_soft_evict_cooldown_ms.max(1),
),
me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs),
me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille(
me_pool_min_fresh_ratio,
@ -496,6 +517,11 @@ impl MePool {
hardswap: bool,
drain_ttl_secs: u64,
pool_drain_threshold: u64,
pool_drain_soft_evict_enabled: bool,
pool_drain_soft_evict_grace_secs: u64,
pool_drain_soft_evict_per_writer: u8,
pool_drain_soft_evict_budget_per_core: u16,
pool_drain_soft_evict_cooldown_ms: u64,
force_close_secs: u64,
min_fresh_ratio: f32,
hardswap_warmup_delay_min_ms: u64,
@ -536,6 +562,18 @@ impl MePool {
.store(drain_ttl_secs, Ordering::Relaxed);
self.me_pool_drain_threshold
.store(pool_drain_threshold, Ordering::Relaxed);
self.me_pool_drain_soft_evict_enabled
.store(pool_drain_soft_evict_enabled, Ordering::Relaxed);
self.me_pool_drain_soft_evict_grace_secs
.store(pool_drain_soft_evict_grace_secs, Ordering::Relaxed);
self.me_pool_drain_soft_evict_per_writer
.store(pool_drain_soft_evict_per_writer.max(1), Ordering::Relaxed);
self.me_pool_drain_soft_evict_budget_per_core.store(
pool_drain_soft_evict_budget_per_core.max(1) as u32,
Ordering::Relaxed,
);
self.me_pool_drain_soft_evict_cooldown_ms
.store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed);
self.me_pool_force_close_secs
.store(force_close_secs, Ordering::Relaxed);
self.me_pool_min_fresh_ratio_permille
@ -690,6 +728,36 @@ impl MePool {
}
}
pub(super) fn drain_soft_evict_enabled(&self) -> bool {
self.me_pool_drain_soft_evict_enabled
.load(Ordering::Relaxed)
}
pub(super) fn drain_soft_evict_grace_secs(&self) -> u64 {
self.me_pool_drain_soft_evict_grace_secs
.load(Ordering::Relaxed)
}
pub(super) fn drain_soft_evict_per_writer(&self) -> usize {
self.me_pool_drain_soft_evict_per_writer
.load(Ordering::Relaxed)
.max(1) as usize
}
pub(super) fn drain_soft_evict_budget_per_core(&self) -> usize {
self.me_pool_drain_soft_evict_budget_per_core
.load(Ordering::Relaxed)
.max(1) as usize
}
pub(super) fn drain_soft_evict_cooldown(&self) -> Duration {
Duration::from_millis(
self.me_pool_drain_soft_evict_cooldown_ms
.load(Ordering::Relaxed)
.max(1),
)
}
pub(super) async fn key_selector(&self) -> u32 {
self.proxy_secret.read().await.key_selector
}

View File

@ -124,6 +124,11 @@ pub(crate) struct MeApiRuntimeSnapshot {
pub me_reconnect_backoff_cap_ms: u64,
pub me_reconnect_fast_retry_count: u32,
pub me_pool_drain_ttl_secs: u64,
pub me_pool_drain_soft_evict_enabled: bool,
pub me_pool_drain_soft_evict_grace_secs: u64,
pub me_pool_drain_soft_evict_per_writer: u8,
pub me_pool_drain_soft_evict_budget_per_core: u16,
pub me_pool_drain_soft_evict_cooldown_ms: u64,
pub me_pool_force_close_secs: u64,
pub me_pool_min_fresh_ratio: f32,
pub me_bind_stale_mode: &'static str,
@ -562,6 +567,22 @@ impl MePool {
me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,
me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count,
me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed),
me_pool_drain_soft_evict_enabled: self
.me_pool_drain_soft_evict_enabled
.load(Ordering::Relaxed),
me_pool_drain_soft_evict_grace_secs: self
.me_pool_drain_soft_evict_grace_secs
.load(Ordering::Relaxed),
me_pool_drain_soft_evict_per_writer: self
.me_pool_drain_soft_evict_per_writer
.load(Ordering::Relaxed),
me_pool_drain_soft_evict_budget_per_core: self
.me_pool_drain_soft_evict_budget_per_core
.load(Ordering::Relaxed)
.min(u16::MAX as u32) as u16,
me_pool_drain_soft_evict_cooldown_ms: self
.me_pool_drain_soft_evict_cooldown_ms
.load(Ordering::Relaxed),
me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed),
me_pool_min_fresh_ratio: Self::permille_to_ratio(
self.me_pool_min_fresh_ratio_permille.load(Ordering::Relaxed),

View File

@ -394,6 +394,56 @@ impl ConnRegistry {
inner.writer_for_conn.keys().copied().collect()
}
pub(super) async fn bound_conn_ids_for_writer_limited(
&self,
writer_id: u64,
limit: usize,
) -> Vec<u64> {
if limit == 0 {
return Vec::new();
}
let inner = self.inner.read().await;
let Some(conn_ids) = inner.conns_for_writer.get(&writer_id) else {
return Vec::new();
};
let mut out = conn_ids.iter().copied().collect::<Vec<_>>();
out.sort_unstable();
out.truncate(limit);
out
}
pub(super) async fn evict_bound_conn_if_writer(&self, conn_id: u64, writer_id: u64) -> bool {
let maybe_client_tx = {
let mut inner = self.inner.write().await;
if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) {
return false;
}
let client_tx = inner.map.get(&conn_id).cloned();
inner.map.remove(&conn_id);
inner.meta.remove(&conn_id);
inner.writer_for_conn.remove(&conn_id);
let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&writer_id) {
set.remove(&conn_id);
set.is_empty()
} else {
false
};
if became_empty {
inner
.writer_idle_since_epoch_secs
.insert(writer_id, Self::now_epoch_secs());
}
client_tx
};
if let Some(client_tx) = maybe_client_tx {
let _ = client_tx.try_send(MeResponse::Close);
}
true
}
pub async fn writer_lost(&self, writer_id: u64) -> Vec<BoundConn> {
let mut inner = self.inner.write().await;
inner.writers.remove(&writer_id);
@ -444,6 +494,7 @@ mod tests {
use super::ConnMeta;
use super::ConnRegistry;
use super::MeResponse;
#[tokio::test]
async fn writer_activity_snapshot_tracks_writer_and_dc_load() {
@ -634,4 +685,86 @@ mod tests {
);
assert!(registry.get_writer(conn_id).await.is_none());
}
#[tokio::test]
async fn bound_conn_ids_for_writer_limited_is_sorted_and_bounded() {
let registry = ConnRegistry::new();
let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx).await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
let mut conn_ids = Vec::new();
for _ in 0..5 {
let (conn_id, _rx) = registry.register().await;
assert!(
registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 0,
},
)
.await
);
conn_ids.push(conn_id);
}
conn_ids.sort_unstable();
let limited = registry.bound_conn_ids_for_writer_limited(10, 3).await;
assert_eq!(limited.len(), 3);
assert_eq!(limited, conn_ids.into_iter().take(3).collect::<Vec<_>>());
}
#[tokio::test]
async fn evict_bound_conn_if_writer_does_not_touch_rebound_conn() {
let registry = ConnRegistry::new();
let (conn_id, mut rx) = registry.register().await;
let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8);
let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8);
registry.register_writer(10, writer_tx_a).await;
registry.register_writer(20, writer_tx_b).await;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443);
assert!(
registry
.bind_writer(
conn_id,
10,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 0,
},
)
.await
);
assert!(
registry
.bind_writer(
conn_id,
20,
ConnMeta {
target_dc: 2,
client_addr: addr,
our_addr: addr,
proto_flags: 1,
},
)
.await
);
let evicted = registry.evict_bound_conn_if_writer(conn_id, 10).await;
assert!(!evicted);
assert_eq!(registry.get_writer(conn_id).await.expect("writer").writer_id, 20);
assert!(rx.try_recv().is_err());
let evicted = registry.evict_bound_conn_if_writer(conn_id, 20).await;
assert!(evicted);
assert!(registry.get_writer(conn_id).await.is_none());
assert!(matches!(rx.try_recv(), Ok(MeResponse::Close)));
}
}

View File

@ -11,6 +11,8 @@ use tokio::net::TcpStream;
use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol};
use tracing::debug;
const DEFAULT_SOCKET_BUFFER_BYTES: usize = 256 * 1024;
/// Configure TCP socket with recommended settings for proxy use
#[allow(dead_code)]
pub fn configure_tcp_socket(
@ -34,10 +36,10 @@ pub fn configure_tcp_socket(
socket.set_tcp_keepalive(&keepalive)?;
}
// CHANGED: Removed manual buffer size setting (was 256KB).
// Allowing the OS kernel to handle TCP window scaling (Autotuning) is critical
// for mobile clients to avoid bufferbloat and stalled connections during uploads.
// Use explicit baseline buffers to reduce slow-start stalls on high RTT links.
socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
Ok(())
}
@ -62,6 +64,10 @@ pub fn configure_client_socket(
let keepalive = keepalive.with_interval(Duration::from_secs(keepalive_secs));
socket.set_tcp_keepalive(&keepalive)?;
// Keep explicit baseline buffers for predictable throughput across busy hosts.
socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
// Set TCP user timeout (Linux only)
// NOTE: iOS does not support TCP_USER_TIMEOUT - application-level timeout
@ -124,6 +130,8 @@ pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option<IpAddr>)
// Disable Nagle
socket.set_nodelay(true)?;
socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
if let Some(bind_ip) = bind_addr {
let bind_sock_addr = SocketAddr::new(bind_ip, 0);