From 3739f38440b63d81005a621f0884eef535046f69 Mon Sep 17 00:00:00 2001 From: Alexey <247128645+axkurcom@users.noreply.github.com> Date: Wed, 18 Mar 2026 10:49:02 +0300 Subject: [PATCH] Adaptive Buffers + Session Eviction Method --- src/api/model.rs | 7 + src/api/runtime_stats.rs | 7 + src/config/defaults.rs | 33 +- src/config/hot_reload.rs | 42 ++ src/config/load.rs | 29 ++ src/config/types.rs | 27 ++ src/maestro/me_startup.rs | 5 + src/maestro/mod.rs | 2 +- src/metrics.rs | 151 +++++++ src/proxy/adaptive_buffers.rs | 383 ++++++++++++++++++ src/proxy/client.rs | 15 + src/proxy/direct_relay.rs | 17 +- src/proxy/middle_relay.rs | 38 +- src/proxy/mod.rs | 2 + src/proxy/relay.rs | 148 ++++++- src/proxy/session_eviction.rs | 46 +++ src/stats/mod.rs | 74 ++++ src/stream/buffer_pool.rs | 3 +- src/transport/middle_proxy/config_updater.rs | 10 + src/transport/middle_proxy/health.rs | 117 +++++- .../middle_proxy/health_adversarial_tests.rs | 27 +- .../middle_proxy/health_integration_tests.rs | 5 + .../middle_proxy/health_regression_tests.rs | 103 ++++- src/transport/middle_proxy/pool.rs | 68 ++++ src/transport/middle_proxy/pool_status.rs | 21 + src/transport/middle_proxy/registry.rs | 133 ++++++ src/transport/socket.rs | 16 +- 27 files changed, 1479 insertions(+), 50 deletions(-) create mode 100644 src/proxy/adaptive_buffers.rs create mode 100644 src/proxy/session_eviction.rs diff --git a/src/api/model.rs b/src/api/model.rs index 31233d7..6b6fd72 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -195,6 +195,8 @@ pub(super) struct ZeroPoolData { pub(super) pool_swap_total: u64, pub(super) pool_drain_active: u64, pub(super) pool_force_close_total: u64, + pub(super) pool_drain_soft_evict_total: u64, + pub(super) pool_drain_soft_evict_writer_total: u64, pub(super) pool_stale_pick_total: u64, pub(super) writer_removed_total: u64, pub(super) writer_removed_unexpected_total: u64, @@ -360,6 +362,11 @@ pub(super) struct MinimalMeRuntimeData { pub(super) me_reconnect_backoff_cap_ms: u64, pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_pool_drain_ttl_secs: u64, + pub(super) me_pool_drain_soft_evict_enabled: bool, + pub(super) me_pool_drain_soft_evict_grace_secs: u64, + pub(super) me_pool_drain_soft_evict_per_writer: u8, + pub(super) me_pool_drain_soft_evict_budget_per_core: u16, + pub(super) me_pool_drain_soft_evict_cooldown_ms: u64, pub(super) me_pool_force_close_secs: u64, pub(super) me_pool_min_fresh_ratio: f32, pub(super) me_bind_stale_mode: &'static str, diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index 9260c40..61c8a5a 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -96,6 +96,8 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer pool_swap_total: stats.get_pool_swap_total(), pool_drain_active: stats.get_pool_drain_active(), pool_force_close_total: stats.get_pool_force_close_total(), + pool_drain_soft_evict_total: stats.get_pool_drain_soft_evict_total(), + pool_drain_soft_evict_writer_total: stats.get_pool_drain_soft_evict_writer_total(), pool_stale_pick_total: stats.get_pool_stale_pick_total(), writer_removed_total: stats.get_me_writer_removed_total(), writer_removed_unexpected_total: stats.get_me_writer_removed_unexpected_total(), @@ -427,6 +429,11 @@ async fn get_minimal_payload_cached( me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms, me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs, + 
me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled, + me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs, + me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer, + me_pool_drain_soft_evict_budget_per_core: runtime.me_pool_drain_soft_evict_budget_per_core, + me_pool_drain_soft_evict_cooldown_ms: runtime.me_pool_drain_soft_evict_cooldown_ms, me_pool_force_close_secs: runtime.me_pool_force_close_secs, me_pool_min_fresh_ratio: runtime.me_pool_min_fresh_ratio, me_bind_stale_mode: runtime.me_bind_stale_mode, diff --git a/src/config/defaults.rs b/src/config/defaults.rs index ea9250d..7b5b4a8 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -27,8 +27,8 @@ const DEFAULT_ME_C2ME_CHANNEL_CAPACITY: usize = 1024; const DEFAULT_ME_READER_ROUTE_DATA_WAIT_MS: u64 = 2; const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_FRAMES: usize = 32; const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_BYTES: usize = 128 * 1024; -const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 1500; -const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = false; +const DEFAULT_ME_D2C_FLUSH_BATCH_MAX_DELAY_US: u64 = 500; +const DEFAULT_ME_D2C_ACK_FLUSH_IMMEDIATE: bool = true; const DEFAULT_DIRECT_RELAY_COPY_BUF_C2S_BYTES: usize = 64 * 1024; const DEFAULT_DIRECT_RELAY_COPY_BUF_S2C_BYTES: usize = 256 * 1024; const DEFAULT_ME_WRITER_PICK_SAMPLE_SIZE: u8 = 3; @@ -36,6 +36,11 @@ const DEFAULT_ME_HEALTH_INTERVAL_MS_UNHEALTHY: u64 = 1000; const DEFAULT_ME_HEALTH_INTERVAL_MS_HEALTHY: u64 = 3000; const DEFAULT_ME_ADMISSION_POLL_MS: u64 = 1000; const DEFAULT_ME_WARN_RATE_LIMIT_MS: u64 = 5000; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED: bool = true; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS: u64 = 30; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER: u8 = 1; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE: u16 = 8; +const DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS: u64 = 5000; const DEFAULT_USER_MAX_UNIQUE_IPS_WINDOW_SECS: u64 = 30; const DEFAULT_UPSTREAM_CONNECT_RETRY_ATTEMPTS: u32 = 2; const DEFAULT_UPSTREAM_UNHEALTHY_FAIL_THRESHOLD: u32 = 5; @@ -85,11 +90,11 @@ pub(crate) fn default_connect_timeout() -> u64 { } pub(crate) fn default_keepalive() -> u64 { - 60 + 15 } pub(crate) fn default_ack_timeout() -> u64 { - 300 + 90 } pub(crate) fn default_me_one_retry() -> u8 { 12 @@ -592,6 +597,26 @@ pub(crate) fn default_me_pool_drain_threshold() -> u64 { 128 } +pub(crate) fn default_me_pool_drain_soft_evict_enabled() -> bool { + DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_ENABLED +} + +pub(crate) fn default_me_pool_drain_soft_evict_grace_secs() -> u64 { + DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_GRACE_SECS +} + +pub(crate) fn default_me_pool_drain_soft_evict_per_writer() -> u8 { + DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_PER_WRITER +} + +pub(crate) fn default_me_pool_drain_soft_evict_budget_per_core() -> u16 { + DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_BUDGET_PER_CORE +} + +pub(crate) fn default_me_pool_drain_soft_evict_cooldown_ms() -> u64 { + DEFAULT_ME_POOL_DRAIN_SOFT_EVICT_COOLDOWN_MS +} + pub(crate) fn default_me_bind_stale_ttl_secs() -> u64 { default_me_pool_drain_ttl_secs() } diff --git a/src/config/hot_reload.rs b/src/config/hot_reload.rs index 6f07a4b..c0ca98d 100644 --- a/src/config/hot_reload.rs +++ b/src/config/hot_reload.rs @@ -56,6 +56,11 @@ pub struct HotFields { pub hardswap: bool, pub me_pool_drain_ttl_secs: u64, pub me_pool_drain_threshold: u64, + pub me_pool_drain_soft_evict_enabled: bool, + pub me_pool_drain_soft_evict_grace_secs: u64, + pub 
me_pool_drain_soft_evict_per_writer: u8, + pub me_pool_drain_soft_evict_budget_per_core: u16, + pub me_pool_drain_soft_evict_cooldown_ms: u64, pub me_pool_min_fresh_ratio: f32, pub me_reinit_drain_timeout_secs: u64, pub me_hardswap_warmup_delay_min_ms: u64, @@ -138,6 +143,15 @@ impl HotFields { hardswap: cfg.general.hardswap, me_pool_drain_ttl_secs: cfg.general.me_pool_drain_ttl_secs, me_pool_drain_threshold: cfg.general.me_pool_drain_threshold, + me_pool_drain_soft_evict_enabled: cfg.general.me_pool_drain_soft_evict_enabled, + me_pool_drain_soft_evict_grace_secs: cfg.general.me_pool_drain_soft_evict_grace_secs, + me_pool_drain_soft_evict_per_writer: cfg.general.me_pool_drain_soft_evict_per_writer, + me_pool_drain_soft_evict_budget_per_core: cfg + .general + .me_pool_drain_soft_evict_budget_per_core, + me_pool_drain_soft_evict_cooldown_ms: cfg + .general + .me_pool_drain_soft_evict_cooldown_ms, me_pool_min_fresh_ratio: cfg.general.me_pool_min_fresh_ratio, me_reinit_drain_timeout_secs: cfg.general.me_reinit_drain_timeout_secs, me_hardswap_warmup_delay_min_ms: cfg.general.me_hardswap_warmup_delay_min_ms, @@ -455,6 +469,15 @@ fn overlay_hot_fields(old: &ProxyConfig, new: &ProxyConfig) -> ProxyConfig { cfg.general.hardswap = new.general.hardswap; cfg.general.me_pool_drain_ttl_secs = new.general.me_pool_drain_ttl_secs; cfg.general.me_pool_drain_threshold = new.general.me_pool_drain_threshold; + cfg.general.me_pool_drain_soft_evict_enabled = new.general.me_pool_drain_soft_evict_enabled; + cfg.general.me_pool_drain_soft_evict_grace_secs = + new.general.me_pool_drain_soft_evict_grace_secs; + cfg.general.me_pool_drain_soft_evict_per_writer = + new.general.me_pool_drain_soft_evict_per_writer; + cfg.general.me_pool_drain_soft_evict_budget_per_core = + new.general.me_pool_drain_soft_evict_budget_per_core; + cfg.general.me_pool_drain_soft_evict_cooldown_ms = + new.general.me_pool_drain_soft_evict_cooldown_ms; cfg.general.me_pool_min_fresh_ratio = new.general.me_pool_min_fresh_ratio; cfg.general.me_reinit_drain_timeout_secs = new.general.me_reinit_drain_timeout_secs; cfg.general.me_hardswap_warmup_delay_min_ms = new.general.me_hardswap_warmup_delay_min_ms; @@ -835,6 +858,25 @@ fn log_changes( old_hot.me_pool_drain_threshold, new_hot.me_pool_drain_threshold, ); } + if old_hot.me_pool_drain_soft_evict_enabled != new_hot.me_pool_drain_soft_evict_enabled + || old_hot.me_pool_drain_soft_evict_grace_secs + != new_hot.me_pool_drain_soft_evict_grace_secs + || old_hot.me_pool_drain_soft_evict_per_writer + != new_hot.me_pool_drain_soft_evict_per_writer + || old_hot.me_pool_drain_soft_evict_budget_per_core + != new_hot.me_pool_drain_soft_evict_budget_per_core + || old_hot.me_pool_drain_soft_evict_cooldown_ms + != new_hot.me_pool_drain_soft_evict_cooldown_ms + { + info!( + "config reload: me_pool_drain_soft_evict: enabled={} grace={}s per_writer={} budget_per_core={} cooldown={}ms", + new_hot.me_pool_drain_soft_evict_enabled, + new_hot.me_pool_drain_soft_evict_grace_secs, + new_hot.me_pool_drain_soft_evict_per_writer, + new_hot.me_pool_drain_soft_evict_budget_per_core, + new_hot.me_pool_drain_soft_evict_cooldown_ms + ); + } if (old_hot.me_pool_min_fresh_ratio - new_hot.me_pool_min_fresh_ratio).abs() > f32::EPSILON { info!( diff --git a/src/config/load.rs b/src/config/load.rs index ed3e303..6fcbea3 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -406,6 +406,35 @@ impl ProxyConfig { )); } + if config.general.me_pool_drain_soft_evict_grace_secs > 3600 { + return Err(ProxyError::Config( + 
"general.me_pool_drain_soft_evict_grace_secs must be within [0, 3600]".to_string(), + )); + } + + if config.general.me_pool_drain_soft_evict_per_writer == 0 + || config.general.me_pool_drain_soft_evict_per_writer > 16 + { + return Err(ProxyError::Config( + "general.me_pool_drain_soft_evict_per_writer must be within [1, 16]".to_string(), + )); + } + + if config.general.me_pool_drain_soft_evict_budget_per_core == 0 + || config.general.me_pool_drain_soft_evict_budget_per_core > 64 + { + return Err(ProxyError::Config( + "general.me_pool_drain_soft_evict_budget_per_core must be within [1, 64]" + .to_string(), + )); + } + + if config.general.me_pool_drain_soft_evict_cooldown_ms == 0 { + return Err(ProxyError::Config( + "general.me_pool_drain_soft_evict_cooldown_ms must be > 0".to_string(), + )); + } + if config.access.user_max_unique_ips_window_secs == 0 { return Err(ProxyError::Config( "access.user_max_unique_ips_window_secs must be > 0".to_string(), diff --git a/src/config/types.rs b/src/config/types.rs index 7ea1fe7..e507044 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -803,6 +803,26 @@ pub struct GeneralConfig { #[serde(default = "default_me_pool_drain_threshold")] pub me_pool_drain_threshold: u64, + /// Enable staged client eviction for draining ME writers that remain non-empty past TTL. + #[serde(default = "default_me_pool_drain_soft_evict_enabled")] + pub me_pool_drain_soft_evict_enabled: bool, + + /// Extra grace in seconds after drain TTL before soft-eviction stage starts. + #[serde(default = "default_me_pool_drain_soft_evict_grace_secs")] + pub me_pool_drain_soft_evict_grace_secs: u64, + + /// Maximum number of client sessions to evict from one draining writer per health tick. + #[serde(default = "default_me_pool_drain_soft_evict_per_writer")] + pub me_pool_drain_soft_evict_per_writer: u8, + + /// Soft-eviction budget per CPU core for one health tick. + #[serde(default = "default_me_pool_drain_soft_evict_budget_per_core")] + pub me_pool_drain_soft_evict_budget_per_core: u16, + + /// Cooldown for repetitive soft-eviction on the same writer in milliseconds. + #[serde(default = "default_me_pool_drain_soft_evict_cooldown_ms")] + pub me_pool_drain_soft_evict_cooldown_ms: u64, + /// Policy for new binds on stale draining writers. 
#[serde(default)] pub me_bind_stale_mode: MeBindStaleMode, @@ -984,6 +1004,13 @@ impl Default for GeneralConfig { proxy_secret_len_max: default_proxy_secret_len_max(), me_pool_drain_ttl_secs: default_me_pool_drain_ttl_secs(), me_pool_drain_threshold: default_me_pool_drain_threshold(), + me_pool_drain_soft_evict_enabled: default_me_pool_drain_soft_evict_enabled(), + me_pool_drain_soft_evict_grace_secs: default_me_pool_drain_soft_evict_grace_secs(), + me_pool_drain_soft_evict_per_writer: default_me_pool_drain_soft_evict_per_writer(), + me_pool_drain_soft_evict_budget_per_core: + default_me_pool_drain_soft_evict_budget_per_core(), + me_pool_drain_soft_evict_cooldown_ms: + default_me_pool_drain_soft_evict_cooldown_ms(), me_bind_stale_mode: MeBindStaleMode::default(), me_bind_stale_ttl_secs: default_me_bind_stale_ttl_secs(), me_pool_min_fresh_ratio: default_me_pool_min_fresh_ratio(), diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 245c7a9..94ae884 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -238,6 +238,11 @@ pub(crate) async fn initialize_me_pool( config.general.hardswap, config.general.me_pool_drain_ttl_secs, config.general.me_pool_drain_threshold, + config.general.me_pool_drain_soft_evict_enabled, + config.general.me_pool_drain_soft_evict_grace_secs, + config.general.me_pool_drain_soft_evict_per_writer, + config.general.me_pool_drain_soft_evict_budget_per_core, + config.general.me_pool_drain_soft_evict_cooldown_ms, config.general.effective_me_pool_force_close_secs(), config.general.me_pool_min_fresh_ratio, config.general.me_hardswap_warmup_delay_min_ms, diff --git a/src/maestro/mod.rs b/src/maestro/mod.rs index da00b40..dce421c 100644 --- a/src/maestro/mod.rs +++ b/src/maestro/mod.rs @@ -476,7 +476,7 @@ pub async fn run() -> std::result::Result<(), Box> { Duration::from_secs(config.access.replay_window_secs), )); - let buffer_pool = Arc::new(BufferPool::with_config(16 * 1024, 4096)); + let buffer_pool = Arc::new(BufferPool::with_config(64 * 1024, 4096)); connectivity::run_startup_connectivity( &config, diff --git a/src/metrics.rs b/src/metrics.rs index f4f8a2e..3de9896 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -292,6 +292,109 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp "telemt_connections_bad_total {}", if core_enabled { stats.get_connects_bad() } else { 0 } ); + let _ = writeln!(out, "# HELP telemt_connections_current Current active connections"); + let _ = writeln!(out, "# TYPE telemt_connections_current gauge"); + let _ = writeln!( + out, + "telemt_connections_current {}", + if core_enabled { + stats.get_current_connections_total() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_connections_direct_current Current active direct connections"); + let _ = writeln!(out, "# TYPE telemt_connections_direct_current gauge"); + let _ = writeln!( + out, + "telemt_connections_direct_current {}", + if core_enabled { + stats.get_current_connections_direct() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_connections_me_current Current active middle-end connections"); + let _ = writeln!(out, "# TYPE telemt_connections_me_current gauge"); + let _ = writeln!( + out, + "telemt_connections_me_current {}", + if core_enabled { + stats.get_current_connections_me() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_relay_adaptive_promotions_total Adaptive relay tier promotions" + ); + let _ = writeln!(out, "# TYPE telemt_relay_adaptive_promotions_total counter"); 
+ let _ = writeln!( + out, + "telemt_relay_adaptive_promotions_total {}", + if core_enabled { + stats.get_relay_adaptive_promotions_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_relay_adaptive_demotions_total Adaptive relay tier demotions" + ); + let _ = writeln!(out, "# TYPE telemt_relay_adaptive_demotions_total counter"); + let _ = writeln!( + out, + "telemt_relay_adaptive_demotions_total {}", + if core_enabled { + stats.get_relay_adaptive_demotions_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_relay_adaptive_hard_promotions_total Adaptive relay hard promotions triggered by write pressure" + ); + let _ = writeln!( + out, + "# TYPE telemt_relay_adaptive_hard_promotions_total counter" + ); + let _ = writeln!( + out, + "telemt_relay_adaptive_hard_promotions_total {}", + if core_enabled { + stats.get_relay_adaptive_hard_promotions_total() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_reconnect_evict_total Reconnect-driven session evictions"); + let _ = writeln!(out, "# TYPE telemt_reconnect_evict_total counter"); + let _ = writeln!( + out, + "telemt_reconnect_evict_total {}", + if core_enabled { + stats.get_reconnect_evict_total() + } else { + 0 + } + ); + let _ = writeln!( + out, + "# HELP telemt_reconnect_stale_close_total Sessions closed because they became stale after reconnect" + ); + let _ = writeln!(out, "# TYPE telemt_reconnect_stale_close_total counter"); + let _ = writeln!( + out, + "telemt_reconnect_stale_close_total {}", + if core_enabled { + stats.get_reconnect_stale_close_total() + } else { + 0 + } + ); let _ = writeln!(out, "# HELP telemt_handshake_timeouts_total Handshake timeouts"); let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter"); @@ -1547,6 +1650,36 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); + let _ = writeln!( + out, + "# HELP telemt_pool_drain_soft_evict_total Soft-evicted client sessions on stuck draining writers" + ); + let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_total counter"); + let _ = writeln!( + out, + "telemt_pool_drain_soft_evict_total {}", + if me_allows_normal { + stats.get_pool_drain_soft_evict_total() + } else { + 0 + } + ); + + let _ = writeln!( + out, + "# HELP telemt_pool_drain_soft_evict_writer_total Draining writers with at least one soft eviction" + ); + let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_writer_total counter"); + let _ = writeln!( + out, + "telemt_pool_drain_soft_evict_writer_total {}", + if me_allows_normal { + stats.get_pool_drain_soft_evict_writer_total() + } else { + 0 + } + ); + let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds"); let _ = writeln!(out, "# TYPE telemt_pool_stale_pick_total counter"); let _ = writeln!( @@ -1864,6 +1997,8 @@ mod tests { stats.increment_connects_all(); stats.increment_connects_all(); stats.increment_connects_bad(); + stats.increment_current_connections_direct(); + stats.increment_current_connections_me(); stats.increment_handshake_timeouts(); stats.increment_upstream_connect_attempt_total(); stats.increment_upstream_connect_attempt_total(); @@ -1895,6 +2030,9 @@ mod tests { assert!(output.contains("telemt_connections_total 2")); assert!(output.contains("telemt_connections_bad_total 1")); + assert!(output.contains("telemt_connections_current 2")); + assert!(output.contains("telemt_connections_direct_current 1")); + assert!(output.contains("telemt_connections_me_current 1")); 
assert!(output.contains("telemt_handshake_timeouts_total 1")); assert!(output.contains("telemt_upstream_connect_attempt_total 2")); assert!(output.contains("telemt_upstream_connect_success_total 1")); @@ -1937,6 +2075,9 @@ mod tests { let output = render_metrics(&stats, &config, &tracker).await; assert!(output.contains("telemt_connections_total 0")); assert!(output.contains("telemt_connections_bad_total 0")); + assert!(output.contains("telemt_connections_current 0")); + assert!(output.contains("telemt_connections_direct_current 0")); + assert!(output.contains("telemt_connections_me_current 0")); assert!(output.contains("telemt_handshake_timeouts_total 0")); assert!(output.contains("telemt_user_unique_ips_current{user=")); assert!(output.contains("telemt_user_unique_ips_recent_window{user=")); @@ -1970,11 +2111,21 @@ mod tests { assert!(output.contains("# TYPE telemt_uptime_seconds gauge")); assert!(output.contains("# TYPE telemt_connections_total counter")); assert!(output.contains("# TYPE telemt_connections_bad_total counter")); + assert!(output.contains("# TYPE telemt_connections_current gauge")); + assert!(output.contains("# TYPE telemt_connections_direct_current gauge")); + assert!(output.contains("# TYPE telemt_connections_me_current gauge")); + assert!(output.contains("# TYPE telemt_relay_adaptive_promotions_total counter")); + assert!(output.contains("# TYPE telemt_relay_adaptive_demotions_total counter")); + assert!(output.contains("# TYPE telemt_relay_adaptive_hard_promotions_total counter")); + assert!(output.contains("# TYPE telemt_reconnect_evict_total counter")); + assert!(output.contains("# TYPE telemt_reconnect_stale_close_total counter")); assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter")); assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter")); assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); + assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter")); + assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter")); assert!(output.contains( "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" )); diff --git a/src/proxy/adaptive_buffers.rs b/src/proxy/adaptive_buffers.rs new file mode 100644 index 0000000..3b1bce9 --- /dev/null +++ b/src/proxy/adaptive_buffers.rs @@ -0,0 +1,383 @@ +use dashmap::DashMap; +use std::cmp::max; +use std::sync::OnceLock; +use std::time::{Duration, Instant}; + +const EMA_ALPHA: f64 = 0.2; +const PROFILE_TTL: Duration = Duration::from_secs(300); +const THROUGHPUT_UP_BPS: f64 = 8_000_000.0; +const THROUGHPUT_DOWN_BPS: f64 = 2_000_000.0; +const RATIO_CONFIRM_THRESHOLD: f64 = 1.12; +const TIER1_HOLD_TICKS: u32 = 8; +const TIER2_HOLD_TICKS: u32 = 4; +const QUIET_DEMOTE_TICKS: u32 = 480; +const HARD_COOLDOWN_TICKS: u32 = 20; +const HARD_PENDING_THRESHOLD: u32 = 3; +const HARD_PARTIAL_RATIO_THRESHOLD: f64 = 0.25; +const DIRECT_C2S_CAP_BYTES: usize = 128 * 1024; +const DIRECT_S2C_CAP_BYTES: usize = 512 * 1024; +const ME_FRAMES_CAP: usize = 96; +const ME_BYTES_CAP: usize = 384 * 1024; +const ME_DELAY_MIN_US: u64 = 150; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum AdaptiveTier { + Base = 0, + Tier1 = 1, + Tier2 = 2, + Tier3 = 3, +} + +impl AdaptiveTier { + pub fn promote(self) -> Self { + match self { + Self::Base => Self::Tier1, 
+ Self::Tier1 => Self::Tier2, + Self::Tier2 => Self::Tier3, + Self::Tier3 => Self::Tier3, + } + } + + pub fn demote(self) -> Self { + match self { + Self::Base => Self::Base, + Self::Tier1 => Self::Base, + Self::Tier2 => Self::Tier1, + Self::Tier3 => Self::Tier2, + } + } + + fn ratio(self) -> (usize, usize) { + match self { + Self::Base => (1, 1), + Self::Tier1 => (5, 4), + Self::Tier2 => (3, 2), + Self::Tier3 => (2, 1), + } + } + + pub fn as_u8(self) -> u8 { + self as u8 + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TierTransitionReason { + SoftConfirmed, + HardPressure, + QuietDemotion, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TierTransition { + pub from: AdaptiveTier, + pub to: AdaptiveTier, + pub reason: TierTransitionReason, +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct RelaySignalSample { + pub c2s_bytes: u64, + pub s2c_requested_bytes: u64, + pub s2c_written_bytes: u64, + pub s2c_write_ops: u64, + pub s2c_partial_writes: u64, + pub s2c_consecutive_pending_writes: u32, +} + +#[derive(Debug, Clone, Copy)] +pub struct SessionAdaptiveController { + tier: AdaptiveTier, + max_tier_seen: AdaptiveTier, + throughput_ema_bps: f64, + incoming_ema_bps: f64, + outgoing_ema_bps: f64, + tier1_hold_ticks: u32, + tier2_hold_ticks: u32, + quiet_ticks: u32, + hard_cooldown_ticks: u32, +} + +impl SessionAdaptiveController { + pub fn new(initial_tier: AdaptiveTier) -> Self { + Self { + tier: initial_tier, + max_tier_seen: initial_tier, + throughput_ema_bps: 0.0, + incoming_ema_bps: 0.0, + outgoing_ema_bps: 0.0, + tier1_hold_ticks: 0, + tier2_hold_ticks: 0, + quiet_ticks: 0, + hard_cooldown_ticks: 0, + } + } + + pub fn max_tier_seen(&self) -> AdaptiveTier { + self.max_tier_seen + } + + pub fn observe(&mut self, sample: RelaySignalSample, tick_secs: f64) -> Option { + if tick_secs <= f64::EPSILON { + return None; + } + + if self.hard_cooldown_ticks > 0 { + self.hard_cooldown_ticks -= 1; + } + + let c2s_bps = (sample.c2s_bytes as f64 * 8.0) / tick_secs; + let incoming_bps = (sample.s2c_requested_bytes as f64 * 8.0) / tick_secs; + let outgoing_bps = (sample.s2c_written_bytes as f64 * 8.0) / tick_secs; + let throughput = c2s_bps.max(outgoing_bps); + + self.throughput_ema_bps = ema(self.throughput_ema_bps, throughput); + self.incoming_ema_bps = ema(self.incoming_ema_bps, incoming_bps); + self.outgoing_ema_bps = ema(self.outgoing_ema_bps, outgoing_bps); + + let tier1_now = self.throughput_ema_bps >= THROUGHPUT_UP_BPS; + if tier1_now { + self.tier1_hold_ticks = self.tier1_hold_ticks.saturating_add(1); + } else { + self.tier1_hold_ticks = 0; + } + + let ratio = if self.outgoing_ema_bps <= f64::EPSILON { + 0.0 + } else { + self.incoming_ema_bps / self.outgoing_ema_bps + }; + let tier2_now = ratio >= RATIO_CONFIRM_THRESHOLD; + if tier2_now { + self.tier2_hold_ticks = self.tier2_hold_ticks.saturating_add(1); + } else { + self.tier2_hold_ticks = 0; + } + + let partial_ratio = if sample.s2c_write_ops == 0 { + 0.0 + } else { + sample.s2c_partial_writes as f64 / sample.s2c_write_ops as f64 + }; + let hard_now = sample.s2c_consecutive_pending_writes >= HARD_PENDING_THRESHOLD + || partial_ratio >= HARD_PARTIAL_RATIO_THRESHOLD; + + if hard_now && self.hard_cooldown_ticks == 0 { + return self.promote(TierTransitionReason::HardPressure, HARD_COOLDOWN_TICKS); + } + + if self.tier1_hold_ticks >= TIER1_HOLD_TICKS && self.tier2_hold_ticks >= TIER2_HOLD_TICKS { + return self.promote(TierTransitionReason::SoftConfirmed, 0); + } + + let demote_candidate = self.throughput_ema_bps 
< THROUGHPUT_DOWN_BPS && !tier2_now && !hard_now; + if demote_candidate { + self.quiet_ticks = self.quiet_ticks.saturating_add(1); + if self.quiet_ticks >= QUIET_DEMOTE_TICKS { + self.quiet_ticks = 0; + return self.demote(TierTransitionReason::QuietDemotion); + } + } else { + self.quiet_ticks = 0; + } + + None + } + + fn promote( + &mut self, + reason: TierTransitionReason, + hard_cooldown_ticks: u32, + ) -> Option { + let from = self.tier; + let to = from.promote(); + if from == to { + return None; + } + self.tier = to; + self.max_tier_seen = max(self.max_tier_seen, to); + self.hard_cooldown_ticks = hard_cooldown_ticks; + self.tier1_hold_ticks = 0; + self.tier2_hold_ticks = 0; + self.quiet_ticks = 0; + Some(TierTransition { from, to, reason }) + } + + fn demote(&mut self, reason: TierTransitionReason) -> Option { + let from = self.tier; + let to = from.demote(); + if from == to { + return None; + } + self.tier = to; + self.tier1_hold_ticks = 0; + self.tier2_hold_ticks = 0; + Some(TierTransition { from, to, reason }) + } +} + +#[derive(Debug, Clone, Copy)] +struct UserAdaptiveProfile { + tier: AdaptiveTier, + seen_at: Instant, +} + +fn profiles() -> &'static DashMap { + static USER_PROFILES: OnceLock> = OnceLock::new(); + USER_PROFILES.get_or_init(DashMap::new) +} + +pub fn seed_tier_for_user(user: &str) -> AdaptiveTier { + let now = Instant::now(); + if let Some(entry) = profiles().get(user) { + let value = entry.value(); + if now.duration_since(value.seen_at) <= PROFILE_TTL { + return value.tier; + } + } + AdaptiveTier::Base +} + +pub fn record_user_tier(user: &str, tier: AdaptiveTier) { + let now = Instant::now(); + if let Some(mut entry) = profiles().get_mut(user) { + let existing = *entry; + let effective = if now.duration_since(existing.seen_at) > PROFILE_TTL { + tier + } else { + max(existing.tier, tier) + }; + *entry = UserAdaptiveProfile { + tier: effective, + seen_at: now, + }; + return; + } + profiles().insert( + user.to_string(), + UserAdaptiveProfile { tier, seen_at: now }, + ); +} + +pub fn direct_copy_buffers_for_tier( + tier: AdaptiveTier, + base_c2s: usize, + base_s2c: usize, +) -> (usize, usize) { + let (num, den) = tier.ratio(); + ( + scale(base_c2s, num, den, DIRECT_C2S_CAP_BYTES), + scale(base_s2c, num, den, DIRECT_S2C_CAP_BYTES), + ) +} + +pub fn me_flush_policy_for_tier( + tier: AdaptiveTier, + base_frames: usize, + base_bytes: usize, + base_delay: Duration, +) -> (usize, usize, Duration) { + let (num, den) = tier.ratio(); + let frames = scale(base_frames, num, den, ME_FRAMES_CAP).max(1); + let bytes = scale(base_bytes, num, den, ME_BYTES_CAP).max(4096); + let delay_us = base_delay.as_micros() as u64; + let adjusted_delay_us = match tier { + AdaptiveTier::Base => delay_us, + AdaptiveTier::Tier1 => (delay_us.saturating_mul(7)).saturating_div(10), + AdaptiveTier::Tier2 => delay_us.saturating_div(2), + AdaptiveTier::Tier3 => (delay_us.saturating_mul(3)).saturating_div(10), + } + .max(ME_DELAY_MIN_US) + .min(delay_us.max(ME_DELAY_MIN_US)); + (frames, bytes, Duration::from_micros(adjusted_delay_us)) +} + +fn ema(prev: f64, value: f64) -> f64 { + if prev <= f64::EPSILON { + value + } else { + (prev * (1.0 - EMA_ALPHA)) + (value * EMA_ALPHA) + } +} + +fn scale(base: usize, numerator: usize, denominator: usize, cap: usize) -> usize { + let scaled = base + .saturating_mul(numerator) + .saturating_div(denominator.max(1)); + scaled.min(cap).max(1) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample( + c2s_bytes: u64, + s2c_requested_bytes: u64, + s2c_written_bytes: 
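// Worked example for direct_copy_buffers_for_tier, using the default base sizes from
// config/defaults.rs (c2s = 64 KiB, s2c = 256 KiB): the tier ratio scales each buffer and
// the result is clamped to DIRECT_C2S_CAP_BYTES / DIRECT_S2C_CAP_BYTES.
//   Base  (1/1) -> 64 KiB / 256 KiB
//   Tier1 (5/4) -> 80 KiB / 320 KiB
//   Tier2 (3/2) -> 96 KiB / 384 KiB
//   Tier3 (2/1) -> 128 KiB / 512 KiB (exactly at the caps)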
u64, + s2c_write_ops: u64, + s2c_partial_writes: u64, + s2c_consecutive_pending_writes: u32, + ) -> RelaySignalSample { + RelaySignalSample { + c2s_bytes, + s2c_requested_bytes, + s2c_written_bytes, + s2c_write_ops, + s2c_partial_writes, + s2c_consecutive_pending_writes, + } + } + + #[test] + fn test_soft_promotion_requires_tier1_and_tier2() { + let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Base); + let tick_secs = 0.25; + let mut promoted = None; + for _ in 0..8 { + promoted = ctrl.observe( + sample( + 300_000, // ~9.6 Mbps + 320_000, // incoming > outgoing to confirm tier2 + 250_000, + 10, + 0, + 0, + ), + tick_secs, + ); + } + + let transition = promoted.expect("expected soft promotion"); + assert_eq!(transition.from, AdaptiveTier::Base); + assert_eq!(transition.to, AdaptiveTier::Tier1); + assert_eq!(transition.reason, TierTransitionReason::SoftConfirmed); + } + + #[test] + fn test_hard_promotion_on_pending_pressure() { + let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Base); + let transition = ctrl + .observe( + sample(10_000, 20_000, 10_000, 4, 1, 3), + 0.25, + ) + .expect("expected hard promotion"); + assert_eq!(transition.reason, TierTransitionReason::HardPressure); + assert_eq!(transition.to, AdaptiveTier::Tier1); + } + + #[test] + fn test_quiet_demotion_is_slow_and_stepwise() { + let mut ctrl = SessionAdaptiveController::new(AdaptiveTier::Tier2); + let mut demotion = None; + for _ in 0..QUIET_DEMOTE_TICKS { + demotion = ctrl.observe(sample(1, 1, 1, 1, 0, 0), 0.25); + } + + let transition = demotion.expect("expected quiet demotion"); + assert_eq!(transition.from, AdaptiveTier::Tier2); + assert_eq!(transition.to, AdaptiveTier::Tier1); + assert_eq!(transition.reason, TierTransitionReason::QuietDemotion); + } +} diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 99e6837..25e6cf9 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -40,6 +40,7 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle use crate::proxy::masking::handle_bad_client; use crate::proxy::middle_relay::handle_via_middle_proxy; use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController}; +use crate::proxy::session_eviction::register_session; fn beobachten_ttl(config: &ProxyConfig) -> Duration { Duration::from_secs(config.general.beobachten_minutes.saturating_mul(60)) @@ -731,6 +732,17 @@ impl RunningClientHandler { return Err(e); } + let registration = register_session(&user, success.dc_idx); + if registration.replaced_existing { + stats.increment_reconnect_evict_total(); + warn!( + user = %user, + dc = success.dc_idx, + "Reconnect detected: replacing active session for user+dc" + ); + } + let session_lease = registration.lease; + let route_snapshot = route_runtime.snapshot(); let session_id = rng.u64(); let relay_result = if config.general.use_middle_proxy @@ -750,6 +762,7 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, + session_lease.clone(), ) .await } else { @@ -766,6 +779,7 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, + session_lease.clone(), ) .await } @@ -783,6 +797,7 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, + session_lease.clone(), ) .await }; diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 7a7810a..b7a1fbf 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -18,6 +18,8 @@ use crate::proxy::route_mode::{ RelayRouteMode, RouteCutoverState, 
ROUTE_SWITCH_ERROR_MSG, affected_cutover_state, cutover_stagger_delay, }; +use crate::proxy::adaptive_buffers; +use crate::proxy::session_eviction::SessionLease; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::transport::UpstreamManager; @@ -34,6 +36,7 @@ pub(crate) async fn handle_via_direct( mut route_rx: watch::Receiver, route_snapshot: RouteCutoverState, session_id: u64, + session_lease: SessionLease, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, @@ -67,16 +70,26 @@ where stats.increment_user_curr_connects(user); stats.increment_current_connections_direct(); + let seed_tier = adaptive_buffers::seed_tier_for_user(user); + let (c2s_copy_buf, s2c_copy_buf) = adaptive_buffers::direct_copy_buffers_for_tier( + seed_tier, + config.general.direct_relay_copy_buf_c2s_bytes, + config.general.direct_relay_copy_buf_s2c_bytes, + ); + let relay_result = relay_bidirectional( client_reader, client_writer, tg_reader, tg_writer, - config.general.direct_relay_copy_buf_c2s_bytes, - config.general.direct_relay_copy_buf_s2c_bytes, + c2s_copy_buf, + s2c_copy_buf, user, + success.dc_idx, Arc::clone(&stats), buffer_pool, + session_lease, + seed_tier, ); tokio::pin!(relay_result); let relay_result = loop { diff --git a/src/proxy/middle_relay.rs b/src/proxy/middle_relay.rs index aaae1b3..4f70a17 100644 --- a/src/proxy/middle_relay.rs +++ b/src/proxy/middle_relay.rs @@ -20,6 +20,8 @@ use crate::proxy::route_mode::{ RelayRouteMode, RouteCutoverState, ROUTE_SWITCH_ERROR_MSG, affected_cutover_state, cutover_stagger_delay, }; +use crate::proxy::adaptive_buffers::{self, AdaptiveTier}; +use crate::proxy::session_eviction::SessionLease; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::transport::middle_proxy::{MePool, MeResponse, proto_flags_for_tag}; @@ -59,8 +61,8 @@ struct MeD2cFlushPolicy { } impl MeD2cFlushPolicy { - fn from_config(config: &ProxyConfig) -> Self { - Self { + fn from_config(config: &ProxyConfig, tier: AdaptiveTier) -> Self { + let base = Self { max_frames: config .general .me_d2c_flush_batch_max_frames @@ -71,6 +73,18 @@ impl MeD2cFlushPolicy { .max(ME_D2C_FLUSH_BATCH_MAX_BYTES_MIN), max_delay: Duration::from_micros(config.general.me_d2c_flush_batch_max_delay_us), ack_flush_immediate: config.general.me_d2c_ack_flush_immediate, + }; + let (max_frames, max_bytes, max_delay) = adaptive_buffers::me_flush_policy_for_tier( + tier, + base.max_frames, + base.max_bytes, + base.max_delay, + ); + Self { + max_frames, + max_bytes, + max_delay, + ack_flush_immediate: base.ack_flush_immediate, } } } @@ -235,6 +249,7 @@ pub(crate) async fn handle_via_middle_proxy( mut route_rx: watch::Receiver, route_snapshot: RouteCutoverState, session_id: u64, + session_lease: SessionLease, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, @@ -244,6 +259,7 @@ where let peer = success.peer; let proto_tag = success.proto_tag; let pool_generation = me_pool.current_generation(); + let seed_tier = adaptive_buffers::seed_tier_for_user(&user); debug!( user = %user, @@ -295,6 +311,15 @@ where return Err(ProxyError::Proxy(ROUTE_SWITCH_ERROR_MSG.to_string())); } + if session_lease.is_stale() { + stats.increment_reconnect_stale_close_total(); + let _ = me_pool.send_close(conn_id).await; + me_pool.registry().unregister(conn_id).await; + stats.decrement_current_connections_me(); + stats.decrement_user_curr_connects(&user); + return Err(ProxyError::Proxy("Session evicted by reconnect".to_string())); + } + // Per-user ad_tag 
from access.user_ad_tags; fallback to general.ad_tag (hot-reloadable) let user_tag: Option> = config .access @@ -368,7 +393,7 @@ where let rng_clone = rng.clone(); let user_clone = user.clone(); let bytes_me2c_clone = bytes_me2c.clone(); - let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config); + let d2c_flush_policy = MeD2cFlushPolicy::from_config(&config, seed_tier); let me_writer = tokio::spawn(async move { let mut writer = crypto_writer; let mut frame_buf = Vec::with_capacity(16 * 1024); @@ -528,6 +553,12 @@ where let mut frame_counter: u64 = 0; let mut route_watch_open = true; loop { + if session_lease.is_stale() { + stats.increment_reconnect_stale_close_total(); + let _ = enqueue_c2me_command(&c2me_tx, C2MeCommand::Close).await; + main_result = Err(ProxyError::Proxy("Session evicted by reconnect".to_string())); + break; + } if let Some(cutover) = affected_cutover_state( &route_rx, RelayRouteMode::Middle, @@ -636,6 +667,7 @@ where frames_ok = frame_counter, "ME relay cleanup" ); + adaptive_buffers::record_user_tier(&user, seed_tier); me_pool.registry().unregister(conn_id).await; stats.decrement_current_connections_me(); stats.decrement_user_curr_connects(&user); diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index 1eed469..ab840f6 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -1,5 +1,6 @@ //! Proxy Defs +pub mod adaptive_buffers; pub mod client; pub mod direct_relay; pub mod handshake; @@ -7,6 +8,7 @@ pub mod masking; pub mod middle_relay; pub mod route_mode; pub mod relay; +pub mod session_eviction; pub use client::ClientHandler; #[allow(unused_imports)] diff --git a/src/proxy/relay.rs b/src/proxy/relay.rs index 06ce0d8..2b12d5a 100644 --- a/src/proxy/relay.rs +++ b/src/proxy/relay.rs @@ -63,6 +63,10 @@ use tokio::io::{ use tokio::time::Instant; use tracing::{debug, trace, warn}; use crate::error::Result; +use crate::proxy::adaptive_buffers::{ + self, AdaptiveTier, RelaySignalSample, SessionAdaptiveController, TierTransitionReason, +}; +use crate::proxy::session_eviction::SessionLease; use crate::stats::Stats; use crate::stream::BufferPool; @@ -79,6 +83,7 @@ const ACTIVITY_TIMEOUT: Duration = Duration::from_secs(1800); /// 10 seconds gives responsive timeout detection (±10s accuracy) /// without measurable overhead from atomic reads. const WATCHDOG_INTERVAL: Duration = Duration::from_secs(10); +const ADAPTIVE_TICK: Duration = Duration::from_millis(250); // ============= CombinedStream ============= @@ -155,6 +160,16 @@ struct SharedCounters { s2c_ops: AtomicU64, /// Milliseconds since relay epoch of last I/O activity last_activity_ms: AtomicU64, + /// Bytes requested to write to client (S→C direction). + s2c_requested_bytes: AtomicU64, + /// Total write operations for S→C direction. + s2c_write_ops: AtomicU64, + /// Number of partial writes to client. + s2c_partial_writes: AtomicU64, + /// Number of times S→C poll_write returned Pending. + s2c_pending_writes: AtomicU64, + /// Consecutive pending writes in S→C direction. 
+ s2c_consecutive_pending_writes: AtomicU64, } impl SharedCounters { @@ -165,6 +180,11 @@ impl SharedCounters { c2s_ops: AtomicU64::new(0), s2c_ops: AtomicU64::new(0), last_activity_ms: AtomicU64::new(0), + s2c_requested_bytes: AtomicU64::new(0), + s2c_write_ops: AtomicU64::new(0), + s2c_partial_writes: AtomicU64::new(0), + s2c_pending_writes: AtomicU64::new(0), + s2c_consecutive_pending_writes: AtomicU64::new(0), } } @@ -259,9 +279,21 @@ impl AsyncWrite for StatsIo { buf: &[u8], ) -> Poll> { let this = self.get_mut(); + this.counters + .s2c_requested_bytes + .fetch_add(buf.len() as u64, Ordering::Relaxed); match Pin::new(&mut this.inner).poll_write(cx, buf) { Poll::Ready(Ok(n)) => { + this.counters.s2c_write_ops.fetch_add(1, Ordering::Relaxed); + this.counters + .s2c_consecutive_pending_writes + .store(0, Ordering::Relaxed); + if n < buf.len() { + this.counters + .s2c_partial_writes + .fetch_add(1, Ordering::Relaxed); + } if n > 0 { // S→C: data written to client this.counters.s2c_bytes.fetch_add(n as u64, Ordering::Relaxed); @@ -275,6 +307,15 @@ impl AsyncWrite for StatsIo { } Poll::Ready(Ok(n)) } + Poll::Pending => { + this.counters + .s2c_pending_writes + .fetch_add(1, Ordering::Relaxed); + this.counters + .s2c_consecutive_pending_writes + .fetch_add(1, Ordering::Relaxed); + Poll::Pending + } other => other, } } @@ -316,8 +357,11 @@ pub async fn relay_bidirectional( c2s_buf_size: usize, s2c_buf_size: usize, user: &str, + dc_idx: i16, stats: Arc, _buffer_pool: Arc, + session_lease: SessionLease, + seed_tier: AdaptiveTier, ) -> Result<()> where CR: AsyncRead + Unpin + Send + 'static, @@ -345,13 +389,33 @@ where // ── Watchdog: activity timeout + periodic rate logging ────────── let wd_counters = Arc::clone(&counters); let wd_user = user_owned.clone(); + let wd_dc = dc_idx; + let wd_stats = Arc::clone(&stats); + let wd_session = session_lease.clone(); let watchdog = async { - let mut prev_c2s: u64 = 0; - let mut prev_s2c: u64 = 0; + let mut prev_c2s_log: u64 = 0; + let mut prev_s2c_log: u64 = 0; + let mut prev_c2s_sample: u64 = 0; + let mut prev_s2c_requested_sample: u64 = 0; + let mut prev_s2c_written_sample: u64 = 0; + let mut prev_s2c_write_ops_sample: u64 = 0; + let mut prev_s2c_partial_sample: u64 = 0; + let mut accumulated_log = Duration::ZERO; + let mut adaptive = SessionAdaptiveController::new(seed_tier); loop { - tokio::time::sleep(WATCHDOG_INTERVAL).await; + tokio::time::sleep(ADAPTIVE_TICK).await; + + if wd_session.is_stale() { + wd_stats.increment_reconnect_stale_close_total(); + warn!( + user = %wd_user, + dc = wd_dc, + "Session evicted by reconnect" + ); + return; + } let now = Instant::now(); let idle = wd_counters.idle_duration(now, epoch); @@ -370,11 +434,80 @@ where return; // Causes select! 
to cancel copy_bidirectional } + let c2s_total = wd_counters.c2s_bytes.load(Ordering::Relaxed); + let s2c_requested_total = wd_counters + .s2c_requested_bytes + .load(Ordering::Relaxed); + let s2c_written_total = wd_counters.s2c_bytes.load(Ordering::Relaxed); + let s2c_write_ops_total = wd_counters + .s2c_write_ops + .load(Ordering::Relaxed); + let s2c_partial_total = wd_counters + .s2c_partial_writes + .load(Ordering::Relaxed); + let consecutive_pending = wd_counters + .s2c_consecutive_pending_writes + .load(Ordering::Relaxed) as u32; + + let sample = RelaySignalSample { + c2s_bytes: c2s_total.saturating_sub(prev_c2s_sample), + s2c_requested_bytes: s2c_requested_total + .saturating_sub(prev_s2c_requested_sample), + s2c_written_bytes: s2c_written_total + .saturating_sub(prev_s2c_written_sample), + s2c_write_ops: s2c_write_ops_total + .saturating_sub(prev_s2c_write_ops_sample), + s2c_partial_writes: s2c_partial_total + .saturating_sub(prev_s2c_partial_sample), + s2c_consecutive_pending_writes: consecutive_pending, + }; + + if let Some(transition) = adaptive.observe(sample, ADAPTIVE_TICK.as_secs_f64()) { + match transition.reason { + TierTransitionReason::SoftConfirmed => { + wd_stats.increment_relay_adaptive_promotions_total(); + } + TierTransitionReason::HardPressure => { + wd_stats.increment_relay_adaptive_promotions_total(); + wd_stats.increment_relay_adaptive_hard_promotions_total(); + } + TierTransitionReason::QuietDemotion => { + wd_stats.increment_relay_adaptive_demotions_total(); + } + } + adaptive_buffers::record_user_tier(&wd_user, adaptive.max_tier_seen()); + debug!( + user = %wd_user, + dc = wd_dc, + from_tier = transition.from.as_u8(), + to_tier = transition.to.as_u8(), + reason = ?transition.reason, + throughput_ema_bps = sample + .c2s_bytes + .max(sample.s2c_written_bytes) + .saturating_mul(8) + .saturating_mul(4), + "Adaptive relay tier transition" + ); + } + + prev_c2s_sample = c2s_total; + prev_s2c_requested_sample = s2c_requested_total; + prev_s2c_written_sample = s2c_written_total; + prev_s2c_write_ops_sample = s2c_write_ops_total; + prev_s2c_partial_sample = s2c_partial_total; + + accumulated_log = accumulated_log.saturating_add(ADAPTIVE_TICK); + if accumulated_log < WATCHDOG_INTERVAL { + continue; + } + accumulated_log = Duration::ZERO; + // ── Periodic rate logging ─────────────────────────────── let c2s = wd_counters.c2s_bytes.load(Ordering::Relaxed); let s2c = wd_counters.s2c_bytes.load(Ordering::Relaxed); - let c2s_delta = c2s - prev_c2s; - let s2c_delta = s2c - prev_s2c; + let c2s_delta = c2s.saturating_sub(prev_c2s_log); + let s2c_delta = s2c.saturating_sub(prev_s2c_log); if c2s_delta > 0 || s2c_delta > 0 { let secs = WATCHDOG_INTERVAL.as_secs_f64(); @@ -388,8 +521,8 @@ where ); } - prev_c2s = c2s; - prev_s2c = s2c; + prev_c2s_log = c2s; + prev_s2c_log = s2c; } }; @@ -424,6 +557,7 @@ where let c2s_ops = counters.c2s_ops.load(Ordering::Relaxed); let s2c_ops = counters.s2c_ops.load(Ordering::Relaxed); let duration = epoch.elapsed(); + adaptive_buffers::record_user_tier(&user_owned, seed_tier); match copy_result { Some(Ok((c2s, s2c))) => { diff --git a/src/proxy/session_eviction.rs b/src/proxy/session_eviction.rs new file mode 100644 index 0000000..c735cae --- /dev/null +++ b/src/proxy/session_eviction.rs @@ -0,0 +1,46 @@ +/// Session eviction is intentionally disabled in runtime. +/// +/// The initial `user+dc` single-lease model caused valid parallel client +/// connections to evict each other. 
Keep the API shape for compatibility, +/// but make it a no-op until a safer policy is introduced. + +#[derive(Debug, Clone, Default)] +pub struct SessionLease; + +impl SessionLease { + pub fn is_stale(&self) -> bool { + false + } + + #[allow(dead_code)] + pub fn release(&self) {} +} + +pub struct RegistrationResult { + pub lease: SessionLease, + pub replaced_existing: bool, +} + +pub fn register_session(_user: &str, _dc_idx: i16) -> RegistrationResult { + RegistrationResult { + lease: SessionLease, + replaced_existing: false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_session_eviction_disabled_behavior() { + let first = register_session("alice", 2); + let second = register_session("alice", 2); + assert!(!first.replaced_existing); + assert!(!second.replaced_existing); + assert!(!first.lease.is_stale()); + assert!(!second.lease.is_stale()); + first.lease.release(); + second.lease.release(); + } +} diff --git a/src/stats/mod.rs b/src/stats/mod.rs index 25905b2..83cd03d 100644 --- a/src/stats/mod.rs +++ b/src/stats/mod.rs @@ -120,6 +120,8 @@ pub struct Stats { pool_swap_total: AtomicU64, pool_drain_active: AtomicU64, pool_force_close_total: AtomicU64, + pool_drain_soft_evict_total: AtomicU64, + pool_drain_soft_evict_writer_total: AtomicU64, pool_stale_pick_total: AtomicU64, me_writer_removed_total: AtomicU64, me_writer_removed_unexpected_total: AtomicU64, @@ -133,6 +135,11 @@ pub struct Stats { me_inline_recovery_total: AtomicU64, ip_reservation_rollback_tcp_limit_total: AtomicU64, ip_reservation_rollback_quota_limit_total: AtomicU64, + relay_adaptive_promotions_total: AtomicU64, + relay_adaptive_demotions_total: AtomicU64, + relay_adaptive_hard_promotions_total: AtomicU64, + reconnect_evict_total: AtomicU64, + reconnect_stale_close_total: AtomicU64, telemetry_core_enabled: AtomicBool, telemetry_user_enabled: AtomicBool, telemetry_me_level: AtomicU8, @@ -285,6 +292,36 @@ impl Stats { pub fn decrement_current_connections_me(&self) { Self::decrement_atomic_saturating(&self.current_connections_me); } + pub fn increment_relay_adaptive_promotions_total(&self) { + if self.telemetry_core_enabled() { + self.relay_adaptive_promotions_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_relay_adaptive_demotions_total(&self) { + if self.telemetry_core_enabled() { + self.relay_adaptive_demotions_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_relay_adaptive_hard_promotions_total(&self) { + if self.telemetry_core_enabled() { + self.relay_adaptive_hard_promotions_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_reconnect_evict_total(&self) { + if self.telemetry_core_enabled() { + self.reconnect_evict_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_reconnect_stale_close_total(&self) { + if self.telemetry_core_enabled() { + self.reconnect_stale_close_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn increment_handshake_timeouts(&self) { if self.telemetry_core_enabled() { self.handshake_timeouts.fetch_add(1, Ordering::Relaxed); @@ -680,6 +717,18 @@ impl Stats { self.pool_force_close_total.fetch_add(1, Ordering::Relaxed); } } + pub fn increment_pool_drain_soft_evict_total(&self) { + if self.telemetry_me_allows_normal() { + self.pool_drain_soft_evict_total + .fetch_add(1, Ordering::Relaxed); + } + } + pub fn increment_pool_drain_soft_evict_writer_total(&self) { + if self.telemetry_me_allows_normal() { + self.pool_drain_soft_evict_writer_total + .fetch_add(1, Ordering::Relaxed); + } + } pub fn 
increment_pool_stale_pick_total(&self) { if self.telemetry_me_allows_normal() { self.pool_stale_pick_total.fetch_add(1, Ordering::Relaxed); @@ -933,6 +982,22 @@ impl Stats { self.get_current_connections_direct() .saturating_add(self.get_current_connections_me()) } + pub fn get_relay_adaptive_promotions_total(&self) -> u64 { + self.relay_adaptive_promotions_total.load(Ordering::Relaxed) + } + pub fn get_relay_adaptive_demotions_total(&self) -> u64 { + self.relay_adaptive_demotions_total.load(Ordering::Relaxed) + } + pub fn get_relay_adaptive_hard_promotions_total(&self) -> u64 { + self.relay_adaptive_hard_promotions_total + .load(Ordering::Relaxed) + } + pub fn get_reconnect_evict_total(&self) -> u64 { + self.reconnect_evict_total.load(Ordering::Relaxed) + } + pub fn get_reconnect_stale_close_total(&self) -> u64 { + self.reconnect_stale_close_total.load(Ordering::Relaxed) + } pub fn get_me_keepalive_sent(&self) -> u64 { self.me_keepalive_sent.load(Ordering::Relaxed) } pub fn get_me_keepalive_failed(&self) -> u64 { self.me_keepalive_failed.load(Ordering::Relaxed) } pub fn get_me_keepalive_pong(&self) -> u64 { self.me_keepalive_pong.load(Ordering::Relaxed) } @@ -1185,6 +1250,12 @@ impl Stats { pub fn get_pool_force_close_total(&self) -> u64 { self.pool_force_close_total.load(Ordering::Relaxed) } + pub fn get_pool_drain_soft_evict_total(&self) -> u64 { + self.pool_drain_soft_evict_total.load(Ordering::Relaxed) + } + pub fn get_pool_drain_soft_evict_writer_total(&self) -> u64 { + self.pool_drain_soft_evict_writer_total.load(Ordering::Relaxed) + } pub fn get_pool_stale_pick_total(&self) -> u64 { self.pool_stale_pick_total.load(Ordering::Relaxed) } @@ -1258,6 +1329,9 @@ impl Stats { } pub fn decrement_user_curr_connects(&self, user: &str) { + if !self.telemetry_user_enabled() { + return; + } self.maybe_cleanup_user_stats(); if let Some(stats) = self.user_stats.get(user) { Self::touch_user_stats(stats.value()); diff --git a/src/stream/buffer_pool.rs b/src/stream/buffer_pool.rs index 9c46922..dac0fb5 100644 --- a/src/stream/buffer_pool.rs +++ b/src/stream/buffer_pool.rs @@ -14,8 +14,7 @@ use std::sync::Arc; // ============= Configuration ============= /// Default buffer size -/// CHANGED: Reduced from 64KB to 16KB to match TLS record size and prevent bufferbloat. 
-pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024; +pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024; /// Default maximum number of pooled buffers pub const DEFAULT_MAX_BUFFERS: usize = 1024; diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index b6a0160..43a3569 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -299,6 +299,11 @@ async fn run_update_cycle( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_threshold, + cfg.general.me_pool_drain_soft_evict_enabled, + cfg.general.me_pool_drain_soft_evict_grace_secs, + cfg.general.me_pool_drain_soft_evict_per_writer, + cfg.general.me_pool_drain_soft_evict_budget_per_core, + cfg.general.me_pool_drain_soft_evict_cooldown_ms, cfg.general.effective_me_pool_force_close_secs(), cfg.general.me_pool_min_fresh_ratio, cfg.general.me_hardswap_warmup_delay_min_ms, @@ -526,6 +531,11 @@ pub async fn me_config_updater( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_threshold, + cfg.general.me_pool_drain_soft_evict_enabled, + cfg.general.me_pool_drain_soft_evict_grace_secs, + cfg.general.me_pool_drain_soft_evict_per_writer, + cfg.general.me_pool_drain_soft_evict_budget_per_core, + cfg.general.me_pool_drain_soft_evict_cooldown_ms, cfg.general.effective_me_pool_force_close_secs(), cfg.general.me_pool_min_fresh_ratio, cfg.general.me_hardswap_warmup_delay_min_ms, diff --git a/src/transport/middle_proxy/health.rs b/src/transport/middle_proxy/health.rs index 8ac6839..862e58a 100644 --- a/src/transport/middle_proxy/health.rs +++ b/src/transport/middle_proxy/health.rs @@ -28,6 +28,8 @@ const HEALTH_RECONNECT_BUDGET_MAX: usize = 128; const HEALTH_DRAIN_CLOSE_BUDGET_PER_CORE: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_MIN: usize = 16; const HEALTH_DRAIN_CLOSE_BUDGET_MAX: usize = 256; +const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN: usize = 8; +const HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX: usize = 256; #[derive(Debug, Clone)] struct DcFloorPlanEntry { @@ -66,6 +68,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c let mut adaptive_recover_until: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut floor_warn_next_allowed: HashMap<(i32, IpFamily), Instant> = HashMap::new(); let mut drain_warn_next_allowed: HashMap = HashMap::new(); + let mut drain_soft_evict_next_allowed: HashMap = HashMap::new(); let mut degraded_interval = true; loop { let interval = if degraded_interval { @@ -75,7 +78,12 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c }; tokio::time::sleep(interval).await; pool.prune_closed_writers().await; - reap_draining_writers(&pool, &mut drain_warn_next_allowed).await; + reap_draining_writers( + &pool, + &mut drain_warn_next_allowed, + &mut drain_soft_evict_next_allowed, + ) + .await; let v4_degraded = check_family( IpFamily::V4, &pool, @@ -117,6 +125,7 @@ pub async fn me_health_monitor(pool: Arc, rng: Arc, _min_c pub(super) async fn reap_draining_writers( pool: &Arc, warn_next_allowed: &mut HashMap, + soft_evict_next_allowed: &mut HashMap, ) { let now_epoch_secs = MePool::now_epoch_secs(); let now = Instant::now(); @@ -172,7 +181,7 @@ pub(super) async fn reap_draining_writers( } let mut active_draining_writer_ids = HashSet::with_capacity(draining_writers.len()); - for writer in draining_writers { + for writer in &draining_writers { active_draining_writer_ids.insert(writer.id); let drain_started_at_epoch_secs = writer .draining_started_at_epoch_secs @@ 
-209,6 +218,86 @@ pub(super) async fn reap_draining_writers( } warn_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id)); + soft_evict_next_allowed.retain(|writer_id, _| active_draining_writer_ids.contains(writer_id)); + + if pool.drain_soft_evict_enabled() && drain_ttl_secs > 0 && !draining_writers.is_empty() { + let mut force_close_ids = HashSet::::with_capacity(force_close_writer_ids.len()); + for writer_id in &force_close_writer_ids { + force_close_ids.insert(*writer_id); + } + let soft_grace_secs = pool.drain_soft_evict_grace_secs(); + let soft_trigger_age_secs = drain_ttl_secs.saturating_add(soft_grace_secs); + let per_writer_limit = pool.drain_soft_evict_per_writer(); + let soft_budget = health_drain_soft_evict_budget(pool); + let soft_cooldown = pool.drain_soft_evict_cooldown(); + let mut soft_evicted_total = 0usize; + + for writer in &draining_writers { + if soft_evicted_total >= soft_budget { + break; + } + if force_close_ids.contains(&writer.id) { + continue; + } + if pool.writer_accepts_new_binding(writer) { + continue; + } + let started_epoch_secs = writer + .draining_started_at_epoch_secs + .load(std::sync::atomic::Ordering::Relaxed); + if started_epoch_secs == 0 + || now_epoch_secs.saturating_sub(started_epoch_secs) < soft_trigger_age_secs + { + continue; + } + if !should_emit_writer_warn( + soft_evict_next_allowed, + writer.id, + now, + soft_cooldown, + ) { + continue; + } + + let remaining_budget = soft_budget.saturating_sub(soft_evicted_total); + let limit = per_writer_limit.min(remaining_budget); + if limit == 0 { + break; + } + let conn_ids = pool + .registry + .bound_conn_ids_for_writer_limited(writer.id, limit) + .await; + if conn_ids.is_empty() { + continue; + } + + let mut evicted_for_writer = 0usize; + for conn_id in conn_ids { + if pool.registry.evict_bound_conn_if_writer(conn_id, writer.id).await { + evicted_for_writer = evicted_for_writer.saturating_add(1); + soft_evicted_total = soft_evicted_total.saturating_add(1); + pool.stats.increment_pool_drain_soft_evict_total(); + if soft_evicted_total >= soft_budget { + break; + } + } + } + + if evicted_for_writer > 0 { + pool.stats.increment_pool_drain_soft_evict_writer_total(); + info!( + writer_id = writer.id, + writer_dc = writer.writer_dc, + endpoint = %writer.addr, + drained_connections = evicted_for_writer, + soft_budget, + soft_trigger_age_secs, + "ME draining writer soft-evicted bound clients" + ); + } + } + } let close_budget = health_drain_close_budget(); let requested_force_close = force_close_writer_ids.len(); @@ -258,6 +347,19 @@ pub(super) fn health_drain_close_budget() -> usize { .clamp(HEALTH_DRAIN_CLOSE_BUDGET_MIN, HEALTH_DRAIN_CLOSE_BUDGET_MAX) } +pub(super) fn health_drain_soft_evict_budget(pool: &MePool) -> usize { + let cpu_cores = std::thread::available_parallelism() + .map(std::num::NonZeroUsize::get) + .unwrap_or(1); + let per_core = pool.drain_soft_evict_budget_per_core(); + cpu_cores + .saturating_mul(per_core) + .clamp( + HEALTH_DRAIN_SOFT_EVICT_BUDGET_MIN, + HEALTH_DRAIN_SOFT_EVICT_BUDGET_MAX, + ) +} + fn should_emit_writer_warn( next_allowed: &mut HashMap, writer_id: u64, @@ -1443,6 +1545,11 @@ mod tests { general.hardswap, general.me_pool_drain_ttl_secs, general.me_pool_drain_threshold, + general.me_pool_drain_soft_evict_enabled, + general.me_pool_drain_soft_evict_grace_secs, + general.me_pool_drain_soft_evict_per_writer, + general.me_pool_drain_soft_evict_budget_per_core, + general.me_pool_drain_soft_evict_cooldown_ms, 
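// Worked example for health_drain_soft_evict_budget: the per-tick budget is
// available_parallelism() * me_pool_drain_soft_evict_budget_per_core, clamped to [8, 256].
// Assuming a 4-core host and the defaults from config/defaults.rs (budget_per_core = 8,
// per_writer = 1, grace = 30 s, cooldown = 5000 ms), that gives clamp(4 * 8, 8, 256) = 32
// evictions per health tick, with at most one session taken from any single draining
// writer per tick. The soft stage only starts once a writer has been draining for
// drain_ttl + 30 s, and the same writer is not revisited for another 5 s.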
general.effective_me_pool_force_close_secs(), general.me_pool_min_fresh_ratio, general.me_hardswap_warmup_delay_min_ms, @@ -1524,8 +1631,9 @@ mod tests { let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await; let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; let writer_ids: Vec = pool.writers.read().await.iter().map(|writer| writer.id).collect(); assert_eq!(writer_ids, vec![20, 30]); @@ -1542,8 +1650,9 @@ mod tests { let conn_b = insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(20)).await; let conn_c = insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(10)).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; let writer_ids: Vec = pool.writers.read().await.iter().map(|writer| writer.id).collect(); assert_eq!(writer_ids, vec![10, 20, 30]); diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs index 675005a..dc1a0eb 100644 --- a/src/transport/middle_proxy/health_adversarial_tests.rs +++ b/src/transport/middle_proxy/health_adversarial_tests.rs @@ -82,6 +82,11 @@ async fn make_pool( general.hardswap, general.me_pool_drain_ttl_secs, general.me_pool_drain_threshold, + general.me_pool_drain_soft_evict_enabled, + general.me_pool_drain_soft_evict_grace_secs, + general.me_pool_drain_soft_evict_per_writer, + general.me_pool_drain_soft_evict_budget_per_core, + general.me_pool_drain_soft_evict_cooldown_ms, general.effective_me_pool_force_close_secs(), general.me_pool_min_fresh_ratio, general.me_hardswap_warmup_delay_min_ms, @@ -185,10 +190,11 @@ async fn sorted_writer_ids(pool: &Arc) -> Vec { async fn reap_draining_writers_clears_warn_state_when_pool_empty() { let (pool, _rng) = make_pool(128, 1, 1).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); warn_next_allowed.insert(11, Instant::now() + Duration::from_secs(5)); warn_next_allowed.insert(22, Instant::now() + Duration::from_secs(5)); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(warn_next_allowed.is_empty()); } @@ -197,6 +203,8 @@ async fn reap_draining_writers_clears_warn_state_when_pool_empty() { async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycles() { let threshold = 3u64; let (pool, _rng) = make_pool(threshold, 1, 1).await; + pool.me_pool_drain_soft_evict_enabled + .store(false, Ordering::Relaxed); let now_epoch_secs = MePool::now_epoch_secs(); for writer_id in 1..=60u64 { @@ -211,8 +219,9 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle } let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..64 { - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; if writer_count(&pool).await <= threshold as usize { break; } @@ -240,11 +249,12 @@ async fn 
reap_draining_writers_handles_large_empty_writer_population() { } let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..24 { if writer_count(&pool).await == 0 { break; } - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; } assert_eq!(writer_count(&pool).await, 0); @@ -268,11 +278,12 @@ async fn reap_draining_writers_processes_mass_deadline_expiry_without_unbounded_ } let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..40 { if writer_count(&pool).await == 0 { break; } - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; } assert_eq!(writer_count(&pool).await, 0); @@ -283,6 +294,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c let (pool, _rng) = make_pool(128, 1, 1).await; let now_epoch_secs = MePool::now_epoch_secs(); let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); for wave in 0..40u64 { for offset in 0..8u64 { @@ -296,7 +308,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c .await; } - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(warn_next_allowed.len() <= writer_count(&pool).await); let ids = sorted_writer_ids(&pool).await; @@ -304,7 +316,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c let _ = pool.remove_writer_and_close_clients(writer_id).await; } - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(warn_next_allowed.len() <= writer_count(&pool).await); } } @@ -326,9 +338,10 @@ async fn reap_draining_writers_budgeted_cleanup_never_increases_pool_size() { } let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); let mut previous = writer_count(&pool).await; for _ in 0..32 { - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; let current = writer_count(&pool).await; assert!(current <= previous); previous = current; diff --git a/src/transport/middle_proxy/health_integration_tests.rs b/src/transport/middle_proxy/health_integration_tests.rs index 70b6411..4724851 100644 --- a/src/transport/middle_proxy/health_integration_tests.rs +++ b/src/transport/middle_proxy/health_integration_tests.rs @@ -81,6 +81,11 @@ async fn make_pool( general.hardswap, general.me_pool_drain_ttl_secs, general.me_pool_drain_threshold, + general.me_pool_drain_soft_evict_enabled, + general.me_pool_drain_soft_evict_grace_secs, + general.me_pool_drain_soft_evict_per_writer, + general.me_pool_drain_soft_evict_budget_per_core, + general.me_pool_drain_soft_evict_cooldown_ms, general.effective_me_pool_force_close_secs(), general.me_pool_min_fresh_ratio, general.me_hardswap_warmup_delay_min_ms, diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index 05a8e6a..45a1eee 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -39,7 +39,7 @@ async fn 
make_pool(me_pool_drain_threshold: u64) -> Arc { NetworkDecision::default(), None, Arc::new(SecureRandom::new()), - Arc::new(Stats::default()), + Arc::new(Stats::new()), general.me_keepalive_enabled, general.me_keepalive_interval_secs, general.me_keepalive_jitter_secs, @@ -74,6 +74,11 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc { general.hardswap, general.me_pool_drain_ttl_secs, general.me_pool_drain_threshold, + general.me_pool_drain_soft_evict_enabled, + general.me_pool_drain_soft_evict_grace_secs, + general.me_pool_drain_soft_evict_per_writer, + general.me_pool_drain_soft_evict_budget_per_core, + general.me_pool_drain_soft_evict_cooldown_ms, general.effective_me_pool_force_close_secs(), general.me_pool_min_fresh_ratio, general.me_hardswap_warmup_delay_min_ms, @@ -175,14 +180,15 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() { let conn_ids = insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(warn_next_allowed.contains_key(&7)); let _ = pool.remove_writer_and_close_clients(7).await; assert!(pool.registry.get_writer(conn_ids[0]).await.is_none()); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(!warn_next_allowed.contains_key(&7)); } @@ -194,8 +200,9 @@ async fn reap_draining_writers_removes_empty_draining_writers() { insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(30), 0, 0).await; insert_draining_writer(&pool, 3, now_epoch_secs.saturating_sub(20), 1, 0).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert_eq!(current_writer_ids(&pool).await, vec![3]); } @@ -209,8 +216,9 @@ async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { insert_draining_writer(&pool, 33, now_epoch_secs.saturating_sub(20), 1, 0).await; insert_draining_writer(&pool, 44, now_epoch_secs.saturating_sub(10), 1, 0).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert_eq!(current_writer_ids(&pool).await, vec![33, 44]); } @@ -228,8 +236,9 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() { ) .await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(current_writer_ids(&pool).await.is_empty()); } @@ -251,8 +260,9 @@ async fn reap_draining_writers_limits_closes_per_health_tick() { .await; } let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert_eq!(pool.writers.read().await.len(), writer_total - close_budget); } 
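Note on the reaper budgets exercised by the tests above: both the force-close path and the new soft-evict path size their per-tick work as CPU cores multiplied by a per-core value, clamped to a fixed range, and the soft-evict path additionally gates each writer behind the same cooldown-map pattern used for drain warnings. The standalone sketch below illustrates only that shape; the constant values, helper names, and the exact cooldown re-arming behaviour are illustrative assumptions rather than the crate's actual items.

// Sketch of per-tick budgeting and per-writer cooldown gating (illustrative names).
use std::collections::HashMap;
use std::time::{Duration, Instant};

const SOFT_EVICT_BUDGET_MIN: usize = 8;
const SOFT_EVICT_BUDGET_MAX: usize = 256;

/// Budget scales with CPU cores and is clamped to a bounded range.
fn soft_evict_budget(per_core: usize) -> usize {
    let cores = std::thread::available_parallelism()
        .map(std::num::NonZeroUsize::get)
        .unwrap_or(1);
    cores
        .saturating_mul(per_core.max(1))
        .clamp(SOFT_EVICT_BUDGET_MIN, SOFT_EVICT_BUDGET_MAX)
}

/// A writer is only acted on when its cooldown window has elapsed;
/// acting on it re-arms the window (assumed semantics).
fn cooldown_elapsed(
    next_allowed: &mut HashMap<u64, Instant>,
    writer_id: u64,
    now: Instant,
    cooldown: Duration,
) -> bool {
    match next_allowed.get(&writer_id) {
        Some(next) if *next > now => false,
        _ => {
            next_allowed.insert(writer_id, now + cooldown);
            true
        }
    }
}

fn main() {
    let mut next_allowed = HashMap::new();
    let now = Instant::now();
    let budget = soft_evict_budget(8);
    assert!((SOFT_EVICT_BUDGET_MIN..=SOFT_EVICT_BUDGET_MAX).contains(&budget));
    // First pass for writer 77 is allowed, second pass within the window is skipped.
    assert!(cooldown_elapsed(&mut next_allowed, 77, now, Duration::from_secs(5)));
    assert!(!cooldown_elapsed(&mut next_allowed, 77, now, Duration::from_secs(5)));
}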
@@ -274,12 +284,13 @@ async fn reap_draining_writers_backlog_drains_across_ticks() { .await; } let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..8 { if pool.writers.read().await.is_empty() { break; } - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; } assert!(pool.writers.read().await.is_empty()); @@ -303,9 +314,10 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() { .await; } let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..16 { - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; if pool.writers.read().await.len() <= threshold as usize { break; } @@ -322,8 +334,9 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(30), 1, 0).await; insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(20), 1, 0).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert_eq!(current_writer_ids(&pool).await, vec![10, 20, 30]); } @@ -346,8 +359,9 @@ async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() { let empty_writer_id = close_budget as u64 + 1; insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]); } @@ -359,8 +373,9 @@ async fn reap_draining_writers_empty_cleanup_does_not_increment_force_close_metr insert_draining_writer(&pool, 1, now_epoch_secs.saturating_sub(60), 0, 0).await; insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(50), 0, 0).await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(current_writer_ids(&pool).await.is_empty()); assert_eq!(pool.stats.get_pool_force_close_total(), 0); @@ -387,8 +402,9 @@ async fn reap_draining_writers_handles_duplicate_force_close_requests_for_same_w ) .await; let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(current_writer_ids(&pool).await.is_empty()); } @@ -398,6 +414,7 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population let pool = make_pool(128).await; let now_epoch_secs = MePool::now_epoch_secs(); let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); for wave in 0..12u64 { for offset in 0..9u64 { @@ -410,14 +427,14 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population ) .await; } - reap_draining_writers(&pool, 
&mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); let existing_writer_ids = current_writer_ids(&pool).await; for writer_id in existing_writer_ids.into_iter().take(4) { let _ = pool.remove_writer_and_close_clients(writer_id).await; } - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); } } @@ -427,6 +444,7 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat let pool = make_pool(6).await; let now_epoch_secs = MePool::now_epoch_secs(); let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); for writer_id in 1..=18u64 { let bound_clients = if writer_id % 3 == 0 { 0 } else { 1 }; @@ -446,7 +464,7 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat } for _ in 0..16 { - reap_draining_writers(&pool, &mut warn_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; if pool.writers.read().await.len() <= 6 { break; } @@ -456,7 +474,60 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); } +#[tokio::test] +async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() { + let pool = make_pool(128).await; + pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed); + pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed); + pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed); + pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed); + pool.me_pool_drain_soft_evict_cooldown_ms + .store(1, Ordering::Relaxed); + + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await; + let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + + let activity = pool.registry.writer_activity_snapshot().await; + assert_eq!(activity.bound_clients_by_writer.get(&77), Some(&2)); + assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1); + assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1); + assert_eq!(current_writer_ids(&pool).await, vec![77]); +} + +#[tokio::test] +async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() { + let pool = make_pool(128).await; + pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed); + pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed); + pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed); + pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed); + pool.me_pool_drain_soft_evict_cooldown_ms + .store(60_000, Ordering::Relaxed); + + let now_epoch_secs = MePool::now_epoch_secs(); + insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await; + let mut warn_next_allowed = HashMap::new(); + let mut soft_evict_next_allowed = HashMap::new(); + + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + + let 
activity = pool.registry.writer_activity_snapshot().await; + assert_eq!(activity.bound_clients_by_writer.get(&88), Some(&2)); + assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1); + assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1); +} + #[test] fn general_config_default_drain_threshold_remains_enabled() { assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128); + assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled); + assert_eq!( + GeneralConfig::default().me_pool_drain_soft_evict_per_writer, + 1 + ); } diff --git a/src/transport/middle_proxy/pool.rs b/src/transport/middle_proxy/pool.rs index 2a65160..f3cc817 100644 --- a/src/transport/middle_proxy/pool.rs +++ b/src/transport/middle_proxy/pool.rs @@ -172,6 +172,11 @@ pub struct MePool { pub(super) kdf_material_fingerprint: Arc>>, pub(super) me_pool_drain_ttl_secs: AtomicU64, pub(super) me_pool_drain_threshold: AtomicU64, + pub(super) me_pool_drain_soft_evict_enabled: AtomicBool, + pub(super) me_pool_drain_soft_evict_grace_secs: AtomicU64, + pub(super) me_pool_drain_soft_evict_per_writer: AtomicU8, + pub(super) me_pool_drain_soft_evict_budget_per_core: AtomicU32, + pub(super) me_pool_drain_soft_evict_cooldown_ms: AtomicU64, pub(super) me_pool_force_close_secs: AtomicU64, pub(super) me_pool_min_fresh_ratio_permille: AtomicU32, pub(super) me_hardswap_warmup_delay_min_ms: AtomicU64, @@ -273,6 +278,11 @@ impl MePool { hardswap: bool, me_pool_drain_ttl_secs: u64, me_pool_drain_threshold: u64, + me_pool_drain_soft_evict_enabled: bool, + me_pool_drain_soft_evict_grace_secs: u64, + me_pool_drain_soft_evict_per_writer: u8, + me_pool_drain_soft_evict_budget_per_core: u16, + me_pool_drain_soft_evict_cooldown_ms: u64, me_pool_force_close_secs: u64, me_pool_min_fresh_ratio: f32, me_hardswap_warmup_delay_min_ms: u64, @@ -449,6 +459,17 @@ impl MePool { kdf_material_fingerprint: Arc::new(RwLock::new(HashMap::new())), me_pool_drain_ttl_secs: AtomicU64::new(me_pool_drain_ttl_secs), me_pool_drain_threshold: AtomicU64::new(me_pool_drain_threshold), + me_pool_drain_soft_evict_enabled: AtomicBool::new(me_pool_drain_soft_evict_enabled), + me_pool_drain_soft_evict_grace_secs: AtomicU64::new(me_pool_drain_soft_evict_grace_secs), + me_pool_drain_soft_evict_per_writer: AtomicU8::new( + me_pool_drain_soft_evict_per_writer.max(1), + ), + me_pool_drain_soft_evict_budget_per_core: AtomicU32::new( + me_pool_drain_soft_evict_budget_per_core.max(1) as u32, + ), + me_pool_drain_soft_evict_cooldown_ms: AtomicU64::new( + me_pool_drain_soft_evict_cooldown_ms.max(1), + ), me_pool_force_close_secs: AtomicU64::new(me_pool_force_close_secs), me_pool_min_fresh_ratio_permille: AtomicU32::new(Self::ratio_to_permille( me_pool_min_fresh_ratio, @@ -496,6 +517,11 @@ impl MePool { hardswap: bool, drain_ttl_secs: u64, pool_drain_threshold: u64, + pool_drain_soft_evict_enabled: bool, + pool_drain_soft_evict_grace_secs: u64, + pool_drain_soft_evict_per_writer: u8, + pool_drain_soft_evict_budget_per_core: u16, + pool_drain_soft_evict_cooldown_ms: u64, force_close_secs: u64, min_fresh_ratio: f32, hardswap_warmup_delay_min_ms: u64, @@ -536,6 +562,18 @@ impl MePool { .store(drain_ttl_secs, Ordering::Relaxed); self.me_pool_drain_threshold .store(pool_drain_threshold, Ordering::Relaxed); + self.me_pool_drain_soft_evict_enabled + .store(pool_drain_soft_evict_enabled, Ordering::Relaxed); + self.me_pool_drain_soft_evict_grace_secs + .store(pool_drain_soft_evict_grace_secs, Ordering::Relaxed); + self.me_pool_drain_soft_evict_per_writer + 
.store(pool_drain_soft_evict_per_writer.max(1), Ordering::Relaxed); + self.me_pool_drain_soft_evict_budget_per_core.store( + pool_drain_soft_evict_budget_per_core.max(1) as u32, + Ordering::Relaxed, + ); + self.me_pool_drain_soft_evict_cooldown_ms + .store(pool_drain_soft_evict_cooldown_ms.max(1), Ordering::Relaxed); self.me_pool_force_close_secs .store(force_close_secs, Ordering::Relaxed); self.me_pool_min_fresh_ratio_permille @@ -690,6 +728,36 @@ impl MePool { } } + pub(super) fn drain_soft_evict_enabled(&self) -> bool { + self.me_pool_drain_soft_evict_enabled + .load(Ordering::Relaxed) + } + + pub(super) fn drain_soft_evict_grace_secs(&self) -> u64 { + self.me_pool_drain_soft_evict_grace_secs + .load(Ordering::Relaxed) + } + + pub(super) fn drain_soft_evict_per_writer(&self) -> usize { + self.me_pool_drain_soft_evict_per_writer + .load(Ordering::Relaxed) + .max(1) as usize + } + + pub(super) fn drain_soft_evict_budget_per_core(&self) -> usize { + self.me_pool_drain_soft_evict_budget_per_core + .load(Ordering::Relaxed) + .max(1) as usize + } + + pub(super) fn drain_soft_evict_cooldown(&self) -> Duration { + Duration::from_millis( + self.me_pool_drain_soft_evict_cooldown_ms + .load(Ordering::Relaxed) + .max(1), + ) + } + pub(super) async fn key_selector(&self) -> u32 { self.proxy_secret.read().await.key_selector } diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 99070a8..d32835c 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -124,6 +124,11 @@ pub(crate) struct MeApiRuntimeSnapshot { pub me_reconnect_backoff_cap_ms: u64, pub me_reconnect_fast_retry_count: u32, pub me_pool_drain_ttl_secs: u64, + pub me_pool_drain_soft_evict_enabled: bool, + pub me_pool_drain_soft_evict_grace_secs: u64, + pub me_pool_drain_soft_evict_per_writer: u8, + pub me_pool_drain_soft_evict_budget_per_core: u16, + pub me_pool_drain_soft_evict_cooldown_ms: u64, pub me_pool_force_close_secs: u64, pub me_pool_min_fresh_ratio: f32, pub me_bind_stale_mode: &'static str, @@ -562,6 +567,22 @@ impl MePool { me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed), + me_pool_drain_soft_evict_enabled: self + .me_pool_drain_soft_evict_enabled + .load(Ordering::Relaxed), + me_pool_drain_soft_evict_grace_secs: self + .me_pool_drain_soft_evict_grace_secs + .load(Ordering::Relaxed), + me_pool_drain_soft_evict_per_writer: self + .me_pool_drain_soft_evict_per_writer + .load(Ordering::Relaxed), + me_pool_drain_soft_evict_budget_per_core: self + .me_pool_drain_soft_evict_budget_per_core + .load(Ordering::Relaxed) + .min(u16::MAX as u32) as u16, + me_pool_drain_soft_evict_cooldown_ms: self + .me_pool_drain_soft_evict_cooldown_ms + .load(Ordering::Relaxed), me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed), me_pool_min_fresh_ratio: Self::permille_to_ratio( self.me_pool_min_fresh_ratio_permille.load(Ordering::Relaxed), diff --git a/src/transport/middle_proxy/registry.rs b/src/transport/middle_proxy/registry.rs index cc3028b..b8a926e 100644 --- a/src/transport/middle_proxy/registry.rs +++ b/src/transport/middle_proxy/registry.rs @@ -394,6 +394,56 @@ impl ConnRegistry { inner.writer_for_conn.keys().copied().collect() } + pub(super) async fn bound_conn_ids_for_writer_limited( + &self, + writer_id: u64, + limit: usize, + ) -> Vec { + if 
limit == 0 { + return Vec::new(); + } + let inner = self.inner.read().await; + let Some(conn_ids) = inner.conns_for_writer.get(&writer_id) else { + return Vec::new(); + }; + let mut out = conn_ids.iter().copied().collect::>(); + out.sort_unstable(); + out.truncate(limit); + out + } + + pub(super) async fn evict_bound_conn_if_writer(&self, conn_id: u64, writer_id: u64) -> bool { + let maybe_client_tx = { + let mut inner = self.inner.write().await; + if inner.writer_for_conn.get(&conn_id).copied() != Some(writer_id) { + return false; + } + + let client_tx = inner.map.get(&conn_id).cloned(); + inner.map.remove(&conn_id); + inner.meta.remove(&conn_id); + inner.writer_for_conn.remove(&conn_id); + + let became_empty = if let Some(set) = inner.conns_for_writer.get_mut(&writer_id) { + set.remove(&conn_id); + set.is_empty() + } else { + false + }; + if became_empty { + inner + .writer_idle_since_epoch_secs + .insert(writer_id, Self::now_epoch_secs()); + } + client_tx + }; + + if let Some(client_tx) = maybe_client_tx { + let _ = client_tx.try_send(MeResponse::Close); + } + true + } + pub async fn writer_lost(&self, writer_id: u64) -> Vec { let mut inner = self.inner.write().await; inner.writers.remove(&writer_id); @@ -444,6 +494,7 @@ mod tests { use super::ConnMeta; use super::ConnRegistry; + use super::MeResponse; #[tokio::test] async fn writer_activity_snapshot_tracks_writer_and_dc_load() { @@ -634,4 +685,86 @@ mod tests { ); assert!(registry.get_writer(conn_id).await.is_none()); } + + #[tokio::test] + async fn bound_conn_ids_for_writer_limited_is_sorted_and_bounded() { + let registry = ConnRegistry::new(); + let (writer_tx, _writer_rx) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx).await; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + let mut conn_ids = Vec::new(); + for _ in 0..5 { + let (conn_id, _rx) = registry.register().await; + assert!( + registry + .bind_writer( + conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + conn_ids.push(conn_id); + } + conn_ids.sort_unstable(); + + let limited = registry.bound_conn_ids_for_writer_limited(10, 3).await; + assert_eq!(limited.len(), 3); + assert_eq!(limited, conn_ids.into_iter().take(3).collect::>()); + } + + #[tokio::test] + async fn evict_bound_conn_if_writer_does_not_touch_rebound_conn() { + let registry = ConnRegistry::new(); + let (conn_id, mut rx) = registry.register().await; + let (writer_tx_a, _writer_rx_a) = tokio::sync::mpsc::channel(8); + let (writer_tx_b, _writer_rx_b) = tokio::sync::mpsc::channel(8); + registry.register_writer(10, writer_tx_a).await; + registry.register_writer(20, writer_tx_b).await; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443); + + assert!( + registry + .bind_writer( + conn_id, + 10, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 0, + }, + ) + .await + ); + assert!( + registry + .bind_writer( + conn_id, + 20, + ConnMeta { + target_dc: 2, + client_addr: addr, + our_addr: addr, + proto_flags: 1, + }, + ) + .await + ); + + let evicted = registry.evict_bound_conn_if_writer(conn_id, 10).await; + assert!(!evicted); + assert_eq!(registry.get_writer(conn_id).await.expect("writer").writer_id, 20); + assert!(rx.try_recv().is_err()); + + let evicted = registry.evict_bound_conn_if_writer(conn_id, 20).await; + assert!(evicted); + assert!(registry.get_writer(conn_id).await.is_none()); + assert!(matches!(rx.try_recv(), Ok(MeResponse::Close))); + } } diff 
--git a/src/transport/socket.rs b/src/transport/socket.rs
index aa4dc01..3ff96a2 100644
--- a/src/transport/socket.rs
+++ b/src/transport/socket.rs
@@ -11,6 +11,8 @@ use tokio::net::TcpStream;
 use socket2::{Socket, TcpKeepalive, Domain, Type, Protocol};
 use tracing::debug;
 
+const DEFAULT_SOCKET_BUFFER_BYTES: usize = 256 * 1024;
+
 /// Configure TCP socket with recommended settings for proxy use
 #[allow(dead_code)]
 pub fn configure_tcp_socket(
@@ -34,10 +36,10 @@ pub fn configure_tcp_socket(
 
         socket.set_tcp_keepalive(&keepalive)?;
     }
-
-    // CHANGED: Removed manual buffer size setting (was 256KB).
-    // Allowing the OS kernel to handle TCP window scaling (Autotuning) is critical
-    // for mobile clients to avoid bufferbloat and stalled connections during uploads.
+
+    // Use explicit baseline buffers to reduce slow-start stalls on high RTT links.
+    socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
+    socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
 
     Ok(())
 }
@@ -62,6 +64,10 @@ pub fn configure_client_socket(
 
     let keepalive = keepalive.with_interval(Duration::from_secs(keepalive_secs));
     socket.set_tcp_keepalive(&keepalive)?;
+
+    // Keep explicit baseline buffers for predictable throughput across busy hosts.
+    socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
+    socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
 
     // Set TCP user timeout (Linux only)
     // NOTE: iOS does not support TCP_USER_TIMEOUT - application-level timeout
@@ -124,6 +130,8 @@ pub fn create_outgoing_socket_bound(addr: SocketAddr, bind_addr: Option<IpAddr>)
 
     // Disable Nagle
     socket.set_nodelay(true)?;
+    socket.set_recv_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
+    socket.set_send_buffer_size(DEFAULT_SOCKET_BUFFER_BYTES)?;
 
     if let Some(bind_ip) = bind_addr {
         let bind_sock_addr = SocketAddr::new(bind_ip, 0);
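For context on the socket.rs change above, which swaps the earlier reliance on kernel autotuning for explicit 256 KiB baseline buffers: the sketch below shows the same idea in isolation, using the socket2 crate the file already imports. The helper name and the standalone framing are illustrative assumptions; the real functions also apply keepalive and timeout settings not shown here, and the kernel may round the requested sizes.

// Standalone sketch: fixed baseline send/receive buffers on a new TCP socket.
use socket2::{Domain, Protocol, Socket, Type};

const BASELINE_SOCKET_BUFFER_BYTES: usize = 256 * 1024;

fn new_tcp_socket_with_baseline_buffers(domain: Domain) -> std::io::Result<Socket> {
    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
    socket.set_nodelay(true)?;
    // Explicit buffers trade some memory per connection for more predictable
    // throughput on high-RTT links, instead of waiting for autotuning to ramp up.
    socket.set_recv_buffer_size(BASELINE_SOCKET_BUFFER_BYTES)?;
    socket.set_send_buffer_size(BASELINE_SOCKET_BUFFER_BYTES)?;
    Ok(socket)
}

fn main() -> std::io::Result<()> {
    let socket = new_tcp_socket_with_baseline_buffers(Domain::IPV4)?;
    // The kernel reports the effective sizes, which may differ from the request.
    println!(
        "recv={} send={}",
        socket.recv_buffer_size()?,
        socket.send_buffer_size()?
    );
    Ok(())
}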