mirror of https://github.com/telemt/telemt.git
Integration hardening: reconcile main+flow-sec API drift and restore green suite
This commit is contained in:
parent 7b44496706
commit c07b600acb
@@ -1,6 +1,6 @@
 [package]
 name = "telemt"
-version = "3.3.24"
+version = "3.3.20"
 edition = "2024"

 [dependencies]
@@ -195,8 +195,6 @@ pub(super) struct ZeroPoolData {
     pub(super) pool_swap_total: u64,
     pub(super) pool_drain_active: u64,
     pub(super) pool_force_close_total: u64,
-    pub(super) pool_drain_soft_evict_total: u64,
-    pub(super) pool_drain_soft_evict_writer_total: u64,
     pub(super) pool_stale_pick_total: u64,
     pub(super) writer_removed_total: u64,
     pub(super) writer_removed_unexpected_total: u64,
@@ -237,7 +235,6 @@ pub(super) struct MeWritersSummary {
     pub(super) available_pct: f64,
     pub(super) required_writers: usize,
     pub(super) alive_writers: usize,
-    pub(super) coverage_ratio: f64,
     pub(super) coverage_pct: f64,
     pub(super) fresh_alive_writers: usize,
     pub(super) fresh_coverage_pct: f64,
@@ -286,7 +283,6 @@ pub(super) struct DcStatus {
     pub(super) floor_max: usize,
     pub(super) floor_capped: bool,
     pub(super) alive_writers: usize,
-    pub(super) coverage_ratio: f64,
     pub(super) coverage_pct: f64,
     pub(super) fresh_alive_writers: usize,
     pub(super) fresh_coverage_pct: f64,
@@ -364,11 +360,6 @@ pub(super) struct MinimalMeRuntimeData {
     pub(super) me_reconnect_backoff_cap_ms: u64,
     pub(super) me_reconnect_fast_retry_count: u32,
     pub(super) me_pool_drain_ttl_secs: u64,
-    pub(super) me_pool_drain_soft_evict_enabled: bool,
-    pub(super) me_pool_drain_soft_evict_grace_secs: u64,
-    pub(super) me_pool_drain_soft_evict_per_writer: u8,
-    pub(super) me_pool_drain_soft_evict_budget_per_core: u16,
-    pub(super) me_pool_drain_soft_evict_cooldown_ms: u64,
     pub(super) me_pool_force_close_secs: u64,
     pub(super) me_pool_min_fresh_ratio: f32,
     pub(super) me_bind_stale_mode: &'static str,
@@ -113,7 +113,6 @@ pub(super) struct RuntimeMeQualityDcRttData {
     pub(super) rtt_ema_ms: Option<f64>,
     pub(super) alive_writers: usize,
     pub(super) required_writers: usize,
-    pub(super) coverage_ratio: f64,
     pub(super) coverage_pct: f64,
 }
@@ -389,7 +388,6 @@ pub(super) async fn build_runtime_me_quality_data(shared: &ApiShared) -> Runtime
             rtt_ema_ms: dc.rtt_ms,
             alive_writers: dc.alive_writers,
             required_writers: dc.required_writers,
-            coverage_ratio: dc.coverage_ratio,
             coverage_pct: dc.coverage_pct,
         })
         .collect(),
@@ -96,8 +96,6 @@ pub(super) fn build_zero_all_data(stats: &Stats, configured_users: usize) -> Zer
     pool_swap_total: stats.get_pool_swap_total(),
     pool_drain_active: stats.get_pool_drain_active(),
     pool_force_close_total: stats.get_pool_force_close_total(),
-    pool_drain_soft_evict_total: stats.get_pool_drain_soft_evict_total(),
-    pool_drain_soft_evict_writer_total: stats.get_pool_drain_soft_evict_writer_total(),
     pool_stale_pick_total: stats.get_pool_stale_pick_total(),
     writer_removed_total: stats.get_me_writer_removed_total(),
     writer_removed_unexpected_total: stats.get_me_writer_removed_unexpected_total(),
@@ -315,7 +313,6 @@ async fn get_minimal_payload_cached(
     available_pct: status.available_pct,
     required_writers: status.required_writers,
     alive_writers: status.alive_writers,
-    coverage_ratio: status.coverage_ratio,
     coverage_pct: status.coverage_pct,
     fresh_alive_writers: status.fresh_alive_writers,
     fresh_coverage_pct: status.fresh_coverage_pct,
@@ -373,7 +370,6 @@ async fn get_minimal_payload_cached(
     floor_max: entry.floor_max,
     floor_capped: entry.floor_capped,
     alive_writers: entry.alive_writers,
-    coverage_ratio: entry.coverage_ratio,
     coverage_pct: entry.coverage_pct,
     fresh_alive_writers: entry.fresh_alive_writers,
     fresh_coverage_pct: entry.fresh_coverage_pct,
@@ -431,11 +427,6 @@ async fn get_minimal_payload_cached(
     me_reconnect_backoff_cap_ms: runtime.me_reconnect_backoff_cap_ms,
     me_reconnect_fast_retry_count: runtime.me_reconnect_fast_retry_count,
     me_pool_drain_ttl_secs: runtime.me_pool_drain_ttl_secs,
-    me_pool_drain_soft_evict_enabled: runtime.me_pool_drain_soft_evict_enabled,
-    me_pool_drain_soft_evict_grace_secs: runtime.me_pool_drain_soft_evict_grace_secs,
-    me_pool_drain_soft_evict_per_writer: runtime.me_pool_drain_soft_evict_per_writer,
-    me_pool_drain_soft_evict_budget_per_core: runtime.me_pool_drain_soft_evict_budget_per_core,
-    me_pool_drain_soft_evict_cooldown_ms: runtime.me_pool_drain_soft_evict_cooldown_ms,
     me_pool_force_close_secs: runtime.me_pool_force_close_secs,
     me_pool_min_fresh_ratio: runtime.me_pool_min_fresh_ratio,
     me_bind_stale_mode: runtime.me_bind_stale_mode,
@@ -504,7 +495,6 @@ fn disabled_me_writers(now_epoch_secs: u64, reason: &'static str) -> MeWritersDa
     available_pct: 0.0,
     required_writers: 0,
     alive_writers: 0,
-    coverage_ratio: 0.0,
     coverage_pct: 0.0,
     fresh_alive_writers: 0,
     fresh_coverage_pct: 0.0,
@@ -238,11 +238,6 @@ pub(crate) async fn initialize_me_pool(
     config.general.hardswap,
     config.general.me_pool_drain_ttl_secs,
     config.general.me_pool_drain_threshold,
-    config.general.me_pool_drain_soft_evict_enabled,
-    config.general.me_pool_drain_soft_evict_grace_secs,
-    config.general.me_pool_drain_soft_evict_per_writer,
-    config.general.me_pool_drain_soft_evict_budget_per_core,
-    config.general.me_pool_drain_soft_evict_cooldown_ms,
     config.general.effective_me_pool_force_close_secs(),
     config.general.me_pool_min_fresh_ratio,
     config.general.me_hardswap_warmup_delay_min_ms,
@@ -267,8 +262,6 @@ pub(crate) async fn initialize_me_pool(
     config.general.me_warn_rate_limit_ms,
     config.general.me_route_no_writer_mode,
     config.general.me_route_no_writer_wait_ms,
     config.general.me_route_hybrid_max_wait_ms,
     config.general.me_route_blocking_send_timeout_ms,
     config.general.me_route_inline_recovery_attempts,
     config.general.me_route_inline_recovery_wait_ms,
 );
src/metrics.rs (209 lines changed)
@@ -292,109 +292,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
     "telemt_connections_bad_total {}",
     if core_enabled { stats.get_connects_bad() } else { 0 }
 );
-let _ = writeln!(out, "# HELP telemt_connections_current Current active connections");
-let _ = writeln!(out, "# TYPE telemt_connections_current gauge");
-let _ = writeln!(
-    out,
-    "telemt_connections_current {}",
-    if core_enabled {
-        stats.get_current_connections_total()
-    } else {
-        0
-    }
-);
-let _ = writeln!(out, "# HELP telemt_connections_direct_current Current active direct connections");
-let _ = writeln!(out, "# TYPE telemt_connections_direct_current gauge");
-let _ = writeln!(
-    out,
-    "telemt_connections_direct_current {}",
-    if core_enabled {
-        stats.get_current_connections_direct()
-    } else {
-        0
-    }
-);
-let _ = writeln!(out, "# HELP telemt_connections_me_current Current active middle-end connections");
-let _ = writeln!(out, "# TYPE telemt_connections_me_current gauge");
-let _ = writeln!(
-    out,
-    "telemt_connections_me_current {}",
-    if core_enabled {
-        stats.get_current_connections_me()
-    } else {
-        0
-    }
-);
-let _ = writeln!(
-    out,
-    "# HELP telemt_relay_adaptive_promotions_total Adaptive relay tier promotions"
-);
-let _ = writeln!(out, "# TYPE telemt_relay_adaptive_promotions_total counter");
-let _ = writeln!(
-    out,
-    "telemt_relay_adaptive_promotions_total {}",
-    if core_enabled {
-        stats.get_relay_adaptive_promotions_total()
-    } else {
-        0
-    }
-);
-let _ = writeln!(
-    out,
-    "# HELP telemt_relay_adaptive_demotions_total Adaptive relay tier demotions"
-);
-let _ = writeln!(out, "# TYPE telemt_relay_adaptive_demotions_total counter");
-let _ = writeln!(
-    out,
-    "telemt_relay_adaptive_demotions_total {}",
-    if core_enabled {
-        stats.get_relay_adaptive_demotions_total()
-    } else {
-        0
-    }
-);
-let _ = writeln!(
-    out,
-    "# HELP telemt_relay_adaptive_hard_promotions_total Adaptive relay hard promotions triggered by write pressure"
-);
-let _ = writeln!(
-    out,
-    "# TYPE telemt_relay_adaptive_hard_promotions_total counter"
-);
-let _ = writeln!(
-    out,
-    "telemt_relay_adaptive_hard_promotions_total {}",
-    if core_enabled {
-        stats.get_relay_adaptive_hard_promotions_total()
-    } else {
-        0
-    }
-);
-let _ = writeln!(out, "# HELP telemt_reconnect_evict_total Reconnect-driven session evictions");
-let _ = writeln!(out, "# TYPE telemt_reconnect_evict_total counter");
-let _ = writeln!(
-    out,
-    "telemt_reconnect_evict_total {}",
-    if core_enabled {
-        stats.get_reconnect_evict_total()
-    } else {
-        0
-    }
-);
-let _ = writeln!(
-    out,
-    "# HELP telemt_reconnect_stale_close_total Sessions closed because they became stale after reconnect"
-);
-let _ = writeln!(out, "# TYPE telemt_reconnect_stale_close_total counter");
-let _ = writeln!(
-    out,
-    "telemt_reconnect_stale_close_total {}",
-    if core_enabled {
-        stats.get_reconnect_stale_close_total()
-    } else {
-        0
-    }
-);

 let _ = writeln!(out, "# HELP telemt_handshake_timeouts_total Handshake timeouts");
 let _ = writeln!(out, "# TYPE telemt_handshake_timeouts_total counter");
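All of the removed blocks above repeat one Prometheus text-exposition pattern: a `# HELP` line, a `# TYPE` line, then the sample, with the value zeroed (rather than the series omitted) when the subsystem is disabled, so the scrape shape stays stable. A minimal sketch of that pattern, using only the standard library (`write_gauge` is an illustrative helper, not a function from this repository):

```rust
use std::fmt::Write;

// Illustrative helper: emit one gauge in Prometheus text format.
// Disabled subsystems report 0 instead of dropping the series.
fn write_gauge(out: &mut String, name: &str, help: &str, enabled: bool, value: u64) {
    let _ = writeln!(out, "# HELP {name} {help}");
    let _ = writeln!(out, "# TYPE {name} gauge");
    let _ = writeln!(out, "{name} {}", if enabled { value } else { 0 });
}
```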
@@ -1650,36 +1547,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
     }
 );
-
-let _ = writeln!(
-    out,
-    "# HELP telemt_pool_drain_soft_evict_total Soft-evicted client sessions on stuck draining writers"
-);
-let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_total counter");
-let _ = writeln!(
-    out,
-    "telemt_pool_drain_soft_evict_total {}",
-    if me_allows_normal {
-        stats.get_pool_drain_soft_evict_total()
-    } else {
-        0
-    }
-);
-
-let _ = writeln!(
-    out,
-    "# HELP telemt_pool_drain_soft_evict_writer_total Draining writers with at least one soft eviction"
-);
-let _ = writeln!(out, "# TYPE telemt_pool_drain_soft_evict_writer_total counter");
-let _ = writeln!(
-    out,
-    "telemt_pool_drain_soft_evict_writer_total {}",
-    if me_allows_normal {
-        stats.get_pool_drain_soft_evict_writer_total()
-    } else {
-        0
-    }
-);

 let _ = writeln!(out, "# HELP telemt_pool_stale_pick_total Stale writer fallback picks for new binds");
 let _ = writeln!(out, "# TYPE telemt_pool_stale_pick_total counter");
 let _ = writeln!(
@@ -1692,57 +1559,6 @@ async fn render_metrics(stats: &Stats, config: &ProxyConfig, ip_tracker: &UserIp
     }
 );
-
-let _ = writeln!(
-    out,
-    "# HELP telemt_me_writer_close_signal_drop_total Close-signal drops for already-removed ME writers"
-);
-let _ = writeln!(out, "# TYPE telemt_me_writer_close_signal_drop_total counter");
-let _ = writeln!(
-    out,
-    "telemt_me_writer_close_signal_drop_total {}",
-    if me_allows_normal {
-        stats.get_me_writer_close_signal_drop_total()
-    } else {
-        0
-    }
-);
-
-let _ = writeln!(
-    out,
-    "# HELP telemt_me_writer_close_signal_channel_full_total Close-signal drops caused by full writer command channels"
-);
-let _ = writeln!(
-    out,
-    "# TYPE telemt_me_writer_close_signal_channel_full_total counter"
-);
-let _ = writeln!(
-    out,
-    "telemt_me_writer_close_signal_channel_full_total {}",
-    if me_allows_normal {
-        stats.get_me_writer_close_signal_channel_full_total()
-    } else {
-        0
-    }
-);
-
-let _ = writeln!(
-    out,
-    "# HELP telemt_me_draining_writers_reap_progress_total Draining-writer removals processed by reap cleanup"
-);
-let _ = writeln!(
-    out,
-    "# TYPE telemt_me_draining_writers_reap_progress_total counter"
-);
-let _ = writeln!(
-    out,
-    "telemt_me_draining_writers_reap_progress_total {}",
-    if me_allows_normal {
-        stats.get_me_draining_writers_reap_progress_total()
-    } else {
-        0
-    }
-);

 let _ = writeln!(out, "# HELP telemt_me_writer_removed_total Total ME writer removals");
 let _ = writeln!(out, "# TYPE telemt_me_writer_removed_total counter");
 let _ = writeln!(
@@ -2048,8 +1864,6 @@ mod tests {
     stats.increment_connects_all();
     stats.increment_connects_all();
     stats.increment_connects_bad();
-    stats.increment_current_connections_direct();
-    stats.increment_current_connections_me();
     stats.increment_handshake_timeouts();
     stats.increment_upstream_connect_attempt_total();
     stats.increment_upstream_connect_attempt_total();
@@ -2081,9 +1895,6 @@ mod tests {

     assert!(output.contains("telemt_connections_total 2"));
     assert!(output.contains("telemt_connections_bad_total 1"));
-    assert!(output.contains("telemt_connections_current 2"));
-    assert!(output.contains("telemt_connections_direct_current 1"));
-    assert!(output.contains("telemt_connections_me_current 1"));
     assert!(output.contains("telemt_handshake_timeouts_total 1"));
     assert!(output.contains("telemt_upstream_connect_attempt_total 2"));
     assert!(output.contains("telemt_upstream_connect_success_total 1"));
@@ -2126,9 +1937,6 @@ mod tests {
     let output = render_metrics(&stats, &config, &tracker).await;
     assert!(output.contains("telemt_connections_total 0"));
     assert!(output.contains("telemt_connections_bad_total 0"));
-    assert!(output.contains("telemt_connections_current 0"));
-    assert!(output.contains("telemt_connections_direct_current 0"));
-    assert!(output.contains("telemt_connections_me_current 0"));
     assert!(output.contains("telemt_handshake_timeouts_total 0"));
     assert!(output.contains("telemt_user_unique_ips_current{user="));
     assert!(output.contains("telemt_user_unique_ips_recent_window{user="));
@@ -2162,28 +1970,11 @@ mod tests {
     assert!(output.contains("# TYPE telemt_uptime_seconds gauge"));
     assert!(output.contains("# TYPE telemt_connections_total counter"));
     assert!(output.contains("# TYPE telemt_connections_bad_total counter"));
-    assert!(output.contains("# TYPE telemt_connections_current gauge"));
-    assert!(output.contains("# TYPE telemt_connections_direct_current gauge"));
-    assert!(output.contains("# TYPE telemt_connections_me_current gauge"));
-    assert!(output.contains("# TYPE telemt_relay_adaptive_promotions_total counter"));
-    assert!(output.contains("# TYPE telemt_relay_adaptive_demotions_total counter"));
-    assert!(output.contains("# TYPE telemt_relay_adaptive_hard_promotions_total counter"));
-    assert!(output.contains("# TYPE telemt_reconnect_evict_total counter"));
-    assert!(output.contains("# TYPE telemt_reconnect_stale_close_total counter"));
     assert!(output.contains("# TYPE telemt_handshake_timeouts_total counter"));
     assert!(output.contains("# TYPE telemt_upstream_connect_attempt_total counter"));
     assert!(output.contains("# TYPE telemt_me_rpc_proxy_req_signal_sent_total counter"));
     assert!(output.contains("# TYPE telemt_me_idle_close_by_peer_total counter"));
     assert!(output.contains("# TYPE telemt_me_writer_removed_total counter"));
-    assert!(output.contains("# TYPE telemt_me_writer_close_signal_drop_total counter"));
-    assert!(output.contains(
-        "# TYPE telemt_me_writer_close_signal_channel_full_total counter"
-    ));
-    assert!(output.contains(
-        "# TYPE telemt_me_draining_writers_reap_progress_total counter"
-    ));
-    assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_total counter"));
-    assert!(output.contains("# TYPE telemt_pool_drain_soft_evict_writer_total counter"));
     assert!(output.contains(
         "# TYPE telemt_me_writer_removed_unexpected_minus_restored_total gauge"
     ));
@@ -84,7 +84,6 @@ use crate::proxy::handshake::{HandshakeSuccess, handle_mtproto_handshake, handle
 use crate::proxy::masking::handle_bad_client;
 use crate::proxy::middle_relay::handle_via_middle_proxy;
 use crate::proxy::route_mode::{RelayRouteMode, RouteRuntimeController};
-use crate::proxy::session_eviction::register_session;

 fn beobachten_ttl(config: &ProxyConfig) -> Duration {
     let minutes = config.general.beobachten_minutes;
@@ -867,17 +866,6 @@ impl RunningClientHandler {
             }
         };
-
-        let registration = register_session(&user, success.dc_idx);
-        if registration.replaced_existing {
-            stats.increment_reconnect_evict_total();
-            warn!(
-                user = %user,
-                dc = success.dc_idx,
-                "Reconnect detected: replacing active session for user+dc"
-            );
-        }
-        let session_lease = registration.lease;

         let route_snapshot = route_runtime.snapshot();
         let session_id = rng.u64();
         let relay_result = if config.general.use_middle_proxy
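For context, the eviction bookkeeping being removed here followed a register-and-displace pattern. A minimal sketch under assumed types (the real `register_session` signature and registry are not shown in this diff):

```rust
use std::collections::HashMap;

// Hypothetical registration result mirroring the field used above.
struct Registration {
    replaced_existing: bool,
}

// One live session per (user, dc): inserting a new session id reports whether
// an older one was displaced, which the caller turns into the
// `reconnect_evict_total` counter and a warning.
fn register_session(
    sessions: &mut HashMap<(String, usize), u64>,
    user: &str,
    dc_idx: usize,
    session_id: u64,
) -> Registration {
    let replaced_existing = sessions
        .insert((user.to_owned(), dc_idx), session_id)
        .is_some();
    Registration { replaced_existing }
}
```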
@@ -897,7 +885,6 @@ impl RunningClientHandler {
     route_runtime.subscribe(),
     route_snapshot,
     session_id,
-    session_lease.clone(),
 )
 .await
 } else {
@@ -914,7 +901,6 @@ impl RunningClientHandler {
     route_runtime.subscribe(),
     route_snapshot,
     session_id,
-    session_lease.clone(),
 )
 .await
 }
@@ -932,7 +918,6 @@ impl RunningClientHandler {
     route_runtime.subscribe(),
     route_snapshot,
     session_id,
-    session_lease.clone(),
 )
 .await
 };
@@ -22,8 +22,6 @@ use crate::proxy::route_mode::{
     RelayRouteMode, RouteCutoverState, ROUTE_SWITCH_ERROR_MSG, affected_cutover_state,
     cutover_stagger_delay,
 };
-use crate::proxy::adaptive_buffers;
-use crate::proxy::session_eviction::SessionLease;
 use crate::stats::Stats;
 use crate::stream::{BufferPool, CryptoReader, CryptoWriter};
 use crate::transport::UpstreamManager;
@@ -185,7 +183,6 @@ pub(crate) async fn handle_via_direct<R, W>(
     mut route_rx: watch::Receiver<RouteCutoverState>,
     route_snapshot: RouteCutoverState,
     session_id: u64,
-    session_lease: SessionLease,
 ) -> Result<()>
 where
     R: AsyncRead + Unpin + Send + 'static,
@@ -225,27 +222,17 @@
     stats.increment_user_connects(user);
     let _direct_connection_lease = stats.acquire_direct_connection_lease();
-
-    let seed_tier = adaptive_buffers::seed_tier_for_user(user);
-    let (c2s_copy_buf, s2c_copy_buf) = adaptive_buffers::direct_copy_buffers_for_tier(
-        seed_tier,
-        config.general.direct_relay_copy_buf_c2s_bytes,
-        config.general.direct_relay_copy_buf_s2c_bytes,
-    );
-
     let relay_result = relay_bidirectional(
         client_reader,
         client_writer,
         tg_reader,
         tg_writer,
-        c2s_copy_buf,
-        s2c_copy_buf,
+        config.general.direct_relay_copy_buf_c2s_bytes,
+        config.general.direct_relay_copy_buf_s2c_bytes,
         user,
         success.dc_idx,
         Arc::clone(&stats),
         config.access.user_data_quota.get(user).copied(),
         buffer_pool,
-        session_lease,
-        seed_tier,
     );
     tokio::pin!(relay_result);
     let relay_result = loop {
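The seeding being dropped here sized the two copy buffers from a per-user tier. A sketch of that idea, with an invented scaling rule (the real `direct_copy_buffers_for_tier` logic is not shown in this diff):

```rust
// Tier 0 starts small; hotter tiers get proportionally larger copy buffers.
// The scale factors are illustrative, not the repository's actual values.
fn copy_buffers_for_tier(tier: u8, base_c2s: usize, base_s2c: usize) -> (usize, usize) {
    let scale = match tier {
        0 => 1,
        1 => 2,
        _ => 4,
    };
    (base_c2s.saturating_mul(scale), base_s2c.saturating_mul(scale))
}
```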
@@ -299,11 +299,6 @@ async fn run_update_cycle(
     cfg.general.hardswap,
     cfg.general.me_pool_drain_ttl_secs,
     cfg.general.me_pool_drain_threshold,
-    cfg.general.me_pool_drain_soft_evict_enabled,
-    cfg.general.me_pool_drain_soft_evict_grace_secs,
-    cfg.general.me_pool_drain_soft_evict_per_writer,
-    cfg.general.me_pool_drain_soft_evict_budget_per_core,
-    cfg.general.me_pool_drain_soft_evict_cooldown_ms,
     cfg.general.effective_me_pool_force_close_secs(),
     cfg.general.me_pool_min_fresh_ratio,
     cfg.general.me_hardswap_warmup_delay_min_ms,
@@ -531,11 +526,6 @@ pub async fn me_config_updater(
     cfg.general.hardswap,
     cfg.general.me_pool_drain_ttl_secs,
     cfg.general.me_pool_drain_threshold,
-    cfg.general.me_pool_drain_soft_evict_enabled,
-    cfg.general.me_pool_drain_soft_evict_grace_secs,
-    cfg.general.me_pool_drain_soft_evict_per_writer,
-    cfg.general.me_pool_drain_soft_evict_budget_per_core,
-    cfg.general.me_pool_drain_soft_evict_cooldown_ms,
     cfg.general.effective_me_pool_force_close_secs(),
     cfg.general.me_pool_min_fresh_ratio,
     cfg.general.me_hardswap_warmup_delay_min_ms,
@@ -83,11 +83,6 @@ async fn make_pool(
     general.hardswap,
     general.me_pool_drain_ttl_secs,
     general.me_pool_drain_threshold,
-    general.me_pool_drain_soft_evict_enabled,
-    general.me_pool_drain_soft_evict_grace_secs,
-    general.me_pool_drain_soft_evict_per_writer,
-    general.me_pool_drain_soft_evict_budget_per_core,
-    general.me_pool_drain_soft_evict_cooldown_ms,
     general.effective_me_pool_force_close_secs(),
     general.me_pool_min_fresh_ratio,
     general.me_hardswap_warmup_delay_min_ms,
@@ -112,8 +107,6 @@ async fn make_pool(
     general.me_warn_rate_limit_ms,
     MeRouteNoWriterMode::default(),
     general.me_route_no_writer_wait_ms,
     general.me_route_hybrid_max_wait_ms,
     general.me_route_blocking_send_timeout_ms,
     general.me_route_inline_recovery_attempts,
     general.me_route_inline_recovery_wait_ms,
 );
@@ -227,11 +220,10 @@ async fn set_writer_runtime_state(
 async fn reap_draining_writers_clears_warn_state_when_pool_empty() {
     let (pool, _rng) = make_pool(128, 1, 1).await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();
     warn_next_allowed.insert(11, Instant::now() + Duration::from_secs(5));
     warn_next_allowed.insert(22, Instant::now() + Duration::from_secs(5));

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert!(warn_next_allowed.is_empty());
 }
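The `warn_next_allowed` map threaded through these tests is a per-writer rate limiter. A sketch of the idiom under assumed semantics (the helper below is illustrative, not the repository's code):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// A warning for `writer_id` is allowed only once its stored deadline has
// passed; emitting one pushes the deadline forward by `interval`.
fn warn_allowed(
    next_allowed: &mut HashMap<u64, Instant>,
    writer_id: u64,
    interval: Duration,
) -> bool {
    let now = Instant::now();
    if next_allowed
        .get(&writer_id)
        .is_some_and(|deadline| *deadline > now)
    {
        return false;
    }
    next_allowed.insert(writer_id, now + interval);
    true
}
```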
@@ -240,8 +232,6 @@ async fn reap_draining_writers_clears_warn_state_when_pool_empty() {
 async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycles() {
     let threshold = 3u64;
     let (pool, _rng) = make_pool(threshold, 1, 1).await;
-    pool.me_pool_drain_soft_evict_enabled
-        .store(false, Ordering::Relaxed);
     let now_epoch_secs = MePool::now_epoch_secs();

     for writer_id in 1..=60u64 {
@@ -256,9 +246,8 @@ async fn reap_draining_writers_respects_threshold_across_multiple_overflow_cycle
     }

     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();
     for _ in 0..64 {
-        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+        reap_draining_writers(&pool, &mut warn_next_allowed).await;
         if writer_count(&pool).await <= threshold as usize {
             break;
         }
@@ -286,12 +275,11 @@ async fn reap_draining_writers_handles_large_empty_writer_population() {
     }

     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();
     for _ in 0..24 {
         if writer_count(&pool).await == 0 {
             break;
         }
-        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+        reap_draining_writers(&pool, &mut warn_next_allowed).await;
     }

     assert_eq!(writer_count(&pool).await, 0);
@@ -315,12 +303,11 @@ async fn reap_draining_writers_processes_mass_deadline_expiry_without_unbounded_
     }

     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();
     for _ in 0..40 {
         if writer_count(&pool).await == 0 {
             break;
         }
-        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+        reap_draining_writers(&pool, &mut warn_next_allowed).await;
     }

     assert_eq!(writer_count(&pool).await, 0);
@@ -331,7 +318,6 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
     let (pool, _rng) = make_pool(128, 1, 1).await;
     let now_epoch_secs = MePool::now_epoch_secs();
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

     for wave in 0..40u64 {
         for offset in 0..8u64 {
@@ -345,7 +331,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
         .await;
     }

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;
     assert!(warn_next_allowed.len() <= writer_count(&pool).await);

     let ids = sorted_writer_ids(&pool).await;
@@ -353,7 +339,7 @@ async fn reap_draining_writers_maintains_warn_state_subset_property_under_bulk_c
         let _ = pool.remove_writer_and_close_clients(writer_id).await;
     }

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;
     assert!(warn_next_allowed.len() <= writer_count(&pool).await);
     }
 }
@@ -375,10 +361,9 @@ async fn reap_draining_writers_budgeted_cleanup_never_increases_pool_size() {
     }

     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();
     let mut previous = writer_count(&pool).await;
     for _ in 0..32 {
-        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+        reap_draining_writers(&pool, &mut warn_next_allowed).await;
         let current = writer_count(&pool).await;
         assert!(current <= previous);
         previous = current;
@@ -81,11 +81,6 @@ async fn make_pool(
     general.hardswap,
     general.me_pool_drain_ttl_secs,
     general.me_pool_drain_threshold,
-    general.me_pool_drain_soft_evict_enabled,
-    general.me_pool_drain_soft_evict_grace_secs,
-    general.me_pool_drain_soft_evict_per_writer,
-    general.me_pool_drain_soft_evict_budget_per_core,
-    general.me_pool_drain_soft_evict_cooldown_ms,
     general.effective_me_pool_force_close_secs(),
     general.me_pool_min_fresh_ratio,
     general.me_hardswap_warmup_delay_min_ms,
@@ -110,8 +105,6 @@ async fn make_pool(
     general.me_warn_rate_limit_ms,
     MeRouteNoWriterMode::default(),
     general.me_route_no_writer_wait_ms,
     general.me_route_hybrid_max_wait_ms,
     general.me_route_blocking_send_timeout_ms,
     general.me_route_inline_recovery_attempts,
     general.me_route_inline_recovery_wait_ms,
 );
@@ -4,7 +4,6 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, Ordering};
 use std::time::{Duration, Instant};

 use bytes::Bytes;
 use tokio::sync::mpsc;
 use tokio_util::sync::CancellationToken;

@@ -40,7 +39,7 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
     NetworkDecision::default(),
     None,
     Arc::new(SecureRandom::new()),
-    Arc::new(Stats::new()),
+    Arc::new(Stats::default()),
     general.me_keepalive_enabled,
     general.me_keepalive_interval_secs,
     general.me_keepalive_jitter_secs,
@@ -75,11 +74,6 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
     general.hardswap,
     general.me_pool_drain_ttl_secs,
     general.me_pool_drain_threshold,
-    general.me_pool_drain_soft_evict_enabled,
-    general.me_pool_drain_soft_evict_grace_secs,
-    general.me_pool_drain_soft_evict_per_writer,
-    general.me_pool_drain_soft_evict_budget_per_core,
-    general.me_pool_drain_soft_evict_cooldown_ms,
     general.effective_me_pool_force_close_secs(),
     general.me_pool_min_fresh_ratio,
     general.me_hardswap_warmup_delay_min_ms,
@@ -104,8 +98,6 @@ async fn make_pool(me_pool_drain_threshold: u64) -> Arc<MePool> {
     general.me_warn_rate_limit_ms,
     MeRouteNoWriterMode::default(),
     general.me_route_no_writer_wait_ms,
     general.me_route_hybrid_max_wait_ms,
     general.me_route_blocking_send_timeout_ms,
     general.me_route_inline_recovery_attempts,
     general.me_route_inline_recovery_wait_ms,
 )
@@ -198,15 +190,14 @@ async fn reap_draining_writers_drops_warn_state_for_removed_writer() {
     let conn_ids =
         insert_draining_writer(&pool, 7, now_epoch_secs.saturating_sub(180), 1, 0).await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;
     assert!(warn_next_allowed.contains_key(&7));

     let _ = pool.remove_writer_and_close_clients(7).await;
     assert!(pool.registry.get_writer(conn_ids[0]).await.is_none());

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;
     assert!(!warn_next_allowed.contains_key(&7));
 }
@@ -218,96 +209,12 @@ async fn reap_draining_writers_removes_empty_draining_writers() {
     insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(30), 0, 0).await;
     insert_draining_writer(&pool, 3, now_epoch_secs.saturating_sub(20), 1, 0).await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert_eq!(current_writer_ids(&pool).await, vec![3]);
 }

-#[tokio::test]
-async fn reap_draining_writers_does_not_block_on_stuck_writer_close_signal() {
-    let pool = make_pool(128).await;
-    let now_epoch_secs = MePool::now_epoch_secs();
-
-    let (blocked_tx, blocked_rx) = mpsc::channel::<WriterCommand>(1);
-    assert!(
-        blocked_tx
-            .try_send(WriterCommand::Data(Bytes::from_static(b"stuck")))
-            .is_ok()
-    );
-    let blocked_rx_guard = tokio::spawn(async move {
-        let _hold_rx = blocked_rx;
-        tokio::time::sleep(Duration::from_secs(30)).await;
-    });
-
-    let blocked_writer_id = 90u64;
-    let blocked_writer = MeWriter {
-        id: blocked_writer_id,
-        addr: SocketAddr::new(
-            IpAddr::V4(Ipv4Addr::LOCALHOST),
-            4500 + blocked_writer_id as u16,
-        ),
-        source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
-        writer_dc: 2,
-        generation: 1,
-        contour: Arc::new(AtomicU8::new(WriterContour::Draining.as_u8())),
-        created_at: Instant::now() - Duration::from_secs(blocked_writer_id),
-        tx: blocked_tx.clone(),
-        cancel: CancellationToken::new(),
-        degraded: Arc::new(AtomicBool::new(false)),
-        rtt_ema_ms_x10: Arc::new(AtomicU32::new(0)),
-        draining: Arc::new(AtomicBool::new(true)),
-        draining_started_at_epoch_secs: Arc::new(AtomicU64::new(
-            now_epoch_secs.saturating_sub(120),
-        )),
-        drain_deadline_epoch_secs: Arc::new(AtomicU64::new(0)),
-        allow_drain_fallback: Arc::new(AtomicBool::new(false)),
-    };
-    pool.writers.write().await.push(blocked_writer);
-    pool.registry
-        .register_writer(blocked_writer_id, blocked_tx)
-        .await;
-    pool.conn_count.fetch_add(1, Ordering::Relaxed);
-
-    insert_draining_writer(&pool, 91, now_epoch_secs.saturating_sub(110), 0, 0).await;
-
-    let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();
-
-    let reap_res = tokio::time::timeout(
-        Duration::from_millis(500),
-        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed),
-    )
-    .await;
-    blocked_rx_guard.abort();
-
-    assert!(reap_res.is_ok(), "reap should not block on close signal");
-    assert!(current_writer_ids(&pool).await.is_empty());
-    assert_eq!(pool.stats.get_me_writer_close_signal_drop_total(), 2);
-    assert_eq!(pool.stats.get_me_writer_close_signal_channel_full_total(), 1);
-    assert_eq!(pool.stats.get_me_draining_writers_reap_progress_total(), 2);
-    let activity = pool.registry.writer_activity_snapshot().await;
-    assert!(!activity.bound_clients_by_writer.contains_key(&blocked_writer_id));
-    assert!(!activity.bound_clients_by_writer.contains_key(&91));
-    let (probe_conn_id, _rx) = pool.registry.register().await;
-    assert!(
-        !pool.registry
-            .bind_writer(
-                probe_conn_id,
-                blocked_writer_id,
-                ConnMeta {
-                    target_dc: 2,
-                    client_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 6400),
-                    our_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 443),
-                    proto_flags: 0,
-                },
-            )
-            .await
-    );
-    let _ = pool.registry.unregister(probe_conn_id).await;
-}
-
 #[tokio::test]
 async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
     let pool = make_pool(2).await;
@@ -317,9 +224,8 @@ async fn reap_draining_writers_overflow_closes_oldest_non_empty_writers() {
     insert_draining_writer(&pool, 33, now_epoch_secs.saturating_sub(20), 1, 0).await;
     insert_draining_writer(&pool, 44, now_epoch_secs.saturating_sub(10), 1, 0).await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert_eq!(current_writer_ids(&pool).await, vec![33, 44]);
 }
@@ -337,9 +243,8 @@ async fn reap_draining_writers_deadline_force_close_applies_under_threshold() {
     )
     .await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert!(current_writer_ids(&pool).await.is_empty());
 }
@@ -361,9 +266,8 @@ async fn reap_draining_writers_limits_closes_per_health_tick() {
         .await;
     }
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert_eq!(pool.writers.read().await.len(), writer_total - close_budget);
 }
@@ -502,13 +406,12 @@ async fn reap_draining_writers_backlog_drains_across_ticks() {
         .await;
     }
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

     for _ in 0..8 {
         if pool.writers.read().await.is_empty() {
             break;
         }
-        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+        reap_draining_writers(&pool, &mut warn_next_allowed).await;
     }

     assert!(pool.writers.read().await.is_empty());
@@ -532,10 +435,9 @@ async fn reap_draining_writers_threshold_backlog_converges_to_threshold() {
         .await;
     }
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

     for _ in 0..16 {
-        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+        reap_draining_writers(&pool, &mut warn_next_allowed).await;
         if pool.writers.read().await.len() <= threshold as usize {
             break;
         }
@@ -552,9 +454,8 @@ async fn reap_draining_writers_threshold_zero_preserves_non_expired_non_empty_wr
     insert_draining_writer(&pool, 20, now_epoch_secs.saturating_sub(30), 1, 0).await;
     insert_draining_writer(&pool, 30, now_epoch_secs.saturating_sub(20), 1, 0).await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert_eq!(current_writer_ids(&pool).await, vec![10, 20, 30]);
 }
@@ -577,9 +478,8 @@ async fn reap_draining_writers_prioritizes_force_close_before_empty_cleanup() {
     let empty_writer_id = close_budget as u64 + 1;
     insert_draining_writer(&pool, empty_writer_id, now_epoch_secs.saturating_sub(20), 0, 0).await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert_eq!(current_writer_ids(&pool).await, vec![empty_writer_id]);
 }
@@ -591,9 +491,8 @@ async fn reap_draining_writers_empty_cleanup_does_not_increment_force_close_metr
     insert_draining_writer(&pool, 1, now_epoch_secs.saturating_sub(60), 0, 0).await;
     insert_draining_writer(&pool, 2, now_epoch_secs.saturating_sub(50), 0, 0).await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert!(current_writer_ids(&pool).await.is_empty());
     assert_eq!(pool.stats.get_pool_force_close_total(), 0);
@@ -620,9 +519,8 @@ async fn reap_draining_writers_handles_duplicate_force_close_requests_for_same_w
     )
     .await;
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;

     assert!(current_writer_ids(&pool).await.is_empty());
 }
@@ -632,7 +530,6 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
     let pool = make_pool(128).await;
     let now_epoch_secs = MePool::now_epoch_secs();
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

     for wave in 0..12u64 {
         for offset in 0..9u64 {
@@ -645,14 +542,14 @@ async fn reap_draining_writers_warn_state_never_exceeds_live_draining_population
         )
         .await;
     }
-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;
     assert!(warn_next_allowed.len() <= pool.writers.read().await.len());

     let existing_writer_ids = current_writer_ids(&pool).await;
     for writer_id in existing_writer_ids.into_iter().take(4) {
         let _ = pool.remove_writer_and_close_clients(writer_id).await;
     }
-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+    reap_draining_writers(&pool, &mut warn_next_allowed).await;
     assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
     }
 }
@@ -662,7 +559,6 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
     let pool = make_pool(6).await;
     let now_epoch_secs = MePool::now_epoch_secs();
     let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();

     for writer_id in 1..=18u64 {
         let bound_clients = if writer_id % 3 == 0 { 0 } else { 1 };
@@ -682,7 +578,7 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
     }

     for _ in 0..16 {
-        reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
+        reap_draining_writers(&pool, &mut warn_next_allowed).await;
         if pool.writers.read().await.len() <= 6 {
             break;
         }
@@ -692,62 +588,9 @@ async fn reap_draining_writers_mixed_backlog_converges_without_leaking_warn_stat
     assert!(warn_next_allowed.len() <= pool.writers.read().await.len());
 }

-#[tokio::test]
-async fn reap_draining_writers_soft_evicts_stuck_writer_with_per_writer_cap() {
-    let pool = make_pool(128).await;
-    pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed);
-    pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed);
-    pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed);
-    pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed);
-    pool.me_pool_drain_soft_evict_cooldown_ms
-        .store(1, Ordering::Relaxed);
-
-    let now_epoch_secs = MePool::now_epoch_secs();
-    insert_draining_writer(&pool, 77, now_epoch_secs.saturating_sub(240), 3, 0).await;
-    let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();
-
-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
-
-    let activity = pool.registry.writer_activity_snapshot().await;
-    assert_eq!(activity.bound_clients_by_writer.get(&77), Some(&2));
-    assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1);
-    assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
-    assert_eq!(current_writer_ids(&pool).await, vec![77]);
-}
-
-#[tokio::test]
-async fn reap_draining_writers_soft_evict_respects_cooldown_per_writer() {
-    let pool = make_pool(128).await;
-    pool.me_pool_drain_soft_evict_enabled.store(true, Ordering::Relaxed);
-    pool.me_pool_drain_soft_evict_grace_secs.store(0, Ordering::Relaxed);
-    pool.me_pool_drain_soft_evict_per_writer.store(1, Ordering::Relaxed);
-    pool.me_pool_drain_soft_evict_budget_per_core.store(8, Ordering::Relaxed);
-    pool.me_pool_drain_soft_evict_cooldown_ms
-        .store(60_000, Ordering::Relaxed);
-
-    let now_epoch_secs = MePool::now_epoch_secs();
-    insert_draining_writer(&pool, 88, now_epoch_secs.saturating_sub(240), 3, 0).await;
-    let mut warn_next_allowed = HashMap::new();
-    let mut soft_evict_next_allowed = HashMap::new();
-
-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
-    reap_draining_writers(&pool, &mut warn_next_allowed, &mut soft_evict_next_allowed).await;
-
-    let activity = pool.registry.writer_activity_snapshot().await;
-    assert_eq!(activity.bound_clients_by_writer.get(&88), Some(&2));
-    assert_eq!(pool.stats.get_pool_drain_soft_evict_total(), 1);
-    assert_eq!(pool.stats.get_pool_drain_soft_evict_writer_total(), 1);
-}
-
 #[test]
 fn general_config_default_drain_threshold_remains_enabled() {
     assert_eq!(GeneralConfig::default().me_pool_drain_threshold, 128);
-    assert!(GeneralConfig::default().me_pool_drain_soft_evict_enabled);
-    assert_eq!(
-        GeneralConfig::default().me_pool_drain_soft_evict_per_writer,
-        1
-    );
 }

 #[tokio::test]
@@ -40,7 +40,6 @@ pub(crate) struct MeApiDcStatusSnapshot {
     pub floor_max: usize,
     pub floor_capped: bool,
     pub alive_writers: usize,
-    pub coverage_ratio: f64,
    pub coverage_pct: f64,
     pub fresh_alive_writers: usize,
     pub fresh_coverage_pct: f64,
@@ -63,7 +62,6 @@ pub(crate) struct MeApiStatusSnapshot {
     pub available_pct: f64,
     pub required_writers: usize,
     pub alive_writers: usize,
-    pub coverage_ratio: f64,
     pub coverage_pct: f64,
     pub fresh_alive_writers: usize,
     pub fresh_coverage_pct: f64,
@@ -126,11 +124,6 @@ pub(crate) struct MeApiRuntimeSnapshot {
     pub me_reconnect_backoff_cap_ms: u64,
     pub me_reconnect_fast_retry_count: u32,
     pub me_pool_drain_ttl_secs: u64,
-    pub me_pool_drain_soft_evict_enabled: bool,
-    pub me_pool_drain_soft_evict_grace_secs: u64,
-    pub me_pool_drain_soft_evict_per_writer: u8,
-    pub me_pool_drain_soft_evict_budget_per_core: u16,
-    pub me_pool_drain_soft_evict_cooldown_ms: u64,
     pub me_pool_force_close_secs: u64,
     pub me_pool_min_fresh_ratio: f32,
     pub me_bind_stale_mode: &'static str,
@@ -344,8 +337,6 @@ impl MePool {
     let mut available_endpoints = 0usize;
     let mut alive_writers = 0usize;
     let mut fresh_alive_writers = 0usize;
-    let mut coverage_ratio_dcs_total = 0usize;
-    let mut coverage_ratio_dcs_covered = 0usize;
     let floor_mode = self.floor_mode();
     let adaptive_cpu_cores = (self
         .me_adaptive_floor_cpu_cores_effective
@@ -397,12 +388,6 @@ impl MePool {
     available_endpoints += dc_available_endpoints;
     alive_writers += dc_alive_writers;
     fresh_alive_writers += dc_fresh_alive_writers;
-    if endpoint_count > 0 {
-        coverage_ratio_dcs_total += 1;
-        if dc_alive_writers > 0 {
-            coverage_ratio_dcs_covered += 1;
-        }
-    }

     dcs.push(MeApiDcStatusSnapshot {
         dc,
@@ -425,11 +410,6 @@ impl MePool {
     floor_max,
     floor_capped,
     alive_writers: dc_alive_writers,
-    coverage_ratio: if endpoint_count > 0 && dc_alive_writers > 0 {
-        100.0
-    } else {
-        0.0
-    },
     coverage_pct: ratio_pct(dc_alive_writers, dc_required_writers),
     fresh_alive_writers: dc_fresh_alive_writers,
     fresh_coverage_pct: ratio_pct(dc_fresh_alive_writers, dc_required_writers),
@@ -446,7 +426,6 @@ impl MePool {
     available_pct: ratio_pct(available_endpoints, configured_endpoints),
     required_writers,
     alive_writers,
-    coverage_ratio: ratio_pct(coverage_ratio_dcs_covered, coverage_ratio_dcs_total),
     coverage_pct: ratio_pct(alive_writers, required_writers),
     fresh_alive_writers,
     fresh_coverage_pct: ratio_pct(fresh_alive_writers, required_writers),
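For reference, the percentage helper used throughout this snapshot code behaves like the following sketch (an assumed shape; the repository's `ratio_pct` body is not part of this diff):

```rust
// Percentage with a zero-denominator guard: nothing required means 0%.
fn ratio_pct(have: usize, want: usize) -> f64 {
    if want == 0 {
        0.0
    } else {
        (have as f64 / want as f64) * 100.0
    }
}
```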
@@ -583,22 +562,6 @@ impl MePool {
     me_reconnect_backoff_cap_ms: self.me_reconnect_backoff_cap.as_millis() as u64,
     me_reconnect_fast_retry_count: self.me_reconnect_fast_retry_count,
     me_pool_drain_ttl_secs: self.me_pool_drain_ttl_secs.load(Ordering::Relaxed),
-    me_pool_drain_soft_evict_enabled: self
-        .me_pool_drain_soft_evict_enabled
-        .load(Ordering::Relaxed),
-    me_pool_drain_soft_evict_grace_secs: self
-        .me_pool_drain_soft_evict_grace_secs
-        .load(Ordering::Relaxed),
-    me_pool_drain_soft_evict_per_writer: self
-        .me_pool_drain_soft_evict_per_writer
-        .load(Ordering::Relaxed),
-    me_pool_drain_soft_evict_budget_per_core: self
-        .me_pool_drain_soft_evict_budget_per_core
-        .load(Ordering::Relaxed)
-        .min(u16::MAX as u32) as u16,
-    me_pool_drain_soft_evict_cooldown_ms: self
-        .me_pool_drain_soft_evict_cooldown_ms
-        .load(Ordering::Relaxed),
     me_pool_force_close_secs: self.me_pool_force_close_secs.load(Ordering::Relaxed),
     me_pool_min_fresh_ratio: Self::permille_to_ratio(
         self.me_pool_min_fresh_ratio_permille.load(Ordering::Relaxed),
@@ -8,7 +8,6 @@ use bytes::Bytes;
 use bytes::BytesMut;
 use rand::Rng;
 use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::TrySendError;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, warn};

@@ -312,28 +311,41 @@ impl MePool {
     let mut p = Vec::with_capacity(12);
     p.extend_from_slice(&RPC_PING_U32.to_le_bytes());
     p.extend_from_slice(&sent_id.to_le_bytes());
-    let now_epoch_ms = std::time::SystemTime::now()
-        .duration_since(std::time::UNIX_EPOCH)
-        .unwrap_or_default()
-        .as_millis() as u64;
-    let mut run_cleanup = false;
-    if let Some(pool) = pool_ping.upgrade() {
-        let last_cleanup_ms = pool
-            .ping_tracker_last_cleanup_epoch_ms
-            .load(Ordering::Relaxed);
-        if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
-            && pool
-                .ping_tracker_last_cleanup_epoch_ms
-                .compare_exchange(
-                    last_cleanup_ms,
-                    now_epoch_ms,
-                    Ordering::AcqRel,
-                    Ordering::Relaxed,
-                )
-                .is_ok()
-        {
-            run_cleanup = true;
-        }
-    }
+    {
+        let mut tracker = ping_tracker_ping.lock().await;
+        let now_epoch_ms = std::time::SystemTime::now()
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_millis() as u64;
+        let mut run_cleanup = false;
+        if let Some(pool) = pool_ping.upgrade() {
+            let last_cleanup_ms = pool
+                .ping_tracker_last_cleanup_epoch_ms
+                .load(Ordering::Relaxed);
+            if now_epoch_ms.saturating_sub(last_cleanup_ms) >= 30_000
+                && pool
+                    .ping_tracker_last_cleanup_epoch_ms
+                    .compare_exchange(
+                        last_cleanup_ms,
+                        now_epoch_ms,
+                        Ordering::AcqRel,
+                        Ordering::Relaxed,
+                    )
+                    .is_ok()
+            {
+                run_cleanup = true;
+            }
+        }
+
+        if run_cleanup {
+            let before = tracker.len();
+            tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
+            let expired = before.saturating_sub(tracker.len());
+            if expired > 0 {
+                stats_ping.increment_me_keepalive_timeout_by(expired as u64);
+            }
+        }
+        tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
+    }
     ping_id = ping_id.wrapping_add(1);
     stats_ping.increment_me_keepalive_sent();
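The reworked block keeps the old code's cleanup throttle: a shared epoch-milliseconds atomic advanced with `compare_exchange`, so at most one keepalive task per 30-second window performs the expensive sweep. The throttle in isolation, as a hedged sketch:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Returns true for exactly one caller per 30s window: the one that wins the
// compare_exchange on the shared "last cleanup" timestamp.
fn should_run_cleanup(last_cleanup_epoch_ms: &AtomicU64, now_epoch_ms: u64) -> bool {
    let last = last_cleanup_epoch_ms.load(Ordering::Relaxed);
    now_epoch_ms.saturating_sub(last) >= 30_000
        && last_cleanup_epoch_ms
            .compare_exchange(last, now_epoch_ms, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
}
```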
@@ -354,16 +366,6 @@ impl MePool {
             }
             break;
         }
-        let mut tracker = ping_tracker_ping.lock().await;
-        if run_cleanup {
-            let before = tracker.len();
-            tracker.retain(|_, (ts, _)| ts.elapsed() < Duration::from_secs(120));
-            let expired = before.saturating_sub(tracker.len());
-            if expired > 0 {
-                stats_ping.increment_me_keepalive_timeout_by(expired as u64);
-            }
-        }
-        tracker.insert(sent_id, (std::time::Instant::now(), writer_id));
     }
 });
@@ -491,9 +493,11 @@ impl MePool {
     }

     pub(crate) async fn remove_writer_and_close_clients(self: &Arc<Self>, writer_id: u64) {
-        // Full client cleanup now happens inside `registry.writer_lost` to keep
-        // writer reap/remove paths strictly non-blocking per connection.
-        let _ = self.remove_writer_only(writer_id).await;
+        let conns = self.remove_writer_only(writer_id).await;
+        for bound in conns {
+            let _ = self.registry.route(bound.conn_id, super::MeResponse::Close).await;
+            let _ = self.registry.unregister(bound.conn_id).await;
+        }
     }

     pub(crate) async fn remove_writer_if_empty(self: &Arc<Self>, writer_id: u64) -> bool {
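The new body makes the client fan-out explicit instead of delegating it to `writer_lost`. The shape of that loop, with stand-in types (the real `Registry`, `MeResponse`, and bound-connection structs are not fully shown in this diff):

```rust
// Stand-ins for illustration only.
struct Bound {
    conn_id: u64,
}

enum MeResponse {
    Close,
}

struct Registry;

impl Registry {
    async fn route(&self, _conn_id: u64, _msg: MeResponse) -> Result<(), ()> {
        Ok(())
    }
    async fn unregister(&self, _conn_id: u64) -> Option<()> {
        Some(())
    }
}

// Every connection formerly bound to the removed writer is told to close and
// then dropped from the routing maps, inline and in order.
async fn close_bound(registry: &Registry, conns: Vec<Bound>) {
    for bound in conns {
        let _ = registry.route(bound.conn_id, MeResponse::Close).await;
        let _ = registry.unregister(bound.conn_id).await;
    }
}
```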
@@ -535,11 +539,6 @@ impl MePool {
         self.conn_count.fetch_sub(1, Ordering::Relaxed);
     }
 }
-// State invariant:
-// - writer is removed from `self.writers` (pool visibility),
-// - writer is removed from registry routing/binding maps via `writer_lost`.
-// The close command below is only a best-effort accelerator for task shutdown.
-// Cleanup progress must never depend on command-channel availability.
 let conns = self.registry.writer_lost(writer_id).await;
 {
     let mut tracker = self.ping_tracker.lock().await;
@@ -547,25 +546,7 @@ impl MePool {
     }
     self.rtt_stats.lock().await.remove(&writer_id);
     if let Some(tx) = close_tx {
-        match tx.try_send(WriterCommand::Close) {
-            Ok(()) => {}
-            Err(TrySendError::Full(_)) => {
-                self.stats.increment_me_writer_close_signal_drop_total();
-                self.stats
-                    .increment_me_writer_close_signal_channel_full_total();
-                debug!(
-                    writer_id,
-                    "Skipping close signal for removed writer: command channel is full"
-                );
-            }
-            Err(TrySendError::Closed(_)) => {
-                self.stats.increment_me_writer_close_signal_drop_total();
-                debug!(
-                    writer_id,
-                    "Skipping close signal for removed writer: command channel is closed"
-                );
-            }
-        }
+        let _ = tx.send(WriterCommand::Close).await;
     }
     if trigger_refill
         && let Some(addr) = removed_addr
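The two signaling styles visible in this hunk trade off differently: `try_send` never blocks and lets the caller observe and count drops, while `send(...).await` waits for channel capacity and can stall if the receiver is stuck. A reduced contrast of the non-blocking variant, with a stand-in command enum:

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

enum Cmd {
    Close,
}

// Non-blocking close signal: report whether it was actually delivered,
// distinguishing a full channel from a closed one.
fn signal_close(tx: &mpsc::Sender<Cmd>) -> Result<(), &'static str> {
    match tx.try_send(Cmd::Close) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Err("channel full"),
        Err(TrySendError::Closed(_)) => Err("channel closed"),
    }
}
```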