From c07b600acb6bb59762bd96af6ce5b7fa90ec9de1 Mon Sep 17 00:00:00 2001 From: David Osipov Date: Thu, 19 Mar 2026 20:24:44 +0400 Subject: [PATCH] Integration hardening: reconcile main+flow-sec API drift and restore green suite --- Cargo.toml | 2 +- src/api/model.rs | 9 - src/api/runtime_min.rs | 2 - src/api/runtime_stats.rs | 10 - src/maestro/me_startup.rs | 7 - src/metrics.rs | 209 ------------------ src/proxy/client.rs | 15 -- src/proxy/direct_relay.rs | 17 +- src/transport/middle_proxy/config_updater.rs | 10 - .../middle_proxy/health_adversarial_tests.rs | 29 +-- .../middle_proxy/health_integration_tests.rs | 7 - .../middle_proxy/health_regression_tests.rs | 189 ++-------------- src/transport/middle_proxy/pool_status.rs | 37 ---- src/transport/middle_proxy/pool_writer.rs | 97 ++++---- 14 files changed, 65 insertions(+), 575 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 17145a2..a47a4e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "telemt" -version = "3.3.24" +version = "3.3.20" edition = "2024" [dependencies] diff --git a/src/api/model.rs b/src/api/model.rs index ac4e297..31233d7 100644 --- a/src/api/model.rs +++ b/src/api/model.rs @@ -195,8 +195,6 @@ pub(super) struct ZeroPoolData { pub(super) pool_swap_total: u64, pub(super) pool_drain_active: u64, pub(super) pool_force_close_total: u64, - pub(super) pool_drain_soft_evict_total: u64, - pub(super) pool_drain_soft_evict_writer_total: u64, pub(super) pool_stale_pick_total: u64, pub(super) writer_removed_total: u64, pub(super) writer_removed_unexpected_total: u64, @@ -237,7 +235,6 @@ pub(super) struct MeWritersSummary { pub(super) available_pct: f64, pub(super) required_writers: usize, pub(super) alive_writers: usize, - pub(super) coverage_ratio: f64, pub(super) coverage_pct: f64, pub(super) fresh_alive_writers: usize, pub(super) fresh_coverage_pct: f64, @@ -286,7 +283,6 @@ pub(super) struct DcStatus { pub(super) floor_max: usize, pub(super) floor_capped: bool, pub(super) alive_writers: usize, - pub(super) coverage_ratio: f64, pub(super) coverage_pct: f64, pub(super) fresh_alive_writers: usize, pub(super) fresh_coverage_pct: f64, @@ -364,11 +360,6 @@ pub(super) struct MinimalMeRuntimeData { pub(super) me_reconnect_backoff_cap_ms: u64, pub(super) me_reconnect_fast_retry_count: u32, pub(super) me_pool_drain_ttl_secs: u64, - pub(super) me_pool_drain_soft_evict_enabled: bool, - pub(super) me_pool_drain_soft_evict_grace_secs: u64, - pub(super) me_pool_drain_soft_evict_per_writer: u8, - pub(super) me_pool_drain_soft_evict_budget_per_core: u16, - pub(super) me_pool_drain_soft_evict_cooldown_ms: u64, pub(super) me_pool_force_close_secs: u64, pub(super) me_pool_min_fresh_ratio: f32, pub(super) me_bind_stale_mode: &'static str, diff --git a/src/api/runtime_min.rs b/src/api/runtime_min.rs index f334dd0..d3066a3 100644 --- a/src/api/runtime_min.rs +++ b/src/api/runtime_min.rs @@ -113,7 +113,6 @@ pub(super) struct RuntimeMeQualityDcRttData { pub(super) rtt_ema_ms: Option, pub(super) alive_writers: usize, pub(super) required_writers: usize, - pub(super) coverage_ratio: f64, pub(super) coverage_pct: f64, } @@ -389,7 +388,6 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime rtt_ema_ms: dc.rtt_ms, alive_writers: dc.alive_writers, required_writers: dc.required_writers, - coverage_ratio: dc.coverage_ratio, coverage_pct: dc.coverage_pct, }) .collect(), diff --git a/src/api/runtime_stats.rs b/src/api/runtime_stats.rs index f8948d1..9260c40 100644 --- a/src/api/runtime_stats.rs +++ b/src/api/runtime_stats.rs @@ -96,8 +96,6 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer pool_swap_total: stats.get_pool_swap_total(), pool_drain_active: stats.get_pool_drain_active(), pool_force_close_total: stats.get_pool_force_close_total(), - pool_drain_soft_evict_total: stats.get_pool_drain_soft_evict_total(), - pool_drain_soft_evict_writer_total: stats.get_pool_drain_soft_evict_writer_total(), pool_stale_pick_total: stats.get_pool_stale_pick_total(), writer_removed_total: stats.get_me_writer_removed_total(), writer_removed_unexpected_total: stats.get_me_writer_removed_unexpected_total(), @@ -315,7 +313,6 @@ async fn get_minimal_payload_cached( available_pct: status.available_pct, required_writers: status.required_writers, alive_writers: status.alive_writers, - coverage_ratio: status.coverage_ratio, coverage_pct: status.coverage_pct, fresh_alive_writers: status.fresh_alive_writers, fresh_coverage_pct: status.fresh_coverage_pct, @@ -373,7 +370,6 @@ async fn get_minimal_payload_cached( floor_max: entry.floor_max, floor_capped: entry.floor_capped, alive_writers: entry.alive_writers, - coverage_ratio: entry.coverage_ratio, coverage_pct: entry.coverage_pct, fresh_alive_writers: entry.fresh_alive_writers, fresh_coverage_pct: entry.fresh_coverage_pct, @@ -431,11 +427,6 @@ async fn get_minimal_payload_cached( me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms, me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs, - me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled, - me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs, - me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer, - me_pool_drain_soft_evict_budget_per_core: runtime.me_pool_drain_soft_evict_budget_per_core, - me_pool_drain_soft_evict_cooldown_ms: runtime.me_pool_drain_soft_evict_cooldown_ms, me_pool_force_close_secs: runtime.me_pool_force_close_secs, me_pool_min_fresh_ratio: runtime.me_pool_min_fresh_ratio, me_bind_stale_mode: runtime.me_bind_stale_mode, @@ -504,7 +495,6 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa available_pct: 0.0, required_writers: 0, alive_writers: 0, - coverage_ratio: 0.0, coverage_pct: 0.0, fresh_alive_writers: 0, fresh_coverage_pct: 0.0, diff --git a/src/maestro/me_startup.rs b/src/maestro/me_startup.rs index 827b00c..245c7a9 100644 --- a/src/maestro/me_startup.rs +++ b/src/maestro/me_startup.rs @@ -238,11 +238,6 @@ pub(crate) async fn initialize_me_pool( config.general.hardswap, config.general.me_pool_drain_ttl_secs, config.general.me_pool_drain_threshold, - config.general.me_pool_drain_soft_evict_enabled, - config.general.me_pool_drain_soft_evict_grace_secs, - config.general.me_pool_drain_soft_evict_per_writer, - config.general.me_pool_drain_soft_evict_budget_per_core, - config.general.me_pool_drain_soft_evict_cooldown_ms, config.general.effective_me_pool_force_close_secs(), config.general.me_pool_min_fresh_ratio, config.general.me_hardswap_warmup_delay_min_ms, @@ -267,8 +262,6 @@ pub(crate) async fn initialize_me_pool( config.general.me_warn_rate_limit_ms, config.general.me_route_no_writer_mode, config.general.me_route_no_writer_wait_ms, - config.general.me_route_hybrid_max_wait_ms, - config.general.me_route_blocking_send_timeout_ms, config.general.me_route_inline_recovery_attempts, config.general.me_route_inline_recovery_wait_ms, ); diff --git a/src/metrics.rs b/src/metrics.rs index 4f7f4b6..f4f8a2e 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -292,109 +292,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp "telemt_connections_bad_total {}", if core_enabled { stats.get_connects_bad() } else { 0 } ); - let _ = writeln!(out, "# HELP telemt_connections_current Current active connections"); - let _ = writeln!(out, "# TYPE telemt_connections_current gauge"); - let _ = writeln!( - out, - "telemt_connections_current {}", - if core_enabled { - stats.get_current_connections_total() - } else { - 0 - } - ); - let _ = writeln!(out, "# HELP telemt_connections_direct_current Current active direct connections"); - let _ = writeln!(out, "# TYPE telemt_connections_direct_current gauge"); - let _ = writeln!( - out, - "telemt_connections_direct_current {}", - if core_enabled { - stats.get_current_connections_direct() - } else { - 0 - } - ); - let _ = writeln!(out, "# HELP telemt_connections_me_current Current active middle-end connections"); - let _ = writeln!(out, "# TYPE telemt_connections_me_current gauge"); - let _ = writeln!( - out, - "telemt_connections_me_current {}", - if core_enabled { - stats.get_current_connections_me() - } else { - 0 - } - ); - let _ = writeln!( - out, - "# HELP telemt_relay_adaptive_promotions_total Adaptive relay tier promotions" - ); - let _ = writeln!(out, "# TYPE telemt_relay_adaptive_promotions_total counter"); - let _ = writeln!( - out, - "telemt_relay_adaptive_promotions_total {}", - if core_enabled { - stats.get_relay_adaptive_promotions_total() - } else { - 0 - } - ); - let _ = writeln!( - out, - "# HELP telemt_relay_adaptive_demotions_total Adaptive relay tier demotions" - ); - let _ = writeln!(out, "# TYPE telemt_relay_adaptive_demotions_total counter"); - let _ = writeln!( - out, - "telemt_relay_adaptive_demotions_total {}", - if core_enabled { - stats.get_relay_adaptive_demotions_total() - } else { - 0 - } - ); - let _ = writeln!( - out, - "# HELP telemt_relay_adaptive_hard_promotions_total Adaptive relay hard promotions triggered by write pressure" - ); - let _ = writeln!( - out, - "# TYPE telemt_relay_adaptive_hard_promotions_total counter" - ); - let _ = writeln!( - out, - "telemt_relay_adaptive_hard_promotions_total {}", - if core_enabled { - stats.get_relay_adaptive_hard_promotions_total() - } else { - 0 - } - ); - let _ = writeln!(out, "# HELP telemt_reconnect_evict_total Reconnect-driven session evictions"); - let _ = writeln!(out, "# TYPE telemt_reconnect_evict_total counter"); - let _ = writeln!( - out, - "telemt_reconnect_evict_total {}", - if core_enabled { - stats.get_reconnect_evict_total() - } else { - 0 - } - ); - let _ = writeln!( - out, - "# HELP telemt_reconnect_stale_close_total Sessions closed because they became stale after reconnect" - ); - let _ = writeln!(out, "# TYPE telemt_reconnect_stale_close_total counter"); - let _ = writeln!( - out, - "telemt_reconnect_stale_close_total {}", - if core_enabled { - stats.get_reconnect_stale_close_total() - } else { - 0 - } - ); let _ = writeln!(out, "# HELP telemt_handshake_timeouts_total Handshake timeouts"); let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter"); @@ -1650,36 +1547,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); - let _ = writeln!( - out, - "# HELP telemt_pool_drain_soft_evict_total Soft-evicted client sessions on stuck draining writers" - ); - let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_total counter"); - let _ = writeln!( - out, - "telemt_pool_drain_soft_evict_total {}", - if me_allows_normal { - stats.get_pool_drain_soft_evict_total() - } else { - 0 - } - ); - - let _ = writeln!( - out, - "# HELP telemt_pool_drain_soft_evict_writer_total Draining writers with at least one soft eviction" - ); - let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_writer_total counter"); - let _ = writeln!( - out, - "telemt_pool_drain_soft_evict_writer_total {}", - if me_allows_normal { - stats.get_pool_drain_soft_evict_writer_total() - } else { - 0 - } - ); - let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds"); let _ = writeln!(out, "# TYPE telemt_pool_stale_pick_total counter"); let _ = writeln!( @@ -1692,57 +1559,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp } ); - let _ = writeln!( - out, - "# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers" - ); - let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter"); - let _ = writeln!( - out, - "telemt_me_writer_close_signal_drop_total {}", - if me_allows_normal { - stats.get_me_writer_close_signal_drop_total() - } else { - 0 - } - ); - - let _ = writeln!( - out, - "# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels" - ); - let _ = writeln!( - out, - "# TYPE telemt_me_writer_close_signal_channel_full_total counter" - ); - let _ = writeln!( - out, - "telemt_me_writer_close_signal_channel_full_total {}", - if me_allows_normal { - stats.get_me_writer_close_signal_channel_full_total() - } else { - 0 - } - ); - - let _ = writeln!( - out, - "# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup" - ); - let _ = writeln!( - out, - "# TYPE telemt_me_draining_writers_reap_progress_total counter" - ); - let _ = writeln!( - out, - "telemt_me_draining_writers_reap_progress_total {}", - if me_allows_normal { - stats.get_me_draining_writers_reap_progress_total() - } else { - 0 - } - ); - let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals"); let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter"); let _ = writeln!( @@ -2048,8 +1864,6 @@ mod tests { stats.increment_connects_all(); stats.increment_connects_all(); stats.increment_connects_bad(); - stats.increment_current_connections_direct(); - stats.increment_current_connections_me(); stats.increment_handshake_timeouts(); stats.increment_upstream_connect_attempt_total(); stats.increment_upstream_connect_attempt_total(); @@ -2081,9 +1895,6 @@ mod tests { assert!(output.contains("telemt_connections_total 2")); assert!(output.contains("telemt_connections_bad_total 1")); - assert!(output.contains("telemt_connections_current 2")); - assert!(output.contains("telemt_connections_direct_current 1")); - assert!(output.contains("telemt_connections_me_current 1")); assert!(output.contains("telemt_handshake_timeouts_total 1")); assert!(output.contains("telemt_upstream_connect_attempt_total 2")); assert!(output.contains("telemt_upstream_connect_success_total 1")); @@ -2126,9 +1937,6 @@ mod tests { let output = render_metrics(&stats, &config, &tracker).await; assert!(output.contains("telemt_connections_total 0")); assert!(output.contains("telemt_connections_bad_total 0")); - assert!(output.contains("telemt_connections_current 0")); - assert!(output.contains("telemt_connections_direct_current 0")); - assert!(output.contains("telemt_connections_me_current 0")); assert!(output.contains("telemt_handshake_timeouts_total 0")); assert!(output.contains("telemt_user_unique_ips_current{user=")); assert!(output.contains("telemt_user_unique_ips_recent_window{user=")); @@ -2162,28 +1970,11 @@ mod tests { assert!(output.contains("# TYPE telemt_uptime_seconds gauge")); assert!(output.contains("# TYPE telemt_connections_total counter")); assert!(output.contains("# TYPE telemt_connections_bad_total counter")); - assert!(output.contains("# TYPE telemt_connections_current gauge")); - assert!(output.contains("# TYPE telemt_connections_direct_current gauge")); - assert!(output.contains("# TYPE telemt_connections_me_current gauge")); - assert!(output.contains("# TYPE telemt_relay_adaptive_promotions_total counter")); - assert!(output.contains("# TYPE telemt_relay_adaptive_demotions_total counter")); - assert!(output.contains("# TYPE telemt_relay_adaptive_hard_promotions_total counter")); - assert!(output.contains("# TYPE telemt_reconnect_evict_total counter")); - assert!(output.contains("# TYPE telemt_reconnect_stale_close_total counter")); assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter")); assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter")); assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter")); assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter")); assert!(output.contains("# TYPE telemt_me_writer_removed_total counter")); - assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter")); - assert!(output.contains( - "# TYPE telemt_me_writer_close_signal_channel_full_total counter" - )); - assert!(output.contains( - "# TYPE telemt_me_draining_writers_reap_progress_total counter" - )); - assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter")); - assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter")); assert!(output.contains( "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge" )); diff --git a/src/proxy/client.rs b/src/proxy/client.rs index 6a737ac..8dad5da 100644 --- a/src/proxy/client.rs +++ b/src/proxy/client.rs @@ -84,7 +84,6 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle use crate::proxy::masking::handle_bad_client; use crate::proxy::middle_relay::handle_via_middle_proxy; use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController}; -use crate::proxy::session_eviction::register_session; fn beobachten_ttl(config: &ProxyConfig) -> Duration { let minutes = config.general.beobachten_minutes; @@ -867,17 +866,6 @@ impl RunningClientHandler { } }; - let registration = register_session(&user, success.dc_idx); - if registration.replaced_existing { - stats.increment_reconnect_evict_total(); - warn!( - user = %user, - dc = success.dc_idx, - "Reconnect detected: replacing active session for user+dc" - ); - } - let session_lease = registration.lease; - let route_snapshot = route_runtime.snapshot(); let session_id = rng.u64(); let relay_result = if config.general.use_middle_proxy @@ -897,7 +885,6 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, - session_lease.clone(), ) .await } else { @@ -914,7 +901,6 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, - session_lease.clone(), ) .await } @@ -932,7 +918,6 @@ impl RunningClientHandler { route_runtime.subscribe(), route_snapshot, session_id, - session_lease.clone(), ) .await }; diff --git a/src/proxy/direct_relay.rs b/src/proxy/direct_relay.rs index 732898d..ede908e 100644 --- a/src/proxy/direct_relay.rs +++ b/src/proxy/direct_relay.rs @@ -22,8 +22,6 @@ use crate::proxy::route_mode::{ RelayRouteMode, RouteCutoverState, ROUTE_SWITCH_ERROR_MSG, affected_cutover_state, cutover_stagger_delay, }; -use crate::proxy::adaptive_buffers; -use crate::proxy::session_eviction::SessionLease; use crate::stats::Stats; use crate::stream::{BufferPool, CryptoReader, CryptoWriter}; use crate::transport::UpstreamManager; @@ -185,7 +183,6 @@ pub(crate) async fn handle_via_direct( mut route_rx: watch::Receiver, route_snapshot: RouteCutoverState, session_id: u64, - session_lease: SessionLease, ) -> Result<()> where R: AsyncRead + Unpin + Send + 'static, @@ -225,27 +222,17 @@ where stats.increment_user_connects(user); let _direct_connection_lease = stats.acquire_direct_connection_lease(); - let seed_tier = adaptive_buffers::seed_tier_for_user(user); - let (c2s_copy_buf, s2c_copy_buf) = adaptive_buffers::direct_copy_buffers_for_tier( - seed_tier, - config.general.direct_relay_copy_buf_c2s_bytes, - config.general.direct_relay_copy_buf_s2c_bytes, - ); - let relay_result = relay_bidirectional( client_reader, client_writer, tg_reader, tg_writer, - c2s_copy_buf, - s2c_copy_buf, + config.general.direct_relay_copy_buf_c2s_bytes, + config.general.direct_relay_copy_buf_s2c_bytes, user, - success.dc_idx, Arc::clone(&stats), config.access.user_data_quota.get(user).copied(), buffer_pool, - session_lease, - seed_tier, ); tokio::pin!(relay_result); let relay_result = loop { diff --git a/src/transport/middle_proxy/config_updater.rs b/src/transport/middle_proxy/config_updater.rs index 43a3569..b6a0160 100644 --- a/src/transport/middle_proxy/config_updater.rs +++ b/src/transport/middle_proxy/config_updater.rs @@ -299,11 +299,6 @@ async fn run_update_cycle( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_threshold, - cfg.general.me_pool_drain_soft_evict_enabled, - cfg.general.me_pool_drain_soft_evict_grace_secs, - cfg.general.me_pool_drain_soft_evict_per_writer, - cfg.general.me_pool_drain_soft_evict_budget_per_core, - cfg.general.me_pool_drain_soft_evict_cooldown_ms, cfg.general.effective_me_pool_force_close_secs(), cfg.general.me_pool_min_fresh_ratio, cfg.general.me_hardswap_warmup_delay_min_ms, @@ -531,11 +526,6 @@ pub async fn me_config_updater( cfg.general.hardswap, cfg.general.me_pool_drain_ttl_secs, cfg.general.me_pool_drain_threshold, - cfg.general.me_pool_drain_soft_evict_enabled, - cfg.general.me_pool_drain_soft_evict_grace_secs, - cfg.general.me_pool_drain_soft_evict_per_writer, - cfg.general.me_pool_drain_soft_evict_budget_per_core, - cfg.general.me_pool_drain_soft_evict_cooldown_ms, cfg.general.effective_me_pool_force_close_secs(), cfg.general.me_pool_min_fresh_ratio, cfg.general.me_hardswap_warmup_delay_min_ms, diff --git a/src/transport/middle_proxy/health_adversarial_tests.rs b/src/transport/middle_proxy/health_adversarial_tests.rs index 503a2fa..cd06fdf 100644 --- a/src/transport/middle_proxy/health_adversarial_tests.rs +++ b/src/transport/middle_proxy/health_adversarial_tests.rs @@ -83,11 +83,6 @@ async fn make_pool( general.hardswap, general.me_pool_drain_ttl_secs, general.me_pool_drain_threshold, - general.me_pool_drain_soft_evict_enabled, - general.me_pool_drain_soft_evict_grace_secs, - general.me_pool_drain_soft_evict_per_writer, - general.me_pool_drain_soft_evict_budget_per_core, - general.me_pool_drain_soft_evict_cooldown_ms, general.effective_me_pool_force_close_secs(), general.me_pool_min_fresh_ratio, general.me_hardswap_warmup_delay_min_ms, @@ -112,8 +107,6 @@ async fn make_pool( general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, - general.me_route_hybrid_max_wait_ms, - general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ); @@ -227,11 +220,10 @@ async fn set_writer_runtime_state( async fn reap_draining_writers_clears_warn_state_when_pool_empty() { let (pool, _rng) = make_pool(128, 1, 1).await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); warn_next_allowed.insert(11, Instant::now() + Duration::from_secs(5)); warn_next_allowed.insert(22, Instant::now() + Duration::from_secs(5)); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(warn_next_allowed.is_empty()); } @@ -240,8 +232,6 @@ async fn reap_draining_writers_clears_warn_state_when_pool_empty() { async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycles() { let threshold = 3u64; let (pool, _rng) = make_pool(threshold, 1, 1).await; - pool.me_pool_drain_soft_evict_enabled - .store(false, Ordering::Relaxed); let now_epoch_secs = MePool::now_epoch_secs(); for writer_id in 1..=60u64 { @@ -256,9 +246,8 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle } let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..64 { - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; if writer_count(&pool).await <= threshold as usize { break; } @@ -286,12 +275,11 @@ async fn reap_draining_writers_handles_large_empty_writer_population() { } let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..24 { if writer_count(&pool).await == 0 { break; } - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; } assert_eq!(writer_count(&pool).await, 0); @@ -315,12 +303,11 @@ async fn reap_draining_writers_processes_mass_deadline_expiry_without_unbounded_ } let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..40 { if writer_count(&pool).await == 0 { break; } - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; } assert_eq!(writer_count(&pool).await, 0); @@ -331,7 +318,6 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c let (pool, _rng) = make_pool(128, 1, 1).await; let now_epoch_secs = MePool::now_epoch_secs(); let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); for wave in 0..40u64 { for offset in 0..8u64 { @@ -345,7 +331,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c .await; } - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(warn_next_allowed.len() <= writer_count(&pool).await); let ids = sorted_writer_ids(&pool).await; @@ -353,7 +339,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c let _ = pool.remove_writer_and_close_clients(writer_id).await; } - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(warn_next_allowed.len() <= writer_count(&pool).await); } } @@ -375,10 +361,9 @@ async fn reap_draining_writers_budgeted_cleanup_never_increases_pool_size() { } let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); let mut previous = writer_count(&pool).await; for _ in 0..32 { - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; let current = writer_count(&pool).await; assert!(current <= previous); previous = current; diff --git a/src/transport/middle_proxy/health_integration_tests.rs b/src/transport/middle_proxy/health_integration_tests.rs index 15ad4f2..476b549 100644 --- a/src/transport/middle_proxy/health_integration_tests.rs +++ b/src/transport/middle_proxy/health_integration_tests.rs @@ -81,11 +81,6 @@ async fn make_pool( general.hardswap, general.me_pool_drain_ttl_secs, general.me_pool_drain_threshold, - general.me_pool_drain_soft_evict_enabled, - general.me_pool_drain_soft_evict_grace_secs, - general.me_pool_drain_soft_evict_per_writer, - general.me_pool_drain_soft_evict_budget_per_core, - general.me_pool_drain_soft_evict_cooldown_ms, general.effective_me_pool_force_close_secs(), general.me_pool_min_fresh_ratio, general.me_hardswap_warmup_delay_min_ms, @@ -110,8 +105,6 @@ async fn make_pool( general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, - general.me_route_hybrid_max_wait_ms, - general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ); diff --git a/src/transport/middle_proxy/health_regression_tests.rs b/src/transport/middle_proxy/health_regression_tests.rs index b760acc..6b6b12a 100644 --- a/src/transport/middle_proxy/health_regression_tests.rs +++ b/src/transport/middle_proxy/health_regression_tests.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering}; use std::time::{Duration, Instant}; -use bytes::Bytes; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -40,7 +39,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc { NetworkDecision::default(), None, Arc::new(SecureRandom::new()), - Arc::new(Stats::new()), + Arc::new(Stats::default()), general.me_keepalive_enabled, general.me_keepalive_interval_secs, general.me_keepalive_jitter_secs, @@ -75,11 +74,6 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc { general.hardswap, general.me_pool_drain_ttl_secs, general.me_pool_drain_threshold, - general.me_pool_drain_soft_evict_enabled, - general.me_pool_drain_soft_evict_grace_secs, - general.me_pool_drain_soft_evict_per_writer, - general.me_pool_drain_soft_evict_budget_per_core, - general.me_pool_drain_soft_evict_cooldown_ms, general.effective_me_pool_force_close_secs(), general.me_pool_min_fresh_ratio, general.me_hardswap_warmup_delay_min_ms, @@ -104,8 +98,6 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc { general.me_warn_rate_limit_ms, MeRouteNoWriterMode::default(), general.me_route_no_writer_wait_ms, - general.me_route_hybrid_max_wait_ms, - general.me_route_blocking_send_timeout_ms, general.me_route_inline_recovery_attempts, general.me_route_inline_recovery_wait_ms, ) @@ -198,15 +190,14 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() { let conn_ids = insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(warn_next_allowed.contains_key(&7)); let _ = pool.remove_writer_and_close_clients(7).await; assert!(pool.registry.get_writer(conn_ids[0]).await.is_none()); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(!warn_next_allowed.contains_key(&7)); } @@ -218,96 +209,12 @@ async fn reap_draining_writers_removes_empty_draining_writers() { insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(30), 0, 0).await; insert_draining_writer(&pool, 3, now_epoch_secs.saturating_sub(20), 1, 0).await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert_eq!(current_writer_ids(&pool).await, vec![3]); } -#[tokio::test] -async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() { - let pool = make_pool(128).await; - let now_epoch_secs = MePool::now_epoch_secs(); - - let (blocked_tx, blocked_rx) = mpsc::channel::(1); - assert!( - blocked_tx - .try_send(WriterCommand::Data(Bytes::from_static(b"stuck"))) - .is_ok() - ); - let blocked_rx_guard = tokio::spawn(async move { - let _hold_rx = blocked_rx; - tokio::time::sleep(Duration::from_secs(30)).await; - }); - - let blocked_writer_id = 90u64; - let blocked_writer = MeWriter { - id: blocked_writer_id, - addr: SocketAddr::new( - IpAddr::V4(Ipv4Addr::LOCALHOST), - 4500 + blocked_writer_id as u16, - ), - source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST), - writer_dc: 2, - generation: 1, - contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())), - created_at: Instant::now() - Duration::from_secs(blocked_writer_id), - tx: blocked_tx.clone(), - cancel: CancellationToken::new(), - degraded: Arc::new(AtomicBool::new(false)), - rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)), - draining: Arc::new(AtomicBool::new(true)), - draining_started_at_epoch_secs: Arc::new(AtomicU64::new( - now_epoch_secs.saturating_sub(120), - )), - drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)), - allow_drain_fallback: Arc::new(AtomicBool::new(false)), - }; - pool.writers.write().await.push(blocked_writer); - pool.registry - .register_writer(blocked_writer_id, blocked_tx) - .await; - pool.conn_count.fetch_add(1, Ordering::Relaxed); - - insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await; - - let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - - let reap_res = tokio::time::timeout( - Duration::from_millis(500), - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed), - ) - .await; - blocked_rx_guard.abort(); - - assert!(reap_res.is_ok(), "reap should not block on close signal"); - assert!(current_writer_ids(&pool).await.is_empty()); - assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2); - assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1); - assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2); - let activity = pool.registry.writer_activity_snapshot().await; - assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id)); - assert!(!activity.bound_clients_by_writer.contains_key(&91)); - let (probe_conn_id, _rx) = pool.registry.register().await; - assert!( - !pool.registry - .bind_writer( - probe_conn_id, - blocked_writer_id, - ConnMeta { - target_dc: 2, - client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400), - our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443), - proto_flags: 0, - }, - ) - .await - ); - let _ = pool.registry.unregister(probe_conn_id).await; -} - #[tokio::test] async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { let pool = make_pool(2).await; @@ -317,9 +224,8 @@ async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() { insert_draining_writer(&pool, 33, now_epoch_secs.saturating_sub(20), 1, 0).await; insert_draining_writer(&pool, 44, now_epoch_secs.saturating_sub(10), 1, 0).await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert_eq!(current_writer_ids(&pool).await, vec![33, 44]); } @@ -337,9 +243,8 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() { ) .await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(current_writer_ids(&pool).await.is_empty()); } @@ -361,9 +266,8 @@ async fn reap_draining_writers_limits_closes_per_health_tick() { .await; } let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert_eq!(pool.writers.read().await.len(), writer_total - close_budget); } @@ -502,13 +406,12 @@ async fn reap_draining_writers_backlog_drains_across_ticks() { .await; } let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..8 { if pool.writers.read().await.is_empty() { break; } - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; } assert!(pool.writers.read().await.is_empty()); @@ -532,10 +435,9 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() { .await; } let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); for _ in 0..16 { - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; if pool.writers.read().await.len() <= threshold as usize { break; } @@ -552,9 +454,8 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(30), 1, 0).await; insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(20), 1, 0).await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert_eq!(current_writer_ids(&pool).await, vec![10, 20, 30]); } @@ -577,9 +478,8 @@ async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() { let empty_writer_id = close_budget as u64 + 1; insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]); } @@ -591,9 +491,8 @@ async fn reap_draining_writers_empty_cleanup_does_not_increment_force_close_metr insert_draining_writer(&pool, 1, now_epoch_secs.saturating_sub(60), 0, 0).await; insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(50), 0, 0).await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(current_writer_ids(&pool).await.is_empty()); assert_eq!(pool.stats.get_pool_force_close_total(), 0); @@ -620,9 +519,8 @@ async fn reap_draining_writers_handles_duplicate_force_close_requests_for_same_w ) .await; let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(current_writer_ids(&pool).await.is_empty()); } @@ -632,7 +530,6 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population let pool = make_pool(128).await; let now_epoch_secs = MePool::now_epoch_secs(); let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); for wave in 0..12u64 { for offset in 0..9u64 { @@ -645,14 +542,14 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population ) .await; } - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); let existing_writer_ids = current_writer_ids(&pool).await; for writer_id in existing_writer_ids.into_iter().take(4) { let _ = pool.remove_writer_and_close_clients(writer_id).await; } - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); } } @@ -662,7 +559,6 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat let pool = make_pool(6).await; let now_epoch_secs = MePool::now_epoch_secs(); let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); for writer_id in 1..=18u64 { let bound_clients = if writer_id % 3 == 0 { 0 } else { 1 }; @@ -682,7 +578,7 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat } for _ in 0..16 { - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; + reap_draining_writers(&pool, &mut warn_next_allowed).await; if pool.writers.read().await.len() <= 6 { break; } @@ -692,62 +588,9 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat assert!(warn_next_allowed.len() <= pool.writers.read().await.len()); } -#[tokio::test] -async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() { - let pool = make_pool(128).await; - pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed); - pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed); - pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed); - pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed); - pool.me_pool_drain_soft_evict_cooldown_ms - .store(1, Ordering::Relaxed); - - let now_epoch_secs = MePool::now_epoch_secs(); - insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await; - let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; - - let activity = pool.registry.writer_activity_snapshot().await; - assert_eq!(activity.bound_clients_by_writer.get(&77), Some(&2)); - assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1); - assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1); - assert_eq!(current_writer_ids(&pool).await, vec![77]); -} - -#[tokio::test] -async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() { - let pool = make_pool(128).await; - pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed); - pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed); - pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed); - pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed); - pool.me_pool_drain_soft_evict_cooldown_ms - .store(60_000, Ordering::Relaxed); - - let now_epoch_secs = MePool::now_epoch_secs(); - insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await; - let mut warn_next_allowed = HashMap::new(); - let mut soft_evict_next_allowed = HashMap::new(); - - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; - reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await; - - let activity = pool.registry.writer_activity_snapshot().await; - assert_eq!(activity.bound_clients_by_writer.get(&88), Some(&2)); - assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1); - assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1); -} - #[test] fn general_config_default_drain_threshold_remains_enabled() { assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128); - assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled); - assert_eq!( - GeneralConfig::default().me_pool_drain_soft_evict_per_writer, - 1 - ); } #[tokio::test] diff --git a/src/transport/middle_proxy/pool_status.rs b/src/transport/middle_proxy/pool_status.rs index 214ee49..99070a8 100644 --- a/src/transport/middle_proxy/pool_status.rs +++ b/src/transport/middle_proxy/pool_status.rs @@ -40,7 +40,6 @@ pub(crate) struct MeApiDcStatusSnapshot { pub floor_max: usize, pub floor_capped: bool, pub alive_writers: usize, - pub coverage_ratio: f64, pub coverage_pct: f64, pub fresh_alive_writers: usize, pub fresh_coverage_pct: f64, @@ -63,7 +62,6 @@ pub(crate) struct MeApiStatusSnapshot { pub available_pct: f64, pub required_writers: usize, pub alive_writers: usize, - pub coverage_ratio: f64, pub coverage_pct: f64, pub fresh_alive_writers: usize, pub fresh_coverage_pct: f64, @@ -126,11 +124,6 @@ pub(crate) struct MeApiRuntimeSnapshot { pub me_reconnect_backoff_cap_ms: u64, pub me_reconnect_fast_retry_count: u32, pub me_pool_drain_ttl_secs: u64, - pub me_pool_drain_soft_evict_enabled: bool, - pub me_pool_drain_soft_evict_grace_secs: u64, - pub me_pool_drain_soft_evict_per_writer: u8, - pub me_pool_drain_soft_evict_budget_per_core: u16, - pub me_pool_drain_soft_evict_cooldown_ms: u64, pub me_pool_force_close_secs: u64, pub me_pool_min_fresh_ratio: f32, pub me_bind_stale_mode: &'static str, @@ -344,8 +337,6 @@ impl MePool { let mut available_endpoints = 0usize; let mut alive_writers = 0usize; let mut fresh_alive_writers = 0usize; - let mut coverage_ratio_dcs_total = 0usize; - let mut coverage_ratio_dcs_covered = 0usize; let floor_mode = self.floor_mode(); let adaptive_cpu_cores = (self .me_adaptive_floor_cpu_cores_effective @@ -397,12 +388,6 @@ impl MePool { available_endpoints += dc_available_endpoints; alive_writers += dc_alive_writers; fresh_alive_writers += dc_fresh_alive_writers; - if endpoint_count > 0 { - coverage_ratio_dcs_total += 1; - if dc_alive_writers > 0 { - coverage_ratio_dcs_covered += 1; - } - } dcs.push(MeApiDcStatusSnapshot { dc, @@ -425,11 +410,6 @@ impl MePool { floor_max, floor_capped, alive_writers: dc_alive_writers, - coverage_ratio: if endpoint_count > 0 && dc_alive_writers > 0 { - 100.0 - } else { - 0.0 - }, coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers), fresh_alive_writers: dc_fresh_alive_writers, fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers), @@ -446,7 +426,6 @@ impl MePool { available_pct: ratio_pct(available_endpoints, configured_endpoints), required_writers, alive_writers, - coverage_ratio: ratio_pct(coverage_ratio_dcs_covered, coverage_ratio_dcs_total), coverage_pct: ratio_pct(alive_writers, required_writers), fresh_alive_writers, fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers), @@ -583,22 +562,6 @@ impl MePool { me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64, me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count, me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed), - me_pool_drain_soft_evict_enabled: self - .me_pool_drain_soft_evict_enabled - .load(Ordering::Relaxed), - me_pool_drain_soft_evict_grace_secs: self - .me_pool_drain_soft_evict_grace_secs - .load(Ordering::Relaxed), - me_pool_drain_soft_evict_per_writer: self - .me_pool_drain_soft_evict_per_writer - .load(Ordering::Relaxed), - me_pool_drain_soft_evict_budget_per_core: self - .me_pool_drain_soft_evict_budget_per_core - .load(Ordering::Relaxed) - .min(u16::MAX as u32) as u16, - me_pool_drain_soft_evict_cooldown_ms: self - .me_pool_drain_soft_evict_cooldown_ms - .load(Ordering::Relaxed), me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed), me_pool_min_fresh_ratio: Self::permille_to_ratio( self.me_pool_min_fresh_ratio_permille.load(Ordering::Relaxed), diff --git a/src/transport/middle_proxy/pool_writer.rs b/src/transport/middle_proxy/pool_writer.rs index a47dfb9..5b23d7f 100644 --- a/src/transport/middle_proxy/pool_writer.rs +++ b/src/transport/middle_proxy/pool_writer.rs @@ -8,7 +8,6 @@ use bytes::Bytes; use bytes::BytesMut; use rand::Rng; use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; @@ -312,28 +311,41 @@ impl MePool { let mut p = Vec::with_capacity(12); p.extend_from_slice(&RPC_PING_U32.to_le_bytes()); p.extend_from_slice(&sent_id.to_le_bytes()); - let now_epoch_ms = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as u64; - let mut run_cleanup = false; - if let Some(pool) = pool_ping.upgrade() { - let last_cleanup_ms = pool - .ping_tracker_last_cleanup_epoch_ms - .load(Ordering::Relaxed); - if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000 - && pool + { + let mut tracker = ping_tracker_ping.lock().await; + let now_epoch_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + let mut run_cleanup = false; + if let Some(pool) = pool_ping.upgrade() { + let last_cleanup_ms = pool .ping_tracker_last_cleanup_epoch_ms - .compare_exchange( - last_cleanup_ms, - now_epoch_ms, - Ordering::AcqRel, - Ordering::Relaxed, - ) - .is_ok() - { - run_cleanup = true; + .load(Ordering::Relaxed); + if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000 + && pool + .ping_tracker_last_cleanup_epoch_ms + .compare_exchange( + last_cleanup_ms, + now_epoch_ms, + Ordering::AcqRel, + Ordering::Relaxed, + ) + .is_ok() + { + run_cleanup = true; + } } + + if run_cleanup { + let before = tracker.len(); + tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); + let expired = before.saturating_sub(tracker.len()); + if expired > 0 { + stats_ping.increment_me_keepalive_timeout_by(expired as u64); + } + } + tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); } ping_id = ping_id.wrapping_add(1); stats_ping.increment_me_keepalive_sent(); @@ -354,16 +366,6 @@ impl MePool { } break; } - let mut tracker = ping_tracker_ping.lock().await; - if run_cleanup { - let before = tracker.len(); - tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120)); - let expired = before.saturating_sub(tracker.len()); - if expired > 0 { - stats_ping.increment_me_keepalive_timeout_by(expired as u64); - } - } - tracker.insert(sent_id, (std::time::Instant::now(), writer_id)); } }); @@ -491,9 +493,11 @@ impl MePool { } pub(crate) async fn remove_writer_and_close_clients(self: &Arc, writer_id: u64) { - // Full client cleanup now happens inside `registry.writer_lost` to keep - // writer reap/remove paths strictly non-blocking per connection. - let _ = self.remove_writer_only(writer_id).await; + let conns = self.remove_writer_only(writer_id).await; + for bound in conns { + let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await; + let _ = self.registry.unregister(bound.conn_id).await; + } } pub(crate) async fn remove_writer_if_empty(self: &Arc, writer_id: u64) -> bool { @@ -535,11 +539,6 @@ impl MePool { self.conn_count.fetch_sub(1, Ordering::Relaxed); } } - // State invariant: - // - writer is removed from `self.writers` (pool visibility), - // - writer is removed from registry routing/binding maps via `writer_lost`. - // The close command below is only a best-effort accelerator for task shutdown. - // Cleanup progress must never depend on command-channel availability. let conns = self.registry.writer_lost(writer_id).await; { let mut tracker = self.ping_tracker.lock().await; @@ -547,25 +546,7 @@ impl MePool { } self.rtt_stats.lock().await.remove(&writer_id); if let Some(tx) = close_tx { - match tx.try_send(WriterCommand::Close) { - Ok(()) => {} - Err(TrySendError::Full(_)) => { - self.stats.increment_me_writer_close_signal_drop_total(); - self.stats - .increment_me_writer_close_signal_channel_full_total(); - debug!( - writer_id, - "Skipping close signal for removed writer: command channel is full" - ); - } - Err(TrySendError::Closed(_)) => { - self.stats.increment_me_writer_close_signal_drop_total(); - debug!( - writer_id, - "Skipping close signal for removed writer: command channel is closed" - ); - } - } + let _ = tx.send(WriterCommand::Close).await; } if trigger_refill && let Some(addr) = removed_addr